diff --git a/CMakeLists.txt b/CMakeLists.txt
index 7b794ae9ee2..9b36a99a58e 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1,6 +1,6 @@
 cmake_minimum_required(VERSION 3.11)
 
-project(Kuzu VERSION 0.0.7 LANGUAGES CXX)
+project(Kuzu VERSION 0.0.7.1 LANGUAGES CXX)
 
 find_package(Threads REQUIRED)
diff --git a/src/common/exception.cpp b/src/common/exception.cpp
index 060a9f22e4a..056387769e4 100644
--- a/src/common/exception.cpp
+++ b/src/common/exception.cpp
@@ -12,6 +12,10 @@ std::string ExceptionMessage::existedPKException(const std::string& pkString) {
         pkString);
 }
 
+std::string ExceptionMessage::nonExistPKException(const std::string& pkString) {
+    return StringUtils::string_format("Found non-existed primary key value {}.", pkString);
+}
+
 std::string ExceptionMessage::invalidPKType(const std::string& type) {
     return StringUtils::string_format(
         "Invalid primary key column type {}. Primary key must be either INT64, STRING or SERIAL.",
diff --git a/src/include/common/exception.h b/src/include/common/exception.h
index e0812fbd531..b5ffc76be3c 100644
--- a/src/include/common/exception.h
+++ b/src/include/common/exception.h
@@ -8,6 +8,7 @@ namespace common {
 
 struct ExceptionMessage {
     static std::string existedPKException(const std::string& pkString);
+    static std::string nonExistPKException(const std::string& pkString);
    static std::string invalidPKType(const std::string& type);
    static inline std::string nullPKException() {
        return "Found NULL, which violates the non-null constraint of the primary key column.";
diff --git a/src/include/processor/operator/copy_from/read_csv.h b/src/include/processor/operator/copy_from/read_csv.h
deleted file mode 100644
index 3fd8a6646d6..00000000000
--- a/src/include/processor/operator/copy_from/read_csv.h
+++ /dev/null
@@ -1,29 +0,0 @@
-#pragma once
-
-#include "processor/operator/copy_from/read_file.h"
-
-namespace kuzu {
-namespace processor {
-
-class ReadCSV : public ReadFile {
-public:
-    ReadCSV(const DataPos& nodeOffsetPos, std::vector dataColumnPoses,
-        std::shared_ptr sharedState, uint32_t id,
-        const std::string& paramsString)
-        : ReadFile{nodeOffsetPos, std::move(dataColumnPoses), std::move(sharedState),
-              PhysicalOperatorType::READ_CSV, id, paramsString, true} {}
-
-    inline std::shared_ptr readTuples(
-        std::unique_ptr morsel) override {
-        auto csvMorsel = reinterpret_cast(morsel.get());
-        return csvMorsel->recordTable;
-    }
-
-    inline std::unique_ptr clone() override {
-        return std::make_unique(
-            nodeOffsetPos, dataColumnPoses, sharedState, id, paramsString);
-    }
-};
-
-} // namespace processor
-} // namespace kuzu
diff --git a/src/include/processor/operator/copy_from/read_file.h b/src/include/processor/operator/copy_from/read_file.h
deleted file mode 100644
index 83e9c4c78dd..00000000000
--- a/src/include/processor/operator/copy_from/read_file.h
+++ /dev/null
@@ -1,39 +0,0 @@
-#pragma once
-
-#include "processor/operator/physical_operator.h"
-#include "storage/copier/read_file_state.h"
-
-namespace kuzu {
-namespace processor {
-
-class ReadFile : public PhysicalOperator {
-public:
-    ReadFile(const DataPos& nodeOffsetPos, std::vector dataColumnPoses,
-        std::shared_ptr sharedState,
-        PhysicalOperatorType operatorType, uint32_t id, const std::string& paramsString,
-        bool preservingOrder)
-        : PhysicalOperator{operatorType, id, paramsString}, nodeOffsetPos{nodeOffsetPos},
-          dataColumnPoses{std::move(dataColumnPoses)}, sharedState{std::move(sharedState)},
-          preservingOrder{preservingOrder} {}
-
-    inline void initGlobalStateInternal(kuzu::processor::ExecutionContext* context) override {
-        sharedState->countNumRows();
-    }
-
-    inline bool isSource() const override { return true; }
-
-protected:
-    virtual std::shared_ptr readTuples(
-        std::unique_ptr morsel) = 0;
-
-    bool getNextTuplesInternal(ExecutionContext* context) override;
-
-protected:
-    std::shared_ptr sharedState;
-    DataPos nodeOffsetPos;
-    std::vector dataColumnPoses;
-    bool preservingOrder;
-};
-
-} // namespace processor
-} // namespace kuzu
diff --git a/src/include/processor/operator/copy_from/read_npy.h b/src/include/processor/operator/copy_from/read_npy.h
deleted file mode 100644
index baff50b3358..00000000000
--- a/src/include/processor/operator/copy_from/read_npy.h
+++ /dev/null
@@ -1,32 +0,0 @@
-#pragma once
-
-#include "processor/operator/copy_from/read_file.h"
-#include "storage/copier/npy_reader.h"
-#include "storage/in_mem_storage_structure/in_mem_column.h"
-
-namespace kuzu {
-namespace processor {
-
-class ReadNPY : public ReadFile {
-public:
-    ReadNPY(const DataPos& nodeOffsetPos, std::vector dataColumnPoses,
-        std::shared_ptr sharedState, uint32_t id,
-        const std::string& paramsString, bool preservingOrder)
-        : ReadFile{nodeOffsetPos, std::move(dataColumnPoses), std::move(sharedState),
-              PhysicalOperatorType::READ_NPY, id, paramsString, preservingOrder} {
-        reader = std::make_unique(this->sharedState->filePaths);
-    }
-
-    std::shared_ptr readTuples(std::unique_ptr morsel) final;
-
-    inline std::unique_ptr clone() final {
-        return std::make_unique(
-            nodeOffsetPos, dataColumnPoses, sharedState, id, paramsString, preservingOrder);
-    }
-
-private:
-    std::unique_ptr reader;
-};
-
-} // namespace processor
-} // namespace kuzu
diff --git a/src/include/processor/operator/copy_from/read_parquet.h b/src/include/processor/operator/copy_from/read_parquet.h
deleted file mode 100644
index 9dd42c106ba..00000000000
--- a/src/include/processor/operator/copy_from/read_parquet.h
+++ /dev/null
@@ -1,30 +0,0 @@
-#pragma once
-
-#include "processor/operator/copy_from/read_file.h"
-
-namespace kuzu {
-namespace processor {
-
-class ReadParquet : public ReadFile {
-public:
-    ReadParquet(const DataPos& nodeOffsetPos, std::vector dataColumnPoses,
-        std::shared_ptr sharedState, uint32_t id,
-        const std::string& paramsString, bool preservingOrder)
-        : ReadFile{nodeOffsetPos, std::move(dataColumnPoses), std::move(sharedState),
-              PhysicalOperatorType::READ_PARQUET, id, paramsString, preservingOrder} {}
-
-    std::shared_ptr readTuples(
-        std::unique_ptr morsel) override;
-
-    inline std::unique_ptr clone() override {
-        return std::make_unique(
-            nodeOffsetPos, dataColumnPoses, sharedState, id, paramsString, preservingOrder);
-    }
-
-private:
-    std::unique_ptr reader;
-    std::string filePath;
-};
-
-} // namespace processor
-} // namespace kuzu
diff --git a/src/include/processor/operator/copy_from/copy.h b/src/include/processor/operator/persistent/copy.h
similarity index 100%
rename from src/include/processor/operator/copy_from/copy.h
rename to src/include/processor/operator/persistent/copy.h
diff --git a/src/include/processor/operator/copy_from/copy_node.h b/src/include/processor/operator/persistent/copy_node.h
similarity index 94%
rename from src/include/processor/operator/copy_from/copy_node.h
rename to src/include/processor/operator/persistent/copy_node.h
index 1b9eb18a59c..13e538e3cbe 100644
--- a/src/include/processor/operator/copy_from/copy_node.h
+++ b/src/include/processor/operator/persistent/copy_node.h
@@ -20,6 +20,12 @@ class
CopyNodeSharedState { std::unique_lock lck{mtx}; return getNextNodeGroupIdxWithoutLock(); } + inline void setNextNodeGroupIdx(common::node_group_idx_t nextNodeGroupIdx) { + std::unique_lock lck{mtx}; + if (nextNodeGroupIdx > currentNodeGroupIdx) { + currentNodeGroupIdx = nextNodeGroupIdx; + } + } void logCopyNodeWALRecord(storage::WAL* wal); diff --git a/src/include/processor/operator/copy_from/copy_rel.h b/src/include/processor/operator/persistent/copy_rel.h similarity index 96% rename from src/include/processor/operator/copy_from/copy_rel.h rename to src/include/processor/operator/persistent/copy_rel.h index f11f4d6bdb3..1e5477698f6 100644 --- a/src/include/processor/operator/copy_from/copy_rel.h +++ b/src/include/processor/operator/persistent/copy_rel.h @@ -1,6 +1,6 @@ #pragma once -#include "processor/operator/copy_from/copy.h" +#include "processor/operator/persistent/copy.h" #include "storage/store/nodes_store.h" #include "storage/store/rels_store.h" diff --git a/src/include/processor/operator/copy_to/copy_to.h b/src/include/processor/operator/persistent/copy_to.h similarity index 96% rename from src/include/processor/operator/copy_to/copy_to.h rename to src/include/processor/operator/persistent/copy_to.h index 5056ed0be72..13672a04f5b 100644 --- a/src/include/processor/operator/copy_to/copy_to.h +++ b/src/include/processor/operator/persistent/copy_to.h @@ -2,7 +2,7 @@ #include "common/copier_config/copier_config.h" #include "common/task_system/task_scheduler.h" -#include "processor/operator/copy_to/csv_file_writer.h" +#include "processor/operator/persistent/csv_file_writer.h" #include "processor/operator/physical_operator.h" #include "processor/result/result_set.h" diff --git a/src/include/processor/operator/copy_to/csv_file_writer.h b/src/include/processor/operator/persistent/csv_file_writer.h similarity index 100% rename from src/include/processor/operator/copy_to/csv_file_writer.h rename to src/include/processor/operator/persistent/csv_file_writer.h diff --git a/src/include/processor/operator/persistent/reader.h b/src/include/processor/operator/persistent/reader.h new file mode 100644 index 00000000000..bf17e32664f --- /dev/null +++ b/src/include/processor/operator/persistent/reader.h @@ -0,0 +1,54 @@ +#pragma once + +#include "processor/operator/physical_operator.h" +#include "storage/copier/reader_state.h" + +namespace kuzu { +namespace processor { + +struct ReaderInfo { + DataPos nodeOffsetPos; + std::vector dataColumnPoses; + bool isOrderPreserving; + storage::read_rows_func_t readFunc; + storage::init_reader_data_func_t initFunc; +}; + +class Reader : public PhysicalOperator { +public: + Reader(ReaderInfo readerInfo, std::shared_ptr sharedState, + uint32_t id, const std::string& paramsString) + : PhysicalOperator{PhysicalOperatorType::READER, id, paramsString}, + readerInfo{std::move(readerInfo)}, sharedState{std::move(sharedState)}, leftNumRows{0}, + currFileIdx{common::INVALID_VECTOR_IDX}, readFuncData{nullptr} {} + + inline void initGlobalStateInternal(ExecutionContext* context) final { + sharedState->validate(); + sharedState->countBlocks(); + } + inline bool isSource() const final { return true; } + + inline std::unique_ptr clone() final { + return make_unique(readerInfo, sharedState, getOperatorID(), paramsString); + } + +protected: + bool getNextTuplesInternal(ExecutionContext* context) final; + +private: + void getNextNodeGroupInSerial(std::shared_ptr& table); + void getNextNodeGroupInParallel(std::shared_ptr& table); + +private: + ReaderInfo readerInfo; + 
std::shared_ptr sharedState; + std::vector> leftRecordBatches; + common::row_idx_t leftNumRows; + + // For parallel reading. + common::vector_idx_t currFileIdx; + std::unique_ptr readFuncData; +}; + +} // namespace processor +} // namespace kuzu diff --git a/src/include/processor/operator/physical_operator.h b/src/include/processor/operator/physical_operator.h index 333c3cc6cbd..bffabe9a540 100644 --- a/src/include/processor/operator/physical_operator.h +++ b/src/include/processor/operator/physical_operator.h @@ -17,14 +17,8 @@ enum class PhysicalOperatorType : uint8_t { IN_QUERY_CALL, COPY_NODE, COPY_REL, - COPY_NPY, COPY_TO, - READ_CSV, - READ_NPY, - READ_PARQUET, - INSERT_NODE, CREATE_NODE_TABLE, - INSERT_REL, CREATE_REL_TABLE, CROSS_PRODUCT, DELETE_NODE, @@ -38,6 +32,8 @@ enum class PhysicalOperatorType : uint8_t { HASH_JOIN_BUILD, HASH_JOIN_PROBE, INDEX_SCAN, + INSERT_NODE, + INSERT_REL, INTERSECT_BUILD, INTERSECT, LIMIT, @@ -46,6 +42,7 @@ enum class PhysicalOperatorType : uint8_t { PATH_PROPERTY_PROBE, PROJECTION, PROFILE, + READER, RECURSIVE_JOIN, RENAME_PROPERTY, RENAME_TABLE, diff --git a/src/include/storage/copier/column_chunk.h b/src/include/storage/copier/column_chunk.h index ca9075c9256..888fbf6e76c 100644 --- a/src/include/storage/copier/column_chunk.h +++ b/src/include/storage/copier/column_chunk.h @@ -16,31 +16,33 @@ namespace storage { class NullColumnChunk; -struct ColumnChunkMetadata { - common::page_idx_t pageIdx = common::INVALID_PAGE_IDX; - common::page_idx_t numPages = 0; +struct BaseColumnChunkMetadata { + common::page_idx_t pageIdx; + common::page_idx_t numPages; - ColumnChunkMetadata() = default; - ColumnChunkMetadata(common::page_idx_t pageIdx, common::page_idx_t numPages) + BaseColumnChunkMetadata() : pageIdx{common::INVALID_PAGE_IDX}, numPages{0} {} + BaseColumnChunkMetadata(common::page_idx_t pageIdx, common::page_idx_t numPages) : pageIdx(pageIdx), numPages(numPages) {} + virtual ~BaseColumnChunkMetadata() = default; }; -struct MainColumnChunkMetadata : public ColumnChunkMetadata { +struct ColumnChunkMetadata : public BaseColumnChunkMetadata { uint64_t numValues; - MainColumnChunkMetadata() = default; - MainColumnChunkMetadata( + ColumnChunkMetadata() : BaseColumnChunkMetadata(), numValues{UINT64_MAX} {} + ColumnChunkMetadata( common::page_idx_t pageIdx, common::page_idx_t numPages, uint64_t numNodesInChunk) - : ColumnChunkMetadata{pageIdx, numPages}, numValues(numNodesInChunk) {} + : BaseColumnChunkMetadata{pageIdx, numPages}, numValues(numNodesInChunk) {} }; -struct OverflowColumnChunkMetadata : public ColumnChunkMetadata { +struct OverflowColumnChunkMetadata : public BaseColumnChunkMetadata { common::offset_t lastOffsetInPage; - OverflowColumnChunkMetadata() = default; + OverflowColumnChunkMetadata() + : BaseColumnChunkMetadata(), lastOffsetInPage{common::INVALID_OFFSET} {} OverflowColumnChunkMetadata( common::page_idx_t pageIdx, common::page_idx_t numPages, common::offset_t lastOffsetInPage) - : ColumnChunkMetadata{pageIdx, numPages}, lastOffsetInPage(lastOffsetInPage) {} + : BaseColumnChunkMetadata{pageIdx, numPages}, lastOffsetInPage(lastOffsetInPage) {} }; // Base data segment covers all fixed-sized data types. 
diff --git a/src/include/storage/copier/reader_state.h b/src/include/storage/copier/reader_state.h new file mode 100644 index 00000000000..4163033d3f2 --- /dev/null +++ b/src/include/storage/copier/reader_state.h @@ -0,0 +1,185 @@ +#pragma once + +#include "storage/copier/npy_reader.h" +#include "storage/copier/table_copy_utils.h" + +namespace kuzu { +namespace processor { +class Reader; +} +namespace storage { + +struct ReaderFunctionData { + ReaderFunctionData(common::CSVReaderConfig csvReaderConfig, catalog::TableSchema* tableSchema) + : csvReaderConfig{csvReaderConfig}, tableSchema{tableSchema} {} + virtual ~ReaderFunctionData() = default; + + common::CSVReaderConfig csvReaderConfig; + catalog::TableSchema* tableSchema; +}; + +struct CSVReaderFunctionData : public ReaderFunctionData { + CSVReaderFunctionData(common::CSVReaderConfig csvReaderConfig, + catalog::TableSchema* tableSchema, std::shared_ptr reader) + : ReaderFunctionData{csvReaderConfig, tableSchema}, reader{std::move(reader)} {} + + std::shared_ptr reader; +}; + +struct ParquetReaderFunctionData : public ReaderFunctionData { + ParquetReaderFunctionData(common::CSVReaderConfig csvReaderConfig, + catalog::TableSchema* tableSchema, std::unique_ptr reader) + : ReaderFunctionData{csvReaderConfig, tableSchema}, reader{std::move(reader)} {} + + std::unique_ptr reader; +}; + +struct NPYReaderFunctionData : public ReaderFunctionData { + NPYReaderFunctionData(common::CSVReaderConfig csvReaderConfig, + catalog::TableSchema* tableSchema, std::unique_ptr reader) + : ReaderFunctionData{csvReaderConfig, tableSchema}, reader{std::move(reader)} {} + + std::unique_ptr reader; +}; + +struct FileBlocksInfo { + common::row_idx_t numRows = 0; + std::vector numRowsPerBlock; +}; + +struct ReaderMorsel { + ReaderMorsel() + : fileIdx{common::INVALID_VECTOR_IDX}, blockIdx{common::INVALID_BLOCK_IDX}, + rowIdx{common::INVALID_ROW_IDX}, readerFuncData{nullptr} {} + + ReaderMorsel(common::vector_idx_t fileIdx, common::block_idx_t blockIdx, + common::row_idx_t rowIdx, ReaderFunctionData* readerFuncData) + : fileIdx{fileIdx}, blockIdx{blockIdx}, rowIdx{rowIdx}, readerFuncData{readerFuncData} {} + + virtual ~ReaderMorsel() = default; + + common::vector_idx_t fileIdx; + common::block_idx_t blockIdx; + common::row_idx_t rowIdx; + ReaderFunctionData* readerFuncData; +}; + +struct SerialReaderMorsel : public ReaderMorsel { + SerialReaderMorsel(common::vector_idx_t fileIdx, common::block_idx_t blockIdx, + common::row_idx_t rowIdx, std::shared_ptr table) + : ReaderMorsel{fileIdx, blockIdx, rowIdx, nullptr}, table{std::move(table)} {} + + std::shared_ptr table; +}; + +using validate_func_t = + std::function& paths, catalog::TableSchema* tableSchema)>; +using init_reader_data_func_t = std::function( + std::vector& paths, common::vector_idx_t fileIdx, + common::CSVReaderConfig csvReaderConfig, catalog::TableSchema* tableSchema)>; +using count_blocks_func_t = + std::function(std::vector& paths, + common::CSVReaderConfig csvReaderConfig, catalog::TableSchema* tableSchema)>; +using read_rows_func_t = std::function; + +struct ReaderFunctions { + static validate_func_t getValidateFunc(common::CopyDescription::FileType fileType); + static count_blocks_func_t getCountBlocksFunc(common::CopyDescription::FileType fileType); + static init_reader_data_func_t getInitDataFunc(common::CopyDescription::FileType fileType); + static read_rows_func_t getReadRowsFunc(common::CopyDescription::FileType fileType); + + static inline void validateCSVFiles( + std::vector& paths, 
catalog::TableSchema* tableSchema) { + // DO NOTHING. + } + static inline void validateParquetFiles( + std::vector& paths, catalog::TableSchema* tableSchema) { + // DO NOTHING. + } + static void validateNPYFiles( + std::vector& paths, catalog::TableSchema* tableSchema); + + static std::vector countRowsInCSVFile(std::vector& paths, + common::CSVReaderConfig csvReaderConfig, catalog::TableSchema* tableSchema); + static std::vector countRowsInParquetFile(std::vector& paths, + common::CSVReaderConfig csvReaderConfig, catalog::TableSchema* tableSchema); + static std::vector countRowsInNPYFile(std::vector& paths, + common::CSVReaderConfig csvReaderConfig, catalog::TableSchema* tableSchema); + + static std::unique_ptr initCSVReadData(std::vector& paths, + common::vector_idx_t fileIdx, common::CSVReaderConfig csvReaderConfig, + catalog::TableSchema* tableSchema); + static std::unique_ptr initParquetReadData(std::vector& paths, + common::vector_idx_t fileIdx, common::CSVReaderConfig csvReaderConfig, + catalog::TableSchema* tableSchema); + static std::unique_ptr initNPYReadData(std::vector& paths, + common::vector_idx_t fileIdx, common::CSVReaderConfig csvReaderConfig, + catalog::TableSchema* tableSchema); + + static arrow::RecordBatchVector readRowsFromCSVFile( + const ReaderFunctionData& functionData, common::block_idx_t blockIdx); + static arrow::RecordBatchVector readRowsFromParquetFile( + const ReaderFunctionData& functionData, common::block_idx_t blockIdx); + static arrow::RecordBatchVector readRowsFromNPYFile( + const ReaderFunctionData& functionData, common::block_idx_t blockIdx); +}; + +class ReaderSharedState { + friend class processor::Reader; + +public: + ReaderSharedState(common::CopyDescription::FileType fileType, + std::vector filePaths, common::CSVReaderConfig csvReaderConfig, + catalog::TableSchema* tableSchema) + : fileType{fileType}, filePaths{std::move(filePaths)}, csvReaderConfig{csvReaderConfig}, + tableSchema{tableSchema}, numRows{0}, currFileIdx{0}, currBlockIdx{0}, currRowIdx{0}, + leftRecordBatches{}, leftNumRows{0} { + validateFunc = ReaderFunctions::getValidateFunc(fileType); + initFunc = ReaderFunctions::getInitDataFunc(fileType); + countBlocksFunc = ReaderFunctions::getCountBlocksFunc(fileType); + readFunc = ReaderFunctions::getReadRowsFunc(fileType); + } + + void validate(); + void countBlocks(); + std::unique_ptr getSerialMorsel(); + std::unique_ptr getParallelMorsel(); + + inline void lock() { mtx.lock(); } + inline void unlock() { mtx.unlock(); } + inline common::row_idx_t& getNumRowsRef() { return std::ref(numRows); } + +private: + std::unique_ptr getMorselOfNextBlock(); + + static std::shared_ptr constructTableFromBatches( + std::vector>& recordBatches); + +public: + std::mutex mtx; + + common::CopyDescription::FileType fileType; + std::vector filePaths; + common::CSVReaderConfig csvReaderConfig; + catalog::TableSchema* tableSchema; + + validate_func_t validateFunc; + init_reader_data_func_t initFunc; + count_blocks_func_t countBlocksFunc; + read_rows_func_t readFunc; + std::unique_ptr readFuncData; + + common::row_idx_t numRows; + std::vector blockInfos; + + common::vector_idx_t currFileIdx; + common::block_idx_t currBlockIdx; + common::row_idx_t currRowIdx; + + std::vector> leftRecordBatches; + common::row_idx_t leftNumRows; +}; + +} // namespace storage +} // namespace kuzu diff --git a/src/include/storage/copier/rel_copier.h b/src/include/storage/copier/rel_copier.h index ff9c209e7c3..6115eaa8b7f 100644 --- a/src/include/storage/copier/rel_copier.h +++ 
b/src/include/storage/copier/rel_copier.h @@ -1,6 +1,6 @@ #pragma once -#include "storage/copier/read_file_state.h" +#include "storage/copier/reader_state.h" #include "storage/in_mem_storage_structure/in_mem_column.h" #include "storage/in_mem_storage_structure/in_mem_lists.h" #include "storage/index/hash_index.h" @@ -14,13 +14,15 @@ class DirectedInMemRelData; class RelCopier { public: - RelCopier(std::shared_ptr sharedState, + RelCopier(std::shared_ptr sharedState, const common::CopyDescription& copyDesc, catalog::RelTableSchema* schema, DirectedInMemRelData* fwdRelData, DirectedInMemRelData* bwdRelData, - std::vector pkIndexes) + std::vector pkIndexes, read_rows_func_t readFunc, + init_reader_data_func_t initFunc) : sharedState{std::move(sharedState)}, copyDesc{copyDesc}, schema{schema}, - fwdRelData{fwdRelData}, bwdRelData{bwdRelData}, numRows{0}, pkIndexes{ - std::move(pkIndexes)} { + fwdRelData{fwdRelData}, bwdRelData{bwdRelData}, numRows{0}, + pkIndexes{std::move(pkIndexes)}, currFileIdx{common::INVALID_VECTOR_IDX}, + readFunc{std::move(readFunc)}, initFunc{std::move(initFunc)} { fwdCopyStates.resize(schema->getNumProperties()); for (auto i = 0u; i < schema->getNumProperties(); i++) { fwdCopyStates[i] = @@ -40,16 +42,15 @@ class RelCopier { virtual std::unique_ptr clone() const = 0; virtual void finalize() = 0; - inline std::shared_ptr getSharedState() const { return sharedState; } + inline std::shared_ptr getSharedState() const { return sharedState; } protected: - virtual void executeInternal(std::unique_ptr morsel) { - throw common::CopyException("RelCopier::executeInternal not implemented"); + virtual bool executeInternal() { + throw common::NotImplementedException("RelCopier::executeInternal not implemented"); } static void indexLookup(arrow::Array* pkArray, const common::LogicalType& pkColumnType, - PrimaryKeyIndex* pkIndex, common::offset_t* offsets, const std::string& filePath, - common::row_idx_t startRowIdxInFile); + PrimaryKeyIndex* pkIndex, common::offset_t* offsets); void copyRelColumnsOrCountRelListsSize(common::row_idx_t rowIdx, arrow::RecordBatch* recordBatch, common::RelDataDirection direction, @@ -70,7 +71,7 @@ class RelCopier { const std::shared_ptr& type, const uint8_t* data, uint64_t length); protected: - std::shared_ptr sharedState; + std::shared_ptr sharedState; common::CopyDescription copyDesc; catalog::RelTableSchema* schema; DirectedInMemRelData* fwdRelData; @@ -79,16 +80,22 @@ class RelCopier { std::vector pkIndexes; std::vector> fwdCopyStates; std::vector> bwdCopyStates; + + common::vector_idx_t currFileIdx; + std::unique_ptr readerFunctionData; + storage::read_rows_func_t readFunc; + storage::init_reader_data_func_t initFunc; }; class RelListsCounterAndColumnCopier : public RelCopier { protected: - RelListsCounterAndColumnCopier(std::shared_ptr sharedState, + RelListsCounterAndColumnCopier(std::shared_ptr sharedState, const common::CopyDescription& copyDesc, catalog::RelTableSchema* schema, DirectedInMemRelData* fwdRelData, DirectedInMemRelData* bwdRelData, - std::vector pkIndexes) + std::vector pkIndexes, read_rows_func_t readFunc, + init_reader_data_func_t initFunc) : RelCopier{std::move(sharedState), copyDesc, schema, fwdRelData, bwdRelData, - std::move(pkIndexes)} {} + std::move(pkIndexes), std::move(readFunc), std::move(initFunc)} {} void finalize() override; @@ -101,52 +108,51 @@ class RelListsCounterAndColumnCopier : public RelCopier { class ParquetRelListsCounterAndColumnsCopier : public RelListsCounterAndColumnCopier { public: - 
ParquetRelListsCounterAndColumnsCopier(std::shared_ptr sharedState, + ParquetRelListsCounterAndColumnsCopier(std::shared_ptr sharedState, const common::CopyDescription& copyDesc, catalog::RelTableSchema* schema, DirectedInMemRelData* fwdRelData, DirectedInMemRelData* bwdRelData, - std::vector pkIndexes) + std::vector pkIndexes, read_rows_func_t readFunc, + init_reader_data_func_t initFunc) : RelListsCounterAndColumnCopier{std::move(sharedState), copyDesc, schema, fwdRelData, - bwdRelData, std::move(pkIndexes)} {} + bwdRelData, std::move(pkIndexes), std::move(readFunc), std::move(initFunc)} {} std::unique_ptr clone() const final { return std::make_unique( - sharedState, copyDesc, schema, fwdRelData, bwdRelData, pkIndexes); + sharedState, copyDesc, schema, fwdRelData, bwdRelData, pkIndexes, readFunc, initFunc); } private: - void executeInternal(std::unique_ptr morsel) final; - -private: - std::unique_ptr reader; - std::string filePath; + bool executeInternal() final; }; class CSVRelListsCounterAndColumnsCopier : public RelListsCounterAndColumnCopier { public: - CSVRelListsCounterAndColumnsCopier(std::shared_ptr sharedState, + CSVRelListsCounterAndColumnsCopier(std::shared_ptr sharedState, const common::CopyDescription& copyDesc, catalog::RelTableSchema* schema, DirectedInMemRelData* fwdRelData, DirectedInMemRelData* bwdRelData, - std::vector pkIndexes) + std::vector pkIndexes, read_rows_func_t readFunc, + init_reader_data_func_t initFunc) : RelListsCounterAndColumnCopier{std::move(sharedState), copyDesc, schema, fwdRelData, - bwdRelData, std::move(pkIndexes)} {} + bwdRelData, std::move(pkIndexes), std::move(readFunc), std::move(initFunc)} {} std::unique_ptr clone() const final { return std::make_unique( - sharedState, copyDesc, schema, fwdRelData, bwdRelData, pkIndexes); + sharedState, copyDesc, schema, fwdRelData, bwdRelData, pkIndexes, readFunc, initFunc); } private: - void executeInternal(std::unique_ptr morsel) final; + bool executeInternal() final; }; class RelListsCopier : public RelCopier { protected: - RelListsCopier(std::shared_ptr sharedState, + RelListsCopier(std::shared_ptr sharedState, const common::CopyDescription& copyDesc, catalog::RelTableSchema* schema, DirectedInMemRelData* fwdRelData, DirectedInMemRelData* bwdRelData, - std::vector pkIndexes) + std::vector pkIndexes, read_rows_func_t readFunc, + init_reader_data_func_t initFunc) : RelCopier{std::move(sharedState), copyDesc, schema, fwdRelData, bwdRelData, - std::move(pkIndexes)} {} + std::move(pkIndexes), std::move(readFunc), std::move(initFunc)} {} private: void finalize() final; @@ -154,42 +160,40 @@ class RelListsCopier : public RelCopier { class ParquetRelListsCopier : public RelListsCopier { public: - ParquetRelListsCopier(std::shared_ptr sharedState, + ParquetRelListsCopier(std::shared_ptr sharedState, const common::CopyDescription& copyDesc, catalog::RelTableSchema* schema, DirectedInMemRelData* fwdRelData, DirectedInMemRelData* bwdRelData, - std::vector pkIndexes) + std::vector pkIndexes, read_rows_func_t readFunc, + init_reader_data_func_t initFunc) : RelListsCopier{std::move(sharedState), copyDesc, schema, fwdRelData, bwdRelData, - std::move(pkIndexes)} {} + std::move(pkIndexes), std::move(readFunc), std::move(initFunc)} {} std::unique_ptr clone() const final { return std::make_unique( - sharedState, copyDesc, schema, fwdRelData, bwdRelData, pkIndexes); + sharedState, copyDesc, schema, fwdRelData, bwdRelData, pkIndexes, readFunc, initFunc); } private: - void executeInternal(std::unique_ptr morsel) final; - 
-private: - std::unique_ptr reader; - std::string filePath; + bool executeInternal() final; }; class CSVRelListsCopier : public RelListsCopier { public: - CSVRelListsCopier(std::shared_ptr sharedState, + CSVRelListsCopier(std::shared_ptr sharedState, const common::CopyDescription& copyDesc, catalog::RelTableSchema* schema, DirectedInMemRelData* fwdRelData, DirectedInMemRelData* bwdRelData, - std::vector pkIndexes) + std::vector pkIndexes, read_rows_func_t readFunc, + init_reader_data_func_t initFunc) : RelListsCopier{std::move(sharedState), copyDesc, schema, fwdRelData, bwdRelData, - std::move(pkIndexes)} {} + std::move(pkIndexes), std::move(readFunc), std::move(initFunc)} {} std::unique_ptr clone() const final { return std::make_unique( - sharedState, copyDesc, schema, fwdRelData, bwdRelData, pkIndexes); + sharedState, copyDesc, schema, fwdRelData, bwdRelData, pkIndexes, readFunc, initFunc); } private: - void executeInternal(std::unique_ptr morsel) final; + bool executeInternal() final; }; class RelCopyTask : public common::Task { diff --git a/src/include/storage/copier/var_list_column_chunk.h b/src/include/storage/copier/var_list_column_chunk.h index 56ac9e84d86..2d72f1eb641 100644 --- a/src/include/storage/copier/var_list_column_chunk.h +++ b/src/include/storage/copier/var_list_column_chunk.h @@ -74,8 +74,8 @@ class VarListColumnChunk : public ColumnChunk { curListOffset += length; setValue(curListOffset, i + startPosInChunk); } - auto startOffset = listArray->value_offset(startPosInChunk); - auto endOffset = listArray->value_offset(startPosInChunk + numValuesToAppend); + auto startOffset = listArray->value_offset(0); + auto endOffset = listArray->value_offset(numValuesToAppend); varListDataColumnChunk.resizeBuffer(curListOffset); varListDataColumnChunk.dataColumnChunk->append( listArray->values().get(), dataChunkOffsetToAppend, endOffset - startOffset); diff --git a/src/include/storage/storage_info.h b/src/include/storage/storage_info.h index 390f0910764..7f6e28a46a3 100644 --- a/src/include/storage/storage_info.h +++ b/src/include/storage/storage_info.h @@ -12,9 +12,10 @@ using storage_version_t = uint64_t; struct StorageVersionInfo { static std::unordered_map getStorageVersionInfo() { - return {{"0.0.7", 15}, {"0.0.6.5", 14}, {"0.0.6.4", 13}, {"0.0.6.3", 12}, {"0.0.6.2", 11}, - {"0.0.6.1", 10}, {"0.0.6", 9}, {"0.0.5", 8}, {"0.0.4", 7}, {"0.0.3.5", 6}, - {"0.0.3.4", 5}, {"0.0.3.3", 4}, {"0.0.3.2", 3}, {"0.0.3.1", 2}, {"0.0.3", 1}}; + return {{"0.0.7.1", 16}, {"0.0.7", 15}, {"0.0.6.5", 14}, {"0.0.6.4", 13}, {"0.0.6.3", 12}, + {"0.0.6.2", 11}, {"0.0.6.1", 10}, {"0.0.6", 9}, {"0.0.5", 8}, {"0.0.4", 7}, + {"0.0.3.5", 6}, {"0.0.3.4", 5}, {"0.0.3.3", 4}, {"0.0.3.2", 3}, {"0.0.3.1", 2}, + {"0.0.3", 1}}; } static storage_version_t getStorageVersion(); diff --git a/src/include/storage/storage_structure/disk_array.h b/src/include/storage/storage_structure/disk_array.h index 2411cbb7213..5cae903ff0a 100644 --- a/src/include/storage/storage_structure/disk_array.h +++ b/src/include/storage/storage_structure/disk_array.h @@ -234,8 +234,8 @@ class InMemDiskArray : public BaseInMemDiskArray { common::page_idx_t headerPageIdx, BufferManager* bufferManager, WAL* wal, transaction::Transaction* transaction); - static inline common::page_idx_t addDAHPageToFile(BMFileHandle& fileHandle, - StorageStructureID storageStructureID, BufferManager* bufferManager, WAL* wal) { + static inline common::page_idx_t addDAHPageToFile( + BMFileHandle& fileHandle, BufferManager* bufferManager, WAL* wal) { DiskArrayHeader 
daHeader(sizeof(T)); return StorageStructureUtils::insertNewPage(fileHandle, StorageStructureID{StorageStructureType::METADATA}, *bufferManager, *wal, diff --git a/src/include/storage/store/node_column.h b/src/include/storage/store/node_column.h index d29028e5b9f..875c3174138 100644 --- a/src/include/storage/store/node_column.h +++ b/src/include/storage/store/node_column.h @@ -135,7 +135,7 @@ class NodeColumn { BMFileHandle* metadataFH; BufferManager* bufferManager; WAL* wal; - std::unique_ptr> metadataDA; + std::unique_ptr> metadataDA; std::unique_ptr nullColumn; std::vector> childrenColumns; read_node_column_func_t readNodeColumnFunc; diff --git a/src/include/storage/store/string_node_column.h b/src/include/storage/store/string_node_column.h index 34470e9b976..5aafd09ab45 100644 --- a/src/include/storage/store/string_node_column.h +++ b/src/include/storage/store/string_node_column.h @@ -41,7 +41,6 @@ class StringNodeColumn : public NodeColumn { common::ValueVector* resultVector) final; private: - void writeOverflow(); void readStringValueFromOvf(transaction::Transaction* transaction, common::ku_string_t& kuStr, common::ValueVector* resultVector, common::page_idx_t overflowPageIdx); diff --git a/src/planner/planner.cpp b/src/planner/planner.cpp index 5206933d32e..5a2002bd8bf 100644 --- a/src/planner/planner.cpp +++ b/src/planner/planner.cpp @@ -177,7 +177,9 @@ std::unique_ptr Planner::planCopyFrom( auto& copyClause = reinterpret_cast(statement); auto plan = std::make_unique(); expression_vector arrowColumnExpressions; - bool hasSerial = false; + // For CSV file, and table with SERIAL columns, we need to read in serial from files. + bool readInSerialMode = + copyClause.getCopyDescription().fileType == CopyDescription::FileType::CSV; for (auto& property : catalog.getReadOnlyVersion()->getTableSchema(copyClause.getTableID())->properties) { if (property->getDataType()->getLogicalTypeID() != common::LogicalTypeID::SERIAL) { @@ -185,12 +187,12 @@ std::unique_ptr Planner::planCopyFrom( common::LogicalType{common::LogicalTypeID::ARROW_COLUMN}, property->getName(), property->getName())); } else { - hasSerial = true; + readInSerialMode = true; } } auto copy = make_shared(copyClause.getCopyDescription(), copyClause.getTableID(), - copyClause.getTableName(), hasSerial, std::move(arrowColumnExpressions), + copyClause.getTableName(), readInSerialMode, std::move(arrowColumnExpressions), std::make_shared( common::LogicalType{common::LogicalTypeID::INT64}, "nodeOffset", "nodeOffset"), copyClause.getStatementResult()->getSingleExpressionToCollect()); diff --git a/src/processor/map/map_copy.cpp b/src/processor/map/map_copy.cpp index 183b520da6c..b4614a6fee8 100644 --- a/src/processor/map/map_copy.cpp +++ b/src/processor/map/map_copy.cpp @@ -1,12 +1,9 @@ #include "planner/logical_plan/copy/logical_copy_from.h" #include "planner/logical_plan/copy/logical_copy_to.h" -#include "processor/operator/copy_from/copy_node.h" -#include "processor/operator/copy_from/copy_rel.h" -#include "processor/operator/copy_from/read_csv.h" -#include "processor/operator/copy_from/read_file.h" -#include "processor/operator/copy_from/read_npy.h" -#include "processor/operator/copy_from/read_parquet.h" -#include "processor/operator/copy_to/copy_to.h" +#include "processor/operator/persistent/copy_node.h" +#include "processor/operator/persistent/copy_rel.h" +#include "processor/operator/persistent/copy_to.h" +#include "processor/operator/persistent/reader.h" #include "processor/plan_mapper.h" using namespace kuzu::common; @@ -52,8 
+49,13 @@ std::unique_ptr PlanMapper::mapCopyNode( fileType != CopyDescription::FileType::NPY) { throw NotImplementedException{"PlanMapper::mapLogicalCopyFromToPhysical"}; } - std::unique_ptr readFile; - std::shared_ptr readFileSharedState; + auto initReadDataFunc = ReaderFunctions::getInitDataFunc(fileType); + auto readRowsFunc = ReaderFunctions::getReadRowsFunc(fileType); + auto nodeTableSchema = catalog->getReadOnlyVersion()->getNodeTableSchema(copy->getTableID()); + auto readerSharedState = + std::make_shared(fileType, copy->getCopyDescription().filePaths, + *copy->getCopyDescription().csvReaderConfig, nodeTableSchema); + auto outSchema = copy->getSchema(); auto dataColumnExpressions = copy->getDataColumnExpressions(); std::vector dataColumnPoses; @@ -62,43 +64,22 @@ std::unique_ptr PlanMapper::mapCopyNode( dataColumnPoses.emplace_back(outSchema->getExpressionPos(*dataColumnExpr)); } auto nodeOffsetPos = DataPos(outSchema->getExpressionPos(*copy->getNodeOffsetExpression())); - auto nodeTableSchema = catalog->getReadOnlyVersion()->getNodeTableSchema(copy->getTableID()); - switch (copy->getCopyDescription().fileType) { - case (CopyDescription::FileType::CSV): { - readFileSharedState = - std::make_shared(copy->getCopyDescription().filePaths, - *copy->getCopyDescription().csvReaderConfig, nodeTableSchema); - readFile = std::make_unique(nodeOffsetPos, dataColumnPoses, readFileSharedState, - getOperatorID(), copy->getExpressionsForPrinting()); - } break; - case (CopyDescription::FileType::PARQUET): { - readFileSharedState = - std::make_shared(copy->getCopyDescription().filePaths, - *copy->getCopyDescription().csvReaderConfig, nodeTableSchema); - readFile = - std::make_unique(nodeOffsetPos, dataColumnPoses, readFileSharedState, - getOperatorID(), copy->getExpressionsForPrinting(), copy->isPreservingOrder()); - } break; - case (CopyDescription::FileType::NPY): { - readFileSharedState = - std::make_shared(copy->getCopyDescription().filePaths, - *copy->getCopyDescription().csvReaderConfig, nodeTableSchema); - readFile = std::make_unique(nodeOffsetPos, dataColumnPoses, readFileSharedState, - getOperatorID(), copy->getExpressionsForPrinting(), copy->isPreservingOrder()); - } break; - default: - throw NotImplementedException("PlanMapper::mapLogicalCopyNodeToPhysical"); - } - auto copyNodeSharedState = std::make_shared(readFileSharedState->numRows, - catalog->getReadOnlyVersion()->getNodeTableSchema(copy->getTableID()), - storageManager.getNodesStore().getNodeTable(copy->getTableID()), copy->getCopyDescription(), - memoryManager); + auto reader = std::make_unique( + ReaderInfo{nodeOffsetPos, dataColumnPoses, copy->isPreservingOrder(), + std::move(readRowsFunc), std::move(initReadDataFunc)}, + readerSharedState, getOperatorID(), copy->getExpressionsForPrinting()); + + auto copyNodeSharedState = + std::make_shared(readerSharedState->getNumRowsRef(), + catalog->getReadOnlyVersion()->getNodeTableSchema(copy->getTableID()), + storageManager.getNodesStore().getNodeTable(copy->getTableID()), + copy->getCopyDescription(), memoryManager); CopyNodeInfo copyNodeDataInfo{dataColumnPoses, nodeOffsetPos, copy->getCopyDescription(), storageManager.getNodesStore().getNodeTable(copy->getTableID()), &storageManager.getRelsStore(), catalog, storageManager.getWAL(), copy->isPreservingOrder()}; auto copyNode = std::make_unique(copyNodeSharedState, copyNodeDataInfo, - std::make_unique(copy->getSchema()), std::move(readFile), + std::make_unique(copy->getSchema()), std::move(reader), getOperatorID(), 
copy->getExpressionsForPrinting()); auto outputExpressions = binder::expression_vector{copy->getOutputExpression()}; return createFactorizedTableScanAligned(outputExpressions, outSchema, diff --git a/src/processor/map/plan_mapper.cpp b/src/processor/map/plan_mapper.cpp index 5010f42a6a9..3b7b345229f 100644 --- a/src/processor/map/plan_mapper.cpp +++ b/src/processor/map/plan_mapper.cpp @@ -188,8 +188,7 @@ std::unique_ptr PlanMapper::appendResultCollectorIfNotCopy( Schema* schema) { // We have a special code path for executing copy rel and copy npy, so we don't need to append // the resultCollector. - if (lastOperator->getOperatorType() != PhysicalOperatorType::COPY_REL && - lastOperator->getOperatorType() != PhysicalOperatorType::COPY_NPY) { + if (lastOperator->getOperatorType() != PhysicalOperatorType::COPY_REL) { lastOperator = createResultCollector( AccumulateType::REGULAR, expressionsToCollect, schema, std::move(lastOperator)); } diff --git a/src/processor/operator/CMakeLists.txt b/src/processor/operator/CMakeLists.txt index b1fe225fb2c..4f471486da6 100644 --- a/src/processor/operator/CMakeLists.txt +++ b/src/processor/operator/CMakeLists.txt @@ -1,6 +1,4 @@ add_subdirectory(aggregate) -add_subdirectory(copy_from) -add_subdirectory(copy_to) add_subdirectory(ddl) add_subdirectory(hash_join) add_subdirectory(intersect) diff --git a/src/processor/operator/copy_from/CMakeLists.txt b/src/processor/operator/copy_from/CMakeLists.txt deleted file mode 100644 index c44e8d05fef..00000000000 --- a/src/processor/operator/copy_from/CMakeLists.txt +++ /dev/null @@ -1,12 +0,0 @@ -add_library(kuzu_processor_operator_copy - OBJECT - copy.cpp - copy_rel.cpp - copy_node.cpp - read_file.cpp - read_parquet.cpp - read_npy.cpp) - -set(ALL_OBJECT_FILES - ${ALL_OBJECT_FILES} $ - PARENT_SCOPE) diff --git a/src/processor/operator/copy_from/read_file.cpp b/src/processor/operator/copy_from/read_file.cpp deleted file mode 100644 index c315846e3ea..00000000000 --- a/src/processor/operator/copy_from/read_file.cpp +++ /dev/null @@ -1,26 +0,0 @@ -#include "processor/operator/copy_from/read_file.h" - -using namespace kuzu::common; - -namespace kuzu { -namespace processor { - -bool ReadFile::getNextTuplesInternal(ExecutionContext* context) { - auto nodeOffsetVector = resultSet->getValueVector(nodeOffsetPos).get(); - auto morsel = preservingOrder ? 
sharedState->getMorselSerial() : sharedState->getMorsel(); - if (morsel == nullptr) { - return false; - } - nodeOffsetVector->setValue( - nodeOffsetVector->state->selVector->selectedPositions[0], morsel->rowsRead); - auto recordTable = readTuples(std::move(morsel)); - for (auto i = 0u; i < dataColumnPoses.size(); i++) { - ArrowColumnVector::setArrowColumn( - resultSet->getValueVector(dataColumnPoses[i]).get(), recordTable->column((int)i)); - } - resultSet->dataChunks[0]->state->setToUnflat(); - return true; -} - -} // namespace processor -} // namespace kuzu diff --git a/src/processor/operator/copy_from/read_npy.cpp b/src/processor/operator/copy_from/read_npy.cpp deleted file mode 100644 index 90e5aaca96f..00000000000 --- a/src/processor/operator/copy_from/read_npy.cpp +++ /dev/null @@ -1,19 +0,0 @@ -#include "processor/operator/copy_from/read_npy.h" - -#include "common/constants.h" - -using namespace kuzu::storage; - -namespace kuzu { -namespace processor { - -std::shared_ptr ReadNPY::readTuples(std::unique_ptr morsel) { - if (preservingOrder) { - auto serialMorsel = reinterpret_cast(morsel.get()); - return serialMorsel->recordTable; - } - return arrow::Table::FromRecordBatches({reader->readBlock(morsel->blockIdx)}).ValueOrDie(); -} - -} // namespace processor -} // namespace kuzu diff --git a/src/processor/operator/copy_from/read_parquet.cpp b/src/processor/operator/copy_from/read_parquet.cpp deleted file mode 100644 index a2fa063aa90..00000000000 --- a/src/processor/operator/copy_from/read_parquet.cpp +++ /dev/null @@ -1,28 +0,0 @@ -#include "processor/operator/copy_from/read_parquet.h" - -using namespace kuzu::storage; - -namespace kuzu { -namespace processor { - -std::shared_ptr ReadParquet::readTuples(std::unique_ptr morsel) { - if (preservingOrder) { - auto serialMorsel = reinterpret_cast(morsel.get()); - return serialMorsel->recordTable; - } - assert(!morsel->filePath.empty()); - if (!reader || filePath != morsel->filePath) { - reader = TableCopyUtils::createParquetReader(morsel->filePath, sharedState->tableSchema); - filePath = morsel->filePath; - } - std::shared_ptr table; - TableCopyUtils::throwCopyExceptionIfNotOK( - reader->RowGroup(static_cast(morsel->blockIdx))->ReadTable(&table)); - arrow::TableBatchReader batchReader(*table); - std::shared_ptr recordBatch; - TableCopyUtils::throwCopyExceptionIfNotOK(batchReader.ReadNext(&recordBatch)); - return arrow::Table::FromRecordBatches({recordBatch}).ValueOrDie(); -} - -} // namespace processor -} // namespace kuzu diff --git a/src/processor/operator/copy_to/CMakeLists.txt b/src/processor/operator/copy_to/CMakeLists.txt deleted file mode 100644 index 59b23187e0d..00000000000 --- a/src/processor/operator/copy_to/CMakeLists.txt +++ /dev/null @@ -1,9 +0,0 @@ -add_library(kuzu_processor_operator_copy_to - OBJECT - copy_to.cpp - csv_file_writer.cpp - ) - -set(ALL_OBJECT_FILES - ${ALL_OBJECT_FILES} $ - PARENT_SCOPE) diff --git a/src/processor/operator/persistent/CMakeLists.txt b/src/processor/operator/persistent/CMakeLists.txt index ab6d127085d..f927091ecec 100644 --- a/src/processor/operator/persistent/CMakeLists.txt +++ b/src/processor/operator/persistent/CMakeLists.txt @@ -1,13 +1,19 @@ -add_library(kuzu_processor_operator_update +add_library(kuzu_processor_operator_persistent OBJECT + copy.cpp + copy_node.cpp + copy_rel.cpp + copy_to.cpp + csv_file_writer.cpp delete.cpp delete_executor.cpp insert.cpp insert_executor.cpp merge.cpp + reader.cpp set.cpp set_executor.cpp) set(ALL_OBJECT_FILES - ${ALL_OBJECT_FILES} $ + ${ALL_OBJECT_FILES} 
$ PARENT_SCOPE) diff --git a/src/processor/operator/copy_from/copy.cpp b/src/processor/operator/persistent/copy.cpp similarity index 94% rename from src/processor/operator/copy_from/copy.cpp rename to src/processor/operator/persistent/copy.cpp index 4326edb9a74..11673a25483 100644 --- a/src/processor/operator/copy_from/copy.cpp +++ b/src/processor/operator/persistent/copy.cpp @@ -1,4 +1,4 @@ -#include "processor/operator/copy_from/copy.h" +#include "processor/operator/persistent/copy.h" #include "common/string_utils.h" diff --git a/src/processor/operator/copy_from/copy_node.cpp b/src/processor/operator/persistent/copy_node.cpp similarity index 87% rename from src/processor/operator/copy_from/copy_node.cpp rename to src/processor/operator/persistent/copy_node.cpp index 9125bb2a0fb..ba085e21750 100644 --- a/src/processor/operator/copy_from/copy_node.cpp +++ b/src/processor/operator/persistent/copy_node.cpp @@ -1,4 +1,4 @@ -#include "processor/operator/copy_from/copy_node.h" +#include "processor/operator/persistent/copy_node.h" #include "common/string_utils.h" #include "storage/copier/string_column_chunk.h" @@ -50,7 +50,8 @@ CopyNode::CopyNode(std::shared_ptr sharedState, CopyNodeInf std::unique_ptr child, uint32_t id, const std::string& paramsString) : Sink{std::move(resultSetDescriptor), PhysicalOperatorType::COPY_NODE, std::move(child), id, paramsString}, - sharedState{std::move(sharedState)}, copyNodeInfo{std::move(copyNodeInfo)} {} + nodeOffsetVector{nullptr}, sharedState{std::move(sharedState)}, copyNodeInfo{std::move( + copyNodeInfo)} {} void CopyNodeSharedState::appendLocalNodeGroup(std::unique_ptr localNodeGroup) { std::unique_lock xLck{mtx}; @@ -85,25 +86,22 @@ void CopyNode::executeInternal(ExecutionContext* context) { auto numTuplesToAppend = ArrowColumnVector::getArrowColumn( resultSet->getValueVector(copyNodeInfo.dataColumnPoses[0]).get()) ->length(); - auto nodeOffset = nodeOffsetVector->getValue( - nodeOffsetVector->state->selVector->selectedPositions[0]) - - 1; - uint64_t numAppendedTuples = 0; - while (numAppendedTuples < numTuplesToAppend) { - auto numAppendedTuplesInNodeGroup = localNodeGroup->append( - resultSet, copyNodeInfo.dataColumnPoses, numTuplesToAppend - numAppendedTuples); - numAppendedTuples += numAppendedTuplesInNodeGroup; - if (localNodeGroup->isFull()) { - node_group_idx_t nodeGroupIdx; - if (copyNodeInfo.preservingOrder) { - nodeGroupIdx = StorageUtils::getNodeGroupIdx(nodeOffset) - 1; - sharedState->currentNodeGroupIdx++; - } else { - nodeGroupIdx = sharedState->getNextNodeGroupIdx(); - } - writeAndResetNodeGroup(nodeGroupIdx, sharedState->pkIndex.get(), - sharedState->pkColumnID, sharedState->table, localNodeGroup.get()); + assert(numTuplesToAppend <= StorageConstants::NODE_GROUP_SIZE); + auto nodeOffset = nodeOffsetVector->getValue( + nodeOffsetVector->state->selVector->selectedPositions[0]); + auto numAppendedTuplesInNodeGroup = + localNodeGroup->append(resultSet, copyNodeInfo.dataColumnPoses, numTuplesToAppend); + assert(numAppendedTuplesInNodeGroup == numTuplesToAppend); + if (localNodeGroup->isFull()) { + node_group_idx_t nodeGroupIdx; + if (copyNodeInfo.preservingOrder) { + nodeGroupIdx = StorageUtils::getNodeGroupIdx(nodeOffset); + sharedState->setNextNodeGroupIdx(nodeGroupIdx + 1); + } else { + nodeGroupIdx = sharedState->getNextNodeGroupIdx(); } + writeAndResetNodeGroup(nodeGroupIdx, sharedState->pkIndex.get(), + sharedState->pkColumnID, sharedState->table, localNodeGroup.get()); } } if (localNodeGroup->getNumNodes() > 0) { diff --git 
a/src/processor/operator/copy_from/copy_rel.cpp b/src/processor/operator/persistent/copy_rel.cpp similarity index 92% rename from src/processor/operator/copy_from/copy_rel.cpp rename to src/processor/operator/persistent/copy_rel.cpp index acf47a26389..47ab2424959 100644 --- a/src/processor/operator/copy_from/copy_rel.cpp +++ b/src/processor/operator/persistent/copy_rel.cpp @@ -1,4 +1,4 @@ -#include "processor/operator/copy_from/copy_rel.h" +#include "processor/operator/persistent/copy_rel.h" #include "storage/copier/rel_copy_executor.h" diff --git a/src/processor/operator/copy_to/copy_to.cpp b/src/processor/operator/persistent/copy_to.cpp similarity index 94% rename from src/processor/operator/copy_to/copy_to.cpp rename to src/processor/operator/persistent/copy_to.cpp index 3f07c0a8039..fbb1027c359 100644 --- a/src/processor/operator/copy_to/copy_to.cpp +++ b/src/processor/operator/persistent/copy_to.cpp @@ -1,4 +1,4 @@ -#include "processor/operator/copy_to/copy_to.h" +#include "processor/operator/persistent/copy_to.h" #include "common/string_utils.h" #include "common/types/value.h" diff --git a/src/processor/operator/copy_to/csv_file_writer.cpp b/src/processor/operator/persistent/csv_file_writer.cpp similarity index 95% rename from src/processor/operator/copy_to/csv_file_writer.cpp rename to src/processor/operator/persistent/csv_file_writer.cpp index 2468f423ea8..7d5ed5d5806 100644 --- a/src/processor/operator/copy_to/csv_file_writer.cpp +++ b/src/processor/operator/persistent/csv_file_writer.cpp @@ -1,4 +1,4 @@ -#include "processor/operator/copy_to/csv_file_writer.h" +#include "processor/operator/persistent/csv_file_writer.h" #include @@ -70,6 +70,7 @@ void CSVFileWriter::writeValue(ValueVector* vector) { switch (vector->dataType.getLogicalTypeID()) { case LogicalTypeID::BOOL: return writeToBuffer(vector, selPos); + case LogicalTypeID::SERIAL: case LogicalTypeID::INT64: return writeToBuffer(vector, selPos); case LogicalTypeID::INT32: @@ -96,7 +97,7 @@ void CSVFileWriter::writeValue(ValueVector* vector) { case LogicalTypeID::STRUCT: return writeListToBuffer(vector, selPos); default: { - NotImplementedException("CSVFileWriter::writeValue"); + throw NotImplementedException("CSVFileWriter::writeValue"); } } } diff --git a/src/processor/operator/persistent/reader.cpp b/src/processor/operator/persistent/reader.cpp new file mode 100644 index 00000000000..30ce0e89680 --- /dev/null +++ b/src/processor/operator/persistent/reader.cpp @@ -0,0 +1,63 @@ +#include "processor/operator/persistent/reader.h" + +#include "storage/copier/npy_reader.h" + +using namespace kuzu::catalog; +using namespace kuzu::common; +using namespace kuzu::storage; + +namespace kuzu { +namespace processor { + +bool Reader::getNextTuplesInternal(ExecutionContext* context) { + std::shared_ptr table = nullptr; + readerInfo.isOrderPreserving ? 
getNextNodeGroupInSerial(table) :
+                                   getNextNodeGroupInParallel(table);
+    if (table == nullptr) {
+        return false;
+    }
+    for (auto i = 0u; i < readerInfo.dataColumnPoses.size(); i++) {
+        ArrowColumnVector::setArrowColumn(
+            resultSet->getValueVector(readerInfo.dataColumnPoses[i]).get(), table->column((int)i));
+    }
+    return true;
+}
+
+void Reader::getNextNodeGroupInSerial(std::shared_ptr& table) {
+    auto morsel = sharedState->getSerialMorsel();
+    if (morsel->fileIdx == INVALID_VECTOR_IDX) {
+        return;
+    }
+    auto serialMorsel = reinterpret_cast(morsel.get());
+    table = serialMorsel->table;
+    auto nodeOffsetVector = resultSet->getValueVector(readerInfo.nodeOffsetPos).get();
+    nodeOffsetVector->setValue(
+        nodeOffsetVector->state->selVector->selectedPositions[0], morsel->rowIdx);
+}
+
+void Reader::getNextNodeGroupInParallel(std::shared_ptr& table) {
+    while (leftNumRows < StorageConstants::NODE_GROUP_SIZE) {
+        auto morsel = sharedState->getParallelMorsel();
+        if (morsel->fileIdx == INVALID_VECTOR_IDX) {
+            break;
+        }
+        if (morsel->fileIdx != currFileIdx) {
+            readFuncData = readerInfo.initFunc(sharedState->filePaths, morsel->fileIdx,
+                sharedState->csvReaderConfig, sharedState->tableSchema);
+            currFileIdx = morsel->fileIdx;
+        }
+        auto batchVector = readerInfo.readFunc(*readFuncData, morsel->blockIdx);
+        for (auto& batch : batchVector) {
+            leftNumRows += batch->num_rows();
+            leftRecordBatches.push_back(std::move(batch));
+        }
+    }
+    if (leftNumRows == 0) {
+        return;
+    }
+    table = ReaderSharedState::constructTableFromBatches(leftRecordBatches);
+    leftNumRows -= table->num_rows();
+}
+
+} // namespace processor
+} // namespace kuzu
diff --git a/src/processor/operator/physical_operator.cpp b/src/processor/operator/physical_operator.cpp
index a21fb3feecd..60601ab4168 100644
--- a/src/processor/operator/physical_operator.cpp
+++ b/src/processor/operator/physical_operator.cpp
@@ -35,14 +35,8 @@ std::string PhysicalOperatorUtils::operatorTypeToString(PhysicalOperatorType ope
     case PhysicalOperatorType::CREATE_MACRO: {
         return "CREATE_MACRO";
     }
-    case PhysicalOperatorType::READ_CSV: {
-        return "READ_CSV";
-    }
-    case PhysicalOperatorType::READ_NPY: {
-        return "READ_NPY";
-    }
-    case PhysicalOperatorType::READ_PARQUET: {
-        return "READ_PARQUET";
+    case PhysicalOperatorType::READER: {
+        return "READER";
     }
     case PhysicalOperatorType::INSERT_NODE: {
         return "INSERT_NODE";
diff --git a/src/processor/processor.cpp b/src/processor/processor.cpp
index 56c57c7a74f..04bf4f6ba5f 100644
--- a/src/processor/processor.cpp
+++ b/src/processor/processor.cpp
@@ -1,8 +1,8 @@
 #include "processor/processor.h"
 
 #include "processor/operator/aggregate/base_aggregate.h"
-#include "processor/operator/copy_from/copy.h"
-#include "processor/operator/copy_from/copy_node.h"
+#include "processor/operator/persistent/copy.h"
+#include "processor/operator/persistent/copy_node.h"
 #include "processor/operator/result_collector.h"
 #include "processor/operator/sink.h"
 #include "processor/processor_task.h"
diff --git a/src/storage/copier/CMakeLists.txt b/src/storage/copier/CMakeLists.txt
index ee0852a49f3..2b084df0908 100644
--- a/src/storage/copier/CMakeLists.txt
+++ b/src/storage/copier/CMakeLists.txt
@@ -3,7 +3,7 @@ add_library(kuzu_storage_in_mem_csv_copier
         column_chunk.cpp
         node_group.cpp
         npy_reader.cpp
-        read_file_state.cpp
+        reader_state.cpp
         rel_copier.cpp
         rel_copy_executor.cpp
         struct_column_chunk.cpp
diff --git a/src/storage/copier/column_chunk.cpp b/src/storage/copier/column_chunk.cpp
index 007617c9c08..2b464ddd321 100644
---
a/src/storage/copier/column_chunk.cpp +++ b/src/storage/copier/column_chunk.cpp @@ -58,7 +58,7 @@ void ColumnChunk::append( ValueVector* vector, offset_t startPosInChunk, uint32_t numValuesToAppend) { assert(vector->dataType.getLogicalTypeID() == LogicalTypeID::ARROW_COLUMN); auto chunkedArray = ArrowColumnVector::getArrowColumn(vector).get(); - for (auto array : chunkedArray->chunks()) { + for (const auto& array : chunkedArray->chunks()) { auto numValuesInArrayToAppend = std::min((uint64_t)array->length(), (uint64_t)numValuesToAppend); if (numValuesInArrayToAppend <= 0) { diff --git a/src/storage/copier/read_file_state.cpp b/src/storage/copier/read_file_state.cpp deleted file mode 100644 index ba33e18d88f..00000000000 --- a/src/storage/copier/read_file_state.cpp +++ /dev/null @@ -1,270 +0,0 @@ -#include "storage/copier/read_file_state.h" - -#include "storage/copier/npy_reader.h" -using namespace kuzu::common; - -namespace kuzu { -namespace storage { - -void ReadCSVSharedState::countNumRows() { - for (auto& filePath : filePaths) { - auto csvStreamingReader = - TableCopyUtils::createCSVReader(filePath, &csvReaderConfig, tableSchema); - std::shared_ptr currBatch; - uint64_t numBlocks = 0; - std::vector numRowsPerBlock; - while (true) { - TableCopyUtils::throwCopyExceptionIfNotOK(csvStreamingReader->ReadNext(&currBatch)); - if (currBatch == nullptr) { - break; - } - ++numBlocks; - auto currNumRows = currBatch->num_rows(); - numRowsPerBlock.push_back(currNumRows); - numRows += currNumRows; - } - fileBlockInfos.emplace(filePath, FileBlockInfo{numBlocks, numRowsPerBlock}); - } -} - -// TODO(Guodong): Refactor duplicated between the three getMorsel() functions. -std::unique_ptr ReadCSVSharedState::getMorselSerial() { - std::unique_lock lck{mtx}; - std::shared_ptr resultTable; - if (leftOverData) { - resultTable = std::move(leftOverData); - leftOverData = nullptr; - } - while (true) { - if (currFileIdx >= filePaths.size()) { - // No more files to read. - return nullptr; - } - auto filePath = filePaths[currFileIdx]; - if (!reader) { - reader = TableCopyUtils::createCSVReader(filePath, &csvReaderConfig, tableSchema); - } - std::shared_ptr recordBatch; - TableCopyUtils::throwCopyExceptionIfNotOK(reader->ReadNext(&recordBatch)); - if (recordBatch == nullptr) { - // No more blocks to read in this file. 
- currFileIdx++; - currBlockIdx = 0; - currRowIdxInCurrFile = 1; - reader.reset(); - if (currFileIdx >= filePaths.size()) { - break; - } - continue; - } - auto numRowsInBatch = recordBatch->num_rows(); - rowsRead += numRowsInBatch; - currRowIdxInCurrFile += numRowsInBatch; - currBlockIdx++; - if (resultTable == nullptr) { - TableCopyUtils::throwCopyExceptionIfNotOK( - arrow::Table::FromRecordBatches({recordBatch}).Value(&resultTable)); - } else { - std::shared_ptr interimTable; - TableCopyUtils::throwCopyExceptionIfNotOK( - arrow::Table::FromRecordBatches({recordBatch}).Value(&interimTable)); - TableCopyUtils::throwCopyExceptionIfNotOK( - arrow::ConcatenateTables({resultTable, interimTable}).Value(&resultTable)); - } - if (resultTable->column(0)->length() >= common::StorageConstants::NODE_GROUP_SIZE) { - leftOverData = resultTable->Slice(common::StorageConstants::NODE_GROUP_SIZE); - resultTable = resultTable->Slice(0, common::StorageConstants::NODE_GROUP_SIZE); - break; - } - if (currFileIdx >= filePaths.size()) { - break; - } - } - auto rows = resultTable->num_rows(); - auto result = std::make_unique( - currRowIdx, filePaths[0], currRowIdxInCurrFile, rowsRead, std::move(resultTable)); - currRowIdx += rows; - return result; -} - -std::unique_ptr ReadCSVSharedState::getMorsel() { - return std::move(getMorselSerial()); -} - -void ReadParquetSharedState::countNumRows() { - for (auto& filePath : filePaths) { - std::unique_ptr reader = - TableCopyUtils::createParquetReader(filePath, tableSchema); - auto metadata = reader->parquet_reader()->metadata(); - uint64_t numBlocks = metadata->num_row_groups(); - std::vector numLinesPerBlock(numBlocks); - for (auto blockIdx = 0; blockIdx < numBlocks; ++blockIdx) { - numLinesPerBlock[blockIdx] = metadata->RowGroup(blockIdx)->num_rows(); - } - fileBlockInfos.emplace(filePath, FileBlockInfo{numBlocks, numLinesPerBlock}); - numRows += metadata->num_rows(); - } -} - -std::unique_ptr ReadParquetSharedState::getMorselSerial() { - std::unique_lock lck{mtx}; - std::shared_ptr resultTable; - if (leftOverData) { - resultTable = std::move(leftOverData); - leftOverData = nullptr; - } - while (true) { - if (currFileIdx >= filePaths.size()) { - // No more files to read. - return nullptr; - } - auto filePath = filePaths[currFileIdx]; - if (!reader) { - reader = TableCopyUtils::createParquetReader(filePath, tableSchema); - } - std::shared_ptr table; - TableCopyUtils::throwCopyExceptionIfNotOK(reader->ReadTable(&table)); - if (table == nullptr) { - // No more blocks to read in this file. 
- currFileIdx++; - currBlockIdx = 0; - reader.reset(); - if (currFileIdx >= filePaths.size()) { - break; - } - continue; - } - auto numRowsInBatch = table->num_rows(); - rowsRead += numRowsInBatch; - currRowIdxInCurrFile += numRowsInBatch; - currBlockIdx++; - if (resultTable == nullptr) { - resultTable = table; - } else { - TableCopyUtils::throwCopyExceptionIfNotOK( - arrow::ConcatenateTables({resultTable, table}).Value(&resultTable)); - } - if (resultTable->column(0)->length() < common::StorageConstants::NODE_GROUP_SIZE) { - leftOverData = resultTable->Slice(common::StorageConstants::NODE_GROUP_SIZE); - resultTable = resultTable->Slice(0, common::StorageConstants::NODE_GROUP_SIZE); - break; - } - if (currFileIdx >= filePaths.size()) { - break; - } - } - auto rows = resultTable->num_rows(); - auto result = std::make_unique( - currRowIdx, filePaths[0], currRowIdxInCurrFile, rowsRead, std::move(resultTable)); - currRowIdx += rows; - return result; -} - -// TODO(Guodong): Refactor duplicated between the three getMorsel() functions. -std::unique_ptr ReadParquetSharedState::getMorsel() { - std::unique_lock lck{mtx}; - while (true) { - if (currFileIdx >= filePaths.size()) { - // No more files to read. - return nullptr; - } - auto filePath = filePaths[currFileIdx]; - auto fileBlockInfo = fileBlockInfos.at(filePath); - if (currBlockIdx >= fileBlockInfo.numBlocks) { - // No more blocks to read in this file. - currFileIdx++; - currBlockIdx = 0; - currRowIdxInCurrFile = 1; - continue; - } - auto numRowsInBlock = fileBlockInfo.numRowsPerBlock[currBlockIdx]; - auto result = std::make_unique( - currRowIdx, currBlockIdx, numRowsInBlock, filePath, currRowIdxInCurrFile); - currRowIdx += numRowsInBlock; - currRowIdxInCurrFile += numRowsInBlock; - currBlockIdx++; - return result; - } -} - -void ReadNPYSharedState::countNumRows() { - uint8_t idx = 0; - uint64_t firstFileRows; - for (auto& filePath : filePaths) { - auto reader = make_unique(filePath); - numRows = reader->getNumRows(); - if (idx == 0) { - firstFileRows = numRows; - } - auto tableType = tableSchema->getProperty(idx)->getDataType(); - reader->validate(*tableType, firstFileRows, tableSchema->tableName); - auto numBlocks = (uint64_t)((numRows + CopyConstants::NUM_ROWS_PER_BLOCK_FOR_NPY) / - CopyConstants::NUM_ROWS_PER_BLOCK_FOR_NPY); - std::vector numRowsPerBlock(numBlocks, CopyConstants::NUM_ROWS_PER_BLOCK_FOR_NPY); - fileBlockInfos.emplace(filePath, FileBlockInfo{numBlocks, numRowsPerBlock}); - idx++; - } -} - -std::unique_ptr ReadNPYSharedState::getMorselSerial() { - std::unique_lock lck{mtx}; - std::shared_ptr resultTable; - if (leftOverData) { - resultTable = std::move(leftOverData); - leftOverData = nullptr; - } - while (true) { - std::shared_ptr recordBatch = reader->readBlock(currBlockIdx); - if (recordBatch == nullptr) { - // No more blocks to read - break; - } - auto numRowsInBatch = recordBatch->num_rows(); - rowsRead += numRowsInBatch; - currRowIdxInCurrFile += numRowsInBatch; - currBlockIdx++; - if (resultTable == nullptr) { - TableCopyUtils::throwCopyExceptionIfNotOK( - arrow::Table::FromRecordBatches({recordBatch}).Value(&resultTable)); - } else { - std::shared_ptr interimTable; - TableCopyUtils::throwCopyExceptionIfNotOK( - arrow::Table::FromRecordBatches({recordBatch}).Value(&interimTable)); - TableCopyUtils::throwCopyExceptionIfNotOK( - arrow::ConcatenateTables({resultTable, interimTable}).Value(&resultTable)); - } - if (resultTable->column(0)->length() >= common::StorageConstants::NODE_GROUP_SIZE) { - leftOverData = 
resultTable->Slice(common::StorageConstants::NODE_GROUP_SIZE); - resultTable = resultTable->Slice(0, common::StorageConstants::NODE_GROUP_SIZE); - } - break; - } - auto rows = resultTable->num_rows(); - auto result = std::make_unique( - currRowIdx, "", currRowIdxInCurrFile, rowsRead, std::move(resultTable)); - currRowIdx += rows; - return result; -} - -// TODO(Guodong): Refactor duplicated between the three getMorsel() functions. -std::unique_ptr ReadNPYSharedState::getMorsel() { - std::unique_lock lck{mtx}; - while (true) { - auto filePath = filePaths[0]; - auto fileBlockInfo = fileBlockInfos.at(filePath); - if (currBlockIdx >= fileBlockInfo.numBlocks) { - // No more blocks to read. - return nullptr; - } - auto numRowsInBlock = fileBlockInfo.numRowsPerBlock[currBlockIdx]; - auto result = std::make_unique( - currRowIdx, currBlockIdx, numRowsInBlock, filePath, currRowIdxInCurrFile); - currRowIdx += numRowsInBlock; - currRowIdxInCurrFile += numRowsInBlock; - currBlockIdx++; - return result; - } -} - -} // namespace storage -} // namespace kuzu diff --git a/src/storage/copier/reader_state.cpp b/src/storage/copier/reader_state.cpp new file mode 100644 index 00000000000..33c5df6c36d --- /dev/null +++ b/src/storage/copier/reader_state.cpp @@ -0,0 +1,276 @@ +#include "storage/copier/reader_state.h" + +using namespace kuzu::catalog; +using namespace kuzu::common; + +namespace kuzu { +namespace storage { + +validate_func_t ReaderFunctions::getValidateFunc(CopyDescription::FileType fileType) { + switch (fileType) { + case CopyDescription::FileType::CSV: + return validateCSVFiles; + case CopyDescription::FileType::PARQUET: + return validateParquetFiles; + case CopyDescription::FileType::NPY: + return validateNPYFiles; + default: + throw NotImplementedException{"ReaderFunctions::getValidateFunc"}; + } +} + +count_blocks_func_t ReaderFunctions::getCountBlocksFunc(CopyDescription::FileType fileType) { + switch (fileType) { + case CopyDescription::FileType::CSV: + return countRowsInCSVFile; + case CopyDescription::FileType::PARQUET: + return countRowsInParquetFile; + case CopyDescription::FileType::NPY: + return countRowsInNPYFile; + default: + throw NotImplementedException{"ReaderFunctions::getRowsCounterFunc"}; + } +} + +init_reader_data_func_t ReaderFunctions::getInitDataFunc(CopyDescription::FileType fileType) { + switch (fileType) { + case CopyDescription::FileType::CSV: + return initCSVReadData; + case CopyDescription::FileType::PARQUET: + return initParquetReadData; + case CopyDescription::FileType::NPY: + return initNPYReadData; + default: + throw NotImplementedException{"ReaderFunctions::getInitDataFunc"}; + } +} + +read_rows_func_t ReaderFunctions::getReadRowsFunc(CopyDescription::FileType fileType) { + switch (fileType) { + case CopyDescription::FileType::CSV: + return readRowsFromCSVFile; + case CopyDescription::FileType::PARQUET: + return readRowsFromParquetFile; + case CopyDescription::FileType::NPY: + return readRowsFromNPYFile; + default: + throw NotImplementedException{"ReaderFunctions::getReadRowsFunc"}; + } +} + +void ReaderFunctions::validateNPYFiles(std::vector& paths, TableSchema* tableSchema) { + assert(!paths.empty() && paths.size() == tableSchema->getNumProperties()); + row_idx_t numRows; + for (auto i = 0u; i < paths.size(); i++) { + auto reader = make_unique(paths[i]); + if (i == 0) { + numRows = reader->getNumRows(); + } + auto tableType = tableSchema->getProperty(i)->getDataType(); + reader->validate(*tableType, numRows, tableSchema->tableName); + } +} + +std::vector 
ReaderFunctions::countRowsInCSVFile( + std::vector& paths, CSVReaderConfig csvReaderConfig, TableSchema* tableSchema) { + std::vector result(paths.size()); + for (auto i = 0u; i < paths.size(); i++) { + auto csvStreamingReader = + TableCopyUtils::createCSVReader(paths[i], &csvReaderConfig, tableSchema); + std::shared_ptr currBatch; + row_idx_t numRows = 0; + std::vector blocks; + while (true) { + TableCopyUtils::throwCopyExceptionIfNotOK(csvStreamingReader->ReadNext(&currBatch)); + if (currBatch == nullptr) { + break; + } + auto numRowsInBatch = currBatch->num_rows(); + numRows += numRowsInBatch; + blocks.push_back(numRowsInBatch); + } + result[i] = {numRows, blocks}; + } + return result; +} + +std::vector ReaderFunctions::countRowsInParquetFile( + std::vector& paths, CSVReaderConfig csvReaderConfig, TableSchema* tableSchema) { + std::vector result(paths.size()); + for (auto i = 0u; i < paths.size(); i++) { + std::unique_ptr reader = + TableCopyUtils::createParquetReader(paths[i], tableSchema); + auto metadata = reader->parquet_reader()->metadata(); + auto numBlocks = metadata->num_row_groups(); + std::vector blocks; + blocks.resize(numBlocks); + for (auto blockIdx = 0; blockIdx < numBlocks; blockIdx++) { + blocks[blockIdx] = metadata->RowGroup(blockIdx)->num_rows(); + } + result[i] = {(row_idx_t)metadata->num_rows(), blocks}; + } + return result; +} + +std::vector ReaderFunctions::countRowsInNPYFile( + std::vector& paths, CSVReaderConfig csvReaderConfig, TableSchema* tableSchema) { + assert(!paths.empty()); + auto reader = make_unique(paths[0]); + auto numRows = reader->getNumRows(); + auto numBlocks = (block_idx_t)((numRows + CopyConstants::NUM_ROWS_PER_BLOCK_FOR_NPY) / + CopyConstants::NUM_ROWS_PER_BLOCK_FOR_NPY); + std::vector blocks(numBlocks, CopyConstants::NUM_ROWS_PER_BLOCK_FOR_NPY); + return {FileBlocksInfo{numRows, blocks}}; +} + +std::unique_ptr ReaderFunctions::initCSVReadData( + std::vector& paths, vector_idx_t fileIdx, CSVReaderConfig csvReaderConfig, + TableSchema* tableSchema) { + assert(fileIdx < paths.size()); + auto reader = TableCopyUtils::createCSVReader(paths[fileIdx], &csvReaderConfig, tableSchema); + return std::make_unique(csvReaderConfig, tableSchema, std::move(reader)); +} + +std::unique_ptr ReaderFunctions::initParquetReadData( + std::vector& paths, vector_idx_t fileIdx, CSVReaderConfig csvReaderConfig, + TableSchema* tableSchema) { + assert(fileIdx < paths.size()); + auto reader = TableCopyUtils::createParquetReader(paths[fileIdx], tableSchema); + return std::make_unique( + csvReaderConfig, tableSchema, std::move(reader)); +} + +std::unique_ptr ReaderFunctions::initNPYReadData( + std::vector& paths, vector_idx_t fileIdx, CSVReaderConfig csvReaderConfig, + TableSchema* tableSchema) { + auto reader = make_unique(paths); + return std::make_unique(csvReaderConfig, tableSchema, std::move(reader)); +} + +arrow::RecordBatchVector ReaderFunctions::readRowsFromCSVFile( + const ReaderFunctionData& functionData, common::block_idx_t blockIdx) { + auto& readerData = (CSVReaderFunctionData&)(functionData); + std::shared_ptr recordBatch; + TableCopyUtils::throwCopyExceptionIfNotOK(readerData.reader->ReadNext(&recordBatch)); + assert(recordBatch); + arrow::RecordBatchVector result{recordBatch}; + return result; +} + +arrow::RecordBatchVector ReaderFunctions::readRowsFromParquetFile( + const ReaderFunctionData& functionData, common::block_idx_t blockIdx) { + auto& readerData = (ParquetReaderFunctionData&)(functionData); + std::shared_ptr table; + 
TableCopyUtils::throwCopyExceptionIfNotOK( + readerData.reader->RowGroup(static_cast(blockIdx))->ReadTable(&table)); + assert(table); + arrow::TableBatchReader batchReader(*table); + arrow::RecordBatchVector result; + TableCopyUtils::throwCopyExceptionIfNotOK(batchReader.ToRecordBatches().Value(&result)); + return result; +} + +arrow::RecordBatchVector ReaderFunctions::readRowsFromNPYFile( + const ReaderFunctionData& functionData, common::block_idx_t blockIdx) { + auto& readerData = (NPYReaderFunctionData&)(functionData); + auto recordBatch = readerData.reader->readBlock(blockIdx); + arrow::RecordBatchVector result{recordBatch}; + return result; +} + +void ReaderSharedState::validate() { + validateFunc(filePaths, tableSchema); +} + +void ReaderSharedState::countBlocks() { + readFuncData = initFunc(filePaths, 0 /* fileIdx */, csvReaderConfig, tableSchema); + blockInfos = countBlocksFunc(filePaths, csvReaderConfig, tableSchema); + for (auto& blockInfo : blockInfos) { + numRows += blockInfo.numRows; + } +} + +std::unique_ptr ReaderSharedState::getSerialMorsel() { + std::unique_lock xLck{mtx}; + while (leftNumRows < StorageConstants::NODE_GROUP_SIZE) { + auto morsel = getMorselOfNextBlock(); + if (morsel->fileIdx >= filePaths.size()) { + // No more blocks. + break; + } + auto batchVector = readFunc(*morsel->readerFuncData, morsel->blockIdx); + for (auto& batch : batchVector) { + leftNumRows += batch->num_rows(); + leftRecordBatches.push_back(std::move(batch)); + } + } + if (leftNumRows == 0) { + return std::make_unique(); + } + auto table = constructTableFromBatches(leftRecordBatches); + leftNumRows -= table->num_rows(); + auto result = + std::make_unique(currFileIdx, currBlockIdx, currRowIdx, table); + currRowIdx += table->num_rows(); + return result; +} + +std::unique_ptr ReaderSharedState::getParallelMorsel() { + std::unique_lock xLck{mtx}; + while (true) { + auto morsel = getMorselOfNextBlock(); + if (morsel->fileIdx >= filePaths.size()) { + // No more blocks. + break; + } + assert(morsel->fileIdx < blockInfos.size() && + morsel->blockIdx < blockInfos[morsel->fileIdx].numRowsPerBlock.size()); + if (morsel->fileIdx != currFileIdx) { + readFuncData = initFunc(filePaths, morsel->fileIdx, csvReaderConfig, tableSchema); + } + currRowIdx += blockInfos[morsel->fileIdx].numRowsPerBlock[morsel->blockIdx]; + return morsel; + } + return std::make_unique(); +} + +std::unique_ptr ReaderSharedState::getMorselOfNextBlock() { + auto numBlocksInFile = blockInfos[currFileIdx].numRowsPerBlock.size(); + if (currBlockIdx >= numBlocksInFile) { + currFileIdx += fileType == CopyDescription::FileType::NPY ? filePaths.size() : 1; + currBlockIdx = 0; + if (currFileIdx >= filePaths.size()) { + // End of all files. 
+ return std::make_unique(); + } + } + return std::make_unique( + currFileIdx, currBlockIdx++, currRowIdx, readFuncData.get()); +} + +std::shared_ptr ReaderSharedState::constructTableFromBatches( + std::vector>& recordBatches) { + std::shared_ptr table; + std::vector> recordBatchesForTable; + row_idx_t numRowsInTable = 0; + while (numRowsInTable < StorageConstants::NODE_GROUP_SIZE && !recordBatches.empty()) { + auto& currBatch = recordBatches.front(); + auto numRowsInBatch = currBatch->num_rows(); + if (numRowsInTable + numRowsInBatch > StorageConstants::NODE_GROUP_SIZE) { + auto numRowsToAppend = StorageConstants::NODE_GROUP_SIZE - numRowsInTable; + auto slicedBatch = currBatch->Slice(0, (int64_t)numRowsToAppend); + recordBatchesForTable.push_back(slicedBatch); + recordBatches.front() = currBatch->Slice((int64_t)numRowsToAppend); + break; + } + recordBatchesForTable.push_back(std::move(recordBatches[0])); + numRowsInTable += numRowsInBatch; + recordBatches.erase(recordBatches.begin()); + } + TableCopyUtils::throwCopyExceptionIfNotOK( + arrow::Table::FromRecordBatches(recordBatchesForTable).Value(&table)); + return table; +} + +} // namespace storage +} // namespace kuzu diff --git a/src/storage/copier/rel_copier.cpp b/src/storage/copier/rel_copier.cpp index 293223fe337..97cc1f36e9b 100644 --- a/src/storage/copier/rel_copier.cpp +++ b/src/storage/copier/rel_copier.cpp @@ -25,16 +25,9 @@ void RelCopier::execute(ExecutionContext* executionContext) { if (executionContext->clientContext->isInterrupted()) { throw InterruptException(); } - auto morsel = sharedState->getMorsel(); - if (morsel == nullptr) { - // No more morsels. + if (!executeInternal()) { break; } - executeInternal(std::move(morsel)); - } - { - std::unique_lock xLck{sharedState->mtx}; - sharedState->numRows += numRows; } } @@ -48,21 +41,16 @@ void RelCopier::copyRelColumnsOrCountRelListsSize(row_idx_t rowIdx, arrow::Recor } void RelCopier::indexLookup(arrow::Array* pkArray, const LogicalType& pkColumnType, - PrimaryKeyIndex* pkIndex, offset_t* offsets, const std::string& filePath, - row_idx_t startRowIdxInFile) { + PrimaryKeyIndex* pkIndex, offset_t* offsets) { auto length = pkArray->length(); if (pkArray->null_count() != 0) { for (auto i = 0u; i < length; i++) { if (pkArray->IsNull(i)) { - throw CopyException(StringUtils::string_format( - "NULL found around L{} in file {} violates the non-null " - "constraint of the primary key column.", - (startRowIdxInFile + i), filePath)); + throw CopyException(ExceptionMessage::nullPKException()); } } } std::string errorPKValueStr; - row_idx_t errorPKRowIdx = INVALID_ROW_IDX; switch (pkColumnType.getLogicalTypeID()) { case LogicalTypeID::SERIAL: { for (auto i = 0u; i < length; i++) { @@ -80,7 +68,6 @@ void RelCopier::indexLookup(arrow::Array* pkArray, const LogicalType& pkColumnTy auto val = dynamic_cast(pkArray)->Value(i); if (!pkIndex->lookup(&transaction::DUMMY_READ_TRANSACTION, val, offsets[i])) { errorPKValueStr = std::to_string(val); - errorPKRowIdx = startRowIdxInFile + i; } } } @@ -98,23 +85,17 @@ void RelCopier::indexLookup(arrow::Array* pkArray, const LogicalType& pkColumnTy if (!pkIndex->lookup( &transaction::DUMMY_READ_TRANSACTION, val.c_str(), offsets[i])) { errorPKValueStr = val; - errorPKRowIdx = startRowIdxInFile + i; } } } } break; default: { throw NotImplementedException( - StringUtils::string_format("Invalid primary key column type {}.", - LogicalTypeUtils::dataTypeToString(pkColumnType))); + 
ExceptionMessage::invalidPKType(LogicalTypeUtils::dataTypeToString(pkColumnType))); } } if (!errorPKValueStr.empty()) { - assert(errorPKRowIdx != INVALID_ROW_IDX); - throw CopyException( - StringUtils::string_format("Primary key column contains value {} around " - "L{} in file {} that does not exist in the table.", - errorPKValueStr, errorPKRowIdx, filePath)); + throw CopyException(ExceptionMessage::nonExistPKException(errorPKValueStr)); } } @@ -298,57 +279,66 @@ void RelListsCounterAndColumnCopier::buildRelListsHeaders( } } -void ParquetRelListsCounterAndColumnsCopier::executeInternal( - std::unique_ptr morsel) { - assert(!morsel->filePath.empty()); - if (!reader || filePath != morsel->filePath) { - reader = TableCopyUtils::createParquetReader(morsel->filePath, schema); - filePath = morsel->filePath; +bool ParquetRelListsCounterAndColumnsCopier::executeInternal() { + auto morsel = sharedState->getParallelMorsel(); + if (morsel->fileIdx == INVALID_VECTOR_IDX) { + // No more morsels. + return false; } - std::shared_ptr table; - TableCopyUtils::throwCopyExceptionIfNotOK( - reader->RowGroup(static_cast(morsel->blockIdx))->ReadTable(&table)); - arrow::TableBatchReader batchReader(*table); - std::shared_ptr recordBatch; - TableCopyUtils::throwCopyExceptionIfNotOK(batchReader.ReadNext(&recordBatch)); - auto numRowsInBatch = recordBatch->num_rows(); - std::vector boundPKOffsets, adjPKOffsets; - boundPKOffsets.resize(numRowsInBatch); - adjPKOffsets.resize(numRowsInBatch); - indexLookup(recordBatch->column(0).get(), *schema->getSrcPKDataType(), pkIndexes[0], - (offset_t*)boundPKOffsets.data(), morsel->filePath, morsel->rowIdxInFile); - indexLookup(recordBatch->column(1).get(), *schema->getDstPKDataType(), pkIndexes[1], - (offset_t*)adjPKOffsets.data(), morsel->filePath, morsel->rowIdxInFile); - std::vector> pkOffsetsArrays(2); - pkOffsetsArrays[0] = createArrowPrimitiveArray( - std::make_shared(), (uint8_t*)boundPKOffsets.data(), numRowsInBatch); - pkOffsetsArrays[1] = createArrowPrimitiveArray( - std::make_shared(), (uint8_t*)adjPKOffsets.data(), numRowsInBatch); - copyRelColumnsOrCountRelListsSize(morsel->rowIdx, recordBatch.get(), FWD, pkOffsetsArrays); - copyRelColumnsOrCountRelListsSize(morsel->rowIdx, recordBatch.get(), BWD, pkOffsetsArrays); - numRows += numRowsInBatch; + auto batchVector = readFunc(*morsel->readerFuncData, morsel->blockIdx); + auto startRowIdx = morsel->rowIdx; + for (auto& recordBatch : batchVector) { + auto numRowsInBatch = recordBatch->num_rows(); + std::vector boundPKOffsets, adjPKOffsets; + boundPKOffsets.resize(numRowsInBatch); + adjPKOffsets.resize(numRowsInBatch); + indexLookup(recordBatch->column(0).get(), *schema->getSrcPKDataType(), pkIndexes[0], + (offset_t*)boundPKOffsets.data()); + indexLookup(recordBatch->column(1).get(), *schema->getDstPKDataType(), pkIndexes[1], + (offset_t*)adjPKOffsets.data()); + std::vector> pkOffsetsArrays(2); + pkOffsetsArrays[0] = createArrowPrimitiveArray( + std::make_shared(), (uint8_t*)boundPKOffsets.data(), numRowsInBatch); + pkOffsetsArrays[1] = createArrowPrimitiveArray( + std::make_shared(), (uint8_t*)adjPKOffsets.data(), numRowsInBatch); + copyRelColumnsOrCountRelListsSize(startRowIdx, recordBatch.get(), FWD, pkOffsetsArrays); + copyRelColumnsOrCountRelListsSize(startRowIdx, recordBatch.get(), BWD, pkOffsetsArrays); + numRows += numRowsInBatch; + startRowIdx += numRowsInBatch; + } + return true; } -void CSVRelListsCounterAndColumnsCopier::executeInternal(std::unique_ptr morsel) { - assert(!morsel->filePath.empty()); - auto 
csvRelCopyMorsel = reinterpret_cast(morsel.get()); - auto recordBatch = csvRelCopyMorsel->recordTable->CombineChunksToBatch().ValueOrDie(); - auto numRowsInBatch = recordBatch->num_rows(); - std::vector boundPKOffsets, adjPKOffsets; - boundPKOffsets.resize(numRowsInBatch); - adjPKOffsets.resize(numRowsInBatch); - indexLookup(recordBatch->column(0).get(), *schema->getSrcPKDataType(), pkIndexes[0], - boundPKOffsets.data(), morsel->filePath, morsel->rowIdxInFile); - indexLookup(recordBatch->column(1).get(), *schema->getDstPKDataType(), pkIndexes[1], - adjPKOffsets.data(), morsel->filePath, morsel->rowIdxInFile); - std::vector> pkOffsets(2); - pkOffsets[0] = createArrowPrimitiveArray( - std::make_shared(), (uint8_t*)boundPKOffsets.data(), numRowsInBatch); - pkOffsets[1] = createArrowPrimitiveArray( - std::make_shared(), (uint8_t*)adjPKOffsets.data(), numRowsInBatch); - copyRelColumnsOrCountRelListsSize(morsel->rowIdx, recordBatch.get(), FWD, pkOffsets); - copyRelColumnsOrCountRelListsSize(morsel->rowIdx, recordBatch.get(), BWD, pkOffsets); - numRows += numRowsInBatch; +bool CSVRelListsCounterAndColumnsCopier::executeInternal() { + auto morsel = sharedState->getSerialMorsel(); + if (morsel->fileIdx == INVALID_VECTOR_IDX) { + return false; + } + auto serialMorsel = reinterpret_cast(morsel.get()); + arrow::TableBatchReader batchReader(*serialMorsel->table); + arrow::RecordBatchVector batchVector; + TableCopyUtils::throwCopyExceptionIfNotOK(batchReader.ToRecordBatches().Value(&batchVector)); + auto startRowIdx = morsel->rowIdx; + for (auto& recordBatch : batchVector) { + auto numRowsInBatch = recordBatch->num_rows(); + std::vector boundPKOffsets, adjPKOffsets; + boundPKOffsets.resize(numRowsInBatch); + adjPKOffsets.resize(numRowsInBatch); + indexLookup(recordBatch->column(0).get(), *schema->getSrcPKDataType(), pkIndexes[0], + boundPKOffsets.data()); + indexLookup(recordBatch->column(1).get(), *schema->getDstPKDataType(), pkIndexes[1], + adjPKOffsets.data()); + std::vector> pkOffsets(2); + pkOffsets[0] = createArrowPrimitiveArray( + std::make_shared(), (uint8_t*)boundPKOffsets.data(), numRowsInBatch); + pkOffsets[1] = createArrowPrimitiveArray( + std::make_shared(), (uint8_t*)adjPKOffsets.data(), numRowsInBatch); + copyRelColumnsOrCountRelListsSize(startRowIdx, recordBatch.get(), FWD, pkOffsets); + copyRelColumnsOrCountRelListsSize(startRowIdx, recordBatch.get(), BWD, pkOffsets); + numRows += numRowsInBatch; + startRowIdx += numRowsInBatch; + } + return true; } void RelListsCopier::finalize() { @@ -366,64 +356,75 @@ void RelListsCopier::finalize() { } } -void ParquetRelListsCopier::executeInternal(std::unique_ptr morsel) { - assert(!morsel->filePath.empty()); - if (!reader || filePath != morsel->filePath) { - reader = TableCopyUtils::createParquetReader(morsel->filePath, schema); - filePath = morsel->filePath; - } - std::shared_ptr table; - TableCopyUtils::throwCopyExceptionIfNotOK( - reader->RowGroup(static_cast(morsel->blockIdx))->ReadTable(&table)); - arrow::TableBatchReader batchReader(*table); - std::shared_ptr recordBatch; - TableCopyUtils::throwCopyExceptionIfNotOK(batchReader.ReadNext(&recordBatch)); - auto numRowsInBatch = recordBatch->num_rows(); - std::vector boundPKOffsets, adjPKOffsets; - boundPKOffsets.resize(numRowsInBatch); - adjPKOffsets.resize(numRowsInBatch); - indexLookup(recordBatch->column(0).get(), *schema->getSrcPKDataType(), pkIndexes[0], - boundPKOffsets.data(), morsel->filePath, morsel->rowIdxInFile); - indexLookup(recordBatch->column(1).get(), 
*schema->getDstPKDataType(), pkIndexes[1], - adjPKOffsets.data(), morsel->filePath, morsel->rowIdxInFile); - std::vector> pkOffsets(2); - pkOffsets[0] = createArrowPrimitiveArray( - std::make_shared(), (uint8_t*)boundPKOffsets.data(), numRowsInBatch); - pkOffsets[1] = createArrowPrimitiveArray( - std::make_shared(), (uint8_t*)adjPKOffsets.data(), numRowsInBatch); - if (!fwdRelData->isColumns) { - copyRelLists(morsel->rowIdx, recordBatch.get(), FWD, pkOffsets); +bool ParquetRelListsCopier::executeInternal() { + auto morsel = sharedState->getParallelMorsel(); + if (morsel->fileIdx == INVALID_VECTOR_IDX) { + // No more morsels. + return false; } - if (!bwdRelData->isColumns) { - copyRelLists(morsel->rowIdx, recordBatch.get(), BWD, pkOffsets); + auto batchVector = readFunc(*morsel->readerFuncData, morsel->blockIdx); + auto startRowIdx = morsel->rowIdx; + for (auto& recordBatch : batchVector) { + auto numRowsInBatch = recordBatch->num_rows(); + std::vector boundPKOffsets, adjPKOffsets; + boundPKOffsets.resize(numRowsInBatch); + adjPKOffsets.resize(numRowsInBatch); + indexLookup(recordBatch->column(0).get(), *schema->getSrcPKDataType(), pkIndexes[0], + boundPKOffsets.data()); + indexLookup(recordBatch->column(1).get(), *schema->getDstPKDataType(), pkIndexes[1], + adjPKOffsets.data()); + std::vector> pkOffsets(2); + pkOffsets[0] = createArrowPrimitiveArray( + std::make_shared(), (uint8_t*)boundPKOffsets.data(), numRowsInBatch); + pkOffsets[1] = createArrowPrimitiveArray( + std::make_shared(), (uint8_t*)adjPKOffsets.data(), numRowsInBatch); + if (!fwdRelData->isColumns) { + copyRelLists(startRowIdx, recordBatch.get(), FWD, pkOffsets); + } + if (!bwdRelData->isColumns) { + copyRelLists(startRowIdx, recordBatch.get(), BWD, pkOffsets); + } + numRows += numRowsInBatch; + startRowIdx += numRowsInBatch; } - numRows += numRowsInBatch; + return true; } -void CSVRelListsCopier::executeInternal(std::unique_ptr morsel) { - assert(!morsel->filePath.empty()); - auto csvRelCopyMorsel = reinterpret_cast(morsel.get()); - auto recordBatch = csvRelCopyMorsel->recordTable->CombineChunksToBatch().ValueOrDie(); - auto numRowsInBatch = recordBatch->num_rows(); - std::vector boundPKOffsets, adjPKOffsets; - boundPKOffsets.resize(numRowsInBatch); - adjPKOffsets.resize(numRowsInBatch); - indexLookup(recordBatch->column(0).get(), *schema->getSrcPKDataType(), pkIndexes[0], - boundPKOffsets.data(), morsel->filePath, morsel->rowIdxInFile); - indexLookup(recordBatch->column(1).get(), *schema->getDstPKDataType(), pkIndexes[1], - adjPKOffsets.data(), morsel->filePath, morsel->rowIdxInFile); - std::vector> pkOffsets(2); - pkOffsets[0] = createArrowPrimitiveArray( - std::make_shared(), (uint8_t*)boundPKOffsets.data(), numRowsInBatch); - pkOffsets[1] = createArrowPrimitiveArray( - std::make_shared(), (uint8_t*)adjPKOffsets.data(), numRowsInBatch); - if (!fwdRelData->isColumns) { - copyRelLists(morsel->rowIdx, recordBatch.get(), FWD, pkOffsets); +bool CSVRelListsCopier::executeInternal() { + auto morsel = sharedState->getSerialMorsel(); + if (morsel->fileIdx == INVALID_VECTOR_IDX) { + // No more morsels. 
+ return false; } - if (!bwdRelData->isColumns) { - copyRelLists(morsel->rowIdx, recordBatch.get(), BWD, pkOffsets); + auto serialMorsel = reinterpret_cast(morsel.get()); + arrow::TableBatchReader batchReader(*serialMorsel->table); + arrow::RecordBatchVector batchVector; + TableCopyUtils::throwCopyExceptionIfNotOK(batchReader.ToRecordBatches().Value(&batchVector)); + auto startRowIdx = morsel->rowIdx; + for (auto& recordBatch : batchVector) { + auto numRowsInBatch = recordBatch->num_rows(); + std::vector boundPKOffsets, adjPKOffsets; + boundPKOffsets.resize(numRowsInBatch); + adjPKOffsets.resize(numRowsInBatch); + indexLookup(recordBatch->column(0).get(), *schema->getSrcPKDataType(), pkIndexes[0], + boundPKOffsets.data()); + indexLookup(recordBatch->column(1).get(), *schema->getDstPKDataType(), pkIndexes[1], + adjPKOffsets.data()); + std::vector> pkOffsets(2); + pkOffsets[0] = createArrowPrimitiveArray( + std::make_shared(), (uint8_t*)boundPKOffsets.data(), numRowsInBatch); + pkOffsets[1] = createArrowPrimitiveArray( + std::make_shared(), (uint8_t*)adjPKOffsets.data(), numRowsInBatch); + if (!fwdRelData->isColumns) { + copyRelLists(startRowIdx, recordBatch.get(), FWD, pkOffsets); + } + if (!bwdRelData->isColumns) { + copyRelLists(startRowIdx, recordBatch.get(), BWD, pkOffsets); + } + numRows += numRowsInBatch; + startRowIdx += numRowsInBatch; } - numRows += numRowsInBatch; + return true; } void RelCopyTask::run() { diff --git a/src/storage/copier/rel_copy_executor.cpp b/src/storage/copier/rel_copy_executor.cpp index ed569fd9aed..f74fe2e711e 100644 --- a/src/storage/copier/rel_copy_executor.cpp +++ b/src/storage/copier/rel_copy_executor.cpp @@ -1,6 +1,7 @@ #include "storage/copier/rel_copy_executor.h" #include "common/string_utils.h" +#include "storage/copier/reader_state.h" using namespace kuzu::common; using namespace kuzu::catalog; @@ -104,37 +105,39 @@ row_idx_t RelCopyExecutor::populateRelLists(processor::ExecutionContext* executi } std::unique_ptr RelCopyExecutor::createRelCopier(RelCopierType relCopierType) { - std::shared_ptr sharedState; + auto readerSharedState = std::make_shared(copyDescription.fileType, + copyDescription.filePaths, *copyDescription.csvReaderConfig, tableSchema); + readerSharedState->validate(); + readerSharedState->countBlocks(); + auto initFunc = ReaderFunctions::getInitDataFunc(copyDescription.fileType); + auto readFunc = ReaderFunctions::getReadRowsFunc(copyDescription.fileType); std::unique_ptr relCopier; switch (copyDescription.fileType) { case CopyDescription::FileType::CSV: { - sharedState = std::make_shared( - copyDescription.filePaths, *copyDescription.csvReaderConfig, tableSchema); switch (relCopierType) { case RelCopierType::REL_COLUMN_COPIER_AND_LIST_COUNTER: { - relCopier = std::make_unique(sharedState, - copyDescription, tableSchema, fwdRelData.get(), bwdRelData.get(), pkIndexes); + relCopier = std::make_unique(readerSharedState, + copyDescription, tableSchema, fwdRelData.get(), bwdRelData.get(), pkIndexes, + readFunc, initFunc); } break; case RelCopierType::REL_LIST_COPIER: { - relCopier = std::make_unique(std::move(sharedState), copyDescription, - tableSchema, fwdRelData.get(), bwdRelData.get(), pkIndexes); + relCopier = + std::make_unique(std::move(readerSharedState), copyDescription, + tableSchema, fwdRelData.get(), bwdRelData.get(), pkIndexes, readFunc, initFunc); } break; } } break; case CopyDescription::FileType::PARQUET: { - std::unordered_map fileBlockInfos; - TableCopyUtils::countNumLines(copyDescription, tableSchema, fileBlockInfos); - 
sharedState = std::make_shared( - copyDescription.filePaths, *copyDescription.csvReaderConfig, tableSchema); - sharedState->fileBlockInfos = std::move(fileBlockInfos); switch (relCopierType) { case RelCopierType::REL_COLUMN_COPIER_AND_LIST_COUNTER: { - relCopier = std::make_unique(sharedState, - copyDescription, tableSchema, fwdRelData.get(), bwdRelData.get(), pkIndexes); + relCopier = std::make_unique(readerSharedState, + copyDescription, tableSchema, fwdRelData.get(), bwdRelData.get(), pkIndexes, + readFunc, initFunc); } break; case RelCopierType::REL_LIST_COPIER: { - relCopier = std::make_unique(std::move(sharedState), - copyDescription, tableSchema, fwdRelData.get(), bwdRelData.get(), pkIndexes); + relCopier = std::make_unique(std::move(readerSharedState), + copyDescription, tableSchema, fwdRelData.get(), bwdRelData.get(), pkIndexes, + readFunc, initFunc); } break; } } break; diff --git a/src/storage/storage_manager.cpp b/src/storage/storage_manager.cpp index fa135d30073..416941b9b56 100644 --- a/src/storage/storage_manager.cpp +++ b/src/storage/storage_manager.cpp @@ -29,11 +29,9 @@ std::unique_ptr StorageManager::createMetadataDAHInfo( const common::LogicalType& dataType) { auto metadataDAHInfo = std::make_unique(); metadataDAHInfo->dataDAHPageIdx = InMemDiskArray::addDAHPageToFile( - *metadataFH, StorageStructureID{StorageStructureType::METADATA}, - memoryManager.getBufferManager(), wal); + *metadataFH, memoryManager.getBufferManager(), wal); metadataDAHInfo->nullDAHPageIdx = InMemDiskArray::addDAHPageToFile( - *metadataFH, StorageStructureID{StorageStructureType::METADATA}, - memoryManager.getBufferManager(), wal); + *metadataFH, memoryManager.getBufferManager(), wal); switch (dataType.getPhysicalType()) { case PhysicalTypeID::STRUCT: { auto fields = StructType::getFields(&dataType); @@ -47,8 +45,11 @@ std::unique_ptr StorageManager::createMetadataDAHInfo( createMetadataDAHInfo(*VarListType::getChildType(&dataType))); } break; case PhysicalTypeID::STRING: { - auto dummyChildType = LogicalType{LogicalTypeID::ANY}; - metadataDAHInfo->childrenInfos.push_back(createMetadataDAHInfo(dummyChildType)); + auto childMetadataDAHInfo = std::make_unique(); + childMetadataDAHInfo->dataDAHPageIdx = + InMemDiskArray::addDAHPageToFile( + *metadataFH, memoryManager.getBufferManager(), wal); + metadataDAHInfo->childrenInfos.push_back(std::move(childMetadataDAHInfo)); } break; default: { // DO NOTHING. 
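The storage_manager.cpp hunk above changes how createMetadataDAHInfo handles STRING columns: instead of recursing with a dummy ANY-typed child, it now allocates a child MetadataDAHInfo that carries only a data DAH page. The compile-only C++ sketch below mirrors that recursion as shown in the hunk; it is not Kuzu's real API. MetadataDAHInfo's fields and the STRUCT/VAR_LIST/STRING branching follow the diff, but the Type/Kind model and allocDAHPage are hypothetical stand-ins for LogicalType and InMemDiskArray<ColumnChunkMetadata>::addDAHPageToFile(*metadataFH, bufferManager, wal).

// Hedged sketch: simplified model of the post-patch createMetadataDAHInfo recursion.
// allocDAHPage, Type, and Kind are illustrative stand-ins, not Kuzu identifiers.
#include <cstdint>
#include <memory>
#include <vector>

using page_idx_t = uint32_t;

struct MetadataDAHInfo {
    page_idx_t dataDAHPageIdx = 0;
    page_idx_t nullDAHPageIdx = 0;
    std::vector<std::unique_ptr<MetadataDAHInfo>> childrenInfos;
};

enum class Kind { FIXED, STRUCT, VAR_LIST, STRING };

struct Type {
    Kind kind = Kind::FIXED;
    std::vector<Type> children; // struct fields, or the single VAR_LIST child
};

// Stand-in for adding a disk-array header page to the metadata file.
page_idx_t allocDAHPage() {
    static page_idx_t next = 0;
    return next++;
}

std::unique_ptr<MetadataDAHInfo> createMetadataDAHInfo(const Type& type) {
    auto info = std::make_unique<MetadataDAHInfo>();
    info->dataDAHPageIdx = allocDAHPage();
    info->nullDAHPageIdx = allocDAHPage();
    switch (type.kind) {
    case Kind::STRUCT:
        for (const auto& field : type.children) {
            info->childrenInfos.push_back(createMetadataDAHInfo(field));
        }
        break;
    case Kind::VAR_LIST:
        // Assumes exactly one child type, as for VarListType in the hunk.
        info->childrenInfos.push_back(createMetadataDAHInfo(type.children.front()));
        break;
    case Kind::STRING: {
        // After this patch, the string child gets only a data DAH page
        // rather than a full recursive MetadataDAHInfo for a dummy child type.
        auto child = std::make_unique<MetadataDAHInfo>();
        child->dataDAHPageIdx = allocDAHPage();
        info->childrenInfos.push_back(std::move(child));
        break;
    }
    default:
        break; // fixed-size types need no children
    }
    return info;
}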
diff --git a/src/storage/storage_structure/disk_array.cpp b/src/storage/storage_structure/disk_array.cpp index 92e81a29fe8..605c1052f92 100644 --- a/src/storage/storage_structure/disk_array.cpp +++ b/src/storage/storage_structure/disk_array.cpp @@ -531,24 +531,24 @@ template class BaseDiskArray; template class BaseDiskArray>; template class BaseDiskArray>; template class BaseDiskArray; -template class BaseDiskArray; +template class BaseDiskArray; template class BaseDiskArray; template class BaseInMemDiskArray; template class BaseInMemDiskArray>; template class BaseInMemDiskArray>; template class BaseInMemDiskArray; -template class BaseInMemDiskArray; +template class BaseInMemDiskArray; template class BaseInMemDiskArray; template class InMemDiskArrayBuilder; template class InMemDiskArrayBuilder>; template class InMemDiskArrayBuilder>; template class InMemDiskArrayBuilder; -template class InMemDiskArrayBuilder; +template class InMemDiskArrayBuilder; template class InMemDiskArray; template class InMemDiskArray>; template class InMemDiskArray>; template class InMemDiskArray; -template class InMemDiskArray; +template class InMemDiskArray; template class InMemDiskArray; } // namespace storage diff --git a/src/storage/store/node_column.cpp b/src/storage/store/node_column.cpp index 0ef0ae22e3f..38ce595df3b 100644 --- a/src/storage/store/node_column.cpp +++ b/src/storage/store/node_column.cpp @@ -46,7 +46,7 @@ void NullNodeColumnFunc::readValuesFromPage(uint8_t* frame, PageElementCursor& p ValueVector* resultVector, uint32_t posInVector, uint32_t numValuesToRead) { // Read bit-packed null flags from the frame into the result vector // Casting to uint64_t should be safe as long as the page size is a multiple of 8 bytes. - // Otherwise it could read off the end of the page. + // Otherwise, it could read off the end of the page. resultVector->setNullFromBits( (uint64_t*)frame, pageCursor.elemPosInPage, posInVector, numValuesToRead); } @@ -54,7 +54,7 @@ void NullNodeColumnFunc::readValuesFromPage(uint8_t* frame, PageElementCursor& p void NullNodeColumnFunc::writeValueToPage( uint8_t* frame, uint16_t posInFrame, ValueVector* vector, uint32_t posInVector) { // Casting to uint64_t should be safe as long as the page size is a multiple of 8 bytes. - // Otherwise it could read off the end of the page. + // Otherwise, it could read off the end of the page. NullMask::setNull( (uint64_t*)frame, posInFrame, NullMask::isNull(vector->getNullMaskData(), posInVector)); } @@ -63,7 +63,7 @@ void BoolNodeColumnFunc::readValuesFromPage(uint8_t* frame, PageElementCursor& p ValueVector* resultVector, uint32_t posInVector, uint32_t numValuesToRead) { // Read bit-packed null flags from the frame into the result vector // Casting to uint64_t should be safe as long as the page size is a multiple of 8 bytes. - // Otherwise it could read off the end of the page. + // Otherwise, it could read off the end of the page. // // Currently, the frame stores bitpacked bools, but the value_vector does not for (auto i = 0; i < numValuesToRead; i++) { @@ -75,7 +75,7 @@ void BoolNodeColumnFunc::readValuesFromPage(uint8_t* frame, PageElementCursor& p void BoolNodeColumnFunc::writeValueToPage( uint8_t* frame, uint16_t posInFrame, ValueVector* vector, uint32_t posInVector) { // Casting to uint64_t should be safe as long as the page size is a multiple of 8 bytes. - // Otherwise it could read/write off the end of the page. + // Otherwise, it could read/write off the end of the page. NullMask::copyNullMask( vector->getValue(posInVector) ? 
&NullMask::ALL_NULL_ENTRY : &NullMask::NO_NULL_ENTRY, posInVector, (uint64_t*)frame, posInFrame, 1); @@ -91,7 +91,7 @@ NodeColumn::NodeColumn(LogicalType dataType, const MetadataDAHInfo& metaDAHeader transaction::Transaction* transaction, bool requireNullColumn) : storageStructureID{StorageStructureID::newDataID()}, dataType{std::move(dataType)}, dataFH{dataFH}, metadataFH{metadataFH}, bufferManager{bufferManager}, wal{wal} { - metadataDA = std::make_unique>(*metadataFH, + metadataDA = std::make_unique>(*metadataFH, StorageStructureID::newMetadataID(), metaDAHeaderInfo.dataDAHPageIdx, bufferManager, wal, transaction); numBytesPerFixedSizedValue = ColumnChunk::getDataTypeSizeInChunk(this->dataType); @@ -274,9 +274,9 @@ page_idx_t NodeColumn::append( // Main column chunk. page_idx_t numPagesFlushed = 0; auto numPagesForChunk = columnChunk->flushBuffer(dataFH, startPageIdx); + ColumnChunkMetadata metadata{startPageIdx, numPagesForChunk, columnChunk->getNumValues()}; metadataDA->resize(nodeGroupIdx + 1); - metadataDA->update(nodeGroupIdx, - MainColumnChunkMetadata{startPageIdx, numPagesForChunk, columnChunk->getNumValues()}); + metadataDA->update(nodeGroupIdx, metadata); numPagesFlushed += numPagesForChunk; startPageIdx += numPagesForChunk; // Null column chunk. @@ -444,7 +444,7 @@ page_idx_t NullNodeColumn::append( auto numPagesFlushed = columnChunk->flushBuffer(dataFH, startPageIdx); metadataDA->resize(nodeGroupIdx + 1); metadataDA->update(nodeGroupIdx, - MainColumnChunkMetadata{startPageIdx, numPagesFlushed, columnChunk->getNumValues()}); + ColumnChunkMetadata{startPageIdx, numPagesFlushed, columnChunk->getNumValues()}); return numPagesFlushed; } @@ -477,6 +477,7 @@ void SerialNodeColumn::scan( for (auto i = 0ul; i < nodeIDVector->state->selVector->selectedSize; i++) { auto pos = nodeIDVector->state->selVector->selectedPositions[i]; auto offset = nodeIDVector->readNodeOffset(pos); + assert(!resultVector->isNull(pos)); resultVector->setValue(pos, offset); } } @@ -502,8 +503,8 @@ std::unique_ptr NodeColumnFactory::createNodeColumn(const LogicalTyp BufferManager* bufferManager, WAL* wal, Transaction* transaction) { switch (dataType.getLogicalTypeID()) { case LogicalTypeID::BOOL: { - return std::make_unique( - metaDAHeaderInfo, dataFH, metadataFH, bufferManager, wal, transaction, true); + return std::make_unique(metaDAHeaderInfo, dataFH, metadataFH, bufferManager, + wal, transaction, true /* requireNullColumn */); } case LogicalTypeID::INT64: case LogicalTypeID::INT32: @@ -515,8 +516,8 @@ std::unique_ptr NodeColumnFactory::createNodeColumn(const LogicalTyp case LogicalTypeID::INTERVAL: case LogicalTypeID::INTERNAL_ID: case LogicalTypeID::FIXED_LIST: { - return std::make_unique( - dataType, metaDAHeaderInfo, dataFH, metadataFH, bufferManager, wal, transaction, true); + return std::make_unique(dataType, metaDAHeaderInfo, dataFH, metadataFH, + bufferManager, wal, transaction, true /* requireNullColumn */); } case LogicalTypeID::BLOB: case LogicalTypeID::STRING: { diff --git a/src/storage/store/var_list_node_column.cpp b/src/storage/store/var_list_node_column.cpp index 670e3458d9a..0c7d8369ad5 100644 --- a/src/storage/store/var_list_node_column.cpp +++ b/src/storage/store/var_list_node_column.cpp @@ -152,7 +152,7 @@ ListOffsetInfoInStorage VarListNodeColumn::getListOffsetInfoInStorage(Transactio node_group_idx_t nodeGroupIdx, offset_t startOffsetInNodeGroup, offset_t endOffsetInNodeGroup, std::shared_ptr state) { auto offsetVector = std::make_unique(LogicalTypeID::INT64); - 
offsetVector->setState(state); + offsetVector->setState(std::move(state)); NodeColumn::scan(transaction, nodeGroupIdx, startOffsetInNodeGroup, endOffsetInNodeGroup, offsetVector.get()); auto prevNodeListOffsetInStorage = diff --git a/test/test_files/exceptions/copy/invalid_row.test b/test/test_files/exceptions/copy/invalid_row.test index c6a9607abad..d775230a311 100644 --- a/test/test_files/exceptions/copy/invalid_row.test +++ b/test/test_files/exceptions/copy/invalid_row.test @@ -20,4 +20,4 @@ ---- ok -STATEMENT COPY watch FROM "${KUZU_ROOT_DIRECTORY}/dataset/copy-fault-tests/invalid-row/eWatches.csv" ---- error -Copy exception: Primary key column contains value 5 around L2 in file ${KUZU_ROOT_DIRECTORY}/dataset/copy-fault-tests/invalid-row/eWatches.csv that does not exist in the table. +Copy exception: Found non-existed primary key value 5. diff --git a/test/test_files/exceptions/copy/null_pk.test b/test/test_files/exceptions/copy/null_pk.test index 6291c6189f5..98cfa22b038 100644 --- a/test/test_files/exceptions/copy/null_pk.test +++ b/test/test_files/exceptions/copy/null_pk.test @@ -20,4 +20,4 @@ Copy exception: Found NULL, which violates the non-null constraint of the primar ---- ok -STATEMENT COPY like FROM "${KUZU_ROOT_DIRECTORY}/dataset/copy-fault-tests/null-pk/eLikes.csv" ---- error -Copy exception: NULL found around L3 in file ${KUZU_ROOT_DIRECTORY}/dataset/copy-fault-tests/null-pk/eLikes.csv violates the non-null constraint of the primary key column. +Copy exception: Found NULL, which violates the non-null constraint of the primary key column. diff --git a/test/test_files/tinysnb/explain/explain.test b/test/test_files/tinysnb/explain/explain.test index 209a520fd32..03dc94d2a05 100644 --- a/test/test_files/tinysnb/explain/explain.test +++ b/test/test_files/tinysnb/explain/explain.test @@ -1,5 +1,5 @@ -GROUP TinySnbReadTest --DATASET CSV tinysnb +-DATASET CSV empty --
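The invalid_row.test and null_pk.test hunks above now expect the centralized messages thrown from RelCopier::indexLookup (see the rel_copier.cpp hunk earlier in this patch), with the per-file, per-row context removed. The sketch below is a standalone approximation of that error path under stated assumptions: CopyException, the ExceptionMessage helpers, and the map-based index are simplified stand-ins rather than Kuzu's real classes; only the thrown strings match the new test expectations.

// Hedged sketch: simplified model of the post-patch primary-key lookup error path.
#include <cstdint>
#include <stdexcept>
#include <string>
#include <unordered_map>

struct CopyException : std::runtime_error {
    using std::runtime_error::runtime_error;
};

namespace ExceptionMessage {
inline std::string nonExistPKException(const std::string& pk) {
    return "Found non-existed primary key value " + pk + ".";
}
inline std::string nullPKException() {
    return "Found NULL, which violates the non-null constraint of the primary key column.";
}
} // namespace ExceptionMessage

// Mirrors the shape of RelCopier::indexLookup after the patch: the thrown
// message no longer embeds the file path or row number, which is why the
// expected error lines in the two tests above were shortened.
uint64_t lookupPK(const std::unordered_map<int64_t, uint64_t>& pkIndex, const int64_t* pk) {
    if (pk == nullptr) {
        throw CopyException(ExceptionMessage::nullPKException());
    }
    auto it = pkIndex.find(*pk);
    if (it == pkIndex.end()) {
        throw CopyException(ExceptionMessage::nonExistPKException(std::to_string(*pk)));
    }
    return it->second;
}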