diff --git a/src/binder/bind/bind_copy.cpp b/src/binder/bind/bind_copy.cpp index 7ee18ff155..2de4bac81f 100644 --- a/src/binder/bind/bind_copy.cpp +++ b/src/binder/bind/bind_copy.cpp @@ -11,15 +11,15 @@ namespace kuzu { namespace binder { std::unique_ptr Binder::bindCopyClause(const Statement& statement) { - auto& copyCSV = (Copy&)statement; + auto& copyStatement = (Copy&)statement; auto catalogContent = catalog.getReadOnlyVersion(); - auto tableName = copyCSV.getTableName(); + auto tableName = copyStatement.getTableName(); validateTableExist(catalog, tableName); auto tableID = catalogContent->getTableID(tableName); - auto csvReaderConfig = bindParsingOptions(copyCSV.getParsingOptions()); - auto boundFilePaths = bindFilePaths(copyCSV.getFilePaths()); + auto csvReaderConfig = bindParsingOptions(copyStatement.getParsingOptions()); + auto boundFilePaths = bindFilePaths(copyStatement.getFilePaths()); auto actualFileType = bindFileType(boundFilePaths); - auto expectedFileType = copyCSV.getFileType(); + auto expectedFileType = copyStatement.getFileType(); if (expectedFileType == common::CopyDescription::FileType::UNKNOWN && actualFileType == common::CopyDescription::FileType::NPY) { throw BinderException("Please use COPY FROM BY COLUMN statement for copying npy files."); diff --git a/src/catalog/catalog.cpp b/src/catalog/catalog.cpp index 07b5a37cd6..3e9d018a5f 100644 --- a/src/catalog/catalog.cpp +++ b/src/catalog/catalog.cpp @@ -170,15 +170,11 @@ namespace kuzu { namespace catalog { CatalogContent::CatalogContent() : nextTableID{0} { - logger = LoggerUtils::getLogger(LoggerConstants::LoggerEnum::CATALOG); registerBuiltInFunctions(); } CatalogContent::CatalogContent(const std::string& directory) { - logger = LoggerUtils::getLogger(LoggerConstants::LoggerEnum::CATALOG); - logger->info("Initializing catalog."); readFromFile(directory, DBFileType::ORIGINAL); - logger->info("Initializing catalog done."); registerBuiltInFunctions(); } @@ -295,7 +291,6 @@ void CatalogContent::saveToFile(const std::string& directory, DBFileType dbFileT void CatalogContent::readFromFile(const std::string& directory, DBFileType dbFileType) { auto catalogPath = StorageUtils::getCatalogFilePath(directory, dbFileType); - logger->debug("Reading from {}.", catalogPath); auto fileInfo = FileUtils::openFile(catalogPath, O_RDONLY); uint64_t offset = 0; validateMagicBytes(fileInfo.get(), offset); diff --git a/src/include/catalog/catalog.h b/src/include/catalog/catalog.h index 4f24dd2353..76a9989c2a 100644 --- a/src/include/catalog/catalog.h +++ b/src/include/catalog/catalog.h @@ -16,10 +16,6 @@ #include "storage/wal/wal.h" #include "transaction/transaction.h" -namespace spdlog { -class logger; -} - namespace kuzu { namespace catalog { @@ -27,15 +23,12 @@ class CatalogContent { friend class Catalog; public: - // This constructor is only used for mock catalog testing only. CatalogContent(); explicit CatalogContent(const std::string& directory); CatalogContent(const CatalogContent& other); - virtual ~CatalogContent() = default; - /** * Node and Rel table functions. */ @@ -164,7 +157,6 @@ class CatalogContent { void registerBuiltInFunctions(); private: - std::shared_ptr logger; std::unordered_map> nodeTableSchemas; std::unordered_map> relTableSchemas; // These two maps are maintained as caches. They are not serialized to the catalog file, but @@ -184,8 +176,6 @@ class Catalog { explicit Catalog(storage::WAL* wal); - virtual ~Catalog() = default; - // TODO(Guodong): Get rid of these two functions. inline CatalogContent* getReadOnlyVersion() const { return catalogContentForReadOnlyTrx.get(); } inline CatalogContent* getWriteVersion() const { return catalogContentForWriteTrx.get(); } diff --git a/src/include/planner/logical_plan/logical_operator/logical_copy.h b/src/include/planner/logical_plan/logical_operator/logical_copy.h index a73710384b..f962cc59ca 100644 --- a/src/include/planner/logical_plan/logical_operator/logical_copy.h +++ b/src/include/planner/logical_plan/logical_operator/logical_copy.h @@ -11,18 +11,16 @@ class LogicalCopy : public LogicalOperator { public: LogicalCopy(const common::CopyDescription& copyDescription, common::table_id_t tableID, - std::string tableName, binder::expression_vector arrowColumnExpressions, + std::string tableName, binder::expression_vector dataColumnExpressions, std::shared_ptr rowIdxExpression, std::shared_ptr filePathExpression, - std::shared_ptr columnIdxExpression, std::shared_ptr outputExpression) - : LogicalOperator{LogicalOperatorType::COPY}, copyDescription{copyDescription}, - tableID{tableID}, tableName{std::move(tableName)}, arrowColumnExpressions{std::move( - arrowColumnExpressions)}, - rowIdxExpression{std::move(rowIdxExpression)}, filePathExpression{std::move( - filePathExpression)}, - columnIdxExpression{std::move(columnIdxExpression)}, outputExpression{ - std::move(outputExpression)} {} + : LogicalOperator{LogicalOperatorType::COPY}, + copyDescription{copyDescription}, tableID{tableID}, tableName{std::move(tableName)}, + dataColumnExpressions{std::move(dataColumnExpressions)}, rowIdxExpression{std::move( + rowIdxExpression)}, + filePathExpression{std::move(filePathExpression)}, outputExpression{ + std::move(outputExpression)} {} inline std::string getExpressionsForPrinting() const override { return tableName; } @@ -30,8 +28,8 @@ class LogicalCopy : public LogicalOperator { inline common::table_id_t getTableID() const { return tableID; } - inline std::vector> getArrowColumnExpressions() const { - return arrowColumnExpressions; + inline std::vector> getDataColumnExpressions() const { + return dataColumnExpressions; } inline std::shared_ptr getRowIdxExpression() const { @@ -42,10 +40,6 @@ class LogicalCopy : public LogicalOperator { return filePathExpression; } - inline std::shared_ptr getColumnIdxExpression() const { - return columnIdxExpression; - } - inline std::shared_ptr getOutputExpression() const { return outputExpression; } @@ -54,8 +48,8 @@ class LogicalCopy : public LogicalOperator { void computeFlatSchema() override; inline std::unique_ptr copy() override { - return make_unique(copyDescription, tableID, tableName, arrowColumnExpressions, - rowIdxExpression, filePathExpression, columnIdxExpression, outputExpression); + return make_unique(copyDescription, tableID, tableName, dataColumnExpressions, + rowIdxExpression, filePathExpression, outputExpression); } private: @@ -63,10 +57,9 @@ class LogicalCopy : public LogicalOperator { common::table_id_t tableID; // Used for printing only. std::string tableName; - binder::expression_vector arrowColumnExpressions; + binder::expression_vector dataColumnExpressions; std::shared_ptr rowIdxExpression; std::shared_ptr filePathExpression; - std::shared_ptr columnIdxExpression; std::shared_ptr outputExpression; }; diff --git a/src/include/processor/operator/copy/copy_node.h b/src/include/processor/operator/copy/copy_node.h index bb3f14ab68..a6f2462ca5 100644 --- a/src/include/processor/operator/copy/copy_node.h +++ b/src/include/processor/operator/copy/copy_node.h @@ -36,7 +36,7 @@ class CopyNodeSharedState { struct CopyNodeDataInfo { DataPos rowIdxVectorPos; DataPos filePathVectorPos; - std::vector arrowColumnPoses; + std::vector dataColumnPoses; }; class CopyNode : public Sink { @@ -62,8 +62,8 @@ class CopyNode : public Sink { inline void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override { rowIdxVector = resultSet->getValueVector(copyNodeDataInfo.rowIdxVectorPos).get(); filePathVector = resultSet->getValueVector(copyNodeDataInfo.filePathVectorPos).get(); - for (auto& arrowColumnPos : copyNodeDataInfo.arrowColumnPoses) { - arrowColumnVectors.push_back(resultSet->getValueVector(arrowColumnPos).get()); + for (auto& arrowColumnPos : copyNodeDataInfo.dataColumnPoses) { + dataColumnVectors.push_back(resultSet->getValueVector(arrowColumnPos).get()); } } @@ -124,7 +124,7 @@ class CopyNode : public Sink { storage::WAL* wal; common::ValueVector* rowIdxVector; common::ValueVector* filePathVector; - std::vector arrowColumnVectors; + std::vector dataColumnVectors; std::vector> copyStates; }; diff --git a/src/include/processor/operator/copy/copy_npy_node.h b/src/include/processor/operator/copy/copy_npy_node.h deleted file mode 100644 index 1d4416f125..0000000000 --- a/src/include/processor/operator/copy/copy_npy_node.h +++ /dev/null @@ -1,44 +0,0 @@ -#pragma once - -#include "processor/operator/copy/copy_node.h" - -namespace kuzu { -namespace processor { - -class CopyNPYNode : public CopyNode { -public: - CopyNPYNode(std::shared_ptr sharedState, CopyNodeDataInfo copyNodeDataInfo, - const DataPos& columnIdxPos, const common::CopyDescription& copyDesc, - storage::NodeTable* table, storage::RelsStore* relsStore, catalog::Catalog* catalog, - storage::WAL* wal, std::unique_ptr resultSetDescriptor, - std::unique_ptr child, uint32_t id, const std::string& paramsString) - : CopyNode{std::move(sharedState), std::move(copyNodeDataInfo), copyDesc, table, relsStore, - catalog, wal, std::move(resultSetDescriptor), std::move(child), id, paramsString}, - columnIdxPos{columnIdxPos}, columnIdxVector{nullptr} {} - - void executeInternal(ExecutionContext* context) final; - - inline void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) final { - CopyNode::initLocalStateInternal(resultSet, context); - columnIdxVector = resultSet->getValueVector(columnIdxPos).get(); - } - - void flushChunksAndPopulatePKIndexSingleColumn( - std::vector>& columnChunks, - common::offset_t startNodeOffset, common::offset_t endNodeOffset, - common::vector_idx_t columnToCopy, const std::string& filePath, - common::row_idx_t startRowIdxInFile); - - inline std::unique_ptr clone() final { - return std::make_unique(sharedState, copyNodeDataInfo, columnIdxPos, copyDesc, - table, relsStore, catalog, wal, resultSetDescriptor->copy(), children[0]->clone(), id, - paramsString); - } - -private: - DataPos columnIdxPos; - common::ValueVector* columnIdxVector; -}; - -} // namespace processor -} // namespace kuzu diff --git a/src/include/processor/operator/copy/read_csv.h b/src/include/processor/operator/copy/read_csv.h index c927cea572..629dd8517c 100644 --- a/src/include/processor/operator/copy/read_csv.h +++ b/src/include/processor/operator/copy/read_csv.h @@ -8,10 +8,10 @@ namespace processor { class ReadCSV : public ReadFile { public: ReadCSV(const DataPos& rowIdxVectorPos, const DataPos& filePathVectorPos, - std::vector arrowColumnPoses, + std::vector dataColumnPoses, std::shared_ptr sharedState, uint32_t id, const std::string& paramsString) - : ReadFile{rowIdxVectorPos, filePathVectorPos, std::move(arrowColumnPoses), + : ReadFile{rowIdxVectorPos, filePathVectorPos, std::move(dataColumnPoses), std::move(sharedState), PhysicalOperatorType::READ_CSV, id, paramsString} {} inline std::shared_ptr readTuples( @@ -22,7 +22,7 @@ class ReadCSV : public ReadFile { inline std::unique_ptr clone() override { return std::make_unique( - rowIdxVectorPos, filePathVectorPos, arrowColumnPoses, sharedState, id, paramsString); + rowIdxVectorPos, filePathVectorPos, dataColumnPoses, sharedState, id, paramsString); } }; diff --git a/src/include/processor/operator/copy/read_file.h b/src/include/processor/operator/copy/read_file.h index 508bf87bb0..87373c2bd6 100644 --- a/src/include/processor/operator/copy/read_file.h +++ b/src/include/processor/operator/copy/read_file.h @@ -9,11 +9,11 @@ namespace processor { class ReadFile : public PhysicalOperator { public: ReadFile(const DataPos& rowIdxVectorPos, const DataPos& filePathVectorPos, - std::vector arrowColumnPoses, + std::vector dataColumnPoses, std::shared_ptr sharedState, PhysicalOperatorType operatorType, uint32_t id, const std::string& paramsString) : PhysicalOperator{operatorType, id, paramsString}, rowIdxVectorPos{rowIdxVectorPos}, - filePathVectorPos{filePathVectorPos}, arrowColumnPoses{std::move(arrowColumnPoses)}, + filePathVectorPos{filePathVectorPos}, dataColumnPoses{std::move(dataColumnPoses)}, sharedState{std::move(sharedState)}, rowIdxVector{nullptr}, filePathVector{nullptr} {} void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override; @@ -24,6 +24,7 @@ class ReadFile : public PhysicalOperator { inline bool isSource() const override { return true; } +protected: virtual std::shared_ptr readTuples( std::unique_ptr morsel) = 0; @@ -33,10 +34,10 @@ class ReadFile : public PhysicalOperator { std::shared_ptr sharedState; DataPos rowIdxVectorPos; DataPos filePathVectorPos; - std::vector arrowColumnPoses; + std::vector dataColumnPoses; common::ValueVector* rowIdxVector; common::ValueVector* filePathVector; - std::vector arrowColumnVectors; + std::vector dataColumnVectors; }; } // namespace processor diff --git a/src/include/processor/operator/copy/read_npy.h b/src/include/processor/operator/copy/read_npy.h index aaf6b3e135..23f0e9fdd1 100644 --- a/src/include/processor/operator/copy/read_npy.h +++ b/src/include/processor/operator/copy/read_npy.h @@ -10,29 +10,24 @@ namespace processor { class ReadNPY : public ReadFile { public: ReadNPY(const DataPos& rowIdxVectorPos, const DataPos& filePathVectorPos, - std::vector arrowColumnPoses, const DataPos& columnIdxPos, + std::vector dataColumnPoses, std::shared_ptr sharedState, uint32_t id, const std::string& paramsString) - : ReadFile{rowIdxVectorPos, filePathVectorPos, std::move(arrowColumnPoses), - std::move(sharedState), PhysicalOperatorType::READ_NPY, id, paramsString}, - columnIdxPos{columnIdxPos}, columnIdxVector{nullptr} {} + : ReadFile{rowIdxVectorPos, filePathVectorPos, std::move(dataColumnPoses), + std::move(sharedState), PhysicalOperatorType::READ_NPY, id, paramsString} { + reader = std::make_unique(this->sharedState->filePaths); + } std::shared_ptr readTuples( std::unique_ptr morsel) final; - bool getNextTuplesInternal(ExecutionContext* context) final; - - void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) final; - inline std::unique_ptr clone() final { - return std::make_unique(rowIdxVectorPos, filePathVectorPos, arrowColumnPoses, - columnIdxPos, sharedState, id, paramsString); + return std::make_unique( + rowIdxVectorPos, filePathVectorPos, dataColumnPoses, sharedState, id, paramsString); } private: - std::unique_ptr reader; - DataPos columnIdxPos; - common::ValueVector* columnIdxVector; + std::unique_ptr reader; }; } // namespace processor diff --git a/src/include/processor/operator/copy/read_parquet.h b/src/include/processor/operator/copy/read_parquet.h index 4afa72fae6..76b7f56d99 100644 --- a/src/include/processor/operator/copy/read_parquet.h +++ b/src/include/processor/operator/copy/read_parquet.h @@ -8,10 +8,10 @@ namespace processor { class ReadParquet : public ReadFile { public: ReadParquet(const DataPos& offsetVectorPos, const DataPos& filePathVectorPos, - std::vector arrowColumnPoses, + std::vector dataColumnPoses, std::shared_ptr sharedState, uint32_t id, const std::string& paramsString) - : ReadFile{offsetVectorPos, filePathVectorPos, std::move(arrowColumnPoses), + : ReadFile{offsetVectorPos, filePathVectorPos, std::move(dataColumnPoses), std::move(sharedState), PhysicalOperatorType::READ_PARQUET, id, paramsString} {} std::shared_ptr readTuples( @@ -19,7 +19,7 @@ class ReadParquet : public ReadFile { inline std::unique_ptr clone() override { return std::make_unique( - rowIdxVectorPos, filePathVectorPos, arrowColumnPoses, sharedState, id, paramsString); + rowIdxVectorPos, filePathVectorPos, dataColumnPoses, sharedState, id, paramsString); } private: diff --git a/src/include/storage/copier/npy_reader.h b/src/include/storage/copier/npy_reader.h index 9d1776d984..e639a0df0e 100644 --- a/src/include/storage/copier/npy_reader.h +++ b/src/include/storage/copier/npy_reader.h @@ -54,5 +54,15 @@ class NpyReader { static inline const std::string defaultFieldName = "NPY_FIELD"; }; +class NpyMultiFileReader { +public: + explicit NpyMultiFileReader(const std::vector& filePaths); + + std::shared_ptr readBlock(common::block_idx_t blockIdx) const; + +private: + std::vector> fileReaders; +}; + } // namespace storage } // namespace kuzu diff --git a/src/include/storage/copier/read_file_state.h b/src/include/storage/copier/read_file_state.h index b8e1d031ce..d355509b53 100644 --- a/src/include/storage/copier/read_file_state.h +++ b/src/include/storage/copier/read_file_state.h @@ -35,19 +35,6 @@ class ReadCSVMorsel : public ReadFileMorsel { std::shared_ptr recordBatch; }; -class ReadNPYMorsel : public ReadFileMorsel { -public: - ReadNPYMorsel(common::row_idx_t rowIdx, common::block_idx_t blockIdx, common::row_idx_t numRows, - std::string filePath, common::vector_idx_t curFileIdx, common::row_idx_t rowIdxInFile) - : ReadFileMorsel{rowIdx, blockIdx, numRows, std::move(filePath), rowIdxInFile}, - columnIdx{curFileIdx} {} - - inline common::vector_idx_t getColumnIdx() const { return columnIdx; } - -private: - common::vector_idx_t columnIdx; -}; - class ReadFileSharedState { public: ReadFileSharedState(std::vector filePaths, common::CSVReaderConfig csvReaderConfig, diff --git a/src/include/storage/index/hash_index.h b/src/include/storage/index/hash_index.h index 6bb62795f4..512683ffac 100644 --- a/src/include/storage/index/hash_index.h +++ b/src/include/storage/index/hash_index.h @@ -93,7 +93,7 @@ class HashIndex : public BaseHashIndex { void prepareCommit(); void prepareRollback(); void checkpointInMemory(); - void rollback() const; + void rollbackInMemory() const; inline BMFileHandle* getFileHandle() const { return fileHandle.get(); } private: @@ -183,9 +183,9 @@ class PrimaryKeyIndex { keyDataTypeID == common::LogicalTypeID::INT64 ? hashIndexForInt64->checkpointInMemory() : hashIndexForString->checkpointInMemory(); } - inline void rollback() { - keyDataTypeID == common::LogicalTypeID::INT64 ? hashIndexForInt64->rollback() : - hashIndexForString->rollback(); + inline void rollbackInMemory() { + keyDataTypeID == common::LogicalTypeID::INT64 ? hashIndexForInt64->rollbackInMemory() : + hashIndexForString->rollbackInMemory(); } inline void prepareCommit() { keyDataTypeID == common::LogicalTypeID::INT64 ? hashIndexForInt64->prepareCommit() : diff --git a/src/include/storage/storage_manager.h b/src/include/storage/storage_manager.h index 220232dcb6..c238d17d94 100644 --- a/src/include/storage/storage_manager.h +++ b/src/include/storage/storage_manager.h @@ -34,9 +34,9 @@ class StorageManager { nodesStore->checkpointInMemory(wal->updatedNodeTables); relsStore->checkpointInMemory(wal->updatedRelTables); } - inline void rollback() { - nodesStore->rollback(wal->updatedNodeTables); - relsStore->rollback(wal->updatedRelTables); + inline void rollbackInMemory() { + nodesStore->rollbackInMemory(wal->updatedNodeTables); + relsStore->rollbackInMemory(wal->updatedRelTables); } inline std::string getDirectory() const { return wal->getDirectory(); } inline WAL* getWAL() const { return wal; } diff --git a/src/include/storage/storage_structure/in_mem_file.h b/src/include/storage/storage_structure/in_mem_file.h index ed520665ee..badf174fb4 100644 --- a/src/include/storage/storage_structure/in_mem_file.h +++ b/src/include/storage/storage_structure/in_mem_file.h @@ -70,7 +70,7 @@ class InMemOverflowFile : public InMemFile { // Copy overflow data at srcOverflow into dstKUString. void copyStringOverflow( - PageByteCursor& overflowCursor, uint8_t* srcOverflow, common::ku_string_t* dstKUString); + PageByteCursor& dstOverflowCursor, uint8_t* srcOverflow, common::ku_string_t* dstKUString); void copyListOverflowFromFile(InMemOverflowFile* srcInMemOverflowFile, const PageByteCursor& srcOverflowCursor, PageByteCursor& dstOverflowCursor, common::ku_list_t* dstKUList, common::LogicalType* listChildDataType); diff --git a/src/include/storage/store/node_table.h b/src/include/storage/store/node_table.h index 9ebdfaa0b8..7fdfd070d7 100644 --- a/src/include/storage/store/node_table.h +++ b/src/include/storage/store/node_table.h @@ -57,7 +57,7 @@ class NodeTable { void prepareCommit(); void prepareRollback(); inline void checkpointInMemory() { pkIndex->checkpointInMemory(); } - inline void rollback() { pkIndex->rollback(); } + inline void rollbackInMemory() { pkIndex->rollbackInMemory(); } private: void deleteNode( diff --git a/src/include/storage/store/nodes_store.h b/src/include/storage/store/nodes_store.h index 54905e485c..c167cb6180 100644 --- a/src/include/storage/store/nodes_store.h +++ b/src/include/storage/store/nodes_store.h @@ -60,9 +60,9 @@ class NodesStore { nodeTables.at(updatedNodeTable)->checkpointInMemory(); } } - inline void rollback(const std::unordered_set& updatedTables) { + inline void rollbackInMemory(const std::unordered_set& updatedTables) { for (auto updatedNodeTable : updatedTables) { - nodeTables.at(updatedNodeTable)->rollback(); + nodeTables.at(updatedNodeTable)->rollbackInMemory(); } } diff --git a/src/include/storage/store/rel_table.h b/src/include/storage/store/rel_table.h index c967b8251f..2a04043296 100644 --- a/src/include/storage/store/rel_table.h +++ b/src/include/storage/store/rel_table.h @@ -214,7 +214,7 @@ class RelTable { void prepareCommit(); void prepareRollback(); void checkpointInMemory(); - void rollback(); + void rollbackInMemory(); void insertRel(common::ValueVector* srcNodeIDVector, common::ValueVector* dstNodeIDVector, const std::vector& relPropertyVectors); diff --git a/src/include/storage/store/rels_store.h b/src/include/storage/store/rels_store.h index 0c46f40b50..7015c144bd 100644 --- a/src/include/storage/store/rels_store.h +++ b/src/include/storage/store/rels_store.h @@ -77,9 +77,9 @@ class RelsStore { relTables.at(updatedTableID)->checkpointInMemory(); } } - inline void rollback(const std::unordered_set& updatedTables) { + inline void rollbackInMemory(const std::unordered_set& updatedTables) { for (auto updatedTableID : updatedTables) { - relTables.at(updatedTableID)->rollback(); + relTables.at(updatedTableID)->rollbackInMemory(); } } diff --git a/src/planner/operator/logical_copy.cpp b/src/planner/operator/logical_copy.cpp index f07790c4be..5ceb963d85 100644 --- a/src/planner/operator/logical_copy.cpp +++ b/src/planner/operator/logical_copy.cpp @@ -6,10 +6,9 @@ namespace planner { void LogicalCopy::computeFactorizedSchema() { createEmptySchema(); auto groupPos = schema->createGroup(); - schema->insertToGroupAndScope(arrowColumnExpressions, groupPos); + schema->insertToGroupAndScope(dataColumnExpressions, groupPos); schema->insertToGroupAndScope(rowIdxExpression, groupPos); schema->insertToGroupAndScope(filePathExpression, groupPos); - schema->insertToGroupAndScope(columnIdxExpression, groupPos); schema->insertToGroupAndScope(outputExpression, groupPos); schema->setGroupAsSingleState(groupPos); } @@ -17,10 +16,9 @@ void LogicalCopy::computeFactorizedSchema() { void LogicalCopy::computeFlatSchema() { createEmptySchema(); schema->createGroup(); - schema->insertToGroupAndScope(arrowColumnExpressions, 0); + schema->insertToGroupAndScope(dataColumnExpressions, 0); schema->insertToGroupAndScope(rowIdxExpression, 0); schema->insertToGroupAndScope(filePathExpression, 0); - schema->insertToGroupAndScope(columnIdxExpression, 0); schema->insertToGroupAndScope(outputExpression, 0); } diff --git a/src/planner/planner.cpp b/src/planner/planner.cpp index 409c3d1635..c77e599491 100644 --- a/src/planner/planner.cpp +++ b/src/planner/planner.cpp @@ -186,8 +186,6 @@ std::unique_ptr Planner::planCopy( common::LogicalType{common::LogicalTypeID::INT64}, "rowIdx", "rowIdx"), std::make_shared( common::LogicalType{common::LogicalTypeID::STRING}, "filePath", "filePath"), - std::make_shared( - common::LogicalType{common::LogicalTypeID::INT64}, "columnIdx", "columnIdx"), copyClause.getStatementResult()->getSingleExpressionToCollect()); plan->setLastOperator(std::move(copy)); return plan; diff --git a/src/processor/mapper/map_copy.cpp b/src/processor/mapper/map_copy.cpp index 06f454bfd6..90cfdec13d 100644 --- a/src/processor/mapper/map_copy.cpp +++ b/src/processor/mapper/map_copy.cpp @@ -1,7 +1,6 @@ #include "planner/logical_plan/logical_operator/logical_copy.h" #include "processor/mapper/plan_mapper.h" #include "processor/operator/copy/copy_node.h" -#include "processor/operator/copy/copy_npy_node.h" #include "processor/operator/copy/copy_rel.h" #include "processor/operator/copy/read_csv.h" #include "processor/operator/copy/read_file.h" @@ -36,22 +35,21 @@ std::unique_ptr PlanMapper::mapLogicalCopyNodeToPhysical(Logic std::unique_ptr readFile; std::shared_ptr readFileSharedState; auto outSchema = copy->getSchema(); - auto arrowColumnExpressions = copy->getArrowColumnExpressions(); - std::vector arrowColumnPoses; - arrowColumnPoses.reserve(arrowColumnExpressions.size()); - for (auto& arrowColumnPos : arrowColumnExpressions) { - arrowColumnPoses.emplace_back(outSchema->getExpressionPos(*arrowColumnPos)); + auto dataColumnExpressions = copy->getDataColumnExpressions(); + std::vector dataColumnPoses; + dataColumnPoses.reserve(dataColumnExpressions.size()); + for (auto& dataColumnExpr : dataColumnExpressions) { + dataColumnPoses.emplace_back(outSchema->getExpressionPos(*dataColumnExpr)); } auto rowIdxVectorPos = DataPos(outSchema->getExpressionPos(*copy->getRowIdxExpression())); auto filePathVectorPos = DataPos(outSchema->getExpressionPos(*copy->getFilePathExpression())); - auto columnIdxPos = DataPos(outSchema->getExpressionPos(*copy->getColumnIdxExpression())); auto nodeTableSchema = catalog->getReadOnlyVersion()->getNodeTableSchema(copy->getTableID()); switch (copy->getCopyDescription().fileType) { case (common::CopyDescription::FileType::CSV): { readFileSharedState = std::make_shared(copy->getCopyDescription().filePaths, *copy->getCopyDescription().csvReaderConfig, nodeTableSchema); - readFile = std::make_unique(rowIdxVectorPos, filePathVectorPos, arrowColumnPoses, + readFile = std::make_unique(rowIdxVectorPos, filePathVectorPos, dataColumnPoses, readFileSharedState, getOperatorID(), copy->getExpressionsForPrinting()); } break; case (common::CopyDescription::FileType::PARQUET): { @@ -59,15 +57,15 @@ std::unique_ptr PlanMapper::mapLogicalCopyNodeToPhysical(Logic std::make_shared(copy->getCopyDescription().filePaths, *copy->getCopyDescription().csvReaderConfig, nodeTableSchema); readFile = - std::make_unique(rowIdxVectorPos, filePathVectorPos, arrowColumnPoses, + std::make_unique(rowIdxVectorPos, filePathVectorPos, dataColumnPoses, readFileSharedState, getOperatorID(), copy->getExpressionsForPrinting()); } break; case (common::CopyDescription::FileType::NPY): { readFileSharedState = std::make_shared(copy->getCopyDescription().filePaths, *copy->getCopyDescription().csvReaderConfig, nodeTableSchema); - readFile = std::make_unique(rowIdxVectorPos, filePathVectorPos, arrowColumnPoses, - columnIdxPos, readFileSharedState, getOperatorID(), copy->getExpressionsForPrinting()); + readFile = std::make_unique(rowIdxVectorPos, filePathVectorPos, dataColumnPoses, + readFileSharedState, getOperatorID(), copy->getExpressionsForPrinting()); } break; default: throw common::NotImplementedException("PlanMapper::mapLogicalCopyNodeToPhysical"); @@ -75,22 +73,12 @@ std::unique_ptr PlanMapper::mapLogicalCopyNodeToPhysical(Logic auto copyNodeSharedState = std::make_shared(readFileSharedState->numRows, memoryManager); std::unique_ptr copyNode; - CopyNodeDataInfo copyNodeDataInfo{rowIdxVectorPos, filePathVectorPos, arrowColumnPoses}; - if (copy->getCopyDescription().fileType == common::CopyDescription::FileType::NPY) { - copyNode = std::make_unique(copyNodeSharedState, copyNodeDataInfo, - columnIdxPos, copy->getCopyDescription(), - storageManager.getNodesStore().getNodeTable(copy->getTableID()), - &storageManager.getRelsStore(), catalog, storageManager.getWAL(), - std::make_unique(copy->getSchema()), std::move(readFile), - getOperatorID(), copy->getExpressionsForPrinting()); - } else { - copyNode = std::make_unique(copyNodeSharedState, copyNodeDataInfo, - copy->getCopyDescription(), - storageManager.getNodesStore().getNodeTable(copy->getTableID()), - &storageManager.getRelsStore(), catalog, storageManager.getWAL(), - std::make_unique(copy->getSchema()), std::move(readFile), - getOperatorID(), copy->getExpressionsForPrinting()); - } + CopyNodeDataInfo copyNodeDataInfo{rowIdxVectorPos, filePathVectorPos, dataColumnPoses}; + copyNode = std::make_unique(copyNodeSharedState, copyNodeDataInfo, + copy->getCopyDescription(), storageManager.getNodesStore().getNodeTable(copy->getTableID()), + &storageManager.getRelsStore(), catalog, storageManager.getWAL(), + std::make_unique(copy->getSchema()), std::move(readFile), + getOperatorID(), copy->getExpressionsForPrinting()); auto outputExpressions = binder::expression_vector{copy->getOutputExpression()}; return createFactorizedTableScan( outputExpressions, outSchema, copyNodeSharedState->table, std::move(copyNode)); diff --git a/src/processor/operator/copy/CMakeLists.txt b/src/processor/operator/copy/CMakeLists.txt index a4b2fd055f..c44e8d05fe 100644 --- a/src/processor/operator/copy/CMakeLists.txt +++ b/src/processor/operator/copy/CMakeLists.txt @@ -3,7 +3,6 @@ add_library(kuzu_processor_operator_copy copy.cpp copy_rel.cpp copy_node.cpp - copy_npy_node.cpp read_file.cpp read_parquet.cpp read_npy.cpp) diff --git a/src/processor/operator/copy/copy_node.cpp b/src/processor/operator/copy/copy_node.cpp index f97c7ce61e..b925cd0b78 100644 --- a/src/processor/operator/copy/copy_node.cpp +++ b/src/processor/operator/copy/copy_node.cpp @@ -52,7 +52,7 @@ void CopyNodeSharedState::initializeColumns( std::pair CopyNode::getStartAndEndRowIdx(common::vector_idx_t columnIdx) { auto startRowIdx = rowIdxVector->getValue(rowIdxVector->state->selVector->selectedPositions[0]); - auto numRows = ArrowColumnVector::getArrowColumn(arrowColumnVectors[columnIdx])->length(); + auto numRows = ArrowColumnVector::getArrowColumn(dataColumnVectors[columnIdx])->length(); auto endRowIdx = startRowIdx + numRows - 1; return {startRowIdx, endRowIdx}; } @@ -76,7 +76,7 @@ void CopyNode::executeInternal(kuzu::processor::ExecutionContext* context) { auto columnChunk = sharedState->columns[i]->createInMemColumnChunk(startRowIdx, endRowIdx, ©Desc); columnChunk->copyArrowArray( - *ArrowColumnVector::getArrowColumn(arrowColumnVectors[i]), copyStates[i].get()); + *ArrowColumnVector::getArrowColumn(dataColumnVectors[i]), copyStates[i].get()); columnChunks.push_back(std::move(columnChunk)); } flushChunksAndPopulatePKIndex( diff --git a/src/processor/operator/copy/copy_npy_node.cpp b/src/processor/operator/copy/copy_npy_node.cpp deleted file mode 100644 index b67703a94d..0000000000 --- a/src/processor/operator/copy/copy_npy_node.cpp +++ /dev/null @@ -1,42 +0,0 @@ -#include "processor/operator/copy/copy_npy_node.h" - -using namespace kuzu::common; -using namespace kuzu::storage; - -namespace kuzu { -namespace processor { - -void CopyNPYNode::executeInternal(ExecutionContext* context) { - logCopyWALRecord(); - while (children[0]->getNextTuple(context)) { - std::vector> columnChunks; - columnChunks.reserve(sharedState->columns.size()); - auto columnToCopy = columnIdxVector->getValue( - columnIdxVector->state->selVector->selectedPositions[0]); - auto [startOffset, endOffset] = getStartAndEndRowIdx(columnToCopy); - auto [filePath, startRowIdxInFile] = getFilePathAndRowIdxInFile(); - auto columnChunk = sharedState->columns[columnToCopy]->createInMemColumnChunk( - startOffset, endOffset, ©Desc); - columnChunk->copyArrowArray( - *ArrowColumnVector::getArrowColumn(arrowColumnVectors[columnToCopy]), - copyStates[columnToCopy].get()); - columnChunks.push_back(std::move(columnChunk)); - flushChunksAndPopulatePKIndexSingleColumn( - columnChunks, startOffset, endOffset, columnToCopy, filePath, startRowIdxInFile); - } -} - -void CopyNPYNode::flushChunksAndPopulatePKIndexSingleColumn( - std::vector>& columnChunks, offset_t startNodeOffset, - offset_t endNodeOffset, vector_idx_t columnToCopy, const std::string& filePath, - row_idx_t startRowIdxInFile) { - sharedState->columns[columnToCopy]->flushChunk(columnChunks[0].get()); - if (sharedState->pkIndex && columnToCopy == sharedState->pkColumnID) { - populatePKIndex(columnChunks[0].get(), - sharedState->columns[columnToCopy]->getInMemOverflowFile(), startNodeOffset, - (endNodeOffset - startNodeOffset + 1), filePath, startRowIdxInFile); - } -} - -} // namespace processor -} // namespace kuzu diff --git a/src/processor/operator/copy/read_file.cpp b/src/processor/operator/copy/read_file.cpp index 105e6aba9c..f6f3ad046f 100644 --- a/src/processor/operator/copy/read_file.cpp +++ b/src/processor/operator/copy/read_file.cpp @@ -6,8 +6,8 @@ namespace processor { void ReadFile::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) { rowIdxVector = resultSet->getValueVector(rowIdxVectorPos).get(); filePathVector = resultSet->getValueVector(filePathVectorPos).get(); - for (auto& arrowColumnPos : arrowColumnPoses) { - arrowColumnVectors.push_back(resultSet->getValueVector(arrowColumnPos).get()); + for (auto& arrowColumnPos : dataColumnPoses) { + dataColumnVectors.push_back(resultSet->getValueVector(arrowColumnPos).get()); } } @@ -23,9 +23,9 @@ bool ReadFile::getNextTuplesInternal(kuzu::processor::ExecutionContext* context) filePathVector->setValue( rowIdxVector->state->selVector->selectedPositions[0], morsel->filePath); auto recordBatch = readTuples(std::move(morsel)); - for (auto i = 0u; i < arrowColumnVectors.size(); i++) { + for (auto i = 0u; i < dataColumnVectors.size(); i++) { common::ArrowColumnVector::setArrowColumn( - arrowColumnVectors[i], recordBatch->column((int)i)); + dataColumnVectors[i], recordBatch->column((int)i)); } return true; } diff --git a/src/processor/operator/copy/read_npy.cpp b/src/processor/operator/copy/read_npy.cpp index 33e6f4dc6b..dd598623d5 100644 --- a/src/processor/operator/copy/read_npy.cpp +++ b/src/processor/operator/copy/read_npy.cpp @@ -9,34 +9,7 @@ namespace kuzu { namespace processor { std::shared_ptr ReadNPY::readTuples(std::unique_ptr morsel) { - assert(!morsel->filePath.empty()); - if (!reader || reader->getFilePath() != morsel->filePath) { - reader = std::make_unique(morsel->filePath); - } - auto batch = reader->readBlock(morsel->blockIdx); - return batch; -} - -bool ReadNPY::getNextTuplesInternal(kuzu::processor::ExecutionContext* context) { - auto sharedStateNPY = reinterpret_cast(sharedState.get()); - auto morsel = sharedStateNPY->getMorsel(); - if (morsel == nullptr) { - return false; - } - auto npyMorsel = reinterpret_cast(morsel.get()); - auto startRowIdx = npyMorsel->rowIdx; - auto columnIdx = npyMorsel->getColumnIdx(); - rowIdxVector->setValue(rowIdxVector->state->selVector->selectedPositions[0], startRowIdx); - columnIdxVector->setValue(columnIdxVector->state->selVector->selectedPositions[0], columnIdx); - auto recordBatch = readTuples(std::move(morsel)); - common::ArrowColumnVector::setArrowColumn( - arrowColumnVectors[columnIdx], recordBatch->column((int)0)); - return true; -} - -void ReadNPY::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) { - ReadFile::initLocalStateInternal(resultSet, context); - columnIdxVector = resultSet->getValueVector(columnIdxPos).get(); + return reader->readBlock(morsel->blockIdx); } } // namespace processor diff --git a/src/storage/copier/npy_reader.cpp b/src/storage/copier/npy_reader.cpp index 3d33752fe2..8afd9bb7b6 100644 --- a/src/storage/copier/npy_reader.cpp +++ b/src/storage/copier/npy_reader.cpp @@ -251,5 +251,27 @@ std::shared_ptr NpyReader::readBlock(common::block_idx_t blo return result; } +NpyMultiFileReader::NpyMultiFileReader(const std::vector& filePaths) { + for (auto& file : filePaths) { + fileReaders.push_back(std::make_unique(file)); + } +} + +std::shared_ptr NpyMultiFileReader::readBlock(block_idx_t blockIdx) const { + assert(fileReaders.size() > 1); + auto resultArrowBatch = fileReaders[0]->readBlock(blockIdx); + for (int fileIdx = 1; fileIdx < fileReaders.size(); fileIdx++) { + auto nextArrowBatch = fileReaders[fileIdx]->readBlock(blockIdx); + auto result = resultArrowBatch->AddColumn( + fileIdx, std::to_string(fileIdx), nextArrowBatch->column(0)); + if (result.ok()) { + resultArrowBatch = result.ValueOrDie(); + } else { + throw CopyException("Failed to read NPY file."); + } + } + return resultArrowBatch; +} + } // namespace storage } // namespace kuzu diff --git a/src/storage/copier/read_file_state.cpp b/src/storage/copier/read_file_state.cpp index 1fcc4ad40a..5b2bcfdad8 100644 --- a/src/storage/copier/read_file_state.cpp +++ b/src/storage/copier/read_file_state.cpp @@ -125,23 +125,15 @@ void ReadNPYSharedState::countNumRows() { std::unique_ptr ReadNPYSharedState::getMorsel() { std::unique_lock lck{mtx}; while (true) { - if (currFileIdx >= filePaths.size()) { - // No more files to read. - return nullptr; - } - auto filePath = filePaths[currFileIdx]; + auto filePath = filePaths[0]; auto fileBlockInfo = fileBlockInfos.at(filePath); if (currBlockIdx >= fileBlockInfo.numBlocks) { - // No more blocks to read in this file. - currFileIdx++; - currRowIdxInCurrFile = 1; - currBlockIdx = 0; - currRowIdx = 0; - continue; + // No more blocks to read. + return nullptr; } auto numRowsInBlock = fileBlockInfo.numRowsPerBlock[currBlockIdx]; - auto result = std::make_unique( - currRowIdx, currBlockIdx, numRowsInBlock, filePath, currFileIdx, currRowIdxInCurrFile); + auto result = std::make_unique( + currRowIdx, currBlockIdx, numRowsInBlock, filePath, currRowIdxInCurrFile); currRowIdx += numRowsInBlock; currRowIdxInCurrFile += numRowsInBlock; currBlockIdx++; diff --git a/src/storage/index/hash_index.cpp b/src/storage/index/hash_index.cpp index 5c120daaca..36a340d233 100644 --- a/src/storage/index/hash_index.cpp +++ b/src/storage/index/hash_index.cpp @@ -426,7 +426,7 @@ void HashIndex::checkpointInMemory() { } template -void HashIndex::rollback() const { +void HashIndex::rollbackInMemory() const { if (!localStorage->hasUpdates()) { return; } diff --git a/src/storage/storage_structure/in_mem_file.cpp b/src/storage/storage_structure/in_mem_file.cpp index a9b71ea99b..4aee3991d2 100644 --- a/src/storage/storage_structure/in_mem_file.cpp +++ b/src/storage/storage_structure/in_mem_file.cpp @@ -169,19 +169,19 @@ ku_list_t InMemOverflowFile::copyList(const Value& listValue, PageByteCursor& ov } void InMemOverflowFile::copyStringOverflow( - PageByteCursor& overflowCursor, uint8_t* srcOverflow, ku_string_t* dstKUString) { + PageByteCursor& dstOverflowCursor, uint8_t* srcOverflow, ku_string_t* dstKUString) { // Allocate a new page if necessary. - if (overflowCursor.offsetInPage + dstKUString->len >= BufferPoolConstants::PAGE_4KB_SIZE || - overflowCursor.pageIdx == UINT32_MAX) { - overflowCursor.offsetInPage = 0; - overflowCursor.pageIdx = addANewOverflowPage(); + if (dstOverflowCursor.offsetInPage + dstKUString->len >= BufferPoolConstants::PAGE_4KB_SIZE || + dstOverflowCursor.pageIdx == UINT32_MAX) { + dstOverflowCursor.offsetInPage = 0; + dstOverflowCursor.pageIdx = addANewOverflowPage(); } TypeUtils::encodeOverflowPtr( - dstKUString->overflowPtr, overflowCursor.pageIdx, overflowCursor.offsetInPage); + dstKUString->overflowPtr, dstOverflowCursor.pageIdx, dstOverflowCursor.offsetInPage); std::shared_lock lck(lock); - pages[overflowCursor.pageIdx]->write( - overflowCursor.offsetInPage, overflowCursor.offsetInPage, srcOverflow, dstKUString->len); - overflowCursor.offsetInPage += dstKUString->len; + pages[dstOverflowCursor.pageIdx]->write(dstOverflowCursor.offsetInPage, + dstOverflowCursor.offsetInPage, srcOverflow, dstKUString->len); + dstOverflowCursor.offsetInPage += dstKUString->len; } void InMemOverflowFile::copyListOverflowFromFile(InMemOverflowFile* srcInMemOverflowFile, diff --git a/src/storage/store/rel_table.cpp b/src/storage/store/rel_table.cpp index c5f681a6e7..b213c03d23 100644 --- a/src/storage/store/rel_table.cpp +++ b/src/storage/store/rel_table.cpp @@ -290,7 +290,7 @@ void RelTable::checkpointInMemory() { std::bind(&RelTable::clearListsUpdatesStore, this)); } -void RelTable::rollback() { +void RelTable::rollbackInMemory() { performOpOnListsWithUpdates( std::bind(&Lists::rollbackInMemoryIfNecessary, std::placeholders::_1), std::bind(&RelTable::clearListsUpdatesStore, this)); diff --git a/src/storage/wal_replayer.cpp b/src/storage/wal_replayer.cpp index 43051facce..6cbd8cc23d 100644 --- a/src/storage/wal_replayer.cpp +++ b/src/storage/wal_replayer.cpp @@ -47,7 +47,7 @@ void WALReplayer::replay() { if (isCheckpoint) { storageManager->checkpointInMemory(); } else { - storageManager->rollback(); + storageManager->rollbackInMemory(); } } } diff --git a/test/test_runner/csv_to_parquet_converter.cpp b/test/test_runner/csv_to_parquet_converter.cpp index 68a3943455..0e46ab3321 100644 --- a/test/test_runner/csv_to_parquet_converter.cpp +++ b/test/test_runner/csv_to_parquet_converter.cpp @@ -22,16 +22,16 @@ arrow::Status CSVToParquetConverter::runCSVToParquetConversion( std::shared_ptr outFileStream; std::shared_ptr infile; std::shared_ptr csvTable; - ARROW_ASSIGN_OR_RAISE(infile, arrow::io::ReadableFile::Open(inputFile)); + ARROW_ASSIGN_OR_RAISE(infile, arrow::io::ReadableFile::Open(inputFile)) auto readOptions = arrow::csv::ReadOptions::Defaults(); auto parseOptions = arrow::csv::ParseOptions::Defaults(); readOptions.autogenerate_column_names = !hasHeader; parseOptions.delimiter = delimiter; ARROW_ASSIGN_OR_RAISE( auto csvReader, arrow::csv::TableReader::Make(arrow::io::default_io_context(), infile, - readOptions, parseOptions, arrow::csv::ConvertOptions::Defaults())); - ARROW_ASSIGN_OR_RAISE(csvTable, csvReader->Read()); - ARROW_ASSIGN_OR_RAISE(outFileStream, arrow::io::FileOutputStream::Open(outputFile)); + readOptions, parseOptions, arrow::csv::ConvertOptions::Defaults())) + ARROW_ASSIGN_OR_RAISE(csvTable, csvReader->Read()) + ARROW_ASSIGN_OR_RAISE(outFileStream, arrow::io::FileOutputStream::Open(outputFile)) PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable( *csvTable, arrow::default_memory_pool(), outFileStream, csvTable->num_rows())); return arrow::Status::OK(); @@ -87,7 +87,7 @@ void CSVToParquetConverter::createCopyFile(const std::string& parquetDatasetPath throw TestException(StringUtils::string_format( "Error opening file: {}, errno: {}.", targetCopyCypherFile, errno)); } - for (auto copyCommand : copyCommands) { + for (const auto& copyCommand : copyCommands) { auto cmd = "COPY " + copyCommand.table + " FROM '" + copyCommand.parquetFilePath + "'"; outfile << cmd << std::endl; }