Skip to content

Commit

Permalink
X
Browse files Browse the repository at this point in the history
  • Loading branch information
andyfengHKU committed Dec 8, 2022
1 parent 303d281 commit 784f5b6
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 105 deletions.
21 changes: 4 additions & 17 deletions src/include/processor/operator/generic_extend.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class AdjAndPropertyCollection {

void populateListHandles(ListSyncState& syncState);

void initState(node_offset_t nodeOffset);
void resetState(node_offset_t nodeOffset);

bool scan(const shared_ptr<ValueVector>& inVector, const shared_ptr<ValueVector>& outNodeVector,
const vector<shared_ptr<ValueVector>>& outPropertyVectors, Transaction* transaction);
Expand All @@ -45,6 +45,7 @@ class AdjAndPropertyCollection {

inline bool hasColumnToScan() const { return nextColumnIdx < adjCollection->columns.size(); }
inline bool hasListToScan() const { return nextListIdx < adjCollection->lists.size(); }

bool scanColumn(uint32_t idx, const shared_ptr<ValueVector>& inVector,
const shared_ptr<ValueVector>& outNodeVector,
const vector<shared_ptr<ValueVector>>& outPropertyVectors, Transaction* transaction);
Expand Down Expand Up @@ -97,22 +98,8 @@ class GenericExtend : public PhysicalOperator {
}

private:
bool scanCurrentAdjAndPropertyCollection() {
if (currentAdjAndPropertyCollection == nullptr) {
return false;
}
return currentAdjAndPropertyCollection->scan(
inVector, outNodeVector, outPropertyVectors, transaction);
}
void initCurrentAdjAndPropertyCollection(const nodeID_t& nodeID) {
if (adjAndPropertyCollectionPerNodeTable.contains(nodeID.tableID)) {
currentAdjAndPropertyCollection =
adjAndPropertyCollectionPerNodeTable.at(nodeID.tableID).get();
currentAdjAndPropertyCollection->initState(nodeID.offset);
} else {
currentAdjAndPropertyCollection = nullptr;
}
}
bool scanCurrentAdjAndPropertyCollection();
void initCurrentAdjAndPropertyCollection(const nodeID_t& nodeID);

private:
// vector positions
Expand Down
84 changes: 45 additions & 39 deletions src/processor/mapper/map_extend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,48 @@ unique_ptr<PhysicalOperator> PlanMapper::mapLogicalExtendToPhysical(
}
}

static unique_ptr<ColumnAndListCollection> populateAdjCollection(table_id_t boundNodeTableID,
const RelExpression& rel, RelDirection direction, const RelsStore& relsStore) {
vector<Column*> adjColumns;
vector<Lists*> adjLists;
for (auto relTableID : rel.getTableIDs()) {
if (relsStore.hasAdjColumn(direction, boundNodeTableID, relTableID)) {
adjColumns.push_back(relsStore.getAdjColumn(direction, boundNodeTableID, relTableID));
}
if (relsStore.hasAdjList(direction, boundNodeTableID, relTableID)) {
adjLists.push_back(relsStore.getAdjLists(direction, boundNodeTableID, relTableID));
}
}
return make_unique<ColumnAndListCollection>(std::move(adjColumns), std::move(adjLists));
}

static unique_ptr<ColumnAndListCollection> populatePropertyCollection(table_id_t boundNodeTableID,
const RelExpression& rel, RelDirection direction, const PropertyExpression& propertyExpression,
const RelsStore& relsStore) {
vector<Column*> propertyColumns;
vector<Lists*> propertyLists;
for (auto relTableID : rel.getTableIDs()) {
if (relsStore.hasAdjColumn(direction, boundNodeTableID, relTableID)) {
Column* propertyColumn = nullptr;
if (propertyExpression.hasPropertyID(relTableID)) {
propertyColumn = relsStore.getRelPropertyColumn(direction, relTableID,
boundNodeTableID, propertyExpression.getPropertyID(relTableID));
}
propertyColumns.push_back(propertyColumn);
}
if (relsStore.hasAdjList(direction, boundNodeTableID, relTableID)) {
Lists* propertyList = nullptr;
if (propertyExpression.hasPropertyID(relTableID)) {
propertyList = relsStore.getRelPropertyLists(direction, boundNodeTableID,
relTableID, propertyExpression.getPropertyID(relTableID));
}
propertyLists.push_back(propertyList);
}
}
return make_unique<ColumnAndListCollection>(
std::move(propertyColumns), std::move(propertyLists));
}

unique_ptr<PhysicalOperator> PlanMapper::mapLogicalGenericExtendToPhysical(
LogicalOperator* logicalOperator, MapperContext& mapperContext) {
auto extend = (LogicalGenericExtend*)logicalOperator;
Expand All @@ -68,48 +110,12 @@ unique_ptr<PhysicalOperator> PlanMapper::mapLogicalGenericExtendToPhysical(
unordered_map<table_id_t, unique_ptr<AdjAndPropertyCollection>>
adjAndPropertyCollectionPerNodeTable;
for (auto boundNodeTableID : boundNode->getTableIDs()) {
vector<Column*> adjColumns;
vector<Lists*> adjLists;
for (auto relTableID : rel->getTableIDs()) {
if (relsStore.hasAdjColumn(direction, boundNodeTableID, relTableID)) {
adjColumns.push_back(
relsStore.getAdjColumn(direction, boundNodeTableID, relTableID));
}
if (relsStore.hasAdjList(direction, boundNodeTableID, relTableID)) {
adjLists.push_back(relsStore.getAdjLists(direction, boundNodeTableID, relTableID));
}
}
auto adjCollection =
make_unique<ColumnAndListCollection>(std::move(adjColumns), std::move(adjLists));
auto adjCollection = populateAdjCollection(boundNodeTableID, *rel, direction, relsStore);
vector<unique_ptr<ColumnAndListCollection>> propertyCollections;
for (auto& expression : extend->getProperties()) {
auto propertyExpression = (PropertyExpression*)expression.get();
vector<Column*> propertyColumns;
vector<Lists*> propertyLists;
for (auto relTableID : rel->getTableIDs()) {
if (relsStore.hasAdjColumn(direction, boundNodeTableID, relTableID)) {
Column* propertyColumn;
if (!propertyExpression->hasPropertyID(relTableID)) {
propertyColumn = nullptr;
} else {
propertyColumn = relsStore.getRelPropertyColumn(direction, relTableID,
boundNodeTableID, propertyExpression->getPropertyID(relTableID));
}
propertyColumns.push_back(propertyColumn);
}
if (relsStore.hasAdjList(direction, boundNodeTableID, relTableID)) {
Lists* propertyList;
if (!propertyExpression->hasPropertyID(relTableID)) {
propertyList = nullptr;
} else {
propertyList = relsStore.getRelPropertyLists(direction, boundNodeTableID,
relTableID, propertyExpression->getPropertyID(relTableID));
}
propertyLists.push_back(propertyList);
}
}
propertyCollections.push_back(make_unique<ColumnAndListCollection>(
std::move(propertyColumns), std::move(propertyLists)));
propertyCollections.push_back(populatePropertyCollection(
boundNodeTableID, *rel, direction, *propertyExpression, relsStore));
}
adjAndPropertyCollectionPerNodeTable.insert(
{boundNodeTableID, make_unique<AdjAndPropertyCollection>(
Expand Down
3 changes: 2 additions & 1 deletion src/processor/mapper/map_scan_rel_property.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ unique_ptr<PhysicalOperator> PlanMapper::mapLogicalScanRelPropertyToPhysical(
auto scanRelProperty = (LogicalScanRelProperty*)logicalOperator;
auto boundNode = scanRelProperty->getBoundNode();
auto rel = scanRelProperty->getRel();
assert(rel->getNumTableIDs() == 1 && boundNode->getNumTableIDs() == 1);
assert(
!rel->isVariableLength() && rel->getNumTableIDs() == 1 && boundNode->getNumTableIDs() == 1);
auto relID = rel->getTableID();
auto direction = scanRelProperty->getDirection();
auto propertyExpression = (PropertyExpression*)scanRelProperty->getProperty().get();
Expand Down
117 changes: 69 additions & 48 deletions src/processor/operator/generic_extend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,53 +18,7 @@ void AdjAndPropertyCollection::populateListHandles(ListSyncState& syncState) {
}
}

unique_ptr<AdjAndPropertyCollection> AdjAndPropertyCollection::clone() const {
auto clonedAdjCollection =
make_unique<ColumnAndListCollection>(adjCollection->columns, adjCollection->lists);
vector<unique_ptr<ColumnAndListCollection>> clonedPropertyCollections;
for (auto& propertyCollection : propertyCollections) {
clonedPropertyCollections.push_back(make_unique<ColumnAndListCollection>(
propertyCollection->columns, propertyCollection->lists));
}
return make_unique<AdjAndPropertyCollection>(
std::move(clonedAdjCollection), std::move(clonedPropertyCollections));
}

void GenericExtend::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) {
inVector = resultSet->getValueVector(inVectorPos);
listSyncState = make_unique<ListSyncState>();
outNodeVector = resultSet->getValueVector(outNodeVectorPos);
for (auto& dataPos : outPropertyVectorsPos) {
auto vector = resultSet->getValueVector(dataPos);
outPropertyVectors.push_back(vector);
}
for (auto& [_, adjAndPropertyCollection] : adjAndPropertyCollectionPerNodeTable) {
adjAndPropertyCollection->populateListHandles(*listSyncState);
}
// config local state
currentAdjAndPropertyCollection = nullptr;
}

bool GenericExtend::getNextTuplesInternal() {
while (true) {
if (scanCurrentAdjAndPropertyCollection()) {
metrics->numOutputTuple.increase(outNodeVector->state->selVector->selectedSize);
return true;
}
if (!children[0]->getNextTuple()) {
return false;
}
auto currentIdx = inVector->state->selVector->selectedPositions[0];
if (inVector->isNull(currentIdx)) {
outNodeVector->state->selVector->selectedSize = 0;
continue;
}
auto nodeID = inVector->getValue<nodeID_t>(currentIdx);
initCurrentAdjAndPropertyCollection(nodeID);
}
}

void AdjAndPropertyCollection::initState(node_offset_t nodeOffset) {
void AdjAndPropertyCollection::resetState(node_offset_t nodeOffset) {
nextColumnIdx = 0;
nextListIdx = 0;
currentNodeOffset = nodeOffset;
Expand Down Expand Up @@ -99,11 +53,13 @@ bool AdjAndPropertyCollection::scanColumns(const shared_ptr<ValueVector>& inVect
bool AdjAndPropertyCollection::scanLists(const shared_ptr<ValueVector>& inVector,
const shared_ptr<ValueVector>& outNodeVector,
const vector<shared_ptr<ValueVector>>& outPropertyVectors, Transaction* transaction) {
if (currentListIdx != UINT32_MAX) { // check current list // TODO: wrap
if (currentListIdx != UINT32_MAX) { // check current list
auto currentAdjList = adjCollection->lists[currentListIdx];
auto currentAdjListHandle = adjCollection->listHandles[currentListIdx].get();
if (currentAdjListHandle->listSyncState.hasMoreToRead()) {
// scan current adjList
currentAdjList->readValues(outNodeVector, *currentAdjListHandle);
// scan current propertyLists
for (auto i = 0u; i < propertyCollections.size(); ++i) {
auto currentPropertyList = propertyCollections[i]->lists[currentListIdx];
auto currentPropertyListHandle =
Expand All @@ -117,6 +73,7 @@ bool AdjAndPropertyCollection::scanLists(const shared_ptr<ValueVector>& inVector
}
return true;
} else {
// no more to scan on current list, move to next list.
nextListIdx++;
currentListIdx = UINT32_MAX;
}
Expand Down Expand Up @@ -191,5 +148,69 @@ bool AdjAndPropertyCollection::scanList(uint32_t idx, const shared_ptr<ValueVect
return selVector->selectedSize != 0;
}

unique_ptr<AdjAndPropertyCollection> AdjAndPropertyCollection::clone() const {
auto clonedAdjCollection =
make_unique<ColumnAndListCollection>(adjCollection->columns, adjCollection->lists);
vector<unique_ptr<ColumnAndListCollection>> clonedPropertyCollections;
for (auto& propertyCollection : propertyCollections) {
clonedPropertyCollections.push_back(make_unique<ColumnAndListCollection>(
propertyCollection->columns, propertyCollection->lists));
}
return make_unique<AdjAndPropertyCollection>(
std::move(clonedAdjCollection), std::move(clonedPropertyCollections));
}

void GenericExtend::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) {
inVector = resultSet->getValueVector(inVectorPos);
listSyncState = make_unique<ListSyncState>();
outNodeVector = resultSet->getValueVector(outNodeVectorPos);
for (auto& dataPos : outPropertyVectorsPos) {
auto vector = resultSet->getValueVector(dataPos);
outPropertyVectors.push_back(vector);
}
for (auto& [_, adjAndPropertyCollection] : adjAndPropertyCollectionPerNodeTable) {
adjAndPropertyCollection->populateListHandles(*listSyncState);
}
// config local state
currentAdjAndPropertyCollection = nullptr;
}

bool GenericExtend::getNextTuplesInternal() {
while (true) {
if (scanCurrentAdjAndPropertyCollection()) {
metrics->numOutputTuple.increase(outNodeVector->state->selVector->selectedSize);
return true;
}
if (!children[0]->getNextTuple()) {
return false;
}
auto currentIdx = inVector->state->selVector->selectedPositions[0];
if (inVector->isNull(currentIdx)) {
outNodeVector->state->selVector->selectedSize = 0;
continue;
}
auto nodeID = inVector->getValue<nodeID_t>(currentIdx);
initCurrentAdjAndPropertyCollection(nodeID);
}
}

bool GenericExtend::scanCurrentAdjAndPropertyCollection() {
if (currentAdjAndPropertyCollection == nullptr) {
return false;
}
return currentAdjAndPropertyCollection->scan(
inVector, outNodeVector, outPropertyVectors, transaction);
}

void GenericExtend::initCurrentAdjAndPropertyCollection(const nodeID_t& nodeID) {
if (adjAndPropertyCollectionPerNodeTable.contains(nodeID.tableID)) {
currentAdjAndPropertyCollection =
adjAndPropertyCollectionPerNodeTable.at(nodeID.tableID).get();
currentAdjAndPropertyCollection->resetState(nodeID.offset);
} else {
currentAdjAndPropertyCollection = nullptr;
}
}

} // namespace processor
} // namespace kuzu

0 comments on commit 784f5b6

Please sign in to comment.