Skip to content

Commit

Permalink
X
Browse files Browse the repository at this point in the history
  • Loading branch information
andyfengHKU committed Jul 20, 2023
1 parent 3b7001a commit 22395ab
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 16 deletions.
5 changes: 4 additions & 1 deletion src/include/processor/operator/result_collector.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ namespace processor {

class ResultCollectorSharedState {
public:
explicit ResultCollectorSharedState(std::shared_ptr<FactorizedTable> table)
: table{std::move(table)} {}

inline void mergeLocalTable(FactorizedTable& localTable) {
std::unique_lock lck{mtx};
table->merge(localTable);
Expand All @@ -28,7 +31,7 @@ struct ResultCollectorInfo {
std::unique_ptr<FactorizedTableSchema> tableSchema, std::vector<DataPos> payloadPositions)
: tableSchema{std::move(tableSchema)}, payloadPositions{std::move(payloadPositions)} {}
ResultCollectorInfo(const ResultCollectorInfo& other)
: tableSchema{other.tableSchema->copy()} {}
: tableSchema{other.tableSchema->copy()}, payloadPositions{other.payloadPositions} {}

inline std::unique_ptr<ResultCollectorInfo> copy() const {
return std::make_unique<ResultCollectorInfo>(*this);
Expand Down
8 changes: 2 additions & 6 deletions src/processor/mapper/create_result_collector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ namespace processor {
std::unique_ptr<ResultCollector> PlanMapper::createResultCollector(
const binder::expression_vector& expressions, Schema* schema,
std::unique_ptr<PhysicalOperator> prevOperator) {
bool hasUnFlatColumn = false;
std::vector<DataPos> payloadsPos;
auto tableSchema = std::make_unique<FactorizedTableSchema>();
for (auto& expression : expressions) {
Expand All @@ -22,15 +21,12 @@ std::unique_ptr<ResultCollector> PlanMapper::createResultCollector(
} else {
columnSchema = std::make_unique<ColumnSchema>(
true /* isUnFlat */, dataPos.dataChunkPos, (uint32_t)sizeof(overflow_value_t));
hasUnFlatColumn = true;
}
tableSchema->appendColumn(std::move(columnSchema));
payloadsPos.push_back(dataPos);
}
// auto sharedState = std::make_shared<ResultCollectorSharedState>(
// memoryManager, tableSchema->copy(), hasUnFlatColumn ? 1 :
// common::DEFAULT_VECTOR_CAPACITY);
auto sharedState = std::make_shared<ResultCollectorSharedState>();
auto table = std::make_shared<FactorizedTable>(memoryManager, tableSchema->copy());
auto sharedState = std::make_shared<ResultCollectorSharedState>(std::move(table));
auto info = std::make_unique<ResultCollectorInfo>(std::move(tableSchema), payloadsPos);
return make_unique<ResultCollector>(std::make_unique<ResultSetDescriptor>(schema),
std::move(info), std::move(sharedState), std::move(prevOperator), getOperatorID(),
Expand Down
3 changes: 1 addition & 2 deletions src/processor/mapper/map_cross_product.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ std::unique_ptr<PhysicalOperator> PlanMapper::mapLogicalCrossProductToPhysical(
std::make_unique<CrossProductInfo>(std::move(outVecPos), std::move(colIndicesToScan));
auto table = resultCollector->getResultFactorizedTable();
auto maxMorselSize = table->hasUnflatCol() ? 1 : common::DEFAULT_VECTOR_CAPACITY;
auto localState = std::make_unique<CrossProductLocalState>(
table, maxMorselSize);
auto localState = std::make_unique<CrossProductLocalState>(table, maxMorselSize);
return make_unique<CrossProduct>(std::move(info), std::move(localState),
std::move(probeSidePrevOperator), std::move(resultCollector), getOperatorID(),
logicalCrossProduct->getExpressionsForPrinting());
Expand Down
11 changes: 5 additions & 6 deletions src/processor/mapper/map_union.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,12 @@ std::unique_ptr<PhysicalOperator> PlanMapper::mapLogicalUnionAllToPhysical(
outputPositions.emplace_back(outSchema->getExpressionPos(*expression));
columnIndices.push_back(i);
}
auto info = std::make_unique<UnionAllScanInfo>(std::move(outputPositions), std::move(columnIndices));
auto info =
std::make_unique<UnionAllScanInfo>(std::move(outputPositions), std::move(columnIndices));
auto maxMorselSize = tables[0]->hasUnflatCol() ? 1 : common::DEFAULT_VECTOR_CAPACITY;
auto unionSharedState =
make_shared<UnionAllScanSharedState>(std::move(tables), maxMorselSize);
return make_unique<UnionAllScan>(std::move(info),
unionSharedState, std::move(prevOperators), getOperatorID(),
logicalUnionAll.getExpressionsForPrinting());
auto unionSharedState = make_shared<UnionAllScanSharedState>(std::move(tables), maxMorselSize);
return make_unique<UnionAllScan>(std::move(info), unionSharedState, std::move(prevOperators),
getOperatorID(), logicalUnionAll.getExpressionsForPrinting());
}

} // namespace processor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,4 @@ bool FactorizedTableScan::getNextTuplesInternal(ExecutionContext* context) {
}

} // namespace processor
} // namespace kuzu
} // namespace kuzu

0 comments on commit 22395ab

Please sign in to comment.