From e8e02836673478eeeeac032768bcf38570e7b5a6 Mon Sep 17 00:00:00 2001 From: xiyang Date: Thu, 10 Aug 2023 16:21:49 -0400 Subject: [PATCH] Add generic hash join --- src/binder/expression/expression.cpp | 25 ++ src/common/types/types.cpp | 10 + src/common/vector/value_vector.cpp | 2 +- src/include/binder/expression/expression.h | 8 + .../common/data_chunk/data_chunk_state.h | 6 +- src/include/common/types/types.h | 3 + src/include/common/vector/value_vector.h | 11 +- .../planner/logical_plan/logical_hash_join.h | 30 +- src/include/planner/logical_plan/schema.h | 2 - .../logical_plan/sip/side_way_info_passing.h | 9 +- .../processor/operator/base_hash_table.h | 17 +- .../operator/hash_join/hash_join_build.h | 30 +- .../operator/hash_join/join_hash_table.h | 46 ++- .../processor/operator/intersect/intersect.h | 4 +- .../operator/intersect/intersect_build.h | 21 +- .../operator/intersect/intersect_hash_table.h | 18 -- src/optimizer/acc_hash_join_optimizer.cpp | 4 + src/optimizer/filter_push_down_optimizer.cpp | 67 ++--- .../projection_push_down_optimizer.cpp | 5 +- src/planner/operator/logical_hash_join.cpp | 82 +++--- src/planner/operator/schema.cpp | 11 - src/planner/plan/append_join.cpp | 12 +- src/processor/map/map_hash_join.cpp | 27 +- src/processor/map/map_intersect.cpp | 18 +- src/processor/map/map_path_property_probe.cpp | 6 +- .../aggregate/aggregate_hash_table.cpp | 12 +- .../operator/hash_join/hash_join_build.cpp | 28 +- .../operator/hash_join/hash_join_probe.cpp | 40 +-- .../operator/hash_join/join_hash_table.cpp | 269 +++++++++++++++--- .../operator/intersect/CMakeLists.txt | 3 +- .../intersect/intersect_hash_table.cpp | 47 --- src/storage/store/rel_table.cpp | 2 +- .../tinysnb/generic_hash_join/basic.test | 60 ++++ 33 files changed, 598 insertions(+), 337 deletions(-) delete mode 100644 src/include/processor/operator/intersect/intersect_hash_table.h delete mode 100644 src/processor/operator/intersect/intersect_hash_table.cpp create mode 100644 test/test_files/tinysnb/generic_hash_join/basic.test diff --git a/src/binder/expression/expression.cpp b/src/binder/expression/expression.cpp index c0c6f433d4..54e667d319 100644 --- a/src/binder/expression/expression.cpp +++ b/src/binder/expression/expression.cpp @@ -49,6 +49,21 @@ std::string ExpressionUtil::toString(const expression_vector& expressions) { return result; } +std::string ExpressionUtil::toString(const std::vector& expressionPairs) { + if (expressionPairs.empty()) { + return std::string{}; + } + auto result = toString(expressionPairs[0]); + for (auto i = 1u; i < expressionPairs.size(); ++i) { + result += "," + toString(expressionPairs[i]); + } + return result; +} + +std::string ExpressionUtil::toString(const expression_pair& expressionPair) { + return expressionPair.first->toString() + "=" + expressionPair.second->toString(); +} + expression_vector ExpressionUtil::excludeExpressions( const expression_vector& expressions, const expression_vector& expressionsToExclude) { expression_set excludeSet; @@ -64,5 +79,15 @@ expression_vector ExpressionUtil::excludeExpressions( return result; } +std::vector> ExpressionUtil::getDataTypes( + const kuzu::binder::expression_vector& expressions) { + std::vector> result; + result.reserve(expressions.size()); + for (auto& expression : expressions) { + result.push_back(expression->getDataType().copy()); + } + return result; +} + } // namespace binder } // namespace kuzu diff --git a/src/common/types/types.cpp b/src/common/types/types.cpp index b67f05d237..22ed87eb82 100644 --- a/src/common/types/types.cpp +++ b/src/common/types/types.cpp @@ -304,6 +304,16 @@ std::unique_ptr LogicalType::copy() const { return dataType; } +std::vector> LogicalType::copy( + const std::vector>& types) { + std::vector> typesCopy; + typesCopy.reserve(types.size()); + for (auto& type : types) { + typesCopy.push_back(type->copy()); + } + return typesCopy; +} + void LogicalType::setPhysicalType() { switch (typeID) { case LogicalTypeID::ANY: { diff --git a/src/common/vector/value_vector.cpp b/src/common/vector/value_vector.cpp index 75933e67f4..a9de763adf 100644 --- a/src/common/vector/value_vector.cpp +++ b/src/common/vector/value_vector.cpp @@ -25,7 +25,7 @@ void ValueVector::setState(std::shared_ptr state) { } } -bool NodeIDVector::discardNull(ValueVector& vector) { +bool ValueVector::discardNull(ValueVector& vector) { if (vector.hasNoNullsGuarantee()) { return true; } else { diff --git a/src/include/binder/expression/expression.h b/src/include/binder/expression/expression.h index 39917c05fe..abc1a6b180 100644 --- a/src/include/binder/expression/expression.h +++ b/src/include/binder/expression/expression.h @@ -119,7 +119,12 @@ struct ExpressionUtil { static uint32_t find(Expression* target, expression_vector expressions); + // Print as a1,a2,a3,... static std::string toString(const expression_vector& expressions); + // Print as a1=a2, a3=a4,... + static std::string toString(const std::vector& expressionPairs); + // Print as a1=a2 + static std::string toString(const expression_pair& expressionPair); static expression_vector excludeExpressions( const expression_vector& expressions, const expression_vector& expressionsToExclude); @@ -136,6 +141,9 @@ struct ExpressionUtil { return expression.expressionType == common::ExpressionType::VARIABLE && expression.dataType.getLogicalTypeID() == common::LogicalTypeID::RECURSIVE_REL; } + + static std::vector> getDataTypes( + const expression_vector& expressions); }; } // namespace binder diff --git a/src/include/common/data_chunk/data_chunk_state.h b/src/include/common/data_chunk/data_chunk_state.h index b222e2bb59..f13fdbc811 100644 --- a/src/include/common/data_chunk/data_chunk_state.h +++ b/src/include/common/data_chunk/data_chunk_state.h @@ -8,8 +8,12 @@ namespace kuzu { namespace common { -class DataChunkState { +enum class FactorizationStateType : uint8_t { + FLAT = 0, + UNFLAT = 1, +}; +class DataChunkState { public: DataChunkState() : DataChunkState(DEFAULT_VECTOR_CAPACITY) {} explicit DataChunkState(uint64_t capacity) : currIdx{-1}, originalSize{0} { diff --git a/src/include/common/types/types.h b/src/include/common/types/types.h index 349bad18df..8f1bfb00b1 100644 --- a/src/include/common/types/types.h +++ b/src/include/common/types/types.h @@ -271,6 +271,9 @@ class LogicalType { std::unique_ptr copy() const; + static std::vector> copy( + const std::vector>& types); + private: void setPhysicalType(); diff --git a/src/include/common/vector/value_vector.h b/src/include/common/vector/value_vector.h index 205198d6cb..46c8d0e976 100644 --- a/src/include/common/vector/value_vector.h +++ b/src/include/common/vector/value_vector.h @@ -81,6 +81,10 @@ class ValueVector { void resetAuxiliaryBuffer(); + // If there is still non-null values after discarding, return true. Otherwise, return false. + // For an unflat vector, its selection vector is also updated to the resultSelVector. + static bool discardNull(ValueVector& vector); + private: uint32_t getDataTypeSize(const LogicalType& type); void initializeValueBuffer(); @@ -228,13 +232,6 @@ class ArrowColumnVector { static void slice(ValueVector* vector, offset_t offset); }; -class NodeIDVector { -public: - // If there is still non-null values after discarding, return true. Otherwise, return false. - // For an unflat vector, its selection vector is also updated to the resultSelVector. - static bool discardNull(ValueVector& vector); -}; - class MapVector { public: static inline ValueVector* getKeyVector(const ValueVector* vector) { diff --git a/src/include/planner/logical_plan/logical_hash_join.h b/src/include/planner/logical_plan/logical_hash_join.h index 8f61ef50e5..fbd5296365 100644 --- a/src/include/planner/logical_plan/logical_hash_join.h +++ b/src/include/planner/logical_plan/logical_hash_join.h @@ -9,29 +9,32 @@ namespace kuzu { namespace planner { +// We only support equality comparison as join condition +using join_condition_t = binder::expression_pair; + // Probe side on left, i.e. children[0]. Build side on right, i.e. children[1]. class LogicalHashJoin : public LogicalOperator { public: // Inner and left join. - LogicalHashJoin(binder::expression_vector joinNodeIDs, common::JoinType joinType, + LogicalHashJoin(std::vector joinConditions, common::JoinType joinType, std::shared_ptr probeSideChild, std::shared_ptr buildSideChild) - : LogicalHashJoin{std::move(joinNodeIDs), joinType, nullptr, std::move(probeSideChild), + : LogicalHashJoin{std::move(joinConditions), joinType, nullptr, std::move(probeSideChild), std::move(buildSideChild)} {} // Mark join. - LogicalHashJoin(binder::expression_vector joinNodeIDs, std::shared_ptr mark, - std::shared_ptr probeSideChild, + LogicalHashJoin(std::vector joinConditions, + std::shared_ptr mark, std::shared_ptr probeSideChild, std::shared_ptr buildSideChild) - : LogicalHashJoin{std::move(joinNodeIDs), common::JoinType::MARK, std::move(mark), + : LogicalHashJoin{std::move(joinConditions), common::JoinType::MARK, std::move(mark), std::move(probeSideChild), std::move(buildSideChild)} {} - LogicalHashJoin(binder::expression_vector joinNodeIDs, common::JoinType joinType, + LogicalHashJoin(std::vector joinConditions, common::JoinType joinType, std::shared_ptr mark, std::shared_ptr probeSideChild, std::shared_ptr buildSideChild) : LogicalOperator{LogicalOperatorType::HASH_JOIN, std::move(probeSideChild), std::move(buildSideChild)}, - joinNodeIDs(std::move(joinNodeIDs)), joinType{joinType}, mark{std::move(mark)}, + joinConditions(std::move(joinConditions)), joinType{joinType}, mark{std::move(mark)}, sip{SidewaysInfoPassing::NONE} {} f_group_pos_set getGroupsPosToFlattenOnProbeSide(); @@ -41,11 +44,15 @@ class LogicalHashJoin : public LogicalOperator { void computeFlatSchema() override; inline std::string getExpressionsForPrinting() const override { - return binder::ExpressionUtil::toString(joinNodeIDs); + return isNodeIDOnlyJoin() ? binder::ExpressionUtil::toString(getJoinNodeIDs()) : + binder::ExpressionUtil::toString(joinConditions); } binder::expression_vector getExpressionsToMaterialize() const; - inline binder::expression_vector getJoinNodeIDs() const { return joinNodeIDs; } + + binder::expression_vector getJoinNodeIDs() const; + + inline std::vector getJoinConditions() const { return joinConditions; } inline common::JoinType getJoinType() const { return joinType; } inline std::shared_ptr getMark() const { assert(joinType == common::JoinType::MARK && mark); @@ -56,7 +63,7 @@ class LogicalHashJoin : public LogicalOperator { inline std::unique_ptr copy() override { return make_unique( - joinNodeIDs, joinType, mark, children[0]->copy(), children[1]->copy()); + joinConditions, joinType, mark, children[0]->copy(), children[1]->copy()); } // Flat probe side key group in either of the following two cases: @@ -69,10 +76,11 @@ class LogicalHashJoin : public LogicalOperator { bool requireFlatProbeKeys(); private: + bool isNodeIDOnlyJoin() const; bool isJoinKeyUniqueOnBuildSide(const binder::Expression& joinNodeID); private: - binder::expression_vector joinNodeIDs; + std::vector joinConditions; common::JoinType joinType; std::shared_ptr mark; // when joinType is Mark SidewaysInfoPassing sip; diff --git a/src/include/planner/logical_plan/schema.h b/src/include/planner/logical_plan/schema.h index 9a39c4da56..cb5fba3961 100644 --- a/src/include/planner/logical_plan/schema.h +++ b/src/include/planner/logical_plan/schema.h @@ -141,8 +141,6 @@ class Schema { class SchemaUtils { public: - static std::vector getExpressionsPerGroup( - const binder::expression_vector& expressions, const Schema& schema); // Given a set of factorization group, a leading group is selected as the unFlat group (caller // should ensure at most one unFlat group which is our general assumption of factorization). If // all groups are flat, we select any (the first) group as leading group. diff --git a/src/include/planner/logical_plan/sip/side_way_info_passing.h b/src/include/planner/logical_plan/sip/side_way_info_passing.h index 82c98f85b4..38fef87bb4 100644 --- a/src/include/planner/logical_plan/sip/side_way_info_passing.h +++ b/src/include/planner/logical_plan/sip/side_way_info_passing.h @@ -7,10 +7,11 @@ namespace planner { enum class SidewaysInfoPassing : uint8_t { NONE = 0, - PROBE_TO_BUILD = 1, - PROHIBIT_PROBE_TO_BUILD = 2, - BUILD_TO_PROBE = 3, - PROHIBIT_BUILD_TO_PROBE = 4, + PROHIBIT = 1, + PROBE_TO_BUILD = 2, + PROHIBIT_PROBE_TO_BUILD = 3, + BUILD_TO_PROBE = 4, + PROHIBIT_BUILD_TO_PROBE = 5, }; } // namespace planner diff --git a/src/include/processor/operator/base_hash_table.h b/src/include/processor/operator/base_hash_table.h index e0b3c2e740..8a4fc1ebdb 100644 --- a/src/include/processor/operator/base_hash_table.h +++ b/src/include/processor/operator/base_hash_table.h @@ -16,14 +16,29 @@ class BaseHashTable { virtual ~BaseHashTable() = default; +protected: + inline void setMaxNumHashSlots(uint64_t newSize) { + maxNumHashSlots = newSize; + bitmask = maxNumHashSlots - 1; + } + + inline void initSlotConstant(uint64_t numSlotsPerBlock_) { + assert(numSlotsPerBlock_ == common::nextPowerOfTwo(numSlotsPerBlock_)); + numSlotsPerBlock = numSlotsPerBlock_; + numSlotsPerBlockLog2 = std::log2(numSlotsPerBlock); + slotIdxInBlockMask = + common::BitmaskUtils::all1sMaskForLeastSignificantBits(numSlotsPerBlockLog2); + } + inline uint64_t getSlotIdxForHash(common::hash_t hash) const { return hash & bitmask; } protected: uint64_t maxNumHashSlots; uint64_t bitmask; - std::vector> hashSlotsBlocks; + uint64_t numSlotsPerBlock; uint64_t numSlotsPerBlockLog2; uint64_t slotIdxInBlockMask; + std::vector> hashSlotsBlocks; storage::MemoryManager& memoryManager; std::unique_ptr factorizedTable; }; diff --git a/src/include/processor/operator/hash_join/hash_join_build.h b/src/include/processor/operator/hash_join/hash_join_build.h index 01bd956c2a..f8034407de 100644 --- a/src/include/processor/operator/hash_join/hash_join_build.h +++ b/src/include/processor/operator/hash_join/hash_join_build.h @@ -38,13 +38,14 @@ class HashJoinBuildInfo { friend class HashJoinBuild; public: - HashJoinBuildInfo(std::vector keysPos, std::vector payloadsPos, - std::unique_ptr tableSchema) - : keysPos{std::move(keysPos)}, payloadsPos{std::move(payloadsPos)}, tableSchema{std::move( - tableSchema)} {} + HashJoinBuildInfo(std::vector keysPos, + std::vector factorizationStateTypes, + std::vector payloadsPos, std::unique_ptr tableSchema) + : keysPos{std::move(keysPos)}, factorizationStateTypes{std::move(factorizationStateTypes)}, + payloadsPos{std::move(payloadsPos)}, tableSchema{std::move(tableSchema)} {} HashJoinBuildInfo(const HashJoinBuildInfo& other) - : keysPos{other.keysPos}, payloadsPos{other.payloadsPos}, tableSchema{ - other.tableSchema->copy()} {} + : keysPos{other.keysPos}, factorizationStateTypes{other.factorizationStateTypes}, + payloadsPos{other.payloadsPos}, tableSchema{other.tableSchema->copy()} {} inline uint32_t getNumKeys() const { return keysPos.size(); } @@ -56,6 +57,7 @@ class HashJoinBuildInfo { private: std::vector keysPos; + std::vector factorizationStateTypes; std::vector payloadsPos; std::unique_ptr tableSchema; }; @@ -73,7 +75,6 @@ class HashJoinBuild : public Sink { uint32_t id, const std::string& paramsString) : Sink{std::move(resultSetDescriptor), operatorType, std::move(child), id, paramsString}, sharedState{std::move(sharedState)}, info{std::move(info)} {} - ~HashJoinBuild() override = default; inline std::shared_ptr getSharedState() const { return sharedState; } @@ -88,15 +89,22 @@ class HashJoinBuild : public Sink { } protected: - virtual void initLocalHashTable(storage::MemoryManager& memoryManager) { - hashTable = std::make_unique( - memoryManager, info->getNumKeys(), info->tableSchema->copy()); + virtual inline void appendVectors() { + hashTable->appendVectors(keyVectors, payloadVectors, keyState); } +private: + void setKeyState(common::DataChunkState* state); + protected: std::shared_ptr sharedState; std::unique_ptr info; - std::vector vectorsToAppend; + + std::vector keyVectors; + // State of unFlat key(s). If all keys are flat, it points to any flat key state. + common::DataChunkState* keyState = nullptr; + std::vector payloadVectors; + std::unique_ptr hashTable; // local state }; diff --git a/src/include/processor/operator/hash_join/join_hash_table.h b/src/include/processor/operator/hash_join/join_hash_table.h index 91cc69fd79..0f01a0ad82 100644 --- a/src/include/processor/operator/hash_join/join_hash_table.h +++ b/src/include/processor/operator/hash_join/join_hash_table.h @@ -9,18 +9,36 @@ namespace kuzu { namespace processor { class JoinHashTable : public BaseHashTable { + using hash_function_t = std::function; + using compare_function_t = + std::function; + public: - JoinHashTable(storage::MemoryManager& memoryManager, uint64_t numKeyColumns, + JoinHashTable(storage::MemoryManager& memoryManager, + std::vector> keyTypes, std::unique_ptr tableSchema); - virtual ~JoinHashTable() = default; + void appendVectors(const std::vector& keyVectors, + const std::vector& payloadVectors, common::DataChunkState* keyState); + void appendVector(common::ValueVector* vector, + const std::vector& appendInfos, ft_col_idx_t colIdx); + + // Used in worst-case optimal join + void appendVectorWithSorting( + common::ValueVector* keyVector, std::vector payloadVectors); - virtual void append(const std::vector& vectorsToAppend); void allocateHashSlots(uint64_t numTuples); void buildHashSlots(); void probe(const std::vector& keyVectors, common::ValueVector* hashVector, common::ValueVector* tmpHashVector, uint8_t** probedTuples); + // All key vectors must be flat. Thus input is a tuple, multiple matches can be found for the + // given key tuple. + common::sel_t matchFlatKeys(const std::vector& keyVectors, + uint8_t** probedTuples, uint8_t** matchedTuples); + // Input is multiple tuples, at most one match exist for each key. + common::sel_t matchUnFlatKey(common::ValueVector* keyVector, uint8_t** probedTuples, + uint8_t** matchedTuples, common::SelectionVector* matchedTuplesSelVector); inline void lookup(std::vector& vectors, std::vector& colIdxesToScan, uint8_t** tuplesToRead, uint64_t startPos, @@ -30,7 +48,7 @@ class JoinHashTable : public BaseHashTable { inline void merge(JoinHashTable& other) { factorizedTable->merge(*other.factorizedTable); } inline uint64_t getNumTuples() { return factorizedTable->getNumTuples(); } inline uint8_t** getPrevTuple(const uint8_t* tuple) const { - return (uint8_t**)(tuple + colOffsetOfPrevPtrInTuple); + return (uint8_t**)(tuple + prevPtrColOffset); } inline uint8_t* getTupleForHash(common::hash_t hash) { auto slotIdx = getSlotIdxForHash(hash); @@ -42,18 +60,24 @@ class JoinHashTable : public BaseHashTable { return factorizedTable->getTableSchema(); } -protected: - uint8_t** findHashSlot(common::nodeID_t* nodeIDs) const; +private: + uint8_t** findHashSlot(uint8_t* tuple) const; // This function returns the pointer that previously stored in the same slot. uint8_t* insertEntry(uint8_t* tuple) const; - // This function returns a boolean flag indicating if there is non-null keys after discarding. - static bool discardNullFromKeys( - const std::vector& vectors, uint32_t numKeyVectors); + bool compareFlatKeys(const std::vector& keyVectors, const uint8_t* tuple); + + void initFunctions(); + void getHashFunction(common::PhysicalTypeID physicalTypeID, hash_function_t& func); + void getCompareFunction(common::PhysicalTypeID physicalTypeID, compare_function_t& func); private: - uint64_t numKeyColumns; - uint64_t colOffsetOfPrevPtrInTuple; + std::vector> keyTypes; + std::vector entryHashFunctions; + std::vector entryCompareFunctions; + + const FactorizedTableSchema* tableSchema; + uint64_t prevPtrColOffset; }; } // namespace processor diff --git a/src/include/processor/operator/intersect/intersect.h b/src/include/processor/operator/intersect/intersect.h index f5246b3121..f21a9d7169 100644 --- a/src/include/processor/operator/intersect/intersect.h +++ b/src/include/processor/operator/intersect/intersect.h @@ -15,7 +15,7 @@ struct IntersectDataInfo { class Intersect : public PhysicalOperator { public: Intersect(const DataPos& outputDataPos, std::vector intersectDataInfos, - std::vector> sharedHTs, + std::vector> sharedHTs, std::vector> children, uint32_t id, const std::string& paramsString) : PhysicalOperator{PhysicalOperatorType::INTERSECT, std::move(children), id, paramsString}, @@ -59,7 +59,7 @@ class Intersect : public PhysicalOperator { std::shared_ptr outKeyVector; std::vector> probeKeyVectors; std::vector> intersectSelVectors; - std::vector> sharedHTs; + std::vector> sharedHTs; std::vector isIntersectListAFlatValue; std::vector> probedFlatTuples; // Keep track of the tuple to intersect for each build side. diff --git a/src/include/processor/operator/intersect/intersect_build.h b/src/include/processor/operator/intersect/intersect_build.h index 74692d74c5..8136226e6b 100644 --- a/src/include/processor/operator/intersect/intersect_build.h +++ b/src/include/processor/operator/intersect/intersect_build.h @@ -1,35 +1,26 @@ #pragma once -#include "intersect_hash_table.h" #include "processor/operator/hash_join/hash_join_build.h" namespace kuzu { namespace processor { -class IntersectSharedState : public HashJoinSharedState { -public: - explicit IntersectSharedState(std::unique_ptr hashtable) - : HashJoinSharedState{std::move(hashtable)} {} -}; - class IntersectBuild : public HashJoinBuild { public: IntersectBuild(std::unique_ptr resultSetDescriptor, - std::shared_ptr sharedState, std::unique_ptr info, + std::shared_ptr sharedState, std::unique_ptr info, std::unique_ptr child, uint32_t id, const std::string& paramsString) : HashJoinBuild{std::move(resultSetDescriptor), PhysicalOperatorType::INTERSECT_BUILD, std::move(sharedState), std::move(info), std::move(child), id, paramsString} {} inline std::unique_ptr clone() override { - return make_unique(resultSetDescriptor->copy(), - common::ku_reinterpret_pointer_cast( - sharedState), - info->copy(), children[0]->clone(), id, paramsString); + return make_unique(resultSetDescriptor->copy(), sharedState, info->copy(), + children[0]->clone(), id, paramsString); } -protected: - inline void initLocalHashTable(storage::MemoryManager& memoryManager) override { - hashTable = make_unique(memoryManager, info->getTableSchema()->copy()); + inline void appendVectors() final { + assert(keyVectors.size() == 1); + hashTable->appendVectorWithSorting(keyVectors[0], payloadVectors); } }; diff --git a/src/include/processor/operator/intersect/intersect_hash_table.h b/src/include/processor/operator/intersect/intersect_hash_table.h deleted file mode 100644 index 265a687f11..0000000000 --- a/src/include/processor/operator/intersect/intersect_hash_table.h +++ /dev/null @@ -1,18 +0,0 @@ -#pragma once - -#include "processor/operator/hash_join/join_hash_table.h" - -namespace kuzu { -namespace processor { - -class IntersectHashTable : public JoinHashTable { -public: - IntersectHashTable( - storage::MemoryManager& memoryManager, std::unique_ptr tableSchema) - : JoinHashTable{memoryManager, 1 /* numKeyColumns */, std::move(tableSchema)} {} - - void append(const std::vector& vectorsToAppend) override; -}; - -} // namespace processor -} // namespace kuzu diff --git a/src/optimizer/acc_hash_join_optimizer.cpp b/src/optimizer/acc_hash_join_optimizer.cpp index 42f6239f9f..bfb6cb9f60 100644 --- a/src/optimizer/acc_hash_join_optimizer.cpp +++ b/src/optimizer/acc_hash_join_optimizer.cpp @@ -27,6 +27,10 @@ void HashJoinSIPOptimizer::visitOperator(planner::LogicalOperator* op) { } void HashJoinSIPOptimizer::visitHashJoin(planner::LogicalOperator* op) { + auto hashJoin = (LogicalHashJoin*)op; + if (hashJoin->getSIP() == planner::SidewaysInfoPassing::PROHIBIT) { + return; + } if (tryBuildToProbeHJSIP(op)) { // Try build to probe SIP first. return; } diff --git a/src/optimizer/filter_push_down_optimizer.cpp b/src/optimizer/filter_push_down_optimizer.cpp index afdca1b54e..a05ef8067a 100644 --- a/src/optimizer/filter_push_down_optimizer.cpp +++ b/src/optimizer/filter_push_down_optimizer.cpp @@ -4,6 +4,7 @@ #include "binder/expression/property_expression.h" #include "binder/expression_visitor.h" #include "planner/logical_plan/logical_filter.h" +#include "planner/logical_plan/logical_hash_join.h" #include "planner/logical_plan/scan/logical_dummy_scan.h" #include "planner/logical_plan/scan/logical_scan_node.h" #include "planner/logical_plan/scan/logical_scan_node_property.h" @@ -50,61 +51,35 @@ std::shared_ptr FilterPushDownOptimizer::visitFilterReplace( return visitOperator(filter->getChild(0)); } -// A trivial sub-plan is defined as a plan containing only simple table scan (i.e. SCAN_NODE, -// SCAN_NODE_PROPERTY), FILTER and PROJECTION. -static bool isTrivialSubPlan(LogicalOperator* root) { - switch (root->getOperatorType()) { - case LogicalOperatorType::FILTER: - case LogicalOperatorType::SCAN_NODE_PROPERTY: - case LogicalOperatorType::PROJECTION: { // operators we directly search through - return isTrivialSubPlan(root->getChild(0).get()); - } - case LogicalOperatorType::SCAN_NODE: { - return true; - } - default: - return false; - } -} - -static std::vector> fetchOpsInTrivialSubPlan( - std::shared_ptr root) { - std::vector> result; - auto op = std::move(root); - while (op->getOperatorType() != LogicalOperatorType::SCAN_NODE) { - result.push_back(op); - assert(op->getNumChildren() == 1); - op = op->getChild(0); - } - result.push_back(op); - return result; -} - std::shared_ptr FilterPushDownOptimizer::visitCrossProductReplace( std::shared_ptr op) { for (auto i = 0u; i < op->getNumChildren(); ++i) { auto optimizer = FilterPushDownOptimizer(); op->setChild(i, optimizer.visitOperator(op->getChild(i))); } - if (!isTrivialSubPlan(op->getChild(1).get())) { - return finishPushDown(op); + auto probeSchema = op->getChild(0)->getSchema(); + auto buildSchema = op->getChild(1)->getSchema(); + std::vector joinConditions; + for (auto& predicate : predicateSet->equalityPredicates) { + auto left = predicate->getChild(0); + auto right = predicate->getChild(1); + // TODO(Xiyang): this can only rewrite left = right, we should also be able to do + // expr(left), expr(right) + if (probeSchema->isExpressionInScope(*left) && buildSchema->isExpressionInScope(*right)) { + joinConditions.emplace_back(left, right); + } else if (probeSchema->isExpressionInScope(*right) && + buildSchema->isExpressionInScope(*left)) { + joinConditions.emplace_back(right, left); + } } - auto buildOps = fetchOpsInTrivialSubPlan(op->getChild(1)); - auto node = ((LogicalScanNode&)*buildOps[buildOps.size() - 1]).getNode(); - auto primaryKeyEqualityComparison = predicateSet->popNodePKEqualityComparison(*node); - if (primaryKeyEqualityComparison == nullptr) { + if (joinConditions.empty()) { return finishPushDown(op); } - // Append index scan to left branch - auto indexScan = make_shared( - node, primaryKeyEqualityComparison->getChild(1), op->getChild(0)); - indexScan->computeFlatSchema(); - // Append right branch (except for node table scan) to left branch - buildOps[buildOps.size() - 2]->setChild(0, std::move(indexScan)); - for (auto i = 0; i < buildOps.size() - 1; ++i) { - buildOps[i]->computeFlatSchema(); - } - return buildOps[0]; + auto hashJoin = std::make_shared( + joinConditions, JoinType::INNER, op->getChild(0), op->getChild(1)); + hashJoin->setSIP(planner::SidewaysInfoPassing::PROHIBIT); + hashJoin->computeFlatSchema(); + return hashJoin; } std::shared_ptr FilterPushDownOptimizer::visitScanNodePropertyReplace( diff --git a/src/optimizer/projection_push_down_optimizer.cpp b/src/optimizer/projection_push_down_optimizer.cpp index 4147aff2b4..1499da14a9 100644 --- a/src/optimizer/projection_push_down_optimizer.cpp +++ b/src/optimizer/projection_push_down_optimizer.cpp @@ -82,8 +82,9 @@ void ProjectionPushDownOptimizer::visitFilter(planner::LogicalOperator* op) { void ProjectionPushDownOptimizer::visitHashJoin(planner::LogicalOperator* op) { auto hashJoin = (LogicalHashJoin*)op; - for (auto& joinNodeID : hashJoin->getJoinNodeIDs()) { - collectExpressionsInUse(joinNodeID); + for (auto& [probeJoinKey, buildJoinKey] : hashJoin->getJoinConditions()) { + collectExpressionsInUse(probeJoinKey); + collectExpressionsInUse(buildJoinKey); } if (hashJoin->getJoinType() == JoinType::MARK) { // no need to perform push down for mark join. return; diff --git a/src/planner/operator/logical_hash_join.cpp b/src/planner/operator/logical_hash_join.cpp index c431fb57c9..e8d64e8ec5 100644 --- a/src/planner/operator/logical_hash_join.cpp +++ b/src/planner/operator/logical_hash_join.cpp @@ -15,8 +15,8 @@ f_group_pos_set LogicalHashJoin::getGroupsPosToFlattenOnProbeSide() { return result; } auto probeSchema = children[0]->getSchema(); - for (auto& joinNodeID : joinNodeIDs) { - result.insert(probeSchema->getGroupPos(*joinNodeID)); + for (auto& [probeKey, buildKey] : joinConditions) { + result.insert(probeSchema->getGroupPos(*probeKey)); } return result; } @@ -24,8 +24,8 @@ f_group_pos_set LogicalHashJoin::getGroupsPosToFlattenOnProbeSide() { f_group_pos_set LogicalHashJoin::getGroupsPosToFlattenOnBuildSide() { auto buildSchema = children[1]->getSchema(); f_group_pos_set joinNodesGroupPos; - for (auto& joinNodeID : joinNodeIDs) { - joinNodesGroupPos.insert(buildSchema->getGroupPos(*joinNodeID)); + for (auto& [probeKey, buildKey] : joinConditions) { + joinNodesGroupPos.insert(buildSchema->getGroupPos(*buildKey)); } return factorization::FlattenAllButOne::getGroupsPosToFlatten(joinNodesGroupPos, buildSchema); } @@ -37,30 +37,25 @@ void LogicalHashJoin::computeFactorizedSchema() { switch (joinType) { case JoinType::INNER: case JoinType::LEFT: { - // resolve key groups - std::unordered_map> keyGroupPosToKeys; - for (auto& joinNodeID : joinNodeIDs) { - auto groupPos = buildSchema->getGroupPos(*joinNodeID); - if (!keyGroupPosToKeys.contains(groupPos)) { - keyGroupPosToKeys.insert({groupPos, std::unordered_set()}); + // Populate group position mapping + std::unordered_map buildToProbeKeyGroupPositionMap; + for (auto& [probeKey, buildKey] : joinConditions) { + auto probeKeyGroupPos = probeSchema->getGroupPos(*probeKey); + auto buildKeyGroupPos = buildSchema->getGroupPos(*buildKey); + if (!buildToProbeKeyGroupPositionMap.contains(buildKeyGroupPos)) { + buildToProbeKeyGroupPositionMap.insert({buildKeyGroupPos, probeKeyGroupPos}); } - keyGroupPosToKeys.at(groupPos).insert(joinNodeID->getUniqueName()); } - // resolve expressions to materialize in each group - auto expressionsToMaterializePerGroup = - SchemaUtils::getExpressionsPerGroup(getExpressionsToMaterialize(), *buildSchema); + // Resolve expressions to materialize in each group binder::expression_vector expressionsToMaterializeInNonKeyGroups; - for (auto i = 0; i < buildSchema->getNumGroups(); ++i) { - auto expressions = expressionsToMaterializePerGroup[i]; - bool isKeyGroup = keyGroupPosToKeys.contains(i); + for (auto groupIdx = 0; groupIdx < buildSchema->getNumGroups(); ++groupIdx) { + auto expressions = buildSchema->getExpressionsInScope(groupIdx); + bool isKeyGroup = buildToProbeKeyGroupPositionMap.contains(groupIdx); if (isKeyGroup) { // merge key group - auto keys = keyGroupPosToKeys.at(i); - auto resultGroupPos = schema->getGroupPos(*keys.begin()); + auto probeKeyGroupPos = buildToProbeKeyGroupPositionMap.at(groupIdx); for (auto& expression : expressions) { - if (keys.contains(expression->getUniqueName())) { - continue; - } - schema->insertToGroupAndScope(expression, resultGroupPos); + // Join key may repeat for internal ID based joins + schema->insertToGroupAndScopeMayRepeat(expression, probeKeyGroupPos); } } else { for (auto& expression : expressions) { @@ -73,8 +68,8 @@ void LogicalHashJoin::computeFactorizedSchema() { } break; case JoinType::MARK: { std::unordered_set probeSideKeyGroupPositions; - for (auto& joinNodeID : joinNodeIDs) { - probeSideKeyGroupPositions.insert(probeSchema->getGroupPos(*joinNodeID)); + for (auto& [probeKey, buildKey] : joinConditions) { + probeSideKeyGroupPositions.insert(probeSchema->getGroupPos(*probeKey)); } if (probeSideKeyGroupPositions.size() > 1) { SchemaUtils::validateNoUnFlatGroup(probeSideKeyGroupPositions, *probeSchema); @@ -94,12 +89,9 @@ void LogicalHashJoin::computeFlatSchema() { switch (joinType) { case JoinType::INNER: case JoinType::LEFT: { - auto joinKeysSet = binder::expression_set{joinNodeIDs.begin(), joinNodeIDs.end()}; for (auto& expression : buildSchema->getExpressionsInScope()) { - if (joinKeysSet.contains(expression)) { - continue; - } - schema->insertToGroupAndScope(expression, 0); + // Join key may repeat for internal ID based joins. + schema->insertToGroupAndScopeMayRepeat(expression, 0); } } break; case JoinType::MARK: { @@ -124,18 +116,40 @@ binder::expression_vector LogicalHashJoin::getExpressionsToMaterialize() const { } } +bool LogicalHashJoin::isNodeIDOnlyJoin() const { + for (auto& [probeKey, buildKey] : joinConditions) { + if (probeKey->getUniqueName() != buildKey->getUniqueName() || + probeKey->getDataType().getLogicalTypeID() != common::LogicalTypeID::INTERNAL_ID) { + return false; + } + } + return true; +} + +binder::expression_vector LogicalHashJoin::getJoinNodeIDs() const { + binder::expression_vector result; + assert(isNodeIDOnlyJoin()); + for (auto& [probeKey, _] : joinConditions) { + result.push_back(probeKey); + } + return result; +} + bool LogicalHashJoin::requireFlatProbeKeys() { // Flatten for multiple join keys. - if (joinNodeIDs.size() > 1) { + if (joinConditions.size() > 1) { return true; } // Flatten for left join. - // TODO(Guodong): fix this. if (joinType == JoinType::LEFT) { + return true; // TODO(Guodong): fix this. We shouldn't require flatten. + } + auto& [probeKey, buildKey] = joinConditions[0]; + // Flatten for non-ID-based join. + if (probeKey->dataType.getLogicalTypeID() != LogicalTypeID::INTERNAL_ID) { return true; } - auto joinNodeID = joinNodeIDs[0].get(); - return !isJoinKeyUniqueOnBuildSide(*joinNodeID); + return !isJoinKeyUniqueOnBuildSide(*buildKey); } bool LogicalHashJoin::isJoinKeyUniqueOnBuildSide(const binder::Expression& joinNodeID) { diff --git a/src/planner/operator/schema.cpp b/src/planner/operator/schema.cpp index f5a162ac70..6b7cda93ba 100644 --- a/src/planner/operator/schema.cpp +++ b/src/planner/operator/schema.cpp @@ -127,17 +127,6 @@ size_t Schema::getNumGroups(bool isFlat) const { return result; } -std::vector SchemaUtils::getExpressionsPerGroup( - const binder::expression_vector& expressions, const Schema& schema) { - std::vector result; - result.resize(schema.getNumGroups()); - for (auto& expression : expressions) { - auto groupPos = schema.getGroupPos(*expression); - result[groupPos].push_back(expression); - } - return result; -} - f_group_pos SchemaUtils::getLeadingGroupPos( const std::unordered_set& groupPositions, const Schema& schema) { auto leadingGroupPos = INVALID_F_GROUP_POS; diff --git a/src/planner/plan/append_join.cpp b/src/planner/plan/append_join.cpp index 766605718f..98dbf4791a 100644 --- a/src/planner/plan/append_join.cpp +++ b/src/planner/plan/append_join.cpp @@ -10,8 +10,12 @@ namespace planner { void QueryPlanner::appendHashJoin(const expression_vector& joinNodeIDs, JoinType joinType, LogicalPlan& probePlan, LogicalPlan& buildPlan) { + std::vector joinConditions; + for (auto& joinNodeID : joinNodeIDs) { + joinConditions.emplace_back(joinNodeID, joinNodeID); + } auto hashJoin = make_shared( - joinNodeIDs, joinType, probePlan.getLastOperator(), buildPlan.getLastOperator()); + joinConditions, joinType, probePlan.getLastOperator(), buildPlan.getLastOperator()); // Apply flattening to probe side auto groupsPosToFlattenOnProbeSide = hashJoin->getGroupsPosToFlattenOnProbeSide(); appendFlattens(groupsPosToFlattenOnProbeSide, probePlan); @@ -37,8 +41,12 @@ void QueryPlanner::appendHashJoin(const expression_vector& joinNodeIDs, JoinType void QueryPlanner::appendMarkJoin(const expression_vector& joinNodeIDs, const std::shared_ptr& mark, LogicalPlan& probePlan, LogicalPlan& buildPlan) { + std::vector joinConditions; + for (auto& joinNodeID : joinNodeIDs) { + joinConditions.emplace_back(joinNodeID, joinNodeID); + } auto hashJoin = make_shared( - joinNodeIDs, mark, probePlan.getLastOperator(), buildPlan.getLastOperator()); + joinConditions, mark, probePlan.getLastOperator(), buildPlan.getLastOperator()); // Apply flattening to probe side appendFlattens(hashJoin->getGroupsPosToFlattenOnProbeSide(), probePlan); hashJoin->setChild(0, probePlan.getLastOperator()); diff --git a/src/processor/map/map_hash_join.cpp b/src/processor/map/map_hash_join.cpp index bad6ab5392..8cfd2ca78e 100644 --- a/src/processor/map/map_hash_join.cpp +++ b/src/processor/map/map_hash_join.cpp @@ -14,6 +14,7 @@ std::unique_ptr PlanMapper::createHashBuildInfo( const Schema& buildSchema, const expression_vector& keys, const expression_vector& payloads) { planner::f_group_pos_set keyGroupPosSet; std::vector keysPos; + std::vector factorizationStateTypes; std::vector payloadsPos; auto tableSchema = std::make_unique(); for (auto& key : keys) { @@ -24,6 +25,9 @@ std::unique_ptr PlanMapper::createHashBuildInfo( LogicalTypeUtils::getRowLayoutSize(key->dataType)); tableSchema->appendColumn(std::move(columnSchema)); keysPos.push_back(pos); + factorizationStateTypes.push_back(buildSchema.getGroup(pos.dataChunkPos)->isFlat() ? + FactorizationStateType::FLAT : + FactorizationStateType::UNFLAT); } for (auto& payload : payloads) { auto pos = DataPos(buildSchema.getExpressionPos(*payload)); @@ -47,8 +51,8 @@ std::unique_ptr PlanMapper::createHashBuildInfo( auto pointerColumn = std::make_unique(false /* isUnFlat */, INVALID_DATA_CHUNK_POS, LogicalTypeUtils::getRowLayoutSize(pointerType)); tableSchema->appendColumn(std::move(pointerColumn)); - return std::make_unique( - std::move(keysPos), std::move(payloadsPos), std::move(tableSchema)); + return std::make_unique(std::move(keysPos), + std::move(factorizationStateTypes), std::move(payloadsPos), std::move(tableSchema)); } std::unique_ptr PlanMapper::mapHashJoin(LogicalOperator* logicalOperator) { @@ -66,20 +70,27 @@ std::unique_ptr PlanMapper::mapHashJoin(LogicalOperator* logic probeSidePrevOperator = mapOperator(hashJoin->getChild(0).get()); } auto paramsString = hashJoin->getExpressionsForPrinting(); - auto payloads = ExpressionUtil::excludeExpressions( - hashJoin->getExpressionsToMaterialize(), hashJoin->getJoinNodeIDs()); + expression_vector probeKeys; + expression_vector buildKeys; + for (auto& [probeKey, buildKey] : hashJoin->getJoinConditions()) { + probeKeys.push_back(probeKey); + buildKeys.push_back(buildKey); + } + auto buildKeyTypes = ExpressionUtil::getDataTypes(buildKeys); + auto payloads = + ExpressionUtil::excludeExpressions(hashJoin->getExpressionsToMaterialize(), probeKeys); // Create build - auto buildInfo = createHashBuildInfo(*buildSchema, hashJoin->getJoinNodeIDs(), payloads); + auto buildInfo = createHashBuildInfo(*buildSchema, buildKeys, payloads); auto globalHashTable = std::make_unique( - *memoryManager, buildInfo->getNumKeys(), buildInfo->getTableSchema()->copy()); + *memoryManager, LogicalType::copy(buildKeyTypes), buildInfo->getTableSchema()->copy()); auto sharedState = std::make_shared(std::move(globalHashTable)); auto hashJoinBuild = make_unique(std::make_unique(buildSchema), sharedState, std::move(buildInfo), std::move(buildSidePrevOperator), getOperatorID(), paramsString); // Create probe std::vector probeKeysDataPos; - for (auto& joinNodeID : hashJoin->getJoinNodeIDs()) { - probeKeysDataPos.emplace_back(outSchema->getExpressionPos(*joinNodeID)); + for (auto& probeKey : probeKeys) { + probeKeysDataPos.emplace_back(outSchema->getExpressionPos(*probeKey)); } std::vector probePayloadsOutPos; for (auto& payload : payloads) { diff --git a/src/processor/map/map_intersect.cpp b/src/processor/map/map_intersect.cpp index 75f3ca6dc2..f134db77f4 100644 --- a/src/processor/map/map_intersect.cpp +++ b/src/processor/map/map_intersect.cpp @@ -3,7 +3,9 @@ #include "processor/operator/intersect/intersect_build.h" #include "processor/plan_mapper.h" +using namespace kuzu::binder; using namespace kuzu::planner; +using namespace kuzu::common; namespace kuzu { namespace processor { @@ -14,19 +16,21 @@ std::unique_ptr PlanMapper::mapIntersect(LogicalOperator* logi auto outSchema = logicalIntersect->getSchema(); std::vector> children; children.resize(logicalOperator->getNumChildren()); - std::vector> sharedStates; + std::vector> sharedStates; std::vector intersectDataInfos; // Map build side children. for (auto i = 1u; i < logicalIntersect->getNumChildren(); i++) { auto keyNodeID = logicalIntersect->getKeyNodeID(i - 1); + auto keys = expression_vector{keyNodeID}; + auto keyTypes = ExpressionUtil::getDataTypes(keys); auto buildSchema = logicalIntersect->getChild(i)->getSchema(); auto buildPrevOperator = mapOperator(logicalIntersect->getChild(i).get()); - auto payloadExpressions = binder::ExpressionUtil::excludeExpressions( - buildSchema->getExpressionsInScope(), {keyNodeID}); - auto buildInfo = createHashBuildInfo(*buildSchema, {keyNodeID}, payloadExpressions); - auto globalHashTable = std::make_unique( - *memoryManager, buildInfo->getTableSchema()->copy()); - auto sharedState = std::make_shared(std::move(globalHashTable)); + auto payloadExpressions = + binder::ExpressionUtil::excludeExpressions(buildSchema->getExpressionsInScope(), keys); + auto buildInfo = createHashBuildInfo(*buildSchema, keys, payloadExpressions); + auto globalHashTable = std::make_unique( + *memoryManager, LogicalType::copy(keyTypes), buildInfo->getTableSchema()->copy()); + auto sharedState = std::make_shared(std::move(globalHashTable)); sharedStates.push_back(sharedState); children[i] = make_unique( std::make_unique(buildSchema), sharedState, std::move(buildInfo), diff --git a/src/processor/map/map_path_property_probe.cpp b/src/processor/map/map_path_property_probe.cpp index 0bcd75fc90..839ad54f0a 100644 --- a/src/processor/map/map_path_property_probe.cpp +++ b/src/processor/map/map_path_property_probe.cpp @@ -49,11 +49,12 @@ std::unique_ptr PlanMapper::mapPathPropertyProbe( auto nodeBuildPrevOperator = mapOperator(logicalProbe->getChild(1).get()); auto nodeBuildSchema = logicalProbe->getChild(1)->getSchema(); auto nodeKeys = expression_vector{recursiveInfo->node->getInternalIDProperty()}; + auto nodeKeyTypes = ExpressionUtil::getDataTypes(nodeKeys); auto nodePayloads = ExpressionUtil::excludeExpressions(nodeBuildSchema->getExpressionsInScope(), nodeKeys); auto nodeBuildInfo = createHashBuildInfo(*nodeBuildSchema, nodeKeys, nodePayloads); auto nodeHashTable = std::make_unique( - *memoryManager, nodeBuildInfo->getNumKeys(), nodeBuildInfo->getTableSchema()->copy()); + *memoryManager, std::move(nodeKeyTypes), nodeBuildInfo->getTableSchema()->copy()); auto nodeBuildSharedState = std::make_shared(std::move(nodeHashTable)); auto nodeBuild = make_unique( std::make_unique(nodeBuildSchema), nodeBuildSharedState, @@ -62,11 +63,12 @@ std::unique_ptr PlanMapper::mapPathPropertyProbe( auto relBuildPrvOperator = mapOperator(logicalProbe->getChild(2).get()); auto relBuildSchema = logicalProbe->getChild(2)->getSchema(); auto relKeys = expression_vector{recursiveInfo->rel->getInternalIDProperty()}; + auto relKeyTypes = ExpressionUtil::getDataTypes(relKeys); auto relPayloads = ExpressionUtil::excludeExpressions(relBuildSchema->getExpressionsInScope(), relKeys); auto relBuildInfo = createHashBuildInfo(*relBuildSchema, relKeys, relPayloads); auto relHashTable = std::make_unique( - *memoryManager, relBuildInfo->getNumKeys(), relBuildInfo->getTableSchema()->copy()); + *memoryManager, std::move(relKeyTypes), relBuildInfo->getTableSchema()->copy()); auto relBuildSharedState = std::make_shared(std::move(relHashTable)); auto relBuild = std::make_unique( std::make_unique(relBuildSchema), relBuildSharedState, diff --git a/src/processor/operator/aggregate/aggregate_hash_table.cpp b/src/processor/operator/aggregate/aggregate_hash_table.cpp index 72f387f6fe..4b3c611795 100644 --- a/src/processor/operator/aggregate/aggregate_hash_table.cpp +++ b/src/processor/operator/aggregate/aggregate_hash_table.cpp @@ -172,13 +172,10 @@ void AggregateHashTable::initializeFT( } void AggregateHashTable::initializeHashTable(uint64_t numEntriesToAllocate) { - maxNumHashSlots = nextPowerOfTwo( - std::max(BufferPoolConstants::PAGE_256KB_SIZE / sizeof(HashSlot), numEntriesToAllocate)); - bitmask = maxNumHashSlots - 1; + setMaxNumHashSlots(nextPowerOfTwo( + std::max(BufferPoolConstants::PAGE_256KB_SIZE / sizeof(HashSlot), numEntriesToAllocate))); auto numHashSlotsPerBlock = BufferPoolConstants::PAGE_256KB_SIZE / sizeof(HashSlot); - assert(numHashSlotsPerBlock == nextPowerOfTwo(numHashSlotsPerBlock)); - numSlotsPerBlockLog2 = log2(numHashSlotsPerBlock); - slotIdxInBlockMask = BitmaskUtils::all1sMaskForLeastSignificantBits(numSlotsPerBlockLog2); + initSlotConstant(numHashSlotsPerBlock); auto numDataBlocks = maxNumHashSlots / numHashSlotsPerBlock + (maxNumHashSlots % numHashSlotsPerBlock != 0); for (auto i = 0u; i < numDataBlocks; i++) { @@ -214,8 +211,7 @@ uint8_t* AggregateHashTable::findEntryInDistinctHT( } void AggregateHashTable::resize(uint64_t newSize) { - maxNumHashSlots = newSize; - bitmask = maxNumHashSlots - 1; + setMaxNumHashSlots(newSize); addDataBlocksIfNecessary(maxNumHashSlots); for (auto& block : hashSlotsBlocks) { block->resetToZero(); diff --git a/src/processor/operator/hash_join/hash_join_build.cpp b/src/processor/operator/hash_join/hash_join_build.cpp index 0e324388ae..9f797a66b1 100644 --- a/src/processor/operator/hash_join/hash_join_build.cpp +++ b/src/processor/operator/hash_join/hash_join_build.cpp @@ -12,13 +12,31 @@ void HashJoinSharedState::mergeLocalHashTable(JoinHashTable& localHashTable) { } void HashJoinBuild::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) { - for (auto& pos : info->keysPos) { - vectorsToAppend.push_back(resultSet->getValueVector(pos).get()); + std::vector> keyTypes; + for (auto i = 0u; i < info->keysPos.size(); ++i) { + auto vector = resultSet->getValueVector(info->keysPos[i]).get(); + keyTypes.push_back(vector->dataType.copy()); + if (info->factorizationStateTypes[i] == common::FactorizationStateType::UNFLAT) { + setKeyState(vector->state.get()); + } + keyVectors.push_back(vector); + } + if (keyState == nullptr) { + setKeyState(keyVectors[0]->state.get()); } for (auto& pos : info->payloadsPos) { - vectorsToAppend.push_back(resultSet->getValueVector(pos).get()); + payloadVectors.push_back(resultSet->getValueVector(pos).get()); + } + hashTable = std::make_unique( + *context->memoryManager, std::move(keyTypes), info->tableSchema->copy()); +} + +void HashJoinBuild::setKeyState(common::DataChunkState* state) { + if (keyState == nullptr) { + keyState = state; + } else { + assert(keyState == state); // two pointers should be pointing to the same state } - initLocalHashTable(*context->memoryManager); } void HashJoinBuild::finalize(ExecutionContext* context) { @@ -31,7 +49,7 @@ void HashJoinBuild::executeInternal(ExecutionContext* context) { // Append thread-local tuples while (children[0]->getNextTuple(context)) { for (auto i = 0u; i < resultSet->multiplicity; ++i) { - hashTable->append(vectorsToAppend); + appendVectors(); } } // Merge with global hash table once local tuples are all appended. diff --git a/src/processor/operator/hash_join/hash_join_probe.cpp b/src/processor/operator/hash_join/hash_join_probe.cpp index 1194e3c5d0..ed9f99ce6f 100644 --- a/src/processor/operator/hash_join/hash_join_probe.cpp +++ b/src/processor/operator/hash_join/hash_join_probe.cpp @@ -48,24 +48,8 @@ bool HashJoinProbe::getMatchedTuplesForFlatKey(ExecutionContext* context) { sharedState->getHashTable()->probe( keyVectors, hashVector.get(), tmpHashVector.get(), probeState->probedTuples.get()); } - auto numMatchedTuples = 0; - while (probeState->probedTuples[0]) { - if (numMatchedTuples == DEFAULT_VECTOR_CAPACITY) { - break; - } - auto currentTuple = probeState->probedTuples[0]; - probeState->matchedTuples[numMatchedTuples] = currentTuple; - bool isKeysEqual = true; - for (auto i = 0u; i < keyVectors.size(); i++) { - auto pos = keyVectors[i]->state->selVector->selectedPositions[0]; - if (((nodeID_t*)currentTuple)[i] != keyVectors[i]->getValue(pos)) { - isKeysEqual = false; - break; - } - } - numMatchedTuples += isKeysEqual; - probeState->probedTuples[0] = *sharedState->getHashTable()->getPrevTuple(currentTuple); - } + auto numMatchedTuples = sharedState->getHashTable()->matchFlatKeys( + keyVectors, probeState->probedTuples.get(), probeState->matchedTuples.get()); probeState->matchedSelVector->selectedSize = numMatchedTuples; probeState->nextMatchedTupleIdx = 0; return true; @@ -81,23 +65,9 @@ bool HashJoinProbe::getMatchedTuplesForUnFlatKey(ExecutionContext* context) { saveSelVector(keyVector->state->selVector); sharedState->getHashTable()->probe( keyVectors, hashVector.get(), tmpHashVector.get(), probeState->probedTuples.get()); - auto numMatchedTuples = 0; - auto keySelVector = keyVector->state->selVector.get(); - for (auto i = 0u; i < keySelVector->selectedSize; i++) { - auto pos = keySelVector->selectedPositions[i]; - while (probeState->probedTuples[i]) { - assert(numMatchedTuples <= DEFAULT_VECTOR_CAPACITY); - auto currentTuple = probeState->probedTuples[i]; - if (*(nodeID_t*)currentTuple == keyVectors[0]->getValue(pos)) { - // Break if a match has been found. - probeState->matchedTuples[numMatchedTuples] = currentTuple; - probeState->matchedSelVector->selectedPositions[numMatchedTuples] = pos; - numMatchedTuples++; - break; - } - probeState->probedTuples[i] = *sharedState->getHashTable()->getPrevTuple(currentTuple); - } - } + auto numMatchedTuples = + sharedState->getHashTable()->matchUnFlatKey(keyVector, probeState->probedTuples.get(), + probeState->matchedTuples.get(), probeState->matchedSelVector.get()); probeState->matchedSelVector->selectedSize = numMatchedTuples; probeState->nextMatchedTupleIdx = 0; return true; diff --git a/src/processor/operator/hash_join/join_hash_table.cpp b/src/processor/operator/hash_join/join_hash_table.cpp index 9e69b61362..7a0e1ca44b 100644 --- a/src/processor/operator/hash_join/join_hash_table.cpp +++ b/src/processor/operator/hash_join/join_hash_table.cpp @@ -1,30 +1,41 @@ #include "processor/operator/hash_join/join_hash_table.h" +#include "function/comparison/comparison_functions.h" #include "function/hash/vector_hash_functions.h" using namespace kuzu::common; using namespace kuzu::storage; +using namespace kuzu::function; namespace kuzu { namespace processor { -JoinHashTable::JoinHashTable(MemoryManager& memoryManager, uint64_t numKeyColumns, +JoinHashTable::JoinHashTable(MemoryManager& memoryManager, + std::vector> keyTypes, std::unique_ptr tableSchema) - : BaseHashTable{memoryManager}, numKeyColumns{numKeyColumns} { + : BaseHashTable{memoryManager}, keyTypes{std::move(keyTypes)} { auto numSlotsPerBlock = BufferPoolConstants::PAGE_256KB_SIZE / sizeof(uint8_t*); - assert(numSlotsPerBlock == nextPowerOfTwo(numSlotsPerBlock)); - numSlotsPerBlockLog2 = std::log2(numSlotsPerBlock); - slotIdxInBlockMask = BitmaskUtils::all1sMaskForLeastSignificantBits(numSlotsPerBlockLog2); + initSlotConstant(numSlotsPerBlock); // Prev pointer is always the last column in the table. - colOffsetOfPrevPtrInTuple = tableSchema->getColOffset(tableSchema->getNumColumns() - 1); + prevPtrColOffset = tableSchema->getColOffset(tableSchema->getNumColumns() - 1); factorizedTable = std::make_unique(&memoryManager, std::move(tableSchema)); + this->tableSchema = factorizedTable->getTableSchema(); + initFunctions(); } -bool JoinHashTable::discardNullFromKeys( - const std::vector& vectors, uint32_t numKeyVectors) { +void JoinHashTable::initFunctions() { + entryHashFunctions.resize(keyTypes.size()); + entryCompareFunctions.resize(keyTypes.size()); + for (auto i = 0u; i < keyTypes.size(); ++i) { + getHashFunction(keyTypes[i]->getPhysicalType(), entryHashFunctions[i]); + getCompareFunction(keyTypes[i]->getPhysicalType(), entryCompareFunctions[i]); + } +} + +static bool discardNullFromKeys(const std::vector& vectors) { bool hasNonNullKeys = true; - for (auto i = 0u; i < numKeyVectors; i++) { - if (!NodeIDVector::discardNull(*vectors[i])) { + for (auto& vector : vectors) { + if (!ValueVector::discardNull(*vector)) { hasNonNullKeys = false; break; } @@ -32,38 +43,71 @@ bool JoinHashTable::discardNullFromKeys( return hasNonNullKeys; } -// TODO(Guodong): refactor this function to partially re-use FactorizedTable::append, but calculate -// numTuplesToAppend here. The refactor should also handle the case where multiple unFlat vectors -// are in the same dataChunk. -void JoinHashTable::append(const std::vector& vectorsToAppend) { - if (!discardNullFromKeys(vectorsToAppend, numKeyColumns)) { - return; +void JoinHashTable::appendVectors(const std::vector& keyVectors, + const std::vector& payloadVectors, DataChunkState* keyState) { + discardNullFromKeys(keyVectors); + auto numTuplesToAppend = keyState->selVector->selectedSize; + auto appendInfos = factorizedTable->allocateFlatTupleBlocks(numTuplesToAppend); + auto colIdx = 0u; + for (auto& vector : keyVectors) { + appendVector(vector, appendInfos, colIdx++); + } + for (auto& vector : payloadVectors) { + appendVector(vector, appendInfos, colIdx++); + } + factorizedTable->numTuples += numTuplesToAppend; +} + +void JoinHashTable::appendVector( + ValueVector* vector, const std::vector& appendInfos, ft_col_idx_t colIdx) { + auto numAppendedTuples = 0ul; + for (auto& blockAppendInfo : appendInfos) { + factorizedTable->copyVectorToColumn(*vector, blockAppendInfo, numAppendedTuples, colIdx); + numAppendedTuples += blockAppendInfo.numTuplesToAppend; + } +} + +static void sortSelectedPos(ValueVector* nodeIDVector) { + auto selVector = nodeIDVector->state->selVector.get(); + auto size = selVector->selectedSize; + auto selectedPos = selVector->getSelectedPositionsBuffer(); + if (selVector->isUnfiltered()) { + memcpy(selectedPos, &SelectionVector::INCREMENTAL_SELECTED_POS, size * sizeof(sel_t)); + selVector->resetSelectorToValuePosBuffer(); } - // TODO(Guodong): use compiling information to remove the for loop. + std::sort(selectedPos, selectedPos + size, [nodeIDVector](sel_t left, sel_t right) { + return nodeIDVector->getValue(left) < nodeIDVector->getValue(right); + }); +} + +void JoinHashTable::appendVectorWithSorting( + ValueVector* keyVector, std::vector payloadVectors) { auto numTuplesToAppend = 1; - for (auto i = 0u; i < numKeyColumns; i++) { - // At most one unFlat key data chunk. If there are multiple unFlat key vectors, they must - // share the same state. - if (!vectorsToAppend[i]->state->isFlat()) { - numTuplesToAppend = vectorsToAppend[i]->state->selVector->selectedSize; - break; - } + assert(keyVector->state->selVector->selectedSize == 1); + // Based on the way we are planning, we assume that the first and second vectors are both + // nodeIDs from extending, while the first one is key, and the second one is payload. + auto payloadNodeIDVector = payloadVectors[0]; + auto payloadsState = payloadNodeIDVector->state.get(); + if (!payloadsState->isFlat()) { + // Sorting is only needed when the payload is unflat (a list of values). + sortSelectedPos(payloadNodeIDVector); } + // A single appendInfo will return from `allocateFlatTupleBlocks` when numTuplesToAppend is 1. auto appendInfos = factorizedTable->allocateFlatTupleBlocks(numTuplesToAppend); - for (auto i = 0u; i < vectorsToAppend.size(); i++) { - auto numAppendedTuples = 0ul; - for (auto& blockAppendInfo : appendInfos) { - factorizedTable->copyVectorToColumn( - *vectorsToAppend[i], blockAppendInfo, numAppendedTuples, i); - numAppendedTuples += blockAppendInfo.numTuplesToAppend; - } + assert(appendInfos.size() == 1); + auto colIdx = 0u; + factorizedTable->copyVectorToColumn(*keyVector, appendInfos[0], numTuplesToAppend, colIdx++); + for (auto& vector : payloadVectors) { + factorizedTable->copyVectorToColumn(*vector, appendInfos[0], numTuplesToAppend, colIdx++); + } + if (!payloadsState->isFlat()) { + payloadsState->selVector->resetSelectorToUnselected(); } factorizedTable->numTuples += numTuplesToAppend; } void JoinHashTable::allocateHashSlots(uint64_t numTuples) { - maxNumHashSlots = nextPowerOfTwo(numTuples * 2); - bitmask = maxNumHashSlots - 1; + setMaxNumHashSlots(nextPowerOfTwo(numTuples * 2)); auto numSlotsPerBlock = (uint64_t)1 << numSlotsPerBlockLog2; auto numBlocksNeeded = (maxNumHashSlots + numSlotsPerBlock - 1) / numSlotsPerBlock; while (hashSlotsBlocks.size() < numBlocksNeeded) { @@ -85,31 +129,70 @@ void JoinHashTable::buildHashSlots() { void JoinHashTable::probe(const std::vector& keyVectors, ValueVector* hashVector, ValueVector* tmpHashVector, uint8_t** probedTuples) { - assert(keyVectors.size() == numKeyColumns); + assert(keyVectors.size() == keyTypes.size()); if (getNumTuples() == 0) { return; } - if (!discardNullFromKeys(keyVectors, numKeyColumns)) { + if (!discardNullFromKeys(keyVectors)) { return; } function::VectorHashFunction::computeHash(keyVectors[0], hashVector); - for (auto i = 1u; i < numKeyColumns; i++) { + for (auto i = 1u; i < keyVectors.size(); i++) { function::VectorHashFunction::computeHash(keyVectors[i], tmpHashVector); function::VectorHashFunction::combineHash(hashVector, tmpHashVector, hashVector); } for (auto i = 0u; i < hashVector->state->selVector->selectedSize; i++) { auto pos = hashVector->state->selVector->selectedPositions[i]; + assert(i < DEFAULT_VECTOR_CAPACITY); probedTuples[i] = getTupleForHash(hashVector->getValue(pos)); } } -uint8_t** JoinHashTable::findHashSlot(nodeID_t* nodeIDs) const { +sel_t JoinHashTable::matchFlatKeys( + const std::vector& keyVectors, uint8_t** probedTuples, uint8_t** matchedTuples) { + auto numMatchedTuples = 0; + while (probedTuples[0]) { + if (numMatchedTuples == DEFAULT_VECTOR_CAPACITY) { + break; + } + auto currentTuple = probedTuples[0]; + matchedTuples[numMatchedTuples] = currentTuple; + numMatchedTuples += compareFlatKeys(keyVectors, currentTuple); + probedTuples[0] = *getPrevTuple(currentTuple); + } + return numMatchedTuples; +} + +sel_t JoinHashTable::matchUnFlatKey(ValueVector* keyVector, uint8_t** probedTuples, + uint8_t** matchedTuples, SelectionVector* matchedTuplesSelVector) { + auto numMatchedTuples = 0; + for (auto i = 0u; i < keyVector->state->selVector->selectedSize; ++i) { + auto pos = keyVector->state->selVector->selectedPositions[i]; + while (probedTuples[i]) { + auto currentTuple = probedTuples[i]; + uint8_t entryCompareResult = false; + entryCompareFunctions[0](*keyVector, pos, currentTuple, entryCompareResult); + if (entryCompareResult) { + matchedTuples[numMatchedTuples] = currentTuple; + matchedTuplesSelVector->selectedPositions[numMatchedTuples] = pos; + numMatchedTuples++; + break; + } + probedTuples[i] = *getPrevTuple(currentTuple); + } + } + return numMatchedTuples; +} + +uint8_t** JoinHashTable::findHashSlot(uint8_t* tuple) const { + auto idx = 0u; hash_t hash; - function::Hash::operation(nodeIDs[0], false /* isNull */, hash); - for (auto i = 1u; i < numKeyColumns; i++) { - hash_t newHash; - function::Hash::operation(nodeIDs[i], false /* isNull */, newHash); - function::CombineHash::operation(hash, newHash, hash); + entryHashFunctions[idx++](tuple, hash); + hash_t tmpHash; + while (idx < keyTypes.size()) { + entryHashFunctions[idx](tuple + tableSchema->getColOffset(idx), tmpHash); + function::CombineHash::operation(hash, tmpHash, hash); + idx++; } auto slotIdx = getSlotIdxForHash(hash); return (uint8_t**)(hashSlotsBlocks[slotIdx >> numSlotsPerBlockLog2]->getData() + @@ -117,11 +200,111 @@ uint8_t** JoinHashTable::findHashSlot(nodeID_t* nodeIDs) const { } uint8_t* JoinHashTable::insertEntry(uint8_t* tuple) const { - auto slot = findHashSlot((nodeID_t*)tuple); + auto slot = findHashSlot(tuple); auto prevPtr = *slot; *slot = tuple; return prevPtr; } +bool JoinHashTable::compareFlatKeys( + const std::vector& keyVectors, const uint8_t* tuple) { + uint8_t equal = false; + for (auto i = 0u; i < keyVectors.size(); i++) { + auto keyVector = keyVectors[i]; + assert(keyVector->state->selVector->selectedSize == 1); + auto pos = keyVector->state->selVector->selectedPositions[0]; + entryCompareFunctions[i](*keyVector, pos, tuple + tableSchema->getColOffset(i), equal); + if (!equal) { + return false; + } + } + return true; +} + +template +static void hashEntry(const uint8_t* entry, hash_t& result) { + Hash::operation(*(T*)entry, result); +} + +void JoinHashTable::getHashFunction(PhysicalTypeID physicalTypeID, hash_function_t& func) { + switch (physicalTypeID) { + case PhysicalTypeID::INTERNAL_ID: { + func = hashEntry; + } break; + case PhysicalTypeID::BOOL: { + func = hashEntry; + } break; + case PhysicalTypeID::INT64: { + func = hashEntry; + } break; + case PhysicalTypeID::INT32: { + func = hashEntry; + } break; + case PhysicalTypeID::INT16: { + func = hashEntry; + } break; + case PhysicalTypeID::DOUBLE: { + func = hashEntry; + } break; + case PhysicalTypeID::FLOAT: { + func = hashEntry; + } break; + case PhysicalTypeID::STRING: { + func = hashEntry; + } break; + case PhysicalTypeID::INTERVAL: { + func = hashEntry; + } break; + default: { + throw RuntimeException("Join hash table cannot hash data type " + + PhysicalTypeUtils::physicalTypeToString(physicalTypeID)); + } + } +} + +template +static void compareEntry( + const ValueVector& vector, uint32_t pos, const uint8_t* entry, uint8_t& result) { + Equals::operation(vector.getValue(pos), *(T*)entry, result, nullptr /* leftVector */, + nullptr /* rightVector */); +} + +void JoinHashTable::getCompareFunction( + PhysicalTypeID physicalTypeID, JoinHashTable::compare_function_t& func) { + switch (physicalTypeID) { + case PhysicalTypeID::INTERNAL_ID: { + func = compareEntry; + } break; + case PhysicalTypeID::BOOL: { + func = compareEntry; + } break; + case PhysicalTypeID::INT64: { + func = compareEntry; + } break; + case PhysicalTypeID::INT32: { + func = compareEntry; + } break; + case PhysicalTypeID::INT16: { + func = compareEntry; + } break; + case PhysicalTypeID::DOUBLE: { + func = compareEntry; + } break; + case PhysicalTypeID::FLOAT: { + func = compareEntry; + } break; + case PhysicalTypeID::STRING: { + func = compareEntry; + } break; + case PhysicalTypeID::INTERVAL: { + func = compareEntry; + } break; + default: { + throw RuntimeException("Join hash table cannot compare data type " + + PhysicalTypeUtils::physicalTypeToString(physicalTypeID)); + } + } +} + } // namespace processor } // namespace kuzu diff --git a/src/processor/operator/intersect/CMakeLists.txt b/src/processor/operator/intersect/CMakeLists.txt index 4419fbea0f..92a2755b6d 100644 --- a/src/processor/operator/intersect/CMakeLists.txt +++ b/src/processor/operator/intersect/CMakeLists.txt @@ -1,7 +1,6 @@ add_library(kuzu_processor_operator_intersect OBJECT - intersect.cpp - intersect_hash_table.cpp) + intersect.cpp) set(ALL_OBJECT_FILES ${ALL_OBJECT_FILES} $ diff --git a/src/processor/operator/intersect/intersect_hash_table.cpp b/src/processor/operator/intersect/intersect_hash_table.cpp deleted file mode 100644 index efb15a10b8..0000000000 --- a/src/processor/operator/intersect/intersect_hash_table.cpp +++ /dev/null @@ -1,47 +0,0 @@ -#include "processor/operator/intersect/intersect_hash_table.h" - -using namespace kuzu::common; - -namespace kuzu { -namespace processor { - -static void sortSelectedPos(ValueVector* nodeIDVector) { - auto selVector = nodeIDVector->state->selVector.get(); - auto size = selVector->selectedSize; - auto selectedPos = selVector->getSelectedPositionsBuffer(); - if (selVector->isUnfiltered()) { - memcpy(selectedPos, &SelectionVector::INCREMENTAL_SELECTED_POS, size * sizeof(sel_t)); - selVector->resetSelectorToValuePosBuffer(); - } - std::sort(selectedPos, selectedPos + size, [nodeIDVector](sel_t left, sel_t right) { - return nodeIDVector->getValue(left) < nodeIDVector->getValue(right); - }); -} - -void IntersectHashTable::append(const std::vector& vectorsToAppend) { - auto numTuplesToAppend = 1; - // Based on the way we are planning, we assume that the first and second vectors are both - // nodeIDs from extending, while the first one is key, and the second one is payload. - auto keyState = vectorsToAppend[0]->state.get(); - auto payloadNodeIDVector = vectorsToAppend[1]; - auto payloadsState = payloadNodeIDVector->state.get(); - assert(keyState->isFlat()); - if (!payloadsState->isFlat()) { - // Sorting is only needed when the payload is unflat (a list of values). - sortSelectedPos(payloadNodeIDVector); - } - // A single appendInfo will return from `allocateFlatTupleBlocks` when numTuplesToAppend is 1. - auto appendInfos = factorizedTable->allocateFlatTupleBlocks(numTuplesToAppend); - assert(appendInfos.size() == 1); - for (auto i = 0u; i < vectorsToAppend.size(); i++) { - factorizedTable->copyVectorToColumn( - *vectorsToAppend[i], appendInfos[0], numTuplesToAppend, i); - } - if (!payloadsState->isFlat()) { - payloadsState->selVector->resetSelectorToUnselected(); - } - factorizedTable->numTuples += numTuplesToAppend; -} - -} // namespace processor -} // namespace kuzu diff --git a/src/storage/store/rel_table.cpp b/src/storage/store/rel_table.cpp index 11e6a2bd54..4f4d210ef5 100644 --- a/src/storage/store/rel_table.cpp +++ b/src/storage/store/rel_table.cpp @@ -93,7 +93,7 @@ void DirectedRelTableData::scanColumns(transaction::Transaction* transaction, const std::vector& outputVectors) { // Note: The scan operator should guarantee that the first property in the output is adj column. adjColumn->read(transaction, inNodeIDVector, outputVectors[0]); - if (!NodeIDVector::discardNull(*outputVectors[0])) { + if (!ValueVector::discardNull(*outputVectors[0])) { return; } fillNbrTableIDs(outputVectors[0]); diff --git a/test/test_files/tinysnb/generic_hash_join/basic.test b/test/test_files/tinysnb/generic_hash_join/basic.test new file mode 100644 index 0000000000..cfd8cb65f1 --- /dev/null +++ b/test/test_files/tinysnb/generic_hash_join/basic.test @@ -0,0 +1,60 @@ +-GROUP TinySnbReadTest +-DATASET CSV tinysnb + +-- + +-CASE BasicGenericHashJoin + +-STATEMENT MATCH (a:person), (b:person) WHERE a.ID = b.ID AND a.ID = 7 RETURN a.fName, b.fName, a.grades, b.grades +---- 1 +Elizabeth|Elizabeth|[96,59,65,88]|[96,59,65,88] + +-STATEMENT MATCH (a:person), (b:person) WHERE a.fName = b.fName AND a.ID < 6 RETURN a.fName, b.fName, a.ID, b.ID +-ENCODED_JOIN HJ(a.fName=b.fName){S(a)}{S(b)} +---- 4 +Alice|Alice|0|0 +Bob|Bob|2|2 +Carol|Carol|3|3 +Dan|Dan|5|5 + +-STATEMENT MATCH (a:person), (b:person) WHERE a.isWorker = b.isStudent AND a.ID = 0 RETURN b.fName +---- 5 +Carol +Dan +Elizabeth +Greg +Hubert Blaine Wolfeschlegelsteinhausenbergerdorff + +-STATEMENT MATCH (a:person), (b:person) WHERE a.height = b.height RETURN COUNT(*) +---- 1 +8 + +-STATEMENT MATCH (a:person), (b:person) + WHERE a.age = b.age + AND a.eyeSight = b.eyeSight + AND a.lastJobDuration = b.lastJobDuration + AND a.ID > 7 + RETURN a.fName, b.fName +---- 3 +Farooq|Farooq +Greg|Greg +Hubert Blaine Wolfeschlegelsteinhausenbergerdorff|Hubert Blaine Wolfeschlegelsteinhausenbergerdorff + +# Note vMovies doesn't have ID property +-STATEMENT MATCH (a), (b) WHERE a.ID = b.ID RETURN COUNT(*) +---- 1 +11 + +-STATEMENT MATCH (a:person)-[e1:knows]->(b:person) , (c:person)-[e2:knows]->(d:person) WHERE a.ID = 0 AND c.ID = 2 AND e1.date = e2.date RETURN id(e1), e1.date, id(e2), e2.date +---- 3 +3:0|2021-06-30|3:3|2021-06-30 +3:1|2021-06-30|3:3|2021-06-30 +3:2|2021-06-30|3:3|2021-06-30 + +-STATEMENT MATCH (a), (b) WHERE a.name = b.name AND a.length = b.length AND a.note = b.note RETURN a.description, b.description +---- 3 +{rating: 1223.000000, views: 10003, release: 2011-02-11 16:44:22, film: 2013-02-22}|{rating: 1223.000000, views: 10003, release: 2011-02-11 16:44:22, film: 2013-02-22} +{rating: 5.300000, views: 152, release: 2011-08-20 11:25:30, film: 2012-05-11}|{rating: 5.300000, views: 152, release: 2011-08-20 11:25:30, film: 2012-05-11} +{rating: 7.000000, views: 982, release: 2018-11-13 13:33:11, film: 2014-09-12}|{rating: 7.000000, views: 982, release: 2018-11-13 13:33:11, film: 2014-09-12} + +