Skip to content

Commit

Permalink
Implement csv reader
Browse files Browse the repository at this point in the history
  • Loading branch information
acquamarin committed Sep 7, 2023
1 parent b71fe50 commit 6dff06f
Show file tree
Hide file tree
Showing 39 changed files with 1,138 additions and 1,244 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
cmake_minimum_required(VERSION 3.11)

project(Kuzu VERSION 0.0.8.4 LANGUAGES CXX)
project(Kuzu VERSION 0.0.8.5 LANGUAGES CXX)

find_package(Threads REQUIRED)

Expand Down
10 changes: 8 additions & 2 deletions src/binder/bind/bind_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ static void validateCopyNpyNotForRelTables(TableSchema* schema) {
}
}

expression_vector Binder::bindColumnExpressions(TableSchema* tableSchema) {
expression_vector Binder::bindColumnExpressions(
TableSchema* tableSchema, CopyDescription::FileType fileType) {
expression_vector columnExpressions;
if (tableSchema->tableType == TableType::REL) {
// For rel table, add FROM and TO columns as data columns to be read from file.
Expand All @@ -70,10 +71,15 @@ expression_vector Binder::bindColumnExpressions(TableSchema* tableSchema) {
columnExpressions.push_back(createVariable(
std::string(Property::REL_TO_PROPERTY_NAME), LogicalType{LogicalTypeID::ARROW_COLUMN}));
}
auto isCopyNodeCSV =
tableSchema->tableType == TableType::NODE && fileType == CopyDescription::FileType::CSV;
for (auto& property : tableSchema->properties) {
if (property->getDataType()->getLogicalTypeID() == LogicalTypeID::SERIAL ||
TableSchema::isReservedPropertyName(property->getName())) {
continue;
} else if (isCopyNodeCSV) {
columnExpressions.push_back(
createVariable(property->getName(), *property->getDataType()));
} else {
columnExpressions.push_back(
createVariable(property->getName(), LogicalType{LogicalTypeID::ARROW_COLUMN}));
Expand Down Expand Up @@ -116,7 +122,7 @@ std::unique_ptr<BoundStatement> Binder::bindCopyFromClause(const Statement& stat
// For CSV file, and table with SERIAL columns, we need to read in serial from files.
auto containsSerial = bindContainsSerial(tableSchema, actualFileType);
// Bind expressions.
auto columnExpressions = bindColumnExpressions(tableSchema);
auto columnExpressions = bindColumnExpressions(tableSchema, actualFileType);
auto copyDescription = std::make_unique<CopyDescription>(
actualFileType, boundFilePaths, std::move(csvReaderConfig));
auto nodeOffsetExpression =
Expand Down
14 changes: 9 additions & 5 deletions src/common/type_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,16 @@ namespace common {
bool StringCastUtils::tryCastToBoolean(const char* data, uint64_t length, bool& result) {
auto booleanStr = std::string{data, length};
booleanStr = StringUtils::rtrim(StringUtils::ltrim(booleanStr));
std::istringstream iss{booleanStr};
iss >> std::boolalpha >> result;
if (iss.fail()) {
return false;
// Try cast boolAlpha format data(TRUE, FALSE) to boolean.
std::istringstream boolAlpha{booleanStr};
boolAlpha >> std::boolalpha >> result;
if (!boolAlpha.fail()) {
return true;
}
return true;
// Try cast numeric format data(1, 0) to boolean.
std::istringstream boolNonAlpha{booleanStr};
boolNonAlpha >> std::noboolalpha >> result;
return !boolNonAlpha.fail();
}

bool StringCastUtils::castToBool(const char* data, uint64_t length) {
Expand Down
37 changes: 36 additions & 1 deletion src/common/vector/value_vector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,42 @@ void ValueVector::copyFromValue(uint64_t pos, const Value& value) {
*listEntry = ListVector::addList(this, numValues);
auto dstDataVector = ListVector::getDataVector(this);
for (auto i = 0u; i < numValues; ++i) {
dstDataVector->copyFromValue(listEntry->offset + i, *NestedVal::getChildVal(&value, i));
auto childVal = NestedVal::getChildVal(&value, i);
dstDataVector->setNull(listEntry->offset + i, childVal->isNull());
if (!childVal->isNull()) {
dstDataVector->copyFromValue(
listEntry->offset + i, *NestedVal::getChildVal(&value, i));
}
}
} break;
case PhysicalTypeID::FIXED_LIST: {
auto numValues = NestedVal::getChildrenSize(&value);
auto childType = FixedListType::getChildType(value.getDataType());
auto numBytesPerChildValue = getDataTypeSize(*childType);
auto bufferToWrite = valueBuffer.get() + pos * numBytesPerValue;
for (auto i = 0u; i < numValues; i++) {
auto val = NestedVal::getChildVal(&value, i);
switch (childType->getPhysicalType()) {
case PhysicalTypeID::INT64: {
memcpy(bufferToWrite, &val->getValueReference<int64_t>(), numBytesPerChildValue);
} break;
case PhysicalTypeID::INT32: {
memcpy(bufferToWrite, &val->getValueReference<int32_t>(), numBytesPerChildValue);
} break;
case PhysicalTypeID::INT16: {
memcpy(bufferToWrite, &val->getValueReference<int16_t>(), numBytesPerChildValue);
} break;
case PhysicalTypeID::DOUBLE: {
memcpy(bufferToWrite, &val->getValueReference<double_t>(), numBytesPerChildValue);
} break;
case PhysicalTypeID::FLOAT: {
memcpy(bufferToWrite, &val->getValueReference<float_t>(), numBytesPerChildValue);
} break;
default: {
throw NotImplementedException{"FixedListColumnChunk::write"};
}
}
bufferToWrite += numBytesPerChildValue;
}
} break;
case PhysicalTypeID::STRUCT: {
Expand Down
3 changes: 2 additions & 1 deletion src/include/binder/binder.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ class Binder {
/*** bind copy from/to ***/
static bool bindContainsSerial(
catalog::TableSchema* tableSchema, common::CopyDescription::FileType fileType);
expression_vector bindColumnExpressions(catalog::TableSchema* tableSchema);
expression_vector bindColumnExpressions(
catalog::TableSchema* tableSchema, common::CopyDescription::FileType fileType);
std::unique_ptr<BoundStatement> bindCopyFromClause(const parser::Statement& statement);
std::unique_ptr<BoundStatement> bindCopyToClause(const parser::Statement& statement);

Expand Down
1 change: 0 additions & 1 deletion src/include/common/data_chunk/sel_vector.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#pragma once

#include <memory>

#include "common/constants.h"
Expand Down
29 changes: 15 additions & 14 deletions src/include/common/exception.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,73 +42,74 @@ class Exception : public std::exception {

class ParserException : public Exception {
public:
explicit ParserException(const std::string& msg) : Exception("Parser exception: " + msg){};
explicit ParserException(const std::string& msg) : Exception("Parser exception: " + msg) {}
};

class BinderException : public Exception {
public:
explicit BinderException(const std::string& msg) : Exception("Binder exception: " + msg){};
explicit BinderException(const std::string& msg) : Exception("Binder exception: " + msg) {}
};

class ConversionException : public Exception {
public:
explicit ConversionException(const std::string& msg) : Exception(msg){};
explicit ConversionException(const std::string& msg)
: Exception("Conversion exception: " + msg) {}
};

class CopyException : public Exception {
public:
explicit CopyException(const std::string& msg) : Exception("Copy exception: " + msg){};
explicit CopyException(const std::string& msg) : Exception("Copy exception: " + msg) {}
};

class CatalogException : public Exception {
public:
explicit CatalogException(const std::string& msg) : Exception("Catalog exception: " + msg){};
explicit CatalogException(const std::string& msg) : Exception("Catalog exception: " + msg) {}
};

class StorageException : public Exception {
public:
explicit StorageException(const std::string& msg) : Exception("Storage exception: " + msg){};
explicit StorageException(const std::string& msg) : Exception("Storage exception: " + msg) {}
};

class BufferManagerException : public Exception {
public:
explicit BufferManagerException(const std::string& msg)
: Exception("Buffer manager exception: " + msg){};
: Exception("Buffer manager exception: " + msg) {}
};

class InternalException : public Exception {
public:
explicit InternalException(const std::string& msg) : Exception(msg){};
explicit InternalException(const std::string& msg) : Exception(msg) {}
};

class NotImplementedException : public Exception {
public:
explicit NotImplementedException(const std::string& msg) : Exception(msg){};
explicit NotImplementedException(const std::string& msg) : Exception(msg) {}
};

class RuntimeException : public Exception {
public:
explicit RuntimeException(const std::string& msg) : Exception("Runtime exception: " + msg){};
explicit RuntimeException(const std::string& msg) : Exception("Runtime exception: " + msg) {}
};

class ConnectionException : public Exception {
public:
explicit ConnectionException(const std::string& msg) : Exception(msg){};
explicit ConnectionException(const std::string& msg) : Exception(msg) {}
};

class TransactionManagerException : public Exception {
public:
explicit TransactionManagerException(const std::string& msg) : Exception(msg){};
explicit TransactionManagerException(const std::string& msg) : Exception(msg) {}
};

class InterruptException : public Exception {
public:
explicit InterruptException() : Exception("Interrupted."){};
explicit InterruptException() : Exception("Interrupted.") {}
};

class TestException : public Exception {
public:
explicit TestException(const std::string& msg) : Exception("Test exception: " + msg){};
explicit TestException(const std::string& msg) : Exception("Test exception: " + msg) {}
};

} // namespace common
Expand Down
2 changes: 1 addition & 1 deletion src/include/common/types/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ namespace common {

class FileInfo;

using sel_t = uint32_t;
using sel_t = uint16_t;
using hash_t = uint64_t;
using page_idx_t = uint32_t;
using frame_idx_t = page_idx_t;
Expand Down
91 changes: 91 additions & 0 deletions src/include/processor/operator/persistent/csv_reader.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
#pragma once

#include <cstdint>
#include <string>

#include "common/copier_config/copier_config.h"
#include "common/data_chunk/data_chunk.h"
#include "common/types/types.h"

namespace kuzu {
namespace processor {

enum class ParserMode : uint8_t { PARSING = 0, PARSING_HEADER = 1 };

class BaseCSVReader {
public:
BaseCSVReader(const std::string& filePath, common::CSVReaderConfig csvReaderConfig,
catalog::TableSchema* tableSchema);

virtual ~BaseCSVReader() = default;

inline bool isNewLine(char c) { return c == '\n' || c == '\r'; }

common::CSVReaderConfig csvReaderConfig;
std::string filePath;
catalog::TableSchema* tableSchema;
uint64_t numPropertiesToCopy;

uint64_t linenr = 0;
uint64_t bytesInChunk = 0;
bool bomChecked = false;
bool rowEmpty = false;

ParserMode mode;

protected:
void AddValue(common::DataChunk& resultChunk, std::string strVal,
common::column_id_t& columnIdx, std::vector<uint64_t>& escapePositions);
// Adds a row to the insert_chunk, returns true if the chunk is filled as a result of this row
// being added.
bool AddRow(common::DataChunk& resultChunk, common::column_id_t& column);

private:
void copyStringToVector(common::ValueVector* vector, std::string& strVal);

protected:
uint64_t rowToAdd;
};

//! Buffered CSV reader is a class that reads values from a stream and parses them as a CSV file
class BufferedCSVReader : public BaseCSVReader {
//! Initial buffer read size; can be extended for long lines
static constexpr uint64_t INITIAL_BUFFER_SIZE = 16384;
//! Larger buffer size for non disk files
static constexpr uint64_t INITIAL_BUFFER_SIZE_LARGE = 10000000; // 10MB

public:
BufferedCSVReader(const std::string& filePath, common::CSVReaderConfig csvReaderConfig,
catalog::TableSchema* tableSchema);

std::unique_ptr<char[]> buffer;
uint64_t bufferSize;
uint64_t position;
uint64_t start = 0;
std::unique_ptr<common::FileInfo> fileInfo;

std::vector<std::unique_ptr<char[]>> cachedBuffers;

public:
//! Extract a single DataChunk from the CSV file and stores it in insert_chunk
uint64_t ParseCSV(common::DataChunk& resultChunk);

private:
//! Initialize Parser
void Initialize();
//! Skips skip_rows, reads header row from input stream
void ReadHeader();
//! Resets the buffer
void ResetBuffer();
//! Extract a single DataChunk from the CSV file and stores it in insert_chunk
uint64_t TryParseCSV(common::DataChunk& resultChunk, std::string& errorMessage);
//! Parses a CSV file with a one-byte delimiter, escape and quote character
uint64_t TryParseSimpleCSV(common::DataChunk& resultChunk, std::string& errorMessage);
//! Reads a new buffer from the CSV file if the current one has been exhausted
bool ReadBuffer(uint64_t& start, uint64_t& lineStart);
//! Skip Empty lines for tables with over one column
void SkipEmptyLines();
};

} // namespace processor
} // namespace kuzu
Loading

0 comments on commit 6dff06f

Please sign in to comment.