From bdb679c359da2d1727ad28b580608ad633198064 Mon Sep 17 00:00:00 2001 From: Guodong Jin Date: Thu, 23 Feb 2023 10:49:32 -0500 Subject: [PATCH] Fix #998 --- .../processor/operator/intersect/intersect.h | 18 +++- .../operator/intersect/intersect.cpp | 90 +++++++++++++------ .../intersect/intersect_hash_table.cpp | 11 ++- 3 files changed, 83 insertions(+), 36 deletions(-) diff --git a/src/include/processor/operator/intersect/intersect.h b/src/include/processor/operator/intersect/intersect.h index 4d84303ca0c..503fbd3b51f 100644 --- a/src/include/processor/operator/intersect/intersect.h +++ b/src/include/processor/operator/intersect/intersect.h @@ -19,7 +19,11 @@ class Intersect : public PhysicalOperator { const std::string& paramsString) : PhysicalOperator{PhysicalOperatorType::INTERSECT, std::move(children), id, paramsString}, outputDataPos{outputDataPos}, - intersectDataInfos{std::move(intersectDataInfos)}, sharedHTs{std::move(sharedHTs)} {} + intersectDataInfos{std::move(intersectDataInfos)}, sharedHTs{std::move(sharedHTs)} { + tupleIdxPerBuildSide.resize(this->sharedHTs.size(), 0); + carryBuildSideIdx = -1u; + probedFlatTuples.resize(this->sharedHTs.size()); + } void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override; @@ -33,14 +37,17 @@ class Intersect : public PhysicalOperator { } private: - std::vector getProbeKeys(); - std::vector probeHTs(const std::vector& keys); + // For each build side, probe its HT and return a vector of matched flat tuples. + void probeHTs(); // Left is always the one with less num of values. static void twoWayIntersect(common::nodeID_t* leftNodeIDs, common::SelectionVector& lSelVector, common::nodeID_t* rightNodeIDs, common::SelectionVector& rSelVector); void intersectLists(const std::vector& listsToIntersect); void populatePayloads( const std::vector& tuples, const std::vector& listIdxes); + bool hasNextTuplesToIntersect(); + + inline uint32_t getNumBuilds() { return sharedHTs.size(); } private: DataPos outputDataPos; @@ -53,6 +60,11 @@ class Intersect : public PhysicalOperator { std::vector> intersectSelVectors; std::vector> sharedHTs; std::vector isIntersectListAFlatValue; + std::vector> probedFlatTuples; + // Keep track of the tuple to intersect for each build side. + std::vector tupleIdxPerBuildSide; + // This is used to indicate which build side to increment the tuple idx for. + uint32_t carryBuildSideIdx; }; } // namespace processor diff --git a/src/processor/operator/intersect/intersect.cpp b/src/processor/operator/intersect/intersect.cpp index d791f1153f1..25017b13896 100644 --- a/src/processor/operator/intersect/intersect.cpp +++ b/src/processor/operator/intersect/intersect.cpp @@ -30,20 +30,23 @@ void Intersect::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* c } } -std::vector Intersect::probeHTs(const std::vector& keys) { - std::vector tuples(keys.size()); - hash_t tmpHash; - for (auto i = 0u; i < keys.size(); i++) { - Hash::operation(keys[i], false, tmpHash); - tuples[i] = sharedHTs[i]->getHashTable()->getTupleForHash(tmpHash); - while (tuples[i]) { - if (*(nodeID_t*)tuples[i] == keys[i]) { - break; // The build side should guarantee each key only has one matching tuple. +void Intersect::probeHTs() { + std::vector> flatTuples(probeKeyVectors.size()); + hash_t hashVal; + for (auto i = 0u; i < probeKeyVectors.size(); i++) { + assert(probeKeyVectors[i]->state->isFlat()); + probedFlatTuples[i].clear(); + auto key = probeKeyVectors[i]->getValue( + probeKeyVectors[i]->state->selVector->selectedPositions[0]); + Hash::operation(key, false, hashVal); + auto flatTuple = sharedHTs[i]->getHashTable()->getTupleForHash(hashVal); + while (flatTuple) { + if (*(nodeID_t*)flatTuple == key) { + probedFlatTuples[i].push_back(flatTuple); } - tuples[i] = *sharedHTs[i]->getHashTable()->getPrevTuple(tuples[i]); + flatTuple = *sharedHTs[i]->getHashTable()->getPrevTuple(flatTuple); } } - return tuples; } void Intersect::twoWayIntersect(nodeID_t* leftNodeIDs, SelectionVector& lSelVector, @@ -71,23 +74,10 @@ void Intersect::twoWayIntersect(nodeID_t* leftNodeIDs, SelectionVector& lSelVect rSelVector.resetSelectorToValuePosBufferWithSize(outputValuePosition); } -std::vector Intersect::getProbeKeys() { - std::vector keys(probeKeyVectors.size()); - for (auto i = 0u; i < keys.size(); i++) { - assert(probeKeyVectors[i]->state->isFlat()); - keys[i] = probeKeyVectors[i]->getValue( - probeKeyVectors[i]->state->selVector->selectedPositions[0]); - } - return keys; -} - static std::vector fetchListsToIntersectFromTuples( const std::vector& tuples, const std::vector& isFlatValue) { std::vector listsToIntersect(tuples.size()); for (auto i = 0u; i < tuples.size(); i++) { - if (!tuples[i]) { - continue; // overflow_value will be initialized with size 0 for non-matching tuples. - } listsToIntersect[i] = isFlatValue[i] ? overflow_value_t{1 /* numElements */, tuples[i] + sizeof(nodeID_t)} : *(overflow_value_t*)(tuples[i] + sizeof(nodeID_t)); @@ -160,17 +150,59 @@ void Intersect::populatePayloads( } } +bool Intersect::hasNextTuplesToIntersect() { + tupleIdxPerBuildSide[carryBuildSideIdx]++; + if (tupleIdxPerBuildSide[carryBuildSideIdx] == probedFlatTuples[carryBuildSideIdx].size()) { + if (carryBuildSideIdx == 0) { + return false; + } + tupleIdxPerBuildSide[carryBuildSideIdx] = 0; + carryBuildSideIdx--; + if (!hasNextTuplesToIntersect()) { + return false; + } + carryBuildSideIdx++; + } + return true; +} + bool Intersect::getNextTuplesInternal() { do { - if (!children[0]->getNextTuple()) { - return false; + while (carryBuildSideIdx == -1u) { + if (!children[0]->getNextTuple()) { + return false; + } + // For each build side, probe its HT and return a vector of matched flat tuples. + probeHTs(); + auto maxNumTuplesToIntersect = 1u; + for (auto i = 0u; i < getNumBuilds(); i++) { + maxNumTuplesToIntersect *= probedFlatTuples[i].size(); + } + if (maxNumTuplesToIntersect == 0) { + // Skip if any build side has no matches. + continue; + } + carryBuildSideIdx = getNumBuilds() - 1; + std::fill(tupleIdxPerBuildSide.begin(), tupleIdxPerBuildSide.end(), 0); } - auto tuples = probeHTs(getProbeKeys()); - auto listsToIntersect = fetchListsToIntersectFromTuples(tuples, isIntersectListAFlatValue); + // Cartesian product of all flat tuples probed from all build sides. + // Notice: when there are large adjacency lists in the build side, which means the list is + // too large to fit in a single ValueVector, we end up chunking the list as multiple tuples + // in FTable. Thus, when performing the intersection, we need to perform cartesian product + // between all flat tuples probed from all build sides. + std::vector flatTuplesToIntersect(getNumBuilds()); + for (auto i = 0u; i < getNumBuilds(); i++) { + flatTuplesToIntersect[i] = probedFlatTuples[i][tupleIdxPerBuildSide[i]]; + } + auto listsToIntersect = + fetchListsToIntersectFromTuples(flatTuplesToIntersect, isIntersectListAFlatValue); auto listIdxes = swapSmallestListToFront(listsToIntersect); intersectLists(listsToIntersect); if (outKeyVector->state->selVector->selectedSize != 0) { - populatePayloads(tuples, listIdxes); + populatePayloads(flatTuplesToIntersect, listIdxes); + } + if (!hasNextTuplesToIntersect()) { + carryBuildSideIdx = -1u; } } while (outKeyVector->state->selVector->selectedSize == 0); return true; diff --git a/src/processor/operator/intersect/intersect_hash_table.cpp b/src/processor/operator/intersect/intersect_hash_table.cpp index 542644feb02..1d73c745523 100644 --- a/src/processor/operator/intersect/intersect_hash_table.cpp +++ b/src/processor/operator/intersect/intersect_hash_table.cpp @@ -5,7 +5,7 @@ using namespace kuzu::common; namespace kuzu { namespace processor { -static void sortSelectedPos(const std::shared_ptr& nodeIDVector) { +static void sortSelectedPos(ValueVector* nodeIDVector) { auto selVector = nodeIDVector->state->selVector.get(); auto size = selVector->selectedSize; auto selectedPos = selVector->getSelectedPositionsBuffer(); @@ -23,16 +23,19 @@ void IntersectHashTable::append(const std::vector>& // Based on the way we are planning, we assume that the first and second vectors are both // nodeIDs from extending, while the first one is key, and the second one is payload. auto keyState = vectorsToAppend[0]->state.get(); - auto payloadNodeIDVector = vectorsToAppend[1]; + auto payloadNodeIDVector = vectorsToAppend[1].get(); auto payloadsState = payloadNodeIDVector->state.get(); assert(keyState->isFlat()); if (!payloadsState->isFlat()) { + // Sorting is only needed when the payload is unflat (a list of values). sortSelectedPos(payloadNodeIDVector); } // A single appendInfo will return from `allocateFlatTupleBlocks` when numTuplesToAppend is 1. - auto appendInfo = factorizedTable->allocateFlatTupleBlocks(numTuplesToAppend)[0]; + auto appendInfos = factorizedTable->allocateFlatTupleBlocks(numTuplesToAppend); + assert(appendInfos.size() == 1); for (auto i = 0u; i < vectorsToAppend.size(); i++) { - factorizedTable->copyVectorToColumn(*vectorsToAppend[i], appendInfo, numTuplesToAppend, i); + factorizedTable->copyVectorToColumn( + *vectorsToAppend[i], appendInfos[0], numTuplesToAppend, i); } if (!payloadsState->isFlat()) { payloadsState->selVector->resetSelectorToUnselected();