Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
acquamarin committed Aug 26, 2023
1 parent 433c19a commit 06b4a2a
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 61 deletions.
7 changes: 0 additions & 7 deletions src/include/processor/operator/order_by/order_by_merge.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,6 @@ class OrderByMerge : public Sink {

void executeInternal(ExecutionContext* context) override;

// Runs once after execution: if the shared state holds no sorted key blocks at all,
// inserts a single default-constructed MergedKeyBlocks so the collection is never empty.
// NOTE(review): presumably this lets downstream readers handle an empty input without
// a special case -- confirm against the consumers of getSortedKeyBlocks().
void finalize(ExecutionContext* context) final {
    auto keyBlocks = sharedState->getSortedKeyBlocks();
    if (!keyBlocks->empty()) {
        // At least one block was produced; nothing to add.
        return;
    }
    keyBlocks->emplace(std::make_shared<MergedKeyBlocks>());
}

// Builds a new OrderByMerge constructed from this operator's shared state, shared
// dispatcher, id and params string.
std::unique_ptr<PhysicalOperator> clone() override {
    return std::make_unique<OrderByMerge>(
        sharedState, sharedDispatcher, id, paramsString);
}
Expand Down
16 changes: 10 additions & 6 deletions src/include/processor/operator/order_by/top_k.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ class TopKBuffer {
// Prepares `scanState` for reading from the underlying sort state, forwarding this
// buffer's `skip` and `limit` values.
inline void initScan(TopKScanState& scanState) {
    sortState->initScan(scanState, skip, limit);
}

private:
void initVectors();

uint64_t findKeyVectorPosInPayload(const DataPos& keyPos);

template<typename FUNC>
void getSelectComparisonFunction(
common::PhysicalTypeID typeID, vector_select_comparison_func& selectFunc);
Expand All @@ -78,6 +82,8 @@ class TopKBuffer {

void setBoundaryValue();

bool compareBoundaryValue(std::vector<common::ValueVector*>& keyVectors);

public:
std::unique_ptr<TopKSortState> sortState;
uint64_t skip;
Expand All @@ -88,16 +94,14 @@ class TopKBuffer {
bool hasBoundaryValue = false;

private:
std::vector<std::unique_ptr<common::ValueVector>> payloadVecs;
std::vector<std::unique_ptr<common::ValueVector>> keyVecs;
std::vector<std::unique_ptr<common::ValueVector>> tmpPayloadVecs;
std::vector<std::unique_ptr<common::ValueVector>> tmpKeyVecs;
// Holds the ownership of all temp vectors.
std::vector<std::unique_ptr<common::ValueVector>> tmpVectors;
std::vector<std::unique_ptr<common::ValueVector>> boundaryVecs;

std::vector<common::ValueVector*> payloadVecsToScan;
std::vector<common::ValueVector*> keyVecsToScan;
std::vector<common::ValueVector*> tmpPayloadVecsToScan;
std::vector<common::ValueVector*> tmpKeyVecsToScan;
std::vector<common::ValueVector*> lastPayloadVecsToScan;
std::vector<common::ValueVector*> lastKeyVecsToScan;
};

class TopKLocalState {
Expand Down
125 changes: 77 additions & 48 deletions src/processor/operator/order_by/top_k.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,57 +46,15 @@ void TopKBuffer::init(const kuzu::processor::OrderByDataInfo& orderByDataInfo,
sortState->init(orderByDataInfo, memoryManager);
this->skip = skipNumber;
this->limit = limitNumber;
auto payloadState = std::make_shared<common::DataChunkState>();
auto tmpPayloadState = std::make_shared<common::DataChunkState>();
for (auto& [pos, type] : orderByDataInfo.payloadsPosAndType) {
auto payloadVec = std::make_unique<common::ValueVector>(type, memoryManager);
auto tmpPayloadVec = std::make_unique<common::ValueVector>(type, memoryManager);
payloadVec->setState(payloadState);
tmpPayloadVec->setState(tmpPayloadState);
payloadVecsToScan.push_back(payloadVec.get());
tmpPayloadVecsToScan.push_back(tmpPayloadVec.get());
payloadVecs.push_back(std::move(payloadVec));
tmpPayloadVecs.push_back(std::move(tmpPayloadVec));
}
auto boundaryState = common::DataChunkState::getSingleValueDataChunkState();
for (auto& [pos, type] : orderByDataInfo.keysPosAndType) {
auto boundaryVec = std::make_unique<common::ValueVector>(type, memoryManager);
boundaryVec->setState(boundaryState);
boundaryVecs.push_back(std::move(boundaryVec));
for (auto i = 0u; i < orderByDataInfo.payloadsPosAndType.size(); i++) {
if (pos == orderByDataInfo.payloadsPosAndType[i].first) {
keyVecsToScan.push_back(payloadVecsToScan[i]);
tmpKeyVecsToScan.push_back(tmpPayloadVecsToScan[i]);
break;
}
}
}
initVectors();
initCompareFuncs();
}

void TopKBuffer::append(std::vector<common::ValueVector*> keyVectors,
std::vector<common::ValueVector*> payloadVectors) {
auto originalSelState = keyVectors[0]->state->selVector;
if (hasBoundaryValue) {
if (keyVectors[0]->state->isFlat()) {
for (auto i = 0u; i < keyVectors.size(); i++) {
std::shared_ptr<common::SelectionVector> selVector =
std::make_shared<common::SelectionVector>(common::DEFAULT_VECTOR_CAPACITY);
selVector->resetSelectorToValuePosBuffer();
if (!compareFuncs[i](*keyVectors[i], *boundaryVecs[i], *selVector)) {
return;
}
}
} else {
auto dataChunkState = keyVectors[0]->state;
for (auto i = 0u; i < keyVectors.size(); i++) {
std::shared_ptr<common::SelectionVector> selVector =
std::make_shared<common::SelectionVector>(common::DEFAULT_VECTOR_CAPACITY);
selVector->resetSelectorToValuePosBuffer();
compareFuncs[i](*keyVectors[i], *boundaryVecs[i], *selVector);
dataChunkState->selVector = std::move(selVector);
}
}
if (hasBoundaryValue && !compareBoundaryValue(keyVectors)) {
return;
}
sortState->append(keyVectors, payloadVectors);
keyVectors[0]->state->selVector = std::move(originalSelState);
Expand All @@ -120,8 +78,8 @@ void TopKBuffer::reduce() {
break;
}
newSortState->append(keyVecsToScan, payloadVecsToScan);

Check warning on line 80 in src/processor/operator/order_by/top_k.cpp

View check run for this annotation

Codecov / codecov/patch

src/processor/operator/order_by/top_k.cpp#L80

Added line #L80 was not covered by tests
std::swap(payloadVecsToScan, tmpPayloadVecsToScan);
std::swap(keyVecsToScan, tmpKeyVecsToScan);
std::swap(payloadVecsToScan, lastPayloadVecsToScan);
std::swap(keyVecsToScan, lastKeyVecsToScan);
}
sortState = std::move(newSortState);
}
Expand All @@ -139,6 +97,52 @@ void TopKBuffer::merge(TopKBuffer* other) {
reduce();
}

// Allocates the scratch vectors this buffer scans tuples into.
//
// Two parallel sets are created -- a "current" set (payloadVecsToScan / keyVecsToScan)
// and a "last" set (lastPayloadVecsToScan / lastKeyVecsToScan) -- each bound to its own
// DataChunkState. One single-value boundary vector is also created per key column.
// Ownership of every current/last vector is held by tmpVectors; boundary vectors are
// owned by boundaryVecs. The *ToScan containers only store raw, non-owning pointers.
void TopKBuffer::initVectors() {
    auto payloadState = std::make_shared<common::DataChunkState>();
    auto lastPayloadState = std::make_shared<common::DataChunkState>();
    for (auto& [pos, type] : orderByDataInfo->payloadsPosAndType) {
        auto payloadVec = std::make_unique<common::ValueVector>(type, memoryManager);
        auto lastPayloadVec = std::make_unique<common::ValueVector>(type, memoryManager);
        payloadVec->setState(payloadState);
        lastPayloadVec->setState(lastPayloadState);
        payloadVecsToScan.push_back(payloadVec.get());
        lastPayloadVecsToScan.push_back(lastPayloadVec.get());
        tmpVectors.push_back(std::move(payloadVec));
        tmpVectors.push_back(std::move(lastPayloadVec));
    }
    auto boundaryState = common::DataChunkState::getSingleValueDataChunkState();
    for (auto& [pos, type] : orderByDataInfo->keysPosAndType) {
        auto boundaryVec = std::make_unique<common::ValueVector>(type, memoryManager);
        boundaryVec->setState(boundaryState);
        boundaryVecs.push_back(std::move(boundaryVec));
        auto posInPayload = findKeyVectorPosInPayload(pos);
        if (posInPayload == UINT64_MAX) {
            // If the key is not present in the payload, create a new vector.
            auto keyVec = std::make_unique<common::ValueVector>(type, memoryManager);
            auto lastKeyVec = std::make_unique<common::ValueVector>(type, memoryManager);
            // Bind the standalone key vectors to the same chunk states as the payload
            // vectors. Other code in this file dereferences `->state` on the key vectors
            // (e.g. `keyVectors[0]->state` and `srcVector->state->selVector`), so they
            // must not be left without a state.
            keyVec->setState(payloadState);
            lastKeyVec->setState(lastPayloadState);
            keyVecsToScan.push_back(keyVec.get());
            lastKeyVecsToScan.push_back(lastKeyVec.get());
            tmpVectors.push_back(std::move(keyVec));
            tmpVectors.push_back(std::move(lastKeyVec));
        } else {
            // Otherwise grab the vector from the payload.
            keyVecsToScan.push_back(payloadVecsToScan[posInPayload]);
            lastKeyVecsToScan.push_back(lastPayloadVecsToScan[posInPayload]);
        }
    }
}

// Linear search for `keyPos` among the payload positions.
// Returns the index of the first payload entry whose position equals `keyPos`, or
// UINT64_MAX when the key is not part of the payload.
uint64_t TopKBuffer::findKeyVectorPosInPayload(const DataPos& keyPos) {
    // TODO(Xiyang): this information should be passed by front end. (e.g. The key vector pos in the
    // payload vector)
    auto& payloads = orderByDataInfo->payloadsPosAndType;
    const uint64_t numPayloads = payloads.size();
    uint64_t idx = 0;
    while (idx < numPayloads && !(keyPos == payloads[idx].first)) {
        idx++;
    }
    return idx < numPayloads ? idx : UINT64_MAX;
}

template<typename FUNC>
void TopKBuffer::getSelectComparisonFunction(
common::PhysicalTypeID typeID, vector_select_comparison_func& selectFunc) {
Expand Down Expand Up @@ -198,7 +202,7 @@ void TopKBuffer::setBoundaryValue() {
auto dstData =
boundaryVec->getData() + boundaryVec->getNumBytesPerValue() *
boundaryVec->state->selVector->selectedPositions[0];
auto srcVector = tmpKeyVecsToScan[i];
auto srcVector = lastKeyVecsToScan[i];

Check warning on line 205 in src/processor/operator/order_by/top_k.cpp

View check run for this annotation

Codecov / codecov/patch

src/processor/operator/order_by/top_k.cpp#L203-L205

Added lines #L203 - L205 were not covered by tests
auto srcData = srcVector->getData() +
srcVector->getNumBytesPerValue() *

Check warning on line 207 in src/processor/operator/order_by/top_k.cpp

View check run for this annotation

Codecov / codecov/patch

src/processor/operator/order_by/top_k.cpp#L207

Added line #L207 was not covered by tests
srcVector->state->selVector
Expand All @@ -208,6 +212,31 @@ void TopKBuffer::setBoundaryValue() {
}
}

Check warning on line 213 in src/processor/operator/order_by/top_k.cpp

View check run for this annotation

Codecov / codecov/patch

src/processor/operator/order_by/top_k.cpp#L213

Added line #L213 was not covered by tests

// Compares the incoming key vectors against the stored boundary vectors, one key
// column at a time, using the per-column functions in compareFuncs.
//
// Returns false as soon as any column's comparison function reports false, and true
// if every column's comparison reports true.
//
// Flat input: the selection vector passed to the comparison is scratch only and the
// shared chunk state is left untouched.
// Unflat input: after each successful column comparison, the resulting selection
// vector is installed on the chunk state of keyVectors[0]. On a true return the state
// therefore carries the filtered selection and the caller is expected to restore the
// original one afterwards. On a false return this function restores the original
// selection vector itself, so a rejected batch is never left with a partially filtered
// selection.
bool TopKBuffer::compareBoundaryValue(std::vector<common::ValueVector*>& keyVectors) {
    if (keyVectors[0]->state->isFlat()) {
        for (auto i = 0u; i < keyVectors.size(); i++) {
            std::shared_ptr<common::SelectionVector> selVector =
                std::make_shared<common::SelectionVector>(common::DEFAULT_VECTOR_CAPACITY);
            selVector->resetSelectorToValuePosBuffer();
            if (!compareFuncs[i](*keyVectors[i], *boundaryVecs[i], *selVector)) {
                return false;
            }
        }
        return true;
    }
    auto dataChunkState = keyVectors[0]->state;
    // Remember the selection the caller handed us so it can be put back on rejection.
    auto originalSelVector = dataChunkState->selVector;
    for (auto i = 0u; i < keyVectors.size(); i++) {
        std::shared_ptr<common::SelectionVector> selVector =
            std::make_shared<common::SelectionVector>(common::DEFAULT_VECTOR_CAPACITY);
        selVector->resetSelectorToValuePosBuffer();
        if (!compareFuncs[i](*keyVectors[i], *boundaryVecs[i], *selVector)) {
            // An earlier column may already have replaced the selection vector; undo
            // that before reporting that the batch is rejected.
            dataChunkState->selVector = std::move(originalSelVector);
            return false;
        }
        dataChunkState->selVector = std::move(selVector);
    }
    return true;
}

void TopKLocalState::init(const OrderByDataInfo& orderByDataInfo,
storage::MemoryManager* memoryManager, ResultSet& resultSet, uint64_t skipNumber,
uint64_t limitNumber) {
Expand Down
4 changes: 4 additions & 0 deletions test/test_files/tinysnb/order_by/single_label.test
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,10 @@ Elizabeth
-STATEMENT MATCH (p:person) WHERE p.age > 100 RETURN p.age ORDER BY p.age
---- 0

-LOG OrderByLimitEmptyResult
-STATEMENT MATCH (p:person) WHERE p.age > 100 RETURN p.age ORDER BY p.age limit 10
---- 0

-LOG OrderByAggregateTest1
-STATEMENT MATCH (a:person)-[:knows]->(b:person) return a.age, COUNT(b) as c ORDER BY a.age
-CHECK_ORDER
Expand Down

0 comments on commit 06b4a2a

Please sign in to comment.