From 22395ab9db4b20ba0b1ec5d994d61acf78a97d80 Mon Sep 17 00:00:00 2001 From: xiyang Date: Thu, 20 Jul 2023 00:31:49 -0400 Subject: [PATCH] X --- src/include/processor/operator/result_collector.h | 5 ++++- src/processor/mapper/create_result_collector.cpp | 8 ++------ src/processor/mapper/map_cross_product.cpp | 3 +-- src/processor/mapper/map_union.cpp | 11 +++++------ .../operator/table_scan/factorized_table_scan.cpp | 2 +- 5 files changed, 13 insertions(+), 16 deletions(-) diff --git a/src/include/processor/operator/result_collector.h b/src/include/processor/operator/result_collector.h index 689695c2f49..8047d248b42 100644 --- a/src/include/processor/operator/result_collector.h +++ b/src/include/processor/operator/result_collector.h @@ -8,6 +8,9 @@ namespace processor { class ResultCollectorSharedState { public: + explicit ResultCollectorSharedState(std::shared_ptr table) + : table{std::move(table)} {} + inline void mergeLocalTable(FactorizedTable& localTable) { std::unique_lock lck{mtx}; table->merge(localTable); @@ -28,7 +31,7 @@ struct ResultCollectorInfo { std::unique_ptr tableSchema, std::vector payloadPositions) : tableSchema{std::move(tableSchema)}, payloadPositions{std::move(payloadPositions)} {} ResultCollectorInfo(const ResultCollectorInfo& other) - : tableSchema{other.tableSchema->copy()} {} + : tableSchema{other.tableSchema->copy()}, payloadPositions{other.payloadPositions} {} inline std::unique_ptr copy() const { return std::make_unique(*this); diff --git a/src/processor/mapper/create_result_collector.cpp b/src/processor/mapper/create_result_collector.cpp index 149ab6ae63a..620782d900c 100644 --- a/src/processor/mapper/create_result_collector.cpp +++ b/src/processor/mapper/create_result_collector.cpp @@ -10,7 +10,6 @@ namespace processor { std::unique_ptr PlanMapper::createResultCollector( const binder::expression_vector& expressions, Schema* schema, std::unique_ptr prevOperator) { - bool hasUnFlatColumn = false; std::vector payloadsPos; auto tableSchema = std::make_unique(); for (auto& expression : expressions) { @@ -22,15 +21,12 @@ std::unique_ptr PlanMapper::createResultCollector( } else { columnSchema = std::make_unique( true /* isUnFlat */, dataPos.dataChunkPos, (uint32_t)sizeof(overflow_value_t)); - hasUnFlatColumn = true; } tableSchema->appendColumn(std::move(columnSchema)); payloadsPos.push_back(dataPos); } - // auto sharedState = std::make_shared( - // memoryManager, tableSchema->copy(), hasUnFlatColumn ? 1 : - // common::DEFAULT_VECTOR_CAPACITY); - auto sharedState = std::make_shared(); + auto table = std::make_shared(memoryManager, tableSchema->copy()); + auto sharedState = std::make_shared(std::move(table)); auto info = std::make_unique(std::move(tableSchema), payloadsPos); return make_unique(std::make_unique(schema), std::move(info), std::move(sharedState), std::move(prevOperator), getOperatorID(), diff --git a/src/processor/mapper/map_cross_product.cpp b/src/processor/mapper/map_cross_product.cpp index 139fccc09b6..4ffcaa1d654 100644 --- a/src/processor/mapper/map_cross_product.cpp +++ b/src/processor/mapper/map_cross_product.cpp @@ -30,8 +30,7 @@ std::unique_ptr PlanMapper::mapLogicalCrossProductToPhysical( std::make_unique(std::move(outVecPos), std::move(colIndicesToScan)); auto table = resultCollector->getResultFactorizedTable(); auto maxMorselSize = table->hasUnflatCol() ? 1 : common::DEFAULT_VECTOR_CAPACITY; - auto localState = std::make_unique( - table, maxMorselSize); + auto localState = std::make_unique(table, maxMorselSize); return make_unique(std::move(info), std::move(localState), std::move(probeSidePrevOperator), std::move(resultCollector), getOperatorID(), logicalCrossProduct->getExpressionsForPrinting()); diff --git a/src/processor/mapper/map_union.cpp b/src/processor/mapper/map_union.cpp index 9ab6eec933a..7834c301098 100644 --- a/src/processor/mapper/map_union.cpp +++ b/src/processor/mapper/map_union.cpp @@ -32,13 +32,12 @@ std::unique_ptr PlanMapper::mapLogicalUnionAllToPhysical( outputPositions.emplace_back(outSchema->getExpressionPos(*expression)); columnIndices.push_back(i); } - auto info = std::make_unique(std::move(outputPositions), std::move(columnIndices)); + auto info = + std::make_unique(std::move(outputPositions), std::move(columnIndices)); auto maxMorselSize = tables[0]->hasUnflatCol() ? 1 : common::DEFAULT_VECTOR_CAPACITY; - auto unionSharedState = - make_shared(std::move(tables), maxMorselSize); - return make_unique(std::move(info), - unionSharedState, std::move(prevOperators), getOperatorID(), - logicalUnionAll.getExpressionsForPrinting()); + auto unionSharedState = make_shared(std::move(tables), maxMorselSize); + return make_unique(std::move(info), unionSharedState, std::move(prevOperators), + getOperatorID(), logicalUnionAll.getExpressionsForPrinting()); } } // namespace processor diff --git a/src/processor/operator/table_scan/factorized_table_scan.cpp b/src/processor/operator/table_scan/factorized_table_scan.cpp index 363120f0a9e..e54d9974ab5 100644 --- a/src/processor/operator/table_scan/factorized_table_scan.cpp +++ b/src/processor/operator/table_scan/factorized_table_scan.cpp @@ -29,4 +29,4 @@ bool FactorizedTableScan::getNextTuplesInternal(ExecutionContext* context) { } } // namespace processor -} // namespace kuzu \ No newline at end of file +} // namespace kuzu