Skip to content

Commit

Permalink
add generic extend with property reading
Browse files Browse the repository at this point in the history
  • Loading branch information
andyfengHKU committed Dec 7, 2022
1 parent b943f09 commit b304f17
Show file tree
Hide file tree
Showing 28 changed files with 527 additions and 259 deletions.
22 changes: 12 additions & 10 deletions src/include/planner/join_order_enumerator.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,17 +72,17 @@ class JoinOrderEnumerator {
void planPropertyScansForNode(shared_ptr<NodeExpression> node, LogicalPlan& plan);

void planRelScan(uint32_t relPos);
inline void planRelExtendFiltersAndProperties(shared_ptr<RelExpression>& rel,
RelDirection direction, expression_vector& predicates, LogicalPlan& plan) {
appendExtend(rel, direction, plan);
planFiltersForRel(predicates, *rel, direction, plan);
planPropertyScansForRel(*rel, direction, plan);
}

void planExtendFiltersAndPropertyScans(shared_ptr<RelExpression> rel, RelDirection direction,
expression_vector& predicates, LogicalPlan& plan);
// Filter push down for rel table.
void planFiltersForRel(expression_vector& predicates, RelExpression& rel,
RelDirection direction, LogicalPlan& plan);
void planFiltersForRel(const expression_vector& predicates,
shared_ptr<NodeExpression> boundNode, shared_ptr<NodeExpression> nbrNode,
shared_ptr<RelExpression> rel, RelDirection direction, LogicalPlan& plan);
// Property push down for rel table.
void planPropertyScansForRel(RelExpression& rel, RelDirection direction, LogicalPlan& plan);
void planPropertyScansForRel(const expression_vector& properties,
shared_ptr<NodeExpression> boundNode, shared_ptr<NodeExpression> nbrNode,
shared_ptr<RelExpression> rel, RelDirection direction, LogicalPlan& plan);

void planLevel(uint32_t level);

Expand Down Expand Up @@ -111,7 +111,9 @@ class JoinOrderEnumerator {
bool needFlatInput(RelExpression& rel, NodeExpression& boundNode, RelDirection direction);
bool needExtendToNewGroup(
RelExpression& rel, NodeExpression& boundNode, RelDirection direction);
void appendExtend(shared_ptr<RelExpression>& rel, RelDirection direction, LogicalPlan& plan);
void appendExtend(shared_ptr<NodeExpression> boundNode, shared_ptr<NodeExpression> nbrNode,
shared_ptr<RelExpression> rel, RelDirection direction, const expression_vector& properties,
LogicalPlan& plan);

static void planJoin(const vector<shared_ptr<NodeExpression>>& joinNodes, JoinType joinType,
shared_ptr<Expression> mark, LogicalPlan& probePlan, LogicalPlan& buildPlan);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ enum LogicalOperatorType : uint8_t {
LOGICAL_INDEX_SCAN_NODE,
LOGICAL_UNWIND,
LOGICAL_EXTEND,
LOGICAL_GENERIC_EXTEND,
LOGICAL_FLATTEN,
LOGICAL_FILTER,
LOGICAL_INTERSECT,
Expand Down Expand Up @@ -43,13 +44,13 @@ enum LogicalOperatorType : uint8_t {
};

const string LogicalOperatorTypeNames[] = {"LOGICAL_SCAN_NODE", "LOGICAL_INDEX_SCAN_NODE",
"LOGICAL_UNWIND", "LOGICAL_EXTEND", "LOGICAL_FLATTEN", "LOGICAL_FILTER", "LOGICAL_INTERSECT",
"LOGICAL_PROJECTION", "LOGICAL_SCAN_NODE_PROPERTY", "LOGICAL_SCAN_REL_PROPERTY",
"LOGICAL_CROSS_PRODUCT", "LOGICAL_SEMI_MASKER", "LOGICAL_HASH_JOIN",
"LOGICAL_MULTIPLICITY_REDUCER", "LOGICAL_LIMIT", "LOGICAL_SKIP", "LOGICAL_AGGREGATE",
"LOGICAL_ORDER_BY", "LOGICAL_UNION_ALL", "LOGICAL_DISTINCT", "LOGICAL_CREATE_NODE",
"LOGICAL_CREATE_REL", "LOGICAL_SET_NODE_PROPERTY", "LOGICAL_DELETE_NODE", "LOGICAL_DELETE_REL",
"LOGICAL_ACCUMULATE", "LOGICAL_EXPRESSIONS_SCAN", "LOGICAL_FTABLE_SCAN",
"LOGICAL_UNWIND", "LOGICAL_EXTEND", "LOGICAL_GENERIC_EXTEND", "LOGICAL_FLATTEN",
"LOGICAL_FILTER", "LOGICAL_INTERSECT", "LOGICAL_PROJECTION", "LOGICAL_SCAN_NODE_PROPERTY",
"LOGICAL_SCAN_REL_PROPERTY", "LOGICAL_CROSS_PRODUCT", "LOGICAL_SEMI_MASKER",
"LOGICAL_HASH_JOIN", "LOGICAL_MULTIPLICITY_REDUCER", "LOGICAL_LIMIT", "LOGICAL_SKIP",
"LOGICAL_AGGREGATE", "LOGICAL_ORDER_BY", "LOGICAL_UNION_ALL", "LOGICAL_DISTINCT",
"LOGICAL_CREATE_NODE", "LOGICAL_CREATE_REL", "LOGICAL_SET_NODE_PROPERTY", "LOGICAL_DELETE_NODE",
"LOGICAL_DELETE_REL", "LOGICAL_ACCUMULATE", "LOGICAL_EXPRESSIONS_SCAN", "LOGICAL_FTABLE_SCAN",
"LOGICAL_CREATE_NODE_TABLE", "LOGICAL_CREATE_REL_TABLE", "LOGICAL_COPY_CSV",
"LOGICAL_DROP_TABLE"};

Expand Down
36 changes: 34 additions & 2 deletions src/include/planner/logical_plan/logical_operator/logical_extend.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class LogicalExtend : public LogicalOperator {
nbrNode->getRawName();
}

inline void computeSchema(Schema& schema) {
inline virtual void computeSchema(Schema& schema) {
auto boundGroupPos = schema.getGroupPos(boundNode->getInternalIDPropertyName());
uint32_t nbrGroupPos = 0u;
if (!extendToNewGroup) {
Expand All @@ -47,7 +47,7 @@ class LogicalExtend : public LogicalOperator {
boundNode, nbrNode, rel, direction, extendToNewGroup, children[0]->copy());
}

private:
protected:
shared_ptr<NodeExpression> boundNode;
shared_ptr<NodeExpression> nbrNode;
shared_ptr<RelExpression> rel;
Expand All @@ -56,5 +56,37 @@ class LogicalExtend : public LogicalOperator {
bool extendToNewGroup;
};

class LogicalGenericExtend : public LogicalExtend {
public:
LogicalGenericExtend(shared_ptr<NodeExpression> boundNode, shared_ptr<NodeExpression> nbrNode,
shared_ptr<RelExpression> rel, RelDirection direction, bool extendToNewGroup,
expression_vector properties, shared_ptr<LogicalOperator> child)
: LogicalExtend(std::move(boundNode), std::move(nbrNode), std::move(rel), direction,
extendToNewGroup, std::move(child)),
properties{std::move(properties)} {}

inline LogicalOperatorType getLogicalOperatorType() const override {
return LogicalOperatorType::LOGICAL_GENERIC_EXTEND;
}

inline void computeSchema(Schema& schema) override {
LogicalExtend::computeSchema(schema);
auto nbrNodePos = schema.getGroupPos(nbrNode->getInternalIDPropertyName());
for (auto& property : properties) {
schema.insertToGroupAndScope(property, nbrNodePos);
}
}

inline expression_vector getProperties() const { return properties; }

unique_ptr<LogicalOperator> copy() override {
return make_unique<LogicalGenericExtend>(
boundNode, nbrNode, rel, direction, extendToNewGroup, properties, children[0]->copy());
}

private:
expression_vector properties;
};

} // namespace planner
} // namespace kuzu
Original file line number Diff line number Diff line change
@@ -1,48 +1,48 @@
#pragma once

#include "base_logical_operator.h"
#include "binder/expression/node_expression.h"
#include "binder/expression/rel_expression.h"

namespace kuzu {
namespace planner {
using namespace kuzu::binder;

class LogicalScanRelProperty : public LogicalOperator {
public:
LogicalScanRelProperty(shared_ptr<NodeExpression> boundNodeExpression,
shared_ptr<NodeExpression> nbrNodeExpression, table_id_t relTableID, RelDirection direction,
string propertyName, uint32_t propertyID, bool isColumn, shared_ptr<LogicalOperator> child)
: LogicalOperator{move(child)}, boundNodeExpression{move(boundNodeExpression)},
nbrNodeExpression{move(nbrNodeExpression)}, relTableID{relTableID}, direction{direction},
propertyName{move(propertyName)}, propertyID{propertyID}, isColumn{isColumn} {}

LogicalOperatorType getLogicalOperatorType() const override {
LogicalScanRelProperty(shared_ptr<NodeExpression> boundNode, shared_ptr<NodeExpression> nbrNode,
shared_ptr<RelExpression> rel, RelDirection direction, shared_ptr<Expression> property,
shared_ptr<LogicalOperator> child)
: LogicalOperator{std::move(child)}, boundNode{std::move(boundNode)}, nbrNode{std::move(
nbrNode)},
rel{std::move(rel)}, direction{direction}, property{std::move(property)} {}

inline LogicalOperatorType getLogicalOperatorType() const override {
return LogicalOperatorType::LOGICAL_SCAN_REL_PROPERTY;
}

string getExpressionsForPrinting() const override { return propertyName; }
inline string getExpressionsForPrinting() const override { return property->getRawName(); }

inline shared_ptr<NodeExpression> getBoundNodeExpression() const { return boundNodeExpression; }
inline shared_ptr<NodeExpression> getNbrNodeExpression() const { return nbrNodeExpression; }
inline table_id_t getRelTableID() const { return relTableID; }
inline void computeSchema(Schema& schema) {
auto nbrGroupPos = schema.getGroupPos(nbrNode->getInternalIDPropertyName());
schema.insertToGroupAndScope(property, nbrGroupPos);
}

inline shared_ptr<NodeExpression> getBoundNode() const { return boundNode; }
inline shared_ptr<NodeExpression> getNbrNode() const { return nbrNode; }
inline shared_ptr<RelExpression> getRel() const { return rel; }
inline RelDirection getDirection() const { return direction; }
inline string getPropertyName() const { return propertyName; }
inline uint32_t getPropertyID() const { return propertyID; }
inline bool getIsColumn() const { return isColumn; }
inline shared_ptr<Expression> getProperty() const { return property; }

unique_ptr<LogicalOperator> copy() override {
return make_unique<LogicalScanRelProperty>(boundNodeExpression, nbrNodeExpression,
relTableID, direction, propertyName, propertyID, isColumn, children[0]->copy());
return make_unique<LogicalScanRelProperty>(
boundNode, nbrNode, rel, direction, property, children[0]->copy());
}

private:
shared_ptr<NodeExpression> boundNodeExpression;
shared_ptr<NodeExpression> nbrNodeExpression;
table_id_t relTableID;
shared_ptr<NodeExpression> boundNode;
shared_ptr<NodeExpression> nbrNode;
shared_ptr<RelExpression> rel;
RelDirection direction;
string propertyName;
uint32_t propertyID;
bool isColumn;
shared_ptr<Expression> property;
};

} // namespace planner
Expand Down
12 changes: 7 additions & 5 deletions src/include/planner/query_planner.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,16 @@ class QueryPlanner {
void appendScanNodePropIfNecessary(const expression_vector& propertyExpressions,
shared_ptr<NodeExpression> node, LogicalPlan& plan);

inline void appendScanRelPropsIfNecessary(expression_vector& properties, RelExpression& rel,
RelDirection direction, LogicalPlan& plan) {
inline void appendScanRelPropsIfNecessary(shared_ptr<NodeExpression> boundNode,
shared_ptr<NodeExpression> nbrNode, shared_ptr<RelExpression> rel, RelDirection direction,
const expression_vector& properties, LogicalPlan& plan) {
for (auto& property : properties) {
appendScanRelPropIfNecessary(property, rel, direction, plan);
appendScanRelPropIfNecessary(boundNode, nbrNode, rel, direction, property, plan);
}
}
void appendScanRelPropIfNecessary(shared_ptr<Expression>& expression, RelExpression& rel,
RelDirection direction, LogicalPlan& plan);
void appendScanRelPropIfNecessary(shared_ptr<NodeExpression> boundNode,
shared_ptr<NodeExpression> nbrNode, shared_ptr<RelExpression> rel, RelDirection direction,
shared_ptr<Expression> property, LogicalPlan& plan);

unique_ptr<LogicalPlan> createUnionPlan(
vector<unique_ptr<LogicalPlan>>& childrenPlans, bool isUnionAll);
Expand Down
2 changes: 2 additions & 0 deletions src/include/processor/mapper/plan_mapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ class PlanMapper {
LogicalOperator* logicalOperator, MapperContext& mapperContext);
unique_ptr<PhysicalOperator> mapLogicalExtendToPhysical(
LogicalOperator* logicalOperator, MapperContext& mapperContext);
unique_ptr<PhysicalOperator> mapLogicalGenericExtendToPhysical(
LogicalOperator* logicalOperator, MapperContext& mapperContext);
unique_ptr<PhysicalOperator> mapLogicalFlattenToPhysical(
LogicalOperator* logicalOperator, MapperContext& mapperContext);
unique_ptr<PhysicalOperator> mapLogicalFilterToPhysical(
Expand Down
55 changes: 37 additions & 18 deletions src/include/processor/operator/generic_extend.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,23 @@
namespace kuzu {
namespace processor {

struct ColumnAndListCollection {
vector<Column*> columns;
vector<Lists*> lists;
vector<shared_ptr<ListHandle>> listHandles;
};

class GenericExtend : public PhysicalOperator {
public:
GenericExtend(const DataPos& inVectorPos, const DataPos& outVectorPos, vector<Column*> columns,
vector<Lists*> lists, unique_ptr<PhysicalOperator> child, uint32_t id,
const string& paramsString)
GenericExtend(const DataPos& inVectorPos, const DataPos& outNodeVectorPos,
ColumnAndListCollection adjColumnAndListCollection, vector<DataPos> outPropertyVectorsPos,
vector<ColumnAndListCollection> propertyColumnAndListCollections,
unique_ptr<PhysicalOperator> child, uint32_t id, const string& paramsString)
: PhysicalOperator{std::move(child), id, paramsString}, inVectorPos{inVectorPos},
outVectorPos{outVectorPos}, columns{std::move(columns)}, lists{std::move(lists)} {}
outNodeVectorPos{outNodeVectorPos}, adjColumnAndListCollection{std::move(
adjColumnAndListCollection)},
outPropertyVectorsPos{std::move(outPropertyVectorsPos)},
propertyColumnAndListCollections{std::move(propertyColumnAndListCollections)} {}
~GenericExtend() override = default;

inline PhysicalOperatorType getOperatorType() override {
Expand All @@ -25,34 +35,43 @@ class GenericExtend : public PhysicalOperator {
bool getNextTuplesInternal() override;

unique_ptr<PhysicalOperator> clone() override {
return make_unique<GenericExtend>(
inVectorPos, outVectorPos, columns, lists, children[0]->clone(), id, paramsString);
return make_unique<GenericExtend>(inVectorPos, outNodeVectorPos, adjColumnAndListCollection,
outPropertyVectorsPos, propertyColumnAndListCollections, children[0]->clone(), id,
paramsString);
}

private:
bool findOutput();
bool scan();

inline bool hasColumnToScan() { return nextColumnIdx < columns.size(); }
inline bool hasListToScan() { return nextListIdx < lists.size(); }
bool findInColumns();
bool findInLists();
inline bool hasColumnToScan() const {
return nextColumnIdx < adjColumnAndListCollection.columns.size();
}
inline bool hasListToScan() const {
return nextListIdx < adjColumnAndListCollection.lists.size();
}
bool scanColumns();
bool scanLists();
bool scanColumn(uint32_t idx);
bool scanList(uint32_t idx);

private:
DataPos inVectorPos;
DataPos outVectorPos;
vector<Column*> columns;
vector<Lists*> lists;
vector<shared_ptr<ListHandle>> listHandles;
DataPos outNodeVectorPos;
ColumnAndListCollection adjColumnAndListCollection;
vector<DataPos> outPropertyVectorsPos;
vector<ColumnAndListCollection> propertyColumnAndListCollections;

shared_ptr<ValueVector> inVector;
shared_ptr<ValueVector> outVector;
shared_ptr<ValueVector> outNodeVector;
vector<shared_ptr<ValueVector>> outPropertyVectors;
unique_ptr<ListSyncState> listSyncState;
uint32_t nextColumnIdx;
uint32_t nextListIdx;
// A list may be scanned for multi getNext() call e.g. large list. So we track current list.
Lists* currentList;
ListHandle* currentListHandle;
AdjLists* currentAdjList;
ListHandle* currentAdjListHandle;
vector<Lists*> currentPropertyLists;
vector<ListHandle*> currentPropertyListHandles;
node_offset_t currentNodeOffset;
};

Expand Down
3 changes: 3 additions & 0 deletions src/include/storage/storage_structure/lists/lists.h
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,9 @@ class ListsFactory {
const DataType& dataType, const shared_ptr<ListHeaders>& adjListsHeaders,
BufferManager& bufferManager, bool isInMemory, WAL* wal,
ListsUpdateStore* listsUpdateStore) {
// TODO(Ziyi): this is a super hacky design. Consider storing a relIDColumn/List in relTable
// just like adjColumn/List and we can have Extend read from both relIDColumn/List and
// adjColumn/List.
if (structureIDAndFName.storageStructureID.listFileID.relPropertyListID.propertyID ==
RelTableSchema::INTERNAL_REL_ID_PROPERTY_IDX) {
return make_unique<RelIDList>(structureIDAndFName, dataType,
Expand Down
16 changes: 8 additions & 8 deletions src/include/storage/store/rel_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,24 @@ class RelTable {

public:
inline Column* getPropertyColumn(
RelDirection relDirection, table_id_t tableID, uint64_t propertyIdx) {
return propertyColumns[relDirection].at(tableID)[propertyIdx].get();
RelDirection relDirection, table_id_t boundNodeTableID, uint64_t propertyIdx) {
return propertyColumns[relDirection].at(boundNodeTableID)[propertyIdx].get();
}
inline Lists* getPropertyLists(
RelDirection relDirection, table_id_t tableID, uint64_t propertyIdx) {
return propertyLists[relDirection].at(tableID)[propertyIdx].get();
RelDirection relDirection, table_id_t boundNodeTableID, uint64_t propertyIdx) {
return propertyLists[relDirection].at(boundNodeTableID)[propertyIdx].get();
}
inline bool hasAdjColumn(RelDirection relDirection, table_id_t boundNodeTableID) {
return adjColumns[relDirection].contains(boundNodeTableID);
}
inline AdjColumn* getAdjColumn(RelDirection relDirection, table_id_t tableID) {
return adjColumns[relDirection].at(tableID).get();
inline AdjColumn* getAdjColumn(RelDirection relDirection, table_id_t boundNodeTableID) {
return adjColumns[relDirection].at(boundNodeTableID).get();
}
inline bool hasAdjList(RelDirection relDirection, table_id_t boundNodeTableID) {
return adjLists[relDirection].contains(boundNodeTableID);
}
inline AdjLists* getAdjLists(RelDirection relDirection, table_id_t tableID) {
return adjLists[relDirection].at(tableID).get();
inline AdjLists* getAdjLists(RelDirection relDirection, table_id_t boundNodeTableID) {
return adjLists[relDirection].at(boundNodeTableID).get();
}
inline ListsUpdateStore* getListsUpdateStore() { return listsUpdateStore.get(); }
inline table_id_t getRelTableID() const { return tableID; }
Expand Down
18 changes: 10 additions & 8 deletions src/include/storage/store/rels_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,17 @@ class RelsStore {
RelsStore(const Catalog& catalog, BufferManager& bufferManager, MemoryManager& memoryManager,
bool isInMemoryMode, WAL* wal);

inline Column* getRelPropertyColumn(const RelDirection& relDirection,
const table_id_t& relTableID, const table_id_t& nodeTableID,
const uint64_t& propertyIdx) const {
return relTables.at(relTableID)->getPropertyColumn(relDirection, nodeTableID, propertyIdx);
// TODO(Ziyi): other getters requires (direction, nodeID, relID) but this one is requiring
// (direction, relID, nodeID).
inline Column* getRelPropertyColumn(RelDirection relDirection, table_id_t relTableID,
table_id_t boundNodeTableID, uint64_t propertyIdx) const {
return relTables.at(relTableID)
->getPropertyColumn(relDirection, boundNodeTableID, propertyIdx);
}
inline Lists* getRelPropertyLists(const RelDirection& relDirection,
const table_id_t& nodeTableID, const table_id_t& relTableID,
const uint64_t& propertyIdx) const {
return relTables.at(relTableID)->getPropertyLists(relDirection, nodeTableID, propertyIdx);
inline Lists* getRelPropertyLists(RelDirection relDirection, table_id_t boundNodeTableID,
table_id_t relTableID, uint64_t propertyIdx) const {
return relTables.at(relTableID)
->getPropertyLists(relDirection, boundNodeTableID, propertyIdx);
}
inline bool hasAdjColumn(
RelDirection relDirection, table_id_t boundNodeTableID, table_id_t relTableID) const {
Expand Down
Loading

0 comments on commit b304f17

Please sign in to comment.