From b1bd21f6857c45cb2996943f1d50def6842b8483 Mon Sep 17 00:00:00 2001 From: Chang Liu Date: Fri, 3 Mar 2023 00:14:57 -0500 Subject: [PATCH] Remove csv reader --- dataset/copy-node-property-test/vPerson.csv | 6 +- src/common/CMakeLists.txt | 2 +- src/common/copier_config/CMakeLists.txt | 7 + src/common/copier_config/copier_config.cpp | 79 ++++ src/common/csv_reader/CMakeLists.txt | 7 - src/common/csv_reader/csv_reader.cpp | 399 ------------------ src/include/binder/binder.h | 2 +- src/include/binder/copy/bound_copy.h | 2 +- .../common/copier_config/copier_config.h | 52 +++ src/include/common/csv_reader/csv_reader.h | 106 ----- .../logical_operator/logical_copy.h | 2 +- src/include/processor/operator/copy/copy.h | 2 +- .../copy_arrow/copy_structures_arrow.h | 2 +- test/copy/copy_test.cpp | 31 +- 14 files changed, 169 insertions(+), 530 deletions(-) create mode 100644 src/common/copier_config/CMakeLists.txt create mode 100644 src/common/copier_config/copier_config.cpp delete mode 100644 src/common/csv_reader/CMakeLists.txt delete mode 100644 src/common/csv_reader/csv_reader.cpp create mode 100644 src/include/common/copier_config/copier_config.h delete mode 100644 src/include/common/csv_reader/csv_reader.h diff --git a/dataset/copy-node-property-test/vPerson.csv b/dataset/copy-node-property-test/vPerson.csv index 48e283e826..eab620810a 100644 --- a/dataset/copy-node-property-test/vPerson.csv +++ b/dataset/copy-node-property-test/vPerson.csv @@ -1,12 +1,12 @@ 0, 1,"ozwhvnetnq" -2,"kuk,qg\\nrspmk" -3,"wmz,1234\lamo" +2,"kuk,qgnrspmk" +3,"wmz,1234lamo" 4,"tudoojdduf" 5,"qifidjufri" 6,"gqpnpbdmrb" 7,"dgzbiqjkaz" -8,"ebf,,uq\buqma" +8,"ebf,,uqbuqma" 9,"rwhnybogfy" 10,"enqpnymvdb" 11,"axgwwhhohf" diff --git a/src/common/CMakeLists.txt b/src/common/CMakeLists.txt index 6a31ecd383..ce68ce79c5 100644 --- a/src/common/CMakeLists.txt +++ b/src/common/CMakeLists.txt @@ -1,5 +1,5 @@ add_subdirectory(arrow) -add_subdirectory(csv_reader) +add_subdirectory(copier_config) add_subdirectory(data_chunk) add_subdirectory(task_system) add_subdirectory(types) diff --git a/src/common/copier_config/CMakeLists.txt b/src/common/copier_config/CMakeLists.txt new file mode 100644 index 0000000000..fba3f7abf2 --- /dev/null +++ b/src/common/copier_config/CMakeLists.txt @@ -0,0 +1,7 @@ +add_library(kuzu_common_copier_config + OBJECT + copier_config.cpp) + +set(ALL_OBJECT_FILES + ${ALL_OBJECT_FILES} $ + PARENT_SCOPE) diff --git a/src/common/copier_config/copier_config.cpp b/src/common/copier_config/copier_config.cpp new file mode 100644 index 0000000000..e7cc3a8a56 --- /dev/null +++ b/src/common/copier_config/copier_config.cpp @@ -0,0 +1,79 @@ +#include "common/copier_config/copier_config.h" + +#include "common/constants.h" +#include "common/type_utils.h" +#include "common/utils.h" +#include "spdlog/spdlog.h" +#include "utf8proc_wrapper.h" + +using namespace kuzu::utf8proc; + +namespace kuzu { +namespace common { +CopyDescription::CopyDescription(const std::string& filePath, CSVReaderConfig csvReaderConfig) + : filePath{filePath}, csvReaderConfig{nullptr}, fileType{FileType::CSV} { + setFileType(filePath); + if (fileType == FileType::CSV) { + this->csvReaderConfig = std::make_unique(csvReaderConfig); + } +} + +CopyDescription::CopyDescription(const CopyDescription& copyDescription) + : filePath{copyDescription.filePath}, csvReaderConfig{nullptr}, fileType{ + copyDescription.fileType} { + if (fileType == FileType::CSV) { + this->csvReaderConfig = std::make_unique(*copyDescription.csvReaderConfig); + } +} + +std::string CopyDescription::getFileTypeName(FileType fileType) { + switch (fileType) { + case FileType::CSV: + return "csv"; + + case FileType::ARROW: + return "arrow"; + + case FileType::PARQUET: + return "parquet"; + } +} + +std::string CopyDescription::getFileTypeSuffix(FileType fileType) { + return "." + getFileTypeName(fileType); +} + +void CopyDescription::setFileType(std::string const& fileName) { + auto csvSuffix = getFileTypeSuffix(FileType::CSV); + auto arrowSuffix = getFileTypeSuffix(FileType::ARROW); + auto parquetSuffix = getFileTypeSuffix(FileType::PARQUET); + + if (fileName.length() >= csvSuffix.length()) { + if (!fileName.compare( + fileName.length() - csvSuffix.length(), csvSuffix.length(), csvSuffix)) { + fileType = FileType::CSV; + return; + } + } + + if (fileName.length() >= arrowSuffix.length()) { + if (!fileName.compare( + fileName.length() - arrowSuffix.length(), arrowSuffix.length(), arrowSuffix)) { + fileType = FileType::ARROW; + return; + } + } + + if (fileName.length() >= parquetSuffix.length()) { + if (!fileName.compare(fileName.length() - parquetSuffix.length(), parquetSuffix.length(), + parquetSuffix)) { + fileType = FileType::PARQUET; + return; + } + } + + throw CopyException("Unsupported file type: " + fileName); +} + +} // namespace common +} // namespace kuzu diff --git a/src/common/csv_reader/CMakeLists.txt b/src/common/csv_reader/CMakeLists.txt deleted file mode 100644 index 5730117e81..0000000000 --- a/src/common/csv_reader/CMakeLists.txt +++ /dev/null @@ -1,7 +0,0 @@ -add_library(kuzu_common_csv_reader - OBJECT - csv_reader.cpp) - -set(ALL_OBJECT_FILES - ${ALL_OBJECT_FILES} $ - PARENT_SCOPE) diff --git a/src/common/csv_reader/csv_reader.cpp b/src/common/csv_reader/csv_reader.cpp deleted file mode 100644 index 8d83cfdb21..0000000000 --- a/src/common/csv_reader/csv_reader.cpp +++ /dev/null @@ -1,399 +0,0 @@ -#include "common/csv_reader/csv_reader.h" - -#include "common/constants.h" -#include "common/type_utils.h" -#include "common/utils.h" -#include "spdlog/spdlog.h" -#include "utf8proc_wrapper.h" - -using namespace kuzu::utf8proc; - -namespace kuzu { -namespace common { - -CSVReader::CSVReader(const std::string& fName, const CSVReaderConfig& config, uint64_t blockId) - : CSVReader{fName, config} { - readingBlockStartOffset = CopyConstants::CSV_READING_BLOCK_SIZE * blockId; - readingBlockEndOffset = CopyConstants::CSV_READING_BLOCK_SIZE * (blockId + 1); - auto isBeginningOfLine = false; - if (0 == readingBlockStartOffset) { - isBeginningOfLine = true; - } else { - fseek(fd, readingBlockStartOffset - 1, SEEK_SET); - if ('\n' == fgetc(fd)) { - isBeginningOfLine = true; - } - } - if (!isBeginningOfLine) { - while ('\n' != fgetc(fd)) {} - } -} - -CSVReader::CSVReader(const std::string& fname, const CSVReaderConfig& config) - : CSVReader{(char*)malloc(sizeof(char) * 1024), 0, -1l, config} { - openFile(fname); -} - -CSVReader::CSVReader( - char* line, uint64_t lineLen, int64_t linePtrStart, const CSVReaderConfig& config) - : fd{nullptr}, config{config}, logger{LoggerUtils::getLogger( - LoggerConstants::LoggerEnum::CSV_READER)}, - nextLineIsNotProcessed{false}, isEndOfBlock{false}, - nextTokenIsNotProcessed{false}, line{line}, lineCapacity{1024}, lineLen{lineLen}, - linePtrStart{linePtrStart}, linePtrEnd{linePtrStart}, readingBlockStartOffset{0}, - readingBlockEndOffset{UINT64_MAX}, nextTokenLen{UINT64_MAX} {} - -CSVReader::~CSVReader() { - // fd can be nullptr when the CSVReader is constructed by passing a char*, so it is reading over - // a substring instead of a file. - if (fd != nullptr) { - fclose(fd); - free(line); - } -} - -bool CSVReader::hasNextLine() { - // the block has already been ended, return false. - if (isEndOfBlock) { - return false; - } - // the previous line has not been processed yet, return true. - if (nextLineIsNotProcessed) { - return true; - } - // file cursor is past the block limit, end the block, return false. - auto curPos = ftell(fd); - if (curPos >= readingBlockEndOffset) { - isEndOfBlock = true; - return false; - } - // else, read the next line. The function getline() will dynamically allocate a larger line and - // update the lineCapacity accordingly if the length of the line exceeds the lineCapacity. - // We keep curPos in case the very final line does not have a \n character in which case - // we will seek back to where we were and read it without using getLine (inside the if). - lineLen = getline(&line, &lineCapacity, fd); - if (lineLen == (ssize_t)-1) { - isEndOfBlock = true; - return false; - } - // Text files created on DOS/Windows machines have different line endings than files created on - // Unix/Linux. DOS uses carriage return and line feed ("\r\n") as a line ending, which Unix uses - // just line feed ("\n"). If the current line uses dos-style newline, we should replace the - // '\r\n' with the linux-style newline '\n'. - if (lineLen > 1 && line[lineLen - 1] == '\n' && line[lineLen - 2] == '\r') { - line[lineLen - 2] = '\n'; - lineLen -= 1; - } - if (feof(fd)) { - // According to POSIX getline manual (https://man7.org/linux/man-pages/man3/getline.3.html) - // the behavior of getline when in reaches an end of file is underdefined in terms of how - // it leaves the first (line) argument above. Instead, we re-read the file, this time - // with fgets (https://en.cppreference.com/w/c/io/fgets), whose behavior is clear and will - // guarantee. - // We first determine the last offset of the file: - fseek(fd, 0L, SEEK_END); - auto lastPos = ftell(fd); - isEndOfBlock = true; - auto sizeOfRemainder = lastPos - curPos; - if (sizeOfRemainder <= 0) { - return false; - } - - if (lineCapacity < sizeOfRemainder) { - // Note: We don't have tests testing this case because although according to - // getline's documentation, the behavior is undefined, the getline call above - // (before the feof check) seems to be increasing the lineCapacity for the - // last lines without newline character. So this is here for safety but is - // not tested. - free(line); - // We are adding + 1 for the additional \n character we will append. - line = (char*)malloc(sizeOfRemainder + 1); - } - fseek(fd, curPos, SEEK_SET); - fgets(line, (int)sizeOfRemainder + 1, fd); - line[sizeOfRemainder] = '\n'; - lineLen = sizeOfRemainder; - } - // The line is empty - if (lineLen < 2) { - return false; - } - linePtrStart = linePtrEnd = -1; - return true; -} - -void CSVReader::skipLine() { - nextLineIsNotProcessed = false; -} - -bool CSVReader::skipTokenIfNull() { - if (linePtrEnd - linePtrStart == 0) { - nextLineIsNotProcessed = false; - return true; - } - return false; -} - -void CSVReader::skipToken() { - setNextTokenIsProcessed(); -} - -bool CSVReader::hasNextToken() { - if (nextTokenIsNotProcessed) { - return true; - } - linePtrEnd++; - linePtrStart = linePtrEnd; - if (linePtrEnd >= lineLen) { - nextLineIsNotProcessed = false; - return false; - } - nextTokenLen = 0; - bool isQuotedString = false; - uint32_t nestedListLevel = 0; - bool isList = false; - - if (config.quoteChar == line[linePtrEnd]) { - linePtrStart++; - linePtrEnd++; - isQuotedString = true; - } - if (config.listBeginChar == line[linePtrEnd]) { - linePtrStart++; - linePtrEnd++; - nestedListLevel++; - isList = true; - } - std::string lineStr; - while (true) { - if (isQuotedString) { - // ignore delimiter and new line character here - if (config.quoteChar == line[linePtrEnd]) { - break; - } else if (config.escapeChar == line[linePtrEnd]) { - // escape next special character - linePtrEnd++; - } - } else if (isList) { - // ignore delimiter and new line character here - if (config.listBeginChar == line[linePtrEnd]) { - linePtrEnd++; - nestedListLevel++; - } else if (config.listEndChar == line[linePtrEnd]) { - nestedListLevel--; - } - if (nestedListLevel == 0) { - break; - } - } else if (config.delimiter == line[linePtrEnd] || '\n' == line[linePtrEnd] || - linePtrEnd == lineLen) { - break; - } - lineStr += line[linePtrEnd]; - nextTokenLen++; - linePtrEnd++; - } - line[linePtrEnd] = 0; - if (isQuotedString) { - strncpy(line + linePtrStart, lineStr.c_str(), lineStr.length() + 1); - // if this is a std::string literal, skip the next comma as well - linePtrEnd++; - } - if (isList) { - // skip the next comma - linePtrEnd++; - } - return true; -} - -bool CSVReader::hasNextTokenOrError() { - if (!hasNextToken()) { - throw ReaderException( - StringUtils::string_format("CSV Reader was expecting more tokens but the line does not " - "have any tokens left. Last token: {}", - line + linePtrStart)); - } - return true; -} - -int64_t CSVReader::getInt64() { - setNextTokenIsProcessed(); - return TypeUtils::convertStringToNumber(line + linePtrStart); -} - -double_t CSVReader::getDouble() { - setNextTokenIsProcessed(); - return TypeUtils::convertStringToNumber(line + linePtrStart); -} - -uint8_t CSVReader::getBoolean() { - setNextTokenIsProcessed(); - return TypeUtils::convertToBoolean(line + linePtrStart); -} - -char* CSVReader::getString() { - setNextTokenIsProcessed(); - auto strVal = line + linePtrStart; - if (strlen(strVal) > BufferPoolConstants::DEFAULT_PAGE_SIZE) { - logger->warn( - StringUtils::getLongStringErrorMessage(strVal, BufferPoolConstants::DEFAULT_PAGE_SIZE)); - // If the std::string is too long, truncate it. - strVal[BufferPoolConstants::DEFAULT_PAGE_SIZE] = '\0'; - } - auto unicodeType = Utf8Proc::analyze(strVal, strlen(strVal)); - if (unicodeType == UnicodeType::ASCII) { - return strVal; - } else if (unicodeType == UnicodeType::UNICODE) { - return Utf8Proc::normalize(strVal, strlen(strVal)); - } else { - throw ReaderException("Invalid UTF-8 character encountered."); - } -} - -date_t CSVReader::getDate() { - date_t retVal = Date::FromCString(line + linePtrStart, nextTokenLen); - setNextTokenIsProcessed(); - return retVal; -} - -timestamp_t CSVReader::getTimestamp() { - timestamp_t retVal = Timestamp::FromCString(line + linePtrStart, nextTokenLen); - setNextTokenIsProcessed(); - return retVal; -} - -interval_t CSVReader::getInterval() { - interval_t retVal = Interval::FromCString(line + linePtrStart, nextTokenLen); - setNextTokenIsProcessed(); - return retVal; -} - -std::unique_ptr CSVReader::getList(const DataType& dataType) { - std::vector> listVal; - // Move the linePtrStart one character forward, because hasNextToken() will first increment it. - CSVReader listCSVReader(line, linePtrEnd - 1, linePtrStart - 1, config); - while (listCSVReader.hasNextToken()) { - if (!listCSVReader.skipTokenIfNull()) { - std::unique_ptr val; - switch (dataType.typeID) { - case INT64: { - val = std::make_unique(listCSVReader.getInt64()); - } break; - case DOUBLE: { - val = std::make_unique(listCSVReader.getDouble()); - } break; - case BOOL: { - val = std::make_unique((bool)listCSVReader.getBoolean()); - } break; - case STRING: { - val = std::make_unique(std::string(listCSVReader.getString())); - } break; - case DATE: { - val = std::make_unique(listCSVReader.getDate()); - } break; - case TIMESTAMP: { - val = std::make_unique(listCSVReader.getTimestamp()); - } break; - case INTERVAL: { - val = std::make_unique(listCSVReader.getInterval()); - } break; - case VAR_LIST: { - val = listCSVReader.getList(*dataType.childType); - } break; - default: - throw ReaderException("Unsupported data type " + - Types::dataTypeToString(dataType.childType->typeID) + - " inside LIST"); - } - listVal.push_back(std::move(val)); - } - } - auto numBytesOfOverflow = listVal.size() * Types::getDataTypeSize(dataType.typeID); - if (numBytesOfOverflow >= BufferPoolConstants::DEFAULT_PAGE_SIZE) { - throw ReaderException(StringUtils::string_format( - "Maximum num bytes of a LIST is {}. Input list's num bytes is {}.", - BufferPoolConstants::DEFAULT_PAGE_SIZE, numBytesOfOverflow)); - } - return std::make_unique( - DataType(VAR_LIST, std::make_unique(dataType)), std::move(listVal)); -} - -void CSVReader::setNextTokenIsProcessed() { - nextTokenIsNotProcessed = false; - nextTokenLen = UINT64_MAX; -} - -void CSVReader::openFile(const std::string& fName) { - fd = fopen(fName.c_str(), "r"); - if (nullptr == fd) { - throw ReaderException("Cannot open file: " + fName); - } -} - -CopyDescription::CopyDescription(const std::string& filePath, CSVReaderConfig csvReaderConfig) - : filePath{filePath}, csvReaderConfig{nullptr}, fileType{FileType::CSV} { - setFileType(filePath); - if (fileType == FileType::CSV) { - this->csvReaderConfig = std::make_unique(csvReaderConfig); - } -} - -CopyDescription::CopyDescription(const CopyDescription& copyDescription) - : filePath{copyDescription.filePath}, csvReaderConfig{nullptr}, fileType{ - copyDescription.fileType} { - if (fileType == FileType::CSV) { - this->csvReaderConfig = std::make_unique(*copyDescription.csvReaderConfig); - } -} - -std::string CopyDescription::getFileTypeName(FileType fileType) { - switch (fileType) { - case FileType::CSV: - return "csv"; - - case FileType::ARROW: - return "arrow"; - - case FileType::PARQUET: - return "parquet"; - } -} - -std::string CopyDescription::getFileTypeSuffix(FileType fileType) { - return "." + getFileTypeName(fileType); -} - -void CopyDescription::setFileType(std::string const& fileName) { - auto csvSuffix = getFileTypeSuffix(FileType::CSV); - auto arrowSuffix = getFileTypeSuffix(FileType::ARROW); - auto parquetSuffix = getFileTypeSuffix(FileType::PARQUET); - - if (fileName.length() >= csvSuffix.length()) { - if (!fileName.compare( - fileName.length() - csvSuffix.length(), csvSuffix.length(), csvSuffix)) { - fileType = FileType::CSV; - return; - } - } - - if (fileName.length() >= arrowSuffix.length()) { - if (!fileName.compare( - fileName.length() - arrowSuffix.length(), arrowSuffix.length(), arrowSuffix)) { - fileType = FileType::ARROW; - return; - } - } - - if (fileName.length() >= parquetSuffix.length()) { - if (!fileName.compare(fileName.length() - parquetSuffix.length(), parquetSuffix.length(), - parquetSuffix)) { - fileType = FileType::PARQUET; - return; - } - } - - throw CopyException("Unsupported file type: " + fileName); -} - -} // namespace common -} // namespace kuzu diff --git a/src/include/binder/binder.h b/src/include/binder/binder.h index fc56a2217d..8a43117360 100644 --- a/src/include/binder/binder.h +++ b/src/include/binder/binder.h @@ -1,7 +1,7 @@ #pragma once #include "binder/query/bound_regular_query.h" -#include "common/csv_reader/csv_reader.h" +#include "common/copier_config/copier_config.h" #include "expression_binder.h" #include "parser/query/regular_query.h" #include "query_normalizer.h" diff --git a/src/include/binder/copy/bound_copy.h b/src/include/binder/copy/bound_copy.h index 49fcc6f6a4..c0204a24dd 100644 --- a/src/include/binder/copy/bound_copy.h +++ b/src/include/binder/copy/bound_copy.h @@ -6,7 +6,7 @@ #include "binder/bound_statement.h" #include "catalog/catalog_structs.h" -#include "common/csv_reader/csv_reader.h" +#include "common/copier_config/copier_config.h" namespace kuzu { namespace binder { diff --git a/src/include/common/copier_config/copier_config.h b/src/include/common/copier_config/copier_config.h new file mode 100644 index 0000000000..bb3d0a09f4 --- /dev/null +++ b/src/include/common/copier_config/copier_config.h @@ -0,0 +1,52 @@ +#pragma once + +#include + +#include "common/constants.h" +#include "common/types/types_include.h" +#include "common/types/value.h" + +namespace spdlog { +class logger; +} + +namespace kuzu { +namespace common { + +struct CSVReaderConfig { + CSVReaderConfig() + : escapeChar{CopyConstants::DEFAULT_CSV_ESCAPE_CHAR}, + delimiter{CopyConstants::DEFAULT_CSV_DELIMITER}, + quoteChar{CopyConstants::DEFAULT_CSV_QUOTE_CHAR}, + listBeginChar{CopyConstants::DEFAULT_CSV_LIST_BEGIN_CHAR}, + listEndChar{CopyConstants::DEFAULT_CSV_LIST_END_CHAR}, + hasHeader{CopyConstants::DEFAULT_CSV_HAS_HEADER} {} + + char escapeChar; + char delimiter; + char quoteChar; + char listBeginChar; + char listEndChar; + bool hasHeader; +}; + +struct CopyDescription { + CopyDescription(const std::string& filePath, CSVReaderConfig csvReaderConfig); + + CopyDescription(const CopyDescription& copyDescription); + + enum class FileType { CSV, ARROW, PARQUET }; + + static std::string getFileTypeName(FileType fileType); + + static std::string getFileTypeSuffix(FileType fileType); + + void setFileType(std::string const& fileName); + + const std::string filePath; + std::unique_ptr csvReaderConfig; + FileType fileType; +}; + +} // namespace common +} // namespace kuzu diff --git a/src/include/common/csv_reader/csv_reader.h b/src/include/common/csv_reader/csv_reader.h deleted file mode 100644 index 2e1cb248ad..0000000000 --- a/src/include/common/csv_reader/csv_reader.h +++ /dev/null @@ -1,106 +0,0 @@ -#pragma once - -#include - -#include "common/constants.h" -#include "common/types/types_include.h" -#include "common/types/value.h" - -namespace spdlog { -class logger; -} - -namespace kuzu { -namespace common { - -struct CSVReaderConfig { - CSVReaderConfig() - : escapeChar{CopyConstants::DEFAULT_CSV_ESCAPE_CHAR}, - delimiter{CopyConstants::DEFAULT_CSV_DELIMITER}, - quoteChar{CopyConstants::DEFAULT_CSV_QUOTE_CHAR}, - listBeginChar{CopyConstants::DEFAULT_CSV_LIST_BEGIN_CHAR}, - listEndChar{CopyConstants::DEFAULT_CSV_LIST_END_CHAR}, - hasHeader{CopyConstants::DEFAULT_CSV_HAS_HEADER} {} - - char escapeChar; - char delimiter; - char quoteChar; - char listBeginChar; - char listEndChar; - bool hasHeader; -}; - -struct CopyDescription { - CopyDescription(const std::string& filePath, CSVReaderConfig csvReaderConfig); - - CopyDescription(const CopyDescription& copyDescription); - - enum class FileType { CSV, ARROW, PARQUET }; - - static std::string getFileTypeName(FileType fileType); - - static std::string getFileTypeSuffix(FileType fileType); - - void setFileType(std::string const& fileName); - - const std::string filePath; - std::unique_ptr csvReaderConfig; - FileType fileType; -}; - -// TODO(Guodong): Remove this class and file and related code. -class CSVReader { - -public: - // Initializes to read a block in file. - CSVReader(const std::string& fname, const CSVReaderConfig& csvReaderConfig, uint64_t blockId); - // Initializes to read the complete file. - CSVReader(const std::string& fname, const CSVReaderConfig& csvReaderConfig); - // Initializes to read a part of a line. - CSVReader( - char* line, uint64_t lineLen, int64_t linePtrStart, const CSVReaderConfig& csvReaderConfig); - - ~CSVReader(); - - // returns true if there are more lines to be parsed in a block of a CSV file, else false. - bool hasNextLine(); - // returns true if the currently-pointed to line has more data to be parsed, else false. - bool hasNextToken(); - bool hasNextTokenOrError(); - // Marks the currently-pointed to line as processed. hasNextLine() has to be called the move the - // iterator to the next line. - void skipLine(); - // Marks the currently-pointed to token as processed. hasNextToken() has to be called the move - // the iterator to the next token of the line. - void skipToken(); - // skips the token only if it is empty and returns the result of operation. - bool skipTokenIfNull(); - - // reads the data currently pointed to by the cursor and returns the value in a specific format. - int64_t getInt64(); - double_t getDouble(); - uint8_t getBoolean(); - char* getString(); - date_t getDate(); - timestamp_t getTimestamp(); - interval_t getInterval(); - std::unique_ptr getList(const DataType& dataType); - -private: - void openFile(const std::string& fName); - void setNextTokenIsProcessed(); - -private: - FILE* fd; - const CSVReaderConfig& config; - std::shared_ptr logger; - bool nextLineIsNotProcessed, isEndOfBlock, nextTokenIsNotProcessed; - char* line; - size_t lineCapacity, lineLen; - int64_t linePtrStart, linePtrEnd; - size_t readingBlockStartOffset, readingBlockEndOffset; - uint64_t nextTokenLen; -}; - -} // namespace common -} // namespace kuzu diff --git a/src/include/planner/logical_plan/logical_operator/logical_copy.h b/src/include/planner/logical_plan/logical_operator/logical_copy.h index cda6eae41b..2d50dde844 100644 --- a/src/include/planner/logical_plan/logical_operator/logical_copy.h +++ b/src/include/planner/logical_plan/logical_operator/logical_copy.h @@ -2,7 +2,7 @@ #include "base_logical_operator.h" #include "catalog/catalog_structs.h" -#include "common/csv_reader/csv_reader.h" +#include "common/copier_config/copier_config.h" namespace kuzu { namespace planner { diff --git a/src/include/processor/operator/copy/copy.h b/src/include/processor/operator/copy/copy.h index 33a2b15eb1..ed8ca573ce 100644 --- a/src/include/processor/operator/copy/copy.h +++ b/src/include/processor/operator/copy/copy.h @@ -1,6 +1,6 @@ #pragma once -#include "common/csv_reader/csv_reader.h" +#include "common/copier_config/copier_config.h" #include "common/task_system/task_scheduler.h" #include "processor/operator/physical_operator.h" #include "storage/store/nodes_statistics_and_deleted_ids.h" diff --git a/src/include/storage/copy_arrow/copy_structures_arrow.h b/src/include/storage/copy_arrow/copy_structures_arrow.h index dae7b1b9e6..a923151f9c 100644 --- a/src/include/storage/copy_arrow/copy_structures_arrow.h +++ b/src/include/storage/copy_arrow/copy_structures_arrow.h @@ -1,7 +1,7 @@ #pragma once #include "catalog/catalog.h" -#include "common/csv_reader/csv_reader.h" +#include "common/copier_config/copier_config.h" #include "common/logging_level_utils.h" #include "common/task_system/task_scheduler.h" #include "storage/in_mem_storage_structure/in_mem_column.h" diff --git a/test/copy/copy_test.cpp b/test/copy/copy_test.cpp index 49e3f9c408..6090c5c6fc 100644 --- a/test/copy/copy_test.cpp +++ b/test/copy/copy_test.cpp @@ -1,4 +1,7 @@ -#include "common/csv_reader/csv_reader.h" +#include +#include +#include + #include "graph_test/graph_test.h" #include "json.hpp" #include "storage/storage_manager.h" @@ -94,25 +97,35 @@ TEST_F(CopyNodePropertyTest, NodeStructuredStringPropertyTest) { graph->getNodesStore().getNodePropertyColumn(tableID, propertyIdx.propertyID)); std::string fName = TestHelper::appendKuzuRootPath("dataset/copy-node-property-test/vPerson.csv"); - CSVReaderConfig config; - CSVReader csvReader(fName, config); + std::ifstream f(fName); + ASSERT_TRUE(f.is_open()); int lineIdx = 0; uint64_t count = 0; auto dummyReadOnlyTrx = Transaction::getDummyReadOnlyTrx(); - while (csvReader.hasNextLine()) { - csvReader.hasNextToken(); - csvReader.skipToken(); - csvReader.hasNextToken(); + while (f.good()) { + std::string line; + std::getline(f, line); + if (line.empty()) { + continue; + } + auto pos = line.find(","); + line = line.substr(pos + 1); + if (line[0] == '"') { + line = line.substr(1); + } + if (line[line.length() - 1] == '"') { + line = line.substr(0, line.length() - 1); + } if ((count % 100) == 0) { ASSERT_TRUE(column->isNull(count /* nodeOffset */, dummyReadOnlyTrx.get())); } else { ASSERT_FALSE(column->isNull(count /* nodeOffset */, dummyReadOnlyTrx.get())); - EXPECT_EQ(std::string(csvReader.getString()), column->readValue(lineIdx).strVal); + EXPECT_EQ(line, column->readValue(lineIdx).strVal); } lineIdx++; - csvReader.skipLine(); count++; } + f.close(); } void verifyP0ToP5999(KnowsTablePTablePKnowsLists& knowsTablePTablePKnowsLists) {