From 89598fdd451161a7b93ef32853bec2b7a9971c23 Mon Sep 17 00:00:00 2001 From: Guodong Jin Date: Mon, 4 Mar 2024 21:38:48 +0800 Subject: [PATCH] clean up transaction pointer in physical operator --- src/include/processor/operator/physical_operator.h | 5 +---- .../operator/recursive_extend/recursive_join.h | 2 +- src/include/processor/operator/scan_node_id.h | 4 ++-- src/processor/operator/ddl/add_node_property.cpp | 2 +- src/processor/operator/ddl/add_rel_property.cpp | 2 +- src/processor/operator/index_scan.cpp | 3 ++- src/processor/operator/persistent/insert.cpp | 4 ++-- src/processor/operator/persistent/merge.cpp | 4 ++-- src/processor/operator/physical_operator.cpp | 1 - .../operator/recursive_extend/recursive_join.cpp | 6 +++--- .../operator/scan/scan_multi_node_tables.cpp | 7 ++++--- src/processor/operator/scan/scan_multi_rel_tables.cpp | 3 ++- src/processor/operator/scan/scan_node_table.cpp | 5 +++-- src/processor/operator/scan/scan_rel_table.cpp | 6 +++--- src/processor/operator/scan_node_id.cpp | 11 ++++++----- src/processor/operator/semi_masker.cpp | 4 ++-- 16 files changed, 35 insertions(+), 34 deletions(-) diff --git a/src/include/processor/operator/physical_operator.h b/src/include/processor/operator/physical_operator.h index 55f5e26d34..9076d65047 100644 --- a/src/include/processor/operator/physical_operator.h +++ b/src/include/processor/operator/physical_operator.h @@ -95,8 +95,7 @@ class PhysicalOperator { public: // Leaf operator PhysicalOperator(PhysicalOperatorType operatorType, uint32_t id, std::string paramsString) - : id{id}, operatorType{operatorType}, transaction{nullptr}, paramsString{ - std::move(paramsString)} {} + : id{id}, operatorType{operatorType}, paramsString{std::move(paramsString)} {} // Unary operator PhysicalOperator(PhysicalOperatorType operatorType, std::unique_ptr child, uint32_t id, const std::string& paramsString); @@ -167,8 +166,6 @@ class PhysicalOperator { PhysicalOperatorType operatorType; physical_op_vector_t children; - // TODO(Xiyang/Guodong): Remove this field, as it should be covered in ExecutionContext now. - transaction::Transaction* transaction; ResultSet* resultSet; std::string paramsString; diff --git a/src/include/processor/operator/recursive_extend/recursive_join.h b/src/include/processor/operator/recursive_extend/recursive_join.h index 479442f950..ce47117468 100644 --- a/src/include/processor/operator/recursive_extend/recursive_join.h +++ b/src/include/processor/operator/recursive_extend/recursive_join.h @@ -103,7 +103,7 @@ class RecursiveJoin : public PhysicalOperator { private: void initLocalRecursivePlan(ExecutionContext* context); - void populateTargetDstNodes(); + void populateTargetDstNodes(ExecutionContext* context); bool scanOutput(); diff --git a/src/include/processor/operator/scan_node_id.h b/src/include/processor/operator/scan_node_id.h index 6ec3edd325..4f7f405df9 100644 --- a/src/include/processor/operator/scan_node_id.h +++ b/src/include/processor/operator/scan_node_id.h @@ -77,8 +77,8 @@ class ScanNodeID : public PhysicalOperator { sharedState->initialize(context->clientContext->getTx()); } - void setSelVector( - NodeTableScanState* tableState, common::offset_t startOffset, common::offset_t endOffset); + void setSelVector(ExecutionContext* context, NodeTableScanState* tableState, + common::offset_t startOffset, common::offset_t endOffset); private: DataPos outDataPos; diff --git a/src/processor/operator/ddl/add_node_property.cpp b/src/processor/operator/ddl/add_node_property.cpp index 5131fd1b0f..d8ea5ad4bd 100644 --- a/src/processor/operator/ddl/add_node_property.cpp +++ b/src/processor/operator/ddl/add_node_property.cpp @@ -9,7 +9,7 @@ void AddNodeProperty::executeDDLInternal(ExecutionContext* context) { auto addedPropID = schema->getPropertyID(propertyName); auto addedProp = schema->getProperty(addedPropID); storageManager.getNodeTable(tableID)->addColumn( - transaction, *addedProp, getDefaultValVector(context)); + context->clientContext->getTx(), *addedProp, getDefaultValVector(context)); storageManager.getWAL()->logAddPropertyRecord(tableID, addedProp->getPropertyID()); } diff --git a/src/processor/operator/ddl/add_rel_property.cpp b/src/processor/operator/ddl/add_rel_property.cpp index 4779e712d5..f0eacecf4e 100644 --- a/src/processor/operator/ddl/add_rel_property.cpp +++ b/src/processor/operator/ddl/add_rel_property.cpp @@ -13,7 +13,7 @@ void AddRelProperty::executeDDLInternal(ExecutionContext* context) { auto addedPropertyID = tableSchema->getPropertyID(propertyName); auto addedProp = tableSchema->getProperty(addedPropertyID); storageManager.getRelTable(tableID)->addColumn( - transaction, *addedProp, getDefaultValVector(context)); + context->clientContext->getTx(), *addedProp, getDefaultValVector(context)); storageManager.getWAL()->logAddPropertyRecord(tableID, addedProp->getPropertyID()); } diff --git a/src/processor/operator/index_scan.cpp b/src/processor/operator/index_scan.cpp index 984262d61e..28e04f66c2 100644 --- a/src/processor/operator/index_scan.cpp +++ b/src/processor/operator/index_scan.cpp @@ -27,7 +27,8 @@ bool IndexScan::getNextTuplesInternal(ExecutionContext* context) { } outVector->state->selVector->getSelectedPositionsBuffer()[numSelectedValues] = pos; offset_t nodeOffset = INVALID_OFFSET; - numSelectedValues += pkIndex->lookup(transaction, indexVector, pos, nodeOffset); + numSelectedValues += + pkIndex->lookup(context->clientContext->getTx(), indexVector, pos, nodeOffset); nodeID_t nodeID{nodeOffset, tableID}; outVector->setValue(pos, nodeID); } diff --git a/src/processor/operator/persistent/insert.cpp b/src/processor/operator/persistent/insert.cpp index 7414e96dc2..f3a8bee3be 100644 --- a/src/processor/operator/persistent/insert.cpp +++ b/src/processor/operator/persistent/insert.cpp @@ -20,10 +20,10 @@ bool Insert::getNextTuplesInternal(ExecutionContext* context) { return false; } for (auto& executor : nodeExecutors) { - executor.insert(transaction, context); + executor.insert(context->clientContext->getTx(), context); } for (auto& executor : relExecutors) { - executor.insert(transaction, context); + executor.insert(context->clientContext->getTx(), context); } return true; } diff --git a/src/processor/operator/persistent/merge.cpp b/src/processor/operator/persistent/merge.cpp index f488b9b13a..5f179a5b8c 100644 --- a/src/processor/operator/persistent/merge.cpp +++ b/src/processor/operator/persistent/merge.cpp @@ -40,10 +40,10 @@ bool Merge::getNextTuplesInternal(ExecutionContext* context) { } } else { for (auto& executor : nodeInsertExecutors) { - executor.insert(transaction, context); + executor.insert(context->clientContext->getTx(), context); } for (auto& executor : relInsertExecutors) { - executor.insert(transaction, context); + executor.insert(context->clientContext->getTx(), context); } for (auto& executor : onCreateNodeSetExecutors) { executor->set(context); diff --git a/src/processor/operator/physical_operator.cpp b/src/processor/operator/physical_operator.cpp index 625c095303..ebae013256 100644 --- a/src/processor/operator/physical_operator.cpp +++ b/src/processor/operator/physical_operator.cpp @@ -175,7 +175,6 @@ void PhysicalOperator::initLocalState(ResultSet* resultSet_, ExecutionContext* c if (!isSource()) { children[0]->initLocalState(resultSet_, context); } - transaction = context->clientContext->getTx(); resultSet = resultSet_; registerProfilingMetrics(context->profiler); initLocalStateInternal(resultSet_, context); diff --git a/src/processor/operator/recursive_extend/recursive_join.cpp b/src/processor/operator/recursive_extend/recursive_join.cpp index a44af6439a..acb44bf42b 100644 --- a/src/processor/operator/recursive_extend/recursive_join.cpp +++ b/src/processor/operator/recursive_extend/recursive_join.cpp @@ -11,7 +11,7 @@ namespace kuzu { namespace processor { void RecursiveJoin::initLocalStateInternal(ResultSet* /*resultSet_*/, ExecutionContext* context) { - populateTargetDstNodes(); + populateTargetDstNodes(context); vectors = std::make_unique(); vectors->srcNodeIDVector = resultSet->getValueVector(dataInfo->srcNodePos).get(); vectors->dstNodeIDVector = resultSet->getValueVector(dataInfo->dstNodePos).get(); @@ -221,12 +221,12 @@ void RecursiveJoin::initLocalRecursivePlan(ExecutionContext* context) { recursiveRoot->initLocalState(localResultSet.get(), context); } -void RecursiveJoin::populateTargetDstNodes() { +void RecursiveJoin::populateTargetDstNodes(ExecutionContext* context) { frontier::node_id_set_t targetNodeIDs; uint64_t numTargetNodes = 0; for (auto& semiMask : sharedState->semiMasks) { auto nodeTable = semiMask->getNodeTable(); - auto numNodes = nodeTable->getMaxNodeOffset(transaction) + 1; + auto numNodes = nodeTable->getMaxNodeOffset(context->clientContext->getTx()) + 1; if (semiMask->isEnabled()) { for (auto offset = 0u; offset < numNodes; ++offset) { if (semiMask->isNodeMasked(offset)) { diff --git a/src/processor/operator/scan/scan_multi_node_tables.cpp b/src/processor/operator/scan/scan_multi_node_tables.cpp index b1c17fae57..115853de4b 100644 --- a/src/processor/operator/scan/scan_multi_node_tables.cpp +++ b/src/processor/operator/scan/scan_multi_node_tables.cpp @@ -13,9 +13,10 @@ bool ScanMultiNodeTables::getNextTuplesInternal(ExecutionContext* context) { inVector->getValue(inVector->state->selVector->selectedPositions[0]).tableID; KU_ASSERT(readStates.contains(tableID) && tables.contains(tableID)); auto scanTableInfo = tables.at(tableID).get(); - scanTableInfo->table->initializeReadState( - transaction, scanTableInfo->columnIDs, inVector, readStates[tableID].get()); - scanTableInfo->table->read(transaction, *readStates.at(tableID), inVector, outVectors); + scanTableInfo->table->initializeReadState(context->clientContext->getTx(), + scanTableInfo->columnIDs, inVector, readStates[tableID].get()); + scanTableInfo->table->read( + context->clientContext->getTx(), *readStates.at(tableID), inVector, outVectors); return true; } diff --git a/src/processor/operator/scan/scan_multi_rel_tables.cpp b/src/processor/operator/scan/scan_multi_rel_tables.cpp index feb59ce0ec..ac2b3e0448 100644 --- a/src/processor/operator/scan/scan_multi_rel_tables.cpp +++ b/src/processor/operator/scan/scan_multi_rel_tables.cpp @@ -56,7 +56,8 @@ void ScanMultiRelTable::initLocalStateInternal(ResultSet* resultSet, ExecutionCo bool ScanMultiRelTable::getNextTuplesInternal(ExecutionContext* context) { while (true) { - if (currentScanner != nullptr && currentScanner->scan(inVector, outVectors, transaction)) { + if (currentScanner != nullptr && + currentScanner->scan(inVector, outVectors, context->clientContext->getTx())) { metrics->numOutputTuple.increase(outVectors[0]->state->selVector->selectedSize); return true; } diff --git a/src/processor/operator/scan/scan_node_table.cpp b/src/processor/operator/scan/scan_node_table.cpp index c0223aebda..747dc856e8 100644 --- a/src/processor/operator/scan/scan_node_table.cpp +++ b/src/processor/operator/scan/scan_node_table.cpp @@ -12,8 +12,9 @@ bool ScanSingleNodeTable::getNextTuplesInternal(ExecutionContext* context) { for (auto& outputVector : outVectors) { outputVector->resetAuxiliaryBuffer(); } - info->table->initializeReadState(transaction, info->columnIDs, inVector, readState.get()); - info->table->read(transaction, *readState, inVector, outVectors); + info->table->initializeReadState( + context->clientContext->getTx(), info->columnIDs, inVector, readState.get()); + info->table->read(context->clientContext->getTx(), *readState, inVector, outVectors); return true; } diff --git a/src/processor/operator/scan/scan_rel_table.cpp b/src/processor/operator/scan/scan_rel_table.cpp index 2a836fb82d..426baa0cbb 100644 --- a/src/processor/operator/scan/scan_rel_table.cpp +++ b/src/processor/operator/scan/scan_rel_table.cpp @@ -6,14 +6,14 @@ namespace processor { bool ScanRelTable::getNextTuplesInternal(ExecutionContext* context) { while (true) { if (scanState->hasMoreToRead(context->clientContext->getTx())) { - info->table->read(transaction, *scanState, inVector, outVectors); + info->table->read(context->clientContext->getTx(), *scanState, inVector, outVectors); return true; } if (!children[0]->getNextTuple(context)) { return false; } - info->table->initializeReadState( - transaction, info->direction, info->columnIDs, inVector, scanState.get()); + info->table->initializeReadState(context->clientContext->getTx(), info->direction, + info->columnIDs, inVector, scanState.get()); } } diff --git a/src/processor/operator/scan_node_id.cpp b/src/processor/operator/scan_node_id.cpp index ef0ea2756f..698a2c1f4f 100644 --- a/src/processor/operator/scan_node_id.cpp +++ b/src/processor/operator/scan_node_id.cpp @@ -57,7 +57,7 @@ void ScanNodeID::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* outValueVector->setSequential(); } -bool ScanNodeID::getNextTuplesInternal(ExecutionContext* /*context*/) { +bool ScanNodeID::getNextTuplesInternal(ExecutionContext* context) { do { auto [state, startOffset, endOffset] = sharedState->getNextRangeToRead(); if (state == nullptr) { @@ -70,14 +70,14 @@ bool ScanNodeID::getNextTuplesInternal(ExecutionContext* /*context*/) { nodeIDValues[i].tableID = state->getTable()->getTableID(); } outValueVector->state->initOriginalAndSelectedSize(size); - setSelVector(state, startOffset, endOffset); + setSelVector(context, state, startOffset, endOffset); } while (outValueVector->state->selVector->selectedSize == 0); metrics->numOutputTuple.increase(outValueVector->state->selVector->selectedSize); return true; } -void ScanNodeID::setSelVector( - NodeTableScanState* tableState, offset_t startOffset, offset_t endOffset) { +void ScanNodeID::setSelVector(ExecutionContext* context, NodeTableScanState* tableState, + offset_t startOffset, offset_t endOffset) { if (tableState->isSemiMaskEnabled()) { outValueVector->state->selVector->resetSelectorToValuePosBuffer(); // Fill selected positions based on node mask for nodes between the given startOffset and @@ -94,7 +94,8 @@ void ScanNodeID::setSelVector( outValueVector->state->selVector->resetSelectorToUnselected(); } // Apply changes to the selVector from nodes metadata. - tableState->getTable()->setSelVectorForDeletedOffsets(transaction, outValueVector); + tableState->getTable()->setSelVectorForDeletedOffsets( + context->clientContext->getTx(), outValueVector); } } // namespace processor diff --git a/src/processor/operator/semi_masker.cpp b/src/processor/operator/semi_masker.cpp index 8b217f5e55..418ebd7bf9 100644 --- a/src/processor/operator/semi_masker.cpp +++ b/src/processor/operator/semi_masker.cpp @@ -16,11 +16,11 @@ void BaseSemiMasker::initGlobalStateInternal(ExecutionContext* /*context*/) { } } -void BaseSemiMasker::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* /*context*/) { +void BaseSemiMasker::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) { keyVector = resultSet->getValueVector(info->keyPos).get(); for (auto& [table, masks] : info->masksPerTable) { for (auto& maskWithIdx : masks) { - maskWithIdx.first->init(transaction); + maskWithIdx.first->init(context->clientContext->getTx()); } } }