diff --git a/src/include/processor/operator/generic_extend.h b/src/include/processor/operator/generic_extend.h index 25aa81d8339..43b42388680 100644 --- a/src/include/processor/operator/generic_extend.h +++ b/src/include/processor/operator/generic_extend.h @@ -28,7 +28,7 @@ class AdjAndPropertyCollection { void populateListHandles(ListSyncState& syncState); - void initState(node_offset_t nodeOffset); + void resetState(node_offset_t nodeOffset); bool scan(const shared_ptr& inVector, const shared_ptr& outNodeVector, const vector>& outPropertyVectors, Transaction* transaction); @@ -45,6 +45,7 @@ class AdjAndPropertyCollection { inline bool hasColumnToScan() const { return nextColumnIdx < adjCollection->columns.size(); } inline bool hasListToScan() const { return nextListIdx < adjCollection->lists.size(); } + bool scanColumn(uint32_t idx, const shared_ptr& inVector, const shared_ptr& outNodeVector, const vector>& outPropertyVectors, Transaction* transaction); @@ -97,22 +98,8 @@ class GenericExtend : public PhysicalOperator { } private: - bool scanCurrentAdjAndPropertyCollection() { - if (currentAdjAndPropertyCollection == nullptr) { - return false; - } - return currentAdjAndPropertyCollection->scan( - inVector, outNodeVector, outPropertyVectors, transaction); - } - void initCurrentAdjAndPropertyCollection(const nodeID_t& nodeID) { - if (adjAndPropertyCollectionPerNodeTable.contains(nodeID.tableID)) { - currentAdjAndPropertyCollection = - adjAndPropertyCollectionPerNodeTable.at(nodeID.tableID).get(); - currentAdjAndPropertyCollection->initState(nodeID.offset); - } else { - currentAdjAndPropertyCollection = nullptr; - } - } + bool scanCurrentAdjAndPropertyCollection(); + void initCurrentAdjAndPropertyCollection(const nodeID_t& nodeID); private: // vector positions diff --git a/src/processor/mapper/map_extend.cpp b/src/processor/mapper/map_extend.cpp index ea169176805..1893fb9fe40 100644 --- a/src/processor/mapper/map_extend.cpp +++ b/src/processor/mapper/map_extend.cpp @@ -48,6 +48,48 @@ unique_ptr PlanMapper::mapLogicalExtendToPhysical( } } +static unique_ptr populateAdjCollection(table_id_t boundNodeTableID, + const RelExpression& rel, RelDirection direction, const RelsStore& relsStore) { + vector adjColumns; + vector adjLists; + for (auto relTableID : rel.getTableIDs()) { + if (relsStore.hasAdjColumn(direction, boundNodeTableID, relTableID)) { + adjColumns.push_back(relsStore.getAdjColumn(direction, boundNodeTableID, relTableID)); + } + if (relsStore.hasAdjList(direction, boundNodeTableID, relTableID)) { + adjLists.push_back(relsStore.getAdjLists(direction, boundNodeTableID, relTableID)); + } + } + return make_unique(std::move(adjColumns), std::move(adjLists)); +} + +static unique_ptr populatePropertyCollection(table_id_t boundNodeTableID, + const RelExpression& rel, RelDirection direction, const PropertyExpression& propertyExpression, + const RelsStore& relsStore) { + vector propertyColumns; + vector propertyLists; + for (auto relTableID : rel.getTableIDs()) { + if (relsStore.hasAdjColumn(direction, boundNodeTableID, relTableID)) { + Column* propertyColumn = nullptr; + if (propertyExpression.hasPropertyID(relTableID)) { + propertyColumn = relsStore.getRelPropertyColumn(direction, relTableID, + boundNodeTableID, propertyExpression.getPropertyID(relTableID)); + } + propertyColumns.push_back(propertyColumn); + } + if (relsStore.hasAdjList(direction, boundNodeTableID, relTableID)) { + Lists* propertyList = nullptr; + if (propertyExpression.hasPropertyID(relTableID)) { + propertyList = relsStore.getRelPropertyLists(direction, boundNodeTableID, + relTableID, propertyExpression.getPropertyID(relTableID)); + } + propertyLists.push_back(propertyList); + } + } + return make_unique( + std::move(propertyColumns), std::move(propertyLists)); +} + unique_ptr PlanMapper::mapLogicalGenericExtendToPhysical( LogicalOperator* logicalOperator, MapperContext& mapperContext) { auto extend = (LogicalGenericExtend*)logicalOperator; @@ -68,48 +110,12 @@ unique_ptr PlanMapper::mapLogicalGenericExtendToPhysical( unordered_map> adjAndPropertyCollectionPerNodeTable; for (auto boundNodeTableID : boundNode->getTableIDs()) { - vector adjColumns; - vector adjLists; - for (auto relTableID : rel->getTableIDs()) { - if (relsStore.hasAdjColumn(direction, boundNodeTableID, relTableID)) { - adjColumns.push_back( - relsStore.getAdjColumn(direction, boundNodeTableID, relTableID)); - } - if (relsStore.hasAdjList(direction, boundNodeTableID, relTableID)) { - adjLists.push_back(relsStore.getAdjLists(direction, boundNodeTableID, relTableID)); - } - } - auto adjCollection = - make_unique(std::move(adjColumns), std::move(adjLists)); + auto adjCollection = populateAdjCollection(boundNodeTableID, *rel, direction, relsStore); vector> propertyCollections; for (auto& expression : extend->getProperties()) { auto propertyExpression = (PropertyExpression*)expression.get(); - vector propertyColumns; - vector propertyLists; - for (auto relTableID : rel->getTableIDs()) { - if (relsStore.hasAdjColumn(direction, boundNodeTableID, relTableID)) { - Column* propertyColumn; - if (!propertyExpression->hasPropertyID(relTableID)) { - propertyColumn = nullptr; - } else { - propertyColumn = relsStore.getRelPropertyColumn(direction, relTableID, - boundNodeTableID, propertyExpression->getPropertyID(relTableID)); - } - propertyColumns.push_back(propertyColumn); - } - if (relsStore.hasAdjList(direction, boundNodeTableID, relTableID)) { - Lists* propertyList; - if (!propertyExpression->hasPropertyID(relTableID)) { - propertyList = nullptr; - } else { - propertyList = relsStore.getRelPropertyLists(direction, boundNodeTableID, - relTableID, propertyExpression->getPropertyID(relTableID)); - } - propertyLists.push_back(propertyList); - } - } - propertyCollections.push_back(make_unique( - std::move(propertyColumns), std::move(propertyLists))); + propertyCollections.push_back(populatePropertyCollection( + boundNodeTableID, *rel, direction, *propertyExpression, relsStore)); } adjAndPropertyCollectionPerNodeTable.insert( {boundNodeTableID, make_unique( diff --git a/src/processor/mapper/map_scan_rel_property.cpp b/src/processor/mapper/map_scan_rel_property.cpp index 65939184572..16fc8863010 100644 --- a/src/processor/mapper/map_scan_rel_property.cpp +++ b/src/processor/mapper/map_scan_rel_property.cpp @@ -11,7 +11,8 @@ unique_ptr PlanMapper::mapLogicalScanRelPropertyToPhysical( auto scanRelProperty = (LogicalScanRelProperty*)logicalOperator; auto boundNode = scanRelProperty->getBoundNode(); auto rel = scanRelProperty->getRel(); - assert(rel->getNumTableIDs() == 1 && boundNode->getNumTableIDs() == 1); + assert( + !rel->isVariableLength() && rel->getNumTableIDs() == 1 && boundNode->getNumTableIDs() == 1); auto relID = rel->getTableID(); auto direction = scanRelProperty->getDirection(); auto propertyExpression = (PropertyExpression*)scanRelProperty->getProperty().get(); diff --git a/src/processor/operator/generic_extend.cpp b/src/processor/operator/generic_extend.cpp index 1ba1d86f8ff..07250db8fd4 100644 --- a/src/processor/operator/generic_extend.cpp +++ b/src/processor/operator/generic_extend.cpp @@ -18,53 +18,7 @@ void AdjAndPropertyCollection::populateListHandles(ListSyncState& syncState) { } } -unique_ptr AdjAndPropertyCollection::clone() const { - auto clonedAdjCollection = - make_unique(adjCollection->columns, adjCollection->lists); - vector> clonedPropertyCollections; - for (auto& propertyCollection : propertyCollections) { - clonedPropertyCollections.push_back(make_unique( - propertyCollection->columns, propertyCollection->lists)); - } - return make_unique( - std::move(clonedAdjCollection), std::move(clonedPropertyCollections)); -} - -void GenericExtend::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) { - inVector = resultSet->getValueVector(inVectorPos); - listSyncState = make_unique(); - outNodeVector = resultSet->getValueVector(outNodeVectorPos); - for (auto& dataPos : outPropertyVectorsPos) { - auto vector = resultSet->getValueVector(dataPos); - outPropertyVectors.push_back(vector); - } - for (auto& [_, adjAndPropertyCollection] : adjAndPropertyCollectionPerNodeTable) { - adjAndPropertyCollection->populateListHandles(*listSyncState); - } - // config local state - currentAdjAndPropertyCollection = nullptr; -} - -bool GenericExtend::getNextTuplesInternal() { - while (true) { - if (scanCurrentAdjAndPropertyCollection()) { - metrics->numOutputTuple.increase(outNodeVector->state->selVector->selectedSize); - return true; - } - if (!children[0]->getNextTuple()) { - return false; - } - auto currentIdx = inVector->state->selVector->selectedPositions[0]; - if (inVector->isNull(currentIdx)) { - outNodeVector->state->selVector->selectedSize = 0; - continue; - } - auto nodeID = inVector->getValue(currentIdx); - initCurrentAdjAndPropertyCollection(nodeID); - } -} - -void AdjAndPropertyCollection::initState(node_offset_t nodeOffset) { +void AdjAndPropertyCollection::resetState(node_offset_t nodeOffset) { nextColumnIdx = 0; nextListIdx = 0; currentNodeOffset = nodeOffset; @@ -99,11 +53,13 @@ bool AdjAndPropertyCollection::scanColumns(const shared_ptr& inVect bool AdjAndPropertyCollection::scanLists(const shared_ptr& inVector, const shared_ptr& outNodeVector, const vector>& outPropertyVectors, Transaction* transaction) { - if (currentListIdx != UINT32_MAX) { // check current list // TODO: wrap + if (currentListIdx != UINT32_MAX) { // check current list auto currentAdjList = adjCollection->lists[currentListIdx]; auto currentAdjListHandle = adjCollection->listHandles[currentListIdx].get(); if (currentAdjListHandle->listSyncState.hasMoreToRead()) { + // scan current adjList currentAdjList->readValues(outNodeVector, *currentAdjListHandle); + // scan current propertyLists for (auto i = 0u; i < propertyCollections.size(); ++i) { auto currentPropertyList = propertyCollections[i]->lists[currentListIdx]; auto currentPropertyListHandle = @@ -117,6 +73,7 @@ bool AdjAndPropertyCollection::scanLists(const shared_ptr& inVector } return true; } else { + // no more to scan on current list, move to next list. nextListIdx++; currentListIdx = UINT32_MAX; } @@ -191,5 +148,69 @@ bool AdjAndPropertyCollection::scanList(uint32_t idx, const shared_ptrselectedSize != 0; } +unique_ptr AdjAndPropertyCollection::clone() const { + auto clonedAdjCollection = + make_unique(adjCollection->columns, adjCollection->lists); + vector> clonedPropertyCollections; + for (auto& propertyCollection : propertyCollections) { + clonedPropertyCollections.push_back(make_unique( + propertyCollection->columns, propertyCollection->lists)); + } + return make_unique( + std::move(clonedAdjCollection), std::move(clonedPropertyCollections)); +} + +void GenericExtend::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) { + inVector = resultSet->getValueVector(inVectorPos); + listSyncState = make_unique(); + outNodeVector = resultSet->getValueVector(outNodeVectorPos); + for (auto& dataPos : outPropertyVectorsPos) { + auto vector = resultSet->getValueVector(dataPos); + outPropertyVectors.push_back(vector); + } + for (auto& [_, adjAndPropertyCollection] : adjAndPropertyCollectionPerNodeTable) { + adjAndPropertyCollection->populateListHandles(*listSyncState); + } + // config local state + currentAdjAndPropertyCollection = nullptr; +} + +bool GenericExtend::getNextTuplesInternal() { + while (true) { + if (scanCurrentAdjAndPropertyCollection()) { + metrics->numOutputTuple.increase(outNodeVector->state->selVector->selectedSize); + return true; + } + if (!children[0]->getNextTuple()) { + return false; + } + auto currentIdx = inVector->state->selVector->selectedPositions[0]; + if (inVector->isNull(currentIdx)) { + outNodeVector->state->selVector->selectedSize = 0; + continue; + } + auto nodeID = inVector->getValue(currentIdx); + initCurrentAdjAndPropertyCollection(nodeID); + } +} + +bool GenericExtend::scanCurrentAdjAndPropertyCollection() { + if (currentAdjAndPropertyCollection == nullptr) { + return false; + } + return currentAdjAndPropertyCollection->scan( + inVector, outNodeVector, outPropertyVectors, transaction); +} + +void GenericExtend::initCurrentAdjAndPropertyCollection(const nodeID_t& nodeID) { + if (adjAndPropertyCollectionPerNodeTable.contains(nodeID.tableID)) { + currentAdjAndPropertyCollection = + adjAndPropertyCollectionPerNodeTable.at(nodeID.tableID).get(); + currentAdjAndPropertyCollection->resetState(nodeID.offset); + } else { + currentAdjAndPropertyCollection = nullptr; + } +} + } // namespace processor } // namespace kuzu