Skip to content

Commit

Permalink
clean up transaction pointer in physical operator
Browse files Browse the repository at this point in the history
  • Loading branch information
ray6080 committed Mar 4, 2024
1 parent 0d84c73 commit 89598fd
Show file tree
Hide file tree
Showing 16 changed files with 35 additions and 34 deletions.
5 changes: 1 addition & 4 deletions src/include/processor/operator/physical_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,7 @@ class PhysicalOperator {
public:
// Leaf operator
PhysicalOperator(PhysicalOperatorType operatorType, uint32_t id, std::string paramsString)
: id{id}, operatorType{operatorType}, transaction{nullptr}, paramsString{
std::move(paramsString)} {}
: id{id}, operatorType{operatorType}, paramsString{std::move(paramsString)} {}
// Unary operator
PhysicalOperator(PhysicalOperatorType operatorType, std::unique_ptr<PhysicalOperator> child,
uint32_t id, const std::string& paramsString);
Expand Down Expand Up @@ -167,8 +166,6 @@ class PhysicalOperator {
PhysicalOperatorType operatorType;

physical_op_vector_t children;
// TODO(Xiyang/Guodong): Remove this field, as it should be covered in ExecutionContext now.
transaction::Transaction* transaction;
ResultSet* resultSet;

std::string paramsString;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ class RecursiveJoin : public PhysicalOperator {
private:
void initLocalRecursivePlan(ExecutionContext* context);

void populateTargetDstNodes();
void populateTargetDstNodes(ExecutionContext* context);

bool scanOutput();

Expand Down
4 changes: 2 additions & 2 deletions src/include/processor/operator/scan_node_id.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ class ScanNodeID : public PhysicalOperator {
sharedState->initialize(context->clientContext->getTx());
}

void setSelVector(
NodeTableScanState* tableState, common::offset_t startOffset, common::offset_t endOffset);
void setSelVector(ExecutionContext* context, NodeTableScanState* tableState,
common::offset_t startOffset, common::offset_t endOffset);

private:
DataPos outDataPos;
Expand Down
2 changes: 1 addition & 1 deletion src/processor/operator/ddl/add_node_property.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ void AddNodeProperty::executeDDLInternal(ExecutionContext* context) {
auto addedPropID = schema->getPropertyID(propertyName);
auto addedProp = schema->getProperty(addedPropID);
storageManager.getNodeTable(tableID)->addColumn(
transaction, *addedProp, getDefaultValVector(context));
context->clientContext->getTx(), *addedProp, getDefaultValVector(context));
storageManager.getWAL()->logAddPropertyRecord(tableID, addedProp->getPropertyID());
}

Expand Down
2 changes: 1 addition & 1 deletion src/processor/operator/ddl/add_rel_property.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ void AddRelProperty::executeDDLInternal(ExecutionContext* context) {
auto addedPropertyID = tableSchema->getPropertyID(propertyName);
auto addedProp = tableSchema->getProperty(addedPropertyID);
storageManager.getRelTable(tableID)->addColumn(
transaction, *addedProp, getDefaultValVector(context));
context->clientContext->getTx(), *addedProp, getDefaultValVector(context));
storageManager.getWAL()->logAddPropertyRecord(tableID, addedProp->getPropertyID());
}

Expand Down
3 changes: 2 additions & 1 deletion src/processor/operator/index_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ bool IndexScan::getNextTuplesInternal(ExecutionContext* context) {
}
outVector->state->selVector->getSelectedPositionsBuffer()[numSelectedValues] = pos;
offset_t nodeOffset = INVALID_OFFSET;
numSelectedValues += pkIndex->lookup(transaction, indexVector, pos, nodeOffset);
numSelectedValues +=
pkIndex->lookup(context->clientContext->getTx(), indexVector, pos, nodeOffset);
nodeID_t nodeID{nodeOffset, tableID};
outVector->setValue<nodeID_t>(pos, nodeID);
}
Expand Down
4 changes: 2 additions & 2 deletions src/processor/operator/persistent/insert.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ bool Insert::getNextTuplesInternal(ExecutionContext* context) {
return false;
}
for (auto& executor : nodeExecutors) {
executor.insert(transaction, context);
executor.insert(context->clientContext->getTx(), context);
}
for (auto& executor : relExecutors) {
executor.insert(transaction, context);
executor.insert(context->clientContext->getTx(), context);
}
return true;
}
Expand Down
4 changes: 2 additions & 2 deletions src/processor/operator/persistent/merge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ bool Merge::getNextTuplesInternal(ExecutionContext* context) {
}
} else {
for (auto& executor : nodeInsertExecutors) {
executor.insert(transaction, context);
executor.insert(context->clientContext->getTx(), context);
}
for (auto& executor : relInsertExecutors) {
executor.insert(transaction, context);
executor.insert(context->clientContext->getTx(), context);
}
for (auto& executor : onCreateNodeSetExecutors) {
executor->set(context);
Expand Down
1 change: 0 additions & 1 deletion src/processor/operator/physical_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,6 @@ void PhysicalOperator::initLocalState(ResultSet* resultSet_, ExecutionContext* c
if (!isSource()) {
children[0]->initLocalState(resultSet_, context);
}
transaction = context->clientContext->getTx();
resultSet = resultSet_;
registerProfilingMetrics(context->profiler);
initLocalStateInternal(resultSet_, context);
Expand Down
6 changes: 3 additions & 3 deletions src/processor/operator/recursive_extend/recursive_join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace kuzu {
namespace processor {

void RecursiveJoin::initLocalStateInternal(ResultSet* /*resultSet_*/, ExecutionContext* context) {
populateTargetDstNodes();
populateTargetDstNodes(context);
vectors = std::make_unique<RecursiveJoinVectors>();
vectors->srcNodeIDVector = resultSet->getValueVector(dataInfo->srcNodePos).get();
vectors->dstNodeIDVector = resultSet->getValueVector(dataInfo->dstNodePos).get();
Expand Down Expand Up @@ -221,12 +221,12 @@ void RecursiveJoin::initLocalRecursivePlan(ExecutionContext* context) {
recursiveRoot->initLocalState(localResultSet.get(), context);
}

void RecursiveJoin::populateTargetDstNodes() {
void RecursiveJoin::populateTargetDstNodes(ExecutionContext* context) {
frontier::node_id_set_t targetNodeIDs;
uint64_t numTargetNodes = 0;
for (auto& semiMask : sharedState->semiMasks) {
auto nodeTable = semiMask->getNodeTable();
auto numNodes = nodeTable->getMaxNodeOffset(transaction) + 1;
auto numNodes = nodeTable->getMaxNodeOffset(context->clientContext->getTx()) + 1;
if (semiMask->isEnabled()) {
for (auto offset = 0u; offset < numNodes; ++offset) {
if (semiMask->isNodeMasked(offset)) {
Expand Down
7 changes: 4 additions & 3 deletions src/processor/operator/scan/scan_multi_node_tables.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@ bool ScanMultiNodeTables::getNextTuplesInternal(ExecutionContext* context) {
inVector->getValue<nodeID_t>(inVector->state->selVector->selectedPositions[0]).tableID;
KU_ASSERT(readStates.contains(tableID) && tables.contains(tableID));
auto scanTableInfo = tables.at(tableID).get();
scanTableInfo->table->initializeReadState(
transaction, scanTableInfo->columnIDs, inVector, readStates[tableID].get());
scanTableInfo->table->read(transaction, *readStates.at(tableID), inVector, outVectors);
scanTableInfo->table->initializeReadState(context->clientContext->getTx(),
scanTableInfo->columnIDs, inVector, readStates[tableID].get());
scanTableInfo->table->read(
context->clientContext->getTx(), *readStates.at(tableID), inVector, outVectors);
return true;
}

Expand Down
3 changes: 2 additions & 1 deletion src/processor/operator/scan/scan_multi_rel_tables.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ void ScanMultiRelTable::initLocalStateInternal(ResultSet* resultSet, ExecutionCo

bool ScanMultiRelTable::getNextTuplesInternal(ExecutionContext* context) {
while (true) {
if (currentScanner != nullptr && currentScanner->scan(inVector, outVectors, transaction)) {
if (currentScanner != nullptr &&
currentScanner->scan(inVector, outVectors, context->clientContext->getTx())) {
metrics->numOutputTuple.increase(outVectors[0]->state->selVector->selectedSize);
return true;
}
Expand Down
5 changes: 3 additions & 2 deletions src/processor/operator/scan/scan_node_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ bool ScanSingleNodeTable::getNextTuplesInternal(ExecutionContext* context) {
for (auto& outputVector : outVectors) {
outputVector->resetAuxiliaryBuffer();
}
info->table->initializeReadState(transaction, info->columnIDs, inVector, readState.get());
info->table->read(transaction, *readState, inVector, outVectors);
info->table->initializeReadState(
context->clientContext->getTx(), info->columnIDs, inVector, readState.get());
info->table->read(context->clientContext->getTx(), *readState, inVector, outVectors);
return true;
}

Expand Down
6 changes: 3 additions & 3 deletions src/processor/operator/scan/scan_rel_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ namespace processor {
bool ScanRelTable::getNextTuplesInternal(ExecutionContext* context) {
while (true) {
if (scanState->hasMoreToRead(context->clientContext->getTx())) {
info->table->read(transaction, *scanState, inVector, outVectors);
info->table->read(context->clientContext->getTx(), *scanState, inVector, outVectors);
return true;
}
if (!children[0]->getNextTuple(context)) {
return false;
}
info->table->initializeReadState(
transaction, info->direction, info->columnIDs, inVector, scanState.get());
info->table->initializeReadState(context->clientContext->getTx(), info->direction,
info->columnIDs, inVector, scanState.get());
}
}

Expand Down
11 changes: 6 additions & 5 deletions src/processor/operator/scan_node_id.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ void ScanNodeID::initLocalStateInternal(ResultSet* resultSet, ExecutionContext*
outValueVector->setSequential();
}

bool ScanNodeID::getNextTuplesInternal(ExecutionContext* /*context*/) {
bool ScanNodeID::getNextTuplesInternal(ExecutionContext* context) {
do {
auto [state, startOffset, endOffset] = sharedState->getNextRangeToRead();
if (state == nullptr) {
Expand All @@ -70,14 +70,14 @@ bool ScanNodeID::getNextTuplesInternal(ExecutionContext* /*context*/) {
nodeIDValues[i].tableID = state->getTable()->getTableID();
}
outValueVector->state->initOriginalAndSelectedSize(size);
setSelVector(state, startOffset, endOffset);
setSelVector(context, state, startOffset, endOffset);
} while (outValueVector->state->selVector->selectedSize == 0);
metrics->numOutputTuple.increase(outValueVector->state->selVector->selectedSize);
return true;
}

void ScanNodeID::setSelVector(
NodeTableScanState* tableState, offset_t startOffset, offset_t endOffset) {
void ScanNodeID::setSelVector(ExecutionContext* context, NodeTableScanState* tableState,
offset_t startOffset, offset_t endOffset) {
if (tableState->isSemiMaskEnabled()) {
outValueVector->state->selVector->resetSelectorToValuePosBuffer();
// Fill selected positions based on node mask for nodes between the given startOffset and
Expand All @@ -94,7 +94,8 @@ void ScanNodeID::setSelVector(
outValueVector->state->selVector->resetSelectorToUnselected();
}
// Apply changes to the selVector from nodes metadata.
tableState->getTable()->setSelVectorForDeletedOffsets(transaction, outValueVector);
tableState->getTable()->setSelVectorForDeletedOffsets(
context->clientContext->getTx(), outValueVector);
}

} // namespace processor
Expand Down
4 changes: 2 additions & 2 deletions src/processor/operator/semi_masker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ void BaseSemiMasker::initGlobalStateInternal(ExecutionContext* /*context*/) {
}
}

void BaseSemiMasker::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* /*context*/) {
void BaseSemiMasker::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) {
keyVector = resultSet->getValueVector(info->keyPos).get();
for (auto& [table, masks] : info->masksPerTable) {
for (auto& maskWithIdx : masks) {
maskWithIdx.first->init(transaction);
maskWithIdx.first->init(context->clientContext->getTx());
}
}
}
Expand Down

0 comments on commit 89598fd

Please sign in to comment.