diff --git a/src/expression_evaluator/function_evaluator.cpp b/src/expression_evaluator/function_evaluator.cpp index fd720774f9..0bfed88d9d 100644 --- a/src/expression_evaluator/function_evaluator.cpp +++ b/src/expression_evaluator/function_evaluator.cpp @@ -12,7 +12,7 @@ void FunctionExpressionEvaluator::init(const ResultSet& resultSet, MemoryManager selectFunc = ((ScalarFunctionExpression&)*expression).selectFunc; } resultVector = make_shared(expression->dataType, memoryManager); - if (children.empty()) { + if (children.empty()) { // const function, e.g. PI() resultVector->state = DataChunkState::getSingleValueDataChunkState(); } for (auto& child : children) { diff --git a/src/include/planner/logical_plan/logical_operator/logical_accumulate.h b/src/include/planner/logical_plan/logical_operator/logical_accumulate.h index b73aa6779c..c790aab400 100644 --- a/src/include/planner/logical_plan/logical_operator/logical_accumulate.h +++ b/src/include/planner/logical_plan/logical_operator/logical_accumulate.h @@ -8,37 +8,29 @@ namespace planner { class LogicalAccumulate : public LogicalOperator { public: - LogicalAccumulate(expression_vector expressions, vector flatOutputGroupPositions, - unique_ptr schemaBeforeSink, shared_ptr child) - : LogicalOperator{move(child)}, expressions{move(expressions)}, - flatOutputGroupPositions{move(flatOutputGroupPositions)}, schemaBeforeSink{ - move(schemaBeforeSink)} {} + LogicalAccumulate(expression_vector expressions, unique_ptr schemaBeforeSink, + shared_ptr child) + : LogicalOperator{std::move(child)}, expressions{std::move(expressions)}, + schemaBeforeSink{std::move(schemaBeforeSink)} {} LogicalOperatorType getLogicalOperatorType() const override { return LogicalOperatorType::LOGICAL_ACCUMULATE; } string getExpressionsForPrinting() const override { - string result; - for (auto& expression : expressions) { - result += expression->getRawName() + ","; - } - return result; + return ExpressionUtil::toString(expressions); } inline expression_vector getExpressions() const { return expressions; } - inline vector getFlatOutputGroupPositions() const { return flatOutputGroupPositions; } inline Schema* getSchemaBeforeSink() const { return schemaBeforeSink.get(); } unique_ptr copy() override { return make_unique( - expressions, flatOutputGroupPositions, schemaBeforeSink->copy(), children[0]->copy()); + expressions, schemaBeforeSink->copy(), children[0]->copy()); } private: expression_vector expressions; - // TODO(Xiyang): remove this when fixing issue #606 - vector flatOutputGroupPositions; unique_ptr schemaBeforeSink; }; diff --git a/src/include/planner/logical_plan/logical_operator/logical_cross_product.h b/src/include/planner/logical_plan/logical_operator/logical_cross_product.h index 1d5827f693..2cbb05406e 100644 --- a/src/include/planner/logical_plan/logical_operator/logical_cross_product.h +++ b/src/include/planner/logical_plan/logical_operator/logical_cross_product.h @@ -8,11 +8,9 @@ namespace planner { class LogicalCrossProduct : public LogicalOperator { public: LogicalCrossProduct(unique_ptr buildSideSchema, - vector flatOutputGroupPositions, shared_ptr probeSideChild, - shared_ptr buildSideChild) + shared_ptr probeSideChild, shared_ptr buildSideChild) : LogicalOperator{std::move(probeSideChild), std::move(buildSideChild)}, - buildSideSchema{std::move(buildSideSchema)}, flatOutputGroupPositions{ - std::move(flatOutputGroupPositions)} {} + buildSideSchema{std::move(buildSideSchema)} {} inline LogicalOperatorType getLogicalOperatorType() const override { return LogicalOperatorType::LOGICAL_CROSS_PRODUCT; @@ -21,16 +19,14 @@ class LogicalCrossProduct : public LogicalOperator { inline string getExpressionsForPrinting() const override { return string(); } inline Schema* getBuildSideSchema() const { return buildSideSchema.get(); } - inline vector getFlatOutputGroupPositions() const { return flatOutputGroupPositions; } inline unique_ptr copy() override { - return make_unique(buildSideSchema->copy(), flatOutputGroupPositions, - children[0]->copy(), children[1]->copy()); + return make_unique( + buildSideSchema->copy(), children[0]->copy(), children[1]->copy()); } private: unique_ptr buildSideSchema; - vector flatOutputGroupPositions; }; } // namespace planner diff --git a/src/include/planner/logical_plan/logical_operator/logical_extend.h b/src/include/planner/logical_plan/logical_operator/logical_extend.h index 4e1a2b07d8..120fa907ec 100644 --- a/src/include/planner/logical_plan/logical_operator/logical_extend.h +++ b/src/include/planner/logical_plan/logical_operator/logical_extend.h @@ -31,7 +31,7 @@ class LogicalExtend : public LogicalOperator { if (!extendToNewGroup) { nbrGroupPos = boundGroupPos; } else { - assert(schema.getGroup(boundGroupPos)->getIsFlat()); + assert(schema.getGroup(boundGroupPos)->isFlat()); nbrGroupPos = schema.createGroup(); } schema.insertToGroupAndScope(nbrNode->getInternalIDProperty(), nbrGroupPos); diff --git a/src/include/planner/logical_plan/logical_operator/logical_flatten.h b/src/include/planner/logical_plan/logical_operator/logical_flatten.h index e21bf513ff..4610940636 100644 --- a/src/include/planner/logical_plan/logical_operator/logical_flatten.h +++ b/src/include/planner/logical_plan/logical_operator/logical_flatten.h @@ -22,9 +22,8 @@ class LogicalFlatten : public LogicalOperator { inline shared_ptr getExpression() const { return expression; } inline void computeSchema(Schema& schema) { - auto group = schema.getGroup(expression); - assert(!group->getIsFlat()); - group->setIsFlat(true); + auto groupPos = schema.getGroupPos(expression->getUniqueName()); + schema.flattenGroup(groupPos); } unique_ptr copy() override { diff --git a/src/include/planner/logical_plan/logical_operator/logical_ftable_scan.h b/src/include/planner/logical_plan/logical_operator/logical_ftable_scan.h index 4da6ba44ec..b80d61c24b 100644 --- a/src/include/planner/logical_plan/logical_operator/logical_ftable_scan.h +++ b/src/include/planner/logical_plan/logical_operator/logical_ftable_scan.h @@ -9,11 +9,9 @@ using namespace kuzu::binder; class LogicalFTableScan : public LogicalOperator { public: - LogicalFTableScan(expression_vector expressionsToScan, expression_vector expressionsAccumulated, - vector flatOutputGroupPositions) - : expressionsToScan{std::move(expressionsToScan)}, expressionsAccumulated{std::move( - expressionsAccumulated)}, - flatOutputGroupPositions{std::move(flatOutputGroupPositions)} {} + LogicalFTableScan(expression_vector expressionsToScan, expression_vector expressionsAccumulated) + : expressionsToScan{std::move(expressionsToScan)}, expressionsAccumulated{ + std::move(expressionsAccumulated)} {} inline LogicalOperatorType getLogicalOperatorType() const override { return LogicalOperatorType::LOGICAL_FTABLE_SCAN; @@ -25,11 +23,9 @@ class LogicalFTableScan : public LogicalOperator { inline expression_vector getExpressionsToScan() const { return expressionsToScan; } inline expression_vector getExpressionsAccumulated() const { return expressionsAccumulated; } - inline vector getFlatOutputGroupPositions() const { return flatOutputGroupPositions; } unique_ptr copy() override { - return make_unique( - expressionsToScan, expressionsAccumulated, flatOutputGroupPositions); + return make_unique(expressionsToScan, expressionsAccumulated); } private: @@ -37,7 +33,6 @@ class LogicalFTableScan : public LogicalOperator { // expressionsToScan can be a subset of expressionsAccumulated (i.e. partially scan a factorized // table). expression_vector expressionsAccumulated; - vector flatOutputGroupPositions; }; } // namespace planner diff --git a/src/include/planner/logical_plan/logical_operator/logical_hash_join.h b/src/include/planner/logical_plan/logical_operator/logical_hash_join.h index 3910589a19..d3babf1bd5 100644 --- a/src/include/planner/logical_plan/logical_operator/logical_hash_join.h +++ b/src/include/planner/logical_plan/logical_operator/logical_hash_join.h @@ -17,32 +17,29 @@ class LogicalHashJoin : public LogicalOperator { // Inner and left join. LogicalHashJoin(vector> joinNodes, JoinType joinType, bool isProbeAcc, unique_ptr buildSideSchema, - vector flatOutputGroupPositions, expression_vector expressionsToMaterialize, - shared_ptr probeSideChild, shared_ptr buildSideChild) + expression_vector expressionsToMaterialize, shared_ptr probeSideChild, + shared_ptr buildSideChild) : LogicalHashJoin{std::move(joinNodes), joinType, nullptr, isProbeAcc, - std::move(buildSideSchema), std::move(flatOutputGroupPositions), - std::move(expressionsToMaterialize), std::move(probeSideChild), - std::move(buildSideChild)} {} + std::move(buildSideSchema), std::move(expressionsToMaterialize), + std::move(probeSideChild), std::move(buildSideChild)} {} // Mark join. LogicalHashJoin(vector> joinNodes, shared_ptr mark, bool isProbeAcc, unique_ptr buildSideSchema, shared_ptr probeSideChild, shared_ptr buildSideChild) : LogicalHashJoin{std::move(joinNodes), JoinType::MARK, std::move(mark), isProbeAcc, - std::move(buildSideSchema), vector{} /* flatOutputGroupPositions */, - expression_vector{} /* expressionsToMaterialize */, std::move(probeSideChild), - std::move(buildSideChild)} {} + std::move(buildSideSchema), expression_vector{} /* expressionsToMaterialize */, + std::move(probeSideChild), std::move(buildSideChild)} {} LogicalHashJoin(vector> joinNodes, JoinType joinType, shared_ptr mark, bool isProbeAcc, unique_ptr buildSideSchema, - vector flatOutputGroupPositions, expression_vector expressionsToMaterialize, - shared_ptr probeSideChild, shared_ptr buildSideChild) + expression_vector expressionsToMaterialize, shared_ptr probeSideChild, + shared_ptr buildSideChild) : LogicalOperator{std::move(probeSideChild), std::move(buildSideChild)}, joinNodes(std::move(joinNodes)), joinType{joinType}, mark{std::move(mark)}, isProbeAcc{isProbeAcc}, - buildSideSchema(std::move(buildSideSchema)), flatOutputGroupPositions{std::move( - flatOutputGroupPositions)}, - expressionsToMaterialize{std::move(expressionsToMaterialize)} {} + buildSideSchema(std::move(buildSideSchema)), expressionsToMaterialize{ + std::move(expressionsToMaterialize)} {} inline LogicalOperatorType getLogicalOperatorType() const override { return LogicalOperatorType::LOGICAL_HASH_JOIN; @@ -67,12 +64,11 @@ class LogicalHashJoin : public LogicalOperator { } inline bool getIsProbeAcc() const { return isProbeAcc; } inline Schema* getBuildSideSchema() const { return buildSideSchema.get(); } - inline vector getFlatOutputGroupPositions() const { return flatOutputGroupPositions; } inline unique_ptr copy() override { return make_unique(joinNodes, joinType, mark, isProbeAcc, - buildSideSchema->copy(), flatOutputGroupPositions, expressionsToMaterialize, - children[0]->copy(), children[1]->copy()); + buildSideSchema->copy(), expressionsToMaterialize, children[0]->copy(), + children[1]->copy()); } private: @@ -81,8 +77,6 @@ class LogicalHashJoin : public LogicalOperator { shared_ptr mark; // when joinType is Mark bool isProbeAcc; unique_ptr buildSideSchema; - // TODO(Xiyang): solve this with issue 606 - vector flatOutputGroupPositions; expression_vector expressionsToMaterialize; }; diff --git a/src/include/planner/logical_plan/logical_operator/logical_scan_node.h b/src/include/planner/logical_plan/logical_operator/logical_scan_node.h index 50bd3b2a11..4718119508 100644 --- a/src/include/planner/logical_plan/logical_operator/logical_scan_node.h +++ b/src/include/planner/logical_plan/logical_operator/logical_scan_node.h @@ -44,7 +44,7 @@ class LogicalIndexScanNode : public LogicalScanNode { inline void computeSchema(Schema& schema) override { LogicalScanNode::computeSchema(schema); auto groupPos = schema.getGroupPos(node->getInternalIDPropertyName()); - schema.getGroup(groupPos)->setIsFlat(true); + schema.setGroupAsSingleState(groupPos); } inline shared_ptr getIndexExpression() const { return indexExpression; } diff --git a/src/include/planner/logical_plan/logical_operator/schema.h b/src/include/planner/logical_plan/logical_operator/schema.h index c21f632e6b..b303afa596 100644 --- a/src/include/planner/logical_plan/logical_operator/schema.h +++ b/src/include/planner/logical_plan/logical_operator/schema.h @@ -14,13 +14,22 @@ class FactorizationGroup { friend class Schema; public: - FactorizationGroup() : isFlat{false}, cardinalityMultiplier{1} {} + FactorizationGroup() : flat{false}, singleState{false}, cardinalityMultiplier{1} {} FactorizationGroup(const FactorizationGroup& other) - : isFlat{other.isFlat}, cardinalityMultiplier{other.cardinalityMultiplier}, - expressions{other.expressions} {} + : flat{other.flat}, singleState{other.singleState}, + cardinalityMultiplier{other.cardinalityMultiplier}, expressions{other.expressions} {} - inline void setIsFlat(bool flag) { isFlat = flag; } - inline bool getIsFlat() const { return isFlat; } + inline void setFlat() { + assert(!flat); + flat = true; + } + inline bool isFlat() const { return flat; } + inline void setSingleState() { + assert(!singleState); + singleState = true; + setFlat(); + } + inline bool isSingleState() const { return singleState; } inline void setMultiplier(uint64_t multiplier) { cardinalityMultiplier = multiplier; } inline uint64_t getMultiplier() const { return cardinalityMultiplier; } @@ -31,7 +40,8 @@ class FactorizationGroup { inline expression_vector getExpressions() const { return expressions; } private: - bool isFlat; + bool flat; + bool singleState; uint64_t cardinalityMultiplier; expression_vector expressions; }; @@ -67,7 +77,8 @@ class Schema { return expressionNameToGroupPos.at(expressionName); } - inline void flattenGroup(uint32_t pos) { groups[pos]->isFlat = true; } + inline void flattenGroup(uint32_t pos) { groups[pos]->setFlat(); } + inline void setGroupAsSingleState(uint32_t pos) { groups[pos]->setSingleState(); } bool isExpressionInScope(const Expression& expression) const; diff --git a/src/include/processor/operator/cross_product.h b/src/include/processor/operator/cross_product.h index 0255a48844..7171ad69f0 100644 --- a/src/include/processor/operator/cross_product.h +++ b/src/include/processor/operator/cross_product.h @@ -9,23 +9,20 @@ class CrossProduct : public PhysicalOperator { public: CrossProduct(shared_ptr sharedState, vector> outVecPosAndTypePairs, vector colIndicesToScan, - vector flatDataChunkPositions, unique_ptr probeChild, - unique_ptr buildChild, uint32_t id, const string& paramsString) + unique_ptr probeChild, unique_ptr buildChild, + uint32_t id, const string& paramsString) : PhysicalOperator{std::move(probeChild), std::move(buildChild), id, paramsString}, - sharedState{std::move(sharedState)}, outVecPosAndTypePairs{std::move( - outVecPosAndTypePairs)}, - colIndicesToScan{std::move(colIndicesToScan)}, flatDataChunkPositions{ - std::move(flatDataChunkPositions)} {} + sharedState{std::move(sharedState)}, + outVecPosAndTypePairs{std::move(outVecPosAndTypePairs)}, colIndicesToScan{std::move( + colIndicesToScan)} {} // Clone only. CrossProduct(shared_ptr sharedState, vector> outVecPosAndTypePairs, vector colIndicesToScan, - vector flatDataChunkPositions, unique_ptr child, uint32_t id, - const string& paramsString) + unique_ptr child, uint32_t id, const string& paramsString) : PhysicalOperator{std::move(child), id, paramsString}, sharedState{std::move(sharedState)}, - outVecPosAndTypePairs{std::move(outVecPosAndTypePairs)}, - colIndicesToScan{std::move(colIndicesToScan)}, flatDataChunkPositions{ - std::move(flatDataChunkPositions)} {} + outVecPosAndTypePairs{std::move(outVecPosAndTypePairs)}, colIndicesToScan{std::move( + colIndicesToScan)} {} PhysicalOperatorType getOperatorType() override { return PhysicalOperatorType::CROSS_PRODUCT; } @@ -35,14 +32,13 @@ class CrossProduct : public PhysicalOperator { unique_ptr clone() override { return make_unique(sharedState, outVecPosAndTypePairs, colIndicesToScan, - flatDataChunkPositions, children[0]->clone(), id, paramsString); + children[0]->clone(), id, paramsString); } private: shared_ptr sharedState; vector> outVecPosAndTypePairs; vector colIndicesToScan; - vector flatDataChunkPositions; uint64_t startIdx = 0u; vector> vectorsToScan; diff --git a/src/include/processor/operator/hash_join/hash_join_probe.h b/src/include/processor/operator/hash_join/hash_join_probe.h index 8a6d0497a2..a0bc88adda 100644 --- a/src/include/processor/operator/hash_join/hash_join_probe.h +++ b/src/include/processor/operator/hash_join/hash_join_probe.h @@ -51,23 +51,20 @@ struct ProbeDataInfo { class HashJoinProbe : public PhysicalOperator, FilteringOperator { public: HashJoinProbe(shared_ptr sharedState, JoinType joinType, - vector flatDataChunkPositions, const ProbeDataInfo& probeDataInfo, - unique_ptr probeChild, unique_ptr buildChild, - uint32_t id, const string& paramsString) + const ProbeDataInfo& probeDataInfo, unique_ptr probeChild, + unique_ptr buildChild, uint32_t id, const string& paramsString) : PhysicalOperator{std::move(probeChild), std::move(buildChild), id, paramsString}, FilteringOperator{probeDataInfo.keysDataPos.size()}, - sharedState{std::move(sharedState)}, joinType{joinType}, - flatDataChunkPositions{std::move(flatDataChunkPositions)}, probeDataInfo{probeDataInfo} {} + sharedState{std::move(sharedState)}, joinType{joinType}, probeDataInfo{probeDataInfo} {} // This constructor is used for cloning only. // HashJoinProbe do not need to clone hashJoinBuild which is on a different pipeline. HashJoinProbe(shared_ptr sharedState, JoinType joinType, - vector flatDataChunkPositions, const ProbeDataInfo& probeDataInfo, - unique_ptr probeChild, uint32_t id, const string& paramsString) + const ProbeDataInfo& probeDataInfo, unique_ptr probeChild, uint32_t id, + const string& paramsString) : PhysicalOperator{std::move(probeChild), id, paramsString}, FilteringOperator{probeDataInfo.keysDataPos.size()}, - sharedState{std::move(sharedState)}, joinType{joinType}, - flatDataChunkPositions{std::move(flatDataChunkPositions)}, probeDataInfo{probeDataInfo} {} + sharedState{std::move(sharedState)}, joinType{joinType}, probeDataInfo{probeDataInfo} {} inline PhysicalOperatorType getOperatorType() override { return HASH_JOIN_PROBE; } @@ -76,8 +73,8 @@ class HashJoinProbe : public PhysicalOperator, FilteringOperator { bool getNextTuplesInternal() override; inline unique_ptr clone() override { - return make_unique(sharedState, joinType, flatDataChunkPositions, - probeDataInfo, children[0]->clone(), id, paramsString); + return make_unique( + sharedState, joinType, probeDataInfo, children[0]->clone(), id, paramsString); } private: @@ -93,7 +90,6 @@ class HashJoinProbe : public PhysicalOperator, FilteringOperator { private: shared_ptr sharedState; JoinType joinType; - vector flatDataChunkPositions; ProbeDataInfo probeDataInfo; vector> vectorsToReadInto; diff --git a/src/include/processor/operator/source_operator.h b/src/include/processor/operator/source_operator.h index 6e5960fb18..43cdd83aed 100644 --- a/src/include/processor/operator/source_operator.h +++ b/src/include/processor/operator/source_operator.h @@ -17,7 +17,11 @@ class SourceOperator { auto resultSet = make_shared(numDataChunks); for (auto i = 0u; i < numDataChunks; ++i) { auto dataChunkDescriptor = resultSetDescriptor->getDataChunkDescriptor(i); - resultSet->insert(i, make_shared(dataChunkDescriptor->getNumValueVectors())); + auto dataChunk = make_shared(dataChunkDescriptor->getNumValueVectors()); + if (dataChunkDescriptor->isSingleState()) { + dataChunk->state = DataChunkState::getSingleValueDataChunkState(); + } + resultSet->insert(i, dataChunk); } return resultSet; } diff --git a/src/include/processor/operator/table_scan/factorized_table_scan.h b/src/include/processor/operator/table_scan/factorized_table_scan.h index 4d8b8e9154..fcebde9522 100644 --- a/src/include/processor/operator/table_scan/factorized_table_scan.h +++ b/src/include/processor/operator/table_scan/factorized_table_scan.h @@ -11,32 +11,27 @@ class FactorizedTableScan : public BaseTableScan { FactorizedTableScan(unique_ptr resultSetDescriptor, vector outVecPositions, vector outVecDataTypes, vector colIndicesToScan, shared_ptr sharedState, - vector flatDataChunkPositions, unique_ptr child, uint32_t id, - const string& paramsString) + unique_ptr child, uint32_t id, const string& paramsString) : BaseTableScan{std::move(resultSetDescriptor), std::move(outVecPositions), move(outVecDataTypes), std::move(colIndicesToScan), std::move(child), id, paramsString}, - sharedState{std::move(sharedState)}, flatDataChunkPositions{ - std::move(flatDataChunkPositions)} {} + sharedState{std::move(sharedState)} {} // Scan some columns. FactorizedTableScan(unique_ptr resultSetDescriptor, vector outVecPositions, vector outVecDataTypes, - vector colIndicesToScan, vector flatDataChunkPositions, uint32_t id, - const string& paramsString) + vector colIndicesToScan, uint32_t id, const string& paramsString) : BaseTableScan{std::move(resultSetDescriptor), std::move(outVecPositions), - std::move(outVecDataTypes), std::move(colIndicesToScan), id, paramsString}, - flatDataChunkPositions{std::move(flatDataChunkPositions)} {} + std::move(outVecDataTypes), std::move(colIndicesToScan), id, paramsString} {} // For clone only. FactorizedTableScan(unique_ptr resultSetDescriptor, vector outVecPositions, vector outVecDataTypes, - vector colIndicesToScan, shared_ptr sharedState, - vector flatDataChunkPositions, uint32_t id, const string& paramsString) + vector colIndicesToScan, shared_ptr sharedState, uint32_t id, + const string& paramsString) : BaseTableScan{std::move(resultSetDescriptor), std::move(outVecPositions), std::move(outVecDataTypes), std::move(colIndicesToScan), id, paramsString}, - sharedState{std::move(sharedState)}, flatDataChunkPositions{ - std::move(flatDataChunkPositions)} {} + sharedState{std::move(sharedState)} {} inline void setSharedState(shared_ptr state) { sharedState = std::move(state); @@ -52,13 +47,11 @@ class FactorizedTableScan : public BaseTableScan { inline unique_ptr clone() override { assert(sharedState != nullptr); return make_unique(resultSetDescriptor->copy(), outVecPositions, - outVecDataTypes, colIndicesToScan, sharedState, flatDataChunkPositions, id, - paramsString); + outVecDataTypes, colIndicesToScan, sharedState, id, paramsString); } private: shared_ptr sharedState; - vector flatDataChunkPositions; }; } // namespace processor diff --git a/src/include/processor/result/result_set_descriptor.h b/src/include/processor/result/result_set_descriptor.h index 20b102615a..0b0ed1eb88 100644 --- a/src/include/processor/result/result_set_descriptor.h +++ b/src/include/processor/result/result_set_descriptor.h @@ -19,9 +19,13 @@ class DataChunkDescriptor { DataChunkDescriptor() = default; DataChunkDescriptor(const DataChunkDescriptor& other) - : expressionNameToValueVectorPosMap{other.expressionNameToValueVectorPosMap}, + : singleState{other.singleState}, + expressionNameToValueVectorPosMap{other.expressionNameToValueVectorPosMap}, expressionNames{other.expressionNames} {} + inline void setSingleState() { singleState = true; } + inline bool isSingleState() const { return singleState; } + inline uint32_t getValueVectorPos(const string& name) const { assert(expressionNameToValueVectorPosMap.contains(name)); return expressionNameToValueVectorPosMap.at(name); @@ -35,6 +39,7 @@ class DataChunkDescriptor { } private: + bool singleState = false; unordered_map expressionNameToValueVectorPosMap; vector expressionNames; }; diff --git a/src/planner/join_order_enumerator.cpp b/src/planner/join_order_enumerator.cpp index 0bcf548254..433d44367d 100644 --- a/src/planner/join_order_enumerator.cpp +++ b/src/planner/join_order_enumerator.cpp @@ -503,21 +503,19 @@ void JoinOrderEnumerator::appendFTableScan( } groupPosToExpressionsMap.at(outerPos).push_back(expression); } - vector flatOutputGroupPositions; auto schema = plan.getSchema(); for (auto& [outerPos, expressions] : groupPosToExpressionsMap) { auto innerPos = schema->createGroup(); schema->insertToGroupAndScope(expressions, innerPos); - if (outerPlan->getSchema()->getGroup(outerPos)->getIsFlat()) { - schema->flattenGroup(innerPos); - flatOutputGroupPositions.push_back(innerPos); + if (outerPlan->getSchema()->getGroup(outerPos)->isFlat()) { + schema->setGroupAsSingleState(innerPos); } } assert(outerPlan->getLastOperator()->getLogicalOperatorType() == LogicalOperatorType::LOGICAL_ACCUMULATE); auto logicalAcc = (LogicalAccumulate*)outerPlan->getLastOperator().get(); - auto fTableScan = make_shared( - expressionsToScan, logicalAcc->getExpressions(), flatOutputGroupPositions); + auto fTableScan = + make_shared(expressionsToScan, logicalAcc->getExpressions()); plan.setLastOperator(std::move(fTableScan)); } @@ -645,7 +643,7 @@ static bool isJoinKeyUniqueOnBuildSide(const string& joinNodeID, LogicalPlan& bu void JoinOrderEnumerator::appendHashJoin(const vector>& joinNodes, JoinType joinType, bool isProbeAcc, LogicalPlan& probePlan, LogicalPlan& buildPlan) { - auto& buildSideSchema = *buildPlan.getSchema(); + auto buildSideSchema = buildPlan.getSchema(); auto probeSideSchema = probePlan.getSchema(); probePlan.increaseCost(probePlan.getCardinality() + buildPlan.getCardinality()); // Flat probe side key group in either of the following two cases: @@ -670,7 +668,7 @@ void JoinOrderEnumerator::appendHashJoin(const vector unordered_set joinNodesGroupPos; for (auto& joinNode : joinNodes) { joinNodesGroupPos.insert( - buildSideSchema.getGroupPos(joinNode->getInternalIDPropertyName())); + buildSideSchema->getGroupPos(joinNode->getInternalIDPropertyName())); } QueryPlanner::appendFlattensButOne(joinNodesGroupPos, buildPlan); @@ -679,15 +677,9 @@ void JoinOrderEnumerator::appendHashJoin(const vector for (auto& joinNode : joinNodes) { keys.push_back(joinNode->getInternalIDPropertyName()); } - SinkOperatorUtil::mergeSchema(buildSideSchema, *probeSideSchema, keys); - vector flatOutputGroupPositions; - for (auto i = numGroupsBeforeMerging; i < probeSideSchema->getNumGroups(); ++i) { - if (probeSideSchema->getGroup(i)->getIsFlat()) { - flatOutputGroupPositions.push_back(i); - } - } + SinkOperatorUtil::mergeSchema(*buildSideSchema, *probeSideSchema, keys); auto hashJoin = make_shared(joinNodes, joinType, isProbeAcc, - buildSideSchema.copy(), flatOutputGroupPositions, buildSideSchema.getExpressionsInScope(), + buildSideSchema->copy(), buildSideSchema->getExpressionsInScope(), probePlan.getLastOperator(), buildPlan.getLastOperator()); probePlan.setLastOperator(move(hashJoin)); } @@ -759,14 +751,8 @@ void JoinOrderEnumerator::appendCrossProduct(LogicalPlan& probePlan, LogicalPlan probePlan.increaseCost(probePlan.getCardinality() + buildPlan.getCardinality()); auto numGroupsBeforeMerging = probeSideSchema->getNumGroups(); SinkOperatorUtil::mergeSchema(*buildSideSchema, *probeSideSchema); - vector flatOutputGroupPositions; - for (auto i = numGroupsBeforeMerging; i < probeSideSchema->getNumGroups(); ++i) { - if (probeSideSchema->getGroup(i)->getIsFlat()) { - flatOutputGroupPositions.push_back(i); - } - } - auto crossProduct = make_shared(buildSideSchema->copy(), - flatOutputGroupPositions, probePlan.getLastOperator(), buildPlan.getLastOperator()); + auto crossProduct = make_shared( + buildSideSchema->copy(), probePlan.getLastOperator(), buildPlan.getLastOperator()); probePlan.setLastOperator(std::move(crossProduct)); } diff --git a/src/planner/operator/sink_util.cpp b/src/planner/operator/sink_util.cpp index da16f3c6a5..acaa4274d1 100644 --- a/src/planner/operator/sink_util.cpp +++ b/src/planner/operator/sink_util.cpp @@ -19,11 +19,11 @@ void SinkOperatorUtil::mergeSchema( auto flatPayloads = getFlatPayloadsIgnoringKeyGroup(inputSchema, keys); if (!flatPayloads.empty()) { auto flatPayloadsOutputGroupPos = appendPayloadsToNewGroup(result, flatPayloads); - result.flattenGroup(flatPayloadsOutputGroupPos); + result.setGroupAsSingleState(flatPayloadsOutputGroupPos); } for (auto& payloadGroupPos : getGroupsPosIgnoringKeyGroups(inputSchema, keys)) { auto payloadGroup = inputSchema.getGroup(payloadGroupPos); - if (!payloadGroup->getIsFlat()) { + if (!payloadGroup->isFlat()) { auto payloads = inputSchema.getExpressionsInScope(payloadGroupPos); auto outputGroupPos = appendPayloadsToNewGroup(result, payloads); result.getGroup(outputGroupPos)->setMultiplier(payloadGroup->getMultiplier()); @@ -38,11 +38,11 @@ void SinkOperatorUtil::mergeSchema(const Schema& inputSchema, Schema& result) { } else { if (!flatPayloads.empty()) { auto flatPayloadsOutputGroupPos = appendPayloadsToNewGroup(result, flatPayloads); - result.flattenGroup(flatPayloadsOutputGroupPos); + result.setGroupAsSingleState(flatPayloadsOutputGroupPos); } for (auto& payloadGroupPos : inputSchema.getGroupsPosInScope()) { auto payloadGroup = inputSchema.getGroup(payloadGroupPos); - if (!payloadGroup->getIsFlat()) { + if (!payloadGroup->isFlat()) { auto payloads = inputSchema.getExpressionsInScope(payloadGroupPos); auto outputGroupPos = appendPayloadsToNewGroup(result, payloads); result.getGroup(outputGroupPos)->setMultiplier(payloadGroup->getMultiplier()); @@ -83,7 +83,7 @@ expression_vector SinkOperatorUtil::getFlatPayloads( const Schema& schema, const unordered_set& payloadGroupsPos) { expression_vector result; for (auto& payloadGroupPos : payloadGroupsPos) { - if (schema.getGroup(payloadGroupPos)->getIsFlat()) { + if (schema.getGroup(payloadGroupPos)->isFlat()) { for (auto& payload : schema.getExpressionsInScope(payloadGroupPos)) { result.push_back(payload); } @@ -95,7 +95,7 @@ expression_vector SinkOperatorUtil::getFlatPayloads( bool SinkOperatorUtil::hasUnFlatPayload( const Schema& schema, const unordered_set& payloadGroupsPos) { for (auto& payloadGroupPos : payloadGroupsPos) { - if (!schema.getGroup(payloadGroupPos)->getIsFlat()) { + if (!schema.getGroup(payloadGroupPos)->isFlat()) { return true; } } diff --git a/src/planner/projection_planner.cpp b/src/planner/projection_planner.cpp index c3f9e98fea..b588a69f13 100644 --- a/src/planner/projection_planner.cpp +++ b/src/planner/projection_planner.cpp @@ -112,7 +112,8 @@ void ProjectionPlanner::appendProjection( uint32_t outputPos; if (dependentGroupsPos.empty()) { // e.g. constant that does not depend on any input. outputPos = schema->createGroup(); - schema->flattenGroup(outputPos); // Mark group holding constant as flat. + // Mark group holding constant as single state. + schema->setGroupAsSingleState(outputPos); } else { outputPos = QueryPlanner::appendFlattensButOne(dependentGroupsPos, plan); } diff --git a/src/planner/query_planner.cpp b/src/planner/query_planner.cpp index 01bc410b5f..a9164e7e18 100644 --- a/src/planner/query_planner.cpp +++ b/src/planner/query_planner.cpp @@ -269,14 +269,8 @@ void QueryPlanner::appendAccumulate(kuzu::planner::LogicalPlan& plan) { auto schema = plan.getSchema(); auto schemaBeforeSink = schema->copy(); SinkOperatorUtil::recomputeSchema(*schemaBeforeSink, *schema); - vector flatOutputGroupPositions; - for (auto i = 0u; i < schema->getNumGroups(); ++i) { - if (schema->getGroup(i)->getIsFlat()) { - flatOutputGroupPositions.push_back(i); - } - } auto sink = make_shared(schemaBeforeSink->getExpressionsInScope(), - flatOutputGroupPositions, std::move(schemaBeforeSink), plan.getLastOperator()); + std::move(schemaBeforeSink), plan.getLastOperator()); plan.setLastOperator(sink); } @@ -284,7 +278,7 @@ void QueryPlanner::appendExpressionsScan(const expression_vector& expressions, L assert(plan.isEmpty()); auto schema = plan.getSchema(); auto groupPos = schema->createGroup(); - schema->flattenGroup(groupPos); // Mark group holding constant as flat. + schema->setGroupAsSingleState(groupPos); // Mark group holding constant as single state. for (auto& expression : expressions) { // No need to insert repeated constant. if (schema->isExpressionInScope(*expression)) { @@ -340,7 +334,7 @@ uint32_t QueryPlanner::appendFlattensButOne( } vector unFlatGroupsPos; for (auto& groupPos : groupsPos) { - if (!plan.getSchema()->getGroup(groupPos)->getIsFlat()) { + if (!plan.getSchema()->getGroup(groupPos)->isFlat()) { unFlatGroupsPos.push_back(groupPos); } } @@ -357,7 +351,7 @@ void QueryPlanner::appendFlattenIfNecessary( const shared_ptr& expression, LogicalPlan& plan) { auto schema = plan.getSchema(); auto group = schema->getGroup(expression); - if (group->getIsFlat()) { + if (group->isFlat()) { return; } auto flatten = make_shared(expression, plan.getLastOperator()); @@ -433,7 +427,7 @@ unique_ptr QueryPlanner::createUnionPlan( for (auto& childPlan : childrenPlans) { auto childSchema = childPlan->getSchema(); auto expressionName = childSchema->getExpressionsInScope()[i]->getUniqueName(); - hasFlatExpression |= childSchema->getGroup(expressionName)->getIsFlat(); + hasFlatExpression |= childSchema->getGroup(expressionName)->isFlat(); } if (hasFlatExpression) { for (auto& childPlan : childrenPlans) { diff --git a/src/planner/update_planner.cpp b/src/planner/update_planner.cpp index 6cca4c9b31..91a049e378 100644 --- a/src/planner/update_planner.cpp +++ b/src/planner/update_planner.cpp @@ -48,12 +48,12 @@ void UpdatePlanner::planSetItem(expression_pair setItem, LogicalPlan& plan) { assert(lhs->getChild(0)->dataType.typeID == NODE); auto nodeExpression = static_pointer_cast(lhs->getChild(0)); auto lhsGroupPos = schema->getGroupPos(nodeExpression->getInternalIDPropertyName()); - auto isLhsFlat = schema->getGroup(lhsGroupPos)->getIsFlat(); + auto isLhsFlat = schema->getGroup(lhsGroupPos)->isFlat(); // Check RHS auto rhsDependentGroupsPos = schema->getDependentGroupsPos(rhs); if (!rhsDependentGroupsPos.empty()) { // RHS is not constant auto rhsPos = QueryPlanner::appendFlattensButOne(rhsDependentGroupsPos, plan); - auto isRhsFlat = schema->getGroup(rhsPos)->getIsFlat(); + auto isRhsFlat = schema->getGroup(rhsPos)->isFlat(); // If both are unflat and from different groups, we flatten LHS. if (!isRhsFlat && !isLhsFlat && lhsGroupPos != rhsPos) { QueryPlanner::appendFlattenIfNecessary(lhsGroupPos, plan); @@ -84,7 +84,7 @@ void UpdatePlanner::appendCreateNode( auto node = createNode->getNode(); auto groupPos = schema->createGroup(); schema->insertToGroupAndScope(node->getInternalIDProperty(), groupPos); - schema->flattenGroup(groupPos); // create output is always flat + schema->setGroupAsSingleState(groupPos); nodeAndPrimaryKeyPairs.emplace_back(node, createNode->getPrimaryKeyExpression()); for (auto& setItem : createNode->getSetItems()) { setItems.push_back(setItem); diff --git a/src/processor/mapper/map_accumulate.cpp b/src/processor/mapper/map_accumulate.cpp index e31f0a1bb3..8eb9fcc2ee 100644 --- a/src/processor/mapper/map_accumulate.cpp +++ b/src/processor/mapper/map_accumulate.cpp @@ -33,8 +33,8 @@ unique_ptr PlanMapper::mapLogicalAccumulateToPhysical( auto sharedState = resultCollector->getSharedState(); return make_unique(mapperContext.getResultSetDescriptor()->copy(), std::move(outDataPoses), std::move(outVecDataTypes), std::move(colIndicesToScan), - sharedState, logicalAccumulate->getFlatOutputGroupPositions(), std::move(resultCollector), - getOperatorID(), logicalAccumulate->getExpressionsForPrinting()); + sharedState, std::move(resultCollector), getOperatorID(), + logicalAccumulate->getExpressionsForPrinting()); } unique_ptr PlanMapper::mapLogicalFTableScanToPhysical( @@ -55,8 +55,7 @@ unique_ptr PlanMapper::mapLogicalFTableScanToPhysical( } return make_unique(mapperContext.getResultSetDescriptor()->copy(), std::move(outDataPoses), std::move(outDataTypes), std::move(colIndicesToScan), - logicalFTableScan->getFlatOutputGroupPositions(), getOperatorID(), - logicalFTableScan->getExpressionsForPrinting()); + getOperatorID(), logicalFTableScan->getExpressionsForPrinting()); } } // namespace processor diff --git a/src/processor/mapper/map_aggregate.cpp b/src/processor/mapper/map_aggregate.cpp index bb77a79414..8bee4cf978 100644 --- a/src/processor/mapper/map_aggregate.cpp +++ b/src/processor/mapper/map_aggregate.cpp @@ -104,7 +104,7 @@ void PlanMapper::appendGroupByExpressions(const expression_vector& groupByExpres vector& outputGroupByKeyVectorsDataTypes, MapperContext& mapperContextBeforeAggregate, MapperContext& mapperContext, Schema* schema, vector& isInputGroupByHashKeyVectorFlat) { for (auto& expression : groupByExpressions) { - if (schema->getGroup(expression->getUniqueName())->getIsFlat()) { + if (schema->getGroup(expression->getUniqueName())->isFlat()) { inputGroupByHashKeyVectorsPos.push_back( mapperContextBeforeAggregate.getDataPos(expression->getUniqueName())); outputGroupByKeyVectorsPos.push_back( @@ -116,7 +116,7 @@ void PlanMapper::appendGroupByExpressions(const expression_vector& groupByExpres } for (auto& expression : groupByExpressions) { - if (!schema->getGroup(expression->getUniqueName())->getIsFlat()) { + if (!schema->getGroup(expression->getUniqueName())->isFlat()) { inputGroupByHashKeyVectorsPos.push_back( mapperContextBeforeAggregate.getDataPos(expression->getUniqueName())); outputGroupByKeyVectorsPos.push_back( diff --git a/src/processor/mapper/map_cross_product.cpp b/src/processor/mapper/map_cross_product.cpp index 3e736367df..64882bfdcb 100644 --- a/src/processor/mapper/map_cross_product.cpp +++ b/src/processor/mapper/map_cross_product.cpp @@ -31,8 +31,7 @@ unique_ptr PlanMapper::mapLogicalCrossProductToPhysical( } return make_unique(resultCollector->getSharedState(), std::move(outVecPosAndTypePairs), std::move(colIndicesToScan), - logicalCrossProduct->getFlatOutputGroupPositions(), std::move(probeSidePrevOperator), - std::move(resultCollector), getOperatorID(), + std::move(probeSidePrevOperator), std::move(resultCollector), getOperatorID(), logicalCrossProduct->getExpressionsForPrinting()); } diff --git a/src/processor/mapper/map_expressions_scan.cpp b/src/processor/mapper/map_expressions_scan.cpp index bffc4b59fc..e8dc6e7852 100644 --- a/src/processor/mapper/map_expressions_scan.cpp +++ b/src/processor/mapper/map_expressions_scan.cpp @@ -39,12 +39,9 @@ unique_ptr PlanMapper::mapLogicalExpressionsScanToPhysical( mapperContext.addComputedExpressions(expressionName); colIndicesToScan.push_back(i); } - // static expressions must be output to one flat data chunk. - auto flatOutputDataChunkPositions = vector{0}; return make_unique(mapperContext.getResultSetDescriptor()->copy(), std::move(outDataPoses), std::move(outVecDataTypes), std::move(colIndicesToScan), - sharedState, std::move(flatOutputDataChunkPositions), getOperatorID(), - logicalExpressionsScan.getExpressionsForPrinting()); + sharedState, getOperatorID(), logicalExpressionsScan.getExpressionsForPrinting()); } } // namespace processor diff --git a/src/processor/mapper/map_hash_join.cpp b/src/processor/mapper/map_hash_join.cpp index 3a30ca5b4f..37bb5775a8 100644 --- a/src/processor/mapper/map_hash_join.cpp +++ b/src/processor/mapper/map_hash_join.cpp @@ -114,7 +114,7 @@ BuildDataInfo PlanMapper::generateBuildDataInfo(MapperContext& mapperContext, auto payloadPos = buildSideMapperContext.getDataPos(payloadUniqueName); buildPayloadsPosAndTypes.emplace_back(payloadPos, payload->dataType); auto payloadGroup = buildSideSchema->getGroup(payloadPos.dataChunkPos); - isBuildPayloadsFlat.push_back(payloadGroup->getIsFlat()); + isBuildPayloadsFlat.push_back(payloadGroup->isFlat()); isBuildPayloadsInKeyChunk.push_back(isBuildDataChunkContainKeys[payloadPos.dataChunkPos]); } return BuildDataInfo(buildKeysPosAndType, buildPayloadsPosAndTypes, isBuildPayloadsFlat, @@ -159,8 +159,8 @@ unique_ptr PlanMapper::mapLogicalHashJoinToPhysical( probeDataInfo.markDataPos = markOutputPos; } auto hashJoinProbe = make_unique(sharedState, hashJoin->getJoinType(), - hashJoin->getFlatOutputGroupPositions(), probeDataInfo, std::move(probeSidePrevOperator), - std::move(hashJoinBuild), getOperatorID(), paramsString); + probeDataInfo, std::move(probeSidePrevOperator), std::move(hashJoinBuild), getOperatorID(), + paramsString); if (hashJoin->getIsProbeAcc()) { if (containASPOnPipeline(hashJoin)) { mapASPJoin(hashJoin->getJoinNodes()[0].get(), hashJoinProbe.get()); diff --git a/src/processor/mapper/map_order_by.cpp b/src/processor/mapper/map_order_by.cpp index 2589660a5b..24e74f6563 100644 --- a/src/processor/mapper/map_order_by.cpp +++ b/src/processor/mapper/map_order_by.cpp @@ -29,7 +29,7 @@ unique_ptr PlanMapper::mapLogicalOrderByToPhysical( auto expressionName = expression->getUniqueName(); payloadsPosAndType.emplace_back( mapperContextBeforeOrderBy.getDataPos(expressionName), expression->dataType); - isPayloadFlat.push_back(schemaBeforeOrderBy.getGroup(expressionName)->getIsFlat()); + isPayloadFlat.push_back(schemaBeforeOrderBy.getGroup(expressionName)->isFlat()); outVectorPosAndTypes.emplace_back( mapperContext.getDataPos(expressionName), expression->dataType); mapperContext.addComputedExpressions(expressionName); diff --git a/src/processor/mapper/plan_mapper.cpp b/src/processor/mapper/plan_mapper.cpp index 745dd1fd04..378a76c9a0 100644 --- a/src/processor/mapper/plan_mapper.cpp +++ b/src/processor/mapper/plan_mapper.cpp @@ -138,7 +138,7 @@ unique_ptr PlanMapper::appendResultCollector( for (auto& expression : expressionsToCollect) { auto expressionName = expression->getUniqueName(); auto dataPos = mapperContext.getDataPos(expressionName); - auto isFlat = schema.getGroup(expressionName)->getIsFlat(); + auto isFlat = schema.getGroup(expressionName)->isFlat(); payloadsPosAndType.emplace_back(dataPos, expression->dataType); isPayloadFlat.push_back(isFlat); } diff --git a/src/processor/operator/cross_product.cpp b/src/processor/operator/cross_product.cpp index 11db8ecc17..f113c40d9b 100644 --- a/src/processor/operator/cross_product.cpp +++ b/src/processor/operator/cross_product.cpp @@ -5,10 +5,6 @@ namespace processor { shared_ptr CrossProduct::init(ExecutionContext* context) { resultSet = PhysicalOperator::init(context); - for (auto pos : flatDataChunkPositions) { - auto dataChunk = resultSet->dataChunks[pos]; - dataChunk->state = DataChunkState::getSingleValueDataChunkState(); - } for (auto& [pos, dataType] : outVecPosAndTypePairs) { auto vector = make_shared(dataType, context->memoryManager); resultSet->dataChunks[pos.dataChunkPos]->insert(pos.valueVectorPos, vector); diff --git a/src/processor/operator/hash_join/hash_join_probe.cpp b/src/processor/operator/hash_join/hash_join_probe.cpp index b2076b349e..036ef6f6ca 100644 --- a/src/processor/operator/hash_join/hash_join_probe.cpp +++ b/src/processor/operator/hash_join/hash_join_probe.cpp @@ -22,10 +22,6 @@ shared_ptr HashJoinProbe::init(ExecutionContext* context) { resultSet->dataChunks[probeDataInfo.markDataPos.dataChunkPos]->insert( probeDataInfo.markDataPos.valueVectorPos, markVector); } - for (auto pos : flatDataChunkPositions) { - auto dataChunk = resultSet->dataChunks[pos]; - dataChunk->state = DataChunkState::getSingleValueDataChunkState(); - } for (auto& [dataPos, dataType] : probeDataInfo.payloadsOutPosAndType) { auto probePayloadVector = make_shared(dataType, context->memoryManager); auto [dataChunkPos, valueVectorPos] = dataPos; diff --git a/src/processor/operator/index_scan.cpp b/src/processor/operator/index_scan.cpp index 3fcf5358c5..44f2481e6c 100644 --- a/src/processor/operator/index_scan.cpp +++ b/src/processor/operator/index_scan.cpp @@ -9,7 +9,6 @@ shared_ptr IndexScan::init(ExecutionContext* context) { PhysicalOperator::init(context); resultSet = populateResultSet(); auto dataChunk = resultSet->dataChunks[outDataPos.dataChunkPos]; - dataChunk->state = DataChunkState::getSingleValueDataChunkState(); outVector = make_shared(NODE_ID); dataChunk->insert(outDataPos.valueVectorPos, outVector); indexKeyEvaluator->init(*resultSet, context->memoryManager); diff --git a/src/processor/operator/table_scan/factorized_table_scan.cpp b/src/processor/operator/table_scan/factorized_table_scan.cpp index c147ffad1c..8520159cdb 100644 --- a/src/processor/operator/table_scan/factorized_table_scan.cpp +++ b/src/processor/operator/table_scan/factorized_table_scan.cpp @@ -6,9 +6,6 @@ namespace processor { shared_ptr FactorizedTableScan::init(ExecutionContext* context) { PhysicalOperator::init(context); resultSet = populateResultSet(); - for (auto& pos : flatDataChunkPositions) { - resultSet->dataChunks[pos]->state = DataChunkState::getSingleValueDataChunkState(); - } initFurther(context); return resultSet; } diff --git a/src/processor/operator/update/create.cpp b/src/processor/operator/update/create.cpp index 8c55569eca..f2530f05b3 100644 --- a/src/processor/operator/update/create.cpp +++ b/src/processor/operator/update/create.cpp @@ -11,7 +11,6 @@ shared_ptr CreateNode::init(ExecutionContext* context) { auto valueVector = make_shared(NODE_ID, context->memoryManager); outValueVectors.push_back(valueVector.get()); auto dataChunk = resultSet->dataChunks[pos.dataChunkPos]; - dataChunk->state = DataChunkState::getSingleValueDataChunkState(); dataChunk->insert(pos.valueVectorPos, valueVector); } return resultSet; diff --git a/src/processor/result/result_set_descriptor.cpp b/src/processor/result/result_set_descriptor.cpp index e8babd98b2..6e54506909 100644 --- a/src/processor/result/result_set_descriptor.cpp +++ b/src/processor/result/result_set_descriptor.cpp @@ -7,12 +7,15 @@ ResultSetDescriptor::ResultSetDescriptor(const Schema& schema) { for (auto i = 0u; i < schema.getNumGroups(); ++i) { auto group = schema.getGroup(i); auto dataChunkDescriptor = make_unique(); + if (group->isSingleState()) { + dataChunkDescriptor->setSingleState(); + } for (auto& expression : group->getExpressions()) { expressionNameToDataChunkPosMap.insert( {expression->getUniqueName(), dataChunkDescriptors.size()}); dataChunkDescriptor->addExpressionName(expression->getUniqueName()); } - dataChunkDescriptors.push_back(move(dataChunkDescriptor)); + dataChunkDescriptors.push_back(std::move(dataChunkDescriptor)); } }