Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reader: implement parallel CSV reading #2070

Merged
merged 1 commit into from
Sep 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dataset/csv-edge-case-tests/bom-and-data.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
data,moredata
abc,bomdata
acquamarin marked this conversation as resolved.
Show resolved Hide resolved
1 change: 1 addition & 0 deletions dataset/csv-edge-case-tests/bom-and-header.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
col1,col2
1 change: 1 addition & 0 deletions dataset/csv-edge-case-tests/bom.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

2 changes: 2 additions & 0 deletions dataset/csv-edge-case-tests/carriage-return-then-eof.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
abc,def
ghi,jkl
19 changes: 19 additions & 0 deletions dataset/csv-edge-case-tests/copy.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
COPY `bom-and-data` FROM "dataset/csv-edge-case-tests/bom-and-data.csv";
COPY `bom-and-header` FROM "dataset/csv-edge-case-tests/bom-and-header.csv" (HEADER=TRUE);
COPY `bom` FROM "dataset/csv-edge-case-tests/bom.csv";
COPY `carriage-return-then-eof` FROM "dataset/csv-edge-case-tests/carriage-return-then-eof.csv";
COPY `delimiter-then-eof` FROM "dataset/csv-edge-case-tests/delimiter-then-eof.csv";
COPY `empty-first-line` FROM "dataset/csv-edge-case-tests/empty-first-line.csv";
COPY `empty-lines-multiple-columns` FROM "dataset/csv-edge-case-tests/empty-lines-multiple-columns.csv";
COPY `empty-lines-single-column` FROM "dataset/csv-edge-case-tests/empty-lines-single-column.csv";
COPY `empty` FROM "dataset/csv-edge-case-tests/empty.csv";
COPY `empty-with-header` FROM "dataset/csv-edge-case-tests/empty.csv" (HEADER=TRUE);
COPY `eof-after-unquote` FROM "dataset/csv-edge-case-tests/eof-after-unquote.csv";
COPY `escapes-in-quote` FROM "dataset/csv-edge-case-tests/escapes-in-quote.csv";
COPY `escapes-out-of-quote` FROM "dataset/csv-edge-case-tests/escapes-out-of-quote.csv";
COPY `mixed-empty-lines-multiple-columns` FROM "dataset/csv-edge-case-tests/mixed-empty-lines-multiple-columns.csv";
COPY `mixed-empty-lines-single-column` FROM "dataset/csv-edge-case-tests/mixed-empty-lines-single-column.csv";
COPY `mixed-newlines` FROM "dataset/csv-edge-case-tests/mixed-newlines.csv";
COPY `quoted-values` FROM "dataset/csv-edge-case-tests/quoted-values.csv";
COPY `tab-as-delim` FROM "dataset/csv-edge-case-tests/tab-as-delim.csv" (DELIM='\\t');
COPY `unquote-escape` FROM "dataset/csv-edge-case-tests/unquote-escape.csv" (ESCAPE='"');
1 change: 1 addition & 0 deletions dataset/csv-edge-case-tests/delimiter-then-eof.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"abc",
5 changes: 5 additions & 0 deletions dataset/csv-edge-case-tests/empty-first-line.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@


abc,def

ghi,jkl
3 changes: 3 additions & 0 deletions dataset/csv-edge-case-tests/empty-lines-multiple-columns.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
abc,def

ghi,jkl
3 changes: 3 additions & 0 deletions dataset/csv-edge-case-tests/empty-lines-single-column.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
abc

def
Empty file.
1 change: 1 addition & 0 deletions dataset/csv-edge-case-tests/eof-after-unquote.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"abc","def"
1 change: 1 addition & 0 deletions dataset/csv-edge-case-tests/escapes-in-quote.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"abc\\\""
2 changes: 2 additions & 0 deletions dataset/csv-edge-case-tests/escapes-out-of-quote.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
a,b
\,\,
2 changes: 2 additions & 0 deletions dataset/csv-edge-case-tests/large-header.csv

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
abc,def

ghi,jkl
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
abc

