diff --git a/src/common/data_chunk/data_chunk_state.cpp b/src/common/data_chunk/data_chunk_state.cpp index af5c739c7b..8faecbccbe 100644 --- a/src/common/data_chunk/data_chunk_state.cpp +++ b/src/common/data_chunk/data_chunk_state.cpp @@ -14,7 +14,7 @@ void DataChunkState::slice(offset_t offset) { // NOTE: this operation has performance penalty. Ideally we should directly modify selVector // instead of creating a new one. auto slicedSelVector = std::make_unique(DEFAULT_VECTOR_CAPACITY); - slicedSelVector->resetSelectorToValuePosBufferWithSize(selVector->selectedSize - offset); + slicedSelVector->setToFiltered(selVector->selectedSize - offset); for (auto i = 0u; i < slicedSelVector->selectedSize; i++) { slicedSelVector->selectedPositions[i] = selVector->selectedPositions[i + offset]; } diff --git a/src/common/vector/value_vector.cpp b/src/common/vector/value_vector.cpp index 6ec432c9c4..9b69223dc3 100644 --- a/src/common/vector/value_vector.cpp +++ b/src/common/vector/value_vector.cpp @@ -41,11 +41,12 @@ bool ValueVector::discardNull(ValueVector& vector) { } else { auto selectedPos = 0u; if (vector.state->selVector->isUnfiltered()) { - vector.state->selVector->resetSelectorToValuePosBuffer(); + auto buffer = vector.state->selVector->getMultableBuffer(); for (auto i = 0u; i < vector.state->selVector->selectedSize; i++) { - vector.state->selVector->selectedPositions[selectedPos] = i; + buffer[selectedPos] = i; selectedPos += !vector.isNull(i); } + vector.state->selVector->setToFiltered(); } else { for (auto i = 0u; i < vector.state->selVector->selectedSize; i++) { auto pos = vector.state->selVector->selectedPositions[i]; diff --git a/src/expression_evaluator/case_evaluator.cpp b/src/expression_evaluator/case_evaluator.cpp index 24c622d28a..11b731b6a4 100644 --- a/src/expression_evaluator/case_evaluator.cpp +++ b/src/expression_evaluator/case_evaluator.cpp @@ -12,7 +12,7 @@ void CaseAlternativeEvaluator::init(const ResultSet& resultSet, MemoryManager* m 
whenEvaluator->init(resultSet, memoryManager); thenEvaluator->init(resultSet, memoryManager); whenSelVector = std::make_unique(DEFAULT_VECTOR_CAPACITY); - whenSelVector->resetSelectorToValuePosBuffer(); + whenSelVector->setToFiltered(); } void CaseExpressionEvaluator::init(const ResultSet& resultSet, MemoryManager* memoryManager) { @@ -51,7 +51,7 @@ bool CaseExpressionEvaluator::select(SelectionVector& selVector, ClientContext* evaluate(clientContext); KU_ASSERT(resultVector->state->selVector->selectedSize == selVector.selectedSize); auto numSelectedValues = 0u; - auto selectedPosBuffer = selVector.getSelectedPositionsBuffer(); + auto selectedPosBuffer = selVector.getMultableBuffer(); for (auto i = 0u; i < selVector.selectedSize; ++i) { auto selVectorPos = selVector.selectedPositions[i]; auto resultVectorPos = resultVector->state->selVector->selectedPositions[i]; diff --git a/src/expression_evaluator/function_evaluator.cpp b/src/expression_evaluator/function_evaluator.cpp index eeb05d502b..42f6f7bb20 100644 --- a/src/expression_evaluator/function_evaluator.cpp +++ b/src/expression_evaluator/function_evaluator.cpp @@ -47,7 +47,7 @@ bool FunctionExpressionEvaluator::select( auto numSelectedValues = 0u; for (auto i = 0u; i < resultVector->state->selVector->selectedSize; ++i) { auto pos = resultVector->state->selVector->selectedPositions[i]; - auto selectedPosBuffer = selVector.getSelectedPositionsBuffer(); + auto selectedPosBuffer = selVector.getMultableBuffer(); selectedPosBuffer[numSelectedValues] = pos; numSelectedValues += resultVector->getValue(pos); } diff --git a/src/expression_evaluator/reference_evaluator.cpp b/src/expression_evaluator/reference_evaluator.cpp index 6b863a5a62..6066782285 100644 --- a/src/expression_evaluator/reference_evaluator.cpp +++ b/src/expression_evaluator/reference_evaluator.cpp @@ -14,7 +14,7 @@ inline static bool isTrue(ValueVector& vector, uint64_t pos) { bool ReferenceExpressionEvaluator::select( SelectionVector& selVector, 
ClientContext* /*clientContext*/) { uint64_t numSelectedValues = 0; - auto selectedBuffer = resultVector->state->selVector->getSelectedPositionsBuffer(); + auto selectedBuffer = resultVector->state->selVector->getMultableBuffer(); if (resultVector->state->selVector->isUnfiltered()) { for (auto i = 0u; i < resultVector->state->selVector->selectedSize; i++) { selectedBuffer[numSelectedValues] = i; diff --git a/src/function/table/call/storage_info.cpp b/src/function/table/call/storage_info.cpp index 792253c5eb..dba3109a32 100644 --- a/src/function/table/call/storage_info.cpp +++ b/src/function/table/call/storage_info.cpp @@ -175,7 +175,7 @@ static common::offset_t tableFunc(TableFuncInput& input, TableFuncOutput& output outputVector->copyFromVectorData(i, localVector.get(), i); } } - dataChunk.state->selVector->resetSelectorToUnselectedWithSize(numValuesToOutput); + dataChunk.state->selVector->setToUnfiltered(numValuesToOutput); localState->currChunkIdx++; return numValuesToOutput; } diff --git a/src/function/vector_hash_functions.cpp b/src/function/vector_hash_functions.cpp index ceff3de169..f619690184 100644 --- a/src/function/vector_hash_functions.cpp +++ b/src/function/vector_hash_functions.cpp @@ -12,7 +12,7 @@ static std::unique_ptr computeDataVecHash(ValueVector* operand) { auto numValuesInDataVec = ListVector::getDataVectorSize(operand); ListVector::resizeDataVector(hashVector.get(), numValuesInDataVec); auto selectionState = std::make_shared(); - selectionState->selVector->resetSelectorToValuePosBuffer(); + selectionState->selVector->setToFiltered(); ListVector::getDataVector(operand)->setState(selectionState); auto numValuesComputed = 0u; while (numValuesComputed < numValuesInDataVec) { diff --git a/src/include/common/data_chunk/sel_vector.h b/src/include/common/data_chunk/sel_vector.h index 9b386d4bd7..78517b20d1 100644 --- a/src/include/common/data_chunk/sel_vector.h +++ b/src/include/common/data_chunk/sel_vector.h @@ -11,27 +11,23 @@ class SelectionVector 
{ public: explicit SelectionVector(sel_t capacity) : selectedSize{0} { selectedPositionsBuffer = std::make_unique(capacity); - resetSelectorToUnselected(); + setToUnfiltered(); } - inline bool isUnfiltered() const { - return selectedPositions == (sel_t*)&INCREMENTAL_SELECTED_POS; - } - inline void resetSelectorToUnselected() { - selectedPositions = (sel_t*)&INCREMENTAL_SELECTED_POS; - } - inline void resetSelectorToUnselectedWithSize(sel_t size) { + bool isUnfiltered() const { return selectedPositions == (sel_t*)&INCREMENTAL_SELECTED_POS; } + void setToUnfiltered() { selectedPositions = (sel_t*)&INCREMENTAL_SELECTED_POS; } + void setToUnfiltered(sel_t size) { selectedPositions = (sel_t*)&INCREMENTAL_SELECTED_POS; selectedSize = size; } - inline void resetSelectorToValuePosBuffer() { - selectedPositions = selectedPositionsBuffer.get(); - } - inline void resetSelectorToValuePosBufferWithSize(sel_t size) { + + // Set to filtered is not very accurate. It sets selectedPositions to a mutable array. 
+ void setToFiltered() { selectedPositions = selectedPositionsBuffer.get(); } + void setToFiltered(sel_t size) { selectedPositions = selectedPositionsBuffer.get(); selectedSize = size; } - inline sel_t* getSelectedPositionsBuffer() { return selectedPositionsBuffer.get(); } + sel_t* getMultableBuffer() { return selectedPositionsBuffer.get(); } KUZU_API static const sel_t INCREMENTAL_SELECTED_POS[DEFAULT_VECTOR_CAPACITY]; diff --git a/src/include/function/binary_function_executor.h b/src/include/function/binary_function_executor.h index b3399910ee..3b04943d08 100644 --- a/src/include/function/binary_function_executor.h +++ b/src/include/function/binary_function_executor.h @@ -325,7 +325,7 @@ struct BinaryFunctionExecutor { common::ValueVector& left, common::ValueVector& right, common::SelectionVector& selVector) { auto lPos = left.state->selVector->selectedPositions[0]; uint64_t numSelectedValues = 0; - auto selectedPositionsBuffer = selVector.getSelectedPositionsBuffer(); + auto selectedPositionsBuffer = selVector.getMultableBuffer(); if (left.isNull(lPos)) { return numSelectedValues; } else if (right.hasNoNullsGuarantee()) { @@ -368,7 +368,7 @@ struct BinaryFunctionExecutor { common::ValueVector& left, common::ValueVector& right, common::SelectionVector& selVector) { auto rPos = right.state->selVector->selectedPositions[0]; uint64_t numSelectedValues = 0; - auto selectedPositionsBuffer = selVector.getSelectedPositionsBuffer(); + auto selectedPositionsBuffer = selVector.getMultableBuffer(); if (right.isNull(rPos)) { return numSelectedValues; } else if (left.hasNoNullsGuarantee()) { @@ -411,7 +411,7 @@ struct BinaryFunctionExecutor { static bool selectBothUnFlat( common::ValueVector& left, common::ValueVector& right, common::SelectionVector& selVector) { uint64_t numSelectedValues = 0; - auto selectedPositionsBuffer = selVector.getSelectedPositionsBuffer(); + auto selectedPositionsBuffer = selVector.getMultableBuffer(); if (left.hasNoNullsGuarantee() && 
right.hasNoNullsGuarantee()) { if (left.state->selVector->isUnfiltered()) { for (auto i = 0u; i < left.state->selVector->selectedSize; i++) { diff --git a/src/include/function/boolean/boolean_function_executor.h b/src/include/function/boolean/boolean_function_executor.h index 2b79d6f8ea..6b1a68cde7 100644 --- a/src/include/function/boolean/boolean_function_executor.h +++ b/src/include/function/boolean/boolean_function_executor.h @@ -170,7 +170,7 @@ struct BinaryBooleanFunctionExecutor { common::ValueVector& left, common::ValueVector& right, common::SelectionVector& selVector) { auto lPos = left.state->selVector->selectedPositions[0]; uint64_t numSelectedValues = 0; - auto selectedPositionsBuffer = selVector.getSelectedPositionsBuffer(); + auto selectedPositionsBuffer = selVector.getMultableBuffer(); if (right.state->selVector->isUnfiltered()) { for (auto i = 0u; i < right.state->selVector->selectedSize; ++i) { selectOnValue( @@ -192,7 +192,7 @@ struct BinaryBooleanFunctionExecutor { common::ValueVector& left, common::ValueVector& right, common::SelectionVector& selVector) { auto rPos = right.state->selVector->selectedPositions[0]; uint64_t numSelectedValues = 0; - auto selectedPositionsBuffer = selVector.getSelectedPositionsBuffer(); + auto selectedPositionsBuffer = selVector.getMultableBuffer(); if (left.state->selVector->isUnfiltered()) { for (auto i = 0u; i < left.state->selVector->selectedSize; ++i) { selectOnValue( @@ -213,7 +213,7 @@ struct BinaryBooleanFunctionExecutor { static bool selectBothUnFlat( common::ValueVector& left, common::ValueVector& right, common::SelectionVector& selVector) { uint64_t numSelectedValues = 0; - auto selectedPositionsBuffer = selVector.getSelectedPositionsBuffer(); + auto selectedPositionsBuffer = selVector.getMultableBuffer(); if (left.state->selVector->isUnfiltered()) { for (auto i = 0u; i < left.state->selVector->selectedSize; ++i) { selectOnValue( @@ -303,7 +303,7 @@ struct UnaryBooleanOperationExecutor { return resultValue 
== true; } else { uint64_t numSelectedValues = 0; - auto selectedPositionBuffer = selVector.getSelectedPositionsBuffer(); + auto selectedPositionBuffer = selVector.getMultableBuffer(); if (operand.state->selVector->isUnfiltered()) { for (auto i = 0ul; i < operand.state->selVector->selectedSize; i++) { selectOnValue(operand, i, numSelectedValues, selectedPositionBuffer); diff --git a/src/include/function/null/null_function_executor.h b/src/include/function/null/null_function_executor.h index 3a8b5ef974..008ed41c2f 100644 --- a/src/include/function/null/null_function_executor.h +++ b/src/include/function/null/null_function_executor.h @@ -40,10 +40,10 @@ struct NullOperationExecutor { return resultValue == true; } else { uint64_t numSelectedValues = 0; - auto selectedPositionsBuffer = selVector.getSelectedPositionsBuffer(); + auto buffer = selVector.getMultableBuffer(); for (auto i = 0ul; i < operand.state->selVector->selectedSize; i++) { auto pos = operand.state->selVector->selectedPositions[i]; - selectOnValue(operand, pos, numSelectedValues, selectedPositionsBuffer); + selectOnValue(operand, pos, numSelectedValues, buffer); } selVector.selectedSize = numSelectedValues; return numSelectedValues > 0; diff --git a/src/include/function/path/path_function_executor.h b/src/include/function/path/path_function_executor.h index 61af150bbd..435d6d39f1 100644 --- a/src/include/function/path/path_function_executor.h +++ b/src/include/function/path/path_function_executor.h @@ -99,7 +99,7 @@ struct UnaryPathExecutor { common::StructVector::getFieldVector(listDataVector, fieldIdx).get(); std::unordered_set internalIDSet; auto numSelectedValues = 0u; - auto buffer = selectionVector.getSelectedPositionsBuffer(); + auto buffer = selectionVector.getMultableBuffer(); if (inputSelVector.isUnfiltered()) { for (auto i = 0u; i < inputSelVector.selectedSize; ++i) { auto& listEntry = listVector.getValue(i); diff --git a/src/include/processor/operator/hash_join/hash_join_probe.h 
b/src/include/processor/operator/hash_join/hash_join_probe.h index f2d3b31145..d59c9b7bed 100644 --- a/src/include/processor/operator/hash_join/hash_join_probe.h +++ b/src/include/processor/operator/hash_join/hash_join_probe.h @@ -15,7 +15,7 @@ struct ProbeState { probedTuples = std::make_unique(common::DEFAULT_VECTOR_CAPACITY); matchedSelVector = std::make_unique(common::DEFAULT_VECTOR_CAPACITY); - matchedSelVector->resetSelectorToValuePosBuffer(); + matchedSelVector->setToFiltered(); } // Each key corresponds to a pointer with the same hash value from the ht directory. diff --git a/src/processor/operator/filter.cpp b/src/processor/operator/filter.cpp index 6e420dddfd..ef8ec8d233 100644 --- a/src/processor/operator/filter.cpp +++ b/src/processor/operator/filter.cpp @@ -23,7 +23,7 @@ bool Filter::getNextTuplesInternal(ExecutionContext* context) { *dataChunkToSelect->state->selVector, context->clientContext); if (!dataChunkToSelect->state->isFlat() && dataChunkToSelect->state->selVector->isUnfiltered()) { - dataChunkToSelect->state->selVector->resetSelectorToValuePosBuffer(); + dataChunkToSelect->state->selVector->setToFiltered(); } } while (!hasAtLeastOneSelectedValue); metrics->numOutputTuple.increase(dataChunkToSelect->state->selVector->selectedSize); @@ -44,14 +44,14 @@ bool NodeLabelFiler::getNextTuplesInternal(ExecutionContext* context) { } saveSelVector(nodeIDVector->state->selVector); numSelectValue = 0; - auto buffer = nodeIDVector->state->selVector->getSelectedPositionsBuffer(); + auto buffer = nodeIDVector->state->selVector->getMultableBuffer(); for (auto i = 0u; i < nodeIDVector->state->selVector->selectedSize; ++i) { auto pos = nodeIDVector->state->selVector->selectedPositions[i]; buffer[numSelectValue] = pos; numSelectValue += info->nodeLabelSet.contains(nodeIDVector->getValue(pos).tableID); } - nodeIDVector->state->selVector->resetSelectorToValuePosBuffer(); + nodeIDVector->state->selVector->setToFiltered(); } while (numSelectValue == 0); 
nodeIDVector->state->selVector->selectedSize = numSelectValue; metrics->numOutputTuple.increase(nodeIDVector->state->selVector->selectedSize); diff --git a/src/processor/operator/filtering_operator.cpp b/src/processor/operator/filtering_operator.cpp index 2d541bce55..0f72df0a9c 100644 --- a/src/processor/operator/filtering_operator.cpp +++ b/src/processor/operator/filtering_operator.cpp @@ -23,11 +23,11 @@ void SelVectorOverWriter::saveSelVector(std::shared_ptr& selVec void SelVectorOverWriter::resetToCurrentSelVector(std::shared_ptr& selVector) { currentSelVector->selectedSize = selVector->selectedSize; if (selVector->isUnfiltered()) { - currentSelVector->resetSelectorToUnselected(); + currentSelVector->setToUnfiltered(); } else { - memcpy(currentSelVector->getSelectedPositionsBuffer(), selVector->selectedPositions, + memcpy(currentSelVector->getMultableBuffer(), selVector->selectedPositions, selVector->selectedSize * sizeof(sel_t)); - currentSelVector->resetSelectorToValuePosBuffer(); + currentSelVector->setToFiltered(); } selVector = currentSelVector; } diff --git a/src/processor/operator/flatten.cpp b/src/processor/operator/flatten.cpp index cb1948dc75..c815e785ec 100644 --- a/src/processor/operator/flatten.cpp +++ b/src/processor/operator/flatten.cpp @@ -7,7 +7,7 @@ namespace processor { void Flatten::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* /*context*/) { dataChunkState = resultSet->dataChunks[dataChunkToFlattenPos]->state.get(); - currentSelVector->resetSelectorToValuePosBufferWithSize(1 /* size */); + currentSelVector->setToFiltered(1 /* size */); localState = std::make_unique(); } diff --git a/src/processor/operator/hash_join/hash_join_probe.cpp b/src/processor/operator/hash_join/hash_join_probe.cpp index 8305582885..7e8c4f5dcc 100644 --- a/src/processor/operator/hash_join/hash_join_probe.cpp +++ b/src/processor/operator/hash_join/hash_join_probe.cpp @@ -92,12 +92,11 @@ uint64_t HashJoinProbe::getInnerJoinResultForUnFlatKey() { auto 
keySelVector = keyVectors[0]->state->selVector.get(); if (keySelVector->selectedSize != numTuplesToRead) { // Some keys have no matched tuple. So we modify selected position. - auto keySelectedBuffer = keySelVector->getSelectedPositionsBuffer(); + auto buffer = keySelVector->getMultableBuffer(); for (auto i = 0u; i < numTuplesToRead; i++) { - keySelectedBuffer[i] = probeState->matchedSelVector->selectedPositions[i]; + buffer[i] = probeState->matchedSelVector->selectedPositions[i]; } - keySelVector->selectedSize = numTuplesToRead; - keySelVector->resetSelectorToValuePosBuffer(); + keySelVector->setToFiltered(numTuplesToRead); } sharedState->getHashTable()->lookup(vectorsToReadInto, columnIdxsToReadFrom, probeState->matchedTuples.get(), probeState->nextMatchedTupleIdx, numTuplesToRead); diff --git a/src/processor/operator/hash_join/join_hash_table.cpp b/src/processor/operator/hash_join/join_hash_table.cpp index e50e7a48aa..92cffcef76 100644 --- a/src/processor/operator/hash_join/join_hash_table.cpp +++ b/src/processor/operator/hash_join/join_hash_table.cpp @@ -61,12 +61,12 @@ void JoinHashTable::appendVector( static void sortSelectedPos(ValueVector* nodeIDVector) { auto selVector = nodeIDVector->state->selVector.get(); auto size = selVector->selectedSize; - auto selectedPos = selVector->getSelectedPositionsBuffer(); + auto buffer = selVector->getMultableBuffer(); if (selVector->isUnfiltered()) { - memcpy(selectedPos, &SelectionVector::INCREMENTAL_SELECTED_POS, size * sizeof(sel_t)); - selVector->resetSelectorToValuePosBuffer(); + memcpy(buffer, &SelectionVector::INCREMENTAL_SELECTED_POS, size * sizeof(sel_t)); + selVector->setToFiltered(); } - std::sort(selectedPos, selectedPos + size, [nodeIDVector](sel_t left, sel_t right) { + std::sort(buffer, buffer + size, [nodeIDVector](sel_t left, sel_t right) { return nodeIDVector->getValue(left) < nodeIDVector->getValue(right); }); } @@ -80,7 +80,7 @@ void JoinHashTable::appendVectorWithSorting( auto payloadNodeIDVector = 
payloadVectors[0]; auto payloadsState = payloadNodeIDVector->state.get(); if (!payloadsState->isFlat()) { - // Sorting is only needed when the payload is unflat (a list of values). + // Sorting is only needed when the payload is unFlat (a list of values). sortSelectedPos(payloadNodeIDVector); } // A single appendInfo will return from `allocateFlatTupleBlocks` when numTuplesToAppend is 1. @@ -95,7 +95,9 @@ void JoinHashTable::appendVectorWithSorting( } factorizedTable->copyVectorToColumn(*hashVector, appendInfos[0], numTuplesToAppend, colIdx); if (!payloadsState->isFlat()) { - payloadsState->selVector->resetSelectorToUnselected(); + // TODO(Xiyang): I can no longer recall why I set to un-filtered but this is probably wrong. + // We should set back to the un-sorted state. + payloadsState->selVector->setToUnfiltered(); } factorizedTable->numTuples += numTuplesToAppend; } diff --git a/src/processor/operator/index_scan.cpp b/src/processor/operator/index_scan.cpp index 28e04f66c2..c4b0a44f0a 100644 --- a/src/processor/operator/index_scan.cpp +++ b/src/processor/operator/index_scan.cpp @@ -20,12 +20,13 @@ bool IndexScan::getNextTuplesInternal(ExecutionContext* context) { } saveSelVector(outVector->state->selVector); numSelectedValues = 0u; + auto buffer = outVector->state->selVector->getMultableBuffer(); for (auto i = 0u; i < indexVector->state->selVector->selectedSize; ++i) { auto pos = indexVector->state->selVector->selectedPositions[i]; if (indexVector->isNull(pos)) { continue; } - outVector->state->selVector->getSelectedPositionsBuffer()[numSelectedValues] = pos; + buffer[numSelectedValues] = pos; offset_t nodeOffset = INVALID_OFFSET; numSelectedValues += pkIndex->lookup(context->clientContext->getTx(), indexVector, pos, nodeOffset); @@ -33,7 +34,7 @@ bool IndexScan::getNextTuplesInternal(ExecutionContext* context) { outVector->setValue(pos, nodeID); } if (!outVector->state->isFlat() && outVector->state->selVector->isUnfiltered()) { - 
outVector->state->selVector->resetSelectorToValuePosBuffer(); + outVector->state->selVector->setToFiltered(); } } while (numSelectedValues == 0); outVector->state->selVector->selectedSize = numSelectedValues; diff --git a/src/processor/operator/intersect/intersect.cpp b/src/processor/operator/intersect/intersect.cpp index 59f7b52ca0..9bae12ce50 100644 --- a/src/processor/operator/intersect/intersect.cpp +++ b/src/processor/operator/intersect/intersect.cpp @@ -55,6 +55,8 @@ void Intersect::probeHTs() { void Intersect::twoWayIntersect(nodeID_t* leftNodeIDs, SelectionVector& lSelVector, nodeID_t* rightNodeIDs, SelectionVector& rSelVector) { KU_ASSERT(lSelVector.selectedSize <= rSelVector.selectedSize); + auto leftPositionBuffer = lSelVector.getMultableBuffer(); + auto rightPositionBuffer = rSelVector.getMultableBuffer(); sel_t leftPosition = 0, rightPosition = 0; uint64_t outputValuePosition = 0; while (leftPosition < lSelVector.selectedSize && rightPosition < rSelVector.selectedSize) { @@ -65,16 +67,16 @@ void Intersect::twoWayIntersect(nodeID_t* leftNodeIDs, SelectionVector& lSelVect } else if (leftNodeID.offset > rightNodeID.offset) { rightPosition++; } else { - lSelVector.getSelectedPositionsBuffer()[outputValuePosition] = leftPosition; - rSelVector.getSelectedPositionsBuffer()[outputValuePosition] = rightPosition; + leftPositionBuffer[outputValuePosition] = leftPosition; + rightPositionBuffer[outputValuePosition] = rightPosition; leftNodeIDs[outputValuePosition] = leftNodeID; leftPosition++; rightPosition++; outputValuePosition++; } } - lSelVector.resetSelectorToValuePosBufferWithSize(outputValuePosition); - rSelVector.resetSelectorToValuePosBufferWithSize(outputValuePosition); + lSelVector.setToFiltered(outputValuePosition); + rSelVector.setToFiltered(outputValuePosition); } static std::vector fetchListsToIntersectFromTuples( @@ -110,9 +112,10 @@ static void sliceSelVectors( for (auto selVec : selVectorsToSlice) { for (auto i = 0u; i < slicer.selectedSize; i++) 
{ auto pos = slicer.selectedPositions[i]; - selVec->getSelectedPositionsBuffer()[i] = selVec->selectedPositions[pos]; + auto buffer = selVec->getMultableBuffer(); + buffer[i] = selVec->selectedPositions[pos]; } - selVec->resetSelectorToValuePosBufferWithSize(slicer.selectedSize); + selVec->setToFiltered(slicer.selectedSize); } } @@ -127,17 +130,16 @@ void Intersect::intersectLists(const std::vector& listsToInter SelectionVector lSelVector(listsToIntersect[0].numElements); lSelVector.selectedSize = listsToIntersect[0].numElements; std::vector selVectorsForIntersectedLists; - intersectSelVectors[0]->resetSelectorToUnselectedWithSize(listsToIntersect[0].numElements); + intersectSelVectors[0]->setToUnfiltered(listsToIntersect[0].numElements); selVectorsForIntersectedLists.push_back(intersectSelVectors[0].get()); for (auto i = 0u; i < listsToIntersect.size() - 1; i++) { - intersectSelVectors[i + 1]->resetSelectorToUnselectedWithSize( - listsToIntersect[i + 1].numElements); + intersectSelVectors[i + 1]->setToUnfiltered(listsToIntersect[i + 1].numElements); twoWayIntersect((nodeID_t*)outKeyVector->getData(), lSelVector, (nodeID_t*)listsToIntersect[i + 1].value, *intersectSelVectors[i + 1]); // Here we need to slice all selVectors that have been previously intersected, as all these // lists need to be selected synchronously to read payloads correctly. 
sliceSelVectors(selVectorsForIntersectedLists, lSelVector); - lSelVector.resetSelectorToUnselected(); + lSelVector.setToUnfiltered(); selVectorsForIntersectedLists.push_back(intersectSelVectors[i + 1].get()); } outKeyVector->state->selVector->selectedSize = lSelVector.selectedSize; diff --git a/src/processor/operator/order_by/top_k.cpp b/src/processor/operator/order_by/top_k.cpp index c323f30841..381e593746 100644 --- a/src/processor/operator/order_by/top_k.cpp +++ b/src/processor/operator/order_by/top_k.cpp @@ -211,9 +211,8 @@ bool TopKBuffer::compareBoundaryValue(const std::vector& k bool TopKBuffer::compareFlatKeys( vector_idx_t vectorIdxToCompare, std::vector keyVectors) { - std::shared_ptr selVector = - std::make_shared(common::DEFAULT_VECTOR_CAPACITY); - selVector->resetSelectorToValuePosBuffer(); + auto selVector = std::make_shared(common::DEFAULT_VECTOR_CAPACITY); + selVector->setToFiltered(); auto compareResult = compareFuncs[vectorIdxToCompare]( *keyVectors[vectorIdxToCompare], *boundaryVecs[vectorIdxToCompare], *selVector); if (vectorIdxToCompare == keyVectors.size() - 1) { @@ -230,13 +229,13 @@ void TopKBuffer::compareUnflatKeys( vector_idx_t vectorIdxToCompare, std::vector keyVectors) { auto compareSelVector = std::make_shared(common::DEFAULT_VECTOR_CAPACITY); - compareSelVector->resetSelectorToValuePosBuffer(); + compareSelVector->setToFiltered(); compareFuncs[vectorIdxToCompare]( *keyVectors[vectorIdxToCompare], *boundaryVecs[vectorIdxToCompare], *compareSelVector); if (vectorIdxToCompare != keyVectors.size() - 1) { auto equalsSelVector = std::make_shared(common::DEFAULT_VECTOR_CAPACITY); - equalsSelVector->resetSelectorToValuePosBuffer(); + equalsSelVector->setToFiltered(); if (equalsFuncs[vectorIdxToCompare](*keyVectors[vectorIdxToCompare], *boundaryVecs[vectorIdxToCompare], *equalsSelVector)) { keyVectors[vectorIdxToCompare]->state->selVector = equalsSelVector; diff --git a/src/processor/operator/partitioner.cpp 
b/src/processor/operator/partitioner.cpp index 35838ec8af..6a7605bc1a 100644 --- a/src/processor/operator/partitioner.cpp +++ b/src/processor/operator/partitioner.cpp @@ -155,7 +155,7 @@ void Partitioner::copyDataToPartitions(partition_idx_t partitioningIdx, DataChun vectorsToAppend.push_back(chunkToCopyFrom.getValueVector(j).get()); } SelectionVector selVector(1); - selVector.resetSelectorToValuePosBufferWithSize(1); + selVector.setToFiltered(1); for (auto i = 0u; i < chunkToCopyFrom.state->selVector->selectedSize; i++) { auto posToCopyFrom = chunkToCopyFrom.state->selVector->selectedPositions[i]; auto partitionIdx = partitionIdxes->getValue(posToCopyFrom); diff --git a/src/processor/operator/scan_node_id.cpp b/src/processor/operator/scan_node_id.cpp index 8185109425..60f7d2f701 100644 --- a/src/processor/operator/scan_node_id.cpp +++ b/src/processor/operator/scan_node_id.cpp @@ -68,7 +68,7 @@ bool ScanNodeID::getNextTuplesInternal(ExecutionContext* context) { if (state == nullptr) { return false; } - outValueVector->state->selVector->resetSelectorToUnselected(); + outValueVector->state->selVector->setToUnfiltered(); auto nodeIDValues = (nodeID_t*)(outValueVector->getData()); auto size = endOffset - startOffset; for (auto i = 0u; i < size; ++i) { @@ -88,19 +88,19 @@ void ScanNodeID::setSelVector(ExecutionContext* context, NodeTableScanState* tab tableState->getTable()->setSelVectorForDeletedOffsets( context->clientContext->getTx(), outValueVector.get()); if (tableState->isSemiMaskEnabled()) { - auto selectedBuffer = outValueVector->state->selVector->getSelectedPositionsBuffer(); + auto buffer = outValueVector->state->selVector->getMultableBuffer(); sel_t prevSelectedSize = outValueVector->state->selVector->selectedSize; // Fill selected positions based on node mask for nodes between the given startOffset and // endOffset. If the node is masked (i.e., valid for read), then it is set to the selected // positions. Finally, we update the selectedSize for selVector. 
sel_t numSelectedValues = 0; for (auto i = 0u; i < (endOffset - startOffset); i++) { - selectedBuffer[numSelectedValues] = i; + buffer[numSelectedValues] = i; numSelectedValues += tableState->getSemiMask()->isNodeMasked(i + startOffset); } outValueVector->state->selVector->selectedSize = numSelectedValues; if (prevSelectedSize != numSelectedValues) { - outValueVector->state->selVector->resetSelectorToValuePosBuffer(); + outValueVector->state->selVector->setToFiltered(); } } } diff --git a/src/processor/operator/skip.cpp b/src/processor/operator/skip.cpp index a389c541df..6d381ebb07 100644 --- a/src/processor/operator/skip.cpp +++ b/src/processor/operator/skip.cpp @@ -28,17 +28,17 @@ bool Skip::getNextTuplesInternal(ExecutionContext* context) { // If all dataChunks are flat, numTupleAvailable = 1 which means numTupleSkippedBefore = // skipNumber. So execution is handled in above if statement. KU_ASSERT(!dataChunkToSelect->state->isFlat()); - auto selectedPosBuffer = dataChunkToSelect->state->selVector->getSelectedPositionsBuffer(); + auto buffer = dataChunkToSelect->state->selVector->getMultableBuffer(); if (dataChunkToSelect->state->selVector->isUnfiltered()) { for (uint64_t i = numTupleToSkipInCurrentResultSet; i < dataChunkToSelect->state->selVector->selectedSize; ++i) { - selectedPosBuffer[i - numTupleToSkipInCurrentResultSet] = i; + buffer[i - numTupleToSkipInCurrentResultSet] = i; } - dataChunkToSelect->state->selVector->resetSelectorToValuePosBuffer(); + dataChunkToSelect->state->selVector->setToFiltered(); } else { for (uint64_t i = numTupleToSkipInCurrentResultSet; i < dataChunkToSelect->state->selVector->selectedSize; ++i) { - selectedPosBuffer[i - numTupleToSkipInCurrentResultSet] = selectedPosBuffer[i]; + buffer[i - numTupleToSkipInCurrentResultSet] = buffer[i]; } } dataChunkToSelect->state->selVector->selectedSize = diff --git a/src/storage/local_storage/local_rel_table.cpp b/src/storage/local_storage/local_rel_table.cpp index 08e4f1248b..2dac3a77a9 
100644 --- a/src/storage/local_storage/local_rel_table.cpp +++ b/src/storage/local_storage/local_rel_table.cpp @@ -31,7 +31,7 @@ row_idx_t LocalRelNG::scanCSR(offset_t srcOffsetInChunk, offset_t posToReadForOf } } auto numRelsRead = rowIdxesToRead.size(); - outputVectors[0]->state->selVector->resetSelectorToUnselectedWithSize(numRelsRead); + outputVectors[0]->state->selVector->setToUnfiltered(numRelsRead); return numRelsRead; } @@ -65,7 +65,7 @@ void LocalRelNG::applyCSRDeletions(offset_t srcOffset, ValueVector* relIDVector) } auto selectPos = 0u; auto selVector = std::make_unique(DEFAULT_VECTOR_CAPACITY); - selVector->resetSelectorToValuePosBuffer(); + selVector->setToFiltered(); for (auto i = 0u; i < relIDVector->state->selVector->selectedSize; i++) { auto relIDPos = relIDVector->state->selVector->selectedPositions[i]; auto relOffset = relIDVector->getValue(relIDPos).offset; @@ -75,7 +75,7 @@ void LocalRelNG::applyCSRDeletions(offset_t srcOffset, ValueVector* relIDVector) selVector->selectedPositions[selectPos++] = relIDPos; } if (selectPos != relIDVector->state->selVector->selectedSize) { - relIDVector->state->selVector->resetSelectorToValuePosBuffer(); + relIDVector->state->selVector->setToFiltered(); memcpy(relIDVector->state->selVector->selectedPositions, selVector->selectedPositions, selectPos * sizeof(sel_t)); relIDVector->state->selVector->selectedSize = selectPos; diff --git a/src/storage/stats/node_table_statistics.cpp b/src/storage/stats/node_table_statistics.cpp index e890fb92d4..4721df8c09 100644 --- a/src/storage/stats/node_table_statistics.cpp +++ b/src/storage/stats/node_table_statistics.cpp @@ -113,7 +113,7 @@ void NodeTableStatsAndDeletedIDs::setDeletedNodeOffsetsForMorsel(ValueVector* no auto originalSize = nodeIDVector->state->getOriginalSize(); auto itr = deletedNodeOffsets.begin(); common::sel_t numSelectedValue = 0; - auto selectedBuffer = nodeIDVector->state->selVector->getSelectedPositionsBuffer(); + auto selectedBuffer = 
nodeIDVector->state->selVector->getMultableBuffer(); KU_ASSERT(nodeIDVector->state->selVector->isUnfiltered()); for (auto pos = 0u; pos < nodeIDVector->state->getOriginalSize(); ++pos) { if (itr == deletedNodeOffsets.end()) { // no more deleted offset to check. @@ -127,7 +127,7 @@ void NodeTableStatsAndDeletedIDs::setDeletedNodeOffsetsForMorsel(ValueVector* no selectedBuffer[numSelectedValue++] = pos; } if (numSelectedValue != originalSize) { - nodeIDVector->state->selVector->resetSelectorToValuePosBuffer(); + nodeIDVector->state->selVector->setToFiltered(); } nodeIDVector->state->selVector->selectedSize = numSelectedValue; } diff --git a/src/storage/store/chunked_node_group_collection.cpp b/src/storage/store/chunked_node_group_collection.cpp index d118217f42..68ab9677f0 100644 --- a/src/storage/store/chunked_node_group_collection.cpp +++ b/src/storage/store/chunked_node_group_collection.cpp @@ -18,10 +18,11 @@ void ChunkedNodeGroupCollection::append( auto& lastChunkedGroup = chunkedGroups.back(); auto numRowsToAppendInGroup = std::min(numRowsToAppend - numRowsAppended, static_cast(CHUNK_CAPACITY - lastChunkedGroup->getNumRows())); - tmpSelVector.resetSelectorToValuePosBufferWithSize(numRowsToAppendInGroup); + auto tmpSelVectorBuffer = tmpSelVector.getMultableBuffer(); for (auto i = 0u; i < numRowsToAppendInGroup; i++) { - tmpSelVector.selectedPositions[i] = selVector.selectedPositions[numRowsAppended + i]; + tmpSelVectorBuffer[i] = selVector.selectedPositions[numRowsAppended + i]; } + tmpSelVector.setToFiltered(numRowsToAppendInGroup); lastChunkedGroup->append(vectors, tmpSelVector, numRowsToAppendInGroup); if (lastChunkedGroup->getNumRows() == CHUNK_CAPACITY) { chunkedGroups.push_back(std::make_unique( diff --git a/src/storage/store/column_chunk.cpp b/src/storage/store/column_chunk.cpp index a3785c1869..f13d6c7808 100644 --- a/src/storage/store/column_chunk.cpp +++ b/src/storage/store/column_chunk.cpp @@ -318,13 +318,16 @@ void ColumnChunk::resize(uint64_t 
newCapacity) { } void ColumnChunk::populateWithDefaultVal(ValueVector* defaultValueVector) { + // TODO(Guodong): don't set vector state to a new one. Default vector is shared across all + // operators on the pipeline so setting its state will affect others. + // You can only set state for vectors that are local to this class. defaultValueVector->setState(std::make_shared()); auto valPos = defaultValueVector->state->selVector->selectedPositions[0]; - defaultValueVector->state->selVector->resetSelectorToValuePosBufferWithSize( - DEFAULT_VECTOR_CAPACITY); + auto positionBuffer = defaultValueVector->state->selVector->getMultableBuffer(); for (auto i = 0u; i < defaultValueVector->state->selVector->selectedSize; i++) { - defaultValueVector->state->selVector->selectedPositions[i] = valPos; + positionBuffer[i] = valPos; } + defaultValueVector->state->selVector->setToFiltered(DEFAULT_VECTOR_CAPACITY); auto numValuesAppended = 0u; auto numValuesToPopulate = capacity; while (numValuesAppended < numValuesToPopulate) { diff --git a/src/storage/store/rel_table.cpp b/src/storage/store/rel_table.cpp index 655de52aaa..1f2b0edefb 100644 --- a/src/storage/store/rel_table.cpp +++ b/src/storage/store/rel_table.cpp @@ -103,7 +103,7 @@ row_idx_t RelTable::detachDeleteForCSRRels(Transaction* transaction, RelTableDat while (relDataReadState->hasMoreToRead(transaction)) { scan(transaction, *relDataReadState); auto numRelsScanned = tempState->selVector->selectedSize; - tempState->selVector->resetSelectorToValuePosBufferWithSize(1); + tempState->selVector->setToFiltered(1); for (auto i = 0u; i < numRelsScanned; i++) { tempState->selVector->selectedPositions[0] = i; auto deleted = @@ -113,7 +113,7 @@ row_idx_t RelTable::detachDeleteForCSRRels(Transaction* transaction, RelTableDat KU_ASSERT(deleted == reverseDeleted); numRelsDeleted += (deleted && reverseDeleted); } - tempState->selVector->resetSelectorToUnselectedWithSize(DEFAULT_VECTOR_CAPACITY); + tempState->selVector->setToUnfiltered(); } 
return numRelsDeleted; } diff --git a/src/storage/store/rel_table_data.cpp b/src/storage/store/rel_table_data.cpp index ed2d11cf7e..72f428390d 100644 --- a/src/storage/store/rel_table_data.cpp +++ b/src/storage/store/rel_table_data.cpp @@ -217,7 +217,7 @@ void RelTableData::scan(Transaction* transaction, TableDataReadState& readState, } auto [startOffset, endOffset] = relReadState.getStartAndEndOffset(); auto numRowsToRead = endOffset - startOffset; - outputVectors[0]->state->selVector->resetSelectorToUnselectedWithSize(numRowsToRead); + outputVectors[0]->state->selVector->setToUnfiltered(numRowsToRead); outputVectors[0]->state->setOriginalSize(numRowsToRead); auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(relReadState.currentNodeOffset); auto relIDVectorIdx = INVALID_VECTOR_IDX; diff --git a/src/storage/store/var_list_column_chunk.cpp b/src/storage/store/var_list_column_chunk.cpp index b2cb6df8d5..1cb9f09649 100644 --- a/src/storage/store/var_list_column_chunk.cpp +++ b/src/storage/store/var_list_column_chunk.cpp @@ -126,8 +126,9 @@ void VarListColumnChunk::append(ValueVector* vector, const SelectionVector& selV } varListDataColumnChunk->resizeBuffer(nextListOffsetInChunk); auto dataVector = ListVector::getDataVector(vector); + // TODO(Guodong): we should not set vector to a new state. dataVector->setState(std::make_unique()); - dataVector->state->selVector->resetSelectorToValuePosBuffer(); + dataVector->state->selVector->setToFiltered(); for (auto i = 0u; i < selVector.selectedSize; i++) { auto pos = selVector.selectedPositions[i]; if (vector->isNull(pos)) { @@ -206,14 +207,15 @@ void VarListColumnChunk::write( ValueVector* vector, offset_t offsetInVector, offset_t offsetInChunk) { checkOffsetSortedAsc = true; auto selVector = std::make_unique(1); - selVector->resetSelectorToValuePosBuffer(); + selVector->setToFiltered(); selVector->selectedPositions[0] = offsetInVector; auto appendSize = vector->isNull(offsetInVector) ? 
0 : vector->getValue(offsetInVector).size; varListDataColumnChunk->resizeBuffer(varListDataColumnChunk->getNumValues() + appendSize); + // TODO(Guodong): Do not set vector to a new state. auto dataVector = ListVector::getDataVector(vector); dataVector->setState(std::make_unique()); - dataVector->state->selVector->resetSelectorToValuePosBuffer(); + dataVector->state->selVector->setToFiltered(); copyListValues(vector->getValue(offsetInVector), dataVector); while (offsetInChunk >= numValues) { appendNullList();