Skip to content

Commit

Permalink
combine multi-labeled node and rel
Browse files Browse the repository at this point in the history
  • Loading branch information
andyfengHKU committed Dec 9, 2022
1 parent e569972 commit 2555bcc
Show file tree
Hide file tree
Showing 15 changed files with 353 additions and 159 deletions.
109 changes: 76 additions & 33 deletions src/include/processor/operator/generic_extend.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,79 @@
namespace kuzu {
namespace processor {

// Each output vector is scanned from a collection of Columns and Lists
struct ColumnAndListCollection {
vector<Column*> columns;
vector<Lists*> lists;
vector<shared_ptr<ListHandle>> listHandles;

ColumnAndListCollection(vector<Column*> columns, vector<Lists*> lists)
: columns{std::move(columns)}, lists{std::move(lists)} {}

void populateListHandles(ListSyncState& syncState);
};

class AdjAndPropertyCollection {
public:
AdjAndPropertyCollection(unique_ptr<ColumnAndListCollection> adjCollection,
vector<unique_ptr<ColumnAndListCollection>> propertyCollections)
: adjCollection{std::move(adjCollection)}, propertyCollections{
std::move(propertyCollections)} {}

void populateListHandles();

void resetState(node_offset_t nodeOffset);

bool scan(const shared_ptr<ValueVector>& inVector, const shared_ptr<ValueVector>& outNodeVector,
const vector<shared_ptr<ValueVector>>& outPropertyVectors, Transaction* transaction);

unique_ptr<AdjAndPropertyCollection> clone() const;

private:
bool scanColumns(const shared_ptr<ValueVector>& inVector,
const shared_ptr<ValueVector>& outNodeVector,
const vector<shared_ptr<ValueVector>>& outPropertyVectors, Transaction* transaction);
bool scanLists(const shared_ptr<ValueVector>& inVector,
const shared_ptr<ValueVector>& outNodeVector,
const vector<shared_ptr<ValueVector>>& outPropertyVectors, Transaction* transaction);

inline bool hasColumnToScan() const { return nextColumnIdx < adjCollection->columns.size(); }
inline bool hasListToScan() const { return nextListIdx < adjCollection->lists.size(); }

bool scanColumn(uint32_t idx, const shared_ptr<ValueVector>& inVector,
const shared_ptr<ValueVector>& outNodeVector,
const vector<shared_ptr<ValueVector>>& outPropertyVectors, Transaction* transaction);
bool scanList(uint32_t idx, const shared_ptr<ValueVector>& inVector,
const shared_ptr<ValueVector>& outNodeVector,
const vector<shared_ptr<ValueVector>>& outPropertyVectors, Transaction* transaction);

private:
unique_ptr<ColumnAndListCollection> adjCollection;
vector<unique_ptr<ColumnAndListCollection>> propertyCollections;

// Next column idx to scan.
uint32_t nextColumnIdx = UINT32_MAX;
// Next list idx to scan.
uint32_t nextListIdx = UINT32_MAX;
// Current node offset to extend from.
node_offset_t currentNodeOffset = INVALID_NODE_OFFSET;
// Current list idx to scan. Note that a list may be scanned multiple times.
uint32_t currentListIdx = UINT32_MAX;
// Sync between adjList and propertyLists
unique_ptr<ListSyncState> listSyncState = nullptr;
};

class GenericExtend : public PhysicalOperator {
public:
GenericExtend(const DataPos& inVectorPos, const DataPos& outNodeVectorPos,
ColumnAndListCollection adjColumnAndListCollection, vector<DataPos> outPropertyVectorsPos,
vector<ColumnAndListCollection> propertyColumnAndListCollections,
vector<DataPos> outPropertyVectorsPos,
unordered_map<table_id_t, unique_ptr<AdjAndPropertyCollection>>
adjAndPropertyCollectionPerNodeTable,
unique_ptr<PhysicalOperator> child, uint32_t id, const string& paramsString)
: PhysicalOperator{std::move(child), id, paramsString}, inVectorPos{inVectorPos},
outNodeVectorPos{outNodeVectorPos}, adjColumnAndListCollection{std::move(
adjColumnAndListCollection)},
outPropertyVectorsPos{std::move(outPropertyVectorsPos)},
propertyColumnAndListCollections{std::move(propertyColumnAndListCollections)} {}
outNodeVectorPos{outNodeVectorPos}, outPropertyVectorsPos{std::move(
outPropertyVectorsPos)},
adjAndPropertyCollectionPerNodeTable{std::move(adjAndPropertyCollectionPerNodeTable)} {}
~GenericExtend() override = default;

inline PhysicalOperatorType getOperatorType() override {
Expand All @@ -35,44 +91,31 @@ class GenericExtend : public PhysicalOperator {
bool getNextTuplesInternal() override;

unique_ptr<PhysicalOperator> clone() override {
return make_unique<GenericExtend>(inVectorPos, outNodeVectorPos, adjColumnAndListCollection,
outPropertyVectorsPos, propertyColumnAndListCollections, children[0]->clone(), id,
paramsString);
unordered_map<table_id_t, unique_ptr<AdjAndPropertyCollection>> clonedCollections;
for (auto& [tableID, adjAndPropertyCollection] : adjAndPropertyCollectionPerNodeTable) {
clonedCollections.insert({tableID, adjAndPropertyCollection->clone()});
}
return make_unique<GenericExtend>(inVectorPos, outNodeVectorPos, outPropertyVectorsPos,
std::move(clonedCollections), children[0]->clone(), id, paramsString);
}

private:
bool scan();

inline bool hasColumnToScan() const {
return nextColumnIdx < adjColumnAndListCollection.columns.size();
}
inline bool hasListToScan() const {
return nextListIdx < adjColumnAndListCollection.lists.size();
}
bool scanColumns();
bool scanLists();
bool scanColumn(uint32_t idx);
bool scanList(uint32_t idx);
bool scanCurrentAdjAndPropertyCollection();
void initCurrentAdjAndPropertyCollection(const nodeID_t& nodeID);

private:
// vector positions
DataPos inVectorPos;
DataPos outNodeVectorPos;
ColumnAndListCollection adjColumnAndListCollection;
vector<DataPos> outPropertyVectorsPos;
vector<ColumnAndListCollection> propertyColumnAndListCollections;

// vectors
shared_ptr<ValueVector> inVector;
shared_ptr<ValueVector> outNodeVector;
vector<shared_ptr<ValueVector>> outPropertyVectors;
unique_ptr<ListSyncState> listSyncState;
uint32_t nextColumnIdx;
uint32_t nextListIdx;
// A list may be scanned for multi getNext() call e.g. large list. So we track current list.
AdjLists* currentAdjList;
ListHandle* currentAdjListHandle;
vector<Lists*> currentPropertyLists;
vector<ListHandle*> currentPropertyListHandles;
node_offset_t currentNodeOffset;
// storage structures
unordered_map<table_id_t, unique_ptr<AdjAndPropertyCollection>>
adjAndPropertyCollectionPerNodeTable;
AdjAndPropertyCollection* currentAdjAndPropertyCollection = nullptr;
};

} // namespace processor
Expand Down
4 changes: 4 additions & 0 deletions src/planner/asp_optimizer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ bool ASPOptimizer::canApplyASP(const vector<shared_ptr<NodeExpression>>& joinNod
return false;
}
auto rightScanNodeID = (LogicalScanNode*)rightScanNodeIDs[0];
// Semi mask cannot be applied to a ScanNodeID on multiple node tables.
if (rightScanNodeID->getNode()->getNumTableIDs() > 1) {
return false;
}
// Semi mask can only be pushed to ScanNodeIDs.
if (joinNodes[0]->getUniqueName() != rightScanNodeID->getNode()->getUniqueName()) {
return false;
Expand Down
25 changes: 13 additions & 12 deletions src/planner/join_order_enumerator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,7 @@ void JoinOrderEnumerator::appendIndexScanNode(
bool JoinOrderEnumerator::needExtendToNewGroup(
RelExpression& rel, NodeExpression& boundNode, RelDirection direction) {
auto extendToNewGroup = false;
extendToNewGroup |= boundNode.getNumTableIDs() > 1;
extendToNewGroup |= rel.getNumTableIDs() > 1;
if (rel.getNumTableIDs() == 1) {
auto relTableID = *rel.getTableIDs().begin();
Expand All @@ -589,15 +590,13 @@ void JoinOrderEnumerator::appendExtend(shared_ptr<NodeExpression> boundNode,
shared_ptr<NodeExpression> nbrNode, shared_ptr<RelExpression> rel, RelDirection direction,
const expression_vector& properties, LogicalPlan& plan) {
auto schema = plan.getSchema();
if (boundNode->getNumTableIDs() > 1) {
throw NotImplementedException("Extend from multi-labeled node is not supported.");
}
auto extendToNewGroup = needExtendToNewGroup(*rel, *boundNode, direction);
if (needFlatInput(*rel, *boundNode, direction)) {
QueryPlanner::appendFlattenIfNecessary(boundNode->getInternalIDProperty(), plan);
}
shared_ptr<LogicalExtend> extend;
if (rel->getNumTableIDs() > 1) {
// TODO(Xiyang): merge extend and generic extend on the logical level.
if (rel->getNumTableIDs() > 1 || boundNode->getNumTableIDs() > 1) {
extend = make_shared<LogicalGenericExtend>(boundNode, nbrNode, rel, direction,
extendToNewGroup, properties, plan.getLastOperator());
} else {
Expand Down Expand Up @@ -792,15 +791,17 @@ expression_vector JoinOrderEnumerator::getPropertiesForVariable(

uint64_t JoinOrderEnumerator::getExtensionRate(
const RelExpression& rel, const NodeExpression& boundNode, RelDirection direction) {
auto boundNodeTableID = boundNode.getTableID();
double numBoundNodes =
nodesStatistics.getNodeStatisticsAndDeletedIDs(boundNodeTableID)->getNumTuples();
double numBoundNodes = 0;
double numRels = 0;
for (auto relTableID : rel.getTableIDs()) {
auto relStatistic = (RelStatistics*)relsStatistics.getReadOnlyVersion()
->tableStatisticPerTable[relTableID]
.get();
numRels += relStatistic->getNumRelsForDirectionBoundTable(direction, boundNodeTableID);
for (auto boundNodeTableID : boundNode.getTableIDs()) {
numBoundNodes +=
nodesStatistics.getNodeStatisticsAndDeletedIDs(boundNodeTableID)->getNumTuples();
for (auto relTableID : rel.getTableIDs()) {
auto relStatistic = (RelStatistics*)relsStatistics.getReadOnlyVersion()
->tableStatisticPerTable[relTableID]
.get();
numRels += relStatistic->getNumRelsForDirectionBoundTable(direction, boundNodeTableID);
}
}
return ceil(numRels / numBoundNodes);
}
Expand Down
3 changes: 2 additions & 1 deletion src/planner/query_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,8 @@ void QueryPlanner::appendScanRelPropIfNecessary(shared_ptr<NodeExpression> bound
if (schema->isExpressionInScope(*property)) {
return;
}
assert(!rel->isVariableLength() && rel->getNumTableIDs() == 1);
assert(
!rel->isVariableLength() && rel->getNumTableIDs() == 1 && boundNode->getNumTableIDs() == 1);
auto scanProperty = make_shared<LogicalScanRelProperty>(std::move(boundNode),
std::move(nbrNode), std::move(rel), direction, std::move(property), plan.getLastOperator());
scanProperty->computeSchema(*schema);
Expand Down
100 changes: 58 additions & 42 deletions src/processor/mapper/map_extend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ unique_ptr<PhysicalOperator> PlanMapper::mapLogicalExtendToPhysical(
mapperContext.addComputedExpressions(nbrNode->getInternalIDPropertyName());
auto& relsStore = storageManager.getRelsStore();
auto boundNodeTableID = boundNode->getTableID();
assert(rel->getNumTableIDs() == 1);
assert(rel->getNumTableIDs() == 1 && boundNode->getNumTableIDs() == 1);
auto relTableID = *rel->getTableIDs().begin();
if (relsStore.hasAdjColumn(direction, boundNodeTableID, relTableID)) {
auto column = relsStore.getAdjColumn(direction, boundNodeTableID, relTableID);
Expand All @@ -48,6 +48,48 @@ unique_ptr<PhysicalOperator> PlanMapper::mapLogicalExtendToPhysical(
}
}

static unique_ptr<ColumnAndListCollection> populateAdjCollection(table_id_t boundNodeTableID,
const RelExpression& rel, RelDirection direction, const RelsStore& relsStore) {
vector<Column*> adjColumns;
vector<Lists*> adjLists;
for (auto relTableID : rel.getTableIDs()) {
if (relsStore.hasAdjColumn(direction, boundNodeTableID, relTableID)) {
adjColumns.push_back(relsStore.getAdjColumn(direction, boundNodeTableID, relTableID));
}
if (relsStore.hasAdjList(direction, boundNodeTableID, relTableID)) {
adjLists.push_back(relsStore.getAdjLists(direction, boundNodeTableID, relTableID));
}
}
return make_unique<ColumnAndListCollection>(std::move(adjColumns), std::move(adjLists));
}

static unique_ptr<ColumnAndListCollection> populatePropertyCollection(table_id_t boundNodeTableID,
const RelExpression& rel, RelDirection direction, const PropertyExpression& propertyExpression,
const RelsStore& relsStore) {
vector<Column*> propertyColumns;
vector<Lists*> propertyLists;
for (auto relTableID : rel.getTableIDs()) {
if (relsStore.hasAdjColumn(direction, boundNodeTableID, relTableID)) {
Column* propertyColumn = nullptr;
if (propertyExpression.hasPropertyID(relTableID)) {
propertyColumn = relsStore.getRelPropertyColumn(direction, relTableID,
boundNodeTableID, propertyExpression.getPropertyID(relTableID));
}
propertyColumns.push_back(propertyColumn);
}
if (relsStore.hasAdjList(direction, boundNodeTableID, relTableID)) {
Lists* propertyList = nullptr;
if (propertyExpression.hasPropertyID(relTableID)) {
propertyList = relsStore.getRelPropertyLists(direction, boundNodeTableID,
relTableID, propertyExpression.getPropertyID(relTableID));
}
propertyLists.push_back(propertyList);
}
}
return make_unique<ColumnAndListCollection>(
std::move(propertyColumns), std::move(propertyLists));
}

unique_ptr<PhysicalOperator> PlanMapper::mapLogicalGenericExtendToPhysical(
LogicalOperator* logicalOperator, MapperContext& mapperContext) {
auto extend = (LogicalGenericExtend*)logicalOperator;
Expand All @@ -65,49 +107,23 @@ unique_ptr<PhysicalOperator> PlanMapper::mapLogicalGenericExtendToPhysical(
mapperContext.addComputedExpressions(expression->getUniqueName());
}
auto& relsStore = storageManager.getRelsStore();
auto boundNodeTableID = boundNode->getTableID();
ColumnAndListCollection adjColumnAndListCollection;
for (auto relTableID : rel->getTableIDs()) {
if (relsStore.hasAdjColumn(direction, boundNodeTableID, relTableID)) {
adjColumnAndListCollection.columns.push_back(
relsStore.getAdjColumn(direction, boundNodeTableID, relTableID));
}
if (relsStore.hasAdjList(direction, boundNodeTableID, relTableID)) {
adjColumnAndListCollection.lists.push_back(
relsStore.getAdjLists(direction, boundNodeTableID, relTableID));
}
}
vector<ColumnAndListCollection> propertyColumnAndListCollections;
for (auto& expression : extend->getProperties()) {
ColumnAndListCollection propertyColumnAndListCollection;
auto propertyExpression = (PropertyExpression*)expression.get();
for (auto relTableID : rel->getTableIDs()) {
if (relsStore.hasAdjColumn(direction, boundNodeTableID, relTableID)) {
Column* propertyColumn;
if (!propertyExpression->hasPropertyID(relTableID)) {
propertyColumn = nullptr;
} else {
propertyColumn = relsStore.getRelPropertyColumn(direction, relTableID,
boundNodeTableID, propertyExpression->getPropertyID(relTableID));
}
propertyColumnAndListCollection.columns.push_back(propertyColumn);
}
if (relsStore.hasAdjList(direction, boundNodeTableID, relTableID)) {
Lists* propertyList;
if (!propertyExpression->hasPropertyID(relTableID)) {
propertyList = nullptr;
} else {
propertyList = relsStore.getRelPropertyLists(direction, boundNodeTableID,
relTableID, propertyExpression->getPropertyID(relTableID));
}
propertyColumnAndListCollection.lists.push_back(propertyList);
}
unordered_map<table_id_t, unique_ptr<AdjAndPropertyCollection>>
adjAndPropertyCollectionPerNodeTable;
for (auto boundNodeTableID : boundNode->getTableIDs()) {
auto adjCollection = populateAdjCollection(boundNodeTableID, *rel, direction, relsStore);
vector<unique_ptr<ColumnAndListCollection>> propertyCollections;
for (auto& expression : extend->getProperties()) {
auto propertyExpression = (PropertyExpression*)expression.get();
propertyCollections.push_back(populatePropertyCollection(
boundNodeTableID, *rel, direction, *propertyExpression, relsStore));
}
propertyColumnAndListCollections.push_back(std::move(propertyColumnAndListCollection));
adjAndPropertyCollectionPerNodeTable.insert(
{boundNodeTableID, make_unique<AdjAndPropertyCollection>(
std::move(adjCollection), std::move(propertyCollections))});
}
return make_unique<GenericExtend>(inDataPos, outNodePos, std::move(adjColumnAndListCollection),
outPropertyVectorsPos, std::move(propertyColumnAndListCollections), std::move(prevOperator),
getOperatorID(), extend->getExpressionsForPrinting());
return make_unique<GenericExtend>(inDataPos, outNodePos, outPropertyVectorsPos,
std::move(adjAndPropertyCollectionPerNodeTable), std::move(prevOperator), getOperatorID(),
extend->getExpressionsForPrinting());
}

} // namespace processor
Expand Down
3 changes: 2 additions & 1 deletion src/processor/mapper/map_scan_rel_property.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ unique_ptr<PhysicalOperator> PlanMapper::mapLogicalScanRelPropertyToPhysical(
auto scanRelProperty = (LogicalScanRelProperty*)logicalOperator;
auto boundNode = scanRelProperty->getBoundNode();
auto rel = scanRelProperty->getRel();
assert(rel->getNumTableIDs() == 1);
assert(
!rel->isVariableLength() && rel->getNumTableIDs() == 1 && boundNode->getNumTableIDs() == 1);
auto relID = rel->getTableID();
auto direction = scanRelProperty->getDirection();
auto propertyExpression = (PropertyExpression*)scanRelProperty->getProperty().get();
Expand Down
Loading

0 comments on commit 2555bcc

Please sign in to comment.