def
ghi
Expand Down
2 changes: 2 additions & 0 deletions dataset/csv-edge-case-tests/mixed-newlines.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
abcdef
ghi
Expand Down
1 change: 1 addition & 0 deletions dataset/csv-edge-case-tests/quoted-values.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"abc","bcd"
19 changes: 19 additions & 0 deletions dataset/csv-edge-case-tests/schema.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
CREATE NODE TABLE `bom-and-data`(A STRING, B STRING, PRIMARY KEY(A));
CREATE NODE TABLE `bom-and-header`(A STRING, B STRING, PRIMARY KEY(A));
CREATE NODE TABLE `bom`(A STRING, B STRING, PRIMARY KEY(A));
CREATE NODE TABLE `carriage-return-then-eof`(A STRING, B STRING, PRIMARY KEY(A));
CREATE NODE TABLE `delimiter-then-eof`(A STRING, PRIMARY KEY(A));
CREATE NODE TABLE `empty-first-line`(A STRING, B STRING, PRIMARY KEY(A));
CREATE NODE TABLE `empty-lines-multiple-columns`(A STRING, B STRING, PRIMARY KEY(A));
CREATE NODE TABLE `empty-lines-single-column`(A SERIAL, B STRING, PRIMARY KEY(A));
CREATE NODE TABLE `empty`(A STRING, B STRING, PRIMARY KEY(A));
CREATE NODE TABLE `empty-with-header`(A STRING, B STRING, PRIMARY KEY(A));
CREATE NODE TABLE `eof-after-unquote`(A STRING, B STRING, PRIMARY KEY(A));
CREATE NODE TABLE `escapes-in-quote`(A STRING, PRIMARY KEY(A));
CREATE NODE TABLE `escapes-out-of-quote`(A STRING, B STRING, PRIMARY KEY(A));
CREATE NODE TABLE `mixed-empty-lines-multiple-columns`(A STRING, B STRING, PRIMARY KEY(A));
CREATE NODE TABLE `mixed-empty-lines-single-column`(A SERIAL, B STRING, PRIMARY KEY(A));
CREATE NODE TABLE `mixed-newlines`(A STRING, PRIMARY KEY(A));
CREATE NODE TABLE `quoted-values`(A STRING, B STRING, PRIMARY KEY(A));
CREATE NODE TABLE `tab-as-delim`(A STRING, B STRING, PRIMARY KEY(A));
CREATE NODE TABLE `unquote-escape`(A STRING, PRIMARY KEY(A));
3 changes: 3 additions & 0 deletions dataset/csv-edge-case-tests/tab-as-delim.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
abc def

ghi jkl
1 change: 1 addition & 0 deletions dataset/csv-edge-case-tests/unquote-escape.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"abc""def"
2 changes: 2 additions & 0 deletions dataset/csv-error-tests/escape-then-bad-char.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
valid-header
"abc\6"
2 changes: 2 additions & 0 deletions dataset/csv-error-tests/escape-then-eof.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
valid-header
"abc\
6 changes: 6 additions & 0 deletions dataset/csv-error-tests/mixed-line-count-test.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
a

12
3

4,4,5,6,7,8
Expand Down
2 changes: 2 additions & 0 deletions dataset/csv-error-tests/quote-then-eof.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
valid-header
"abc
2 changes: 2 additions & 0 deletions dataset/csv-error-tests/too-few-values.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
a,b
a
2 changes: 2 additions & 0 deletions dataset/csv-error-tests/too-many-values.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
a
a,b,c
2 changes: 2 additions & 0 deletions dataset/csv-multiline-quote-tests/basic.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
"abc
def"
2 changes: 2 additions & 0 deletions dataset/csv-multiline-quote-tests/copy.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
COPY basic FROM "dataset/csv-multiline-quote-tests/basic.csv" (PARALLEL=FALSE);
COPY `mixed-newlines` FROM "dataset/csv-multiline-quote-tests/mixed-newlines.csv" (PARALLEL=FALSE);
2 changes: 2 additions & 0 deletions dataset/csv-multiline-quote-tests/mixed-newlines.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
"abcdef","ghi
jkl"
Expand Down
2 changes: 2 additions & 0 deletions dataset/csv-multiline-quote-tests/schema.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
CREATE NODE TABLE basic(A STRING, PRIMARY KEY(A));
CREATE NODE TABLE `mixed-newlines`(A STRING, B STRING, PRIMARY KEY(A));
56 changes: 34 additions & 22 deletions src/binder/bind/bind_file_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@ FileType Binder::bindFileType(const std::string& filePath) {
std::filesystem::path fileName(filePath);
auto extension = FileUtils::getFileExtension(fileName);
auto fileType = FileTypeUtils::getFileTypeFromExtension(extension);
if (fileType == FileType::UNKNOWN) {
throw CopyException("Unsupported file type: " + filePath);
}
return fileType;
}

