Skip to content

Commit

Permalink
Merge pull request #1971 from kuzudb/order-by-compilation
Browse files Browse the repository at this point in the history
Refactor order by compilation
  • Loading branch information
andyfengHKU committed Aug 29, 2023
2 parents 440eb0a + 7267f59 commit e1cfe67
Show file tree
Hide file tree
Showing 12 changed files with 222 additions and 253 deletions.
15 changes: 6 additions & 9 deletions src/include/processor/operator/order_by/order_by.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,11 @@ namespace processor {
class OrderBy : public Sink {
public:
OrderBy(std::unique_ptr<ResultSetDescriptor> resultSetDescriptor,
const OrderByDataInfo& orderByDataInfo, std::unique_ptr<SortLocalState> localState,
std::shared_ptr<SortSharedState> sharedState, std::unique_ptr<PhysicalOperator> child,
uint32_t id, const std::string& paramsString)
std::unique_ptr<OrderByDataInfo> info, std::shared_ptr<SortSharedState> sharedState,
std::unique_ptr<PhysicalOperator> child, uint32_t id, const std::string& paramsString)
: Sink{std::move(resultSetDescriptor), PhysicalOperatorType::ORDER_BY, std::move(child), id,
paramsString},
orderByDataInfo{orderByDataInfo}, localState{std::move(localState)},
sharedState{std::move(sharedState)} {}
info{std::move(info)}, sharedState{std::move(sharedState)} {}

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) final;

Expand All @@ -33,16 +31,15 @@ class OrderBy : public Sink {
}

std::unique_ptr<PhysicalOperator> clone() override {
return std::make_unique<OrderBy>(resultSetDescriptor->copy(), orderByDataInfo,
std::make_unique<SortLocalState>(), sharedState, children[0]->clone(), id,
paramsString);
return std::make_unique<OrderBy>(resultSetDescriptor->copy(), info->copy(), sharedState,
children[0]->clone(), id, paramsString);
}

private:
void initGlobalStateInternal(ExecutionContext* context) final;

private:
OrderByDataInfo orderByDataInfo;
std::unique_ptr<OrderByDataInfo> info;
std::unique_ptr<SortLocalState> localState;
std::shared_ptr<SortSharedState> sharedState;
std::vector<common::ValueVector*> orderByVectors;
Expand Down
42 changes: 42 additions & 0 deletions src/include/processor/operator/order_by/order_by_data_info.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#pragma once

#include "common/types/types_include.h"
#include "processor/data_pos.h"
#include "processor/result/factorized_table.h"

namespace kuzu {
namespace processor {

struct OrderByDataInfo {
std::vector<DataPos> keysPos;
std::vector<DataPos> payloadsPos;
std::vector<std::unique_ptr<common::LogicalType>> keyTypes;
std::vector<std::unique_ptr<common::LogicalType>> payloadTypes;
std::vector<bool> isAscOrder;
std::unique_ptr<FactorizedTableSchema> payloadTableSchema;
std::vector<uint32_t> keyInPayloadPos;

OrderByDataInfo(std::vector<DataPos> keysPos, std::vector<DataPos> payloadsPos,
std::vector<std::unique_ptr<common::LogicalType>> keyTypes,
std::vector<std::unique_ptr<common::LogicalType>> payloadTypes,
std::vector<bool> isAscOrder, std::unique_ptr<FactorizedTableSchema> payloadTableSchema,
std::vector<uint32_t> keyInPayloadPos)
: keysPos{std::move(keysPos)}, payloadsPos{std::move(payloadsPos)}, keyTypes{std::move(
keyTypes)},
payloadTypes{std::move(payloadTypes)}, isAscOrder{std::move(isAscOrder)},
payloadTableSchema{std::move(payloadTableSchema)}, keyInPayloadPos{
std::move(keyInPayloadPos)} {}
OrderByDataInfo(const OrderByDataInfo& other)
: keysPos{other.keysPos},
payloadsPos{other.payloadsPos}, keyTypes{common::LogicalType::copy(other.keyTypes)},
payloadTypes{common::LogicalType::copy(other.payloadTypes)}, isAscOrder{other.isAscOrder},
payloadTableSchema{other.payloadTableSchema->copy()}, keyInPayloadPos{
other.keyInPayloadPos} {}

std::unique_ptr<OrderByDataInfo> copy() const {
return std::make_unique<OrderByDataInfo>(*this);
}
};

} // namespace processor
} // namespace kuzu
26 changes: 2 additions & 24 deletions src/include/processor/operator/order_by/order_by_key_encoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "common/exception.h"
#include "common/utils.h"
#include "common/vector/value_vector.h"
#include "order_by_data_info.h"
#include "processor/result/factorized_table.h"

