diff --git a/src/binder/bind/bind_graph_pattern.cpp b/src/binder/bind/bind_graph_pattern.cpp index 7a2344717b..b835b44683 100644 --- a/src/binder/bind/bind_graph_pattern.cpp +++ b/src/binder/bind/bind_graph_pattern.cpp @@ -179,11 +179,23 @@ void Binder::bindQueryRel(const RelPattern& relPattern, " are not connected through rel " + parsedName + "."); } } - auto dataType = isVariableLength ? - common::LogicalType(common::LogicalTypeID::RECURSIVE_REL, - std::make_unique( - std::make_unique(LogicalTypeID::INTERNAL_ID))) : - common::LogicalType(common::LogicalTypeID::REL); + common::LogicalType dataType; + if (isVariableLength) { + std::vector> structFields; + auto varListTypeInfo = std::make_unique( + std::make_unique(LogicalTypeID::INTERNAL_ID)); + auto nodeStructField = std::make_unique(InternalKeyword::NODES, + std::make_unique(LogicalTypeID::VAR_LIST, varListTypeInfo->copy())); + auto relStructField = std::make_unique(InternalKeyword::RELS, + std::make_unique(LogicalTypeID::VAR_LIST, varListTypeInfo->copy())); + structFields.push_back(std::move(nodeStructField)); + structFields.push_back(std::move(relStructField)); + auto structTypeInfo = std::make_unique(std::move(structFields)); + dataType = + common::LogicalType(common::LogicalTypeID::RECURSIVE_REL, std::move(structTypeInfo)); + } else { + dataType = common::LogicalType(common::LogicalTypeID::REL); + } auto queryRel = make_shared(dataType, getUniqueExpressionName(parsedName), parsedName, tableIDs, srcNode, dstNode, directionType, relPattern.getRelType()); if (isVariableLength) { diff --git a/src/binder/bind_expression/bind_function_expression.cpp b/src/binder/bind_expression/bind_function_expression.cpp index 4122d018c0..8364dcca16 100644 --- a/src/binder/bind_expression/bind_function_expression.cpp +++ b/src/binder/bind_expression/bind_function_expression.cpp @@ -154,7 +154,7 @@ std::unique_ptr ExpressionBinder::createInternalNodeIDExpression( propertyIDPerTable.insert({tableID, INVALID_PROPERTY_ID}); } return std::make_unique(LogicalType(LogicalTypeID::INTERNAL_ID), - INTERNAL_ID_SUFFIX, node, std::move(propertyIDPerTable), false /* isPrimaryKey */); + InternalKeyword::ID, node, std::move(propertyIDPerTable), false /* isPrimaryKey */); } std::shared_ptr ExpressionBinder::bindInternalIDExpression( @@ -165,7 +165,7 @@ std::shared_ptr ExpressionBinder::bindInternalIDExpression( return node.getInternalIDProperty(); } case common::LogicalTypeID::REL: { - return bindRelPropertyExpression(expression, INTERNAL_ID_SUFFIX); + return bindRelPropertyExpression(expression, InternalKeyword::ID); } default: throw NotImplementedException("ExpressionBinder::bindInternalIDExpression"); @@ -240,7 +240,7 @@ std::unique_ptr ExpressionBinder::createInternalLengthExpression( propertyIDPerTable.insert({tableID, INVALID_PROPERTY_ID}); } return std::make_unique(LogicalType(common::LogicalTypeID::INT64), - INTERNAL_LENGTH_SUFFIX, rel, std::move(propertyIDPerTable), false /* isPrimaryKey */); + InternalKeyword::LENGTH, rel, std::move(propertyIDPerTable), false /* isPrimaryKey */); } std::shared_ptr ExpressionBinder::bindRecursiveJoinLengthFunction( diff --git a/src/catalog/catalog.cpp b/src/catalog/catalog.cpp index d0e6985aea..da92718c86 100644 --- a/src/catalog/catalog.cpp +++ b/src/catalog/catalog.cpp @@ -210,7 +210,7 @@ table_id_t CatalogContent::addRelTableSchema(std::string tableName, RelMultiplic nodeTableSchemas[srcTableID]->addFwdRelTableID(tableID); nodeTableSchemas[dstTableID]->addBwdRelTableID(tableID); auto relInternalIDProperty = - Property(INTERNAL_ID_SUFFIX, LogicalType{LogicalTypeID::INTERNAL_ID}); + Property(InternalKeyword::ID, LogicalType{LogicalTypeID::INTERNAL_ID}); properties.insert(properties.begin(), relInternalIDProperty); for (auto i = 0u; i < properties.size(); ++i) { properties[i].propertyID = i; diff --git a/src/common/types/types.cpp b/src/common/types/types.cpp index 5829cc5859..3723c62257 100644 --- a/src/common/types/types.cpp +++ b/src/common/types/types.cpp @@ -227,13 +227,13 @@ void LogicalType::setPhysicalType() { case LogicalTypeID::STRING: { physicalType = PhysicalTypeID::STRING; } break; - case LogicalTypeID::RECURSIVE_REL: case LogicalTypeID::MAP: case LogicalTypeID::VAR_LIST: { physicalType = PhysicalTypeID::VAR_LIST; } break; case LogicalTypeID::NODE: case LogicalTypeID::REL: + case LogicalTypeID::RECURSIVE_REL: case LogicalTypeID::STRUCT: { physicalType = PhysicalTypeID::STRUCT; } break; diff --git a/src/common/types/value.cpp b/src/common/types/value.cpp index 548a716008..ed9740e980 100644 --- a/src/common/types/value.cpp +++ b/src/common/types/value.cpp @@ -293,7 +293,6 @@ std::string Value::toString() const { } return result; } - case LogicalTypeID::RECURSIVE_REL: case LogicalTypeID::VAR_LIST: case LogicalTypeID::FIXED_LIST: { std::string result = "["; @@ -306,6 +305,7 @@ std::string Value::toString() const { result += "]"; return result; } + case LogicalTypeID::RECURSIVE_REL: case LogicalTypeID::STRUCT: { std::string result = "{"; auto fieldNames = StructType::getFieldNames(&dataType); diff --git a/src/common/vector/auxiliary_buffer.cpp b/src/common/vector/auxiliary_buffer.cpp index 3d28fb70a0..728d56e065 100644 --- a/src/common/vector/auxiliary_buffer.cpp +++ b/src/common/vector/auxiliary_buffer.cpp @@ -33,7 +33,6 @@ list_entry_t ListAuxiliaryBuffer::addList(uint64_t listSize) { while (size + listSize > capacity) { capacity *= 2; } - auto numBytesPerElement = dataVector->getNumBytesPerValue(); if (needResizeDataVector) { resizeDataVector(dataVector.get()); } diff --git a/src/common/vector/value_vector.cpp b/src/common/vector/value_vector.cpp index 35bdcebd8b..2e851414a7 100644 --- a/src/common/vector/value_vector.cpp +++ b/src/common/vector/value_vector.cpp @@ -66,6 +66,14 @@ void ValueVector::resetAuxiliaryBuffer() { reinterpret_cast(auxiliaryBuffer.get())->resetSize(); return; } + case PhysicalTypeID::STRUCT: { + auto structAuxiliaryBuffer = + reinterpret_cast(auxiliaryBuffer.get()); + for (auto& vector : structAuxiliaryBuffer->getChildrenVectors()) { + vector->resetAuxiliaryBuffer(); + } + return; + } default: return; } diff --git a/src/include/binder/expression/property_expression.h b/src/include/binder/expression/property_expression.h index 17ad802ca9..7e96cb08cb 100644 --- a/src/include/binder/expression/property_expression.h +++ b/src/include/binder/expression/property_expression.h @@ -38,7 +38,7 @@ class PropertyExpression : public Expression { return propertyIDPerTable.at(tableID); } - inline bool isInternalID() const { return getPropertyName() == common::INTERNAL_ID_SUFFIX; } + inline bool isInternalID() const { return getPropertyName() == common::InternalKeyword::ID; } inline std::unique_ptr copy() const override { return make_unique(*this); diff --git a/src/include/binder/expression/rel_expression.h b/src/include/binder/expression/rel_expression.h index b47fb45b91..cb193a2396 100644 --- a/src/include/binder/expression/rel_expression.h +++ b/src/include/binder/expression/rel_expression.h @@ -49,7 +49,7 @@ class RelExpression : public NodeOrRelExpression { inline RelDirectionType getDirectionType() const { return directionType; } inline std::shared_ptr getInternalIDProperty() const { - return getPropertyExpression(common::INTERNAL_ID_SUFFIX); + return getPropertyExpression(common::InternalKeyword::ID); } inline void setRecursiveInfo(std::unique_ptr recursiveInfo_) { diff --git a/src/include/catalog/catalog_structs.h b/src/include/catalog/catalog_structs.h index 739b9495bb..1b12ee9d5a 100644 --- a/src/include/catalog/catalog_structs.h +++ b/src/include/catalog/catalog_structs.h @@ -48,7 +48,7 @@ struct TableSchema { virtual ~TableSchema() = default; static inline bool isReservedPropertyName(const std::string& propertyName) { - return propertyName == common::INTERNAL_ID_SUFFIX; + return propertyName == common::InternalKeyword::ID; } inline uint32_t getNumProperties() const { return properties.size(); } @@ -132,7 +132,7 @@ struct RelTableSchema : TableSchema { inline Property& getRelIDDefinition() { for (auto& property : properties) { - if (property.name == common::INTERNAL_ID_SUFFIX) { + if (property.name == common::InternalKeyword::ID) { return property; } } diff --git a/src/include/common/constants.h b/src/include/common/constants.h index 0c348abd13..0301926500 100644 --- a/src/include/common/constants.h +++ b/src/include/common/constants.h @@ -19,8 +19,12 @@ constexpr uint64_t THREAD_SLEEP_TIME_WHEN_WAITING_IN_MICROS = 500; constexpr uint64_t DEFAULT_CHECKPOINT_WAIT_TIMEOUT_FOR_TRANSACTIONS_TO_LEAVE_IN_MICROS = 5000000; -const std::string INTERNAL_ID_SUFFIX = "_id"; -const std::string INTERNAL_LENGTH_SUFFIX = "_length"; +struct InternalKeyword { + static constexpr char ID[] = "_id"; + static constexpr char LENGTH[] = "_length"; + static constexpr char NODES[] = "_nodes"; + static constexpr char RELS[] = "_rels"; +}; enum PageSizeClass : uint8_t { PAGE_4KB = 0, diff --git a/src/include/common/types/types.h b/src/include/common/types/types.h index 0c4a461499..a3f73ccd04 100644 --- a/src/include/common/types/types.h +++ b/src/include/common/types/types.h @@ -278,7 +278,7 @@ struct StructType { return structTypeInfo->getStructFields(); } - static inline struct_field_idx_t getFieldIdx(const LogicalType* type, std::string& key) { + static inline struct_field_idx_t getFieldIdx(const LogicalType* type, const std::string& key) { assert(type->getPhysicalType() == PhysicalTypeID::STRUCT); auto structTypeInfo = reinterpret_cast(type->extraTypeInfo.get()); return structTypeInfo->getStructFieldIdx(key); diff --git a/src/include/processor/operator/recursive_extend/frontier_scanner.h b/src/include/processor/operator/recursive_extend/frontier_scanner.h index fd2dc83ed0..4980365dbb 100644 --- a/src/include/processor/operator/recursive_extend/frontier_scanner.h +++ b/src/include/processor/operator/recursive_extend/frontier_scanner.h @@ -7,6 +7,7 @@ namespace kuzu { namespace processor { +struct RecursiveJoinVectors; /* * BaseFrontierScanner scans all dst nodes from k'th frontier. To identify the * destination nodes in the k'th frontier, we use a semi mask that marks the destination nodes (or @@ -19,22 +20,20 @@ class BaseFrontierScanner { currentDstNodeID{common::INVALID_OFFSET, common::INVALID_TABLE_ID} {} virtual ~BaseFrontierScanner() = default; - size_t scan(common::ValueVector* pathVector, common::ValueVector* dstNodeIDVector, - common::ValueVector* pathLengthVector, common::sel_t& offsetVectorPos, - common::sel_t& dataVectorPos); + size_t scan(RecursiveJoinVectors* vectors, common::sel_t& vectorPos, + common::sel_t& nodeIDDataVectorPos, common::sel_t& relIDDataVectorPos); void resetState(const BaseBFSState& bfsState); protected: virtual void initScanFromDstOffset() = 0; - virtual void scanFromDstOffset(common::ValueVector* pathVector, - common::ValueVector* dstNodeIDVector, common::ValueVector* pathLengthVector, - common::sel_t& offsetVectorPos, common::sel_t& dataVectorPos) = 0; + virtual void scanFromDstOffset(RecursiveJoinVectors* vectors, common::sel_t& vectorPos, + common::sel_t& nodeIDDataVectorPos, common::sel_t& relIDDataVectorPos) = 0; inline void writeDstNodeOffsetAndLength(common::ValueVector* dstNodeIDVector, - common::ValueVector* pathLengthVector, common::sel_t& offsetVectorPos) { - dstNodeIDVector->setValue(offsetVectorPos, currentDstNodeID); - pathLengthVector->setValue(offsetVectorPos, (int64_t)k); + common::ValueVector* pathLengthVector, common::sel_t& vectorPos) { + dstNodeIDVector->setValue(vectorPos, currentDstNodeID); + pathLengthVector->setValue(vectorPos, (int64_t)k); } protected: @@ -57,13 +56,8 @@ class DstNodeScanner : public BaseFrontierScanner { private: inline void initScanFromDstOffset() final {} - inline void scanFromDstOffset(common::ValueVector* pathVector, - common::ValueVector* dstNodeIDVector, common::ValueVector* pathLengthVector, - common::sel_t& offsetVectorPos, common::sel_t& dataVectorPos) final { - assert(offsetVectorPos < common::DEFAULT_VECTOR_CAPACITY); - writeDstNodeOffsetAndLength(dstNodeIDVector, pathLengthVector, offsetVectorPos); - offsetVectorPos++; - } + void scanFromDstOffset(RecursiveJoinVectors* vectors, common::sel_t& vectorPos, + common::sel_t& nodeIDDataVectorPos, common::sel_t& relIDDataVectorPos) final; }; /* @@ -78,7 +72,6 @@ class PathScanner : public BaseFrontierScanner { public: PathScanner(TargetDstNodes* targetDstNodes, size_t k) : BaseFrontierScanner{targetDstNodes, k} { - listEntrySize = 2 * k + 1; nodeIDs.resize(k + 1); relIDs.resize(k + 1); } @@ -89,20 +82,17 @@ class PathScanner : public BaseFrontierScanner { initDfs(std::make_pair(currentDstNodeID, dummyRelID), k); } // Scan current stacks until exhausted or vector is filled up. - void scanFromDstOffset(common::ValueVector* pathVector, common::ValueVector* dstNodeIDVector, - common::ValueVector* pathLengthVector, common::sel_t& offsetVectorPos, - common::sel_t& dataVectorPos) final; + void scanFromDstOffset(RecursiveJoinVectors* vectors, common::sel_t& vectorPos, + common::sel_t& nodeIDDataVectorPos, common::sel_t& relIDDataVectorPos) final; // Initialize stacks for given offset. void initDfs(const frontier::node_rel_id_t& nodeAndRelID, size_t currentDepth); - void writePathToVector(common::ValueVector* pathVector, common::ValueVector* dstNodeIDVector, - common::ValueVector* pathLengthVector, common::sel_t& offsetVectorPos, - common::sel_t& dataVectorPos); + void writePathToVector(RecursiveJoinVectors* vectors, common::sel_t& vectorPos, + common::sel_t& nodeIDDataVectorPos, common::sel_t& relIDDataVectorPos); private: // DFS states - size_t listEntrySize; std::vector nodeIDs; std::vector relIDs; std::stack nbrsStack; @@ -120,9 +110,8 @@ class DstNodeWithMultiplicityScanner : public BaseFrontierScanner { private: inline void initScanFromDstOffset() final {} - void scanFromDstOffset(common::ValueVector* pathVector, common::ValueVector* dstNodeIDVector, - common::ValueVector* pathLengthVector, common::sel_t& offsetVectorPos, - common::sel_t& dataVectorPos) final; + void scanFromDstOffset(RecursiveJoinVectors* vectors, common::sel_t& vectorPos, + common::sel_t& nodeIDDataVectorPos, common::sel_t& relIDDataVectorPos) final; }; /* @@ -142,9 +131,8 @@ struct FrontiersScanner { explicit FrontiersScanner(std::vector> scanners) : scanners{std::move(scanners)}, cursor{0} {} - void scan(common::ValueVector* pathVector, common::ValueVector* dstNodeIDVector, - common::ValueVector* pathLengthVector, common::sel_t& offsetVectorPos, - common::sel_t& dataVectorPos); + void scan(RecursiveJoinVectors* vectors, common::sel_t& vectorPos, + common::sel_t& nodeIDDataVectorPos, common::sel_t& relIDDataVectorPos); inline void resetState(const BaseBFSState& bfsState) { cursor = 0; diff --git a/src/include/processor/operator/recursive_extend/recursive_join.h b/src/include/processor/operator/recursive_extend/recursive_join.h index f0588b1b29..ed9ddc6d2c 100644 --- a/src/include/processor/operator/recursive_extend/recursive_join.h +++ b/src/include/processor/operator/recursive_extend/recursive_join.h @@ -63,6 +63,18 @@ struct RecursiveJoinDataInfo { } }; +struct RecursiveJoinVectors { + common::ValueVector* srcNodeIDVector = nullptr; + common::ValueVector* dstNodeIDVector = nullptr; + common::ValueVector* pathLengthVector = nullptr; + common::ValueVector* pathVector = nullptr; + common::ValueVector* pathNodeIDVector = nullptr; + common::ValueVector* pathRelIDVector = nullptr; + + common::ValueVector* recursiveEdgeIDVector = nullptr; + common::ValueVector* recursiveDstNodeIDVector = nullptr; +}; + class RecursiveJoin : public PhysicalOperator { public: RecursiveJoin(uint8_t lowerBound, uint8_t upperBound, common::QueryRelType queryRelType, @@ -100,7 +112,7 @@ class RecursiveJoin : public PhysicalOperator { void updateVisitedNodes(common::nodeID_t boundNodeID); -protected: +private: uint8_t lowerBound; uint8_t upperBound; common::QueryRelType queryRelType; @@ -114,17 +126,7 @@ class RecursiveJoin : public PhysicalOperator { std::unique_ptr recursiveRoot; ScanFrontier* scanFrontier; - // Vectors - std::vector vectorsToScan; - common::ValueVector* srcNodeIDVector; - common::ValueVector* dstNodeIDVector; - common::ValueVector* pathLengthVector; - common::ValueVector* pathVector; - - // temporary recursive join result. - common::ValueVector* recursiveEdgeIDVector; - common::ValueVector* recursiveDstNodeIDVector; - + std::unique_ptr vectors; std::unique_ptr bfsState; std::unique_ptr frontiersScanner; std::unique_ptr targetDstNodes; diff --git a/src/processor/operator/recursive_extend/frontier_scanner.cpp b/src/processor/operator/recursive_extend/frontier_scanner.cpp index 027f83c0bd..8368f2aa6b 100644 --- a/src/processor/operator/recursive_extend/frontier_scanner.cpp +++ b/src/processor/operator/recursive_extend/frontier_scanner.cpp @@ -1,23 +1,23 @@ #include "processor/operator/recursive_extend/frontier_scanner.h" +#include "processor/operator/recursive_extend/recursive_join.h" + namespace kuzu { namespace processor { -size_t BaseFrontierScanner::scan(common::ValueVector* pathVector, - common::ValueVector* dstNodeIDVector, common::ValueVector* pathLengthVector, - common::sel_t& offsetVectorPos, common::sel_t& dataVectorPos) { +size_t BaseFrontierScanner::scan(RecursiveJoinVectors* vectors, common::sel_t& vectorPos, + common::sel_t& nodeIDDataVectorPos, common::sel_t& relIDDataVectorPos) { if (k >= frontiers.size()) { // BFS terminate before current depth. No need to scan. return 0; } - auto offsetVectorPosBeforeScanning = offsetVectorPos; + auto vectorPosBeforeScanning = vectorPos; auto lastFrontier = frontiers[k]; while (true) { if (currentDstNodeID.offset != common::INVALID_OFFSET) { - scanFromDstOffset( - pathVector, dstNodeIDVector, pathLengthVector, offsetVectorPos, dataVectorPos); + scanFromDstOffset(vectors, vectorPos, nodeIDDataVectorPos, relIDDataVectorPos); } - if (offsetVectorPos == common::DEFAULT_VECTOR_CAPACITY) { + if (vectorPos == common::DEFAULT_VECTOR_CAPACITY) { break; } if (lastFrontierCursor == lastFrontier->nodeIDs.size()) { @@ -33,7 +33,7 @@ size_t BaseFrontierScanner::scan(common::ValueVector* pathVector, } initScanFromDstOffset(); } - return offsetVectorPos - offsetVectorPosBeforeScanning; + return vectorPos - vectorPosBeforeScanning; } void BaseFrontierScanner::resetState(const BaseBFSState& bfsState) { @@ -45,9 +45,15 @@ void BaseFrontierScanner::resetState(const BaseBFSState& bfsState) { } } -void PathScanner::scanFromDstOffset(common::ValueVector* pathVector, - common::ValueVector* dstNodeIDVector, common::ValueVector* pathLengthVector, - common::sel_t& offsetVectorPos, common::sel_t& dataVectorPos) { +void DstNodeScanner::scanFromDstOffset(RecursiveJoinVectors* vectors, common::sel_t& vectorPos, + common::sel_t& nodeIDDataVectorPos, common::sel_t& relIDDataVectorPos) { + assert(vectorPos < common::DEFAULT_VECTOR_CAPACITY); + writeDstNodeOffsetAndLength(vectors->dstNodeIDVector, vectors->pathLengthVector, vectorPos); + vectorPos++; +} + +void PathScanner::scanFromDstOffset(RecursiveJoinVectors* vectors, common::sel_t& vectorPos, + common::sel_t& nodeIDDataVectorPos, common::sel_t& relIDDataVectorPos) { auto level = 0; while (!nbrsStack.empty()) { auto& cursor = cursorStack.top(); @@ -57,9 +63,8 @@ void PathScanner::scanFromDstOffset(common::ValueVector* pathVector, nodeIDs[level] = nbr.first; relIDs[level] = nbr.second; if (level == 0) { // Found a new nbr at level 0. Found a new path. - writePathToVector( - pathVector, dstNodeIDVector, pathLengthVector, offsetVectorPos, dataVectorPos); - if (offsetVectorPos == common::DEFAULT_VECTOR_CAPACITY) { + writePathToVector(vectors, vectorPos, nodeIDDataVectorPos, relIDDataVectorPos); + if (vectorPos == common::DEFAULT_VECTOR_CAPACITY) { return; } continue; @@ -89,39 +94,40 @@ void PathScanner::initDfs(const frontier::node_rel_id_t& nodeAndRelID, size_t cu initDfs(nbrs->at(0), currentDepth - 1); } -void PathScanner::writePathToVector(common::ValueVector* pathVector, - common::ValueVector* dstNodeIDVector, common::ValueVector* pathLengthVector, - common::sel_t& offsetVectorPos, common::sel_t& dataVectorPos) { - assert(offsetVectorPos < common::DEFAULT_VECTOR_CAPACITY); - auto listEntry = common::ListVector::addList(pathVector, listEntrySize); - pathVector->setValue(offsetVectorPos, listEntry); - writeDstNodeOffsetAndLength(dstNodeIDVector, pathLengthVector, offsetVectorPos); - offsetVectorPos++; - auto pathDataVector = common::ListVector::getDataVector(pathVector); +void PathScanner::writePathToVector(RecursiveJoinVectors* vectors, common::sel_t& vectorPos, + common::sel_t& nodeIDDataVectorPos, common::sel_t& relIDDataVectorPos) { + assert(vectorPos < common::DEFAULT_VECTOR_CAPACITY); + auto nodeIDEntry = common::ListVector::addList(vectors->pathNodeIDVector, k + 1); + auto relIDEntry = common::ListVector::addList(vectors->pathRelIDVector, k); + vectors->pathNodeIDVector->setValue(vectorPos, nodeIDEntry); + vectors->pathRelIDVector->setValue(vectorPos, relIDEntry); + writeDstNodeOffsetAndLength(vectors->dstNodeIDVector, vectors->pathLengthVector, vectorPos); + vectorPos++; + auto nodeIDDataVector = common::ListVector::getDataVector(vectors->pathNodeIDVector); + auto relIDDataVector = common::ListVector::getDataVector(vectors->pathRelIDVector); for (auto i = 0u; i < k; ++i) { - pathDataVector->setValue(dataVectorPos++, nodeIDs[i]); - pathDataVector->setValue(dataVectorPos++, relIDs[i]); + nodeIDDataVector->setValue(nodeIDDataVectorPos++, nodeIDs[i]); + relIDDataVector->setValue(relIDDataVectorPos++, relIDs[i]); } - pathDataVector->setValue(dataVectorPos++, nodeIDs[k]); + nodeIDDataVector->setValue(nodeIDDataVectorPos++, nodeIDs[k]); } -void DstNodeWithMultiplicityScanner::scanFromDstOffset(common::ValueVector* pathVector, - common::ValueVector* dstNodeIDVector, common::ValueVector* pathLengthVector, - common::sel_t& offsetVectorPos, common::sel_t& dataVectorPos) { +void DstNodeWithMultiplicityScanner::scanFromDstOffset(RecursiveJoinVectors* vectors, + common::sel_t& vectorPos, common::sel_t& nodeIDDataVectorPos, + common::sel_t& relIDDataVectorPos) { auto& multiplicity = frontiers[k]->nodeIDToMultiplicity.at(currentDstNodeID); - while (multiplicity > 0 && offsetVectorPos < common::DEFAULT_VECTOR_CAPACITY) { - writeDstNodeOffsetAndLength(dstNodeIDVector, pathLengthVector, offsetVectorPos); - offsetVectorPos++; + while (multiplicity > 0 && vectorPos < common::DEFAULT_VECTOR_CAPACITY) { + writeDstNodeOffsetAndLength(vectors->dstNodeIDVector, vectors->pathLengthVector, vectorPos); + vectorPos++; multiplicity--; } } -void FrontiersScanner::scan(common::ValueVector* pathVector, common::ValueVector* dstNodeIDVector, - common::ValueVector* pathLengthVector, common::sel_t& offsetVectorPos, - common::sel_t& dataVectorPos) { - while (offsetVectorPos < common::DEFAULT_VECTOR_CAPACITY && cursor < scanners.size()) { - if (scanners[cursor]->scan(pathVector, dstNodeIDVector, pathLengthVector, offsetVectorPos, - dataVectorPos) == 0) { +void FrontiersScanner::scan(RecursiveJoinVectors* vectors, common::sel_t& vectorPos, + common::sel_t& nodeIDDataVectorPos, common::sel_t& relIDDataVectorPos) { + while (vectorPos < common::DEFAULT_VECTOR_CAPACITY && cursor < scanners.size()) { + if (scanners[cursor]->scan(vectors, vectorPos, nodeIDDataVectorPos, relIDDataVectorPos) == + 0) { cursor++; } } diff --git a/src/processor/operator/recursive_extend/recursive_join.cpp b/src/processor/operator/recursive_extend/recursive_join.cpp index 0e160663c1..f6ef6f92ea 100644 --- a/src/processor/operator/recursive_extend/recursive_join.cpp +++ b/src/processor/operator/recursive_extend/recursive_join.cpp @@ -10,15 +10,16 @@ namespace processor { void RecursiveJoin::initLocalStateInternal(ResultSet* resultSet_, ExecutionContext* context) { populateTargetDstNodes(); - srcNodeIDVector = resultSet->getValueVector(dataInfo->srcNodePos).get(); - dstNodeIDVector = resultSet->getValueVector(dataInfo->dstNodePos).get(); - pathLengthVector = resultSet->getValueVector(dataInfo->pathLengthPos).get(); + vectors = std::make_unique(); + vectors->srcNodeIDVector = resultSet->getValueVector(dataInfo->srcNodePos).get(); + vectors->dstNodeIDVector = resultSet->getValueVector(dataInfo->dstNodePos).get(); + vectors->pathLengthVector = resultSet->getValueVector(dataInfo->pathLengthPos).get(); std::vector> scanners; switch (queryRelType) { case common::QueryRelType::VARIABLE_LENGTH: { switch (joinType) { case planner::RecursiveJoinType::TRACK_PATH: { - pathVector = resultSet->getValueVector(dataInfo->pathPos).get(); + vectors->pathVector = resultSet->getValueVector(dataInfo->pathPos).get(); bfsState = std::make_unique>( upperBound, targetDstNodes.get()); for (auto i = lowerBound; i <= upperBound; ++i) { @@ -26,7 +27,6 @@ void RecursiveJoin::initLocalStateInternal(ResultSet* resultSet_, ExecutionConte } } break; case planner::RecursiveJoinType::TRACK_NONE: { - pathVector = nullptr; bfsState = std::make_unique>( upperBound, targetDstNodes.get()); for (auto i = lowerBound; i <= upperBound; ++i) { @@ -41,7 +41,7 @@ void RecursiveJoin::initLocalStateInternal(ResultSet* resultSet_, ExecutionConte case common::QueryRelType::SHORTEST: { switch (joinType) { case planner::RecursiveJoinType::TRACK_PATH: { - pathVector = resultSet->getValueVector(dataInfo->pathPos).get(); + vectors->pathVector = resultSet->getValueVector(dataInfo->pathPos).get(); bfsState = std::make_unique>( upperBound, targetDstNodes.get()); for (auto i = lowerBound; i <= upperBound; ++i) { @@ -49,7 +49,6 @@ void RecursiveJoin::initLocalStateInternal(ResultSet* resultSet_, ExecutionConte } } break; case planner::RecursiveJoinType::TRACK_NONE: { - pathVector = nullptr; bfsState = std::make_unique>( upperBound, targetDstNodes.get()); for (auto i = lowerBound; i <= upperBound; ++i) { @@ -63,7 +62,7 @@ void RecursiveJoin::initLocalStateInternal(ResultSet* resultSet_, ExecutionConte case common::QueryRelType::ALL_SHORTEST: { switch (joinType) { case planner::RecursiveJoinType::TRACK_PATH: { - pathVector = resultSet->getValueVector(dataInfo->pathPos).get(); + vectors->pathVector = resultSet->getValueVector(dataInfo->pathPos).get(); bfsState = std::make_unique>( upperBound, targetDstNodes.get()); for (auto i = lowerBound; i <= upperBound; ++i) { @@ -71,7 +70,6 @@ void RecursiveJoin::initLocalStateInternal(ResultSet* resultSet_, ExecutionConte } } break; case planner::RecursiveJoinType::TRACK_NONE: { - pathVector = nullptr; bfsState = std::make_unique>( upperBound, targetDstNodes.get()); for (auto i = lowerBound; i <= upperBound; ++i) { @@ -86,6 +84,17 @@ void RecursiveJoin::initLocalStateInternal(ResultSet* resultSet_, ExecutionConte default: throw common::NotImplementedException("BaseRecursiveJoin::initLocalStateInternal"); } + if (vectors->pathVector != nullptr) { + assert(vectors->pathVector->dataType.getPhysicalType() == common::PhysicalTypeID::STRUCT); + auto nodeIDFieldIdx = common::StructType::getFieldIdx( + &vectors->pathVector->dataType, common::InternalKeyword::NODES); + auto relIDFieldIdx = common::StructType::getFieldIdx( + &vectors->pathVector->dataType, common::InternalKeyword::RELS); + vectors->pathNodeIDVector = + common::StructVector::getFieldVector(vectors->pathVector, nodeIDFieldIdx).get(); + vectors->pathRelIDVector = + common::StructVector::getFieldVector(vectors->pathVector, relIDFieldIdx).get(); + } frontiersScanner = std::make_unique(std::move(scanners)); initLocalRecursivePlan(context); } @@ -120,22 +129,23 @@ bool RecursiveJoin::getNextTuplesInternal(ExecutionContext* context) { bool RecursiveJoin::scanOutput() { common::sel_t offsetVectorSize = 0u; - common::sel_t dataVectorSize = 0u; - if (pathVector != nullptr) { - pathVector->resetAuxiliaryBuffer(); + common::sel_t nodeIDDataVectorSize = 0u; + common::sel_t relIDDataVectorSize = 0u; + if (vectors->pathVector != nullptr) { + vectors->pathVector->resetAuxiliaryBuffer(); } frontiersScanner->scan( - pathVector, dstNodeIDVector, pathLengthVector, offsetVectorSize, dataVectorSize); + vectors.get(), offsetVectorSize, nodeIDDataVectorSize, relIDDataVectorSize); if (offsetVectorSize == 0) { return false; } - dstNodeIDVector->state->initOriginalAndSelectedSize(offsetVectorSize); + vectors->dstNodeIDVector->state->initOriginalAndSelectedSize(offsetVectorSize); return true; } void RecursiveJoin::computeBFS(ExecutionContext* context) { - auto nodeID = srcNodeIDVector->getValue( - srcNodeIDVector->state->selVector->selectedPositions[0]); + auto nodeID = vectors->srcNodeIDVector->getValue( + vectors->srcNodeIDVector->state->selVector->selectedPositions[0]); bfsState->markSrc(nodeID); while (!bfsState->isComplete()) { auto boundNodeID = bfsState->getNextNodeID(); @@ -154,10 +164,10 @@ void RecursiveJoin::computeBFS(ExecutionContext* context) { void RecursiveJoin::updateVisitedNodes(common::nodeID_t boundNodeID) { auto boundNodeMultiplicity = bfsState->getMultiplicity(boundNodeID); - for (auto i = 0u; i < recursiveDstNodeIDVector->state->selVector->selectedSize; ++i) { - auto pos = recursiveDstNodeIDVector->state->selVector->selectedPositions[i]; - auto nbrNodeID = recursiveDstNodeIDVector->getValue(pos); - auto edgeID = recursiveEdgeIDVector->getValue(pos); + for (auto i = 0u; i < vectors->recursiveDstNodeIDVector->state->selVector->selectedSize; ++i) { + auto pos = vectors->recursiveDstNodeIDVector->state->selVector->selectedPositions[i]; + auto nbrNodeID = vectors->recursiveDstNodeIDVector->getValue(pos); + auto edgeID = vectors->recursiveEdgeIDVector->getValue(pos); bfsState->markVisited(boundNodeID, nbrNodeID, edgeID, boundNodeMultiplicity); } } @@ -171,9 +181,10 @@ void RecursiveJoin::initLocalRecursivePlan(ExecutionContext* context) { scanFrontier = (ScanFrontier*)op; localResultSet = std::make_unique( dataInfo->localResultSetDescriptor.get(), context->memoryManager); - recursiveDstNodeIDVector = + vectors->recursiveDstNodeIDVector = localResultSet->getValueVector(dataInfo->recursiveDstNodeIDPos).get(); - recursiveEdgeIDVector = localResultSet->getValueVector(dataInfo->recursiveEdgeIDPos).get(); + vectors->recursiveEdgeIDVector = + localResultSet->getValueVector(dataInfo->recursiveEdgeIDPos).get(); recursiveRoot->initLocalState(localResultSet.get(), context); } diff --git a/src/processor/result/factorized_table.cpp b/src/processor/result/factorized_table.cpp index 7799457908..94b26f6156 100644 --- a/src/processor/result/factorized_table.cpp +++ b/src/processor/result/factorized_table.cpp @@ -144,15 +144,18 @@ void FactorizedTable::lookup(std::vector& vectors, uint64_t numTuplesToRead) const { assert(vectors.size() == colIdxesToScan.size()); for (auto i = 0u; i < colIdxesToScan.size(); i++) { + auto vector = vectors[i]; + // TODO(Xiyang/Ziyi): we should set up a rule about when to reset. Should it be in operator? + vector->resetAuxiliaryBuffer(); ft_col_idx_t colIdx = colIdxesToScan[i]; if (tableSchema->getColumn(colIdx)->isFlat()) { - assert(!(vectors[i]->state->isFlat() && numTuplesToRead > 1)); - readFlatCol(tuplesToRead + startPos, colIdx, *vectors[i], numTuplesToRead); + assert(!(vector->state->isFlat() && numTuplesToRead > 1)); + readFlatCol(tuplesToRead + startPos, colIdx, *vector, numTuplesToRead); } else { // If the caller wants to read an unflat column from factorizedTable, the vector // must be unflat and the numTuplesToScan should be 1. - assert(!vectors[i]->state->isFlat() && numTuplesToRead == 1); - readUnflatCol(tuplesToRead + startPos, colIdx, *vectors[i]); + assert(!vector->state->isFlat() && numTuplesToRead == 1); + readUnflatCol(tuplesToRead + startPos, colIdx, *vector); } } } @@ -586,7 +589,7 @@ void FactorizedTable::readUnflatCol( auto val = vectorOverflowValue.value; for (auto i = 0u; i < vectorOverflowValue.numElements; i++) { ValueVectorUtils::copyNonNullDataWithSameTypeIntoPos(vector, i, val); - val += vector.getNumBytesPerValue(); + val += getDataTypeSize(vector.dataType); } } else { for (auto i = 0u; i < vectorOverflowValue.numElements; i++) { diff --git a/test/test_files/demo_db/demo_db.test b/test/test_files/demo_db/demo_db.test index cc648f0026..0f04911e2b 100644 --- a/test/test_files/demo_db/demo_db.test +++ b/test/test_files/demo_db/demo_db.test @@ -114,10 +114,10 @@ Kitchener|2 -NAME ReturnVarLen -QUERY MATCH (a:User)-[e:Follows*1..2]->(b:User) WHERE a.name = 'Adam' RETURN b.name, e; ---- 4 -Karissa|[0:0,2:0,0:1] -Zhang|[0:0,2:0,0:1,2:2,0:2] -Zhang|[0:0,2:1,0:2] -Noura|[0:0,2:1,0:2,2:3,0:3] +Karissa|{_NODES: [0:0,0:1], _RELS: [2:0]} +Noura|{_NODES: [0:0,0:2,0:3], _RELS: [2:1,2:3]} +Zhang|{_NODES: [0:0,0:1,0:2], _RELS: [2:0,2:2]} +Zhang|{_NODES: [0:0,0:2], _RELS: [2:1]} -NAME ShortestPath -QUERY MATCH (a:User)-[e* SHORTEST 1..4]->(b:City) WHERE a.name = 'Adam' RETURN b.name, length(e) AS length; diff --git a/test/test_files/shortest_path/bfs_sssp.test b/test/test_files/shortest_path/bfs_sssp.test index fad6208098..f5ae5f3b4e 100644 --- a/test/test_files/shortest_path/bfs_sssp.test +++ b/test/test_files/shortest_path/bfs_sssp.test @@ -8,23 +8,24 @@ -NAME SingleSourceAllDestinationsSSP -QUERY MATCH (a:person)-[r:knows* SHORTEST 1..30]->(b:person) WHERE a.fName = 'Alice' RETURN a.fName, b.fName, r ---- 7 -Alice|Bob|[0:0,1:0,0:1] -Alice|Carol|[0:0,1:1,0:2] -Alice|Dan|[0:0,1:2,0:3] -Alice|Elizabeth|[0:0,1:0,0:1,1:6,0:4] -Alice|Farooq|[0:0,1:0,0:1,1:6,0:4,1:13,0:5] -Alice|Greg|[0:0,1:0,0:1,1:6,0:4,1:14,0:6] -Alice|Hubert Blaine Wolfeschlegelsteinhausenbergerdorff|[0:0,1:0,0:1,1:6,0:4,1:15,0:7] +Alice|Bob|{_NODES: [0:0,0:1], _RELS: [1:0]} +Alice|Carol|{_NODES: [0:0,0:2], _RELS: [1:1]} +Alice|Dan|{_NODES: [0:0,0:3], _RELS: [1:2]} +Alice|Elizabeth|{_NODES: [0:0,0:1,0:4], _RELS: [1:0,1:6]} +Alice|Farooq|{_NODES: [0:0,0:1,0:4,0:5], _RELS: [1:0,1:6,1:13]} +Alice|Greg|{_NODES: [0:0,0:1,0:4,0:6], _RELS: [1:0,1:6,1:14]} +Alice|Hubert Blaine Wolfeschlegelsteinhausenbergerdorff|{_NODES: [0:0,0:1,0:4,0:7], _RELS: [1:0,1:6,1:15]} + -NAME AllSourcesSingleDestinationQuery -QUERY MATCH (a:person)-[r:knows* SHORTEST 1..30]->(b:person) WHERE b.fName = 'Alice' RETURN a.fName, b.fName, r ---- 6 -Bob|Alice|[0:0,1:3,0:1] -Carol|Alice|[0:0,1:7,0:2] -Dan|Alice|[0:0,1:10,0:3] -Elizabeth|Alice|[0:0,1:20,0:7,1:15,0:4] -Farooq|Alice|[0:0,1:20,0:7,1:17,0:5] -Hubert Blaine Wolfeschlegelsteinhausenbergerdorff|Alice|[0:0,1:20,0:7] +Bob|Alice|{_NODES: [0:0,0:1], _RELS: [1:3]} +Carol|Alice|{_NODES: [0:0,0:2], _RELS: [1:7]} +Dan|Alice|{_NODES: [0:0,0:3], _RELS: [1:10]} +Elizabeth|Alice|{_NODES: [0:0,0:7,0:4], _RELS: [1:20,1:15]} +Farooq|Alice|{_NODES: [0:0,0:7,0:5], _RELS: [1:20,1:17]} +Hubert Blaine Wolfeschlegelsteinhausenbergerdorff|Alice|{_NODES: [0:0,0:7], _RELS: [1:20]} -NAME SingleSourceWithAllProperties -QUERY MATCH (a:person)-[r:knows* SHORTEST 1..30]->(b:person) WHERE a.fName = 'Alice' RETURN length(r), b, a @@ -45,11 +46,11 @@ Alice|Bob|1 -NAME SingleSourceAllDestinations2 -QUERY MATCH (a:person)-[r:knows* SHORTEST 1..2]->(b:person) WHERE a.fName = 'Elizabeth' RETURN a.fName, b.fName, r ---- 5 -Elizabeth|Alice|[0:4,1:15,0:7,1:20,0:0] -Elizabeth|Dan|[0:4,1:15,0:7,1:21,0:3] -Elizabeth|Farooq|[0:4,1:13,0:5] -Elizabeth|Greg|[0:4,1:14,0:6] -Elizabeth|Hubert Blaine Wolfeschlegelsteinhausenbergerdorff|[0:4,1:15,0:7] +Elizabeth|Alice|{_NODES: [0:4,0:7,0:0], _RELS: [1:15,1:20]} +Elizabeth|Dan|{_NODES: [0:4,0:7,0:3], _RELS: [1:15,1:21]} +Elizabeth|Farooq|{_NODES: [0:4,0:5], _RELS: [1:13]} +Elizabeth|Greg|{_NODES: [0:4,0:6], _RELS: [1:14]} +Elizabeth|Hubert Blaine Wolfeschlegelsteinhausenbergerdorff|{_NODES: [0:4,0:7], _RELS: [1:15]} -NAME SingleSourceUnreachableDestination -QUERY MATCH (a:person)-[r:knows* SHORTEST 1..30]->(b:person) WHERE a.fName = 'Alice' AND b.fName = 'Alice11' RETURN a.fName, b.fName, r diff --git a/test/test_files/tinysnb/var_length_extend/multi_label.test b/test/test_files/tinysnb/var_length_extend/multi_label.test index 1defe8617f..d14346e7af 100644 --- a/test/test_files/tinysnb/var_length_extend/multi_label.test +++ b/test/test_files/tinysnb/var_length_extend/multi_label.test @@ -18,32 +18,34 @@ -NAME NodeUndirectedTest2 -QUERY MATCH (a)-[e:knows|:studyAt|:workAt*1..2]-(b) WHERE a.ID=7 RETURN e, label(b) ---- 8 -[0:4,3:12,0:5,3:12,0:4]|person -[0:4,3:12,0:5,4:2,1:0]|organisation -[0:4,3:12,0:5]|person -[0:4,3:13,0:6,3:13,0:4]|person -[0:4,3:13,0:6]|person -[0:4,5:2,1:2,5:1,0:3]|person -[0:4,5:2,1:2,5:2,0:4]|person -[0:4,5:2,1:2]|organisation +{_NODES: [0:4,0:5,0:4], _RELS: [3:12,3:12]}|person +{_NODES: [0:4,0:5,1:0], _RELS: [3:12,4:2]}|organisation +{_NODES: [0:4,0:5], _RELS: [3:12]}|person +{_NODES: [0:4,0:6,0:4], _RELS: [3:13,3:13]}|person +{_NODES: [0:4,0:6], _RELS: [3:13]}|person +{_NODES: [0:4,1:2,0:3], _RELS: [5:2,5:1]}|person +{_NODES: [0:4,1:2,0:4], _RELS: [5:2,5:2]}|person +{_NODES: [0:4,1:2], _RELS: [5:2]}|organisation + -NAME RelMultiLabelTest -QUERY MATCH (a:person)-[e*1..2]->(b:organisation) WHERE a.fName = 'Alice' RETURN b.ID, e ---- 6 -1|[0:0,3:0,0:1,4:1,1:0] -1|[0:0,4:0,1:0] -1|[0:0,6:0,0:1,4:1,1:0] -1|[0:0,7:0,0:1,4:1,1:0] -4|[0:0,3:1,0:2,5:0,1:1] -6|[0:0,3:2,0:3,5:1,1:2] +1|{_NODES: [0:0,0:1,1:0], _RELS: [3:0,4:1]} +1|{_NODES: [0:0,0:1,1:0], _RELS: [6:0,4:1]} +1|{_NODES: [0:0,0:1,1:0], _RELS: [7:0,4:1]} +1|{_NODES: [0:0,1:0], _RELS: [4:0]} +4|{_NODES: [0:0,0:2,1:1], _RELS: [3:1,5:0]} +6|{_NODES: [0:0,0:3,1:2], _RELS: [3:2,5:1]} -NAME MixMultiLabelTest2 -QUERY MATCH (a:person)-[e:meets|:marries|:studyAt*2..2]->(b) WHERE a.fName = 'Alice' RETURN b.ID, e ---- 4 -1|[0:0,6:0,0:1,4:1,1:0] -1|[0:0,7:0,0:1,4:1,1:0] -5|[0:0,6:0,0:1,6:1,0:3] -5|[0:0,7:0,0:1,6:1,0:3] +1|{_NODES: [0:0,0:1,1:0], _RELS: [6:0,4:1]} +1|{_NODES: [0:0,0:1,1:0], _RELS: [7:0,4:1]} +5|{_NODES: [0:0,0:1,0:3], _RELS: [6:0,6:1]} +5|{_NODES: [0:0,0:1,0:3], _RELS: [7:0,6:1]} + -NAME MixMultiLabelTest3 -QUERY MATCH (a:person)-[e:meets|:marries|:studyAt*2..2]->(b) WHERE a.fName = 'Alice' AND b.ID < 5 RETURN COUNT(*) diff --git a/test/test_files/tinysnb/var_length_extend/n_n.test b/test/test_files/tinysnb/var_length_extend/n_n.test index 1b6717b07a..d2d6284cc2 100644 --- a/test/test_files/tinysnb/var_length_extend/n_n.test +++ b/test/test_files/tinysnb/var_length_extend/n_n.test @@ -45,15 +45,15 @@ Greg -NAME KnowsOneToTwoHopTest -QUERY MATCH (a:person)-[e:knows*1..2]->(b:person) WHERE a.fName='Alice' RETURN e ---- 12 -[0:0,3:0,0:1,3:3,0:0] -[0:0,3:0,0:1,3:4,0:2] -[0:0,3:0,0:1,3:5,0:3] -[0:0,3:0,0:1] -[0:0,3:1,0:2,3:6,0:0] -[0:0,3:1,0:2,3:7,0:1] -[0:0,3:1,0:2,3:8,0:3] -[0:0,3:1,0:2] -[0:0,3:2,0:3,3:9,0:0] -[0:0,3:2,0:3,3:10,0:1] -[0:0,3:2,0:3,3:11,0:2] -[0:0,3:2,0:3] +{_NODES: [0:0,0:1,0:0], _RELS: [3:0,3:3]} +{_NODES: [0:0,0:1,0:2], _RELS: [3:0,3:4]} +{_NODES: [0:0,0:1,0:3], _RELS: [3:0,3:5]} +{_NODES: [0:0,0:1], _RELS: [3:0]} +{_NODES: [0:0,0:2,0:0], _RELS: [3:1,3:6]} +{_NODES: [0:0,0:2,0:1], _RELS: [3:1,3:7]} +{_NODES: [0:0,0:2,0:3], _RELS: [3:1,3:8]} +{_NODES: [0:0,0:2], _RELS: [3:1]} +{_NODES: [0:0,0:3,0:0], _RELS: [3:2,3:9]} +{_NODES: [0:0,0:3,0:1], _RELS: [3:2,3:10]} +{_NODES: [0:0,0:3,0:2], _RELS: [3:2,3:11]} +{_NODES: [0:0,0:3], _RELS: [3:2]}