Expand Down Expand Up @@ -61,14 +58,23 @@ static char bindParsingOptionValue(std::string value) {
if (value == "\\t") {
return '\t';
}
if (value.length() > 2 || (value.length() == 2 && value[0] != '\\')) {
throw BinderException("Copy csv option value can only be a single character with an "
if (value.length() < 1 || value.length() > 2 || (value.length() == 2 && value[0] != '\\')) {
throw BinderException("Copy csv option value must be a single character with an "
"optional escape character.");
}
return value[value.length() - 1];
}

static void bindStringParsingOptions(
// Applies a boolean COPY/CSV parsing option to the reader configuration.
// Recognized names are HEADER and PARALLEL; any other name is silently
// skipped here because option-name validation happens upstream
// (validateBoolParsingOptionName).
static void bindBoolParsingOption(
    CSVReaderConfig& csvReaderConfig, const std::string& optionName, bool optionValue) {
    // Resolve the option name to the config field it controls, then assign.
    bool* target = nullptr;
    if (optionName == "HEADER") {
        target = &csvReaderConfig.hasHeader;
    } else if (optionName == "PARALLEL") {
        target = &csvReaderConfig.parallel;
    }
    if (target != nullptr) {
        *target = optionValue;
    }
}

static void bindStringParsingOption(
CSVReaderConfig& csvReaderConfig, const std::string& optionName, std::string& optionValue) {
auto parsingOptionValue = bindParsingOptionValue(optionValue);
if (optionName == "ESCAPE") {
Expand All @@ -84,13 +90,17 @@ static void bindStringParsingOptions(
}
}

static bool validateStringParsingOptionName(std::string& parsingOptionName) {
for (auto i = 0; i < std::size(CopyConstants::STRING_CSV_PARSING_OPTIONS); i++) {
if (parsingOptionName == CopyConstants::STRING_CSV_PARSING_OPTIONS[i]) {
return true;
}
}
return false;
// Returns true iff `option` matches one of the C-string entries in the
// fixed-size array `arr`. The array reference parameter lets the compiler
// deduce the element count, so no separate length argument is needed.
template<uint64_t size>
static bool hasOption(const char* const (&arr)[size], const std::string& option) {
    for (const char* candidate : arr) {
        if (option == candidate) {
            return true;
        }
    }
    return false;
}

// Returns true iff `parsingOptionName` (expected to be upper-cased by the
// caller) is one of the boolean CSV options listed in
// CopyConstants::BOOL_CSV_PARSING_OPTIONS (e.g. HEADER, PARALLEL).
static bool validateBoolParsingOptionName(const std::string& parsingOptionName) {
return hasOption(CopyConstants::BOOL_CSV_PARSING_OPTIONS, parsingOptionName);
}

// Returns true iff `parsingOptionName` (expected to be upper-cased by the
// caller) is one of the string-valued CSV options listed in
// CopyConstants::STRING_CSV_PARSING_OPTIONS (e.g. ESCAPE, DELIM, QUOTE).
static bool validateStringParsingOptionName(const std::string& parsingOptionName) {
return hasOption(CopyConstants::STRING_CSV_PARSING_OPTIONS, parsingOptionName);
}

std::unique_ptr<CSVReaderConfig> Binder::bindParsingOptions(
Expand All @@ -99,29 +109,31 @@ std::unique_ptr<CSVReaderConfig> Binder::bindParsingOptions(
for (auto& parsingOption : parsingOptions) {
auto copyOptionName = parsingOption.first;
StringUtils::toUpper(copyOptionName);

bool isValidStringParsingOption = validateStringParsingOptionName(copyOptionName);
bool isValidBoolParsingOption = validateBoolParsingOptionName(copyOptionName);

auto copyOptionExpression = parsingOption.second.get();
auto boundCopyOptionExpression = expressionBinder.bindExpression(*copyOptionExpression);
assert(boundCopyOptionExpression->expressionType == LITERAL);
if (copyOptionName == "HEADER") {
if (isValidBoolParsingOption) {
if (boundCopyOptionExpression->dataType.getLogicalTypeID() != LogicalTypeID::BOOL) {
throw BinderException(
"The value type of parsing csv option " + copyOptionName + " must be boolean.");
"The type of csv parsing option " + copyOptionName + " must be a boolean.");
}
csvReaderConfig->hasHeader =
auto copyOptionValue =
((LiteralExpression&)(*boundCopyOptionExpression)).value->getValue<bool>();
} else if (boundCopyOptionExpression->dataType.getLogicalTypeID() ==
LogicalTypeID::STRING &&
isValidStringParsingOption) {
bindBoolParsingOption(*csvReaderConfig, copyOptionName, copyOptionValue);
} else if (isValidStringParsingOption) {
if (boundCopyOptionExpression->dataType.getLogicalTypeID() != LogicalTypeID::STRING) {
throw BinderException(
"The value type of parsing csv option " + copyOptionName + " must be string.");
"The type of csv parsing option " + copyOptionName + " must be a string.");
}
auto copyOptionValue =
((LiteralExpression&)(*boundCopyOptionExpression)).value->getValue<std::string>();
bindStringParsingOptions(*csvReaderConfig, copyOptionName, copyOptionValue);
bindStringParsingOption(*csvReaderConfig, copyOptionName, copyOptionValue);
} else {
throw BinderException("Unrecognized parsing csv option: " + copyOptionName + ".");
throw BinderException("Unrecognized csv parsing option: " + copyOptionName + ".");
}
}
return csvReaderConfig;
Expand Down
7 changes: 3 additions & 4 deletions src/binder/bind/bind_reading_clause.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
#include "parser/query/reading_clause/in_query_call_clause.h"
#include "parser/query/reading_clause/load_from.h"
#include "parser/query/reading_clause/unwind_clause.h"
#include "processor/operator/persistent/reader/csv/csv_reader.h"
#include "processor/operator/persistent/reader/csv/serial_csv_reader.h"
#include "processor/operator/persistent/reader/npy_reader.h"
#include "processor/operator/persistent/reader/parquet/parquet_reader.h"

Expand Down Expand Up @@ -139,9 +139,8 @@ std::unique_ptr<BoundReadingClause> Binder::bindLoadFrom(
expression_vector columns;
switch (fileType) {
case FileType::CSV: {
auto csvReader = BufferedCSVReader(
readerConfig->filePaths[0], *readerConfig->csvReaderConfig, 0 /*expectedNumColumns*/);
csvReader.SniffCSV();
auto csvReader = SerialCSVReader(readerConfig->filePaths[0], *readerConfig);
csvReader.sniffCSV();
auto numColumns = csvReader.getNumColumnsDetected();
auto stringType = LogicalType(LogicalTypeID::STRING);
for (auto i = 0; i < numColumns; ++i) {
Expand Down
12 changes: 6 additions & 6 deletions src/common/copier_config/copier_config.cpp
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
#include "common/copier_config/copier_config.h"

#include "common/exception/copy.h"
#include "utf8proc_wrapper.h"

using namespace kuzu::utf8proc;

namespace kuzu {
namespace common {

const static std::unordered_map<std::string, FileType> fileTypeMap{{".csv", FileType::CSV},
{".parquet", FileType::PARQUET}, {".npy", FileType::NPY}, {".ttl", FileType::TURTLE}};

// Maps a file extension (including the leading dot, e.g. ".csv") to its
// FileType. Throws CopyException for any extension not present in
// fileTypeMap.
//
// Uses find() rather than operator[]: the map is const (operator[] would not
// compile) and a failed lookup must not insert a default entry. The stale
// pre-change lines that indexed the map and compared against
// FileType::UNKNOWN have been removed.
FileType FileTypeUtils::getFileTypeFromExtension(const std::string& extension) {
    auto entry = fileTypeMap.find(extension);
    if (entry == fileTypeMap.end()) {
        throw CopyException("Unsupported file type " + extension);
    }
    return entry->second;
}

std::string FileTypeUtils::toString(FileType fileType) {
Expand Down
14 changes: 12 additions & 2 deletions src/include/common/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,15 +126,25 @@ struct CopyConstants {
// Size (in bytes) of the chunks to be read in Node/Rel Copier
static constexpr uint64_t CSV_READING_BLOCK_SIZE = 1 << 23;

// Initial size of buffer for CSV Reader.
static constexpr uint64_t INITIAL_BUFFER_SIZE = 16384;
// This means that we will usually read the entirety of the contents of the file we need for a
// block in one read request. It is also very small, which means we can parallelize small files
// efficiently.
static const uint64_t PARALLEL_BLOCK_SIZE = INITIAL_BUFFER_SIZE / 2;

static constexpr const char* BOOL_CSV_PARSING_OPTIONS[] = {"HEADER", "PARALLEL"};
static constexpr bool DEFAULT_CSV_HAS_HEADER = false;
static constexpr bool DEFAULT_CSV_PARALLEL = true;

// Default configuration for csv file parsing
static constexpr const char* STRING_CSV_PARSING_OPTIONS[5] = {
static constexpr const char* STRING_CSV_PARSING_OPTIONS[] = {
"ESCAPE", "DELIM", "QUOTE", "LIST_BEGIN", "LIST_END"};
static constexpr char DEFAULT_CSV_ESCAPE_CHAR = '\\';
static constexpr char DEFAULT_CSV_DELIMITER = ',';
static constexpr char DEFAULT_CSV_QUOTE_CHAR = '"';
static constexpr char DEFAULT_CSV_LIST_BEGIN_CHAR = '[';
static constexpr char DEFAULT_CSV_LIST_END_CHAR = ']';
static constexpr bool DEFAULT_CSV_HAS_HEADER = false;
static constexpr char DEFAULT_CSV_LINE_BREAK = '\n';
};

Expand Down
20 changes: 12 additions & 8 deletions src/include/common/copier_config/copier_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,21 @@ struct CSVReaderConfig {
char listBeginChar;
char listEndChar;
bool hasHeader;
bool parallel;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really related to this PR. I wonder if we should merge CSVReaderConfig with ReaderConfig at some point. Each reader can access a subset of fields from ReaderConfig class.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some attributes can be shared. Parallel definitely should be. Others though... don't make sense, right?


// Default constructor: initialize every CSV parsing option to the
// library-wide default from CopyConstants. Individual options are
// overridden later when the user supplies COPY parsing options.
// (Removes a leftover pre-change initializer line that duplicated the
// hasHeader initializer and terminated the init-list early.)
CSVReaderConfig()
    : escapeChar{CopyConstants::DEFAULT_CSV_ESCAPE_CHAR},
      delimiter{CopyConstants::DEFAULT_CSV_DELIMITER},
      quoteChar{CopyConstants::DEFAULT_CSV_QUOTE_CHAR},
      listBeginChar{CopyConstants::DEFAULT_CSV_LIST_BEGIN_CHAR},
      listEndChar{CopyConstants::DEFAULT_CSV_LIST_END_CHAR},
      hasHeader{CopyConstants::DEFAULT_CSV_HAS_HEADER},
      parallel{CopyConstants::DEFAULT_CSV_PARALLEL} {}

CSVReaderConfig(const CSVReaderConfig& other)
: escapeChar{other.escapeChar}, delimiter{other.delimiter}, quoteChar{other.quoteChar},
listBeginChar{other.listBeginChar},
listEndChar{other.listEndChar}, hasHeader{other.hasHeader} {}
listEndChar{other.listEndChar}, hasHeader{other.hasHeader}, parallel{other.parallel} {}

inline std::unique_ptr<CSVReaderConfig> copy() const {
return std::make_unique<CSVReaderConfig>(*this);
Expand All @@ -40,10 +43,6 @@ struct CSVReaderConfig {
enum class FileType : uint8_t { UNKNOWN = 0, CSV = 1, PARQUET = 2, NPY = 3, TURTLE = 4 };

struct FileTypeUtils {
inline static std::unordered_map<std::string, FileType> fileTypeMap{
{"unknown", FileType::UNKNOWN}, {".csv", FileType::CSV}, {".parquet", FileType::PARQUET},
{".npy", FileType::NPY}, {".ttl", FileType::TURTLE}};

static FileType getFileTypeFromExtension(const std::string& extension);
static std::string toString(FileType fileType);
};
Expand Down Expand Up @@ -73,8 +72,13 @@ struct ReaderConfig {
}
}

inline bool parallelRead() const {
return fileType != FileType::CSV && fileType != FileType::TURTLE;
inline bool csvParallelRead(TableType tableType) const {
Riolku marked this conversation as resolved.
Show resolved Hide resolved
return tableType != TableType::REL && csvReaderConfig->parallel;
}

inline bool parallelRead(TableType tableType) const {
return (fileType != FileType::CSV || csvParallelRead(tableType)) &&
fileType != FileType::TURTLE;
}
inline uint32_t getNumFiles() const { return filePaths.size(); }
inline uint32_t getNumColumns() const { return columnNames.size(); }
Expand Down
Loading
Loading