Skip to content

Commit

Permalink
reader: implement parallel CSV reading
Browse files Browse the repository at this point in the history
This also refactors the CSVReader class to enable this change.
  • Loading branch information
Riolku committed Sep 27, 2023
1 parent a21e52d commit 695fa48
Show file tree
Hide file tree
Showing 23 changed files with 733 additions and 453 deletions.
1 change: 1 addition & 0 deletions dataset/csv-edge-case-tests/bom-and-header.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
col1,col2
1 change: 1 addition & 0 deletions dataset/csv-edge-case-tests/bom.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

Empty file.
52 changes: 32 additions & 20 deletions src/binder/bind/bind_file_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@ FileType Binder::bindFileType(const std::string& filePath) {
std::filesystem::path fileName(filePath);
auto extension = FileUtils::getFileExtension(fileName);
auto fileType = FileTypeUtils::getFileTypeFromExtension(extension);
if (fileType == FileType::UNKNOWN) {
throw CopyException("Unsupported file type: " + filePath);
}
return fileType;
}

Expand Down Expand Up @@ -68,7 +65,16 @@ static char bindParsingOptionValue(std::string value) {
return value[value.length() - 1];
}

static void bindStringParsingOptions(
// Applies one boolean CSV parsing option to `csvReaderConfig`.
// `optionName` is matched exactly against "HEADER" (sets `hasHeader`) and "PARALLEL"
// (sets `parallel`). Any other name matches neither branch and leaves the config untouched.
static void bindBoolParsingOption(
CSVReaderConfig& csvReaderConfig, const std::string& optionName, bool optionValue) {
if (optionName == "HEADER") {
csvReaderConfig.hasHeader = optionValue;
} else if (optionName == "PARALLEL") {
csvReaderConfig.parallel = optionValue;

Check warning on line 73 in src/binder/bind/bind_file_scan.cpp

View check run for this annotation

Codecov / codecov/patch

src/binder/bind/bind_file_scan.cpp#L72-L73

Added lines #L72 - L73 were not covered by tests
}
}

static void bindStringParsingOption(
CSVReaderConfig& csvReaderConfig, const std::string& optionName, std::string& optionValue) {
auto parsingOptionValue = bindParsingOptionValue(optionValue);
if (optionName == "ESCAPE") {
Expand All @@ -84,13 +90,17 @@ static void bindStringParsingOptions(
}
}

static bool validateStringParsingOptionName(std::string& parsingOptionName) {
for (auto i = 0; i < std::size(CopyConstants::STRING_CSV_PARSING_OPTIONS); i++) {
if (parsingOptionName == CopyConstants::STRING_CSV_PARSING_OPTIONS[i]) {
return true;
}
}
return false;
// Returns true when `option` is equal to one of the C-string entries of `arr`.
// `size` is deduced from the array bound, so any fixed-size table of option names can be passed.
// The comparison is by string content and is case-sensitive.
template<uint64_t size>
static bool hasOption(const char* const (&arr)[size], const std::string& option) {
    for (const char* candidate : arr) {
        if (option == candidate) {
            return true;
        }
    }
    return false;
}

// Reports whether `parsingOptionName` is one of the names listed in
// CopyConstants::BOOL_CSV_PARSING_OPTIONS (exact string match).
static bool validateBoolParsingOptionName(const std::string& parsingOptionName) {
    const auto& knownBoolOptions = CopyConstants::BOOL_CSV_PARSING_OPTIONS;
    return hasOption(knownBoolOptions, parsingOptionName);
}

// Reports whether `parsingOptionName` is one of the names listed in
// CopyConstants::STRING_CSV_PARSING_OPTIONS (exact string match).
static bool validateStringParsingOptionName(const std::string& parsingOptionName) {
    const auto& knownStringOptions = CopyConstants::STRING_CSV_PARSING_OPTIONS;
    return hasOption(knownStringOptions, parsingOptionName);
}

std::unique_ptr<CSVReaderConfig> Binder::bindParsingOptions(
Expand All @@ -99,29 +109,31 @@ std::unique_ptr<CSVReaderConfig> Binder::bindParsingOptions(
for (auto& parsingOption : parsingOptions) {
auto copyOptionName = parsingOption.first;
StringUtils::toUpper(copyOptionName);

bool isValidStringParsingOption = validateStringParsingOptionName(copyOptionName);
bool isValidBoolParsingOption = validateBoolParsingOptionName(copyOptionName);

auto copyOptionExpression = parsingOption.second.get();
auto boundCopyOptionExpression = expressionBinder.bindExpression(*copyOptionExpression);
assert(boundCopyOptionExpression->expressionType == LITERAL);
if (copyOptionName == "HEADER") {
if (isValidBoolParsingOption) {
if (boundCopyOptionExpression->dataType.getLogicalTypeID() != LogicalTypeID::BOOL) {
throw BinderException(
"The value type of parsing csv option " + copyOptionName + " must be boolean.");
"The type of csv parsing option " + copyOptionName + " must be a boolean.");

Check warning on line 122 in src/binder/bind/bind_file_scan.cpp

View check run for this annotation

Codecov / codecov/patch

src/binder/bind/bind_file_scan.cpp#L122

Added line #L122 was not covered by tests
}
csvReaderConfig->hasHeader =
auto copyOptionValue =
((LiteralExpression&)(*boundCopyOptionExpression)).value->getValue<bool>();
} else if (boundCopyOptionExpression->dataType.getLogicalTypeID() ==
LogicalTypeID::STRING &&
isValidStringParsingOption) {
bindBoolParsingOption(*csvReaderConfig, copyOptionName, copyOptionValue);
} else if (isValidStringParsingOption) {
if (boundCopyOptionExpression->dataType.getLogicalTypeID() != LogicalTypeID::STRING) {
throw BinderException(
"The value type of parsing csv option " + copyOptionName + " must be string.");
"The type of csv parsing option " + copyOptionName + " must be a string.");

Check warning on line 130 in src/binder/bind/bind_file_scan.cpp

View check run for this annotation

Codecov / codecov/patch

src/binder/bind/bind_file_scan.cpp#L130

Added line #L130 was not covered by tests
}
auto copyOptionValue =
((LiteralExpression&)(*boundCopyOptionExpression)).value->getValue<std::string>();
bindStringParsingOptions(*csvReaderConfig, copyOptionName, copyOptionValue);
bindStringParsingOption(*csvReaderConfig, copyOptionName, copyOptionValue);
} else {
throw BinderException("Unrecognized parsing csv option: " + copyOptionName + ".");
throw BinderException("Unrecognized csv parsing option: " + copyOptionName + ".");
}
}
return csvReaderConfig;
Expand Down
5 changes: 2 additions & 3 deletions src/binder/bind/bind_reading_clause.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
#include "parser/query/reading_clause/in_query_call_clause.h"
#include "parser/query/reading_clause/load_from.h"
#include "parser/query/reading_clause/unwind_clause.h"
#include "processor/operator/persistent/reader/csv/csv_reader.h"
#include "processor/operator/persistent/reader/csv/serial_csv_reader.h"
#include "processor/operator/persistent/reader/npy_reader.h"
#include "processor/operator/persistent/reader/parquet/parquet_reader.h"

Expand Down Expand Up @@ -139,8 +139,7 @@ std::unique_ptr<BoundReadingClause> Binder::bindLoadFrom(
expression_vector columns;
switch (fileType) {
case FileType::CSV: {
auto csvReader = BufferedCSVReader(
readerConfig->filePaths[0], *readerConfig->csvReaderConfig, 0 /*expectedNumColumns*/);
auto csvReader = SerialCSVReader(readerConfig->filePaths[0], *readerConfig);
csvReader.SniffCSV();
auto numColumns = csvReader.getNumColumnsDetected();
auto stringType = LogicalType(LogicalTypeID::STRING);
Expand Down
12 changes: 6 additions & 6 deletions src/common/copier_config/copier_config.cpp
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
#include "common/copier_config/copier_config.h"

#include "common/exception/copy.h"
#include "utf8proc_wrapper.h"

using namespace kuzu::utf8proc;

namespace kuzu {
namespace common {

// Lookup table from a file extension (including its leading dot) to the matching FileType.
static const std::unordered_map<std::string, FileType> fileTypeMap{
    {".csv", FileType::CSV},
    {".parquet", FileType::PARQUET},
    {".npy", FileType::NPY},
    {".ttl", FileType::TURTLE},
};

FileType FileTypeUtils::getFileTypeFromExtension(const std::string& extension) {
FileType fileType = fileTypeMap[extension];
if (fileType == FileType::UNKNOWN) {
auto entry = fileTypeMap.find(extension);
if (entry == fileTypeMap.end()) {
throw CopyException("Unsupported file type " + extension);
}
return fileType;
return entry->second;
}

std::string FileTypeUtils::toString(FileType fileType) {
Expand Down
7 changes: 5 additions & 2 deletions src/include/common/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,15 +126,18 @@ struct CopyConstants {
// Size (in bytes) of the chunks to be read in Node/Rel Copier
static constexpr uint64_t CSV_READING_BLOCK_SIZE = 1 << 23;

static constexpr const char* BOOL_CSV_PARSING_OPTIONS[] = {"HEADER", "PARALLEL"};
static constexpr bool DEFAULT_CSV_HAS_HEADER = false;
static constexpr bool DEFAULT_CSV_PARALLEL = true;

// Default configuration for csv file parsing
static constexpr const char* STRING_CSV_PARSING_OPTIONS[5] = {
static constexpr const char* STRING_CSV_PARSING_OPTIONS[] = {
"ESCAPE", "DELIM", "QUOTE", "LIST_BEGIN", "LIST_END"};
static constexpr char DEFAULT_CSV_ESCAPE_CHAR = '\\';
static constexpr char DEFAULT_CSV_DELIMITER = ',';
static constexpr char DEFAULT_CSV_QUOTE_CHAR = '"';
static constexpr char DEFAULT_CSV_LIST_BEGIN_CHAR = '[';
static constexpr char DEFAULT_CSV_LIST_END_CHAR = ']';
static constexpr bool DEFAULT_CSV_HAS_HEADER = false;
static constexpr char DEFAULT_CSV_LINE_BREAK = '\n';
};

Expand Down
20 changes: 12 additions & 8 deletions src/include/common/copier_config/copier_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,21 @@ struct CSVReaderConfig {
char listBeginChar;
char listEndChar;
bool hasHeader;
bool parallel;

CSVReaderConfig()
: escapeChar{CopyConstants::DEFAULT_CSV_ESCAPE_CHAR},
delimiter{CopyConstants::DEFAULT_CSV_DELIMITER},
quoteChar{CopyConstants::DEFAULT_CSV_QUOTE_CHAR},
listBeginChar{CopyConstants::DEFAULT_CSV_LIST_BEGIN_CHAR},
listEndChar{CopyConstants::DEFAULT_CSV_LIST_END_CHAR},
hasHeader{CopyConstants::DEFAULT_CSV_HAS_HEADER} {}
hasHeader{CopyConstants::DEFAULT_CSV_HAS_HEADER},
parallel{CopyConstants::DEFAULT_CSV_PARALLEL} {}

CSVReaderConfig(const CSVReaderConfig& other)
: escapeChar{other.escapeChar}, delimiter{other.delimiter}, quoteChar{other.quoteChar},
listBeginChar{other.listBeginChar},
listEndChar{other.listEndChar}, hasHeader{other.hasHeader} {}
listEndChar{other.listEndChar}, hasHeader{other.hasHeader}, parallel{other.parallel} {}

inline std::unique_ptr<CSVReaderConfig> copy() const {
return std::make_unique<CSVReaderConfig>(*this);
Expand All @@ -40,10 +43,6 @@ struct CSVReaderConfig {
// Kinds of input file the reader configuration distinguishes. Stored in a single byte.
enum class FileType : uint8_t {
    UNKNOWN = 0,
    CSV = 1,
    PARQUET = 2,
    NPY = 3,
    TURTLE = 4,
};

struct FileTypeUtils {
inline static std::unordered_map<std::string, FileType> fileTypeMap{
{"unknown", FileType::UNKNOWN}, {".csv", FileType::CSV}, {".parquet", FileType::PARQUET},
{".npy", FileType::NPY}, {".ttl", FileType::TURTLE}};

static FileType getFileTypeFromExtension(const std::string& extension);
static std::string toString(FileType fileType);
};
Expand Down Expand Up @@ -73,8 +72,13 @@ struct ReaderConfig {
}
}

inline bool parallelRead() const {
return fileType != FileType::CSV && fileType != FileType::TURTLE;
// Whether a CSV source may be read in parallel for the given table type.
// REL tables are never read in parallel; otherwise the CSV `parallel` option decides.
inline bool csvParallelRead(TableType tableType) const {
    if (tableType == TableType::REL) {
        return false;
    }
    return csvReaderConfig->parallel;
}

// Whether this source may be read in parallel for the given table type.
//  - TURTLE files: never.
//  - CSV files: only when csvParallelRead(tableType) allows it.
//  - every other file type: always.
inline bool parallelRead(TableType tableType) const {
    if (fileType == FileType::TURTLE) {
        return false;
    }
    if (fileType == FileType::CSV) {
        return csvParallelRead(tableType);
    }
    return true;
}
inline uint32_t getNumFiles() const { return filePaths.size(); }
inline uint32_t getNumColumns() const { return columnNames.size(); }
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
#pragma once

#include <cstdint>
#include <string>

#include "common/copier_config/copier_config.h"
#include "common/data_chunk/data_chunk.h"
#include "common/types/types.h"

namespace kuzu {
namespace processor {

//! Mode the CSV parser is currently running in. Stored in a single byte.
//! BaseCSVReader::finishedBlock() treats every mode other than PARSING as "stop after this row".
enum class ParserMode : uint8_t {
    //! Regular parsing mode.
    PARSING = 0,
    //! NOTE(review): presumably active while the header row is consumed — confirm in the .cpp.
    PARSING_HEADER = 1,
    //! NOTE(review): presumably active while sniffing the file's dialect — confirm in the .cpp.
    SNIFFING_DIALECT = 2,
    //! Sentinel value.
    INVALID = 0xFF,
};

//! Abstract base for CSV readers. It declares the shared parsing entry points and the buffer
//! state they operate on; concrete readers must implement the three pure-virtual hooks
//! (parseBlockHook, finishedBlockDetail, handleQuotedNewline).
class BaseCSVReader {
protected:
    //! Initial buffer read size; can be extended for long values.
    static constexpr uint64_t INITIAL_BUFFER_SIZE = 16384;

public:
    BaseCSVReader(const std::string& filePath, const common::ReaderConfig& readerConfig);

    virtual ~BaseCSVReader();

    //! NOTE(review): presumably parses block `blockIdx` into `resultChunk` and returns the
    //! number of rows produced — confirm against the implementation.
    uint64_t ParseBlock(common::block_idx_t blockIdx, common::DataChunk& resultChunk);

    uint64_t CountRows();

protected:
    void AddValue(common::DataChunk& resultChunk, std::string strVal, common::column_id_t columnIdx,
        std::vector<uint64_t>& escapePositions);
    void AddRow(common::DataChunk&, common::column_id_t column);

    //! If this finds a BOM, it advances `position`.
    void ReadBOM();
    void ReadHeader();
    //! Reads a new buffer from the CSV file.
    //! Uses the start value to ensure the current value stays within the buffer.
    //! Modifies the start value to point to the new start of the current value.
    //! If start is NULL, none of the buffer is kept.
    //! Returns false if the file has been exhausted.
    bool ReadBuffer(uint64_t* start);

    //! Like ReadBuffer, but only reads if position >= bufferSize.
    //! If this returns true, buffer[position] is a valid character that we can read.
    inline bool MaybeReadBuffer(uint64_t* start) {
        // Fast path: the cursor is still inside the data already buffered.
        if (position < bufferSize) {
            return true;
        }
        return ReadBuffer(start);
    }

    uint64_t ParseCSV(common::DataChunk& resultChunk);

    //! True for the line-feed and carriage-return characters.
    inline bool isNewLine(char c) {
        switch (c) {
        case '\n':
        case '\r':
            return true;
        default:
            return false;
        }
    }

    // Get the file offset of the current buffer position.
    uint64_t getFileOffset() const;
    uint64_t getLineNumber();

protected:
    //! Called when starting the parsing of a new block.
    virtual void parseBlockHook() = 0;
    //! Reader-specific end-of-block condition; consulted last by finishedBlock().
    virtual bool finishedBlockDetail() const = 0;
    virtual void handleQuotedNewline() = 0;

private:
    void copyStringToVector(common::ValueVector*, std::string);
    //! Called after a row is finished to determine if we should keep processing.
    //! The checks run in this order, stopping at the first that holds:
    //!   1. the parser is in any mode other than PARSING,
    //!   2. `rowToAdd` has reached common::DEFAULT_VECTOR_CAPACITY,
    //!   3. the derived reader reports the block as finished.
    inline bool finishedBlock() {
        if (mode != ParserMode::PARSING) {
            return true;
        }
        if (rowToAdd >= common::DEFAULT_VECTOR_CAPACITY) {
            return true;
        }
        return finishedBlockDetail();
    }

protected:
    std::string filePath;
    common::CSVReaderConfig& csvReaderConfig;

    uint64_t expectedNumColumns;
    uint64_t numColumnsDetected;
    int fd;

    common::block_idx_t currentBlockIdx;

    //! Read buffer together with its size and the current read cursor into it
    //! (see MaybeReadBuffer for the relationship between `position` and `bufferSize`).
    std::unique_ptr<char[]> buffer;
    uint64_t bufferSize;
    uint64_t position;

    bool rowEmpty = false;

    ParserMode mode;

    //! Row counter compared against common::DEFAULT_VECTOR_CAPACITY in finishedBlock().
    uint64_t rowToAdd;
};

} // namespace processor
} // namespace kuzu
Loading

0 comments on commit 695fa48

Please sign in to comment.