namespace kuzu {
Expand All @@ -28,29 +29,6 @@ namespace processor {

#define BSWAP16(x) ((uint16_t)((((uint16_t)(x)&0xff00) >> 8) | (((uint16_t)(x)&0x00ff) << 8)))

struct OrderByDataInfo {
public:
OrderByDataInfo(std::vector<std::pair<DataPos, common::LogicalType>> keysPosAndType,
std::vector<std::pair<DataPos, common::LogicalType>> payloadsPosAndType,
std::vector<bool> isPayloadFlat, std::vector<bool> isAscOrder, bool mayContainUnflatKey)
: keysPosAndType{std::move(keysPosAndType)}, payloadsPosAndType{std::move(
payloadsPosAndType)},
isPayloadFlat{std::move(isPayloadFlat)}, isAscOrder{std::move(isAscOrder)},
mayContainUnflatKey{mayContainUnflatKey} {}

OrderByDataInfo(const OrderByDataInfo& other)
: OrderByDataInfo{other.keysPosAndType, other.payloadsPosAndType, other.isPayloadFlat,
other.isAscOrder, other.mayContainUnflatKey} {}

public:
std::vector<std::pair<DataPos, common::LogicalType>> keysPosAndType;
std::vector<std::pair<DataPos, common::LogicalType>> payloadsPosAndType;
std::vector<bool> isPayloadFlat;
std::vector<bool> isAscOrder;
// TODO(Ziyi): We should figure out unflat keys in a more general way.
bool mayContainUnflatKey;
};

// The OrderByKeyEncoder encodes all columns in the ORDER BY clause into a single binary sequence
// that, when compared using memcmp will yield the correct overall sorting order. On little-endian
// hardware, the least-significant byte is stored at the smallest address. To encode the sorting
Expand Down Expand Up @@ -108,7 +86,7 @@ class OrderByKeyEncoder {

static uint32_t getEncodingSize(const common::LogicalType& dataType);

void encodeKeys(std::vector<common::ValueVector*> orderByKeys);
void encodeKeys(const std::vector<common::ValueVector*>& orderByKeys);

inline void clear() { keyBlocks.clear(); }

Expand Down
31 changes: 11 additions & 20 deletions src/include/processor/operator/order_by/sort_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,11 @@
namespace kuzu {
namespace processor {

struct LocalPayloadTableInfo {
uint64_t globalIdx;
FactorizedTable* payloadTable;
};

class SortSharedState {
public:
SortSharedState()
: nextFactorizedTableIdx{0},
sortedKeyBlocks{std::make_shared<std::queue<std::shared_ptr<MergedKeyBlocks>>>()} {};
SortSharedState() : nextTableIdx{0}, numBytesPerTuple{0} {
sortedKeyBlocks = std::make_unique<std::queue<std::shared_ptr<MergedKeyBlocks>>>();
}

inline uint64_t getNumBytesPerTuple() const { return numBytesPerTuple; }

Expand All @@ -29,7 +24,8 @@ class SortSharedState {

void init(const OrderByDataInfo& orderByDataInfo);

LocalPayloadTableInfo getLocalPayloadTable(storage::MemoryManager& memoryManager);
std::pair<uint64_t, FactorizedTable*> getLocalPayloadTable(
storage::MemoryManager& memoryManager, const FactorizedTableSchema& payloadTableSchema);

void appendLocalSortedKeyBlock(std::shared_ptr<MergedKeyBlocks> mergedDataBlocks);

Expand All @@ -41,35 +37,30 @@ class SortSharedState {
return sortedKeyBlocks->empty() ? nullptr : sortedKeyBlocks->front().get();
}

private:
void calculatePayloadSchema(const kuzu::processor::OrderByDataInfo& orderByDataInfo);

private:
std::mutex mtx;
std::vector<std::unique_ptr<FactorizedTable>> payloadTables;
uint8_t nextFactorizedTableIdx;
std::shared_ptr<std::queue<std::shared_ptr<MergedKeyBlocks>>> sortedKeyBlocks;
uint8_t nextTableIdx;
std::unique_ptr<std::queue<std::shared_ptr<MergedKeyBlocks>>> sortedKeyBlocks;
uint32_t numBytesPerTuple;
std::vector<StrKeyColInfo> strKeyColsInfo;

private:
std::unique_ptr<FactorizedTableSchema> payloadSchema;
};

class SortLocalState {
public:
void init(const OrderByDataInfo& orderByDataInfo, SortSharedState& sharedState,
storage::MemoryManager* memoryManager);

void append(std::vector<common::ValueVector*> keyVectors,
std::vector<common::ValueVector*> payloadVectors);
void append(const std::vector<common::ValueVector*>& keyVectors,
const std::vector<common::ValueVector*>& payloadVectors);

void finalize(SortSharedState& sharedState);

private:
std::unique_ptr<OrderByKeyEncoder> orderByKeyEncoder;
std::unique_ptr<RadixSort> radixSorter;
LocalPayloadTableInfo localPayloadTableInfo;
uint64_t globalIdx;
FactorizedTable* payloadTable;
};

class PayloadScanner {
Expand Down
Loading

0 comments on commit e1cfe67

Please sign in to comment.