From 695fa482f3fa4ecdbefd25a12c846d93f67e4bfd Mon Sep 17 00:00:00 2001 From: Keenan Gugeler Date: Fri, 15 Sep 2023 16:22:49 -0400 Subject: [PATCH] reader: implement parallel CSV reading This also refactors the CSVReader class to enable this change. --- .../csv-edge-case-tests/bom-and-header.csv | 1 + dataset/csv-edge-case-tests/bom.csv | 1 + dataset/csv-edge-case-tests/empty.csv | 0 src/binder/bind/bind_file_scan.cpp | 52 +- src/binder/bind/bind_reading_clause.cpp | 5 +- src/common/copier_config/copier_config.cpp | 12 +- src/include/common/constants.h | 7 +- .../common/copier_config/copier_config.h | 20 +- .../persistent/reader/csv/base_csv_reader.h | 99 ++++ .../persistent/reader/csv/csv_reader.h | 99 ---- .../reader/csv/parallel_csv_reader.h | 25 + .../persistent/reader/csv/serial_csv_reader.h | 24 + .../operator/persistent/reader_functions.h | 42 +- .../operator/persistent/reader_state.h | 1 - src/processor/operator/persistent/reader.cpp | 22 +- .../persistent/reader/csv/CMakeLists.txt | 4 +- .../{csv_reader.cpp => base_csv_reader.cpp} | 495 ++++++++++-------- .../reader/csv/parallel_csv_reader.cpp | 87 +++ .../reader/csv/serial_csv_reader.cpp | 25 + .../operator/persistent/reader_functions.cpp | 154 +++--- .../operator/persistent/reader_state.cpp | 7 +- .../exceptions/binder/binder_error.test | 2 +- third_party/miniparquet/CMakeLists.txt | 2 + 23 files changed, 733 insertions(+), 453 deletions(-) create mode 100644 dataset/csv-edge-case-tests/bom-and-header.csv create mode 100644 dataset/csv-edge-case-tests/bom.csv create mode 100644 dataset/csv-edge-case-tests/empty.csv create mode 100644 src/include/processor/operator/persistent/reader/csv/base_csv_reader.h delete mode 100644 src/include/processor/operator/persistent/reader/csv/csv_reader.h create mode 100644 src/include/processor/operator/persistent/reader/csv/parallel_csv_reader.h create mode 100644 src/include/processor/operator/persistent/reader/csv/serial_csv_reader.h rename src/processor/operator/persistent/reader/csv/{csv_reader.cpp => base_csv_reader.cpp} (59%) create mode 100644 src/processor/operator/persistent/reader/csv/parallel_csv_reader.cpp create mode 100644 src/processor/operator/persistent/reader/csv/serial_csv_reader.cpp diff --git a/dataset/csv-edge-case-tests/bom-and-header.csv b/dataset/csv-edge-case-tests/bom-and-header.csv new file mode 100644 index 00000000000..36defdb42d7 --- /dev/null +++ b/dataset/csv-edge-case-tests/bom-and-header.csv @@ -0,0 +1 @@ +col1,col2 diff --git a/dataset/csv-edge-case-tests/bom.csv b/dataset/csv-edge-case-tests/bom.csv new file mode 100644 index 00000000000..5f282702bb0 --- /dev/null +++ b/dataset/csv-edge-case-tests/bom.csv @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/dataset/csv-edge-case-tests/empty.csv b/dataset/csv-edge-case-tests/empty.csv new file mode 100644 index 00000000000..e69de29bb2d diff --git a/src/binder/bind/bind_file_scan.cpp b/src/binder/bind/bind_file_scan.cpp index 6d60bc985df..2f0569659e4 100644 --- a/src/binder/bind/bind_file_scan.cpp +++ b/src/binder/bind/bind_file_scan.cpp @@ -19,9 +19,6 @@ FileType Binder::bindFileType(const std::string& filePath) { std::filesystem::path fileName(filePath); auto extension = FileUtils::getFileExtension(fileName); auto fileType = FileTypeUtils::getFileTypeFromExtension(extension); - if (fileType == FileType::UNKNOWN) { - throw CopyException("Unsupported file type: " + filePath); - } return fileType; } @@ -68,7 +65,16 @@ static char bindParsingOptionValue(std::string value) { return value[value.length() - 1]; } -static void bindStringParsingOptions( +static void bindBoolParsingOption( + CSVReaderConfig& csvReaderConfig, const std::string& optionName, bool optionValue) { + if (optionName == "HEADER") { + csvReaderConfig.hasHeader = optionValue; + } else if (optionName == "PARALLEL") { + csvReaderConfig.parallel = optionValue; + } +} + +static void bindStringParsingOption( CSVReaderConfig& csvReaderConfig, const std::string& optionName, std::string& optionValue) { auto parsingOptionValue = bindParsingOptionValue(optionValue); if (optionName == "ESCAPE") { @@ -84,13 +90,17 @@ static void bindStringParsingOptions( } } -static bool validateStringParsingOptionName(std::string& parsingOptionName) { - for (auto i = 0; i < std::size(CopyConstants::STRING_CSV_PARSING_OPTIONS); i++) { - if (parsingOptionName == CopyConstants::STRING_CSV_PARSING_OPTIONS[i]) { - return true; - } - } - return false; +template +static bool hasOption(const char* const (&arr)[size], const std::string& option) { + return std::find(std::begin(arr), std::end(arr), option) != std::end(arr); +} + +static bool validateBoolParsingOptionName(const std::string& parsingOptionName) { + return hasOption(CopyConstants::BOOL_CSV_PARSING_OPTIONS, parsingOptionName); +} + +static bool validateStringParsingOptionName(const std::string& parsingOptionName) { + return hasOption(CopyConstants::STRING_CSV_PARSING_OPTIONS, parsingOptionName); } std::unique_ptr Binder::bindParsingOptions( @@ -99,29 +109,31 @@ std::unique_ptr Binder::bindParsingOptions( for (auto& parsingOption : parsingOptions) { auto copyOptionName = parsingOption.first; StringUtils::toUpper(copyOptionName); + bool isValidStringParsingOption = validateStringParsingOptionName(copyOptionName); + bool isValidBoolParsingOption = validateBoolParsingOptionName(copyOptionName); + auto copyOptionExpression = parsingOption.second.get(); auto boundCopyOptionExpression = expressionBinder.bindExpression(*copyOptionExpression); assert(boundCopyOptionExpression->expressionType == LITERAL); - if (copyOptionName == "HEADER") { + if (isValidBoolParsingOption) { if (boundCopyOptionExpression->dataType.getLogicalTypeID() != LogicalTypeID::BOOL) { throw BinderException( - "The value type of parsing csv option " + copyOptionName + " must be boolean."); + "The type of csv parsing option " + copyOptionName + " must be a boolean."); } - csvReaderConfig->hasHeader = + auto copyOptionValue = ((LiteralExpression&)(*boundCopyOptionExpression)).value->getValue(); - } else if (boundCopyOptionExpression->dataType.getLogicalTypeID() == - LogicalTypeID::STRING && - isValidStringParsingOption) { + bindBoolParsingOption(*csvReaderConfig, copyOptionName, copyOptionValue); + } else if (isValidStringParsingOption) { if (boundCopyOptionExpression->dataType.getLogicalTypeID() != LogicalTypeID::STRING) { throw BinderException( - "The value type of parsing csv option " + copyOptionName + " must be string."); + "The type of csv parsing option " + copyOptionName + " must be a string."); } auto copyOptionValue = ((LiteralExpression&)(*boundCopyOptionExpression)).value->getValue(); - bindStringParsingOptions(*csvReaderConfig, copyOptionName, copyOptionValue); + bindStringParsingOption(*csvReaderConfig, copyOptionName, copyOptionValue); } else { - throw BinderException("Unrecognized parsing csv option: " + copyOptionName + "."); + throw BinderException("Unrecognized csv parsing option: " + copyOptionName + "."); } } return csvReaderConfig; diff --git a/src/binder/bind/bind_reading_clause.cpp b/src/binder/bind/bind_reading_clause.cpp index 1a508dd27df..5e9dba65b8a 100644 --- a/src/binder/bind/bind_reading_clause.cpp +++ b/src/binder/bind/bind_reading_clause.cpp @@ -8,7 +8,7 @@ #include "parser/query/reading_clause/in_query_call_clause.h" #include "parser/query/reading_clause/load_from.h" #include "parser/query/reading_clause/unwind_clause.h" -#include "processor/operator/persistent/reader/csv/csv_reader.h" +#include "processor/operator/persistent/reader/csv/serial_csv_reader.h" #include "processor/operator/persistent/reader/npy_reader.h" #include "processor/operator/persistent/reader/parquet/parquet_reader.h" @@ -139,8 +139,7 @@ std::unique_ptr Binder::bindLoadFrom( expression_vector columns; switch (fileType) { case FileType::CSV: { - auto csvReader = BufferedCSVReader( - readerConfig->filePaths[0], *readerConfig->csvReaderConfig, 0 /*expectedNumColumns*/); + auto csvReader = SerialCSVReader(readerConfig->filePaths[0], *readerConfig); csvReader.SniffCSV(); auto numColumns = csvReader.getNumColumnsDetected(); auto stringType = LogicalType(LogicalTypeID::STRING); diff --git a/src/common/copier_config/copier_config.cpp b/src/common/copier_config/copier_config.cpp index a35ce92b7c2..e4ad8ca8041 100644 --- a/src/common/copier_config/copier_config.cpp +++ b/src/common/copier_config/copier_config.cpp @@ -1,19 +1,19 @@ #include "common/copier_config/copier_config.h" #include "common/exception/copy.h" -#include "utf8proc_wrapper.h" - -using namespace kuzu::utf8proc; namespace kuzu { namespace common { +const static std::unordered_map fileTypeMap{{".csv", FileType::CSV}, + {".parquet", FileType::PARQUET}, {".npy", FileType::NPY}, {".ttl", FileType::TURTLE}}; + FileType FileTypeUtils::getFileTypeFromExtension(const std::string& extension) { - FileType fileType = fileTypeMap[extension]; - if (fileType == FileType::UNKNOWN) { + auto entry = fileTypeMap.find(extension); + if (entry == fileTypeMap.end()) { throw CopyException("Unsupported file type " + extension); } - return fileType; + return entry->second; } std::string FileTypeUtils::toString(FileType fileType) { diff --git a/src/include/common/constants.h b/src/include/common/constants.h index fcd0d6a46c1..a3097cbc152 100644 --- a/src/include/common/constants.h +++ b/src/include/common/constants.h @@ -126,15 +126,18 @@ struct CopyConstants { // Size (in bytes) of the chunks to be read in Node/Rel Copier static constexpr uint64_t CSV_READING_BLOCK_SIZE = 1 << 23; + static constexpr const char* BOOL_CSV_PARSING_OPTIONS[] = {"HEADER", "PARALLEL"}; + static constexpr bool DEFAULT_CSV_HAS_HEADER = false; + static constexpr bool DEFAULT_CSV_PARALLEL = true; + // Default configuration for csv file parsing - static constexpr const char* STRING_CSV_PARSING_OPTIONS[5] = { + static constexpr const char* STRING_CSV_PARSING_OPTIONS[] = { "ESCAPE", "DELIM", "QUOTE", "LIST_BEGIN", "LIST_END"}; static constexpr char DEFAULT_CSV_ESCAPE_CHAR = '\\'; static constexpr char DEFAULT_CSV_DELIMITER = ','; static constexpr char DEFAULT_CSV_QUOTE_CHAR = '"'; static constexpr char DEFAULT_CSV_LIST_BEGIN_CHAR = '['; static constexpr char DEFAULT_CSV_LIST_END_CHAR = ']'; - static constexpr bool DEFAULT_CSV_HAS_HEADER = false; static constexpr char DEFAULT_CSV_LINE_BREAK = '\n'; }; diff --git a/src/include/common/copier_config/copier_config.h b/src/include/common/copier_config/copier_config.h index 57b9b7d0eff..2a20a2b2573 100644 --- a/src/include/common/copier_config/copier_config.h +++ b/src/include/common/copier_config/copier_config.h @@ -19,6 +19,7 @@ struct CSVReaderConfig { char listBeginChar; char listEndChar; bool hasHeader; + bool parallel; CSVReaderConfig() : escapeChar{CopyConstants::DEFAULT_CSV_ESCAPE_CHAR}, @@ -26,11 +27,13 @@ struct CSVReaderConfig { quoteChar{CopyConstants::DEFAULT_CSV_QUOTE_CHAR}, listBeginChar{CopyConstants::DEFAULT_CSV_LIST_BEGIN_CHAR}, listEndChar{CopyConstants::DEFAULT_CSV_LIST_END_CHAR}, - hasHeader{CopyConstants::DEFAULT_CSV_HAS_HEADER} {} + hasHeader{CopyConstants::DEFAULT_CSV_HAS_HEADER}, + parallel{CopyConstants::DEFAULT_CSV_PARALLEL} {} + CSVReaderConfig(const CSVReaderConfig& other) : escapeChar{other.escapeChar}, delimiter{other.delimiter}, quoteChar{other.quoteChar}, listBeginChar{other.listBeginChar}, - listEndChar{other.listEndChar}, hasHeader{other.hasHeader} {} + listEndChar{other.listEndChar}, hasHeader{other.hasHeader}, parallel{other.parallel} {} inline std::unique_ptr copy() const { return std::make_unique(*this); @@ -40,10 +43,6 @@ struct CSVReaderConfig { enum class FileType : uint8_t { UNKNOWN = 0, CSV = 1, PARQUET = 2, NPY = 3, TURTLE = 4 }; struct FileTypeUtils { - inline static std::unordered_map fileTypeMap{ - {"unknown", FileType::UNKNOWN}, {".csv", FileType::CSV}, {".parquet", FileType::PARQUET}, - {".npy", FileType::NPY}, {".ttl", FileType::TURTLE}}; - static FileType getFileTypeFromExtension(const std::string& extension); static std::string toString(FileType fileType); }; @@ -73,8 +72,13 @@ struct ReaderConfig { } } - inline bool parallelRead() const { - return fileType != FileType::CSV && fileType != FileType::TURTLE; + inline bool csvParallelRead(TableType tableType) const { + return tableType != TableType::REL && csvReaderConfig->parallel; + } + + inline bool parallelRead(TableType tableType) const { + return (fileType != FileType::CSV || csvParallelRead(tableType)) && + fileType != FileType::TURTLE; } inline uint32_t getNumFiles() const { return filePaths.size(); } inline uint32_t getNumColumns() const { return columnNames.size(); } diff --git a/src/include/processor/operator/persistent/reader/csv/base_csv_reader.h b/src/include/processor/operator/persistent/reader/csv/base_csv_reader.h new file mode 100644 index 00000000000..bd904ce47b8 --- /dev/null +++ b/src/include/processor/operator/persistent/reader/csv/base_csv_reader.h @@ -0,0 +1,99 @@ +#pragma once + +#include +#include + +#include "common/copier_config/copier_config.h" +#include "common/data_chunk/data_chunk.h" +#include "common/types/types.h" + +namespace kuzu { +namespace processor { + +enum class ParserMode : uint8_t { + PARSING = 0, + PARSING_HEADER = 1, + SNIFFING_DIALECT = 2, + INVALID = 255 +}; + +class BaseCSVReader { +protected: + //! Initial buffer read size; can be extended for long values. + static constexpr uint64_t INITIAL_BUFFER_SIZE = 16384; + +public: + BaseCSVReader(const std::string& filePath, const common::ReaderConfig& readerConfig); + + virtual ~BaseCSVReader(); + + uint64_t ParseBlock(common::block_idx_t blockIdx, common::DataChunk& resultChunk); + + uint64_t CountRows(); + +protected: + void AddValue(common::DataChunk& resultChunk, std::string strVal, common::column_id_t columnIdx, + std::vector& escapePositions); + void AddRow(common::DataChunk&, common::column_id_t column); + + //! If this finds a BOM, it advances `position`. + void ReadBOM(); + void ReadHeader(); + //! Reads a new buffer from the CSV file. + //! Uses the start value to ensure the current value stays within the buffer. + //! Modifies the start value to point to the new start of the current value. + //! If start is NULL, none of the buffer is kept. + //! Returns false if the file has been exhausted. + bool ReadBuffer(uint64_t* start); + + //! Like ReadBuffer, but only reads if position >= bufferSize. + //! If this returns true, buffer[position] is a valid character that we can read. + inline bool MaybeReadBuffer(uint64_t* start) { + return position < bufferSize || ReadBuffer(start); + } + + uint64_t ParseCSV(common::DataChunk& resultChunk); + + inline bool isNewLine(char c) { return c == '\n' || c == '\r'; } + + // Get the file offset of the current buffer position. + uint64_t getFileOffset() const; + uint64_t getLineNumber(); + +protected: + //! Called when starting the parsing of a new block. + virtual void parseBlockHook() = 0; + virtual bool finishedBlockDetail() const = 0; + virtual void handleQuotedNewline() = 0; + +private: + void copyStringToVector(common::ValueVector*, std::string); + //! Called after a row is finished to determine if we should keep processing. + inline bool finishedBlock() { + return mode != ParserMode::PARSING || rowToAdd >= common::DEFAULT_VECTOR_CAPACITY || + finishedBlockDetail(); + } + +protected: + std::string filePath; + common::CSVReaderConfig& csvReaderConfig; + + uint64_t expectedNumColumns; + uint64_t numColumnsDetected; + int fd; + + common::block_idx_t currentBlockIdx; + + std::unique_ptr buffer; + uint64_t bufferSize; + uint64_t position; + + bool rowEmpty = false; + + ParserMode mode; + + uint64_t rowToAdd; +}; + +} // namespace processor +} // namespace kuzu diff --git a/src/include/processor/operator/persistent/reader/csv/csv_reader.h b/src/include/processor/operator/persistent/reader/csv/csv_reader.h deleted file mode 100644 index 8ff76cdc783..00000000000 --- a/src/include/processor/operator/persistent/reader/csv/csv_reader.h +++ /dev/null @@ -1,99 +0,0 @@ -#pragma once - -#include -#include - -#include "common/copier_config/copier_config.h" -#include "common/data_chunk/data_chunk.h" -#include "common/types/types.h" - -namespace kuzu { -namespace processor { - -enum class ParserMode : uint8_t { PARSING = 0, PARSING_HEADER = 1, SNIFFING_DIALECT = 2 }; - -class BaseCSVReader { -public: - BaseCSVReader(const std::string& filePath, common::CSVReaderConfig csvReaderConfig, - uint64_t expectedNumColumns); - - virtual ~BaseCSVReader() = default; - - inline bool isNewLine(char c) { return c == '\n' || c == '\r'; } - - inline uint64_t getNumColumnsDetected() const { return numColumnsDetected; } - - common::CSVReaderConfig csvReaderConfig; - std::string filePath; - const uint64_t expectedNumColumns; - - uint64_t linenr = 0; - uint64_t bytesInChunk = 0; - bool bomChecked = false; - bool rowEmpty = false; - - ParserMode mode; - -protected: - void AddValue(common::DataChunk& resultChunk, std::string strVal, - common::column_id_t& columnIdx, std::vector& escapePositions); - // Adds a row to the insert_chunk, returns true if the chunk is filled as a result of this row - // being added. - bool AddRow(common::DataChunk& resultChunk, common::column_id_t& column); - -private: - void copyStringToVector(common::ValueVector* vector, std::string& strVal); - -protected: - uint64_t rowToAdd; - uint64_t numColumnsDetected; -}; - -//! Buffered CSV reader is a class that reads values from a stream and parses them as a CSV file -class BufferedCSVReader : public BaseCSVReader { - //! Initial buffer read size; can be extended for long lines - static constexpr uint64_t INITIAL_BUFFER_SIZE = 16384; - //! Larger buffer size for non disk files - static constexpr uint64_t INITIAL_BUFFER_SIZE_LARGE = 10000000; // 10MB - -public: - BufferedCSVReader(const std::string& filePath, common::CSVReaderConfig csvReaderConfig, - uint64_t expectedNumColumns); - - ~BufferedCSVReader() override; - - std::unique_ptr buffer; - uint64_t bufferSize; - uint64_t position; - uint64_t start = 0; - int fd; - - std::vector> cachedBuffers; - -public: - //! Extract a single DataChunk from the CSV file and stores it in insert_chunk - uint64_t ParseCSV(common::DataChunk& resultChunk); - //! Sniffs CSV dialect and determines skip rows, header row, column types and column names - void SniffCSV(); - -private: - //! Initialize Parser - void Initialize(); - //! Jumps back to the beginning of input stream and resets necessary internal states - void JumpToBeginning(bool skipHeader); - //! Skips skip_rows, reads header row from input stream - void ReadHeader(); - //! Resets the buffer - void ResetBuffer(); - //! Extract a single DataChunk from the CSV file and stores it in insert_chunk - uint64_t TryParseCSV(common::DataChunk& resultChunk, std::string& errorMessage); - //! Parses a CSV file with a one-byte delimiter, escape and quote character - uint64_t TryParseSimpleCSV(common::DataChunk& resultChunk, std::string& errorMessage); - //! Reads a new buffer from the CSV file if the current one has been exhausted - bool ReadBuffer(uint64_t& start, uint64_t& lineStart); - //! Skip Empty lines for tables with over one column - void SkipEmptyLines(); -}; - -} // namespace processor -} // namespace kuzu diff --git a/src/include/processor/operator/persistent/reader/csv/parallel_csv_reader.h b/src/include/processor/operator/persistent/reader/csv/parallel_csv_reader.h new file mode 100644 index 00000000000..9e935b27dea --- /dev/null +++ b/src/include/processor/operator/persistent/reader/csv/parallel_csv_reader.h @@ -0,0 +1,25 @@ +#pragma once + +#include "base_csv_reader.h" + +namespace kuzu { +namespace processor { + +//! ParallelCSVReader is a class that reads values from a stream in parallel. +class ParallelCSVReader final : public BaseCSVReader { +public: + static const uint64_t PARALLEL_BLOCK_SIZE; + + ParallelCSVReader(const std::string& filePath, const common::ReaderConfig& readerConfig); + + bool hasMoreToRead() const; + uint64_t ContinueBlock(common::DataChunk& resultChunk); + +protected: + void parseBlockHook() override; + void handleQuotedNewline() override; + bool finishedBlockDetail() const override; +}; + +} // namespace processor +} // namespace kuzu diff --git a/src/include/processor/operator/persistent/reader/csv/serial_csv_reader.h b/src/include/processor/operator/persistent/reader/csv/serial_csv_reader.h new file mode 100644 index 00000000000..8da2856e6ae --- /dev/null +++ b/src/include/processor/operator/persistent/reader/csv/serial_csv_reader.h @@ -0,0 +1,24 @@ +#pragma once + +#include "base_csv_reader.h" + +namespace kuzu { +namespace processor { + +//! Serial CSV reader is a class that reads values from a stream in a single thread. +class SerialCSVReader final : public BaseCSVReader { +public: + SerialCSVReader(const std::string& filePath, const common::ReaderConfig& readerConfig); + + //! Sniffs CSV dialect and determines skip rows, header row, column types and column names + void SniffCSV(); + inline uint64_t getNumColumnsDetected() const { return numColumnsDetected; } + +protected: + void parseBlockHook() override {} + void handleQuotedNewline() override {} + bool finishedBlockDetail() const override; +}; + +} // namespace processor +} // namespace kuzu diff --git a/src/include/processor/operator/persistent/reader_functions.h b/src/include/processor/operator/persistent/reader_functions.h index 32fcbc6aed3..906c197ebd2 100644 --- a/src/include/processor/operator/persistent/reader_functions.h +++ b/src/include/processor/operator/persistent/reader_functions.h @@ -1,6 +1,7 @@ #pragma once -#include "processor/operator/persistent/reader/csv/csv_reader.h" +#include "processor/operator/persistent/reader/csv/parallel_csv_reader.h" +#include "processor/operator/persistent/reader/csv/serial_csv_reader.h" #include "processor/operator/persistent/reader/npy_reader.h" #include "processor/operator/persistent/reader/parquet/parquet_reader.h" #include "processor/operator/persistent/reader/rdf/rdf_reader.h" @@ -17,15 +18,24 @@ struct ReaderFunctionData { virtual ~ReaderFunctionData() = default; + virtual inline bool emptyBlockImpliesDone() const { return false; } virtual inline bool hasMoreToRead() const { return false; } }; struct RelCSVReaderFunctionData : public ReaderFunctionData { std::shared_ptr reader = nullptr; + + inline bool emptyBlockImpliesDone() const override { return true; } +}; + +struct SerialCSVReaderFunctionData : public ReaderFunctionData { + std::unique_ptr reader = nullptr; }; -struct BufferedCSVReaderFunctionData : public ReaderFunctionData { - std::unique_ptr reader = nullptr; +struct ParallelCSVReaderFunctionData : public ReaderFunctionData { + std::unique_ptr reader = nullptr; + + inline bool hasMoreToRead() const override { return reader->hasMoreToRead(); } }; struct RelParquetReaderFunctionData : public ReaderFunctionData { @@ -36,10 +46,7 @@ struct ParquetReaderFunctionData : public ReaderFunctionData { std::unique_ptr reader = nullptr; std::unique_ptr state = nullptr; - inline bool hasMoreToRead() const override { - return !reinterpret_cast(this) - ->state->groupIdxList.empty(); - } + inline bool hasMoreToRead() const override { return !state->groupIdxList.empty(); } }; struct NPYReaderFunctionData : public ReaderFunctionData { @@ -67,12 +74,13 @@ using read_rows_func_t = std::function getReadFuncData( - common::FileType fileType, common::TableType tableType); + const common::ReaderConfig& config, common::TableType tableType); static inline void validateNoOp(const common::ReaderConfig& config) { // DO NOTHING. @@ -81,7 +89,9 @@ struct ReaderFunctions { static std::vector countRowsNoOp( const common::ReaderConfig& config, storage::MemoryManager* memoryManager); - static std::vector countRowsInNodeCSVFile( + static std::vector countRowsInSerialCSVFile( + const common::ReaderConfig& config, storage::MemoryManager* memoryManager); + static std::vector countRowsInParallelCSVFile( const common::ReaderConfig& config, storage::MemoryManager* memoryManager); static std::vector countRowsInRelParquetFile( const common::ReaderConfig& config, storage::MemoryManager* memoryManager); @@ -94,7 +104,9 @@ struct ReaderFunctions { static void initRelCSVReadData(ReaderFunctionData& funcData, common::vector_idx_t fileIdx, const common::ReaderConfig& config, storage::MemoryManager* memoryManager); - static void initBufferedCSVReadData(ReaderFunctionData& funcData, common::vector_idx_t fileIdx, + static void initParallelCSVReadData(ReaderFunctionData& funcData, common::vector_idx_t fileIdx, + const common::ReaderConfig& config, storage::MemoryManager* memoryManager); + static void initSerialCSVReadData(ReaderFunctionData& funcData, common::vector_idx_t fileIdx, const common::ReaderConfig& config, storage::MemoryManager* memoryManager); static void initRelParquetReadData(ReaderFunctionData& funcData, common::vector_idx_t fileIdx, const common::ReaderConfig& config, storage::MemoryManager* memoryManager); @@ -107,7 +119,9 @@ struct ReaderFunctions { static void readRowsFromRelCSVFile(const ReaderFunctionData& funcData, common::block_idx_t blockIdx, common::DataChunk* dataChunkToRead); - static void readRowsWithBufferedCSVReader(const ReaderFunctionData& funcData, + static void readRowsFromSerialCSVFile(const ReaderFunctionData& funcData, + common::block_idx_t blockIdx, common::DataChunk* dataChunkToRead); + static void readRowsFromParallelCSVFile(const ReaderFunctionData& funcData, common::block_idx_t blockIdx, common::DataChunk* dataChunkToRead); static void readRowsFromRelParquetFile(const ReaderFunctionData& funcData, common::block_idx_t blockIdx, common::DataChunk* vectorsToRead); diff --git a/src/include/processor/operator/persistent/reader_state.h b/src/include/processor/operator/persistent/reader_state.h index 2af550df37b..1f598348527 100644 --- a/src/include/processor/operator/persistent/reader_state.h +++ b/src/include/processor/operator/persistent/reader_state.h @@ -76,7 +76,6 @@ class ReaderSharedState { validate_func_t validateFunc; init_reader_data_func_t initFunc; count_blocks_func_t countBlocksFunc; - read_rows_func_t readFunc; std::shared_ptr readFuncData; common::row_idx_t numRows; diff --git a/src/processor/operator/persistent/reader.cpp b/src/processor/operator/persistent/reader.cpp index 5b745b075a4..aa0b3854837 100644 --- a/src/processor/operator/persistent/reader.cpp +++ b/src/processor/operator/persistent/reader.cpp @@ -22,28 +22,24 @@ void Reader::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* cont for (auto i = 0u; i < info->getNumColumns(); i++) { dataChunk->insert(i, resultSet->getValueVector(info->dataColumnsPos[i])); } - initFunc = - ReaderFunctions::getInitDataFunc(sharedState->readerConfig->fileType, info->tableType); - readFunc = - ReaderFunctions::getReadRowsFunc(sharedState->readerConfig->fileType, info->tableType); + initFunc = ReaderFunctions::getInitDataFunc(*sharedState->readerConfig, info->tableType); + readFunc = ReaderFunctions::getReadRowsFunc(*sharedState->readerConfig, info->tableType); if (info->nodeOffsetPos.dataChunkPos != INVALID_DATA_CHUNK_POS) { offsetVector = resultSet->getValueVector(info->nodeOffsetPos).get(); } assert(!sharedState->readerConfig->filePaths.empty()); - switch (sharedState->readerConfig->fileType) { - case FileType::CSV: { + if (sharedState->readerConfig->fileType == FileType::CSV && + !sharedState->readerConfig->csvParallelRead(info->tableType)) { readFuncData = sharedState->readFuncData; - } break; - default: { + } else { readFuncData = - ReaderFunctions::getReadFuncData(sharedState->readerConfig->fileType, info->tableType); - initFunc(*readFuncData, 0, *sharedState->readerConfig, context->memoryManager); - } + ReaderFunctions::getReadFuncData(*sharedState->readerConfig, info->tableType); + initFunc(*readFuncData, 0, *sharedState->readerConfig, memoryManager); } } bool Reader::getNextTuplesInternal(ExecutionContext* context) { - sharedState->readerConfig->parallelRead() ? + sharedState->readerConfig->parallelRead(info->tableType) ? readNextDataChunk() : readNextDataChunk(); return dataChunk->state->selVector->selectedSize != 0; @@ -84,7 +80,7 @@ void Reader::readNextDataChunk() { readFunc(*readFuncData, morsel->blockIdx, dataChunk.get()); if (dataChunk->state->selVector->selectedSize > 0) { leftArrowArrays.appendFromDataChunk(dataChunk.get()); - } else { + } else if (readFuncData->emptyBlockImpliesDone()) { sharedState->moveToNextFile(); } } diff --git a/src/processor/operator/persistent/reader/csv/CMakeLists.txt b/src/processor/operator/persistent/reader/csv/CMakeLists.txt index be253fad464..8ab47058608 100644 --- a/src/processor/operator/persistent/reader/csv/CMakeLists.txt +++ b/src/processor/operator/persistent/reader/csv/CMakeLists.txt @@ -1,6 +1,8 @@ add_library(kuzu_processor_operator_csv_reader OBJECT - csv_reader.cpp) + base_csv_reader.cpp + parallel_csv_reader.cpp + serial_csv_reader.cpp) set(ALL_OBJECT_FILES ${ALL_OBJECT_FILES} $ diff --git a/src/processor/operator/persistent/reader/csv/csv_reader.cpp b/src/processor/operator/persistent/reader/csv/base_csv_reader.cpp similarity index 59% rename from src/processor/operator/persistent/reader/csv/csv_reader.cpp rename to src/processor/operator/persistent/reader/csv/base_csv_reader.cpp index 7332004c204..dbc57a04706 100644 --- a/src/processor/operator/persistent/reader/csv/csv_reader.cpp +++ b/src/processor/operator/persistent/reader/csv/base_csv_reader.cpp @@ -1,10 +1,10 @@ -#include "processor/operator/persistent/reader/csv/csv_reader.h" +#include "processor/operator/persistent/reader/csv/base_csv_reader.h" #include +#include "common/data_chunk/data_chunk.h" #include "common/exception/copy.h" #include "common/exception/message.h" -#include "common/exception/not_implemented.h" #include "common/exception/parser.h" #include "common/string_utils.h" #include "common/type_utils.h" @@ -17,13 +17,129 @@ using namespace kuzu::common; namespace kuzu { namespace processor { -BaseCSVReader::BaseCSVReader( - const std::string& filePath, CSVReaderConfig csvReaderConfig, uint64_t expectedNumColumns) - : csvReaderConfig{std::move(csvReaderConfig)}, filePath{filePath}, - expectedNumColumns{expectedNumColumns}, rowToAdd{0} {} +BaseCSVReader::BaseCSVReader(const std::string& filePath, const common::ReaderConfig& readerConfig) + : filePath{filePath}, csvReaderConfig{*readerConfig.csvReaderConfig}, + expectedNumColumns(readerConfig.getNumColumns()), numColumnsDetected(-1), fd(-1), + buffer(nullptr), bufferSize(0), position(0), rowEmpty(false), mode(ParserMode::INVALID), + rowToAdd(0) { + // TODO(Ziyi): should we wrap this fd using kuzu file handler? + fd = open(filePath.c_str(), O_RDONLY); + if (fd == -1) { + throw CopyException( + StringUtils::string_format("Could not open file {}: {}", filePath, strerror(errno))); + } +} + +BaseCSVReader::~BaseCSVReader() { + if (fd != -1) { + close(fd); + } +} + +uint64_t BaseCSVReader::ParseBlock(common::block_idx_t blockIdx, DataChunk& resultChunk) { + currentBlockIdx = blockIdx; + parseBlockHook(); + if (blockIdx == 0) { + ReadBOM(); + if (csvReaderConfig.hasHeader) { + ReadHeader(); + } + } + // Are we done after reading the header and executing the block hook? + if (finishedBlockDetail()) { + return 0; + } + mode = ParserMode::PARSING; + return ParseCSV(resultChunk); +} -void BaseCSVReader::AddValue(DataChunk& resultChunk, std::string strVal, column_id_t& columnIdx, - std::vector& escapePositions) { +uint64_t BaseCSVReader::CountRows() { + uint64_t rows = 0; + ReadBOM(); + if (csvReaderConfig.hasHeader) { + ReadHeader(); + } + +line_start: + // Pass bufferSize as start to avoid keeping any portion of the buffer. + if (!MaybeReadBuffer(nullptr)) { + return rows; + } + + // If the number of columns is 1, every line start indicates a row. + if (expectedNumColumns == 1) { + rows++; + } + + if (buffer[position] == '\r') { + position++; + goto carriage_return; + } else if (buffer[position] == '\n') { + position++; + goto line_start; + } else { + // If we have more than one column, every non-empty line is a row. + if (expectedNumColumns != 1) { + rows++; + } + goto normal; + } +normal: + do { + if (buffer[position] == '\r') { + position++; + goto carriage_return; + } else if (buffer[position] == '\n') { + position++; + goto line_start; + } else if (buffer[position] == csvReaderConfig.quoteChar) { + position++; + goto in_quotes; + } else { + position++; + // Just a normal character of some kind. + } + } while (MaybeReadBuffer(nullptr)); + return rows; + +carriage_return: + if (!MaybeReadBuffer(nullptr)) { + return rows; + } + + if (buffer[position] == '\n') { + position++; + } + goto line_start; + +in_quotes: + if (!MaybeReadBuffer(nullptr)) { + return rows; + } + + do { + if (buffer[position] == csvReaderConfig.quoteChar) { + position++; + goto normal; + } else if (buffer[position] == csvReaderConfig.escapeChar) { + position++; + goto escape; + } else { + position++; + } + } while (MaybeReadBuffer(nullptr)); + return rows; + +escape: + if (!MaybeReadBuffer(nullptr)) { + return rows; + } + position++; + goto in_quotes; +} + +void BaseCSVReader::AddValue(DataChunk& resultChunk, std::string strVal, + const column_id_t columnIdx, std::vector& escapePositions) { if (mode == ParserMode::PARSING_HEADER) { return; } @@ -39,13 +155,12 @@ void BaseCSVReader::AddValue(DataChunk& resultChunk, std::string strVal, column_ } if (mode == ParserMode::SNIFFING_DIALECT) { // Do not copy data while sniffing csv. - columnIdx++; return; } if (columnIdx >= expectedNumColumns) { throw CopyException(StringUtils::string_format( - "Error in file {}, on line {}: expected {} values per row, but got more. ", filePath, - linenr, expectedNumColumns)); + "Error in file {}, on line {}: expected {} values per row, but got more.", filePath, + getLineNumber(), expectedNumColumns)); } // insert the line number into the chunk @@ -63,47 +178,35 @@ void BaseCSVReader::AddValue(DataChunk& resultChunk, std::string strVal, column_ escapePositions.clear(); strVal = newVal; } - copyStringToVector(resultChunk.getValueVector(columnIdx).get(), strVal); - // move to the next columnIdx - columnIdx++; + copyStringToVector(resultChunk.getValueVector(columnIdx).get(), std::move(strVal)); } -bool BaseCSVReader::AddRow(DataChunk& resultChunk, column_id_t& column) { - rowToAdd++; +void BaseCSVReader::AddRow(DataChunk& resultChunk, column_id_t column) { if (mode == ParserMode::PARSING_HEADER) { - return true; + return; } - linenr++; if (rowEmpty) { rowEmpty = false; if (expectedNumColumns != 1) { - if (mode == ParserMode::PARSING) { - // Set This position to be null - ; - } - column = 0; - return false; + return; } + // Otherwise, treat it as null. } + rowToAdd++; if (column < expectedNumColumns && mode != ParserMode::SNIFFING_DIALECT) { - // Number of column mismatch. We don't error while sniffing dialect because number of - // columns is only known after reading the first row. + // Column number mismatch. We don't error while sniffing dialect because number of columns + // is only known after reading the first row. throw CopyException(StringUtils::string_format( - "Error in file {} on line {}: expected {} values per row, but got {}", filePath, linenr, - expectedNumColumns, column)); - } - if (mode == ParserMode::PARSING && rowToAdd >= DEFAULT_VECTOR_CAPACITY) { - return true; + "Error in file {} on line {}: expected {} values per row, but got {}", filePath, + getLineNumber(), expectedNumColumns, column)); } if (mode == ParserMode::SNIFFING_DIALECT) { - numColumnsDetected = column; // Only read one row while sniffing csv. - return true; + numColumnsDetected = column; // Use the first row to determine the number of columns. + return; } - column = 0; - return false; } -void BaseCSVReader::copyStringToVector(ValueVector* vector, std::string& strVal) { +void BaseCSVReader::copyStringToVector(common::ValueVector* vector, std::string strVal) { auto& type = vector->dataType; if (strVal.empty()) { vector->setNull(rowToAdd, true /* isNull */); @@ -224,69 +327,36 @@ void BaseCSVReader::copyStringToVector(ValueVector* vector, std::string& strVal) StructVector::getFieldVector(vector, UnionType::TAG_FIELD_IDX) ->setNull(rowToAdd, false /* isNull */); } break; - default: { - throw NotImplementedException("BaseCSVReader::AddValue"); + default: { // LCOV_EXCL_START + throw NotImplementedException("BaseCSVReader::copyStringToVector"); + } // LCOV_EXCL_STOP } - } -} - -BufferedCSVReader::BufferedCSVReader( - const std::string& filePath, CSVReaderConfig csvReaderConfig, uint64_t expectedNumColumns) - : BaseCSVReader{filePath, csvReaderConfig, expectedNumColumns}, - bufferSize{0}, position{0}, start{0} { - Initialize(); } -BufferedCSVReader::~BufferedCSVReader() { - if (fd != -1) { - close(fd); +void BaseCSVReader::ReadBOM() { + if (!MaybeReadBuffer(nullptr)) { + return; } -} - -void BufferedCSVReader::Initialize() { - // TODO(Ziyi): should we wrap this fd using kuzu file handler? - fd = open(filePath.c_str(), O_RDONLY); - JumpToBeginning(csvReaderConfig.hasHeader); - mode = ParserMode::PARSING; -} - -void BufferedCSVReader::JumpToBeginning(bool skipHeader) { - ResetBuffer(); - if (skipHeader) { - ReadHeader(); + if (bufferSize >= 3 && buffer[0] == '\xEF' && buffer[1] == '\xBB' && buffer[2] == '\xBF') { + position = 3; } } -void BufferedCSVReader::ResetBuffer() { - buffer.reset(); - bufferSize = 0; - position = 0; - start = 0; - cachedBuffers.clear(); -} - -void BufferedCSVReader::ReadHeader() { - // ignore the first line as a header line +void BaseCSVReader::ReadHeader() { mode = ParserMode::PARSING_HEADER; DataChunk dummyChunk(0); ParseCSV(dummyChunk); } -void BufferedCSVReader::SniffCSV() { - mode = ParserMode::SNIFFING_DIALECT; - DataChunk dummyChunk(0); - ParseCSV(dummyChunk); - JumpToBeginning(csvReaderConfig.hasHeader); -} - -bool BufferedCSVReader::ReadBuffer(uint64_t& start, uint64_t& lineStart) { - if (start > bufferSize) { - return false; - } - auto oldBuffer = std::move(buffer); +bool BaseCSVReader::ReadBuffer(uint64_t* start) { + std::unique_ptr oldBuffer = std::move(buffer); // the remaining part of the last buffer - uint64_t remaining = bufferSize - start; + uint64_t remaining = 0; + if (start != nullptr) { + assert(*start <= bufferSize); + remaining = bufferSize - *start; + } uint64_t bufferReadSize = INITIAL_BUFFER_SIZE; @@ -298,7 +368,8 @@ bool BufferedCSVReader::ReadBuffer(uint64_t& start, uint64_t& lineStart) { bufferSize = remaining + bufferReadSize; if (remaining > 0) { // remaining from last buffer: copy it here - memcpy(buffer.get(), oldBuffer.get() + start, remaining); + assert(start != nullptr); + memcpy(buffer.get(), oldBuffer.get() + *start, remaining); } uint64_t readCount = read(fd, buffer.get() + remaining, bufferReadSize); @@ -307,74 +378,50 @@ bool BufferedCSVReader::ReadBuffer(uint64_t& start, uint64_t& lineStart) { "Could not read from file {}: {}", filePath, strerror(errno))); } - bytesInChunk += readCount; bufferSize = remaining + readCount; buffer[bufferSize] = '\0'; - if (oldBuffer) { - cachedBuffers.push_back(std::move(oldBuffer)); + if (start != nullptr) { + *start = 0; } - start = 0; position = remaining; - if (!bomChecked) { - bomChecked = true; - if (readCount >= 3 && buffer[0] == '\xEF' && buffer[1] == '\xBB' && buffer[2] == '\xBF') { - start += 3; - position += 3; - } - } - lineStart = start; - return readCount > 0; } -void BufferedCSVReader::SkipEmptyLines() { - if (expectedNumColumns == 1) { - // Empty lines are null data. - return; - } - for (; position < bufferSize; position++) { - if (!isNewLine(buffer[position])) { - return; - } - } -} - -uint64_t BufferedCSVReader::TryParseSimpleCSV(DataChunk& resultChunk, std::string& errorMessage) { +uint64_t BaseCSVReader::ParseCSV(DataChunk& resultChunk) { // used for parsing algorithm rowToAdd = 0; - bool finishedChunk = false; column_id_t column = 0; - uint64_t offset = 0; + uint64_t start = position; bool hasQuotes = false; std::vector escapePositions; - uint64_t lineStart = position; // read values into the buffer (if any) - if (position >= bufferSize) { - if (!ReadBuffer(start, lineStart)) { - return 0; - } + if (!MaybeReadBuffer(&start)) { + return 0; } // start parsing the first value goto value_start; value_start: - offset = 0; /* state: value_start */ // this state parses the first character of a value if (buffer[position] == csvReaderConfig.quoteChar) { + [[unlikely]] // quote: actual value starts in the next position // move to in_quotes state start = position + 1; + hasQuotes = true; goto in_quotes; } else { // no quote, move to normal parsing state start = position; + hasQuotes = false; goto normal; } normal: /* state: normal parsing state */ - // this state parses the remainder of a non-quoted value until we reach a delimiter or newline + // this state parses the remainder of a non-quoted value until we reach a delimiter or + // newline do { for (; position < bufferSize; position++) { if (buffer[position] == csvReaderConfig.delimiter) { @@ -385,65 +432,59 @@ uint64_t BufferedCSVReader::TryParseSimpleCSV(DataChunk& resultChunk, std::strin goto add_row; } } - } while (ReadBuffer(start, lineStart)); + } while (ReadBuffer(&start)); + + [[unlikely]] // file ends during normal scan: go to end state goto final_state; add_value: - AddValue(resultChunk, std::string(buffer.get() + start, position - start - offset), column, + // We get here after we have a delimiter. + assert(buffer[position] == csvReaderConfig.delimiter); + // Trim one character if we have quotes. + AddValue(resultChunk, std::string(buffer.get() + start, position - start - hasQuotes), column, escapePositions); - // increase position by 1 and move start to the new position - offset = 0; - hasQuotes = false; - start = ++position; - if (position >= bufferSize && !ReadBuffer(start, lineStart)) { - // file ends right after delimiter, go to final state + column++; + + // Move past the delimiter. + ++position; + // Adjust start for MaybeReadBuffer. + start = position; + if (!MaybeReadBuffer(&start)) { + [[unlikely]] + // File ends right after delimiter, go to final state goto final_state; } goto value_start; add_row : { - // check type of newline (\r or \n) - bool carriageReturn = buffer[position] == '\r'; - AddValue(resultChunk, std::string(buffer.get() + start, position - start - offset), column, + // We get here after we have a newline. + assert(isNewLine(buffer[position])); + bool isCarriageReturn = buffer[position] == '\r'; + AddValue(resultChunk, std::string(buffer.get() + start, position - start - hasQuotes), column, escapePositions); - if (!errorMessage.empty()) { - return -1; - } - finishedChunk = AddRow(resultChunk, column); - if (!errorMessage.empty()) { - return -1; - } - // increase position by 1 and move start to the new position - offset = 0; - hasQuotes = false; + column++; + + AddRow(resultChunk, column); + + column = 0; position++; + // Adjust start for ReadBuffer. start = position; - lineStart = position; - if (position >= bufferSize && !ReadBuffer(start, lineStart)) { - // file ends right after delimiter, go to final state + if (!MaybeReadBuffer(&start)) { + // File ends right after newline, go to final state. goto final_state; } - if (carriageReturn) { + if (isCarriageReturn) { // \r newline, go to special state that parses an optional \n afterwards goto carriage_return; } else { - SkipEmptyLines(); - start = position; - lineStart = position; - if (position >= bufferSize && !ReadBuffer(start, lineStart)) { - // file ends right after delimiter, go to final state - goto final_state; - } - // \n newline, move to value start - if (finishedChunk) { + if (finishedBlock()) { return rowToAdd; } goto value_start; } } in_quotes: - /* state: in_quotes */ - // this state parses the remainder of a quoted value - hasQuotes = true; + // this state parses the remainder of a quoted value. position++; do { for (; position < bufferSize; position++) { @@ -454,21 +495,24 @@ add_row : { // escape: store the escaped position and move to handle_escape state escapePositions.push_back(position - start); goto handle_escape; + } else if (isNewLine(buffer[position])) { + [[unlikely]] handleQuotedNewline(); } } - } while (ReadBuffer(start, lineStart)); + } while (ReadBuffer(&start)); + [[unlikely]] // still in quoted state at the end of the file, error: throw CopyException(StringUtils::string_format( - "Error in file {} on line {}: unterminated quotes. ", filePath, linenr)); + "Error in file {} on line {}: unterminated quotes.", filePath, getLineNumber())); unquote: - /* state: unquote */ + assert(hasQuotes && buffer[position] == csvReaderConfig.quoteChar); // this state handles the state directly after we unquote - // in this state we expect either another quote (entering the quoted state again, and escaping - // the quote) or a delimiter/newline, ending the current value and moving on to the next value + // in this state we expect either another quote (entering the quoted state again, and + // escaping the quote) or a delimiter/newline, ending the current value and moving on to the + // next value position++; - if (position >= bufferSize && !ReadBuffer(start, lineStart)) { + if (!MaybeReadBuffer(&start)) { // file ends right after unquote, go to final state - offset = 1; goto final_state; } if (buffer[position] == csvReaderConfig.quoteChar && @@ -478,93 +522,114 @@ add_row : { goto in_quotes; } else if (buffer[position] == csvReaderConfig.delimiter) { // delimiter, add value - offset = 1; goto add_value; } else if (isNewLine(buffer[position])) { - offset = 1; goto add_row; } else { - errorMessage = StringUtils::string_format( - "Error in file {} on line {}: quote should be followed by end of value, end of " - "row or another quote.", - filePath, linenr); - return -1; + [[unlikely]] throw CopyException( + StringUtils::string_format("Error in file {} on line {}: quote should be followed by " + "end of file, end of value, end of " + "row or another quote.", + filePath, getLineNumber())); } handle_escape: /* state: handle_escape */ // escape should be followed by a quote or another escape character position++; - if (position >= bufferSize && !ReadBuffer(start, lineStart)) { - errorMessage = StringUtils::string_format( - "Error in file {} on line {}: neither QUOTE nor ESCAPE is proceeded by ESCAPE.", - filePath, linenr); - return -1; + if (!MaybeReadBuffer(&start)) { + [[unlikely]] throw CopyException(StringUtils::string_format( + "Error in file {} on line {}: escape at end of file.", filePath, getLineNumber())); } if (buffer[position] != csvReaderConfig.quoteChar && buffer[position] != csvReaderConfig.escapeChar) { - errorMessage = StringUtils::string_format( + [[unlikely]] throw CopyException(StringUtils::string_format( "Error in file {} on line {}}: neither QUOTE nor ESCAPE is proceeded by ESCAPE.", - filePath, linenr); - return -1; + filePath, getLineNumber())); } // escape was followed by quote or escape, go back to quoted state goto in_quotes; carriage_return: - /* state: carriage_return */ - // this stage optionally skips a newline (\n) character, which allows \r\n to be interpreted as - // a single line + // this stage optionally skips a newline (\n) character, which allows \r\n to be interpreted + // as a single line + + // position points to the character after the carriage return. if (buffer[position] == '\n') { // newline after carriage return: skip // increase position by 1 and move start to the new position start = ++position; - if (position >= bufferSize && !ReadBuffer(start, lineStart)) { - // file ends right after delimiter, go to final state + if (!MaybeReadBuffer(&start)) { + // file ends right after newline, go to final state goto final_state; } } - if (finishedChunk) { + if (finishedBlock()) { return rowToAdd; } - SkipEmptyLines(); - start = position; - lineStart = position; - if (position >= bufferSize && !ReadBuffer(start, lineStart)) { - // file ends right after delimiter, go to final state - goto final_state; - } goto value_start; final_state: - if (finishedChunk) { - return rowToAdd; - } - + // We get here when the file ends. + // If we were mid-value, add the remaining value to the chunk. if (column > 0 || position > start) { - // remaining values to be added to the chunk - AddValue(resultChunk, std::string(buffer.get() + start, position - start - offset), column, - escapePositions); - finishedChunk = AddRow(resultChunk, column); - SkipEmptyLines(); - if (!errorMessage.empty()) { - return -1; - } + // Add remaining value to chunk. + AddValue(resultChunk, std::string(buffer.get() + start, position - start - hasQuotes), + column, escapePositions); + column++; + AddRow(resultChunk, column); } - return rowToAdd; } -uint64_t BufferedCSVReader::ParseCSV(DataChunk& resultChunk) { - std::string errorMessage; - auto numRowsRead = TryParseCSV(resultChunk, errorMessage); - if (numRowsRead == -1) { - throw CopyException(errorMessage); +uint64_t BaseCSVReader::getFileOffset() const { + uint64_t offset = lseek(fd, 0, SEEK_CUR); + if (offset == -1) { + // LCOV_EXCL_START + throw CopyException(StringUtils::string_format( + "Could not get current file position for file {}: {}", filePath, strerror(errno))); + // LCOV_EXCL_END } - resultChunk.state->selVector->selectedSize = numRowsRead; - return numRowsRead; + assert(offset >= bufferSize); + return offset - bufferSize + position; } -uint64_t BufferedCSVReader::TryParseCSV(DataChunk& resultChunk, std::string& errorMessage) { - return TryParseSimpleCSV(resultChunk, errorMessage); +uint64_t BaseCSVReader::getLineNumber() { + uint64_t offset = getFileOffset(); + uint64_t lineNumber = 1; + const uint64_t BUF_SIZE = 4096; + char buf[BUF_SIZE]; + if (lseek(fd, 0, SEEK_SET) == -1) { + // LCOV_EXCL_START + throw CopyException(StringUtils::string_format( + "Could not seek to beginning of file {}: {}", filePath, strerror(errno))); + // LCOV_EXCL_END + } + + bool carriageReturn = false; + uint64_t totalBytes = 0; + do { + uint64_t bytesRead = read(fd, buf, std::min(BUF_SIZE, offset - totalBytes)); + if (bytesRead == -1) { + // LCOV_EXCL_START + throw CopyException(StringUtils::string_format( + "Could not read from file {}: {}", filePath, strerror(errno))); + // LCOV_EXCL_END + } + totalBytes += bytesRead; + + for (uint64_t i = 0; i < bytesRead; i++) { + if (buf[i] == '\n') { + lineNumber++; + carriageReturn = false; + } else if (carriageReturn) { + lineNumber++; + carriageReturn = false; + } + if (buf[i] == '\r') { + carriageReturn = true; + } + } + } while (totalBytes < offset); + return lineNumber + carriageReturn; } } // namespace processor diff --git a/src/processor/operator/persistent/reader/csv/parallel_csv_reader.cpp b/src/processor/operator/persistent/reader/csv/parallel_csv_reader.cpp new file mode 100644 index 00000000000..1fe80f57025 --- /dev/null +++ b/src/processor/operator/persistent/reader/csv/parallel_csv_reader.cpp @@ -0,0 +1,87 @@ +#include "processor/operator/persistent/reader/csv/parallel_csv_reader.h" + +#include "common/exception/copy.h" +#include "common/string_utils.h" + +using namespace kuzu::common; + +namespace kuzu { +namespace processor { + +// This means that we will usually read the entirety of the contents of the file we need for a block +// in one read request. +// It is also very small, which means we can parallelize small files efficiently. +const uint64_t ParallelCSVReader::PARALLEL_BLOCK_SIZE = BaseCSVReader::INITIAL_BUFFER_SIZE / 2; + +ParallelCSVReader::ParallelCSVReader( + const std::string& filePath, const common::ReaderConfig& readerConfig) + : BaseCSVReader{filePath, readerConfig} {} + +bool ParallelCSVReader::hasMoreToRead() const { + // If we haven't started the first block yet or are done our block, get the next block. + return buffer != nullptr && !finishedBlockDetail(); +} + +uint64_t ParallelCSVReader::ContinueBlock(common::DataChunk& resultChunk) { + assert(buffer != nullptr && !finishedBlockDetail() && mode == ParserMode::PARSING); + return ParseCSV(resultChunk); +} + +void ParallelCSVReader::parseBlockHook() { + // Seek to the proper location in the file. + if (lseek(fd, currentBlockIdx * PARALLEL_BLOCK_SIZE, SEEK_SET) == -1) { + // LCOV_EXCL_START + throw CopyException(StringUtils::string_format("Failed to seek to block {} in file {}: {}", + currentBlockIdx, filePath, strerror(errno))); + // LCOV_EXCL_END + } + + if (currentBlockIdx == 0) { + // First block doesn't search for a newline. + return; + } + + // Reset the buffer. + position = 0; + bufferSize = 0; + buffer.reset(); + if (!ReadBuffer(nullptr)) { + return; + } + + // Find the start of the next line. + do { + for (; position < bufferSize; position++) { + if (buffer[position] == '\r') { + position++; + if (!MaybeReadBuffer(nullptr)) { + return; + } + if (buffer[position] == '\n') { + position++; + } + return; + } else if (buffer[position] == '\n') { + position++; + return; + } + } + } while (ReadBuffer(nullptr)); +} + +void ParallelCSVReader::handleQuotedNewline() { + throw CopyException( + StringUtils::string_format("Quoted newlines are not supported in parallel CSV reader " + "(while parsing {} on line {}). Please " + "specify PARALLEL=FALSE in the options.", + filePath, getLineNumber())); +} + +bool ParallelCSVReader::finishedBlockDetail() const { + // Only stop if we've ventured into the next block by at least a byte. + // Use `>` because `position` points to just past the newline right now. + return getFileOffset() > (currentBlockIdx + 1) * PARALLEL_BLOCK_SIZE; +} + +} // namespace processor +} // namespace kuzu diff --git a/src/processor/operator/persistent/reader/csv/serial_csv_reader.cpp b/src/processor/operator/persistent/reader/csv/serial_csv_reader.cpp new file mode 100644 index 00000000000..2e01d795b9f --- /dev/null +++ b/src/processor/operator/persistent/reader/csv/serial_csv_reader.cpp @@ -0,0 +1,25 @@ +#include "processor/operator/persistent/reader/csv/serial_csv_reader.h" + +using namespace kuzu::common; + +namespace kuzu { +namespace processor { + +SerialCSVReader::SerialCSVReader( + const std::string& filePath, const common::ReaderConfig& readerConfig) + : BaseCSVReader{filePath, readerConfig} {} + +void SerialCSVReader::SniffCSV() { + ReadBOM(); + mode = ParserMode::SNIFFING_DIALECT; + DataChunk dummyChunk(0); + ParseCSV(dummyChunk); +} + +bool SerialCSVReader::finishedBlockDetail() const { + // Never stop until we fill the chunk. + return false; +} + +} // namespace processor +} // namespace kuzu diff --git a/src/processor/operator/persistent/reader_functions.cpp b/src/processor/operator/persistent/reader_functions.cpp index f13a3622949..09ad7847923 100644 --- a/src/processor/operator/persistent/reader_functions.cpp +++ b/src/processor/operator/persistent/reader_functions.cpp @@ -1,5 +1,6 @@ #include "processor/operator/persistent/reader_functions.h" +#include "common/exception/copy.h" #include using namespace kuzu::common; @@ -22,17 +23,15 @@ validate_func_t ReaderFunctions::getValidateFunc(FileType fileType) { } } -count_blocks_func_t ReaderFunctions::getCountBlocksFunc(FileType fileType, TableType tableType) { - switch (fileType) { +count_blocks_func_t ReaderFunctions::getCountBlocksFunc( + const ReaderConfig& config, TableType tableType) { + switch (config.fileType) { case FileType::CSV: { - switch (tableType) { - case TableType::NODE: - return countRowsInNodeCSVFile; - case TableType::UNKNOWN: - case TableType::REL: + if (tableType == TableType::REL) { return countRowsNoOp; - default: - throw NotImplementedException{"ReaderFunctions::getCountBlocksFunc"}; + } else { + return config.csvReaderConfig->parallel ? countRowsInParallelCSVFile : + countRowsInSerialCSVFile; } } case FileType::PARQUET: { @@ -52,23 +51,21 @@ count_blocks_func_t ReaderFunctions::getCountBlocksFunc(FileType fileType, Table case FileType::TURTLE: { return countRowsInRDFFile; } - default: { - throw NotImplementedException{"ReaderFunctions::getRowsCounterFunc"}; - } + default: { // LCOV_EXCL_START + throw NotImplementedException{"ReaderFunctions::getCountBlocksFunc"}; + } // LCOV_EXCL_END } } -init_reader_data_func_t ReaderFunctions::getInitDataFunc(FileType fileType, TableType tableType) { - switch (fileType) { +init_reader_data_func_t ReaderFunctions::getInitDataFunc( + const ReaderConfig& config, TableType tableType) { + switch (config.fileType) { case FileType::CSV: { - switch (tableType) { - case TableType::NODE: - case TableType::UNKNOWN: - return initBufferedCSVReadData; - case TableType::REL: + if (tableType == TableType::REL) { return initRelCSVReadData; - default: - throw NotImplementedException{"ReaderFunctions::getInitDataFunc"}; + } else { + return config.csvReaderConfig->parallel ? initParallelCSVReadData : + initSerialCSVReadData; } } case FileType::PARQUET: { @@ -94,17 +91,15 @@ init_reader_data_func_t ReaderFunctions::getInitDataFunc(FileType fileType, Tabl } } -read_rows_func_t ReaderFunctions::getReadRowsFunc(FileType fileType, common::TableType tableType) { - switch (fileType) { +read_rows_func_t ReaderFunctions::getReadRowsFunc( + const ReaderConfig& config, common::TableType tableType) { + switch (config.fileType) { case FileType::CSV: { - switch (tableType) { - case TableType::NODE: - case TableType::UNKNOWN: - return readRowsWithBufferedCSVReader; - case TableType::REL: + if (tableType == TableType::REL) { return readRowsFromRelCSVFile; - default: - throw NotImplementedException{"ReaderFunctions::getReadRowsFunc"}; + } else { + return config.csvReaderConfig->parallel ? readRowsFromParallelCSVFile : + readRowsFromSerialCSVFile; } } case FileType::PARQUET: { @@ -131,17 +126,15 @@ read_rows_func_t ReaderFunctions::getReadRowsFunc(FileType fileType, common::Tab } std::shared_ptr ReaderFunctions::getReadFuncData( - FileType fileType, TableType tableType) { - switch (fileType) { + const ReaderConfig& config, TableType tableType) { + switch (config.fileType) { case FileType::CSV: { - switch (tableType) { - case TableType::NODE: - case TableType::UNKNOWN: - return std::make_shared(); - case TableType::REL: + if (tableType == TableType::REL) { return std::make_shared(); - default: - throw NotImplementedException{"ReaderFunctions::getReadFuncData"}; + } else if (config.csvReaderConfig->parallel) { + return std::make_shared(); + } else { + return std::make_shared(); } } case FileType::PARQUET: { @@ -187,33 +180,41 @@ std::vector ReaderFunctions::countRowsNoOp( return fileInfos; } -static std::unique_ptr createBufferedCSVReader( - const std::string& path, const ReaderConfig& config) { - return std::make_unique( - path, *config.csvReaderConfig, config.getNumColumns()); +std::vector ReaderFunctions::countRowsInSerialCSVFile( + const common::ReaderConfig& config, storage::MemoryManager* memoryManager) { + std::vector fileInfos; + fileInfos.reserve(config.getNumFiles()); + for (const auto& path : config.filePaths) { + auto reader = make_unique(path, config); + row_idx_t numRowsInFile = reader->CountRows(); + block_idx_t numBlocks = + (numRowsInFile + DEFAULT_VECTOR_CAPACITY - 1) / DEFAULT_VECTOR_CAPACITY; + FileBlocksInfo fileBlocksInfo{numRowsInFile, numBlocks}; + fileInfos.push_back(fileBlocksInfo); + } + return fileInfos; } -std::vector ReaderFunctions::countRowsInNodeCSVFile( - const common::ReaderConfig& config, MemoryManager* memoryManager) { +std::vector ReaderFunctions::countRowsInParallelCSVFile( + const common::ReaderConfig& config, storage::MemoryManager* memoryManager) { std::vector fileInfos; fileInfos.reserve(config.getNumFiles()); - auto dataChunk = getDataChunkToRead(config, memoryManager); - // We should add a countNumRows() API to csvReader, so that it doesn't need to read data to - // valueVector when counting the csv file. for (const auto& path : config.filePaths) { - auto reader = createBufferedCSVReader(path, config); - row_idx_t numRowsInFile = 0; - block_idx_t numBlocks = 0; - while (true) { - dataChunk->state->selVector->selectedSize = 0; - dataChunk->resetAuxiliaryBuffer(); - auto numRowsRead = reader->ParseCSV(*dataChunk); - if (numRowsRead == 0) { - break; - } - numRowsInFile += numRowsRead; - numBlocks++; + int fd = open(path.c_str(), O_RDONLY); + if (fd == -1) { + throw CopyException( + StringUtils::string_format("Failed to open file {}: {}", path, strerror(errno))); + } + uint64_t length = lseek(fd, 0, SEEK_END); + close(fd); + if (length == -1) { + throw CopyException(StringUtils::string_format( + "Failed to seek to end of file {}: {}", path, strerror(errno))); } + auto reader = make_unique(path, config); + row_idx_t numRowsInFile = reader->CountRows(); + block_idx_t numBlocks = (length + ParallelCSVReader::PARALLEL_BLOCK_SIZE - 1) / + ParallelCSVReader::PARALLEL_BLOCK_SIZE; FileBlocksInfo fileBlocksInfo{numRowsInFile, numBlocks}; fileInfos.push_back(fileBlocksInfo); } @@ -290,12 +291,20 @@ void ReaderFunctions::initRelCSVReadData(ReaderFunctionData& funcData, vector_id TableCopyUtils::createRelTableCSVReader(config.filePaths[fileIdx], config); } -void ReaderFunctions::initBufferedCSVReadData(ReaderFunctionData& funcData, vector_idx_t fileIdx, +void ReaderFunctions::initSerialCSVReadData(ReaderFunctionData& funcData, vector_idx_t fileIdx, const common::ReaderConfig& config, MemoryManager* memoryManager) { assert(fileIdx < config.getNumFiles()); funcData.fileIdx = fileIdx; - reinterpret_cast(funcData).reader = - createBufferedCSVReader(config.filePaths[fileIdx], config); + reinterpret_cast(funcData).reader = + std::make_unique(config.filePaths[fileIdx], config); +} + +void ReaderFunctions::initParallelCSVReadData(ReaderFunctionData& funcData, vector_idx_t fileIdx, + const common::ReaderConfig& config, MemoryManager* memoryManager) { + assert(fileIdx < config.getNumFiles()); + funcData.fileIdx = fileIdx; + reinterpret_cast(funcData).reader = + std::make_unique(config.filePaths[fileIdx], config); } void ReaderFunctions::initRelParquetReadData(ReaderFunctionData& funcData, vector_idx_t fileIdx, @@ -346,10 +355,23 @@ void ReaderFunctions::readRowsFromRelCSVFile(const kuzu::processor::ReaderFuncti dataChunkToRead->state->selVector->selectedSize = recordBatch->num_rows(); } -void ReaderFunctions::readRowsWithBufferedCSVReader( +void ReaderFunctions::readRowsFromSerialCSVFile( + const ReaderFunctionData& functionData, block_idx_t blockIdx, DataChunk* dataChunkToRead) { + auto& readerData = reinterpret_cast(functionData); + uint64_t numRows = readerData.reader->ParseBlock(blockIdx, *dataChunkToRead); + dataChunkToRead->state->selVector->selectedSize = numRows; +} + +void ReaderFunctions::readRowsFromParallelCSVFile( const ReaderFunctionData& functionData, block_idx_t blockIdx, DataChunk* dataChunkToRead) { - auto& readerData = reinterpret_cast(functionData); - readerData.reader->ParseCSV(*dataChunkToRead); + auto& readerData = reinterpret_cast(functionData); + uint64_t numRows; + if (blockIdx == UINT64_MAX) { + numRows = readerData.reader->ContinueBlock(*dataChunkToRead); + } else { + numRows = readerData.reader->ParseBlock(blockIdx, *dataChunkToRead); + } + dataChunkToRead->state->selVector->selectedSize = numRows; } void ReaderFunctions::readRowsFromRelParquetFile(const ReaderFunctionData& functionData, diff --git a/src/processor/operator/persistent/reader_state.cpp b/src/processor/operator/persistent/reader_state.cpp index b351d3c0920..3fd5a5a8ddf 100644 --- a/src/processor/operator/persistent/reader_state.cpp +++ b/src/processor/operator/persistent/reader_state.cpp @@ -66,10 +66,9 @@ void LeftArrowArrays::appendToDataChunk(common::DataChunk* dataChunk, uint64_t n void ReaderSharedState::initialize(TableType tableType) { validateFunc = ReaderFunctions::getValidateFunc(readerConfig->fileType); - initFunc = ReaderFunctions::getInitDataFunc(readerConfig->fileType, tableType); - countBlocksFunc = ReaderFunctions::getCountBlocksFunc(readerConfig->fileType, tableType); - readFunc = ReaderFunctions::getReadRowsFunc(readerConfig->fileType, tableType); - readFuncData = ReaderFunctions::getReadFuncData(readerConfig->fileType, tableType); + initFunc = ReaderFunctions::getInitDataFunc(*readerConfig, tableType); + countBlocksFunc = ReaderFunctions::getCountBlocksFunc(*readerConfig, tableType); + readFuncData = ReaderFunctions::getReadFuncData(*readerConfig, tableType); } void ReaderSharedState::validate() const { diff --git a/test/test_files/exceptions/binder/binder_error.test b/test/test_files/exceptions/binder/binder_error.test index 39f86f768d9..70e9542688b 100644 --- a/test/test_files/exceptions/binder/binder_error.test +++ b/test/test_files/exceptions/binder/binder_error.test @@ -285,7 +285,7 @@ Binder exception: No file found that matches the pattern: wrong_path.parquet. -LOG CopyCSVInvalidParsingOption -STATEMENT COPY person FROM "person_0_0.csv" (pk=",") ---- error -Binder exception: Unrecognized parsing csv option: PK. +Binder exception: Unrecognized csv parsing option: PK. -LOG CopyCSVInvalidSchemaName -STATEMENT COPY university FROM "person_0_0.csv" (pk=",") diff --git a/third_party/miniparquet/CMakeLists.txt b/third_party/miniparquet/CMakeLists.txt index a4e2af57711..7b4056b713e 100644 --- a/third_party/miniparquet/CMakeLists.txt +++ b/third_party/miniparquet/CMakeLists.txt @@ -21,6 +21,8 @@ include_directories(src/parquet src/snappy src/thrift) +add_compile_definitions(HAVE_STDINT_H) + add_library(miniparquet STATIC src/parquet/parquet_constants.cpp src/parquet/parquet_types.cpp