Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement csv reader #1988

Merged
merged 1 commit into from
Sep 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
cmake_minimum_required(VERSION 3.11)

project(Kuzu VERSION 0.0.8.4 LANGUAGES CXX)
project(Kuzu VERSION 0.0.8.5 LANGUAGES CXX)

find_package(Threads REQUIRED)

Expand Down
10 changes: 8 additions & 2 deletions src/binder/bind/bind_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ static void validateCopyNpyNotForRelTables(TableSchema* schema) {
}
}

expression_vector Binder::bindColumnExpressions(TableSchema* tableSchema) {
expression_vector Binder::bindColumnExpressions(
TableSchema* tableSchema, CopyDescription::FileType fileType) {
expression_vector columnExpressions;
if (tableSchema->tableType == TableType::REL) {
// For rel table, add FROM and TO columns as data columns to be read from file.
Expand All @@ -70,10 +71,15 @@ expression_vector Binder::bindColumnExpressions(TableSchema* tableSchema) {
columnExpressions.push_back(createVariable(
std::string(Property::REL_TO_PROPERTY_NAME), LogicalType{LogicalTypeID::ARROW_COLUMN}));
}
auto isCopyNodeCSV =
tableSchema->tableType == TableType::NODE && fileType == CopyDescription::FileType::CSV;
for (auto& property : tableSchema->properties) {
if (property->getDataType()->getLogicalTypeID() == LogicalTypeID::SERIAL ||
TableSchema::isReservedPropertyName(property->getName())) {
continue;
} else if (isCopyNodeCSV) {
columnExpressions.push_back(
createVariable(property->getName(), *property->getDataType()));
} else {
columnExpressions.push_back(
createVariable(property->getName(), LogicalType{LogicalTypeID::ARROW_COLUMN}));
Expand Down Expand Up @@ -116,7 +122,7 @@ std::unique_ptr<BoundStatement> Binder::bindCopyFromClause(const Statement& stat
// For CSV file, and table with SERIAL columns, we need to read in serial from files.
auto containsSerial = bindContainsSerial(tableSchema, actualFileType);
// Bind expressions.
auto columnExpressions = bindColumnExpressions(tableSchema);
auto columnExpressions = bindColumnExpressions(tableSchema, actualFileType);
auto copyDescription = std::make_unique<CopyDescription>(
actualFileType, boundFilePaths, std::move(csvReaderConfig));
auto nodeOffsetExpression =
Expand Down
14 changes: 9 additions & 5 deletions src/common/type_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,16 @@ namespace common {
bool StringCastUtils::tryCastToBoolean(const char* data, uint64_t length, bool& result) {
auto booleanStr = std::string{data, length};
booleanStr = StringUtils::rtrim(StringUtils::ltrim(booleanStr));
std::istringstream iss{booleanStr};
iss >> std::boolalpha >> result;
if (iss.fail()) {
return false;
// Try cast boolAlpha format data(TRUE, FALSE) to boolean.
std::istringstream boolAlpha{booleanStr};
boolAlpha >> std::boolalpha >> result;
if (!boolAlpha.fail()) {
return true;
}
return true;
// Try cast numeric format data(1, 0) to boolean.
std::istringstream boolNonAlpha{booleanStr};
boolNonAlpha >> std::noboolalpha >> result;
return !boolNonAlpha.fail();
}

bool StringCastUtils::castToBool(const char* data, uint64_t length) {
Expand Down
37 changes: 36 additions & 1 deletion src/common/vector/value_vector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,42 @@
*listEntry = ListVector::addList(this, numValues);
auto dstDataVector = ListVector::getDataVector(this);
for (auto i = 0u; i < numValues; ++i) {
dstDataVector->copyFromValue(listEntry->offset + i, *NestedVal::getChildVal(&value, i));
auto childVal = NestedVal::getChildVal(&value, i);
dstDataVector->setNull(listEntry->offset + i, childVal->isNull());
if (!childVal->isNull()) {
dstDataVector->copyFromValue(
listEntry->offset + i, *NestedVal::getChildVal(&value, i));
}
}
} break;
case PhysicalTypeID::FIXED_LIST: {
auto numValues = NestedVal::getChildrenSize(&value);
auto childType = FixedListType::getChildType(value.getDataType());
auto numBytesPerChildValue = getDataTypeSize(*childType);
auto bufferToWrite = valueBuffer.get() + pos * numBytesPerValue;
for (auto i = 0u; i < numValues; i++) {
auto val = NestedVal::getChildVal(&value, i);
switch (childType->getPhysicalType()) {
acquamarin marked this conversation as resolved.
Show resolved Hide resolved
case PhysicalTypeID::INT64: {
memcpy(bufferToWrite, &val->getValueReference<int64_t>(), numBytesPerChildValue);
} break;
case PhysicalTypeID::INT32: {
memcpy(bufferToWrite, &val->getValueReference<int32_t>(), numBytesPerChildValue);

Check warning on line 185 in src/common/vector/value_vector.cpp

View check run for this annotation

Codecov / codecov/patch

src/common/vector/value_vector.cpp#L184-L185

Added lines #L184 - L185 were not covered by tests
} break;
case PhysicalTypeID::INT16: {
memcpy(bufferToWrite, &val->getValueReference<int16_t>(), numBytesPerChildValue);

Check warning on line 188 in src/common/vector/value_vector.cpp

View check run for this annotation

Codecov / codecov/patch

src/common/vector/value_vector.cpp#L187-L188

Added lines #L187 - L188 were not covered by tests
} break;
case PhysicalTypeID::DOUBLE: {
memcpy(bufferToWrite, &val->getValueReference<double_t>(), numBytesPerChildValue);
} break;
case PhysicalTypeID::FLOAT: {
memcpy(bufferToWrite, &val->getValueReference<float_t>(), numBytesPerChildValue);

Check warning on line 194 in src/common/vector/value_vector.cpp

View check run for this annotation

Codecov / codecov/patch

src/common/vector/value_vector.cpp#L193-L194

Added lines #L193 - L194 were not covered by tests
} break;
default: {
throw NotImplementedException{"FixedListColumnChunk::write"};

Check warning on line 197 in src/common/vector/value_vector.cpp

View check run for this annotation

Codecov / codecov/patch

src/common/vector/value_vector.cpp#L196-L197

Added lines #L196 - L197 were not covered by tests
}
}
bufferToWrite += numBytesPerChildValue;
}
} break;
case PhysicalTypeID::STRUCT: {
Expand Down
3 changes: 2 additions & 1 deletion src/include/binder/binder.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ class Binder {
/*** bind copy from/to ***/
static bool bindContainsSerial(
catalog::TableSchema* tableSchema, common::CopyDescription::FileType fileType);
expression_vector bindColumnExpressions(catalog::TableSchema* tableSchema);
expression_vector bindColumnExpressions(
catalog::TableSchema* tableSchema, common::CopyDescription::FileType fileType);
std::unique_ptr<BoundStatement> bindCopyFromClause(const parser::Statement& statement);
std::unique_ptr<BoundStatement> bindCopyToClause(const parser::Statement& statement);

Expand Down
1 change: 0 additions & 1 deletion src/include/common/data_chunk/sel_vector.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#pragma once

#include <memory>

#include "common/constants.h"
Expand Down
29 changes: 15 additions & 14 deletions src/include/common/exception.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,73 +42,74 @@

class ParserException : public Exception {
public:
explicit ParserException(const std::string& msg) : Exception("Parser exception: " + msg){};
explicit ParserException(const std::string& msg) : Exception("Parser exception: " + msg) {}
};

class BinderException : public Exception {
public:
explicit BinderException(const std::string& msg) : Exception("Binder exception: " + msg){};
explicit BinderException(const std::string& msg) : Exception("Binder exception: " + msg) {}
};

class ConversionException : public Exception {
public:
explicit ConversionException(const std::string& msg) : Exception(msg){};
explicit ConversionException(const std::string& msg)
: Exception("Conversion exception: " + msg) {}
};

class CopyException : public Exception {
public:
explicit CopyException(const std::string& msg) : Exception("Copy exception: " + msg){};
explicit CopyException(const std::string& msg) : Exception("Copy exception: " + msg) {}
};

class CatalogException : public Exception {
public:
explicit CatalogException(const std::string& msg) : Exception("Catalog exception: " + msg){};
explicit CatalogException(const std::string& msg) : Exception("Catalog exception: " + msg) {}
};

class StorageException : public Exception {
public:
explicit StorageException(const std::string& msg) : Exception("Storage exception: " + msg){};
explicit StorageException(const std::string& msg) : Exception("Storage exception: " + msg) {}
};

class BufferManagerException : public Exception {
public:
explicit BufferManagerException(const std::string& msg)
: Exception("Buffer manager exception: " + msg){};
: Exception("Buffer manager exception: " + msg) {}

Check warning on line 77 in src/include/common/exception.h

View check run for this annotation

Codecov / codecov/patch

src/include/common/exception.h#L77

Added line #L77 was not covered by tests
};

class InternalException : public Exception {
public:
explicit InternalException(const std::string& msg) : Exception(msg){};
explicit InternalException(const std::string& msg) : Exception(msg) {}

Check warning on line 82 in src/include/common/exception.h

View check run for this annotation

Codecov / codecov/patch

src/include/common/exception.h#L82

Added line #L82 was not covered by tests
};

class NotImplementedException : public Exception {
public:
explicit NotImplementedException(const std::string& msg) : Exception(msg){};
explicit NotImplementedException(const std::string& msg) : Exception(msg) {}
};

class RuntimeException : public Exception {
public:
explicit RuntimeException(const std::string& msg) : Exception("Runtime exception: " + msg){};
explicit RuntimeException(const std::string& msg) : Exception("Runtime exception: " + msg) {}
};

class ConnectionException : public Exception {
public:
explicit ConnectionException(const std::string& msg) : Exception(msg){};
explicit ConnectionException(const std::string& msg) : Exception(msg) {}
};

class TransactionManagerException : public Exception {
public:
explicit TransactionManagerException(const std::string& msg) : Exception(msg){};
explicit TransactionManagerException(const std::string& msg) : Exception(msg) {}
};

class InterruptException : public Exception {
public:
explicit InterruptException() : Exception("Interrupted."){};
explicit InterruptException() : Exception("Interrupted.") {}
};

class TestException : public Exception {
public:
explicit TestException(const std::string& msg) : Exception("Test exception: " + msg){};
explicit TestException(const std::string& msg) : Exception("Test exception: " + msg) {}

Check warning on line 112 in src/include/common/exception.h

View check run for this annotation

Codecov / codecov/patch

src/include/common/exception.h#L112

Added line #L112 was not covered by tests
};

} // namespace common
Expand Down
2 changes: 1 addition & 1 deletion src/include/common/types/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ namespace common {

class FileInfo;

using sel_t = uint32_t;
using sel_t = uint16_t;
using hash_t = uint64_t;
using page_idx_t = uint32_t;
using frame_idx_t = page_idx_t;
Expand Down
93 changes: 93 additions & 0 deletions src/include/processor/operator/persistent/csv_reader.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
#pragma once

#include <cstdint>
#include <string>

#include "common/copier_config/copier_config.h"
#include "common/data_chunk/data_chunk.h"
#include "common/types/types.h"

namespace kuzu {
namespace processor {

//! Parsing mode of a CSV reader, stored in a single byte (see BaseCSVReader::mode).
//! NOTE(review): presumably PARSING_HEADER is active while the header row is consumed and
//! PARSING while data rows are read -- confirm against the reader implementation.
enum class ParserMode : uint8_t { PARSING = 0, PARSING_HEADER = 1 };

//! Base class holding the configuration, bookkeeping counters and value/row helpers used by
//! CSV readers. The constructor and the Add*/copy* helpers are defined outside this header, so
//! the comments below only describe what the declarations themselves show.
class BaseCSVReader {
public:
    //! @param filePath         path of the CSV file to read
    //! @param csvReaderConfig  CSV reader configuration, taken by value
    //! @param tableSchema      schema of the target table; this class declares no code that
    //!                         deletes it (the destructor is defaulted)
    BaseCSVReader(const std::string& filePath, common::CSVReaderConfig csvReaderConfig,
        catalog::TableSchema* tableSchema);

    virtual ~BaseCSVReader() = default;

    //! Returns true when c is a line feed ('\n') or a carriage return ('\r').
    //! Static because it reads no reader state; existing member-style calls keep working.
    static bool isNewLine(char c) { return c == '\n' || c == '\r'; }

    common::CSVReaderConfig csvReaderConfig;
    std::string filePath;
    // Every scalar member below carries a default so that no field is ever read with an
    // indeterminate value, even if a constructor does not set it explicitly. A constructor's
    // member-initializer list still takes precedence over these defaults.
    catalog::TableSchema* tableSchema = nullptr;
    uint64_t numPropertiesToCopy = 0;

    uint64_t linenr = 0;
    uint64_t bytesInChunk = 0;
    bool bomChecked = false;
    bool rowEmpty = false;

    ParserMode mode = ParserMode::PARSING;

protected:
    //! Adds the value strVal to resultChunk for the column identified by columnIdx.
    //! NOTE(review): escapePositions presumably lists the offsets of escape characters inside
    //! strVal, and columnIdx (taken by non-const reference) is presumably advanced by the
    //! call -- confirm against the implementation.
    void AddValue(common::DataChunk& resultChunk, std::string strVal,
        common::column_id_t& columnIdx, std::vector<uint64_t>& escapePositions);
    //! Adds a row to resultChunk; returns true if the chunk is filled as a result of this row
    //! being added.
    bool AddRow(common::DataChunk& resultChunk, common::column_id_t& column);

private:
    //! Copies strVal into vector.
    //! NOTE(review): the destination position is not visible from this declaration; it is
    //! presumably derived from rowToAdd -- confirm against the implementation.
    void copyStringToVector(common::ValueVector* vector, std::string& strVal);

protected:
    // NOTE(review): presumably the index of the next row to be written into the result
    // chunk -- confirm against the implementation.
    uint64_t rowToAdd = 0;
};

//! Buffered CSV reader is a class that reads values from a stream and parses them as a CSV file.
//! All member functions are defined outside this header; comments describe only what the
//! declarations show, with assumptions marked for review.
class BufferedCSVReader : public BaseCSVReader {
    //! Initial buffer read size; can be extended for long lines
    static constexpr uint64_t INITIAL_BUFFER_SIZE = 16384;
    //! Larger buffer size for non disk files
    static constexpr uint64_t INITIAL_BUFFER_SIZE_LARGE = 10000000; // 10MB

public:
    //! @param filePath         path of the CSV file to read
    //! @param csvReaderConfig  CSV reader configuration, taken by value
    //! @param tableSchema      schema of the target table
    BufferedCSVReader(const std::string& filePath, common::CSVReaderConfig csvReaderConfig,
        catalog::TableSchema* tableSchema);

    ~BufferedCSVReader() override;

    //! Current read buffer.
    std::unique_ptr<char[]> buffer;
    // The three scalars below are given defaults, consistent with `start`, so they are never
    // indeterminate if a constructor leaves them unset. A constructor's member-initializer
    // list still takes precedence over these defaults.
    //! NOTE(review): presumably the number of valid bytes in `buffer` -- confirm.
    uint64_t bufferSize = 0;
    //! NOTE(review): presumably the current read offset inside `buffer` -- confirm.
    uint64_t position = 0;
    uint64_t start = 0;
    //! File descriptor of the input; -1 is the conventional "no open descriptor" value.
    //! NOTE(review): presumably opened by Initialize() and closed by the destructor -- confirm.
    int fd = -1;

    //! NOTE(review): presumably earlier buffers kept alive while values still reference
    //! them -- confirm against the implementation.
    std::vector<std::unique_ptr<char[]>> cachedBuffers;

public:
    //! Extracts a single DataChunk from the CSV file and stores it in resultChunk.
    //! NOTE(review): the returned count is presumably the number of rows parsed -- confirm.
    uint64_t ParseCSV(common::DataChunk& resultChunk);

private:
    //! Initialize Parser
    void Initialize();
    //! Skips skip_rows, reads header row from input stream
    void ReadHeader();
    //! Resets the buffer
    void ResetBuffer();
    //! Extracts a single DataChunk from the CSV file and stores it in resultChunk.
    //! NOTE(review): errorMessage is presumably filled in when parsing fails -- confirm.
    uint64_t TryParseCSV(common::DataChunk& resultChunk, std::string& errorMessage);
    //! Parses a CSV file with a one-byte delimiter, escape and quote character
    uint64_t TryParseSimpleCSV(common::DataChunk& resultChunk, std::string& errorMessage);
    //! Reads a new buffer from the CSV file if the current one has been exhausted
    bool ReadBuffer(uint64_t& start, uint64_t& lineStart);
    //! Skip Empty lines for tables with over one column
    void SkipEmptyLines();
};

} // namespace processor
} // namespace kuzu
Loading
Loading