Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multi labeled graph pattern #1104

Merged
merged 1 commit into from
Dec 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 76 additions & 33 deletions src/include/processor/operator/generic_extend.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,79 @@
namespace kuzu {
namespace processor {

// Each output vector is scanned from a collection of Columns and Lists.
//
// Plain aggregate bundling the storage structures behind one output vector:
//   - columns:     Column pointers (non-owning raw pointers).
//   - lists:       Lists pointers (non-owning raw pointers).
//   - listHandles: shared ListHandle objects; left empty by the constructor.
// Entries of `columns` / `lists` may be nullptr: the plan mapper stores nullptr for a rel table
// that does not have the requested property.
struct ColumnAndListCollection {
vector<Column*> columns;
vector<Lists*> lists;
// NOTE(review): presumably one handle per entry of `lists`, filled by populateListHandles()
// -- confirm against the definition, which is not part of this header.
vector<shared_ptr<ListHandle>> listHandles;

// Takes both vectors by value and moves them into the members; listHandles starts out empty.
ColumnAndListCollection(vector<Column*> columns, vector<Lists*> lists)
: columns{std::move(columns)}, lists{std::move(lists)} {}

// Defined out of line. Receives the ListSyncState by mutable reference.
// NOTE(review): presumably creates the entries of `listHandles` bound to `syncState` -- confirm.
void populateListHandles(ListSyncState& syncState);
};

// Storage structures used to extend from ONE bound node table, together with the scan cursor.
//
// Holds the adjacency columns/lists (adjCollection) and one ColumnAndListCollection per rel
// property being read (propertyCollections). GenericExtend keeps one instance per bound node
// table ID and clones each instance when the operator itself is cloned.
class AdjAndPropertyCollection {
public:
// Takes ownership of the adjacency collection and of every property collection.
AdjAndPropertyCollection(unique_ptr<ColumnAndListCollection> adjCollection,
vector<unique_ptr<ColumnAndListCollection>> propertyCollections)
: adjCollection{std::move(adjCollection)}, propertyCollections{
std::move(propertyCollections)} {}

// Defined out of line.
// NOTE(review): presumably sets up listSyncState and the list handles of the owned
// collections -- confirm against the definition.
void populateListHandles();

// Defined out of line.
// NOTE(review): presumably rewinds the cursor members below so that scanning restarts for
// the node at `nodeOffset` -- confirm against the definition.
void resetState(node_offset_t nodeOffset);

// Public scan entry point; defined out of line.
//   inVector           - input vector (the node to extend from).
//   outNodeVector      - output vector for neighbour nodes.
//   outPropertyVectors - output vectors for rel properties.
//   transaction        - transaction passed through to the storage reads.
// NOTE(review): the bool presumably reports whether any output was produced -- confirm.
bool scan(const shared_ptr<ValueVector>& inVector, const shared_ptr<ValueVector>& outNodeVector,
const vector<shared_ptr<ValueVector>>& outPropertyVectors, Transaction* transaction);

// Returns a separately owned instance; GenericExtend::clone() calls this once per node table.
unique_ptr<AdjAndPropertyCollection> clone() const;

private:
// Helpers that scan across the columns / across the lists of the collection. Defined out of line.
bool scanColumns(const shared_ptr<ValueVector>& inVector,
const shared_ptr<ValueVector>& outNodeVector,
const vector<shared_ptr<ValueVector>>& outPropertyVectors, Transaction* transaction);
bool scanLists(const shared_ptr<ValueVector>& inVector,
const shared_ptr<ValueVector>& outNodeVector,
const vector<shared_ptr<ValueVector>>& outPropertyVectors, Transaction* transaction);

// True while nextColumnIdx / nextListIdx still indexes into the adjacency collection.
// With the UINT32_MAX defaults below, both are false for any realistically sized collection
// until the indices are assigned.
inline bool hasColumnToScan() const { return nextColumnIdx < adjCollection->columns.size(); }
inline bool hasListToScan() const { return nextListIdx < adjCollection->lists.size(); }

// Helpers that scan the single column / list at position `idx`. Defined out of line.
bool scanColumn(uint32_t idx, const shared_ptr<ValueVector>& inVector,
const shared_ptr<ValueVector>& outNodeVector,
const vector<shared_ptr<ValueVector>>& outPropertyVectors, Transaction* transaction);
bool scanList(uint32_t idx, const shared_ptr<ValueVector>& inVector,
const shared_ptr<ValueVector>& outNodeVector,
const vector<shared_ptr<ValueVector>>& outPropertyVectors, Transaction* transaction);

private:
// Adjacency columns/lists for this bound node table.
unique_ptr<ColumnAndListCollection> adjCollection;
// One collection per rel property; the plan mapper builds each with the same per-rel-table
// conditions as adjCollection, inserting nullptr where a rel table lacks the property.
vector<unique_ptr<ColumnAndListCollection>> propertyCollections;

// Next column idx to scan.
uint32_t nextColumnIdx = UINT32_MAX;
// Next list idx to scan.
uint32_t nextListIdx = UINT32_MAX;
// Current node offset to extend from.
node_offset_t currentNodeOffset = INVALID_NODE_OFFSET;
// Current list idx to scan. Note that a list may be scanned multiple times.
uint32_t currentListIdx = UINT32_MAX;
// Sync between adjList and propertyLists. Null until it is created.
// NOTE(review): presumably created by populateListHandles() -- confirm.
unique_ptr<ListSyncState> listSyncState = nullptr;
};

class GenericExtend : public PhysicalOperator {
public:
GenericExtend(const DataPos& inVectorPos, const DataPos& outNodeVectorPos,
ColumnAndListCollection adjColumnAndListCollection, vector<DataPos> outPropertyVectorsPos,
vector<ColumnAndListCollection> propertyColumnAndListCollections,
vector<DataPos> outPropertyVectorsPos,
unordered_map<table_id_t, unique_ptr<AdjAndPropertyCollection>>
adjAndPropertyCollectionPerNodeTable,
unique_ptr<PhysicalOperator> child, uint32_t id, const string& paramsString)
: PhysicalOperator{std::move(child), id, paramsString}, inVectorPos{inVectorPos},
outNodeVectorPos{outNodeVectorPos}, adjColumnAndListCollection{std::move(
adjColumnAndListCollection)},
outPropertyVectorsPos{std::move(outPropertyVectorsPos)},
propertyColumnAndListCollections{std::move(propertyColumnAndListCollections)} {}
outNodeVectorPos{outNodeVectorPos}, outPropertyVectorsPos{std::move(
outPropertyVectorsPos)},
adjAndPropertyCollectionPerNodeTable{std::move(adjAndPropertyCollectionPerNodeTable)} {}
~GenericExtend() override = default;

inline PhysicalOperatorType getOperatorType() override {
Expand All @@ -35,44 +91,31 @@ class GenericExtend : public PhysicalOperator {
bool getNextTuplesInternal() override;

unique_ptr<PhysicalOperator> clone() override {
return make_unique<GenericExtend>(inVectorPos, outNodeVectorPos, adjColumnAndListCollection,
outPropertyVectorsPos, propertyColumnAndListCollections, children[0]->clone(), id,
paramsString);
unordered_map<table_id_t, unique_ptr<AdjAndPropertyCollection>> clonedCollections;
for (auto& [tableID, adjAndPropertyCollection] : adjAndPropertyCollectionPerNodeTable) {
clonedCollections.insert({tableID, adjAndPropertyCollection->clone()});
}
return make_unique<GenericExtend>(inVectorPos, outNodeVectorPos, outPropertyVectorsPos,
std::move(clonedCollections), children[0]->clone(), id, paramsString);
}

private:
bool scan();

inline bool hasColumnToScan() const {
return nextColumnIdx < adjColumnAndListCollection.columns.size();
}
inline bool hasListToScan() const {
return nextListIdx < adjColumnAndListCollection.lists.size();
}
bool scanColumns();
bool scanLists();
bool scanColumn(uint32_t idx);
bool scanList(uint32_t idx);
bool scanCurrentAdjAndPropertyCollection();
void initCurrentAdjAndPropertyCollection(const nodeID_t& nodeID);

private:
// vector positions
DataPos inVectorPos;
DataPos outNodeVectorPos;
ColumnAndListCollection adjColumnAndListCollection;
vector<DataPos> outPropertyVectorsPos;
vector<ColumnAndListCollection> propertyColumnAndListCollections;

// vectors
shared_ptr<ValueVector> inVector;
shared_ptr<ValueVector> outNodeVector;
vector<shared_ptr<ValueVector>> outPropertyVectors;
unique_ptr<ListSyncState> listSyncState;
uint32_t nextColumnIdx;
uint32_t nextListIdx;
// A list may be scanned for multi getNext() call e.g. large list. So we track current list.
AdjLists* currentAdjList;
ListHandle* currentAdjListHandle;
vector<Lists*> currentPropertyLists;
vector<ListHandle*> currentPropertyListHandles;
node_offset_t currentNodeOffset;
// storage structures
unordered_map<table_id_t, unique_ptr<AdjAndPropertyCollection>>
adjAndPropertyCollectionPerNodeTable;
AdjAndPropertyCollection* currentAdjAndPropertyCollection = nullptr;
};

} // namespace processor
Expand Down
4 changes: 4 additions & 0 deletions src/planner/asp_optimizer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ bool ASPOptimizer::canApplyASP(const vector<shared_ptr<NodeExpression>>& joinNod
return false;
}
auto rightScanNodeID = (LogicalScanNode*)rightScanNodeIDs[0];
// Semi mask cannot be applied to a ScanNodeID on multiple node tables.
if (rightScanNodeID->getNode()->getNumTableIDs() > 1) {
return false;
}
// Semi mask can only be pushed to ScanNodeIDs.
if (joinNodes[0]->getUniqueName() != rightScanNodeID->getNode()->getUniqueName()) {
return false;
Expand Down
25 changes: 13 additions & 12 deletions src/planner/join_order_enumerator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,7 @@ void JoinOrderEnumerator::appendIndexScanNode(
bool JoinOrderEnumerator::needExtendToNewGroup(
RelExpression& rel, NodeExpression& boundNode, RelDirection direction) {
auto extendToNewGroup = false;
extendToNewGroup |= boundNode.getNumTableIDs() > 1;
extendToNewGroup |= rel.getNumTableIDs() > 1;
if (rel.getNumTableIDs() == 1) {
auto relTableID = *rel.getTableIDs().begin();
Expand All @@ -589,15 +590,13 @@ void JoinOrderEnumerator::appendExtend(shared_ptr<NodeExpression> boundNode,
shared_ptr<NodeExpression> nbrNode, shared_ptr<RelExpression> rel, RelDirection direction,
const expression_vector& properties, LogicalPlan& plan) {
auto schema = plan.getSchema();
if (boundNode->getNumTableIDs() > 1) {
throw NotImplementedException("Extend from multi-labeled node is not supported.");
}
auto extendToNewGroup = needExtendToNewGroup(*rel, *boundNode, direction);
if (needFlatInput(*rel, *boundNode, direction)) {
QueryPlanner::appendFlattenIfNecessary(boundNode->getInternalIDProperty(), plan);
}
shared_ptr<LogicalExtend> extend;
if (rel->getNumTableIDs() > 1) {
// TODO(Xiyang): merge extend and generic extend on the logical level.
if (rel->getNumTableIDs() > 1 || boundNode->getNumTableIDs() > 1) {
extend = make_shared<LogicalGenericExtend>(boundNode, nbrNode, rel, direction,
extendToNewGroup, properties, plan.getLastOperator());
} else {
Expand Down Expand Up @@ -792,15 +791,17 @@ expression_vector JoinOrderEnumerator::getPropertiesForVariable(

uint64_t JoinOrderEnumerator::getExtensionRate(
const RelExpression& rel, const NodeExpression& boundNode, RelDirection direction) {
auto boundNodeTableID = boundNode.getTableID();
double numBoundNodes =
nodesStatistics.getNodeStatisticsAndDeletedIDs(boundNodeTableID)->getNumTuples();
double numBoundNodes = 0;
double numRels = 0;
for (auto relTableID : rel.getTableIDs()) {
auto relStatistic = (RelStatistics*)relsStatistics.getReadOnlyVersion()
->tableStatisticPerTable[relTableID]
.get();
numRels += relStatistic->getNumRelsForDirectionBoundTable(direction, boundNodeTableID);
for (auto boundNodeTableID : boundNode.getTableIDs()) {
numBoundNodes +=
nodesStatistics.getNodeStatisticsAndDeletedIDs(boundNodeTableID)->getNumTuples();
for (auto relTableID : rel.getTableIDs()) {
auto relStatistic = (RelStatistics*)relsStatistics.getReadOnlyVersion()
->tableStatisticPerTable[relTableID]
.get();
numRels += relStatistic->getNumRelsForDirectionBoundTable(direction, boundNodeTableID);
}
}
return ceil(numRels / numBoundNodes);
}
Expand Down
3 changes: 2 additions & 1 deletion src/planner/query_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,8 @@ void QueryPlanner::appendScanRelPropIfNecessary(shared_ptr<NodeExpression> bound
if (schema->isExpressionInScope(*property)) {
return;
}
assert(!rel->isVariableLength() && rel->getNumTableIDs() == 1);
assert(
!rel->isVariableLength() && rel->getNumTableIDs() == 1 && boundNode->getNumTableIDs() == 1);
auto scanProperty = make_shared<LogicalScanRelProperty>(std::move(boundNode),
std::move(nbrNode), std::move(rel), direction, std::move(property), plan.getLastOperator());
scanProperty->computeSchema(*schema);
Expand Down
100 changes: 58 additions & 42 deletions src/processor/mapper/map_extend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ unique_ptr<PhysicalOperator> PlanMapper::mapLogicalExtendToPhysical(
mapperContext.addComputedExpressions(nbrNode->getInternalIDPropertyName());
auto& relsStore = storageManager.getRelsStore();
auto boundNodeTableID = boundNode->getTableID();
assert(rel->getNumTableIDs() == 1);
assert(rel->getNumTableIDs() == 1 && boundNode->getNumTableIDs() == 1);
auto relTableID = *rel->getTableIDs().begin();
if (relsStore.hasAdjColumn(direction, boundNodeTableID, relTableID)) {
auto column = relsStore.getAdjColumn(direction, boundNodeTableID, relTableID);
Expand All @@ -48,6 +48,48 @@ unique_ptr<PhysicalOperator> PlanMapper::mapLogicalExtendToPhysical(
}
}

// Collects the adjacency storage for extending from `boundNodeTableID` along `rel` in
// `direction`.
//
// Every rel table ID of `rel` is visited in the order produced by rel.getTableIDs(). For each
// one, the adj column is appended if the store has one for this (direction, bound table, rel
// table) triple, and the adj lists are appended if the store has them. A rel table that has
// neither contributes nothing.
//
// Returns a newly allocated ColumnAndListCollection owning both vectors.
static unique_ptr<ColumnAndListCollection> populateAdjCollection(table_id_t boundNodeTableID,
    const RelExpression& rel, RelDirection direction, const RelsStore& relsStore) {
    vector<Column*> columnsToScan;
    vector<Lists*> listsToScan;
    for (auto relTableID : rel.getTableIDs()) {
        const bool storedAsColumn =
            relsStore.hasAdjColumn(direction, boundNodeTableID, relTableID);
        if (storedAsColumn) {
            auto adjColumn = relsStore.getAdjColumn(direction, boundNodeTableID, relTableID);
            columnsToScan.push_back(adjColumn);
        }
        const bool storedAsList = relsStore.hasAdjList(direction, boundNodeTableID, relTableID);
        if (storedAsList) {
            auto adjList = relsStore.getAdjLists(direction, boundNodeTableID, relTableID);
            listsToScan.push_back(adjList);
        }
    }
    return make_unique<ColumnAndListCollection>(
        std::move(columnsToScan), std::move(listsToScan));
}

// Collects the property storage for one rel property when extending from `boundNodeTableID`
// along `rel` in `direction`.
//
// The loop uses the same hasAdjColumn / hasAdjList conditions as populateAdjCollection, so the
// result has exactly one slot per adjacency column and one per adjacency list. When a rel table
// does not define the property (hasPropertyID is false), its slot is filled with nullptr instead
// of being dropped.
// NOTE(review): the index-by-index correspondence with the adjacency collection assumes
// rel.getTableIDs() iterates in the same order on every call -- confirm.
//
// Returns a newly allocated ColumnAndListCollection owning both vectors.
static unique_ptr<ColumnAndListCollection> populatePropertyCollection(table_id_t boundNodeTableID,
    const RelExpression& rel, RelDirection direction, const PropertyExpression& propertyExpression,
    const RelsStore& relsStore) {
    vector<Column*> columnSlots;
    vector<Lists*> listSlots;
    for (auto relTableID : rel.getTableIDs()) {
        if (relsStore.hasAdjColumn(direction, boundNodeTableID, relTableID)) {
            // Argument order (relTableID before boundNodeTableID) is as in the original call.
            columnSlots.push_back(
                propertyExpression.hasPropertyID(relTableID) ?
                    relsStore.getRelPropertyColumn(direction, relTableID, boundNodeTableID,
                        propertyExpression.getPropertyID(relTableID)) :
                    nullptr);
        }
        if (relsStore.hasAdjList(direction, boundNodeTableID, relTableID)) {
            listSlots.push_back(
                propertyExpression.hasPropertyID(relTableID) ?
                    relsStore.getRelPropertyLists(direction, boundNodeTableID, relTableID,
                        propertyExpression.getPropertyID(relTableID)) :
                    nullptr);
        }
    }
    return make_unique<ColumnAndListCollection>(std::move(columnSlots), std::move(listSlots));
}

unique_ptr<PhysicalOperator> PlanMapper::mapLogicalGenericExtendToPhysical(
LogicalOperator* logicalOperator, MapperContext& mapperContext) {
auto extend = (LogicalGenericExtend*)logicalOperator;
Expand All @@ -65,49 +107,23 @@ unique_ptr<PhysicalOperator> PlanMapper::mapLogicalGenericExtendToPhysical(
mapperContext.addComputedExpressions(expression->getUniqueName());
}
auto& relsStore = storageManager.getRelsStore();
auto boundNodeTableID = boundNode->getTableID();
ColumnAndListCollection adjColumnAndListCollection;
for (auto relTableID : rel->getTableIDs()) {
if (relsStore.hasAdjColumn(direction, boundNodeTableID, relTableID)) {
adjColumnAndListCollection.columns.push_back(
relsStore.getAdjColumn(direction, boundNodeTableID, relTableID));
}
if (relsStore.hasAdjList(direction, boundNodeTableID, relTableID)) {
adjColumnAndListCollection.lists.push_back(
relsStore.getAdjLists(direction, boundNodeTableID, relTableID));
}
}
vector<ColumnAndListCollection> propertyColumnAndListCollections;
for (auto& expression : extend->getProperties()) {
ColumnAndListCollection propertyColumnAndListCollection;
auto propertyExpression = (PropertyExpression*)expression.get();
for (auto relTableID : rel->getTableIDs()) {
if (relsStore.hasAdjColumn(direction, boundNodeTableID, relTableID)) {
Column* propertyColumn;
if (!propertyExpression->hasPropertyID(relTableID)) {
propertyColumn = nullptr;
} else {
propertyColumn = relsStore.getRelPropertyColumn(direction, relTableID,
boundNodeTableID, propertyExpression->getPropertyID(relTableID));
}
propertyColumnAndListCollection.columns.push_back(propertyColumn);
}
if (relsStore.hasAdjList(direction, boundNodeTableID, relTableID)) {
Lists* propertyList;
if (!propertyExpression->hasPropertyID(relTableID)) {
propertyList = nullptr;
} else {
propertyList = relsStore.getRelPropertyLists(direction, boundNodeTableID,
relTableID, propertyExpression->getPropertyID(relTableID));
}
propertyColumnAndListCollection.lists.push_back(propertyList);
}
unordered_map<table_id_t, unique_ptr<AdjAndPropertyCollection>>
adjAndPropertyCollectionPerNodeTable;
for (auto boundNodeTableID : boundNode->getTableIDs()) {
auto adjCollection = populateAdjCollection(boundNodeTableID, *rel, direction, relsStore);
vector<unique_ptr<ColumnAndListCollection>> propertyCollections;
for (auto& expression : extend->getProperties()) {
auto propertyExpression = (PropertyExpression*)expression.get();
propertyCollections.push_back(populatePropertyCollection(
boundNodeTableID, *rel, direction, *propertyExpression, relsStore));
}
propertyColumnAndListCollections.push_back(std::move(propertyColumnAndListCollection));
adjAndPropertyCollectionPerNodeTable.insert(
{boundNodeTableID, make_unique<AdjAndPropertyCollection>(
std::move(adjCollection), std::move(propertyCollections))});
}
return make_unique<GenericExtend>(inDataPos, outNodePos, std::move(adjColumnAndListCollection),
outPropertyVectorsPos, std::move(propertyColumnAndListCollections), std::move(prevOperator),
getOperatorID(), extend->getExpressionsForPrinting());
return make_unique<GenericExtend>(inDataPos, outNodePos, outPropertyVectorsPos,
std::move(adjAndPropertyCollectionPerNodeTable), std::move(prevOperator), getOperatorID(),
extend->getExpressionsForPrinting());
}

} // namespace processor
Expand Down
3 changes: 2 additions & 1 deletion src/processor/mapper/map_scan_rel_property.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ unique_ptr<PhysicalOperator> PlanMapper::mapLogicalScanRelPropertyToPhysical(
auto scanRelProperty = (LogicalScanRelProperty*)logicalOperator;
auto boundNode = scanRelProperty->getBoundNode();
auto rel = scanRelProperty->getRel();
assert(rel->getNumTableIDs() == 1);
assert(
!rel->isVariableLength() && rel->getNumTableIDs() == 1 && boundNode->getNumTableIDs() == 1);
auto relID = rel->getTableID();
auto direction = scanRelProperty->getDirection();
auto propertyExpression = (PropertyExpression*)scanRelProperty->getProperty().get();
Expand Down
Loading