Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement top-k optimization #1960

Merged
merged 1 commit into from
Aug 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/include/common/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,5 +161,11 @@ struct ClientContextConstants {
static constexpr uint64_t TIMEOUT_IN_MS = 0;
};

struct OrderByConstants {
static constexpr uint64_t NUM_BYTES_FOR_PAYLOAD_IDX = 8;
static constexpr uint64_t MIN_SIZE_TO_REDUCE = common::DEFAULT_VECTOR_CAPACITY * 5;
static constexpr uint64_t MIN_LIMIT_RATIO_TO_REDUCE = 2;
};

} // namespace common
} // namespace kuzu
33 changes: 9 additions & 24 deletions src/include/processor/operator/order_by/key_block_merger.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,22 +63,7 @@ class MergedKeyBlocks {
};

struct BlockPtrInfo {
inline BlockPtrInfo(
uint64_t startTupleIdx, uint64_t endTupleIdx, std::shared_ptr<MergedKeyBlocks>& keyBlocks)
: keyBlocks{keyBlocks}, curBlockIdx{startTupleIdx / keyBlocks->getNumTuplesPerBlock()},
endBlockIdx{endTupleIdx == 0 ? 0 : (endTupleIdx - 1) / keyBlocks->getNumTuplesPerBlock()},
endTupleIdx{endTupleIdx} {
if (startTupleIdx == endTupleIdx) {
curTuplePtr = nullptr;
endTuplePtr = nullptr;
curBlockEndTuplePtr = nullptr;
} else {
curTuplePtr = keyBlocks->getTuple(startTupleIdx);
endTuplePtr = keyBlocks->getBlockEndTuplePtr(endBlockIdx, endTupleIdx, endBlockIdx);
curBlockEndTuplePtr =
keyBlocks->getBlockEndTuplePtr(curBlockIdx, endTupleIdx, endBlockIdx);
}
}
BlockPtrInfo(uint64_t startTupleIdx, uint64_t endTupleIdx, MergedKeyBlocks* keyBlocks);

inline bool hasMoreTuplesToRead() const { return curTuplePtr != endTuplePtr; }

Expand All @@ -90,7 +75,7 @@ struct BlockPtrInfo {

void updateTuplePtrIfNecessary();

std::shared_ptr<MergedKeyBlocks>& keyBlocks;
MergedKeyBlocks* keyBlocks;
uint8_t* curTuplePtr;
uint64_t curBlockIdx;
uint64_t endBlockIdx;
Expand All @@ -101,9 +86,9 @@ struct BlockPtrInfo {

class KeyBlockMerger {
public:
explicit KeyBlockMerger(std::vector<std::shared_ptr<FactorizedTable>>& factorizedTables,
explicit KeyBlockMerger(std::vector<FactorizedTable*> factorizedTables,
std::vector<StrKeyColInfo>& strKeyColsInfo, uint32_t numBytesPerTuple)
: factorizedTables{factorizedTables}, strKeyColsInfo{strKeyColsInfo},
: factorizedTables{std::move(factorizedTables)}, strKeyColsInfo{strKeyColsInfo},
numBytesPerTuple{numBytesPerTuple}, numBytesToCompare{numBytesPerTuple - 8},
hasStringCol{!strKeyColsInfo.empty()} {}

Expand All @@ -123,7 +108,7 @@ class KeyBlockMerger {
// FactorizedTables[i] stores all order_by columns encoded and sorted by the ith thread.
// MergeSort uses factorizedTable to access the full contents of the string key columns
// when resolving ties.
std::vector<std::shared_ptr<FactorizedTable>>& factorizedTables;
std::vector<FactorizedTable*> factorizedTables;
// We also store the colIdxInFactorizedTable, colOffsetInEncodedKeyBlock, isAscOrder, isStrCol
// for each string column. So, we don't need to compute them again during merge sort.
std::vector<StrKeyColInfo>& strKeyColsInfo;
Expand Down Expand Up @@ -200,15 +185,15 @@ class KeyBlockMergeTaskDispatcher {
// This function is used to initialize the columns of keyBlockMergeTaskDispatcher based on
// sharedFactorizedTablesAndSortedKeyBlocks.
void init(storage::MemoryManager* memoryManager,
std::shared_ptr<std::queue<std::shared_ptr<MergedKeyBlocks>>> sortedKeyBlocks,
std::vector<std::shared_ptr<FactorizedTable>>& factorizedTables,
std::vector<StrKeyColInfo>& strKeyColsInfo, uint64_t numBytesPerTuple);
std::queue<std::shared_ptr<MergedKeyBlocks>>* sortedKeyBlocks,
std::vector<FactorizedTable*> factorizedTables, std::vector<StrKeyColInfo>& strKeyColsInfo,
uint64_t numBytesPerTuple);

private:
std::mutex mtx;

storage::MemoryManager* memoryManager;
std::shared_ptr<std::queue<std::shared_ptr<MergedKeyBlocks>>> sortedKeyBlocks;
std::queue<std::shared_ptr<MergedKeyBlocks>>* sortedKeyBlocks;
std::vector<std::shared_ptr<KeyBlockMergeTask>> activeKeyBlockMergeTasks;
std::unique_ptr<KeyBlockMerger> keyBlockMerger;
};
Expand Down
118 changes: 15 additions & 103 deletions src/include/processor/operator/order_by/order_by.h
Original file line number Diff line number Diff line change
@@ -1,110 +1,26 @@
#pragma once

#include <queue>

#include "common/data_chunk/data_chunk_state.h"
#include "common/in_mem_overflow_buffer.h"
#include "processor/operator/order_by/radix_sort.h"
#include "processor/operator/sink.h"
#include "processor/result/factorized_table.h"
#include "processor/result/result_set.h"
#include "sort_state.h"

namespace kuzu {
namespace processor {

// This class contains factorizedTables, nextFactorizedTableIdx, strKeyColsInfo,
// sortedKeyBlocks and the size of each tuple in keyBlocks. The class is shared between the
// order_by, orderByMerge, orderByScan operators. All functions are guaranteed to be thread-safe, so
// caller doesn't need to acquire a lock before calling these functions.
class SharedFactorizedTablesAndSortedKeyBlocks {
public:
explicit SharedFactorizedTablesAndSortedKeyBlocks()
: nextFactorizedTableIdx{0},
sortedKeyBlocks{std::make_shared<std::queue<std::shared_ptr<MergedKeyBlocks>>>()} {}

uint8_t getNextFactorizedTableIdx() {
std::unique_lock lck{mtx};
return nextFactorizedTableIdx++;
}

void appendFactorizedTable(
uint8_t factorizedTableIdx, std::shared_ptr<FactorizedTable> factorizedTable) {
std::unique_lock lck{mtx};
// If the factorizedTables is full, resize the factorizedTables and
// insert the factorizedTable to the set.
if (factorizedTableIdx >= factorizedTables.size()) {
factorizedTables.resize(factorizedTableIdx + 1);
}
factorizedTables[factorizedTableIdx] = std::move(factorizedTable);
}

void appendSortedKeyBlock(std::shared_ptr<MergedKeyBlocks> mergedDataBlocks) {
std::unique_lock lck{mtx};
sortedKeyBlocks->emplace(mergedDataBlocks);
}

void setNumBytesPerTuple(uint32_t _numBytesPerTuple) {
assert(numBytesPerTuple == UINT32_MAX);
numBytesPerTuple = _numBytesPerTuple;
}

void combineFTHasNoNullGuarantee() {
for (auto i = 1u; i < factorizedTables.size(); i++) {
factorizedTables[0]->mergeMayContainNulls(*factorizedTables[i]);
}
}

void setStrKeyColInfo(std::vector<StrKeyColInfo> _strKeyColsInfo) {
assert(strKeyColsInfo.empty());
strKeyColsInfo = std::move(_strKeyColsInfo);
}

private:
std::mutex mtx;

public:
std::vector<std::shared_ptr<FactorizedTable>> factorizedTables;
uint8_t nextFactorizedTableIdx;
std::shared_ptr<std::queue<std::shared_ptr<MergedKeyBlocks>>> sortedKeyBlocks;

uint32_t numBytesPerTuple = UINT32_MAX; // encoding size
std::vector<StrKeyColInfo> strKeyColsInfo;
};

struct OrderByDataInfo {
public:
OrderByDataInfo(std::vector<std::pair<DataPos, common::LogicalType>> keysPosAndType,
std::vector<std::pair<DataPos, common::LogicalType>> payloadsPosAndType,
std::vector<bool> isPayloadFlat, std::vector<bool> isAscOrder, bool mayContainUnflatKey)
: keysPosAndType{std::move(keysPosAndType)}, payloadsPosAndType{std::move(
payloadsPosAndType)},
isPayloadFlat{std::move(isPayloadFlat)}, isAscOrder{std::move(isAscOrder)},
mayContainUnflatKey{mayContainUnflatKey} {}

OrderByDataInfo(const OrderByDataInfo& other)
: OrderByDataInfo{other.keysPosAndType, other.payloadsPosAndType, other.isPayloadFlat,
other.isAscOrder, other.mayContainUnflatKey} {}

public:
std::vector<std::pair<DataPos, common::LogicalType>> keysPosAndType;
std::vector<std::pair<DataPos, common::LogicalType>> payloadsPosAndType;
std::vector<bool> isPayloadFlat;
std::vector<bool> isAscOrder;
// TODO(Ziyi): We should figure out unflat keys in a more general way.
bool mayContainUnflatKey;
};

class OrderBy : public Sink {
public:
OrderBy(std::unique_ptr<ResultSetDescriptor> resultSetDescriptor,
const OrderByDataInfo& orderByDataInfo,
std::shared_ptr<SharedFactorizedTablesAndSortedKeyBlocks> sharedState,
std::unique_ptr<PhysicalOperator> child, uint32_t id, const std::string& paramsString)
const OrderByDataInfo& orderByDataInfo, std::unique_ptr<SortLocalState> localState,
std::shared_ptr<SortSharedState> sharedState, std::unique_ptr<PhysicalOperator> child,
uint32_t id, const std::string& paramsString)
: Sink{std::move(resultSetDescriptor), PhysicalOperatorType::ORDER_BY, std::move(child), id,
paramsString},
orderByDataInfo{orderByDataInfo}, sharedState{std::move(sharedState)} {}
orderByDataInfo{orderByDataInfo}, localState{std::move(localState)},
sharedState{std::move(sharedState)} {}

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;
void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) final;

void executeInternal(ExecutionContext* context) override;

Expand All @@ -117,24 +33,20 @@ class OrderBy : public Sink {
}

std::unique_ptr<PhysicalOperator> clone() override {
return std::make_unique<OrderBy>(resultSetDescriptor->copy(), orderByDataInfo, sharedState,
children[0]->clone(), id, paramsString);
return std::make_unique<OrderBy>(resultSetDescriptor->copy(), orderByDataInfo,
std::make_unique<SortLocalState>(), sharedState, children[0]->clone(), id,
paramsString);
}

private:
std::unique_ptr<FactorizedTableSchema> populateTableSchema();

void initGlobalStateInternal(ExecutionContext* context) override;
void initGlobalStateInternal(ExecutionContext* context) final;

private:
uint8_t factorizedTableIdx;
OrderByDataInfo orderByDataInfo;
std::unique_ptr<OrderByKeyEncoder> orderByKeyEncoder;
std::unique_ptr<RadixSort> radixSorter;
std::vector<common::ValueVector*> keyVectors;
std::vector<common::ValueVector*> vectorsToAppend;
std::shared_ptr<SharedFactorizedTablesAndSortedKeyBlocks> sharedState;
std::shared_ptr<FactorizedTable> localFactorizedTable;
std::unique_ptr<SortLocalState> localState;
std::shared_ptr<SortSharedState> sharedState;
std::vector<common::ValueVector*> orderByVectors;
std::vector<common::ValueVector*> payloadVectors;
};

} // namespace processor
Expand Down
33 changes: 28 additions & 5 deletions src/include/processor/operator/order_by/order_by_key_encoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,29 @@ namespace processor {

#define BSWAP16(x) ((uint16_t)((((uint16_t)(x)&0xff00) >> 8) | (((uint16_t)(x)&0x00ff) << 8)))

struct OrderByDataInfo {
public:
OrderByDataInfo(std::vector<std::pair<DataPos, common::LogicalType>> keysPosAndType,
std::vector<std::pair<DataPos, common::LogicalType>> payloadsPosAndType,
std::vector<bool> isPayloadFlat, std::vector<bool> isAscOrder, bool mayContainUnflatKey)
: keysPosAndType{std::move(keysPosAndType)}, payloadsPosAndType{std::move(
payloadsPosAndType)},
isPayloadFlat{std::move(isPayloadFlat)}, isAscOrder{std::move(isAscOrder)},
mayContainUnflatKey{mayContainUnflatKey} {}

OrderByDataInfo(const OrderByDataInfo& other)
: OrderByDataInfo{other.keysPosAndType, other.payloadsPosAndType, other.isPayloadFlat,
other.isAscOrder, other.mayContainUnflatKey} {}

public:
std::vector<std::pair<DataPos, common::LogicalType>> keysPosAndType;
std::vector<std::pair<DataPos, common::LogicalType>> payloadsPosAndType;
std::vector<bool> isPayloadFlat;
std::vector<bool> isAscOrder;
// TODO(Ziyi): We should figure out unflat keys in a more general way.
bool mayContainUnflatKey;
acquamarin marked this conversation as resolved.
Show resolved Hide resolved
};

// The OrderByKeyEncoder encodes all columns in the ORDER BY clause into a single binary sequence
// that, when compared using memcmp will yield the correct overall sorting order. On little-endian
// hardware, the least-significant byte is stored at the smallest address. To encode the sorting
Expand All @@ -45,9 +68,8 @@ using encode_function_t = std::function<void(const uint8_t*, uint8_t*, bool)>;
class OrderByKeyEncoder {

public:
OrderByKeyEncoder(std::vector<common::ValueVector*>& orderByVectors,
std::vector<bool>& isAscOrder, storage::MemoryManager* memoryManager, uint8_t ftIdx,
uint32_t numTuplesPerBlockInFT, uint32_t numBytesPerTuple);
OrderByKeyEncoder(const OrderByDataInfo& orderByDataInfo, storage::MemoryManager* memoryManager,
uint8_t ftIdx, uint32_t numTuplesPerBlockInFT, uint32_t numBytesPerTuple);

inline std::vector<std::shared_ptr<DataBlock>>& getKeyBlocks() { return keyBlocks; }

Expand Down Expand Up @@ -86,7 +108,9 @@ class OrderByKeyEncoder {

static uint32_t getEncodingSize(const common::LogicalType& dataType);

void encodeKeys();
void encodeKeys(std::vector<common::ValueVector*> orderByKeys);

inline void clear() { keyBlocks.clear(); }

private:
template<typename type>
Expand Down Expand Up @@ -121,7 +145,6 @@ class OrderByKeyEncoder {
private:
storage::MemoryManager* memoryManager;
std::vector<std::shared_ptr<DataBlock>> keyBlocks;
std::vector<common::ValueVector*>& orderByVectors;
std::vector<bool> isAscOrder;
uint32_t numBytesPerTuple;
uint32_t maxNumTuplesPerBlock;
Expand Down
6 changes: 3 additions & 3 deletions src/include/processor/operator/order_by/order_by_merge.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ class OrderByMerge : public Sink {
public:
// This constructor will only be called by the mapper when constructing the orderByMerge
// operator, because the mapper doesn't know the existence of keyBlockMergeTaskDispatcher
OrderByMerge(std::shared_ptr<SharedFactorizedTablesAndSortedKeyBlocks> sharedState,
OrderByMerge(std::shared_ptr<SortSharedState> sharedState,
std::shared_ptr<KeyBlockMergeTaskDispatcher> sharedDispatcher,
std::unique_ptr<PhysicalOperator> child, uint32_t id, const std::string& paramsString)
: Sink{nullptr /* resultSetDescriptor */, PhysicalOperatorType::ORDER_BY_MERGE,
std::move(child), id, paramsString},
sharedState{std::move(sharedState)}, sharedDispatcher{std::move(sharedDispatcher)} {}

// This constructor is used for cloning only.
OrderByMerge(std::shared_ptr<SharedFactorizedTablesAndSortedKeyBlocks> sharedState,
OrderByMerge(std::shared_ptr<SortSharedState> sharedState,
std::shared_ptr<KeyBlockMergeTaskDispatcher> sharedDispatcher, uint32_t id,
const std::string& paramsString)
: Sink{nullptr /* resultSetDescriptor */, PhysicalOperatorType::ORDER_BY_MERGE, id,
Expand All @@ -42,7 +42,7 @@ class OrderByMerge : public Sink {
void initGlobalStateInternal(ExecutionContext* context) override;

private:
std::shared_ptr<SharedFactorizedTablesAndSortedKeyBlocks> sharedState;
std::shared_ptr<SortSharedState> sharedState;
std::unique_ptr<KeyBlockMerger> localMerger;
std::shared_ptr<KeyBlockMergeTaskDispatcher> sharedDispatcher;
};
Expand Down
48 changes: 23 additions & 25 deletions src/include/processor/operator/order_by/order_by_scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,51 +6,49 @@
namespace kuzu {
namespace processor {

struct MergedKeyBlockScanState {
bool scanSingleTuple;
uint32_t nextTupleIdxToReadInMergedKeyBlock;
std::shared_ptr<MergedKeyBlocks> mergedKeyBlock;
uint32_t tupleIdxAndFactorizedTableIdxOffset;
std::vector<uint32_t> colsToScan;
std::unique_ptr<uint8_t*[]> tuplesToRead;
std::unique_ptr<BlockPtrInfo> blockPtrInfo;
struct OrderByScanLocalState {
std::vector<common::ValueVector*> vectorsToRead;
std::unique_ptr<PayloadScanner> payloadScanner;

void init(
std::vector<DataPos>& outVectorPos, SortSharedState& sharedState, ResultSet& resultSet);

inline uint64_t scan() { return payloadScanner->scan(vectorsToRead); }
};

// To preserve the ordering of tuples, the orderByScan operator will only
// be executed in single-thread mode.
class OrderByScan : public PhysicalOperator {
public:
OrderByScan(std::vector<DataPos> outVectorPos,
std::shared_ptr<SharedFactorizedTablesAndSortedKeyBlocks> sharedState,
OrderByScan(std::vector<DataPos> outVectorPos, std::shared_ptr<SortSharedState> sharedState,
std::unique_ptr<PhysicalOperator> child, uint32_t id, const std::string& paramsString)
: PhysicalOperator{PhysicalOperatorType::ORDER_BY_SCAN, std::move(child), id, paramsString},
outVectorPos{std::move(outVectorPos)}, sharedState{std::move(sharedState)} {}
outVectorPos{std::move(outVectorPos)},
localState{std::make_unique<OrderByScanLocalState>()}, sharedState{
std::move(sharedState)} {}

// This constructor is used for cloning only.
OrderByScan(std::vector<DataPos> outVectorPos,
std::shared_ptr<SharedFactorizedTablesAndSortedKeyBlocks> sharedState, uint32_t id,
const std::string& paramsString)
OrderByScan(std::vector<DataPos> outVectorPos, std::shared_ptr<SortSharedState> sharedState,
uint32_t id, const std::string& paramsString)
: PhysicalOperator{PhysicalOperatorType::ORDER_BY_SCAN, id, paramsString},
outVectorPos{std::move(outVectorPos)}, sharedState{std::move(sharedState)} {}
outVectorPos{std::move(outVectorPos)},
localState{std::make_unique<OrderByScanLocalState>()}, sharedState{
std::move(sharedState)} {}

inline bool isSource() const override { return true; }
inline bool isSource() const final { return true; }

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;
bool getNextTuplesInternal(ExecutionContext* context) final;

bool getNextTuplesInternal(ExecutionContext* context) override;
void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) final;

std::unique_ptr<PhysicalOperator> clone() override {
std::unique_ptr<PhysicalOperator> clone() final {
return std::make_unique<OrderByScan>(outVectorPos, sharedState, id, paramsString);
}

private:
void initMergedKeyBlockScanState();

private:
std::vector<DataPos> outVectorPos;
std::shared_ptr<SharedFactorizedTablesAndSortedKeyBlocks> sharedState;
std::vector<common::ValueVector*> vectorsToRead;
std::unique_ptr<MergedKeyBlockScanState> mergedKeyBlockScanState;
std::unique_ptr<OrderByScanLocalState> localState;
std::shared_ptr<SortSharedState> sharedState;
};

} // namespace processor
Expand Down
Loading