Skip to content

Commit

Permalink
Implement top-k optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
acquamarin committed Aug 27, 2023
1 parent ee029b9 commit 27da653
Show file tree
Hide file tree
Showing 34 changed files with 1,081 additions and 2,235 deletions.
6 changes: 6 additions & 0 deletions src/include/common/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,5 +161,11 @@ struct ClientContextConstants {
static constexpr uint64_t TIMEOUT_IN_MS = 0;
};

struct OrderByConstants {
static constexpr uint64_t NUM_BYTES_FOR_PAYLOAD_IDX = 8;
static constexpr uint64_t MIN_SIZE_TO_REDUCE = common::DEFAULT_VECTOR_CAPACITY * 5;
static constexpr uint64_t MIN_LIMIT_RATIO_TO_REDUCE = 2;
};

} // namespace common
} // namespace kuzu
33 changes: 9 additions & 24 deletions src/include/processor/operator/order_by/key_block_merger.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,22 +63,7 @@ class MergedKeyBlocks {
};

struct BlockPtrInfo {
inline BlockPtrInfo(
uint64_t startTupleIdx, uint64_t endTupleIdx, std::shared_ptr<MergedKeyBlocks>& keyBlocks)
: keyBlocks{keyBlocks}, curBlockIdx{startTupleIdx / keyBlocks->getNumTuplesPerBlock()},
endBlockIdx{endTupleIdx == 0 ? 0 : (endTupleIdx - 1) / keyBlocks->getNumTuplesPerBlock()},
endTupleIdx{endTupleIdx} {
if (startTupleIdx == endTupleIdx) {
curTuplePtr = nullptr;
endTuplePtr = nullptr;
curBlockEndTuplePtr = nullptr;
} else {
curTuplePtr = keyBlocks->getTuple(startTupleIdx);
endTuplePtr = keyBlocks->getBlockEndTuplePtr(endBlockIdx, endTupleIdx, endBlockIdx);
curBlockEndTuplePtr =
keyBlocks->getBlockEndTuplePtr(curBlockIdx, endTupleIdx, endBlockIdx);
}
}
BlockPtrInfo(uint64_t startTupleIdx, uint64_t endTupleIdx, MergedKeyBlocks* keyBlocks);

inline bool hasMoreTuplesToRead() const { return curTuplePtr != endTuplePtr; }

Expand All @@ -90,7 +75,7 @@ struct BlockPtrInfo {

void updateTuplePtrIfNecessary();

std::shared_ptr<MergedKeyBlocks>& keyBlocks;
MergedKeyBlocks* keyBlocks;
uint8_t* curTuplePtr;
uint64_t curBlockIdx;
uint64_t endBlockIdx;
Expand All @@ -101,9 +86,9 @@ struct BlockPtrInfo {

class KeyBlockMerger {
public:
explicit KeyBlockMerger(std::vector<std::shared_ptr<FactorizedTable>>& factorizedTables,
explicit KeyBlockMerger(std::vector<FactorizedTable*> factorizedTables,
std::vector<StrKeyColInfo>& strKeyColsInfo, uint32_t numBytesPerTuple)
: factorizedTables{factorizedTables}, strKeyColsInfo{strKeyColsInfo},
: factorizedTables{std::move(factorizedTables)}, strKeyColsInfo{strKeyColsInfo},
numBytesPerTuple{numBytesPerTuple}, numBytesToCompare{numBytesPerTuple - 8},
hasStringCol{!strKeyColsInfo.empty()} {}

Expand All @@ -123,7 +108,7 @@ class KeyBlockMerger {
// FactorizedTables[i] stores all order_by columns encoded and sorted by the ith thread.
// MergeSort uses factorizedTable to access the full contents of the string key columns
// when resolving ties.
std::vector<std::shared_ptr<FactorizedTable>>& factorizedTables;
std::vector<FactorizedTable*> factorizedTables;
// We also store the colIdxInFactorizedTable, colOffsetInEncodedKeyBlock, isAscOrder, isStrCol
// for each string column. So, we don't need to compute them again during merge sort.
std::vector<StrKeyColInfo>& strKeyColsInfo;
Expand Down Expand Up @@ -200,15 +185,15 @@ class KeyBlockMergeTaskDispatcher {
// This function is used to initialize the columns of keyBlockMergeTaskDispatcher based on
// sharedFactorizedTablesAndSortedKeyBlocks.
void init(storage::MemoryManager* memoryManager,
std::shared_ptr<std::queue<std::shared_ptr<MergedKeyBlocks>>> sortedKeyBlocks,
std::vector<std::shared_ptr<FactorizedTable>>& factorizedTables,
std::vector<StrKeyColInfo>& strKeyColsInfo, uint64_t numBytesPerTuple);
std::queue<std::shared_ptr<MergedKeyBlocks>>* sortedKeyBlocks,
std::vector<FactorizedTable*> factorizedTables, std::vector<StrKeyColInfo>& strKeyColsInfo,
uint64_t numBytesPerTuple);

private:
std::mutex mtx;

storage::MemoryManager* memoryManager;
std::shared_ptr<std::queue<std::shared_ptr<MergedKeyBlocks>>> sortedKeyBlocks;
std::queue<std::shared_ptr<MergedKeyBlocks>>* sortedKeyBlocks;
std::vector<std::shared_ptr<KeyBlockMergeTask>> activeKeyBlockMergeTasks;
std::unique_ptr<KeyBlockMerger> keyBlockMerger;
};
Expand Down
118 changes: 15 additions & 103 deletions src/include/processor/operator/order_by/order_by.h
Original file line number Diff line number Diff line change
@@ -1,110 +1,26 @@
#pragma once

#include <queue>

#include "common/data_chunk/data_chunk_state.h"
#include "common/in_mem_overflow_buffer.h"
#include "processor/operator/order_by/radix_sort.h"
#include "processor/operator/sink.h"
#include "processor/result/factorized_table.h"
#include "processor/result/result_set.h"
#include "sort_state.h"

namespace kuzu {
namespace processor {

// This class contains factorizedTables, nextFactorizedTableIdx, strKeyColsInfo,
// sortedKeyBlocks and the size of each tuple in keyBlocks. The class is shared between the
// order_by, orderByMerge, orderByScan operators. All functions are guaranteed to be thread-safe, so
// caller doesn't need to acquire a lock before calling these functions.
class SharedFactorizedTablesAndSortedKeyBlocks {
public:
explicit SharedFactorizedTablesAndSortedKeyBlocks()
: nextFactorizedTableIdx{0},
sortedKeyBlocks{std::make_shared<std::queue<std::shared_ptr<MergedKeyBlocks>>>()} {}

uint8_t getNextFactorizedTableIdx() {
std::unique_lock lck{mtx};
return nextFactorizedTableIdx++;
}

void appendFactorizedTable(
uint8_t factorizedTableIdx, std::shared_ptr<FactorizedTable> factorizedTable) {
std::unique_lock lck{mtx};
// If the factorizedTables is full, resize the factorizedTables and
// insert the factorizedTable to the set.
if (factorizedTableIdx >= factorizedTables.size()) {
factorizedTables.resize(factorizedTableIdx + 1);
}
factorizedTables[factorizedTableIdx] = std::move(factorizedTable);
}

void appendSortedKeyBlock(std::shared_ptr<MergedKeyBlocks> mergedDataBlocks) {
std::unique_lock lck{mtx};
sortedKeyBlocks->emplace(mergedDataBlocks);
}

void setNumBytesPerTuple(uint32_t _numBytesPerTuple) {
assert(numBytesPerTuple == UINT32_MAX);
numBytesPerTuple = _numBytesPerTuple;
}

void combineFTHasNoNullGuarantee() {
for (auto i = 1u; i < factorizedTables.size(); i++) {
factorizedTables[0]->mergeMayContainNulls(*factorizedTables[i]);
}
}

void setStrKeyColInfo(std::vector<StrKeyColInfo> _strKeyColsInfo) {
assert(strKeyColsInfo.empty());
strKeyColsInfo = std::move(_strKeyColsInfo);
}

private:
std::mutex mtx;

public:
std::vector<std::shared_ptr<FactorizedTable>> factorizedTables;
uint8_t nextFactorizedTableIdx;
std::shared_ptr<std::queue<std::shared_ptr<MergedKeyBlocks>>> sortedKeyBlocks;

uint32_t numBytesPerTuple = UINT32_MAX; // encoding size
std::vector<StrKeyColInfo> strKeyColsInfo;
};

struct OrderByDataInfo {
public:
OrderByDataInfo(std::vector<std::pair<DataPos, common::LogicalType>> keysPosAndType,
std::vector<std::pair<DataPos, common::LogicalType>> payloadsPosAndType,
std::vector<bool> isPayloadFlat, std::vector<bool> isAscOrder, bool mayContainUnflatKey)
: keysPosAndType{std::move(keysPosAndType)}, payloadsPosAndType{std::move(
payloadsPosAndType)},
isPayloadFlat{std::move(isPayloadFlat)}, isAscOrder{std::move(isAscOrder)},
mayContainUnflatKey{mayContainUnflatKey} {}

OrderByDataInfo(const OrderByDataInfo& other)
: OrderByDataInfo{other.keysPosAndType, other.payloadsPosAndType, other.isPayloadFlat,
other.isAscOrder, other.mayContainUnflatKey} {}

public:
std::vector<std::pair<DataPos, common::LogicalType>> keysPosAndType;
std::vector<std::pair<DataPos, common::LogicalType>> payloadsPosAndType;
std::vector<bool> isPayloadFlat;
std::vector<bool> isAscOrder;
// TODO(Ziyi): We should figure out unflat keys in a more general way.
bool mayContainUnflatKey;
};

class OrderBy : public Sink {
public:
OrderBy(std::unique_ptr<ResultSetDescriptor> resultSetDescriptor,
const OrderByDataInfo& orderByDataInfo,
std::shared_ptr<SharedFactorizedTablesAndSortedKeyBlocks> sharedState,
std::unique_ptr<PhysicalOperator> child, uint32_t id, const std::string& paramsString)
const OrderByDataInfo& orderByDataInfo, std::unique_ptr<SortLocalState> localState,
std::shared_ptr<SortSharedState> sharedState, std::unique_ptr<PhysicalOperator> child,
uint32_t id, const std::string& paramsString)
: Sink{std::move(resultSetDescriptor), PhysicalOperatorType::ORDER_BY, std::move(child), id,
paramsString},
orderByDataInfo{orderByDataInfo}, sharedState{std::move(sharedState)} {}
orderByDataInfo{orderByDataInfo}, localState{std::move(localState)},
sharedState{std::move(sharedState)} {}

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;
void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) final;

void executeInternal(ExecutionContext* context) override;

Expand All @@ -117,24 +33,20 @@ class OrderBy : public Sink {
}

std::unique_ptr<PhysicalOperator> clone() override {
return std::make_unique<OrderBy>(resultSetDescriptor->copy(), orderByDataInfo, sharedState,
children[0]->clone(), id, paramsString);
return std::make_unique<OrderBy>(resultSetDescriptor->copy(), orderByDataInfo,
std::make_unique<SortLocalState>(), sharedState, children[0]->clone(), id,
paramsString);
}

private:
std::unique_ptr<FactorizedTableSchema> populateTableSchema();

void initGlobalStateInternal(ExecutionContext* context) override;
void initGlobalStateInternal(ExecutionContext* context) final;

private:
uint8_t factorizedTableIdx;
OrderByDataInfo orderByDataInfo;
std::unique_ptr<OrderByKeyEncoder> orderByKeyEncoder;
std::unique_ptr<RadixSort> radixSorter;
std::vector<common::ValueVector*> keyVectors;
std::vector<common::ValueVector*> vectorsToAppend;
std::shared_ptr<SharedFactorizedTablesAndSortedKeyBlocks> sharedState;
std::shared_ptr<FactorizedTable> localFactorizedTable;
std::unique_ptr<SortLocalState> localState;
std::shared_ptr<SortSharedState> sharedState;
std::vector<common::ValueVector*> orderByVectors;
std::vector<common::ValueVector*> payloadVectors;
};

} // namespace processor
Expand Down
33 changes: 28 additions & 5 deletions src/include/processor/operator/order_by/order_by_key_encoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,29 @@ namespace processor {

#define BSWAP16(x) ((uint16_t)((((uint16_t)(x)&0xff00) >> 8) | (((uint16_t)(x)&0x00ff) << 8)))

struct OrderByDataInfo {
public:
OrderByDataInfo(std::vector<std::pair<DataPos, common::LogicalType>> keysPosAndType,
std::vector<std::pair<DataPos, common::LogicalType>> payloadsPosAndType,
std::vector<bool> isPayloadFlat, std::vector<bool> isAscOrder, bool mayContainUnflatKey)
: keysPosAndType{std::move(keysPosAndType)}, payloadsPosAndType{std::move(
payloadsPosAndType)},
isPayloadFlat{std::move(isPayloadFlat)}, isAscOrder{std::move(isAscOrder)},
mayContainUnflatKey{mayContainUnflatKey} {}

OrderByDataInfo(const OrderByDataInfo& other)
: OrderByDataInfo{other.keysPosAndType, other.payloadsPosAndType, other.isPayloadFlat,
other.isAscOrder, other.mayContainUnflatKey} {}

public:
std::vector<std::pair<DataPos, common::LogicalType>> keysPosAndType;
std::vector<std::pair<DataPos, common::LogicalType>> payloadsPosAndType;
std::vector<bool> isPayloadFlat;
std::vector<bool> isAscOrder;
// TODO(Ziyi): We should figure out unflat keys in a more general way.
bool mayContainUnflatKey;
};

// The OrderByKeyEncoder encodes all columns in the ORDER BY clause into a single binary sequence
// that, when compared using memcmp will yield the correct overall sorting order. On little-endian
// hardware, the least-significant byte is stored at the smallest address. To encode the sorting
Expand All @@ -45,9 +68,8 @@ using encode_function_t = std::function<void(const uint8_t*, uint8_t*, bool)>;
class OrderByKeyEncoder {

public:
OrderByKeyEncoder(std::vector<common::ValueVector*>& orderByVectors,
std::vector<bool>& isAscOrder, storage::MemoryManager* memoryManager, uint8_t ftIdx,
uint32_t numTuplesPerBlockInFT, uint32_t numBytesPerTuple);
OrderByKeyEncoder(const OrderByDataInfo& orderByDataInfo, storage::MemoryManager* memoryManager,
uint8_t ftIdx, uint32_t numTuplesPerBlockInFT, uint32_t numBytesPerTuple);

inline std::vector<std::shared_ptr<DataBlock>>& getKeyBlocks() { return keyBlocks; }

Expand Down Expand Up @@ -86,7 +108,9 @@ class OrderByKeyEncoder {

static uint32_t getEncodingSize(const common::LogicalType& dataType);

void encodeKeys();
void encodeKeys(std::vector<common::ValueVector*> orderByKeys);

inline void clear() { keyBlocks.clear(); }

private:
template<typename type>
Expand Down Expand Up @@ -121,7 +145,6 @@ class OrderByKeyEncoder {
private:
storage::MemoryManager* memoryManager;
std::vector<std::shared_ptr<DataBlock>> keyBlocks;
std::vector<common::ValueVector*>& orderByVectors;
std::vector<bool> isAscOrder;
uint32_t numBytesPerTuple;
uint32_t maxNumTuplesPerBlock;
Expand Down
6 changes: 3 additions & 3 deletions src/include/processor/operator/order_by/order_by_merge.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ class OrderByMerge : public Sink {
public:
// This constructor will only be called by the mapper when constructing the orderByMerge
// operator, because the mapper doesn't know the existence of keyBlockMergeTaskDispatcher
OrderByMerge(std::shared_ptr<SharedFactorizedTablesAndSortedKeyBlocks> sharedState,
OrderByMerge(std::shared_ptr<SortSharedState> sharedState,
std::shared_ptr<KeyBlockMergeTaskDispatcher> sharedDispatcher,
std::unique_ptr<PhysicalOperator> child, uint32_t id, const std::string& paramsString)
: Sink{nullptr /* resultSetDescriptor */, PhysicalOperatorType::ORDER_BY_MERGE,
std::move(child), id, paramsString},
sharedState{std::move(sharedState)}, sharedDispatcher{std::move(sharedDispatcher)} {}

// This constructor is used for cloning only.
OrderByMerge(std::shared_ptr<SharedFactorizedTablesAndSortedKeyBlocks> sharedState,
OrderByMerge(std::shared_ptr<SortSharedState> sharedState,
std::shared_ptr<KeyBlockMergeTaskDispatcher> sharedDispatcher, uint32_t id,
const std::string& paramsString)
: Sink{nullptr /* resultSetDescriptor */, PhysicalOperatorType::ORDER_BY_MERGE, id,
Expand All @@ -42,7 +42,7 @@ class OrderByMerge : public Sink {
void initGlobalStateInternal(ExecutionContext* context) override;

private:
std::shared_ptr<SharedFactorizedTablesAndSortedKeyBlocks> sharedState;
std::shared_ptr<SortSharedState> sharedState;
std::unique_ptr<KeyBlockMerger> localMerger;
std::shared_ptr<KeyBlockMergeTaskDispatcher> sharedDispatcher;
};
Expand Down
48 changes: 23 additions & 25 deletions src/include/processor/operator/order_by/order_by_scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,51 +6,49 @@
namespace kuzu {
namespace processor {

struct MergedKeyBlockScanState {
bool scanSingleTuple;
uint32_t nextTupleIdxToReadInMergedKeyBlock;
std::shared_ptr<MergedKeyBlocks> mergedKeyBlock;
uint32_t tupleIdxAndFactorizedTableIdxOffset;
std::vector<uint32_t> colsToScan;
std::unique_ptr<uint8_t*[]> tuplesToRead;
std::unique_ptr<BlockPtrInfo> blockPtrInfo;
struct OrderByScanLocalState {
std::vector<common::ValueVector*> vectorsToRead;
std::unique_ptr<PayloadScanner> payloadScanner;

void init(
std::vector<DataPos>& outVectorPos, SortSharedState& sharedState, ResultSet& resultSet);

inline uint64_t scan() { return payloadScanner->scan(vectorsToRead); }
};

// To preserve the ordering of tuples, the orderByScan operator will only
// be executed in single-thread mode.
class OrderByScan : public PhysicalOperator {
public:
OrderByScan(std::vector<DataPos> outVectorPos,
std::shared_ptr<SharedFactorizedTablesAndSortedKeyBlocks> sharedState,
OrderByScan(std::vector<DataPos> outVectorPos, std::shared_ptr<SortSharedState> sharedState,
std::unique_ptr<PhysicalOperator> child, uint32_t id, const std::string& paramsString)
: PhysicalOperator{PhysicalOperatorType::ORDER_BY_SCAN, std::move(child), id, paramsString},
outVectorPos{std::move(outVectorPos)}, sharedState{std::move(sharedState)} {}
outVectorPos{std::move(outVectorPos)},
localState{std::make_unique<OrderByScanLocalState>()}, sharedState{
std::move(sharedState)} {}

// This constructor is used for cloning only.
OrderByScan(std::vector<DataPos> outVectorPos,
std::shared_ptr<SharedFactorizedTablesAndSortedKeyBlocks> sharedState, uint32_t id,
const std::string& paramsString)
OrderByScan(std::vector<DataPos> outVectorPos, std::shared_ptr<SortSharedState> sharedState,
uint32_t id, const std::string& paramsString)
: PhysicalOperator{PhysicalOperatorType::ORDER_BY_SCAN, id, paramsString},
outVectorPos{std::move(outVectorPos)}, sharedState{std::move(sharedState)} {}
outVectorPos{std::move(outVectorPos)},
localState{std::make_unique<OrderByScanLocalState>()}, sharedState{
std::move(sharedState)} {}

inline bool isSource() const override { return true; }
inline bool isSource() const final { return true; }

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;
bool getNextTuplesInternal(ExecutionContext* context) final;

bool getNextTuplesInternal(ExecutionContext* context) override;
void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) final;

std::unique_ptr<PhysicalOperator> clone() override {
std::unique_ptr<PhysicalOperator> clone() final {
return std::make_unique<OrderByScan>(outVectorPos, sharedState, id, paramsString);
}

private:
void initMergedKeyBlockScanState();

private:
std::vector<DataPos> outVectorPos;
std::shared_ptr<SharedFactorizedTablesAndSortedKeyBlocks> sharedState;
std::vector<common::ValueVector*> vectorsToRead;
std::unique_ptr<MergedKeyBlockScanState> mergedKeyBlockScanState;
std::unique_ptr<OrderByScanLocalState> localState;
std::shared_ptr<SortSharedState> sharedState;
};

} // namespace processor
Expand Down
Loading

0 comments on commit 27da653

Please sign in to comment.