Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor sel vector interface #3177

Merged
merged 1 commit into from
Mar 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/common/data_chunk/data_chunk_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ void DataChunkState::slice(offset_t offset) {
// NOTE: this operation has performance penalty. Ideally we should directly modify selVector
// instead of creating a new one.
auto slicedSelVector = std::make_unique<SelectionVector>(DEFAULT_VECTOR_CAPACITY);
slicedSelVector->resetSelectorToValuePosBufferWithSize(selVector->selectedSize - offset);
slicedSelVector->setToFiltered(selVector->selectedSize - offset);
for (auto i = 0u; i < slicedSelVector->selectedSize; i++) {
slicedSelVector->selectedPositions[i] = selVector->selectedPositions[i + offset];
}
Expand Down
5 changes: 3 additions & 2 deletions src/common/vector/value_vector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,12 @@ bool ValueVector::discardNull(ValueVector& vector) {
} else {
auto selectedPos = 0u;
if (vector.state->selVector->isUnfiltered()) {
vector.state->selVector->resetSelectorToValuePosBuffer();
auto buffer = vector.state->selVector->getMultableBuffer();
for (auto i = 0u; i < vector.state->selVector->selectedSize; i++) {
vector.state->selVector->selectedPositions[selectedPos] = i;
buffer[selectedPos] = i;
selectedPos += !vector.isNull(i);
}
vector.state->selVector->setToFiltered();
} else {
for (auto i = 0u; i < vector.state->selVector->selectedSize; i++) {
auto pos = vector.state->selVector->selectedPositions[i];
Expand Down
4 changes: 2 additions & 2 deletions src/expression_evaluator/case_evaluator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ void CaseAlternativeEvaluator::init(const ResultSet& resultSet, MemoryManager* m
whenEvaluator->init(resultSet, memoryManager);
thenEvaluator->init(resultSet, memoryManager);
whenSelVector = std::make_unique<SelectionVector>(DEFAULT_VECTOR_CAPACITY);
whenSelVector->resetSelectorToValuePosBuffer();
whenSelVector->setToFiltered();
}

void CaseExpressionEvaluator::init(const ResultSet& resultSet, MemoryManager* memoryManager) {
Expand Down Expand Up @@ -51,7 +51,7 @@ bool CaseExpressionEvaluator::select(SelectionVector& selVector, ClientContext*
evaluate(clientContext);
KU_ASSERT(resultVector->state->selVector->selectedSize == selVector.selectedSize);
auto numSelectedValues = 0u;
auto selectedPosBuffer = selVector.getSelectedPositionsBuffer();
auto selectedPosBuffer = selVector.getMultableBuffer();
for (auto i = 0u; i < selVector.selectedSize; ++i) {
auto selVectorPos = selVector.selectedPositions[i];
auto resultVectorPos = resultVector->state->selVector->selectedPositions[i];
Expand Down
2 changes: 1 addition & 1 deletion src/expression_evaluator/function_evaluator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ bool FunctionExpressionEvaluator::select(
auto numSelectedValues = 0u;
for (auto i = 0u; i < resultVector->state->selVector->selectedSize; ++i) {
auto pos = resultVector->state->selVector->selectedPositions[i];
auto selectedPosBuffer = selVector.getSelectedPositionsBuffer();
auto selectedPosBuffer = selVector.getMultableBuffer();
selectedPosBuffer[numSelectedValues] = pos;
numSelectedValues += resultVector->getValue<bool>(pos);
}
Expand Down
2 changes: 1 addition & 1 deletion src/expression_evaluator/reference_evaluator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ inline static bool isTrue(ValueVector& vector, uint64_t pos) {
bool ReferenceExpressionEvaluator::select(
SelectionVector& selVector, ClientContext* /*clientContext*/) {
uint64_t numSelectedValues = 0;
auto selectedBuffer = resultVector->state->selVector->getSelectedPositionsBuffer();
auto selectedBuffer = resultVector->state->selVector->getMultableBuffer();
if (resultVector->state->selVector->isUnfiltered()) {
for (auto i = 0u; i < resultVector->state->selVector->selectedSize; i++) {
selectedBuffer[numSelectedValues] = i;
Expand Down
2 changes: 1 addition & 1 deletion src/function/table/call/storage_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ static common::offset_t tableFunc(TableFuncInput& input, TableFuncOutput& output
outputVector->copyFromVectorData(i, localVector.get(), i);
}
}
dataChunk.state->selVector->resetSelectorToUnselectedWithSize(numValuesToOutput);
dataChunk.state->selVector->setToUnfiltered(numValuesToOutput);
localState->currChunkIdx++;
return numValuesToOutput;
}
Expand Down
2 changes: 1 addition & 1 deletion src/function/vector_hash_functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ static std::unique_ptr<ValueVector> computeDataVecHash(ValueVector* operand) {
auto numValuesInDataVec = ListVector::getDataVectorSize(operand);
ListVector::resizeDataVector(hashVector.get(), numValuesInDataVec);
auto selectionState = std::make_shared<DataChunkState>();
selectionState->selVector->resetSelectorToValuePosBuffer();
selectionState->selVector->setToFiltered();
ListVector::getDataVector(operand)->setState(selectionState);
auto numValuesComputed = 0u;
while (numValuesComputed < numValuesInDataVec) {
Expand Down
22 changes: 9 additions & 13 deletions src/include/common/data_chunk/sel_vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,23 @@ class SelectionVector {
public:
explicit SelectionVector(sel_t capacity) : selectedSize{0} {
selectedPositionsBuffer = std::make_unique<sel_t[]>(capacity);
resetSelectorToUnselected();
setToUnfiltered();
}

inline bool isUnfiltered() const {
return selectedPositions == (sel_t*)&INCREMENTAL_SELECTED_POS;
}
inline void resetSelectorToUnselected() {
selectedPositions = (sel_t*)&INCREMENTAL_SELECTED_POS;
}
inline void resetSelectorToUnselectedWithSize(sel_t size) {
bool isUnfiltered() const { return selectedPositions == (sel_t*)&INCREMENTAL_SELECTED_POS; }
void setToUnfiltered() { selectedPositions = (sel_t*)&INCREMENTAL_SELECTED_POS; }
void setToUnfiltered(sel_t size) {
selectedPositions = (sel_t*)&INCREMENTAL_SELECTED_POS;
selectedSize = size;
}
inline void resetSelectorToValuePosBuffer() {
selectedPositions = selectedPositionsBuffer.get();
}
inline void resetSelectorToValuePosBufferWithSize(sel_t size) {

// Set to filtered is not very accurate. It sets selectedPositions to a mutable array.
void setToFiltered() { selectedPositions = selectedPositionsBuffer.get(); }
void setToFiltered(sel_t size) {
selectedPositions = selectedPositionsBuffer.get();
selectedSize = size;
}
inline sel_t* getSelectedPositionsBuffer() { return selectedPositionsBuffer.get(); }
sel_t* getMultableBuffer() { return selectedPositionsBuffer.get(); }

KUZU_API static const sel_t INCREMENTAL_SELECTED_POS[DEFAULT_VECTOR_CAPACITY];

Expand Down
6 changes: 3 additions & 3 deletions src/include/function/binary_function_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ struct BinaryFunctionExecutor {
common::ValueVector& left, common::ValueVector& right, common::SelectionVector& selVector) {
auto lPos = left.state->selVector->selectedPositions[0];
uint64_t numSelectedValues = 0;
auto selectedPositionsBuffer = selVector.getSelectedPositionsBuffer();
auto selectedPositionsBuffer = selVector.getMultableBuffer();
if (left.isNull(lPos)) {
return numSelectedValues;
} else if (right.hasNoNullsGuarantee()) {
Expand Down Expand Up @@ -368,7 +368,7 @@ struct BinaryFunctionExecutor {
common::ValueVector& left, common::ValueVector& right, common::SelectionVector& selVector) {
auto rPos = right.state->selVector->selectedPositions[0];
uint64_t numSelectedValues = 0;
auto selectedPositionsBuffer = selVector.getSelectedPositionsBuffer();
auto selectedPositionsBuffer = selVector.getMultableBuffer();
if (right.isNull(rPos)) {
return numSelectedValues;
} else if (left.hasNoNullsGuarantee()) {
Expand Down Expand Up @@ -411,7 +411,7 @@ struct BinaryFunctionExecutor {
static bool selectBothUnFlat(
common::ValueVector& left, common::ValueVector& right, common::SelectionVector& selVector) {
uint64_t numSelectedValues = 0;
auto selectedPositionsBuffer = selVector.getSelectedPositionsBuffer();
auto selectedPositionsBuffer = selVector.getMultableBuffer();
if (left.hasNoNullsGuarantee() && right.hasNoNullsGuarantee()) {
if (left.state->selVector->isUnfiltered()) {
for (auto i = 0u; i < left.state->selVector->selectedSize; i++) {
Expand Down
8 changes: 4 additions & 4 deletions src/include/function/boolean/boolean_function_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ struct BinaryBooleanFunctionExecutor {
common::ValueVector& left, common::ValueVector& right, common::SelectionVector& selVector) {
auto lPos = left.state->selVector->selectedPositions[0];
uint64_t numSelectedValues = 0;
auto selectedPositionsBuffer = selVector.getSelectedPositionsBuffer();
auto selectedPositionsBuffer = selVector.getMultableBuffer();
if (right.state->selVector->isUnfiltered()) {
for (auto i = 0u; i < right.state->selVector->selectedSize; ++i) {
selectOnValue<FUNC>(
Expand All @@ -192,7 +192,7 @@ struct BinaryBooleanFunctionExecutor {
common::ValueVector& left, common::ValueVector& right, common::SelectionVector& selVector) {
auto rPos = right.state->selVector->selectedPositions[0];
uint64_t numSelectedValues = 0;
auto selectedPositionsBuffer = selVector.getSelectedPositionsBuffer();
auto selectedPositionsBuffer = selVector.getMultableBuffer();
if (left.state->selVector->isUnfiltered()) {
for (auto i = 0u; i < left.state->selVector->selectedSize; ++i) {
selectOnValue<FUNC>(
Expand All @@ -213,7 +213,7 @@ struct BinaryBooleanFunctionExecutor {
static bool selectBothUnFlat(
common::ValueVector& left, common::ValueVector& right, common::SelectionVector& selVector) {
uint64_t numSelectedValues = 0;
auto selectedPositionsBuffer = selVector.getSelectedPositionsBuffer();
auto selectedPositionsBuffer = selVector.getMultableBuffer();
if (left.state->selVector->isUnfiltered()) {
for (auto i = 0u; i < left.state->selVector->selectedSize; ++i) {
selectOnValue<FUNC>(
Expand Down Expand Up @@ -303,7 +303,7 @@ struct UnaryBooleanOperationExecutor {
return resultValue == true;
} else {
uint64_t numSelectedValues = 0;
auto selectedPositionBuffer = selVector.getSelectedPositionsBuffer();
auto selectedPositionBuffer = selVector.getMultableBuffer();
if (operand.state->selVector->isUnfiltered()) {
for (auto i = 0ul; i < operand.state->selVector->selectedSize; i++) {
selectOnValue<FUNC>(operand, i, numSelectedValues, selectedPositionBuffer);
Expand Down
4 changes: 2 additions & 2 deletions src/include/function/null/null_function_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ struct NullOperationExecutor {
return resultValue == true;
} else {
uint64_t numSelectedValues = 0;
auto selectedPositionsBuffer = selVector.getSelectedPositionsBuffer();
auto buffer = selVector.getMultableBuffer();
for (auto i = 0ul; i < operand.state->selVector->selectedSize; i++) {
auto pos = operand.state->selVector->selectedPositions[i];
selectOnValue<FUNC>(operand, pos, numSelectedValues, selectedPositionsBuffer);
selectOnValue<FUNC>(operand, pos, numSelectedValues, buffer);
}
selVector.selectedSize = numSelectedValues;
return numSelectedValues > 0;
Expand Down
2 changes: 1 addition & 1 deletion src/include/function/path/path_function_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ struct UnaryPathExecutor {
common::StructVector::getFieldVector(listDataVector, fieldIdx).get();
std::unordered_set<common::nodeID_t, InternalIDHasher> internalIDSet;
auto numSelectedValues = 0u;
auto buffer = selectionVector.getSelectedPositionsBuffer();
auto buffer = selectionVector.getMultableBuffer();
if (inputSelVector.isUnfiltered()) {
for (auto i = 0u; i < inputSelVector.selectedSize; ++i) {
auto& listEntry = listVector.getValue<common::list_entry_t>(i);
Expand Down
2 changes: 1 addition & 1 deletion src/include/processor/operator/hash_join/hash_join_probe.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ struct ProbeState {
probedTuples = std::make_unique<uint8_t*[]>(common::DEFAULT_VECTOR_CAPACITY);
matchedSelVector =
std::make_unique<common::SelectionVector>(common::DEFAULT_VECTOR_CAPACITY);
matchedSelVector->resetSelectorToValuePosBuffer();
matchedSelVector->setToFiltered();
}

// Each key corresponds to a pointer with the same hash value from the ht directory.
Expand Down
6 changes: 3 additions & 3 deletions src/processor/operator/filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ bool Filter::getNextTuplesInternal(ExecutionContext* context) {
*dataChunkToSelect->state->selVector, context->clientContext);
if (!dataChunkToSelect->state->isFlat() &&
dataChunkToSelect->state->selVector->isUnfiltered()) {
dataChunkToSelect->state->selVector->resetSelectorToValuePosBuffer();
dataChunkToSelect->state->selVector->setToFiltered();
}
} while (!hasAtLeastOneSelectedValue);
metrics->numOutputTuple.increase(dataChunkToSelect->state->selVector->selectedSize);
Expand All @@ -44,14 +44,14 @@ bool NodeLabelFiler::getNextTuplesInternal(ExecutionContext* context) {
}
saveSelVector(nodeIDVector->state->selVector);
numSelectValue = 0;
auto buffer = nodeIDVector->state->selVector->getSelectedPositionsBuffer();
auto buffer = nodeIDVector->state->selVector->getMultableBuffer();
for (auto i = 0u; i < nodeIDVector->state->selVector->selectedSize; ++i) {
auto pos = nodeIDVector->state->selVector->selectedPositions[i];
buffer[numSelectValue] = pos;
numSelectValue +=
info->nodeLabelSet.contains(nodeIDVector->getValue<nodeID_t>(pos).tableID);
}
nodeIDVector->state->selVector->resetSelectorToValuePosBuffer();
nodeIDVector->state->selVector->setToFiltered();
} while (numSelectValue == 0);
nodeIDVector->state->selVector->selectedSize = numSelectValue;
metrics->numOutputTuple.increase(nodeIDVector->state->selVector->selectedSize);
Expand Down
6 changes: 3 additions & 3 deletions src/processor/operator/filtering_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ void SelVectorOverWriter::saveSelVector(std::shared_ptr<SelectionVector>& selVec
void SelVectorOverWriter::resetToCurrentSelVector(std::shared_ptr<SelectionVector>& selVector) {
currentSelVector->selectedSize = selVector->selectedSize;
if (selVector->isUnfiltered()) {
currentSelVector->resetSelectorToUnselected();
currentSelVector->setToUnfiltered();
} else {
memcpy(currentSelVector->getSelectedPositionsBuffer(), selVector->selectedPositions,
memcpy(currentSelVector->getMultableBuffer(), selVector->selectedPositions,
selVector->selectedSize * sizeof(sel_t));
currentSelVector->resetSelectorToValuePosBuffer();
currentSelVector->setToFiltered();
}
selVector = currentSelVector;
}
Expand Down
2 changes: 1 addition & 1 deletion src/processor/operator/flatten.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace processor {

void Flatten::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* /*context*/) {
dataChunkState = resultSet->dataChunks[dataChunkToFlattenPos]->state.get();
currentSelVector->resetSelectorToValuePosBufferWithSize(1 /* size */);
currentSelVector->setToFiltered(1 /* size */);
localState = std::make_unique<FlattenLocalState>();
}

Expand Down
7 changes: 3 additions & 4 deletions src/processor/operator/hash_join/hash_join_probe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,11 @@ uint64_t HashJoinProbe::getInnerJoinResultForUnFlatKey() {
auto keySelVector = keyVectors[0]->state->selVector.get();
if (keySelVector->selectedSize != numTuplesToRead) {
// Some keys have no matched tuple. So we modify selected position.
auto keySelectedBuffer = keySelVector->getSelectedPositionsBuffer();
auto buffer = keySelVector->getMultableBuffer();
for (auto i = 0u; i < numTuplesToRead; i++) {
keySelectedBuffer[i] = probeState->matchedSelVector->selectedPositions[i];
buffer[i] = probeState->matchedSelVector->selectedPositions[i];
}
keySelVector->selectedSize = numTuplesToRead;
keySelVector->resetSelectorToValuePosBuffer();
keySelVector->setToFiltered(numTuplesToRead);
}
sharedState->getHashTable()->lookup(vectorsToReadInto, columnIdxsToReadFrom,
probeState->matchedTuples.get(), probeState->nextMatchedTupleIdx, numTuplesToRead);
Expand Down
14 changes: 8 additions & 6 deletions src/processor/operator/hash_join/join_hash_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,12 @@ void JoinHashTable::appendVector(
static void sortSelectedPos(ValueVector* nodeIDVector) {
auto selVector = nodeIDVector->state->selVector.get();
auto size = selVector->selectedSize;
auto selectedPos = selVector->getSelectedPositionsBuffer();
auto buffer = selVector->getMultableBuffer();
if (selVector->isUnfiltered()) {
memcpy(selectedPos, &SelectionVector::INCREMENTAL_SELECTED_POS, size * sizeof(sel_t));
selVector->resetSelectorToValuePosBuffer();
memcpy(buffer, &SelectionVector::INCREMENTAL_SELECTED_POS, size * sizeof(sel_t));
selVector->setToFiltered();
}
std::sort(selectedPos, selectedPos + size, [nodeIDVector](sel_t left, sel_t right) {
std::sort(buffer, buffer + size, [nodeIDVector](sel_t left, sel_t right) {
return nodeIDVector->getValue<nodeID_t>(left) < nodeIDVector->getValue<nodeID_t>(right);
});
}
Expand All @@ -80,7 +80,7 @@ void JoinHashTable::appendVectorWithSorting(
auto payloadNodeIDVector = payloadVectors[0];
auto payloadsState = payloadNodeIDVector->state.get();
if (!payloadsState->isFlat()) {
// Sorting is only needed when the payload is unflat (a list of values).
// Sorting is only needed when the payload is unFlat (a list of values).
sortSelectedPos(payloadNodeIDVector);
}
// A single appendInfo will return from `allocateFlatTupleBlocks` when numTuplesToAppend is 1.
Expand All @@ -95,7 +95,9 @@ void JoinHashTable::appendVectorWithSorting(
}
factorizedTable->copyVectorToColumn(*hashVector, appendInfos[0], numTuplesToAppend, colIdx);
if (!payloadsState->isFlat()) {
payloadsState->selVector->resetSelectorToUnselected();
// TODO(Xiyang): I can no longer recall why I set to un-filtered but this is probably wrong.
// We should set back to the un-sorted state.
payloadsState->selVector->setToUnfiltered();
}
factorizedTable->numTuples += numTuplesToAppend;
}
Expand Down
5 changes: 3 additions & 2 deletions src/processor/operator/index_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,21 @@ bool IndexScan::getNextTuplesInternal(ExecutionContext* context) {
}
saveSelVector(outVector->state->selVector);
numSelectedValues = 0u;
auto buffer = outVector->state->selVector->getMultableBuffer();
for (auto i = 0u; i < indexVector->state->selVector->selectedSize; ++i) {
auto pos = indexVector->state->selVector->selectedPositions[i];
if (indexVector->isNull(pos)) {
continue;
}
outVector->state->selVector->getSelectedPositionsBuffer()[numSelectedValues] = pos;
buffer[numSelectedValues] = pos;
offset_t nodeOffset = INVALID_OFFSET;
numSelectedValues +=
pkIndex->lookup(context->clientContext->getTx(), indexVector, pos, nodeOffset);
nodeID_t nodeID{nodeOffset, tableID};
outVector->setValue<nodeID_t>(pos, nodeID);
}
if (!outVector->state->isFlat() && outVector->state->selVector->isUnfiltered()) {
outVector->state->selVector->resetSelectorToValuePosBuffer();
outVector->state->selVector->setToFiltered();
}
} while (numSelectedValues == 0);
outVector->state->selVector->selectedSize = numSelectedValues;
Expand Down
Loading
Loading