diff --git a/src/binder/bind/bind_copy.cpp b/src/binder/bind/bind_copy.cpp index d426209787..b6505b22af 100644 --- a/src/binder/bind/bind_copy.cpp +++ b/src/binder/bind/bind_copy.cpp @@ -25,38 +25,68 @@ std::unique_ptr Binder::bindCopyToClause(const Statement& statem if (fileType != CopyDescription::FileType::CSV) { throw BinderException("COPY TO currently only supports csv files."); } - return std::make_unique( - CopyDescription(std::vector{boundFilePath}, fileType, columnNames), - std::move(query)); + auto copyDescription = std::make_unique( + fileType, std::vector{boundFilePath}, columnNames); + return std::make_unique(std::move(copyDescription), std::move(query)); +} + +// As a temporary constraint, we require npy files loaded with COPY FROM BY COLUMN keyword. +// And csv and parquet files loaded with COPY FROM keyword. +static void validateCopyNpyKeyword( + CopyDescription::FileType expectedType, CopyDescription::FileType actualType) { + if (expectedType == CopyDescription::FileType::UNKNOWN && + actualType == CopyDescription::FileType::NPY) { + throw BinderException("Please use COPY FROM BY COLUMN statement for copying npy files."); + } + if (expectedType == CopyDescription::FileType::NPY && actualType != expectedType) { + throw BinderException("Please use COPY FROM statement for copying csv and parquet files."); + } +} + +static void validateCopyNpyFilesMatchSchema(uint32_t numFiles, catalog::TableSchema* schema) { + if (schema->properties.size() != numFiles) { + throw BinderException(StringUtils::string_format( + "Number of npy files is not equal to number of properties in table {}.", + schema->tableName)); + } } std::unique_ptr Binder::bindCopyFromClause(const Statement& statement) { auto& copyStatement = (CopyFrom&)statement; auto catalogContent = catalog.getReadOnlyVersion(); auto tableName = copyStatement.getTableName(); + // Bind to table schema. validateTableExist(catalog, tableName); auto tableID = catalogContent->getTableID(tableName); + auto tableSchema = catalogContent->getTableSchema(tableID); + // Bind csv reader configuration auto csvReaderConfig = bindParsingOptions(copyStatement.getParsingOptions()); auto boundFilePaths = bindFilePaths(copyStatement.getFilePaths()); + // Bind file type. auto actualFileType = bindFileType(boundFilePaths); auto expectedFileType = copyStatement.getFileType(); - if (expectedFileType == CopyDescription::FileType::UNKNOWN && - actualFileType == CopyDescription::FileType::NPY) { - throw BinderException("Please use COPY FROM BY COLUMN statement for copying npy files."); - } - if (expectedFileType == CopyDescription::FileType::NPY && actualFileType != expectedFileType) { - throw BinderException("Please use COPY FROM statement for copying csv and parquet files."); - } + validateCopyNpyKeyword(expectedFileType, actualFileType); if (actualFileType == CopyDescription::FileType::NPY) { - auto tableSchema = catalogContent->getTableSchema(tableID); - if (tableSchema->properties.size() != boundFilePaths.size()) { - throw BinderException(StringUtils::string_format( - "Number of npy files is not equal to number of properties in table {}.", - tableSchema->tableName)); + validateCopyNpyFilesMatchSchema(boundFilePaths.size(), tableSchema); + } + // Bind execution mode. + // For CSV file, and table with SERIAL columns, we need to read in serial from files. + bool preservingOrder = actualFileType == CopyDescription::FileType::CSV; + expression_vector columnExpressions; + for (auto& property : tableSchema->getProperties()) { + if (property->getDataType()->getLogicalTypeID() != common::LogicalTypeID::SERIAL) { + columnExpressions.push_back(createVariable( + property->getName(), common::LogicalType{common::LogicalTypeID::ARROW_COLUMN})); + } else { + preservingOrder = true; } } - return std::make_unique( - CopyDescription(boundFilePaths, actualFileType, csvReaderConfig), tableID, tableName); + auto copyDescription = std::make_unique( + actualFileType, boundFilePaths, std::move(csvReaderConfig)); + auto nodeOffsetExpression = + createVariable("nodeOffset", common::LogicalType{common::LogicalTypeID::INT64}); + return std::make_unique(std::move(copyDescription), tableSchema, + std::move(columnExpressions), std::move(nodeOffsetExpression), preservingOrder); } std::vector Binder::bindFilePaths(const std::vector& filePaths) { @@ -76,9 +106,9 @@ std::vector Binder::bindFilePaths(const std::vector& f return boundFilePaths; } -CSVReaderConfig Binder::bindParsingOptions( +std::unique_ptr Binder::bindParsingOptions( const std::unordered_map>* parsingOptions) { - CSVReaderConfig csvReaderConfig; + auto csvReaderConfig = std::make_unique(); for (auto& parsingOption : *parsingOptions) { auto copyOptionName = parsingOption.first; StringUtils::toUpper(copyOptionName); @@ -91,7 +121,7 @@ CSVReaderConfig Binder::bindParsingOptions( throw BinderException( "The value type of parsing csv option " + copyOptionName + " must be boolean."); } - csvReaderConfig.hasHeader = + csvReaderConfig->hasHeader = ((LiteralExpression&)(*boundCopyOptionExpression)).value->getValue(); } else if (boundCopyOptionExpression->dataType.getLogicalTypeID() == LogicalTypeID::STRING && @@ -102,7 +132,7 @@ CSVReaderConfig Binder::bindParsingOptions( } auto copyOptionValue = ((LiteralExpression&)(*boundCopyOptionExpression)).value->getValue(); - bindStringParsingOptions(csvReaderConfig, copyOptionName, copyOptionValue); + bindStringParsingOptions(*csvReaderConfig, copyOptionName, copyOptionValue); } else { throw BinderException("Unrecognized parsing csv option: " + copyOptionName + "."); } diff --git a/src/common/copier_config/copier_config.cpp b/src/common/copier_config/copier_config.cpp index 8ea257caa7..61125cbd5b 100644 --- a/src/common/copier_config/copier_config.cpp +++ b/src/common/copier_config/copier_config.cpp @@ -8,26 +8,6 @@ using namespace kuzu::utf8proc; namespace kuzu { namespace common { -// Copy From -CopyDescription::CopyDescription( - const std::vector& filePaths, FileType fileType, CSVReaderConfig csvReaderConfig) - : filePaths{filePaths}, csvReaderConfig{nullptr}, fileType{fileType} { - this->csvReaderConfig = std::make_unique(csvReaderConfig); -} - -// Copy To -CopyDescription::CopyDescription(const std::vector& filePaths, FileType fileType, - const std::vector& columnNames) - : filePaths{filePaths}, fileType{fileType}, columnNames{columnNames} {} - -CopyDescription::CopyDescription(const CopyDescription& copyDescription) - : filePaths{copyDescription.filePaths}, csvReaderConfig{nullptr}, - fileType{copyDescription.fileType}, columnNames{copyDescription.columnNames} { - if (copyDescription.csvReaderConfig != nullptr) { - this->csvReaderConfig = std::make_unique(*copyDescription.csvReaderConfig); - } -} - CopyDescription::FileType CopyDescription::getFileTypeFromExtension(const std::string& extension) { CopyDescription::FileType fileType = CopyDescription::fileTypeMap[extension]; if (fileType == FileType::UNKNOWN) { diff --git a/src/include/binder/binder.h b/src/include/binder/binder.h index 142cf387a2..56f3d38957 100644 --- a/src/include/binder/binder.h +++ b/src/include/binder/binder.h @@ -101,7 +101,7 @@ class Binder { std::vector bindFilePaths(const std::vector& filePaths); - common::CSVReaderConfig bindParsingOptions( + std::unique_ptr bindParsingOptions( const std::unordered_map>* parsingOptions); void bindStringParsingOptions(common::CSVReaderConfig& csvReaderConfig, diff --git a/src/include/binder/copy/bound_copy_from.h b/src/include/binder/copy/bound_copy_from.h index 3e8bfd79a4..5bd74b52e6 100644 --- a/src/include/binder/copy/bound_copy_from.h +++ b/src/include/binder/copy/bound_copy_from.h @@ -13,22 +13,30 @@ namespace binder { class BoundCopyFrom : public BoundStatement { public: - BoundCopyFrom( - common::CopyDescription copyDescription, common::table_id_t tableID, std::string tableName) + BoundCopyFrom(std::unique_ptr copyDescription, + catalog::TableSchema* tableSchema, expression_vector columnExpressions, + std::shared_ptr nodeOffsetExpression, bool preservingOrder_) : BoundStatement{common::StatementType::COPY_FROM, BoundStatementResult::createSingleStringColumnResult()}, - copyDescription{copyDescription}, tableID{tableID}, tableName{std::move(tableName)} {} - - inline common::CopyDescription getCopyDescription() const { return copyDescription; } - - inline common::table_id_t getTableID() const { return tableID; } - - inline std::string getTableName() const { return tableName; } + copyDescription{std::move(copyDescription)}, tableSchema{tableSchema}, + columnExpressions{std::move(columnExpressions)}, + nodeOffsetExpression{std::move(nodeOffsetExpression)}, preservingOrder_{ + preservingOrder_} {} + + inline common::CopyDescription* getCopyDescription() const { return copyDescription.get(); } + inline catalog::TableSchema* getTableSchema() const { return tableSchema; } + inline expression_vector getColumnExpressions() const { return columnExpressions; } + inline std::shared_ptr getNodeOffsetExpression() const { + return nodeOffsetExpression; + } + inline bool preservingOrder() const { return preservingOrder_; } private: - common::CopyDescription copyDescription; - common::table_id_t tableID; - std::string tableName; + std::unique_ptr copyDescription; + catalog::TableSchema* tableSchema; + expression_vector columnExpressions; + std::shared_ptr nodeOffsetExpression; + bool preservingOrder_; }; } // namespace binder diff --git a/src/include/binder/copy/bound_copy_to.h b/src/include/binder/copy/bound_copy_to.h index 03b156c10a..9fbfd3a7dc 100644 --- a/src/include/binder/copy/bound_copy_to.h +++ b/src/include/binder/copy/bound_copy_to.h @@ -12,17 +12,17 @@ namespace binder { class BoundCopyTo : public BoundStatement { public: - BoundCopyTo( - common::CopyDescription copyDescription, std::unique_ptr regularQuery) + BoundCopyTo(std::unique_ptr copyDescription, + std::unique_ptr regularQuery) : BoundStatement{common::StatementType::COPY_TO, BoundStatementResult::createEmptyResult()}, - regularQuery{std::move(regularQuery)}, copyDescription{std::move(copyDescription)} {} + copyDescription{std::move(copyDescription)}, regularQuery{std::move(regularQuery)} {} - inline common::CopyDescription getCopyDescription() const { return copyDescription; } + inline common::CopyDescription* getCopyDescription() const { return copyDescription.get(); } inline BoundRegularQuery* getRegularQuery() const { return regularQuery.get(); } private: - common::CopyDescription copyDescription; + std::unique_ptr copyDescription; std::unique_ptr regularQuery; }; diff --git a/src/include/common/copier_config/copier_config.h b/src/include/common/copier_config/copier_config.h index eb2be6ef73..f962d63dc0 100644 --- a/src/include/common/copier_config/copier_config.h +++ b/src/include/common/copier_config/copier_config.h @@ -6,10 +6,6 @@ #include "common/types/types_include.h" #include "common/types/value.h" -namespace spdlog { -class logger; -} - namespace kuzu { namespace common { @@ -33,15 +29,32 @@ struct CSVReaderConfig { struct CopyDescription { enum class FileType : uint8_t { UNKNOWN = 0, CSV = 1, PARQUET = 2, NPY = 3 }; + FileType fileType; + std::vector filePaths; + std::vector columnNames; + std::unique_ptr csvReaderConfig; + // Copy From - CopyDescription(const std::vector& filePaths, FileType fileType, - CSVReaderConfig csvReaderConfig); + CopyDescription(FileType fileType, const std::vector& filePaths, + std::unique_ptr csvReaderConfig) + : fileType{fileType}, filePaths{filePaths}, csvReaderConfig{std::move(csvReaderConfig)} {} // Copy To - CopyDescription(const std::vector& filePaths, FileType fileType, - const std::vector& columnNames); + CopyDescription(FileType fileType, const std::vector& filePaths, + const std::vector& columnNames) + : fileType{fileType}, filePaths{filePaths}, columnNames{columnNames}, csvReaderConfig{ + nullptr} {} - CopyDescription(const CopyDescription& copyDescription); + CopyDescription(const CopyDescription& other) + : fileType{other.fileType}, filePaths{other.filePaths}, columnNames{other.columnNames} { + if (other.csvReaderConfig != nullptr) { + this->csvReaderConfig = std::make_unique(*other.csvReaderConfig); + } + } + + inline std::unique_ptr copy() const { + return std::make_unique(*this); + } inline static std::unordered_map fileTypeMap{ {"unknown", FileType::UNKNOWN}, {".csv", FileType::CSV}, {".parquet", FileType::PARQUET}, @@ -50,11 +63,6 @@ struct CopyDescription { static FileType getFileTypeFromExtension(const std::string& extension); static std::string getFileTypeName(FileType fileType); - - const std::vector filePaths; - const std::vector columnNames; - std::unique_ptr csvReaderConfig; - FileType fileType; }; } // namespace common diff --git a/src/include/parser/transformer.h b/src/include/parser/transformer.h index cbc0cc35de..6c0a1fa9d0 100644 --- a/src/include/parser/transformer.h +++ b/src/include/parser/transformer.h @@ -263,7 +263,7 @@ class Transformer { std::unique_ptr transformCopyTo(CypherParser::KU_CopyTOContext& ctx); - std::unique_ptr transformCopyFromCSV(CypherParser::KU_CopyFromCSVContext& ctx); + std::unique_ptr transformCopyFrom(CypherParser::KU_CopyFromCSVContext& ctx); std::unique_ptr transformCopyFromNPY(CypherParser::KU_CopyFromNPYContext& ctx); diff --git a/src/include/planner/logical_plan/copy/logical_copy_from.h b/src/include/planner/logical_plan/copy/logical_copy_from.h index 33548dbc52..077933b8e8 100644 --- a/src/include/planner/logical_plan/copy/logical_copy_from.h +++ b/src/include/planner/logical_plan/copy/logical_copy_from.h @@ -10,53 +10,45 @@ namespace planner { class LogicalCopyFrom : public LogicalOperator { public: - LogicalCopyFrom(const common::CopyDescription& copyDescription, common::table_id_t tableID, - std::string tableName, bool hasSerial, binder::expression_vector dataColumnExpressions, + LogicalCopyFrom(std::unique_ptr copyDescription, + catalog::TableSchema* tableSchema, binder::expression_vector columnExpressions, std::shared_ptr nodeOffsetExpression, - std::shared_ptr outputExpression) + std::shared_ptr outputExpression, bool orderPreserving_) : LogicalOperator{LogicalOperatorType::COPY_FROM}, - copyDescription{copyDescription}, tableID{tableID}, tableName{std::move(tableName)}, - preservingOrder{hasSerial}, dataColumnExpressions{std::move(dataColumnExpressions)}, - nodeOffsetExpression{std::move(nodeOffsetExpression)}, outputExpression{ - std::move(outputExpression)} {} + copyDescription{std::move(copyDescription)}, tableSchema{tableSchema}, + columnExpressions{std::move(columnExpressions)}, nodeOffsetExpression{std::move( + nodeOffsetExpression)}, + outputExpression{std::move(outputExpression)}, orderPreserving_{orderPreserving_} {} - inline std::string getExpressionsForPrinting() const override { return tableName; } - - inline common::CopyDescription getCopyDescription() const { return copyDescription; } - - inline common::table_id_t getTableID() const { return tableID; } - - inline std::vector> getDataColumnExpressions() const { - return dataColumnExpressions; - } + inline std::string getExpressionsForPrinting() const override { return tableSchema->tableName; } + inline common::CopyDescription* getCopyDescription() const { return copyDescription.get(); } + inline catalog::TableSchema* getTableSchema() const { return tableSchema; } + inline binder::expression_vector getColumnExpressions() const { return columnExpressions; } inline std::shared_ptr getNodeOffsetExpression() const { return nodeOffsetExpression; } - inline std::shared_ptr getOutputExpression() const { return outputExpression; } - inline bool isPreservingOrder() { return preservingOrder; } + inline bool orderPreserving() { return orderPreserving_; } void computeFactorizedSchema() override; void computeFlatSchema() override; inline std::unique_ptr copy() override { - return make_unique(copyDescription, tableID, tableName, preservingOrder, - dataColumnExpressions, nodeOffsetExpression, outputExpression); + return make_unique(copyDescription->copy(), tableSchema, columnExpressions, + nodeOffsetExpression, outputExpression, orderPreserving_); } private: - common::CopyDescription copyDescription; - common::table_id_t tableID; - // Used for printing only. - std::string tableName; - binder::expression_vector dataColumnExpressions; + std::unique_ptr copyDescription; + catalog::TableSchema* tableSchema; + binder::expression_vector columnExpressions; std::shared_ptr nodeOffsetExpression; std::shared_ptr outputExpression; - bool preservingOrder; + bool orderPreserving_; }; } // namespace planner diff --git a/src/include/planner/logical_plan/copy/logical_copy_to.h b/src/include/planner/logical_plan/copy/logical_copy_to.h index 89b8cd31e2..c7216ea5b7 100644 --- a/src/include/planner/logical_plan/copy/logical_copy_to.h +++ b/src/include/planner/logical_plan/copy/logical_copy_to.h @@ -9,27 +9,28 @@ namespace planner { class LogicalCopyTo : public LogicalOperator { public: - LogicalCopyTo( - std::shared_ptr child, const common::CopyDescription& copyDescription) - : LogicalOperator{LogicalOperatorType::COPY_TO, child}, copyDescription{copyDescription} {} + LogicalCopyTo(std::shared_ptr child, + std::unique_ptr copyDescription) + : LogicalOperator{LogicalOperatorType::COPY_TO, std::move(child)}, + copyDescription{std::move(copyDescription)} {} f_group_pos_set getGroupsPosToFlatten(); inline std::string getExpressionsForPrinting() const override { return std::string{}; } - inline common::CopyDescription getCopyDescription() const { return copyDescription; } + inline common::CopyDescription* getCopyDescription() const { return copyDescription.get(); } void computeFactorizedSchema() override; void computeFlatSchema() override; inline std::unique_ptr copy() override { - return make_unique(children[0]->copy(), copyDescription); + return make_unique(children[0]->copy(), copyDescription->copy()); } private: std::shared_ptr outputExpression; - common::CopyDescription copyDescription; + std::unique_ptr copyDescription; }; } // namespace planner diff --git a/src/include/planner/planner.h b/src/include/planner/planner.h index 11aeeac29e..3f8f6cd0dd 100644 --- a/src/include/planner/planner.h +++ b/src/include/planner/planner.h @@ -52,8 +52,7 @@ class Planner { const storage::NodesStatisticsAndDeletedIDs& nodesStatistics, const storage::RelsStatistics& relsStatistics, const BoundStatement& statement); - static std::unique_ptr planCopyFrom( - const catalog::Catalog& catalog, const BoundStatement& statement); + static std::unique_ptr planCopyFrom(const BoundStatement& statement); }; } // namespace planner diff --git a/src/include/processor/operator/persistent/copy_node.h b/src/include/processor/operator/persistent/copy_node.h index 13e538e3cb..53db94ab46 100644 --- a/src/include/processor/operator/persistent/copy_node.h +++ b/src/include/processor/operator/persistent/copy_node.h @@ -58,7 +58,7 @@ struct CopyNodeInfo { storage::RelsStore* relsStore; catalog::Catalog* catalog; storage::WAL* wal; - bool preservingOrder; + bool orderPreserving; }; class CopyNode : public Sink { diff --git a/src/include/processor/operator/persistent/reader.h b/src/include/processor/operator/persistent/reader.h index 24b8c6bd5c..ababd49c87 100644 --- a/src/include/processor/operator/persistent/reader.h +++ b/src/include/processor/operator/persistent/reader.h @@ -8,19 +8,30 @@ namespace processor { struct ReaderInfo { DataPos nodeOffsetPos; - std::vector dataColumnPoses; - bool isOrderPreserving; - storage::read_rows_func_t readFunc; - storage::init_reader_data_func_t initFunc; + std::vector dataColumnsPos; + bool orderPreserving; + + ReaderInfo( + const DataPos& nodeOffsetPos, std::vector dataColumnsPos, bool orderPreserving) + : nodeOffsetPos{nodeOffsetPos}, dataColumnsPos{std::move(dataColumnsPos)}, + orderPreserving{orderPreserving} {} + ReaderInfo(const ReaderInfo& other) + : nodeOffsetPos{other.nodeOffsetPos}, dataColumnsPos{other.dataColumnsPos}, + orderPreserving{other.orderPreserving} {} + + inline uint32_t getNumColumns() const { return dataColumnsPos.size(); } + + inline std::unique_ptr copy() const { return std::make_unique(*this); } }; class Reader : public PhysicalOperator { public: - Reader(ReaderInfo readerInfo, std::shared_ptr sharedState, - uint32_t id, const std::string& paramsString) - : PhysicalOperator{PhysicalOperatorType::READER, id, paramsString}, - readerInfo{std::move(readerInfo)}, sharedState{std::move(sharedState)}, readFuncData{ - nullptr} {} + Reader(std::unique_ptr info, + std::shared_ptr sharedState, uint32_t id, + const std::string& paramsString) + : PhysicalOperator{PhysicalOperatorType::READER, id, paramsString}, info{std::move(info)}, + sharedState{std::move(sharedState)}, dataChunk{nullptr}, + nodeOffsetVector{nullptr}, readFunc{nullptr}, initFunc{nullptr}, readFuncData{nullptr} {} void initGlobalStateInternal(ExecutionContext* context) final; @@ -29,7 +40,7 @@ class Reader : public PhysicalOperator { inline bool isSource() const final { return true; } inline std::unique_ptr clone() final { - return make_unique(readerInfo, sharedState, getOperatorID(), paramsString); + return make_unique(info->copy(), sharedState, getOperatorID(), paramsString); } protected: @@ -40,13 +51,18 @@ class Reader : public PhysicalOperator { void getNextNodeGroupInParallel(); private: - ReaderInfo readerInfo; + std::unique_ptr info; std::shared_ptr sharedState; - std::unique_ptr dataChunkToRead; + storage::LeftArrowArrays leftArrowArrays; + std::unique_ptr dataChunk = nullptr; + common::ValueVector* nodeOffsetVector = nullptr; + + storage::read_rows_func_t readFunc = nullptr; + storage::init_reader_data_func_t initFunc = nullptr; // For parallel reading. - std::unique_ptr readFuncData; + std::unique_ptr readFuncData = nullptr; }; } // namespace processor diff --git a/src/include/processor/physical_plan.h b/src/include/processor/physical_plan.h index 17d282c8b6..6375701f2d 100644 --- a/src/include/processor/physical_plan.h +++ b/src/include/processor/physical_plan.h @@ -20,15 +20,5 @@ class PhysicalPlan { std::unique_ptr lastOperator; }; -class PhysicalPlanUtil { -public: - static std::vector collectOperators( - PhysicalOperator* root, PhysicalOperatorType operatorType); - -private: - static void collectOperatorsRecursive(PhysicalOperator* op, PhysicalOperatorType operatorType, - std::vector& result); -}; - } // namespace processor } // namespace kuzu diff --git a/src/include/storage/copier/reader_state.h b/src/include/storage/copier/reader_state.h index 4b0b03597d..f4ae436c10 100644 --- a/src/include/storage/copier/reader_state.h +++ b/src/include/storage/copier/reader_state.h @@ -141,15 +141,14 @@ class ReaderSharedState { friend class processor::Reader; public: - ReaderSharedState(common::CopyDescription::FileType fileType, - std::vector filePaths, common::CSVReaderConfig csvReaderConfig, - catalog::TableSchema* tableSchema) - : fileType{fileType}, filePaths{std::move(filePaths)}, csvReaderConfig{csvReaderConfig}, - tableSchema{tableSchema}, numRows{0}, currFileIdx{0}, currBlockIdx{0}, currRowIdx{0} { - validateFunc = ReaderFunctions::getValidateFunc(fileType); - initFunc = ReaderFunctions::getInitDataFunc(fileType); - countBlocksFunc = ReaderFunctions::getCountBlocksFunc(fileType); - readFunc = ReaderFunctions::getReadRowsFunc(fileType); + ReaderSharedState( + std::unique_ptr copyDescription, catalog::TableSchema* tableSchema) + : copyDescription{std::move(copyDescription)}, tableSchema{tableSchema}, numRows{0}, + currFileIdx{0}, currBlockIdx{0}, currRowIdx{0} { + validateFunc = ReaderFunctions::getValidateFunc(this->copyDescription->fileType); + initFunc = ReaderFunctions::getInitDataFunc(this->copyDescription->fileType); + countBlocksFunc = ReaderFunctions::getCountBlocksFunc(this->copyDescription->fileType); + readFunc = ReaderFunctions::getReadRowsFunc(this->copyDescription->fileType); } void validate(); @@ -168,9 +167,7 @@ class ReaderSharedState { public: std::mutex mtx; - common::CopyDescription::FileType fileType; - std::vector filePaths; - common::CSVReaderConfig csvReaderConfig; + std::unique_ptr copyDescription; catalog::TableSchema* tableSchema; validate_func_t validateFunc; diff --git a/src/parser/transform/transform_copy.cpp b/src/parser/transform/transform_copy.cpp index de3dfb6458..343ba55b81 100644 --- a/src/parser/transform/transform_copy.cpp +++ b/src/parser/transform/transform_copy.cpp @@ -12,7 +12,7 @@ std::unique_ptr Transformer::transformCopyTo(CypherParser::KU_CopyTOC return std::make_unique(std::move(filePath), std::move(regularQuery)); } -std::unique_ptr Transformer::transformCopyFromCSV( +std::unique_ptr Transformer::transformCopyFrom( CypherParser::KU_CopyFromCSVContext& ctx) { auto filePaths = transformFilePaths(ctx.kU_FilePaths()->StringLiteral()); auto tableName = transformSchemaName(*ctx.oC_SchemaName()); diff --git a/src/parser/transformer.cpp b/src/parser/transformer.cpp index 7768ca961e..1106543be2 100644 --- a/src/parser/transformer.cpp +++ b/src/parser/transformer.cpp @@ -29,7 +29,7 @@ std::unique_ptr Transformer::transformOcStatement( } else if (ctx.kU_CopyFromNPY()) { return transformCopyFromNPY(*ctx.kU_CopyFromNPY()); } else if (ctx.kU_CopyFromCSV()) { - return transformCopyFromCSV(*ctx.kU_CopyFromCSV()); + return transformCopyFrom(*ctx.kU_CopyFromCSV()); } else if (ctx.kU_CopyTO()) { return transformCopyTo(*ctx.kU_CopyTO()); } else if (ctx.kU_StandaloneCall()) { diff --git a/src/planner/operator/copy/logical_copy_from.cpp b/src/planner/operator/copy/logical_copy_from.cpp index 264c912919..927ba92d77 100644 --- a/src/planner/operator/copy/logical_copy_from.cpp +++ b/src/planner/operator/copy/logical_copy_from.cpp @@ -6,7 +6,7 @@ namespace planner { void LogicalCopyFrom::computeFactorizedSchema() { createEmptySchema(); auto groupPos = schema->createGroup(); - schema->insertToGroupAndScope(dataColumnExpressions, groupPos); + schema->insertToGroupAndScope(columnExpressions, groupPos); schema->insertToGroupAndScope(nodeOffsetExpression, groupPos); schema->insertToGroupAndScope(outputExpression, groupPos); schema->setGroupAsSingleState(groupPos); @@ -15,7 +15,7 @@ void LogicalCopyFrom::computeFactorizedSchema() { void LogicalCopyFrom::computeFlatSchema() { createEmptySchema(); schema->createGroup(); - schema->insertToGroupAndScope(dataColumnExpressions, 0); + schema->insertToGroupAndScope(columnExpressions, 0); schema->insertToGroupAndScope(nodeOffsetExpression, 0); schema->insertToGroupAndScope(outputExpression, 0); } diff --git a/src/planner/plan/CMakeLists.txt b/src/planner/plan/CMakeLists.txt index 83fb7af1ea..5377a3687d 100644 --- a/src/planner/plan/CMakeLists.txt +++ b/src/planner/plan/CMakeLists.txt @@ -20,6 +20,7 @@ add_library(kuzu_planner_plan_operator append_scan_node.cpp append_set.cpp append_unwind.cpp + plan_copy.cpp plan_join_order.cpp plan_projection.cpp plan_read.cpp diff --git a/src/planner/plan/plan_copy.cpp b/src/planner/plan/plan_copy.cpp new file mode 100644 index 0000000000..dad36d01b9 --- /dev/null +++ b/src/planner/plan/plan_copy.cpp @@ -0,0 +1,40 @@ +#include "binder/copy/bound_copy_from.h" +#include "binder/copy/bound_copy_to.h" +#include "planner/logical_plan/copy/logical_copy_from.h" +#include "planner/logical_plan/copy/logical_copy_to.h" +#include "planner/planner.h" + +using namespace kuzu::storage; +using namespace kuzu::catalog; +using namespace kuzu::common; + +namespace kuzu { +namespace planner { + +std::unique_ptr Planner::planCopyFrom(const BoundStatement& statement) { + auto& copyClause = reinterpret_cast(statement); + auto plan = std::make_unique(); + auto copy = make_shared(copyClause.getCopyDescription()->copy(), + copyClause.getTableSchema(), copyClause.getColumnExpressions(), + copyClause.getNodeOffsetExpression(), + copyClause.getStatementResult()->getSingleExpressionToCollect(), + copyClause.preservingOrder()); + plan->setLastOperator(std::move(copy)); + return plan; +} + +std::unique_ptr Planner::planCopyTo(const Catalog& catalog, + const NodesStatisticsAndDeletedIDs& nodesStatistics, const RelsStatistics& relsStatistics, + const BoundStatement& statement) { + auto& copyClause = reinterpret_cast(statement); + auto regularQuery = copyClause.getRegularQuery(); + assert(regularQuery->getStatementType() == StatementType::QUERY); + auto plan = QueryPlanner(catalog, nodesStatistics, relsStatistics).getBestPlan(*regularQuery); + auto logicalCopyTo = make_shared( + plan->getLastOperator(), copyClause.getCopyDescription()->copy()); + plan->setLastOperator(std::move(logicalCopyTo)); + return plan; +} + +} // namespace planner +} // namespace kuzu diff --git a/src/planner/planner.cpp b/src/planner/planner.cpp index 54053a40ba..6140338885 100644 --- a/src/planner/planner.cpp +++ b/src/planner/planner.cpp @@ -3,8 +3,6 @@ #include "binder/bound_create_macro.h" #include "binder/bound_explain.h" #include "binder/bound_standalone_call.h" -#include "binder/copy/bound_copy_from.h" -#include "binder/copy/bound_copy_to.h" #include "binder/ddl/bound_add_property.h" #include "binder/ddl/bound_create_table.h" #include "binder/ddl/bound_drop_property.h" @@ -12,8 +10,6 @@ #include "binder/ddl/bound_rename_property.h" #include "binder/ddl/bound_rename_table.h" #include "binder/expression/variable_expression.h" -#include "planner/logical_plan/copy/logical_copy_from.h" -#include "planner/logical_plan/copy/logical_copy_to.h" #include "planner/logical_plan/ddl/logical_add_property.h" #include "planner/logical_plan/ddl/logical_create_table.h" #include "planner/logical_plan/ddl/logical_drop_property.h" @@ -49,7 +45,7 @@ std::unique_ptr Planner::getBestPlan(const Catalog& catalog, plan = planCreateRdfGraph(statement); } break; case StatementType::COPY_FROM: { - plan = planCopyFrom(catalog, statement); + plan = planCopyFrom(statement); } break; case StatementType::COPY_TO: { plan = planCopyTo(catalog, nodesStatistics, relsStatistics, statement); @@ -183,47 +179,6 @@ std::unique_ptr Planner::planRenameProperty(const BoundStatement& s return plan; } -std::unique_ptr Planner::planCopyFrom( - const catalog::Catalog& catalog, const BoundStatement& statement) { - auto& copyClause = reinterpret_cast(statement); - auto plan = std::make_unique(); - expression_vector arrowColumnExpressions; - // For CSV file, and table with SERIAL columns, we need to read in serial from files. - bool readInSerialMode = - copyClause.getCopyDescription().fileType == CopyDescription::FileType::CSV; - for (auto& property : - catalog.getReadOnlyVersion()->getTableSchema(copyClause.getTableID())->properties) { - if (property->getDataType()->getLogicalTypeID() != common::LogicalTypeID::SERIAL) { - arrowColumnExpressions.push_back(std::make_shared( - common::LogicalType{common::LogicalTypeID::ARROW_COLUMN}, property->getName(), - property->getName())); - } else { - readInSerialMode = true; - } - } - auto copy = - make_shared(copyClause.getCopyDescription(), copyClause.getTableID(), - copyClause.getTableName(), readInSerialMode, std::move(arrowColumnExpressions), - std::make_shared( - common::LogicalType{common::LogicalTypeID::INT64}, "nodeOffset", "nodeOffset"), - copyClause.getStatementResult()->getSingleExpressionToCollect()); - plan->setLastOperator(std::move(copy)); - return plan; -} - -std::unique_ptr Planner::planCopyTo(const Catalog& catalog, - const NodesStatisticsAndDeletedIDs& nodesStatistics, const RelsStatistics& relsStatistics, - const BoundStatement& statement) { - auto& copyClause = reinterpret_cast(statement); - auto regularQuery = copyClause.getRegularQuery(); - assert(regularQuery->getStatementType() == StatementType::QUERY); - auto plan = QueryPlanner(catalog, nodesStatistics, relsStatistics).getBestPlan(*regularQuery); - auto logicalCopyTo = - make_shared(plan->getLastOperator(), copyClause.getCopyDescription()); - plan->setLastOperator(std::move(logicalCopyTo)); - return plan; -} - std::unique_ptr Planner::planStandaloneCall(const BoundStatement& statement) { auto& standaloneCallClause = reinterpret_cast(statement); auto plan = std::make_unique(); diff --git a/src/processor/CMakeLists.txt b/src/processor/CMakeLists.txt index 75e1a51067..9a14c56eae 100644 --- a/src/processor/CMakeLists.txt +++ b/src/processor/CMakeLists.txt @@ -4,7 +4,6 @@ add_subdirectory(result) add_library(kuzu_processor OBJECT - physical_plan.cpp processor.cpp processor_task.cpp) diff --git a/src/processor/map/CMakeLists.txt b/src/processor/map/CMakeLists.txt index 4ed48fd2e1..0611d98e50 100644 --- a/src/processor/map/CMakeLists.txt +++ b/src/processor/map/CMakeLists.txt @@ -9,7 +9,8 @@ add_library(kuzu_processor_mapper map_acc_hash_join.cpp map_standalone_call.cpp map_in_query_call.cpp - map_copy.cpp + map_copy_to.cpp + map_copy_from.cpp map_create.cpp map_create_macro.cpp map_cross_product.cpp diff --git a/src/processor/map/map_copy.cpp b/src/processor/map/map_copy.cpp deleted file mode 100644 index b4614a6fee..0000000000 --- a/src/processor/map/map_copy.cpp +++ /dev/null @@ -1,101 +0,0 @@ -#include "planner/logical_plan/copy/logical_copy_from.h" -#include "planner/logical_plan/copy/logical_copy_to.h" -#include "processor/operator/persistent/copy_node.h" -#include "processor/operator/persistent/copy_rel.h" -#include "processor/operator/persistent/copy_to.h" -#include "processor/operator/persistent/reader.h" -#include "processor/plan_mapper.h" - -using namespace kuzu::common; -using namespace kuzu::planner; -using namespace kuzu::storage; - -namespace kuzu { -namespace processor { - -std::unique_ptr PlanMapper::mapCopyFrom(LogicalOperator* logicalOperator) { - auto copy = (LogicalCopyFrom*)logicalOperator; - auto tableSchema = catalog->getReadOnlyVersion()->getTableSchema(copy->getTableID()); - switch (tableSchema->getTableType()) { - case catalog::TableType::NODE: - return mapCopyNode(logicalOperator); - case catalog::TableType::REL: - return mapCopyRel(logicalOperator); - default: - throw NotImplementedException{"PlanMapper::mapCopy"}; - } -} - -std::unique_ptr PlanMapper::mapCopyTo(LogicalOperator* logicalOperator) { - auto copy = (LogicalCopyTo*)logicalOperator; - auto childSchema = logicalOperator->getChild(0)->getSchema(); - auto prevOperator = mapOperator(logicalOperator->getChild(0).get()); - std::vector vectorsToCopyPos; - for (auto& expression : childSchema->getExpressionsInScope()) { - vectorsToCopyPos.emplace_back(childSchema->getExpressionPos(*expression)); - } - auto sharedState = std::make_shared(); - return std::make_unique(sharedState, std::move(vectorsToCopyPos), - copy->getCopyDescription(), getOperatorID(), copy->getExpressionsForPrinting(), - std::move(prevOperator)); -} - -std::unique_ptr PlanMapper::mapCopyNode( - planner::LogicalOperator* logicalOperator) { - auto copy = (LogicalCopyFrom*)logicalOperator; - auto fileType = copy->getCopyDescription().fileType; - if (fileType != CopyDescription::FileType::CSV && - fileType != CopyDescription::FileType::PARQUET && - fileType != CopyDescription::FileType::NPY) { - throw NotImplementedException{"PlanMapper::mapLogicalCopyFromToPhysical"}; - } - auto initReadDataFunc = ReaderFunctions::getInitDataFunc(fileType); - auto readRowsFunc = ReaderFunctions::getReadRowsFunc(fileType); - auto nodeTableSchema = catalog->getReadOnlyVersion()->getNodeTableSchema(copy->getTableID()); - auto readerSharedState = - std::make_shared(fileType, copy->getCopyDescription().filePaths, - *copy->getCopyDescription().csvReaderConfig, nodeTableSchema); - - auto outSchema = copy->getSchema(); - auto dataColumnExpressions = copy->getDataColumnExpressions(); - std::vector dataColumnPoses; - dataColumnPoses.reserve(dataColumnExpressions.size()); - for (auto& dataColumnExpr : dataColumnExpressions) { - dataColumnPoses.emplace_back(outSchema->getExpressionPos(*dataColumnExpr)); - } - auto nodeOffsetPos = DataPos(outSchema->getExpressionPos(*copy->getNodeOffsetExpression())); - auto reader = std::make_unique( - ReaderInfo{nodeOffsetPos, dataColumnPoses, copy->isPreservingOrder(), - std::move(readRowsFunc), std::move(initReadDataFunc)}, - readerSharedState, getOperatorID(), copy->getExpressionsForPrinting()); - - auto copyNodeSharedState = - std::make_shared(readerSharedState->getNumRowsRef(), - catalog->getReadOnlyVersion()->getNodeTableSchema(copy->getTableID()), - storageManager.getNodesStore().getNodeTable(copy->getTableID()), - copy->getCopyDescription(), memoryManager); - CopyNodeInfo copyNodeDataInfo{dataColumnPoses, nodeOffsetPos, copy->getCopyDescription(), - storageManager.getNodesStore().getNodeTable(copy->getTableID()), - &storageManager.getRelsStore(), catalog, storageManager.getWAL(), - copy->isPreservingOrder()}; - auto copyNode = std::make_unique(copyNodeSharedState, copyNodeDataInfo, - std::make_unique(copy->getSchema()), std::move(reader), - getOperatorID(), copy->getExpressionsForPrinting()); - auto outputExpressions = binder::expression_vector{copy->getOutputExpression()}; - return createFactorizedTableScanAligned(outputExpressions, outSchema, - copyNodeSharedState->fTable, DEFAULT_VECTOR_CAPACITY /* maxMorselSize */, - std::move(copyNode)); -} - -std::unique_ptr PlanMapper::mapCopyRel( - planner::LogicalOperator* logicalOperator) { - auto copy = (LogicalCopyFrom*)logicalOperator; - auto relsStatistics = &storageManager.getRelsStore().getRelsStatistics(); - auto table = storageManager.getRelsStore().getRelTable(copy->getTableID()); - return std::make_unique(catalog, copy->getCopyDescription(), table, - storageManager.getWAL(), relsStatistics, storageManager.getNodesStore(), getOperatorID(), - copy->getExpressionsForPrinting()); -} - -} // namespace processor -} // namespace kuzu diff --git a/src/processor/map/map_copy_from.cpp b/src/processor/map/map_copy_from.cpp new file mode 100644 index 0000000000..a930c96903 --- /dev/null +++ b/src/processor/map/map_copy_from.cpp @@ -0,0 +1,75 @@ +#include "planner/logical_plan/copy/logical_copy_from.h" +#include "processor/operator/persistent/copy_node.h" +#include "processor/operator/persistent/copy_rel.h" +#include "processor/operator/persistent/reader.h" +#include "processor/plan_mapper.h" + +using namespace kuzu::planner; +using namespace kuzu::common; +using namespace kuzu::storage; + +namespace kuzu { +namespace processor { + +std::unique_ptr PlanMapper::mapCopyFrom(LogicalOperator* logicalOperator) { + auto copyFrom = (LogicalCopyFrom*)logicalOperator; + auto tableSchema = copyFrom->getTableSchema(); + switch (tableSchema->getTableType()) { + case catalog::TableType::NODE: + return mapCopyNode(logicalOperator); + case catalog::TableType::REL: + return mapCopyRel(logicalOperator); + default: + throw NotImplementedException{"PlanMapper::mapCopy"}; + } +} + +std::unique_ptr PlanMapper::mapCopyNode( + planner::LogicalOperator* logicalOperator) { + auto copyFrom = (LogicalCopyFrom*)logicalOperator; + auto fileType = copyFrom->getCopyDescription()->fileType; + assert(fileType == CopyDescription::FileType::CSV || + fileType == CopyDescription::FileType::PARQUET || + fileType == CopyDescription::FileType::NPY); + auto tableSchema = (catalog::NodeTableSchema*)copyFrom->getTableSchema(); + auto copyDesc = copyFrom->getCopyDescription(); + // Map reader. + auto readerSharedState = std::make_shared(copyDesc->copy(), tableSchema); + auto outSchema = copyFrom->getSchema(); + std::vector dataColumnsPos; + for (auto& expression : copyFrom->getColumnExpressions()) { + dataColumnsPos.emplace_back(outSchema->getExpressionPos(*expression)); + } + auto nodeOffsetPos = DataPos(outSchema->getExpressionPos(*copyFrom->getNodeOffsetExpression())); + auto readInfo = + std::make_unique(nodeOffsetPos, dataColumnsPos, copyFrom->orderPreserving()); + auto reader = std::make_unique(std::move(readInfo), readerSharedState, getOperatorID(), + copyFrom->getExpressionsForPrinting()); + // Map copy CSV. + auto nodeTable = storageManager.getNodesStore().getNodeTable(tableSchema->tableID); + auto copyNodeSharedState = std::make_shared( + readerSharedState->getNumRowsRef(), tableSchema, nodeTable, *copyDesc, memoryManager); + CopyNodeInfo copyNodeDataInfo{dataColumnsPos, nodeOffsetPos, *copyDesc, nodeTable, + &storageManager.getRelsStore(), catalog, storageManager.getWAL(), + copyFrom->orderPreserving()}; + auto copyNode = std::make_unique(copyNodeSharedState, copyNodeDataInfo, + std::make_unique(copyFrom->getSchema()), std::move(reader), + getOperatorID(), copyFrom->getExpressionsForPrinting()); + auto outputExpressions = binder::expression_vector{copyFrom->getOutputExpression()}; + return createFactorizedTableScanAligned(outputExpressions, outSchema, + copyNodeSharedState->fTable, DEFAULT_VECTOR_CAPACITY /* maxMorselSize */, + std::move(copyNode)); +} + +std::unique_ptr PlanMapper::mapCopyRel( + planner::LogicalOperator* logicalOperator) { + auto copy = (LogicalCopyFrom*)logicalOperator; + auto relsStatistics = &storageManager.getRelsStore().getRelsStatistics(); + auto table = storageManager.getRelsStore().getRelTable(copy->getTableSchema()->tableID); + return std::make_unique(catalog, *copy->getCopyDescription(), table, + storageManager.getWAL(), relsStatistics, storageManager.getNodesStore(), getOperatorID(), + copy->getExpressionsForPrinting()); +} + +} // namespace processor +} // namespace kuzu diff --git a/src/processor/map/map_copy_to.cpp b/src/processor/map/map_copy_to.cpp new file mode 100644 index 0000000000..2d3c33f70c --- /dev/null +++ b/src/processor/map/map_copy_to.cpp @@ -0,0 +1,27 @@ +#include "planner/logical_plan/copy/logical_copy_to.h" +#include "processor/operator/persistent/copy_to.h" +#include "processor/plan_mapper.h" + +using namespace kuzu::common; +using namespace kuzu::planner; +using namespace kuzu::storage; + +namespace kuzu { +namespace processor { + +std::unique_ptr PlanMapper::mapCopyTo(LogicalOperator* logicalOperator) { + auto copy = (LogicalCopyTo*)logicalOperator; + auto childSchema = logicalOperator->getChild(0)->getSchema(); + auto prevOperator = mapOperator(logicalOperator->getChild(0).get()); + std::vector vectorsToCopyPos; + for (auto& expression : childSchema->getExpressionsInScope()) { + vectorsToCopyPos.emplace_back(childSchema->getExpressionPos(*expression)); + } + auto sharedState = std::make_shared(); + return std::make_unique(sharedState, std::move(vectorsToCopyPos), + *copy->getCopyDescription(), getOperatorID(), copy->getExpressionsForPrinting(), + std::move(prevOperator)); +} + +} // namespace processor +} // namespace kuzu diff --git a/src/processor/operator/persistent/copy_node.cpp b/src/processor/operator/persistent/copy_node.cpp index ba085e2175..b43730979c 100644 --- a/src/processor/operator/persistent/copy_node.cpp +++ b/src/processor/operator/persistent/copy_node.cpp @@ -94,7 +94,7 @@ void CopyNode::executeInternal(ExecutionContext* context) { assert(numAppendedTuplesInNodeGroup == numTuplesToAppend); if (localNodeGroup->isFull()) { node_group_idx_t nodeGroupIdx; - if (copyNodeInfo.preservingOrder) { + if (copyNodeInfo.orderPreserving) { nodeGroupIdx = StorageUtils::getNodeGroupIdx(nodeOffset); sharedState->setNextNodeGroupIdx(nodeGroupIdx + 1); } else { diff --git a/src/processor/operator/persistent/reader.cpp b/src/processor/operator/persistent/reader.cpp index 5183343c2d..b4b55785bc 100644 --- a/src/processor/operator/persistent/reader.cpp +++ b/src/processor/operator/persistent/reader.cpp @@ -15,24 +15,26 @@ void Reader::initGlobalStateInternal(ExecutionContext* context) { } void Reader::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) { - dataChunkToRead = std::make_unique(readerInfo.dataColumnPoses.size(), - resultSet->getDataChunk(readerInfo.dataColumnPoses[0].dataChunkPos)->state); - for (auto i = 0u; i < readerInfo.dataColumnPoses.size(); i++) { - dataChunkToRead->insert(i, resultSet->getValueVector(readerInfo.dataColumnPoses[i])); + dataChunk = std::make_unique(info->getNumColumns(), + resultSet->getDataChunk(info->dataColumnsPos[0].dataChunkPos)->state); + for (auto i = 0u; i < info->getNumColumns(); i++) { + dataChunk->insert(i, resultSet->getValueVector(info->dataColumnsPos[i])); } + initFunc = storage::ReaderFunctions::getInitDataFunc(sharedState->copyDescription->fileType); + readFunc = storage::ReaderFunctions::getReadRowsFunc(sharedState->copyDescription->fileType); + nodeOffsetVector = resultSet->getValueVector(info->nodeOffsetPos).get(); } bool Reader::getNextTuplesInternal(ExecutionContext* context) { - readerInfo.isOrderPreserving ? getNextNodeGroupInSerial() : getNextNodeGroupInParallel(); - return dataChunkToRead->state->selVector->selectedSize != 0; + info->orderPreserving ? getNextNodeGroupInSerial() : getNextNodeGroupInParallel(); + return dataChunk->state->selVector->selectedSize != 0; } void Reader::getNextNodeGroupInSerial() { - auto morsel = sharedState->getSerialMorsel(dataChunkToRead.get()); + auto morsel = sharedState->getSerialMorsel(dataChunk.get()); if (morsel->fileIdx == INVALID_VECTOR_IDX) { return; } - auto nodeOffsetVector = resultSet->getValueVector(readerInfo.nodeOffsetPos).get(); nodeOffsetVector->setValue( nodeOffsetVector->state->selVector->selectedPositions[0], morsel->rowIdx); } @@ -44,18 +46,18 @@ void Reader::getNextNodeGroupInParallel() { break; } if (!readFuncData || morsel->fileIdx != readFuncData->fileIdx) { - readFuncData = readerInfo.initFunc(sharedState->filePaths, morsel->fileIdx, - sharedState->csvReaderConfig, sharedState->tableSchema); + readFuncData = initFunc(sharedState->copyDescription->filePaths, morsel->fileIdx, + *sharedState->copyDescription->csvReaderConfig, sharedState->tableSchema); } - readerInfo.readFunc(*readFuncData, morsel->blockIdx, dataChunkToRead.get()); - leftArrowArrays.appendFromDataChunk(dataChunkToRead.get()); + readFunc(*readFuncData, morsel->blockIdx, dataChunk.get()); + leftArrowArrays.appendFromDataChunk(dataChunk.get()); } if (leftArrowArrays.getLeftNumRows() == 0) { - dataChunkToRead->state->selVector->selectedSize = 0; + dataChunk->state->selVector->selectedSize = 0; } else { int64_t numRowsToReturn = std::min(leftArrowArrays.getLeftNumRows(), StorageConstants::NODE_GROUP_SIZE); - leftArrowArrays.appendToDataChunk(dataChunkToRead.get(), numRowsToReturn); + leftArrowArrays.appendToDataChunk(dataChunk.get(), numRowsToReturn); } } diff --git a/src/processor/physical_plan.cpp b/src/processor/physical_plan.cpp deleted file mode 100644 index 675b23c409..0000000000 --- a/src/processor/physical_plan.cpp +++ /dev/null @@ -1,24 +0,0 @@ -#include "processor/physical_plan.h" - -namespace kuzu { -namespace processor { - -std::vector PhysicalPlanUtil::collectOperators( - PhysicalOperator* root, PhysicalOperatorType operatorType) { - std::vector result; - collectOperatorsRecursive(root, operatorType, result); - return result; -} - -void PhysicalPlanUtil::collectOperatorsRecursive(PhysicalOperator* op, - PhysicalOperatorType operatorType, std::vector& result) { - if (op->getOperatorType() == operatorType) { - result.push_back(op); - } - for (auto i = 0u; i < op->getNumChildren(); ++i) { - collectOperatorsRecursive(op->getChild(i), operatorType, result); - } -} - -} // namespace processor -} // namespace kuzu diff --git a/src/storage/copier/reader_state.cpp b/src/storage/copier/reader_state.cpp index ee6cb38f03..da48a585a5 100644 --- a/src/storage/copier/reader_state.cpp +++ b/src/storage/copier/reader_state.cpp @@ -234,12 +234,14 @@ void ReaderFunctions::readRowsFromNPYFile(const ReaderFunctionData& functionData } void ReaderSharedState::validate() { - validateFunc(filePaths, tableSchema); + validateFunc(copyDescription->filePaths, tableSchema); } void ReaderSharedState::countBlocks() { - readFuncData = initFunc(filePaths, 0 /* fileIdx */, csvReaderConfig, tableSchema); - blockInfos = countBlocksFunc(filePaths, csvReaderConfig, tableSchema); + readFuncData = initFunc(copyDescription->filePaths, 0 /* fileIdx */, + *copyDescription->csvReaderConfig, tableSchema); + blockInfos = + countBlocksFunc(copyDescription->filePaths, *copyDescription->csvReaderConfig, tableSchema); for (auto& blockInfo : blockInfos) { numRows += blockInfo.numRows; } @@ -249,12 +251,13 @@ std::unique_ptr ReaderSharedState::getSerialMorsel(DataChunk* data std::unique_lock xLck{mtx}; while (leftArrowArrays.getLeftNumRows() < StorageConstants::NODE_GROUP_SIZE) { auto morsel = getMorselOfNextBlock(); - if (morsel->fileIdx >= filePaths.size()) { + if (morsel->fileIdx >= copyDescription->filePaths.size()) { // No more blocks. break; } if (morsel->fileIdx != readFuncData->fileIdx) { - readFuncData = initFunc(filePaths, morsel->fileIdx, csvReaderConfig, tableSchema); + readFuncData = initFunc(copyDescription->filePaths, morsel->fileIdx, + *copyDescription->csvReaderConfig, tableSchema); } readFunc(*readFuncData, morsel->blockIdx, dataChunk); leftArrowArrays.appendFromDataChunk(dataChunk); @@ -276,7 +279,7 @@ std::unique_ptr ReaderSharedState::getParallelMorsel() { std::unique_lock xLck{mtx}; while (true) { auto morsel = getMorselOfNextBlock(); - if (morsel->fileIdx >= filePaths.size()) { + if (morsel->fileIdx >= copyDescription->filePaths.size()) { // No more blocks. break; } @@ -294,7 +297,9 @@ std::unique_ptr ReaderSharedState::getMorselOfNextBlock() { } auto numBlocksInFile = blockInfos[currFileIdx].numRowsPerBlock.size(); if (currBlockIdx >= numBlocksInFile) { - currFileIdx += fileType == CopyDescription::FileType::NPY ? filePaths.size() : 1; + currFileIdx += copyDescription->fileType == CopyDescription::FileType::NPY ? + copyDescription->filePaths.size() : + 1; currBlockIdx = 0; } return std::make_unique(currFileIdx, currBlockIdx++, currRowIdx); diff --git a/src/storage/copier/rel_copier.cpp b/src/storage/copier/rel_copier.cpp index be9510246c..faccc34ab2 100644 --- a/src/storage/copier/rel_copier.cpp +++ b/src/storage/copier/rel_copier.cpp @@ -292,8 +292,8 @@ bool ParquetRelListsCounterAndColumnsCopier::executeInternal() { return false; } if (!readFuncData || morsel->fileIdx != readFuncData->fileIdx) { - readFuncData = initFunc(sharedState->filePaths, morsel->fileIdx, - sharedState->csvReaderConfig, sharedState->tableSchema); + readFuncData = initFunc(sharedState->copyDescription->filePaths, morsel->fileIdx, + *sharedState->copyDescription->csvReaderConfig, sharedState->tableSchema); } readFunc(*readFuncData, morsel->blockIdx, dataChunkToRead.get()); auto startRowIdx = morsel->rowIdx; @@ -389,8 +389,8 @@ bool ParquetRelListsCopier::executeInternal() { return false; } if (!readFuncData || morsel->fileIdx != readFuncData->fileIdx) { - readFuncData = initFunc(sharedState->filePaths, morsel->fileIdx, - sharedState->csvReaderConfig, sharedState->tableSchema); + readFuncData = initFunc(sharedState->copyDescription->filePaths, morsel->fileIdx, + *sharedState->copyDescription->csvReaderConfig, sharedState->tableSchema); } readFunc(*readFuncData, morsel->blockIdx, dataChunkToRead.get()); auto startRowIdx = morsel->rowIdx; diff --git a/src/storage/copier/rel_copy_executor.cpp b/src/storage/copier/rel_copy_executor.cpp index f74fe2e711..c40c53da6d 100644 --- a/src/storage/copier/rel_copy_executor.cpp +++ b/src/storage/copier/rel_copy_executor.cpp @@ -105,8 +105,8 @@ row_idx_t RelCopyExecutor::populateRelLists(processor::ExecutionContext* executi } std::unique_ptr RelCopyExecutor::createRelCopier(RelCopierType relCopierType) { - auto readerSharedState = std::make_shared(copyDescription.fileType, - copyDescription.filePaths, *copyDescription.csvReaderConfig, tableSchema); + auto readerSharedState = + std::make_shared(copyDescription.copy(), tableSchema); readerSharedState->validate(); readerSharedState->countBlocks(); auto initFunc = ReaderFunctions::getInitDataFunc(copyDescription.fileType);