Skip to content

Commit

Permalink
Refactor sel vector interface (#3177)
Browse files Browse the repository at this point in the history
  • Loading branch information
andyfengHKU committed Mar 31, 2024
1 parent fa528c1 commit 4e406a1
Show file tree
Hide file tree
Showing 31 changed files with 98 additions and 92 deletions.
2 changes: 1 addition & 1 deletion src/common/data_chunk/data_chunk_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ void DataChunkState::slice(offset_t offset) {
// NOTE: this operation has performance penalty. Ideally we should directly modify selVector
// instead of creating a new one.
auto slicedSelVector = std::make_unique<SelectionVector>(DEFAULT_VECTOR_CAPACITY);
slicedSelVector->resetSelectorToValuePosBufferWithSize(selVector->selectedSize - offset);
slicedSelVector->setToFiltered(selVector->selectedSize - offset);
for (auto i = 0u; i < slicedSelVector->selectedSize; i++) {
slicedSelVector->selectedPositions[i] = selVector->selectedPositions[i + offset];
}
Expand Down
5 changes: 3 additions & 2 deletions src/common/vector/value_vector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,12 @@ bool ValueVector::discardNull(ValueVector& vector) {
} else {
auto selectedPos = 0u;
if (vector.state->selVector->isUnfiltered()) {
vector.state->selVector->resetSelectorToValuePosBuffer();
auto buffer = vector.state->selVector->getMultableBuffer();
for (auto i = 0u; i < vector.state->selVector->selectedSize; i++) {
vector.state->selVector->selectedPositions[selectedPos] = i;
buffer[selectedPos] = i;
selectedPos += !vector.isNull(i);
}
vector.state->selVector->setToFiltered();
} else {
for (auto i = 0u; i < vector.state->selVector->selectedSize; i++) {
auto pos = vector.state->selVector->selectedPositions[i];
Expand Down
4 changes: 2 additions & 2 deletions src/expression_evaluator/case_evaluator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ void CaseAlternativeEvaluator::init(const ResultSet& resultSet, MemoryManager* m
whenEvaluator->init(resultSet, memoryManager);
thenEvaluator->init(resultSet, memoryManager);
whenSelVector = std::make_unique<SelectionVector>(DEFAULT_VECTOR_CAPACITY);
whenSelVector->resetSelectorToValuePosBuffer();
whenSelVector->setToFiltered();
}

void CaseExpressionEvaluator::init(const ResultSet& resultSet, MemoryManager* memoryManager) {
Expand Down Expand Up @@ -51,7 +51,7 @@ bool CaseExpressionEvaluator::select(SelectionVector& selVector, ClientContext*
evaluate(clientContext);
KU_ASSERT(resultVector->state->selVector->selectedSize == selVector.selectedSize);
auto numSelectedValues = 0u;
auto selectedPosBuffer = selVector.getSelectedPositionsBuffer();
auto selectedPosBuffer = selVector.getMultableBuffer();
for (auto i = 0u; i < selVector.selectedSize; ++i) {
auto selVectorPos = selVector.selectedPositions[i];
auto resultVectorPos = resultVector->state->selVector->selectedPositions[i];
Expand Down
2 changes: 1 addition & 1 deletion src/expression_evaluator/function_evaluator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ bool FunctionExpressionEvaluator::select(
auto numSelectedValues = 0u;
for (auto i = 0u; i < resultVector->state->selVector->selectedSize; ++i) {
auto pos = resultVector->state->selVector->selectedPositions[i];
auto selectedPosBuffer = selVector.getSelectedPositionsBuffer();
auto selectedPosBuffer = selVector.getMultableBuffer();
selectedPosBuffer[numSelectedValues] = pos;
numSelectedValues += resultVector->getValue<bool>(pos);
}
Expand Down
2 changes: 1 addition & 1 deletion src/expression_evaluator/reference_evaluator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ inline static bool isTrue(ValueVector& vector, uint64_t pos) {
bool ReferenceExpressionEvaluator::select(
SelectionVector& selVector, ClientContext* /*clientContext*/) {
uint64_t numSelectedValues = 0;
auto selectedBuffer = resultVector->state->selVector->getSelectedPositionsBuffer();
auto selectedBuffer = resultVector->state->selVector->getMultableBuffer();
if (resultVector->state->selVector->isUnfiltered()) {
for (auto i = 0u; i < resultVector->state->selVector->selectedSize; i++) {
selectedBuffer[numSelectedValues] = i;
Expand Down
2 changes: 1 addition & 1 deletion src/function/table/call/storage_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ static common::offset_t tableFunc(TableFuncInput& input, TableFuncOutput& output
outputVector->copyFromVectorData(i, localVector.get(), i);
}
}
dataChunk.state->selVector->resetSelectorToUnselectedWithSize(numValuesToOutput);
dataChunk.state->selVector->setToUnfiltered(numValuesToOutput);
localState->currChunkIdx++;
return numValuesToOutput;
}
Expand Down
2 changes: 1 addition & 1 deletion src/function/vector_hash_functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ static std::unique_ptr<ValueVector> computeDataVecHash(ValueVector* operand) {
auto numValuesInDataVec = ListVector::getDataVectorSize(operand);
ListVector::resizeDataVector(hashVector.get(), numValuesInDataVec);
auto selectionState = std::make_shared<DataChunkState>();
selectionState->selVector->resetSelectorToValuePosBuffer();
selectionState->selVector->setToFiltered();
ListVector::getDataVector(operand)->setState(selectionState);
auto numValuesComputed = 0u;
while (numValuesComputed < numValuesInDataVec) {
Expand Down
22 changes: 9 additions & 13 deletions src/include/common/data_chunk/sel_vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,23 @@ class SelectionVector {
public:
explicit SelectionVector(sel_t capacity) : selectedSize{0} {
selectedPositionsBuffer = std::make_unique<sel_t[]>(capacity);
resetSelectorToUnselected();
setToUnfiltered();
}

inline bool isUnfiltered() const {
return selectedPositions == (sel_t*)&INCREMENTAL_SELECTED_POS;
}
inline void resetSelectorToUnselected() {
selectedPositions = (sel_t*)&INCREMENTAL_SELECTED_POS;
}
inline void resetSelectorToUnselectedWithSize(sel_t size) {
bool isUnfiltered() const { return selectedPositions == (sel_t*)&INCREMENTAL_SELECTED_POS; }
void setToUnfiltered() { selectedPositions = (sel_t*)&INCREMENTAL_SELECTED_POS; }
void setToUnfiltered(sel_t size) {
selectedPositions = (sel_t*)&INCREMENTAL_SELECTED_POS;
selectedSize = size;
}
inline void resetSelectorToValuePosBuffer() {
selectedPositions = selectedPositionsBuffer.get();
}
inline void resetSelectorToValuePosBufferWithSize(sel_t size) {

// Set to filtered is not very accurate. It sets selectedPositions to a mutable array.
void setToFiltered() { selectedPositions = selectedPositionsBuffer.get(); }
void setToFiltered(sel_t size) {
selectedPositions = selectedPositionsBuffer.get();
selectedSize = size;
}
inline sel_t* getSelectedPositionsBuffer() { return selectedPositionsBuffer.get(); }
sel_t* getMultableBuffer() { return selectedPositionsBuffer.get(); }

KUZU_API static const sel_t INCREMENTAL_SELECTED_POS[DEFAULT_VECTOR_CAPACITY];

Expand Down
6 changes: 3 additions & 3 deletions src/include/function/binary_function_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ struct BinaryFunctionExecutor {
common::ValueVector& left, common::ValueVector& right, common::SelectionVector& selVector) {
auto lPos = left.state->selVector->selectedPositions[0];
uint64_t numSelectedValues = 0;
auto selectedPositionsBuffer = selVector.getSelectedPositionsBuffer();
auto selectedPositionsBuffer = selVector.getMultableBuffer();
if (left.isNull(lPos)) {
return numSelectedValues;
} else if (right.hasNoNullsGuarantee()) {
Expand Down Expand Up @@ -368,7 +368,7 @@ struct BinaryFunctionExecutor {
common::ValueVector& left, common::ValueVector& right, common::SelectionVector& selVector) {
auto rPos = right.state->selVector->selectedPositions[0];
uint64_t numSelectedValues = 0;
auto selectedPositionsBuffer = selVector.getSelectedPositionsBuffer();
auto selectedPositionsBuffer = selVector.getMultableBuffer();
if (right.isNull(rPos)) {
return numSelectedValues;
} else if (left.hasNoNullsGuarantee()) {
Expand Down Expand Up @@ -411,7 +411,7 @@ struct BinaryFunctionExecutor {
static bool selectBothUnFlat(
common::ValueVector& left, common::ValueVector& right, common::SelectionVector& selVector) {
uint64_t numSelectedValues = 0;
auto selectedPositionsBuffer = selVector.getSelectedPositionsBuffer();
auto selectedPositionsBuffer = selVector.getMultableBuffer();
if (left.hasNoNullsGuarantee() && right.hasNoNullsGuarantee()) {
if (left.state->selVector->isUnfiltered()) {
for (auto i = 0u; i < left.state->selVector->selectedSize; i++) {
Expand Down
8 changes: 4 additions & 4 deletions src/include/function/boolean/boolean_function_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ struct BinaryBooleanFunctionExecutor {
common::ValueVector& left, common::ValueVector& right, common::SelectionVector& selVector) {
auto lPos = left.state->selVector->selectedPositions[0];
uint64_t numSelectedValues = 0;
auto selectedPositionsBuffer = selVector.getSelectedPositionsBuffer();
auto selectedPositionsBuffer = selVector.getMultableBuffer();
if (right.state->selVector->isUnfiltered()) {
for (auto i = 0u; i < right.state->selVector->selectedSize; ++i) {
selectOnValue<FUNC>(
Expand All @@ -192,7 +192,7 @@ struct BinaryBooleanFunctionExecutor {
common::ValueVector& left, common::ValueVector& right, common::SelectionVector& selVector) {
auto rPos = right.state->selVector->selectedPositions[0];
uint64_t numSelectedValues = 0;
auto selectedPositionsBuffer = selVector.getSelectedPositionsBuffer();
auto selectedPositionsBuffer = selVector.getMultableBuffer();
if (left.state->selVector->isUnfiltered()) {
for (auto i = 0u; i < left.state->selVector->selectedSize; ++i) {
selectOnValue<FUNC>(
Expand All @@ -213,7 +213,7 @@ struct BinaryBooleanFunctionExecutor {
static bool selectBothUnFlat(
common::ValueVector& left, common::ValueVector& right, common::SelectionVector& selVector) {
uint64_t numSelectedValues = 0;
auto selectedPositionsBuffer = selVector.getSelectedPositionsBuffer();
auto selectedPositionsBuffer = selVector.getMultableBuffer();
if (left.state->selVector->isUnfiltered()) {
for (auto i = 0u; i < left.state->selVector->selectedSize; ++i) {
selectOnValue<FUNC>(
Expand Down Expand Up @@ -303,7 +303,7 @@ struct UnaryBooleanOperationExecutor {
return resultValue == true;
} else {
uint64_t numSelectedValues = 0;
auto selectedPositionBuffer = selVector.getSelectedPositionsBuffer();
auto selectedPositionBuffer = selVector.getMultableBuffer();
if (operand.state->selVector->isUnfiltered()) {
for (auto i = 0ul; i < operand.state->selVector->selectedSize; i++) {
selectOnValue<FUNC>(operand, i, numSelectedValues, selectedPositionBuffer);
Expand Down
4 changes: 2 additions & 2 deletions src/include/function/null/null_function_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ struct NullOperationExecutor {
return resultValue == true;
} else {
uint64_t numSelectedValues = 0;
auto selectedPositionsBuffer = selVector.getSelectedPositionsBuffer();
auto buffer = selVector.getMultableBuffer();
for (auto i = 0ul; i < operand.state->selVector->selectedSize; i++) {
auto pos = operand.state->selVector->selectedPositions[i];
selectOnValue<FUNC>(operand, pos, numSelectedValues, selectedPositionsBuffer);
selectOnValue<FUNC>(operand, pos, numSelectedValues, buffer);
}
selVector.selectedSize = numSelectedValues;
return numSelectedValues > 0;
Expand Down
2 changes: 1 addition & 1 deletion src/include/function/path/path_function_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ struct UnaryPathExecutor {
common::StructVector::getFieldVector(listDataVector, fieldIdx).get();
std::unordered_set<common::nodeID_t, InternalIDHasher> internalIDSet;
auto numSelectedValues = 0u;
auto buffer = selectionVector.getSelectedPositionsBuffer();
auto buffer = selectionVector.getMultableBuffer();
if (inputSelVector.isUnfiltered()) {
for (auto i = 0u; i < inputSelVector.selectedSize; ++i) {
auto& listEntry = listVector.getValue<common::list_entry_t>(i);
Expand Down
2 changes: 1 addition & 1 deletion src/include/processor/operator/hash_join/hash_join_probe.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ struct ProbeState {
probedTuples = std::make_unique<uint8_t*[]>(common::DEFAULT_VECTOR_CAPACITY);
matchedSelVector =
std::make_unique<common::SelectionVector>(common::DEFAULT_VECTOR_CAPACITY);
matchedSelVector->resetSelectorToValuePosBuffer();
matchedSelVector->setToFiltered();
}

// Each key corresponds to a pointer with the same hash value from the ht directory.
Expand Down
6 changes: 3 additions & 3 deletions src/processor/operator/filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ bool Filter::getNextTuplesInternal(ExecutionContext* context) {
*dataChunkToSelect->state->selVector, context->clientContext);
if (!dataChunkToSelect->state->isFlat() &&
dataChunkToSelect->state->selVector->isUnfiltered()) {
dataChunkToSelect->state->selVector->resetSelectorToValuePosBuffer();
dataChunkToSelect->state->selVector->setToFiltered();
}
} while (!hasAtLeastOneSelectedValue);
metrics->numOutputTuple.increase(dataChunkToSelect->state->selVector->selectedSize);
Expand All @@ -44,14 +44,14 @@ bool NodeLabelFiler::getNextTuplesInternal(ExecutionContext* context) {
}
saveSelVector(nodeIDVector->state->selVector);
numSelectValue = 0;
auto buffer = nodeIDVector->state->selVector->getSelectedPositionsBuffer();
auto buffer = nodeIDVector->state->selVector->getMultableBuffer();
for (auto i = 0u; i < nodeIDVector->state->selVector->selectedSize; ++i) {
auto pos = nodeIDVector->state->selVector->selectedPositions[i];
buffer[numSelectValue] = pos;
numSelectValue +=
info->nodeLabelSet.contains(nodeIDVector->getValue<nodeID_t>(pos).tableID);
}
nodeIDVector->state->selVector->resetSelectorToValuePosBuffer();
nodeIDVector->state->selVector->setToFiltered();
} while (numSelectValue == 0);
nodeIDVector->state->selVector->selectedSize = numSelectValue;
metrics->numOutputTuple.increase(nodeIDVector->state->selVector->selectedSize);
Expand Down
6 changes: 3 additions & 3 deletions src/processor/operator/filtering_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ void SelVectorOverWriter::saveSelVector(std::shared_ptr<SelectionVector>& selVec
void SelVectorOverWriter::resetToCurrentSelVector(std::shared_ptr<SelectionVector>& selVector) {
currentSelVector->selectedSize = selVector->selectedSize;
if (selVector->isUnfiltered()) {
currentSelVector->resetSelectorToUnselected();
currentSelVector->setToUnfiltered();
} else {
memcpy(currentSelVector->getSelectedPositionsBuffer(), selVector->selectedPositions,
memcpy(currentSelVector->getMultableBuffer(), selVector->selectedPositions,
selVector->selectedSize * sizeof(sel_t));
currentSelVector->resetSelectorToValuePosBuffer();
currentSelVector->setToFiltered();
}
selVector = currentSelVector;
}
Expand Down
2 changes: 1 addition & 1 deletion src/processor/operator/flatten.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace processor {

void Flatten::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* /*context*/) {
dataChunkState = resultSet->dataChunks[dataChunkToFlattenPos]->state.get();
currentSelVector->resetSelectorToValuePosBufferWithSize(1 /* size */);
currentSelVector->setToFiltered(1 /* size */);
localState = std::make_unique<FlattenLocalState>();
}

Expand Down
7 changes: 3 additions & 4 deletions src/processor/operator/hash_join/hash_join_probe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,11 @@ uint64_t HashJoinProbe::getInnerJoinResultForUnFlatKey() {
auto keySelVector = keyVectors[0]->state->selVector.get();
if (keySelVector->selectedSize != numTuplesToRead) {
// Some keys have no matched tuple. So we modify selected position.
auto keySelectedBuffer = keySelVector->getSelectedPositionsBuffer();
auto buffer = keySelVector->getMultableBuffer();
for (auto i = 0u; i < numTuplesToRead; i++) {
keySelectedBuffer[i] = probeState->matchedSelVector->selectedPositions[i];
buffer[i] = probeState->matchedSelVector->selectedPositions[i];
}
keySelVector->selectedSize = numTuplesToRead;
keySelVector->resetSelectorToValuePosBuffer();
keySelVector->setToFiltered(numTuplesToRead);
}
sharedState->getHashTable()->lookup(vectorsToReadInto, columnIdxsToReadFrom,
probeState->matchedTuples.get(), probeState->nextMatchedTupleIdx, numTuplesToRead);
Expand Down
14 changes: 8 additions & 6 deletions src/processor/operator/hash_join/join_hash_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,12 @@ void JoinHashTable::appendVector(
static void sortSelectedPos(ValueVector* nodeIDVector) {
auto selVector = nodeIDVector->state->selVector.get();
auto size = selVector->selectedSize;
auto selectedPos = selVector->getSelectedPositionsBuffer();
auto buffer = selVector->getMultableBuffer();
if (selVector->isUnfiltered()) {
memcpy(selectedPos, &SelectionVector::INCREMENTAL_SELECTED_POS, size * sizeof(sel_t));
selVector->resetSelectorToValuePosBuffer();
memcpy(buffer, &SelectionVector::INCREMENTAL_SELECTED_POS, size * sizeof(sel_t));
selVector->setToFiltered();
}
std::sort(selectedPos, selectedPos + size, [nodeIDVector](sel_t left, sel_t right) {
std::sort(buffer, buffer + size, [nodeIDVector](sel_t left, sel_t right) {
return nodeIDVector->getValue<nodeID_t>(left) < nodeIDVector->getValue<nodeID_t>(right);
});
}
Expand All @@ -80,7 +80,7 @@ void JoinHashTable::appendVectorWithSorting(
auto payloadNodeIDVector = payloadVectors[0];
auto payloadsState = payloadNodeIDVector->state.get();
if (!payloadsState->isFlat()) {
// Sorting is only needed when the payload is unflat (a list of values).
// Sorting is only needed when the payload is unFlat (a list of values).
sortSelectedPos(payloadNodeIDVector);
}
// A single appendInfo will return from `allocateFlatTupleBlocks` when numTuplesToAppend is 1.
Expand All @@ -95,7 +95,9 @@ void JoinHashTable::appendVectorWithSorting(
}
factorizedTable->copyVectorToColumn(*hashVector, appendInfos[0], numTuplesToAppend, colIdx);
if (!payloadsState->isFlat()) {
payloadsState->selVector->resetSelectorToUnselected();
// TODO(Xiyang): I can no longer recall why I set to un-filtered but this is probably wrong.
// We should set back to the un-sorted state.
payloadsState->selVector->setToUnfiltered();
}
factorizedTable->numTuples += numTuplesToAppend;
}
Expand Down
5 changes: 3 additions & 2 deletions src/processor/operator/index_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,21 @@ bool IndexScan::getNextTuplesInternal(ExecutionContext* context) {
}
saveSelVector(outVector->state->selVector);
numSelectedValues = 0u;
auto buffer = outVector->state->selVector->getMultableBuffer();
for (auto i = 0u; i < indexVector->state->selVector->selectedSize; ++i) {
auto pos = indexVector->state->selVector->selectedPositions[i];
if (indexVector->isNull(pos)) {
continue;
}
outVector->state->selVector->getSelectedPositionsBuffer()[numSelectedValues] = pos;
buffer[numSelectedValues] = pos;
offset_t nodeOffset = INVALID_OFFSET;
numSelectedValues +=
pkIndex->lookup(context->clientContext->getTx(), indexVector, pos, nodeOffset);
nodeID_t nodeID{nodeOffset, tableID};
outVector->setValue<nodeID_t>(pos, nodeID);
}
if (!outVector->state->isFlat() && outVector->state->selVector->isUnfiltered()) {
outVector->state->selVector->resetSelectorToValuePosBuffer();
outVector->state->selVector->setToFiltered();
}
} while (numSelectedValues == 0);
outVector->state->selVector->selectedSize = numSelectedValues;
Expand Down
Loading

0 comments on commit 4e406a1

Please sign in to comment.