diff --git a/src/include/processor/operator/order_by/order_by.h b/src/include/processor/operator/order_by/order_by.h index d26aa9038ce..22f7cc8c5ac 100644 --- a/src/include/processor/operator/order_by/order_by.h +++ b/src/include/processor/operator/order_by/order_by.h @@ -12,13 +12,11 @@ namespace processor { class OrderBy : public Sink { public: OrderBy(std::unique_ptr resultSetDescriptor, - const OrderByDataInfo& orderByDataInfo, std::unique_ptr localState, - std::shared_ptr sharedState, std::unique_ptr child, - uint32_t id, const std::string& paramsString) + std::unique_ptr info, std::shared_ptr sharedState, + std::unique_ptr child, uint32_t id, const std::string& paramsString) : Sink{std::move(resultSetDescriptor), PhysicalOperatorType::ORDER_BY, std::move(child), id, paramsString}, - orderByDataInfo{orderByDataInfo}, localState{std::move(localState)}, - sharedState{std::move(sharedState)} {} + info{std::move(info)}, sharedState{std::move(sharedState)} {} void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) final; @@ -33,16 +31,15 @@ class OrderBy : public Sink { } std::unique_ptr clone() override { - return std::make_unique(resultSetDescriptor->copy(), orderByDataInfo, - std::make_unique(), sharedState, children[0]->clone(), id, - paramsString); + return std::make_unique(resultSetDescriptor->copy(), info->copy(), sharedState, + children[0]->clone(), id, paramsString); } private: void initGlobalStateInternal(ExecutionContext* context) final; private: - OrderByDataInfo orderByDataInfo; + std::unique_ptr info; std::unique_ptr localState; std::shared_ptr sharedState; std::vector orderByVectors; diff --git a/src/include/processor/operator/order_by/order_by_data_info.h b/src/include/processor/operator/order_by/order_by_data_info.h new file mode 100644 index 00000000000..fcd7aee190b --- /dev/null +++ b/src/include/processor/operator/order_by/order_by_data_info.h @@ -0,0 +1,42 @@ +#pragma once + +#include "common/types/types_include.h" +#include "processor/data_pos.h" +#include "processor/result/factorized_table.h" + +namespace kuzu { +namespace processor { + +struct OrderByDataInfo { + std::vector keysPos; + std::vector payloadsPos; + std::vector> keyTypes; + std::vector> payloadTypes; + std::vector isAscOrder; + std::unique_ptr payloadTableSchema; + std::vector keyInPayloadPos; + + OrderByDataInfo(std::vector keysPos, std::vector payloadsPos, + std::vector> keyTypes, + std::vector> payloadTypes, + std::vector isAscOrder, std::unique_ptr payloadTableSchema, + std::vector keyInPayloadPos) + : keysPos{std::move(keysPos)}, payloadsPos{std::move(payloadsPos)}, keyTypes{std::move( + keyTypes)}, + payloadTypes{std::move(payloadTypes)}, isAscOrder{std::move(isAscOrder)}, + payloadTableSchema{std::move(payloadTableSchema)}, keyInPayloadPos{ + std::move(keyInPayloadPos)} {} + OrderByDataInfo(const OrderByDataInfo& other) + : keysPos{other.keysPos}, + payloadsPos{other.payloadsPos}, keyTypes{common::LogicalType::copy(other.keyTypes)}, + payloadTypes{common::LogicalType::copy(other.payloadTypes)}, isAscOrder{other.isAscOrder}, + payloadTableSchema{other.payloadTableSchema->copy()}, keyInPayloadPos{ + other.keyInPayloadPos} {} + + std::unique_ptr copy() const { + return std::make_unique(*this); + } +}; + +} // namespace processor +} // namespace kuzu diff --git a/src/include/processor/operator/order_by/order_by_key_encoder.h b/src/include/processor/operator/order_by/order_by_key_encoder.h index 491aa8f36cf..c7fd81487cd 100644 --- a/src/include/processor/operator/order_by/order_by_key_encoder.h +++ b/src/include/processor/operator/order_by/order_by_key_encoder.h @@ -7,6 +7,7 @@ #include "common/exception.h" #include "common/utils.h" #include "common/vector/value_vector.h" +#include "order_by_data_info.h" #include "processor/result/factorized_table.h" namespace kuzu { @@ -28,29 +29,6 @@ namespace processor { #define BSWAP16(x) ((uint16_t)((((uint16_t)(x)&0xff00) >> 8) | (((uint16_t)(x)&0x00ff) << 8))) -struct OrderByDataInfo { -public: - OrderByDataInfo(std::vector> keysPosAndType, - std::vector> payloadsPosAndType, - std::vector isPayloadFlat, std::vector isAscOrder, bool mayContainUnflatKey) - : keysPosAndType{std::move(keysPosAndType)}, payloadsPosAndType{std::move( - payloadsPosAndType)}, - isPayloadFlat{std::move(isPayloadFlat)}, isAscOrder{std::move(isAscOrder)}, - mayContainUnflatKey{mayContainUnflatKey} {} - - OrderByDataInfo(const OrderByDataInfo& other) - : OrderByDataInfo{other.keysPosAndType, other.payloadsPosAndType, other.isPayloadFlat, - other.isAscOrder, other.mayContainUnflatKey} {} - -public: - std::vector> keysPosAndType; - std::vector> payloadsPosAndType; - std::vector isPayloadFlat; - std::vector isAscOrder; - // TODO(Ziyi): We should figure out unflat keys in a more general way. - bool mayContainUnflatKey; -}; - // The OrderByKeyEncoder encodes all columns in the ORDER BY clause into a single binary sequence // that, when compared using memcmp will yield the correct overall sorting order. On little-endian // hardware, the least-significant byte is stored at the smallest address. To encode the sorting @@ -108,7 +86,7 @@ class OrderByKeyEncoder { static uint32_t getEncodingSize(const common::LogicalType& dataType); - void encodeKeys(std::vector orderByKeys); + void encodeKeys(const std::vector& orderByKeys); inline void clear() { keyBlocks.clear(); } diff --git a/src/include/processor/operator/order_by/sort_state.h b/src/include/processor/operator/order_by/sort_state.h index 3694d3c94ee..32fc8894d5a 100644 --- a/src/include/processor/operator/order_by/sort_state.h +++ b/src/include/processor/operator/order_by/sort_state.h @@ -8,16 +8,11 @@ namespace kuzu { namespace processor { -struct LocalPayloadTableInfo { - uint64_t globalIdx; - FactorizedTable* payloadTable; -}; - class SortSharedState { public: - SortSharedState() - : nextFactorizedTableIdx{0}, - sortedKeyBlocks{std::make_shared>>()} {}; + SortSharedState() : nextTableIdx{0}, numBytesPerTuple{0} { + sortedKeyBlocks = std::make_shared>>(); + } inline uint64_t getNumBytesPerTuple() const { return numBytesPerTuple; } @@ -29,7 +24,8 @@ class SortSharedState { void init(const OrderByDataInfo& orderByDataInfo); - LocalPayloadTableInfo getLocalPayloadTable(storage::MemoryManager& memoryManager); + std::pair getLocalPayloadTable( + storage::MemoryManager& memoryManager, const FactorizedTableSchema& payloadTableSchema); void appendLocalSortedKeyBlock(std::shared_ptr mergedDataBlocks); @@ -41,19 +37,13 @@ class SortSharedState { return sortedKeyBlocks->empty() ? nullptr : sortedKeyBlocks->front().get(); } -private: - void calculatePayloadSchema(const kuzu::processor::OrderByDataInfo& orderByDataInfo); - private: std::mutex mtx; std::vector> payloadTables; - uint8_t nextFactorizedTableIdx; + uint8_t nextTableIdx; std::shared_ptr>> sortedKeyBlocks; uint32_t numBytesPerTuple; std::vector strKeyColsInfo; - -private: - std::unique_ptr payloadSchema; }; class SortLocalState { @@ -61,15 +51,16 @@ class SortLocalState { void init(const OrderByDataInfo& orderByDataInfo, SortSharedState& sharedState, storage::MemoryManager* memoryManager); - void append(std::vector keyVectors, - std::vector payloadVectors); + void append(const std::vector& keyVectors, + const std::vector& payloadVectors); void finalize(SortSharedState& sharedState); private: std::unique_ptr orderByKeyEncoder; std::unique_ptr radixSorter; - LocalPayloadTableInfo localPayloadTableInfo; + uint64_t globalIdx; + FactorizedTable* payloadTable; }; class PayloadScanner { diff --git a/src/include/processor/operator/order_by/top_k.h b/src/include/processor/operator/order_by/top_k.h index 98263d9b8d8..3623c2537ca 100644 --- a/src/include/processor/operator/order_by/top_k.h +++ b/src/include/processor/operator/order_by/top_k.h @@ -9,26 +9,14 @@ namespace kuzu { namespace processor { -struct TopKScanState { - // TODO(Xiyang): Move the initialization of payloadScanner to mapper. - inline void init(MergedKeyBlocks* keyBlockToScan, std::vector payloadTables, - uint64_t skipNum, uint64_t limitNum) { - payloadScanner = std::make_unique( - keyBlockToScan, std::move(payloadTables), skipNum, limitNum); - } - - std::unique_ptr payloadScanner; -}; - class TopKSortState { - public: TopKSortState(); void init(const OrderByDataInfo& orderByDataInfo, storage::MemoryManager* memoryManager); - void append(std::vector keyVectors, - std::vector payloadVectors); + void append(const std::vector& keyVectors, + const std::vector& payloadVectors); void finalize(); @@ -36,8 +24,8 @@ class TopKSortState { inline SortSharedState* getSharedState() { return orderBySharedState.get(); } - inline void initScan(TopKScanState& scanState, uint64_t skip, uint64_t limit) { - scanState.init(orderBySharedState->getMergedKeyBlock(), + std::unique_ptr getScanner(uint64_t skip, uint64_t limit) { + return std::make_unique(orderBySharedState->getMergedKeyBlock(), orderBySharedState->getPayloadTables(), skip, limit); } @@ -45,8 +33,8 @@ class TopKSortState { std::unique_ptr orderByLocalState; std::unique_ptr orderBySharedState; - uint64_t numTuples; - storage::MemoryManager* memoryManager; + uint64_t numTuples = 0; + storage::MemoryManager* memoryManager = nullptr; }; class TopKBuffer { @@ -54,13 +42,14 @@ class TopKBuffer { std::function; public: - TopKBuffer() { sortState = std::make_unique(); } + TopKBuffer(const OrderByDataInfo& orderByDataInfo) : orderByDataInfo{orderByDataInfo} { + sortState = std::make_unique(); + } - void init(const OrderByDataInfo& orderByDataInfo, storage::MemoryManager* memoryManager, - uint64_t skipNumber, uint64_t limitNumber); + void init(storage::MemoryManager* memoryManager, uint64_t skipNumber, uint64_t limitNumber); - void append(std::vector keyVectors, - std::vector payloadVectors); + void append(const std::vector& keyVectors, + const std::vector& payloadVectors); void reduce(); @@ -68,13 +57,13 @@ class TopKBuffer { void merge(TopKBuffer* other); - inline void initScan(TopKScanState& scanState) { sortState->initScan(scanState, skip, limit); } + inline std::unique_ptr getScanner() { + return sortState->getScanner(skip, limit); + } private: void initVectors(); - uint64_t findKeyVectorPosInPayload(const DataPos& keyPos); - template void getSelectComparisonFunction( common::PhysicalTypeID typeID, vector_select_comparison_func& selectFunc); @@ -83,22 +72,22 @@ class TopKBuffer { void setBoundaryValue(); - bool compareBoundaryValue(std::vector& keyVectors); + bool compareBoundaryValue(const std::vector& keyVectors); - bool compareFlatKeys( - common::vector_idx_t vectorIdxToCompare, std::vector keyVectors); + bool compareFlatKeys(common::vector_idx_t vectorIdxToCompare, + const std::vector keyVectors); - void compareUnflatKeys( - common::vector_idx_t vectorIdxToCompare, std::vector keyVectors); + void compareUnflatKeys(common::vector_idx_t vectorIdxToCompare, + const std::vector keyVectors); static void appendSelState( common::SelectionVector* selVector, common::SelectionVector* selVectorToAppend); public: + const OrderByDataInfo& orderByDataInfo; std::unique_ptr sortState; uint64_t skip; uint64_t limit; - const OrderByDataInfo* orderByDataInfo; storage::MemoryManager* memoryManager; std::vector compareFuncs; std::vector equalsFuncs; @@ -117,29 +106,23 @@ class TopKBuffer { class TopKLocalState { public: - TopKLocalState() { buffer = std::make_unique(); } - void init(const OrderByDataInfo& orderByDataInfo, storage::MemoryManager* memoryManager, ResultSet& resultSet, uint64_t skipNumber, uint64_t limitNumber); - void append(); + void append(const std::vector& keyVectors, + const std::vector& payloadVectors); inline void finalize() { buffer->finalize(); } std::unique_ptr buffer; - -private: - std::vector orderByVectors; - std::vector payloadVectors; }; class TopKSharedState { public: - TopKSharedState() { buffer = std::make_unique(); } - void init(const OrderByDataInfo& orderByDataInfo, storage::MemoryManager* memoryManager, uint64_t skipNumber, uint64_t limitNumber) { - buffer->init(orderByDataInfo, memoryManager, skipNumber, limitNumber); + buffer = std::make_unique(orderByDataInfo); + buffer->init(memoryManager, skipNumber, limitNumber); } void mergeLocalState(TopKLocalState* localState) { @@ -147,7 +130,7 @@ class TopKSharedState { buffer->merge(localState->buffer.get()); } - void finalize() { buffer->finalize(); } + inline void finalize() { buffer->finalize(); } std::unique_ptr buffer; @@ -158,22 +141,27 @@ class TopKSharedState { class TopK : public Sink { public: TopK(std::unique_ptr resultSetDescriptor, - std::unique_ptr localState, std::shared_ptr sharedState, - OrderByDataInfo orderByDataInfo, uint64_t skipNumber, uint64_t limitNumber, - std::unique_ptr child, uint32_t id, const std::string& paramsString) + std::unique_ptr info, std::shared_ptr sharedState, + uint64_t skipNumber, uint64_t limitNumber, std::unique_ptr child, + uint32_t id, const std::string& paramsString) : Sink{std::move(resultSetDescriptor), PhysicalOperatorType::TOP_K, std::move(child), id, paramsString}, - localState{std::move(localState)}, sharedState{std::move(sharedState)}, - orderByDataInfo{std::move(orderByDataInfo)}, skipNumber{skipNumber}, limitNumber{ - limitNumber} {} + info(std::move(info)), sharedState{std::move(sharedState)}, skipNumber{skipNumber}, + limitNumber{limitNumber} {} inline void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) final { - localState->init( - orderByDataInfo, context->memoryManager, *resultSet, skipNumber, limitNumber); + localState = std::make_unique(); + localState->init(*info, context->memoryManager, *resultSet, skipNumber, limitNumber); + for (auto& dataPos : info->payloadsPos) { + payloadVectors.push_back(resultSet->getValueVector(dataPos).get()); + } + for (auto& dataPos : info->keysPos) { + orderByVectors.push_back(resultSet->getValueVector(dataPos).get()); + } } inline void initGlobalStateInternal(ExecutionContext* context) final { - sharedState->init(orderByDataInfo, context->memoryManager, skipNumber, limitNumber); + sharedState->init(*info, context->memoryManager, skipNumber, limitNumber); } void executeInternal(ExecutionContext* context) final; @@ -181,17 +169,18 @@ class TopK : public Sink { void finalize(ExecutionContext* context) final { sharedState->finalize(); } std::unique_ptr clone() final { - return std::make_unique(resultSetDescriptor->copy(), - std::make_unique(), sharedState, orderByDataInfo, skipNumber, - limitNumber, children[0]->clone(), id, paramsString); + return std::make_unique(resultSetDescriptor->copy(), info->copy(), sharedState, + skipNumber, limitNumber, children[0]->clone(), id, paramsString); } private: + std::unique_ptr info; std::unique_ptr localState; std::shared_ptr sharedState; - OrderByDataInfo orderByDataInfo; uint64_t skipNumber; uint64_t limitNumber; + std::vector orderByVectors; + std::vector payloadVectors; }; } // namespace processor diff --git a/src/include/processor/operator/order_by/top_k_scanner.h b/src/include/processor/operator/order_by/top_k_scanner.h index e6b0840dd12..9e9f874d479 100644 --- a/src/include/processor/operator/order_by/top_k_scanner.h +++ b/src/include/processor/operator/order_by/top_k_scanner.h @@ -7,12 +7,12 @@ namespace processor { struct TopKLocalScanState { std::vector vectorsToScan; - std::unique_ptr scanState; + std::unique_ptr payloadScanner; void init( std::vector& outVectorPos, TopKSharedState& sharedState, ResultSet& resultSet); - inline uint64_t scan() { return scanState->payloadScanner->scan(vectorsToScan); } + inline uint64_t scan() { return payloadScanner->scan(vectorsToScan); } }; class TopKScan : public PhysicalOperator { diff --git a/src/processor/map/map_order_by.cpp b/src/processor/map/map_order_by.cpp index 58cc5f04a02..81fb89f91c2 100644 --- a/src/processor/map/map_order_by.cpp +++ b/src/processor/map/map_order_by.cpp @@ -18,45 +18,64 @@ std::unique_ptr PlanMapper::mapOrderBy(LogicalOperator* logica auto inSchema = logicalOrderBy->getChild(0)->getSchema(); auto prevOperator = mapOperator(logicalOrderBy->getChild(0).get()); auto paramsString = logicalOrderBy->getExpressionsForPrinting(); - std::vector> keysPosAndType; - for (auto& expression : logicalOrderBy->getExpressionsToOrderBy()) { - keysPosAndType.emplace_back(inSchema->getExpressionPos(*expression), expression->dataType); + auto keyExpressions = logicalOrderBy->getExpressionsToOrderBy(); + + auto payloadExpressions = inSchema->getExpressionsInScope(); + std::vector payloadsPos; + std::vector> payloadTypes; + binder::expression_map payloadToColIdx; + auto payloadSchema = std::make_unique(); + auto mayContainUnFlatKey = inSchema->getNumGroups() == 1; + for (auto i = 0u; i < payloadExpressions.size(); ++i) { + auto expression = payloadExpressions[i]; + auto [dataChunkPos, vectorPos] = inSchema->getExpressionPos(*expression); + payloadsPos.emplace_back(dataChunkPos, vectorPos); + payloadTypes.push_back(expression->dataType.copy()); + std::unique_ptr columnSchema; + if (!inSchema->getGroup(dataChunkPos)->isFlat() && !mayContainUnFlatKey) { + // payload is unFlat and not in the same group as keys + columnSchema = std::make_unique( + true /* isUnFlat */, dataChunkPos, sizeof(overflow_value_t)); + } else { + columnSchema = std::make_unique(false /* isUnFlat */, dataChunkPos, + LogicalTypeUtils::getRowLayoutSize(expression->getDataType())); + } + payloadSchema->appendColumn(std::move(columnSchema)); + payloadToColIdx.insert({expression, i}); } - std::vector> payloadsPosAndType; - std::vector isPayloadFlat; - std::vector outVectorPos; - for (auto& expression : inSchema->getExpressionsInScope()) { - auto expressionName = expression->getUniqueName(); - payloadsPosAndType.emplace_back( - inSchema->getExpressionPos(*expression), expression->dataType); - isPayloadFlat.push_back(inSchema->getGroup(expressionName)->isFlat()); - outVectorPos.emplace_back(outSchema->getExpressionPos(*expression)); + std::vector keysPos; + std::vector> keyTypes; + std::vector keyInPayloadPos; + for (auto& expression : keyExpressions) { + keysPos.emplace_back(inSchema->getExpressionPos(*expression)); + keyTypes.push_back(expression->getDataType().copy()); + assert(payloadToColIdx.contains(expression)); + keyInPayloadPos.push_back(payloadToColIdx.at(expression)); } - // See comment in planOrderBy in projectionPlanner.cpp - auto mayContainUnflatKey = inSchema->getNumGroups() == 1; - auto orderByDataInfo = OrderByDataInfo(keysPosAndType, payloadsPosAndType, isPayloadFlat, - logicalOrderBy->getIsAscOrders(), mayContainUnflatKey); - + std::vector outPos; + for (auto& expression : payloadExpressions) { + outPos.emplace_back(outSchema->getExpressionPos(*expression)); + } + auto orderByDataInfo = std::make_unique(keysPos, payloadsPos, + LogicalType::copy(keyTypes), LogicalType::copy(payloadTypes), + logicalOrderBy->getIsAscOrders(), std::move(payloadSchema), std::move(keyInPayloadPos)); if (logicalOrderBy->isTopK()) { auto topKSharedState = std::make_shared(); auto topK = make_unique(std::make_unique(inSchema), - std::make_unique(), topKSharedState, orderByDataInfo, - logicalOrderBy->getSkipNum(), logicalOrderBy->getLimitNum(), std::move(prevOperator), - getOperatorID(), paramsString); - auto topKScan = make_unique( - outVectorPos, topKSharedState, std::move(topK), getOperatorID(), paramsString); - return topKScan; + std::move(orderByDataInfo), topKSharedState, logicalOrderBy->getSkipNum(), + logicalOrderBy->getLimitNum(), std::move(prevOperator), getOperatorID(), paramsString); + return make_unique( + outPos, topKSharedState, std::move(topK), getOperatorID(), paramsString); } else { auto orderBySharedState = std::make_shared(); auto orderBy = make_unique(std::make_unique(inSchema), - orderByDataInfo, std::make_unique(), orderBySharedState, - std::move(prevOperator), getOperatorID(), paramsString); + std::move(orderByDataInfo), orderBySharedState, std::move(prevOperator), + getOperatorID(), paramsString); auto dispatcher = std::make_shared(); auto orderByMerge = make_unique(orderBySharedState, std::move(dispatcher), std::move(orderBy), getOperatorID(), paramsString); - auto orderByScan = make_unique(outVectorPos, orderBySharedState, - std::move(orderByMerge), getOperatorID(), paramsString); - return orderByScan; + return make_unique( + outPos, orderBySharedState, std::move(orderByMerge), getOperatorID(), paramsString); } } diff --git a/src/processor/operator/order_by/order_by.cpp b/src/processor/operator/order_by/order_by.cpp index 573633f7488..2e0ce296cd6 100644 --- a/src/processor/operator/order_by/order_by.cpp +++ b/src/processor/operator/order_by/order_by.cpp @@ -6,17 +6,18 @@ namespace kuzu { namespace processor { void OrderBy::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) { - localState->init(orderByDataInfo, *sharedState, context->memoryManager); - for (auto [dataPos, _] : orderByDataInfo.payloadsPosAndType) { + localState = std::make_unique(); + localState->init(*info, *sharedState, context->memoryManager); + for (auto& dataPos : info->payloadsPos) { payloadVectors.push_back(resultSet->getValueVector(dataPos).get()); } - for (auto [dataPos, _] : orderByDataInfo.keysPosAndType) { + for (auto& dataPos : info->keysPos) { orderByVectors.push_back(resultSet->getValueVector(dataPos).get()); } } void OrderBy::initGlobalStateInternal(ExecutionContext* context) { - sharedState->init(orderByDataInfo); + sharedState->init(*info); } void OrderBy::executeInternal(ExecutionContext* context) { diff --git a/src/processor/operator/order_by/order_by_key_encoder.cpp b/src/processor/operator/order_by/order_by_key_encoder.cpp index 6b41ebf864e..58874804045 100644 --- a/src/processor/operator/order_by/order_by_key_encoder.cpp +++ b/src/processor/operator/order_by/order_by_key_encoder.cpp @@ -29,15 +29,15 @@ OrderByKeyEncoder::OrderByKeyEncoder(const OrderByDataInfo& orderByDataInfo, "TupleSize({} bytes) is larger than the LARGE_PAGE_SIZE({} bytes)", numBytesPerTuple, BufferPoolConstants::PAGE_256KB_SIZE)); } - encodeFunctions.reserve(orderByDataInfo.keysPosAndType.size()); - for (auto& [_, type] : orderByDataInfo.keysPosAndType) { + encodeFunctions.reserve(orderByDataInfo.keysPos.size()); + for (auto& type : orderByDataInfo.keyTypes) { encode_function_t encodeFunction; - getEncodingFunction(type.getPhysicalType(), encodeFunction); + getEncodingFunction(type->getPhysicalType(), encodeFunction); encodeFunctions.push_back(std::move(encodeFunction)); } } -void OrderByKeyEncoder::encodeKeys(std::vector orderByKeys) { +void OrderByKeyEncoder::encodeKeys(const std::vector& orderByKeys) { uint32_t numEntries = orderByKeys[0]->state->selVector->selectedSize; uint32_t encodedTuples = 0; while (numEntries > 0) { diff --git a/src/processor/operator/order_by/sort_state.cpp b/src/processor/operator/order_by/sort_state.cpp index 5e1999cae92..b68d9e31e93 100644 --- a/src/processor/operator/order_by/sort_state.cpp +++ b/src/processor/operator/order_by/sort_state.cpp @@ -6,34 +6,29 @@ namespace kuzu { namespace processor { void SortSharedState::init(const OrderByDataInfo& orderByDataInfo) { - calculatePayloadSchema(orderByDataInfo); auto encodedKeyBlockColOffset = 0ul; - for (auto i = 0u; i < orderByDataInfo.keysPosAndType.size(); ++i) { - auto& [dataPos, dataType] = orderByDataInfo.keysPosAndType[i]; - if (PhysicalTypeID::STRING == dataType.getPhysicalType()) { + for (auto i = 0u; i < orderByDataInfo.keysPos.size(); ++i) { + auto dataType = orderByDataInfo.keyTypes[i].get(); + if (PhysicalTypeID::STRING == dataType->getPhysicalType()) { // If this is a string column, we need to find the factorizedTable offset for this // column. - auto ftColIdx = 0ul; - for (auto j = 0u; j < orderByDataInfo.payloadsPosAndType.size(); j++) { - auto [payloadDataPos, _] = orderByDataInfo.payloadsPosAndType[j]; - if (payloadDataPos == dataPos) { - ftColIdx = j; - } - } - strKeyColsInfo.emplace_back(payloadSchema->getColOffset(ftColIdx), + auto ftColIdx = orderByDataInfo.keyInPayloadPos[i]; + strKeyColsInfo.emplace_back(orderByDataInfo.payloadTableSchema->getColOffset(ftColIdx), encodedKeyBlockColOffset, orderByDataInfo.isAscOrder[i]); } - encodedKeyBlockColOffset += OrderByKeyEncoder::getEncodingSize(dataType); + encodedKeyBlockColOffset += OrderByKeyEncoder::getEncodingSize(*dataType); } numBytesPerTuple = encodedKeyBlockColOffset + OrderByConstants::NUM_BYTES_FOR_PAYLOAD_IDX; } -LocalPayloadTableInfo SortSharedState::getLocalPayloadTable(storage::MemoryManager& memoryManager) { +std::pair SortSharedState::getLocalPayloadTable( + storage::MemoryManager& memoryManager, const FactorizedTableSchema& payloadTableSchema) { std::unique_lock lck{mtx}; - auto payloadTable = std::make_unique(&memoryManager, payloadSchema->copy()); - auto payloadTableInfo = LocalPayloadTableInfo{nextFactorizedTableIdx++, payloadTable.get()}; + auto payloadTable = + std::make_unique(&memoryManager, payloadTableSchema.copy()); + auto result = std::make_pair(nextTableIdx++, payloadTable.get()); payloadTables.push_back(std::move(payloadTable)); - return payloadTableInfo; + return result; } void SortSharedState::appendLocalSortedKeyBlock(std::shared_ptr mergedDataBlocks) { @@ -56,36 +51,22 @@ std::vector SortSharedState::getPayloadTables() const { return payloadTablesToReturn; } -void SortSharedState::calculatePayloadSchema( - const kuzu::processor::OrderByDataInfo& orderByDataInfo) { - // The orderByKeyEncoder requires that the orderByKey columns are flat in the - // factorizedTable. If there is only one unflat dataChunk, we need to flatten the payload - // columns in factorizedTable because the payload and key columns are in the same - // dataChunk. - payloadSchema = std::make_unique(); - for (auto i = 0u; i < orderByDataInfo.payloadsPosAndType.size(); ++i) { - auto [dataPos, dataType] = orderByDataInfo.payloadsPosAndType[i]; - bool isUnflat = !orderByDataInfo.isPayloadFlat[i] && !orderByDataInfo.mayContainUnflatKey; - payloadSchema->appendColumn(std::make_unique(isUnflat, dataPos.dataChunkPos, - isUnflat ? (uint32_t)sizeof(overflow_value_t) : - LogicalTypeUtils::getRowLayoutSize(dataType))); - } -} - void SortLocalState::init(const OrderByDataInfo& orderByDataInfo, SortSharedState& sharedState, storage::MemoryManager* memoryManager) { - localPayloadTableInfo = sharedState.getLocalPayloadTable(*memoryManager); + auto [idx, table] = + sharedState.getLocalPayloadTable(*memoryManager, *orderByDataInfo.payloadTableSchema); + globalIdx = idx; + payloadTable = table; orderByKeyEncoder = std::make_unique(orderByDataInfo, memoryManager, - localPayloadTableInfo.globalIdx, localPayloadTableInfo.payloadTable->getNumTuplesPerBlock(), - sharedState.getNumBytesPerTuple()); - radixSorter = std::make_unique(memoryManager, *localPayloadTableInfo.payloadTable, - *orderByKeyEncoder, sharedState.getStrKeyColInfo()); + globalIdx, payloadTable->getNumTuplesPerBlock(), sharedState.getNumBytesPerTuple()); + radixSorter = std::make_unique( + memoryManager, *payloadTable, *orderByKeyEncoder, sharedState.getStrKeyColInfo()); } -void SortLocalState::append(std::vector keyVectors, - std::vector payloadVectors) { - orderByKeyEncoder->encodeKeys(std::move(keyVectors)); - localPayloadTableInfo.payloadTable->append(std::move(payloadVectors)); +void SortLocalState::append(const std::vector& keyVectors, + const std::vector& payloadVectors) { + orderByKeyEncoder->encodeKeys(keyVectors); + payloadTable->append(payloadVectors); } void SortLocalState::finalize(kuzu::processor::SortSharedState& sharedState) { diff --git a/src/processor/operator/order_by/top_k.cpp b/src/processor/operator/order_by/top_k.cpp index 756406cab22..36ca7001bb6 100644 --- a/src/processor/operator/order_by/top_k.cpp +++ b/src/processor/operator/order_by/top_k.cpp @@ -5,7 +5,7 @@ using namespace kuzu::common; namespace kuzu { namespace processor { -TopKSortState::TopKSortState() { +TopKSortState::TopKSortState() : numTuples{0}, memoryManager{nullptr} { orderByLocalState = std::make_unique(); orderBySharedState = std::make_unique(); } @@ -18,10 +18,10 @@ void TopKSortState::init( numTuples = 0; } -void TopKSortState::append(std::vector keyVectors, - std::vector payloadVectors) { +void TopKSortState::append(const std::vector& keyVectors, + const std::vector& payloadVectors) { numTuples += keyVectors[0]->state->selVector->selectedSize; - orderByLocalState->append(std::move(keyVectors), std::move(payloadVectors)); + orderByLocalState->append(keyVectors, payloadVectors); } void TopKSortState::finalize() { @@ -39,9 +39,8 @@ void TopKSortState::finalize() { } } -void TopKBuffer::init(const kuzu::processor::OrderByDataInfo& orderByDataInfo, +void TopKBuffer::init( storage::MemoryManager* memoryManager, uint64_t skipNumber, uint64_t limitNumber) { - this->orderByDataInfo = &orderByDataInfo; this->memoryManager = memoryManager; sortState->init(orderByDataInfo, memoryManager); this->skip = skipNumber; @@ -50,8 +49,8 @@ void TopKBuffer::init(const kuzu::processor::OrderByDataInfo& orderByDataInfo, initCompareFuncs(); } -void TopKBuffer::append(std::vector keyVectors, - std::vector payloadVectors) { +void TopKBuffer::append(const std::vector& keyVectors, + const std::vector& payloadVectors) { auto originalSelState = keyVectors[0]->state->selVector; if (hasBoundaryValue && !compareBoundaryValue(keyVectors)) { keyVectors[0]->state->selVector = std::move(originalSelState); @@ -69,11 +68,10 @@ void TopKBuffer::reduce() { } sortState->finalize(); auto newSortState = std::make_unique(); - newSortState->init(*orderByDataInfo, memoryManager); - TopKScanState scanState; - sortState->initScan(scanState, 0, skip + limit); + newSortState->init(orderByDataInfo, memoryManager); + auto scanner = sortState->getScanner(0, skip + limit); while (true) { - auto numTuplesScanned = scanState.payloadScanner->scan(payloadVecsToScan); + auto numTuplesScanned = scanner->scan(payloadVecsToScan); if (numTuplesScanned == 0) { setBoundaryValue(); break; @@ -90,9 +88,8 @@ void TopKBuffer::merge(TopKBuffer* other) { if (other->sortState->getSharedState()->getSortedKeyBlocks()->empty()) { return; } - TopKScanState scanState; - other->sortState->initScan(scanState, 0, skip + limit); - while (scanState.payloadScanner->scan(payloadVecsToScan) > 0) { + auto scanner = other->sortState->getScanner(0, skip + limit); + while (scanner->scan(payloadVecsToScan) > 0) { sortState->append(keyVecsToScan, payloadVecsToScan); } reduce(); @@ -101,9 +98,9 @@ void TopKBuffer::merge(TopKBuffer* other) { void TopKBuffer::initVectors() { auto payloadState = std::make_shared(); auto lastPayloadState = std::make_shared(); - for (auto& [pos, type] : orderByDataInfo->payloadsPosAndType) { - auto payloadVec = std::make_unique(type, memoryManager); - auto lastPayloadVec = std::make_unique(type, memoryManager); + for (auto& type : orderByDataInfo.payloadTypes) { + auto payloadVec = std::make_unique(*type, memoryManager); + auto lastPayloadVec = std::make_unique(*type, memoryManager); payloadVec->setState(payloadState); lastPayloadVec->setState(lastPayloadState); payloadVecsToScan.push_back(payloadVec.get()); @@ -112,36 +109,15 @@ void TopKBuffer::initVectors() { tmpVectors.push_back(std::move(lastPayloadVec)); } auto boundaryState = common::DataChunkState::getSingleValueDataChunkState(); - for (auto& [pos, type] : orderByDataInfo->keysPosAndType) { - auto boundaryVec = std::make_unique(type, memoryManager); + for (auto i = 0u; i < orderByDataInfo.keyTypes.size(); ++i) { + auto type = orderByDataInfo.keyTypes[i].get(); + auto boundaryVec = std::make_unique(*type, memoryManager); boundaryVec->setState(boundaryState); boundaryVecs.push_back(std::move(boundaryVec)); - auto posInPayload = findKeyVectorPosInPayload(pos); - if (posInPayload == UINT64_MAX) { - // If the key is not present in the payload, create a new vector. - auto keyVec = std::make_unique(type, memoryManager); - auto lastKeyVec = std::make_unique(type, memoryManager); - keyVecsToScan.push_back(keyVec.get()); - lastKeyVecsToScan.push_back(lastKeyVec.get()); - tmpVectors.push_back(std::move(keyVec)); - tmpVectors.push_back(std::move(lastKeyVec)); - } else { - // Otherwise grab the vector from the payload. - keyVecsToScan.push_back(payloadVecsToScan[posInPayload]); - lastKeyVecsToScan.push_back(lastPayloadVecsToScan[posInPayload]); - } - } -} - -uint64_t TopKBuffer::findKeyVectorPosInPayload(const DataPos& keyPos) { - // TODO(Xiyang): this information should be passed by front end. (e.g. The key vector pos in the - // payload vector) - for (auto i = 0u; i < orderByDataInfo->payloadsPosAndType.size(); i++) { - if (keyPos == orderByDataInfo->payloadsPosAndType[i].first) { - return i; - } + auto posInPayload = orderByDataInfo.keyInPayloadPos[i]; + keyVecsToScan.push_back(payloadVecsToScan[posInPayload]); + lastKeyVecsToScan.push_back(lastPayloadVecsToScan[posInPayload]); } - return UINT64_MAX; } template @@ -180,13 +156,13 @@ void TopKBuffer::getSelectComparisonFunction( } void TopKBuffer::initCompareFuncs() { - compareFuncs.reserve(orderByDataInfo->isAscOrder.size()); - equalsFuncs.reserve(orderByDataInfo->isAscOrder.size()); + compareFuncs.reserve(orderByDataInfo.isAscOrder.size()); + equalsFuncs.reserve(orderByDataInfo.isAscOrder.size()); vector_select_comparison_func compareFunc; vector_select_comparison_func equalsFunc; - for (auto i = 0u; i < orderByDataInfo->isAscOrder.size(); i++) { - auto physicalType = orderByDataInfo->keysPosAndType[i].second.getPhysicalType(); - if (orderByDataInfo->isAscOrder[i]) { + for (auto i = 0u; i < orderByDataInfo.isAscOrder.size(); i++) { + auto physicalType = orderByDataInfo.keyTypes[i]->getPhysicalType(); + if (orderByDataInfo.isAscOrder[i]) { getSelectComparisonFunction(physicalType, compareFunc); } else { getSelectComparisonFunction(physicalType, compareFunc); @@ -213,7 +189,7 @@ void TopKBuffer::setBoundaryValue() { } } -bool TopKBuffer::compareBoundaryValue(std::vector& keyVectors) { +bool TopKBuffer::compareBoundaryValue(const std::vector& keyVectors) { if (keyVectors[0]->state->isFlat()) { return compareFlatKeys(0 /* startKeyVectorIdxToCompare */, keyVectors); } else { @@ -223,7 +199,7 @@ bool TopKBuffer::compareBoundaryValue(std::vector& keyVect } bool TopKBuffer::compareFlatKeys( - vector_idx_t vectorIdxToCompare, std::vector keyVectors) { + vector_idx_t vectorIdxToCompare, const std::vector keyVectors) { std::shared_ptr selVector = std::make_shared(common::DEFAULT_VECTOR_CAPACITY); selVector->resetSelectorToValuePosBuffer(); @@ -240,7 +216,7 @@ bool TopKBuffer::compareFlatKeys( } void TopKBuffer::compareUnflatKeys( - vector_idx_t vectorIdxToCompare, std::vector keyVectors) { + vector_idx_t vectorIdxToCompare, const std::vector keyVectors) { auto compareSelVector = std::make_shared(common::DEFAULT_VECTOR_CAPACITY); compareSelVector->resetSelectorToValuePosBuffer(); @@ -272,25 +248,20 @@ void TopKBuffer::appendSelState( void TopKLocalState::init(const OrderByDataInfo& orderByDataInfo, storage::MemoryManager* memoryManager, ResultSet& resultSet, uint64_t skipNumber, uint64_t limitNumber) { - buffer->init(orderByDataInfo, memoryManager, skipNumber, limitNumber); - for (auto [dataPos, _] : orderByDataInfo.payloadsPosAndType) { - payloadVectors.push_back(resultSet.getValueVector(dataPos).get()); - } - for (auto [dataPos, _] : orderByDataInfo.keysPosAndType) { - orderByVectors.push_back(resultSet.getValueVector(dataPos).get()); - } + buffer = std::make_unique(orderByDataInfo); + buffer->init(memoryManager, skipNumber, limitNumber); } -void TopKLocalState::append() { - buffer->append(orderByVectors, payloadVectors); +void TopKLocalState::append(const std::vector& keyVectors, + const std::vector& payloadVectors) { + buffer->append(keyVectors, payloadVectors); buffer->reduce(); } void TopK::executeInternal(ExecutionContext* context) { - // Append thread-local tuples. while (children[0]->getNextTuple(context)) { for (auto i = 0u; i < resultSet->multiplicity; i++) { - localState->append(); + localState->append(orderByVectors, payloadVectors); } } localState->finalize(); diff --git a/src/processor/operator/order_by/top_k_scanner.cpp b/src/processor/operator/order_by/top_k_scanner.cpp index 1d036daa808..181fbf05d8b 100644 --- a/src/processor/operator/order_by/top_k_scanner.cpp +++ b/src/processor/operator/order_by/top_k_scanner.cpp @@ -5,11 +5,10 @@ namespace processor { void TopKLocalScanState::init( std::vector& outVectorPos, TopKSharedState& sharedState, ResultSet& resultSet) { - scanState = std::make_unique(); - sharedState.buffer->initScan(*scanState); for (auto& pos : outVectorPos) { vectorsToScan.push_back(resultSet.getValueVector(pos).get()); } + payloadScanner = sharedState.buffer->getScanner(); } void TopKScan::initLocalStateInternal(