Skip to content

Commit

Permalink
Merge pull request #1077 from kuzudb/play
Browse files Browse the repository at this point in the history
Add init global state interface
  • Loading branch information
andyfengHKU committed Nov 29, 2022
2 parents 7e9a770 + 9100668 commit 4d5d975
Show file tree
Hide file tree
Showing 35 changed files with 407 additions and 376 deletions.
40 changes: 22 additions & 18 deletions src/include/processor/operator/hash_join/hash_join_build.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,41 +20,41 @@ namespace processor {
// task/pipeline, and probed by the HashJoinProbe operators.
class HashJoinSharedState {
public:
explicit HashJoinSharedState(vector<DataType> payloadDataTypes)
: payloadDataTypes{move(payloadDataTypes)} {}
HashJoinSharedState() = default;

virtual ~HashJoinSharedState() = default;

virtual void initEmptyHashTableIfNecessary(MemoryManager& memoryManager, uint64_t numKeyColumns,
virtual void initEmptyHashTable(MemoryManager& memoryManager, uint64_t numKeyColumns,
unique_ptr<FactorizedTableSchema> tableSchema);

void mergeLocalHashTable(JoinHashTable& localHashTable);

inline JoinHashTable* getHashTable() { return hashTable.get(); }

inline vector<DataType> getPayloadDataTypes() { return payloadDataTypes; }

protected:
mutex hashJoinSharedStateMutex;
mutex mtx;
unique_ptr<JoinHashTable> hashTable;
vector<DataType> payloadDataTypes;
};

struct BuildDataInfo {

public:
BuildDataInfo(vector<DataPos> keysDataPos, vector<DataPos> payloadsDataPos,
vector<bool> isPayloadsFlat, vector<bool> isPayloadsInKeyChunk)
: keysDataPos{std::move(keysDataPos)}, payloadsDataPos{std::move(payloadsDataPos)},
isPayloadsFlat{move(isPayloadsFlat)}, isPayloadsInKeyChunk{move(isPayloadsInKeyChunk)} {}
BuildDataInfo(vector<pair<DataPos, DataType>> keysPosAndType,
vector<pair<DataPos, DataType>> payloadsPosAndType, vector<bool> isPayloadsFlat,
vector<bool> isPayloadsInKeyChunk)
: keysPosAndType{std::move(keysPosAndType)}, payloadsPosAndType{std::move(
payloadsPosAndType)},
isPayloadsFlat{std::move(isPayloadsFlat)}, isPayloadsInKeyChunk{
std::move(isPayloadsInKeyChunk)} {}

BuildDataInfo(const BuildDataInfo& other)
: BuildDataInfo{other.keysDataPos, other.payloadsDataPos, other.isPayloadsFlat,
: BuildDataInfo{other.keysPosAndType, other.payloadsPosAndType, other.isPayloadsFlat,
other.isPayloadsInKeyChunk} {}

inline uint32_t getNumKeys() const { return keysPosAndType.size(); }

public:
vector<DataPos> keysDataPos;
vector<DataPos> payloadsDataPos;
vector<pair<DataPos, DataType>> keysPosAndType;
vector<pair<DataPos, DataType>> payloadsPosAndType;
vector<bool> isPayloadsFlat;
vector<bool> isPayloadsInKeyChunk;
};
Expand All @@ -63,8 +63,8 @@ class HashJoinBuild : public Sink {
public:
HashJoinBuild(shared_ptr<HashJoinSharedState> sharedState, const BuildDataInfo& buildDataInfo,
unique_ptr<PhysicalOperator> child, uint32_t id, const string& paramsString)
: Sink{move(child), id, paramsString}, sharedState{move(sharedState)}, buildDataInfo{
buildDataInfo} {}
: Sink{std::move(child), id, paramsString}, sharedState{std::move(sharedState)},
buildDataInfo{buildDataInfo} {}
~HashJoinBuild() override = default;

inline PhysicalOperatorType getOperatorType() override { return HASH_JOIN_BUILD; }
Expand All @@ -80,7 +80,11 @@ class HashJoinBuild : public Sink {
}

protected:
virtual void initHashTable(
// TODO(Guodong/Xiyang): construct schema in mapper.
unique_ptr<FactorizedTableSchema> populateTableSchema();
void initGlobalStateInternal(ExecutionContext* context) override;

virtual void initLocalHashTable(
MemoryManager& memoryManager, unique_ptr<FactorizedTableSchema> tableSchema);
inline void appendVectors() { hashTable->append(vectorsToAppend); }

Expand Down
12 changes: 7 additions & 5 deletions src/include/processor/operator/hash_join/hash_join_probe.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,23 @@ struct ProbeState {
};

struct ProbeDataInfo {

public:
ProbeDataInfo(vector<DataPos> keysDataPos, vector<DataPos> payloadsOutputDataPos)
ProbeDataInfo(
vector<DataPos> keysDataPos, vector<pair<DataPos, DataType>> payloadsOutPosAndType)
: keysDataPos{std::move(keysDataPos)},
payloadsOutputDataPos{std::move(payloadsOutputDataPos)}, markDataPos{
payloadsOutPosAndType{std::move(payloadsOutPosAndType)}, markDataPos{
UINT32_MAX, UINT32_MAX} {}

ProbeDataInfo(const ProbeDataInfo& other)
: ProbeDataInfo{other.keysDataPos, other.payloadsOutputDataPos} {
: ProbeDataInfo{other.keysDataPos, other.payloadsOutPosAndType} {
markDataPos = other.markDataPos;
}

inline uint32_t getNumPayloads() const { return payloadsOutPosAndType.size(); }

public:
vector<DataPos> keysDataPos;
vector<DataPos> payloadsOutputDataPos;
vector<pair<DataPos, DataType>> payloadsOutPosAndType;
DataPos markDataPos;
};

Expand Down
7 changes: 3 additions & 4 deletions src/include/processor/operator/intersect/intersect_build.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@ namespace processor {

class IntersectSharedState : public HashJoinSharedState {
public:
explicit IntersectSharedState(vector<DataType> payloadDataTypes)
: HashJoinSharedState{move(payloadDataTypes)} {}
IntersectSharedState() = default;

void initEmptyHashTableIfNecessary(MemoryManager& memoryManager, uint64_t numKeyColumns,
void initEmptyHashTable(MemoryManager& memoryManager, uint64_t numKeyColumns,
unique_ptr<FactorizedTableSchema> tableSchema) override;
};

Expand All @@ -30,7 +29,7 @@ class IntersectBuild : public HashJoinBuild {
}

protected:
void initHashTable(
void initLocalHashTable(
MemoryManager& memoryManager, unique_ptr<FactorizedTableSchema> tableSchema) override;
};

Expand Down
9 changes: 3 additions & 6 deletions src/include/processor/operator/order_by/key_block_merger.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ struct KeyBlockMergeMorsel;
// This struct stores the string key column information. We can utilize the
// pre-computed indexes and offsets to expedite the tuple comparison in merge sort.
struct StrKeyColInfo {
StrKeyColInfo(
uint32_t colOffsetInFT, uint32_t colOffsetInEncodedKeyBlock, bool isAscOrder, bool isStrCol)
StrKeyColInfo(uint32_t colOffsetInFT, uint32_t colOffsetInEncodedKeyBlock, bool isAscOrder)
: colOffsetInFT{colOffsetInFT}, colOffsetInEncodedKeyBlock{colOffsetInEncodedKeyBlock},
isAscOrder{isAscOrder} {}

Expand Down Expand Up @@ -192,17 +191,15 @@ class KeyBlockMergeTaskDispatcher {
void doneMorsel(unique_ptr<KeyBlockMergeMorsel> morsel);

// This function is used to initialize the columns of keyBlockMergeTaskDispatcher based on
// sharedFactorizedTablesAndSortedKeyBlocks. If the class is already initialized, then it
// just returns.
void initIfNecessary(MemoryManager* memoryManager,
// sharedFactorizedTablesAndSortedKeyBlocks.
void init(MemoryManager* memoryManager,
shared_ptr<queue<shared_ptr<MergedKeyBlocks>>> sortedKeyBlocks,
vector<shared_ptr<FactorizedTable>>& factorizedTables,
vector<StrKeyColInfo>& strKeyColsInfo, uint64_t numBytesPerTuple);

private:
mutex mtx;

bool isInitialized = false;
MemoryManager* memoryManager;
shared_ptr<queue<shared_ptr<MergedKeyBlocks>>> sortedKeyBlocks;
vector<shared_ptr<KeyBlockMergeTask>> activeKeyBlockMergeTasks;
Expand Down
65 changes: 32 additions & 33 deletions src/include/processor/operator/order_by/order_by.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,14 @@ class SharedFactorizedTablesAndSortedKeyBlocks {
: nextFactorizedTableIdx{0}, sortedKeyBlocks{
make_shared<queue<shared_ptr<MergedKeyBlocks>>>()} {}

inline DataType getDataType(uint32_t idx) { return dataTypes[idx]; }

uint8_t getNextFactorizedTableIdx() {
unique_lock lck{orderBySharedStateMutex};
unique_lock lck{mtx};
return nextFactorizedTableIdx++;
}

void appendFactorizedTable(
uint8_t factorizedTableIdx, shared_ptr<FactorizedTable> factorizedTable) {
unique_lock lck{orderBySharedStateMutex};
unique_lock lck{mtx};
// If the factorizedTables is full, resize the factorizedTables and
// insert the factorizedTable to the set.
if (factorizedTableIdx >= factorizedTables.size()) {
Expand All @@ -44,18 +42,13 @@ class SharedFactorizedTablesAndSortedKeyBlocks {
}

void appendSortedKeyBlock(shared_ptr<MergedKeyBlocks> mergedDataBlocks) {
unique_lock lck{orderBySharedStateMutex};
unique_lock lck{mtx};
sortedKeyBlocks->emplace(mergedDataBlocks);
}

void setNumBytesPerTuple(uint32_t numBytesPerTuple_) {
unique_lock lck{orderBySharedStateMutex};
this->numBytesPerTuple = numBytesPerTuple_;
}

void setDataTypes(vector<DataType> dataTypes_) {
unique_lock lck{orderBySharedStateMutex};
this->dataTypes = move(dataTypes_);
void setNumBytesPerTuple(uint32_t _numBytesPerTuple) {
assert(numBytesPerTuple == UINT32_MAX);
numBytesPerTuple = _numBytesPerTuple;
}

void combineFTHasNoNullGuarantee() {
Expand All @@ -64,51 +57,53 @@ class SharedFactorizedTablesAndSortedKeyBlocks {
}
}

void setStrKeyColInfo(vector<StrKeyColInfo> strKeyColsInfo) {
unique_lock lck{orderBySharedStateMutex};
this->strKeyColsInfo = move(strKeyColsInfo);
void setStrKeyColInfo(vector<StrKeyColInfo> _strKeyColsInfo) {
assert(strKeyColsInfo.empty());
strKeyColsInfo = std::move(_strKeyColsInfo);
}

private:
mutex orderBySharedStateMutex;
mutex mtx;

public:
vector<shared_ptr<FactorizedTable>> factorizedTables;
uint8_t nextFactorizedTableIdx;
shared_ptr<queue<shared_ptr<MergedKeyBlocks>>> sortedKeyBlocks;

uint32_t numBytesPerTuple;
uint32_t numBytesPerTuple = UINT32_MAX; // encoding size
vector<StrKeyColInfo> strKeyColsInfo;
vector<DataType> dataTypes;
};

struct OrderByDataInfo {

public:
OrderByDataInfo(vector<DataPos> keyDataPoses, vector<DataPos> allDataPoses,
vector<bool> isVectorFlat, vector<bool> isAscOrder)
: keyDataPoses{move(keyDataPoses)}, allDataPoses{move(allDataPoses)},
isVectorFlat{move(isVectorFlat)}, isAscOrder{move(isAscOrder)} {}
OrderByDataInfo(vector<pair<DataPos, DataType>> keysPosAndType,
vector<pair<DataPos, DataType>> payloadsPosAndType, vector<bool> isPayloadFlat,
vector<bool> isAscOrder, bool mayContainUnflatKey)
: keysPosAndType{std::move(keysPosAndType)}, payloadsPosAndType{std::move(
payloadsPosAndType)},
isPayloadFlat{std::move(isPayloadFlat)}, isAscOrder{std::move(isAscOrder)},
mayContainUnflatKey{mayContainUnflatKey} {}

OrderByDataInfo(const OrderByDataInfo& other)
: OrderByDataInfo{
other.keyDataPoses, other.allDataPoses, other.isVectorFlat, other.isAscOrder} {}
: OrderByDataInfo{other.keysPosAndType, other.payloadsPosAndType, other.isPayloadFlat,
other.isAscOrder, other.mayContainUnflatKey} {}

public:
vector<DataPos> keyDataPoses;
vector<DataPos> allDataPoses;
vector<bool> isVectorFlat;
vector<pair<DataPos, DataType>> keysPosAndType;
vector<pair<DataPos, DataType>> payloadsPosAndType;
vector<bool> isPayloadFlat;
vector<bool> isAscOrder;
// TODO(Ziyi): We should figure out unflat keys in a more general way.
bool mayContainUnflatKey;
};

class OrderBy : public Sink {
public:
OrderBy(const OrderByDataInfo& orderByDataInfo,
shared_ptr<SharedFactorizedTablesAndSortedKeyBlocks> sharedState,
unique_ptr<PhysicalOperator> child, uint32_t id, const string& paramsString)
: Sink{move(child), id, paramsString}, orderByDataInfo{orderByDataInfo}, sharedState{move(
sharedState)} {
}
: Sink{std::move(child), id, paramsString}, orderByDataInfo{orderByDataInfo},
sharedState{std::move(sharedState)} {}

PhysicalOperatorType getOperatorType() override { return ORDER_BY; }

Expand All @@ -129,14 +124,18 @@ class OrderBy : public Sink {
orderByDataInfo, sharedState, children[0]->clone(), id, paramsString);
}

private:
unique_ptr<FactorizedTableSchema> populateTableSchema();

void initGlobalStateInternal(ExecutionContext* context) override;

private:
uint8_t factorizedTableIdx;
OrderByDataInfo orderByDataInfo;
unique_ptr<OrderByKeyEncoder> orderByKeyEncoder;
unique_ptr<RadixSort> radixSorter;
vector<shared_ptr<ValueVector>> keyVectors;
vector<shared_ptr<ValueVector>> vectorsToAppend;
vector<StrKeyColInfo> strKeyColInfo;
shared_ptr<SharedFactorizedTablesAndSortedKeyBlocks> sharedState;
shared_ptr<FactorizedTable> localFactorizedTable;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ class OrderByKeyEncoder {

public:
OrderByKeyEncoder(vector<shared_ptr<ValueVector>>& orderByVectors, vector<bool>& isAscOrder,
MemoryManager* memoryManager, uint8_t ftIdx, uint32_t numTuplesPerBlockInFT);
MemoryManager* memoryManager, uint8_t ftIdx, uint32_t numTuplesPerBlockInFT,
uint32_t numBytesPerTuple);

inline vector<shared_ptr<DataBlock>>& getKeyBlocks() { return keyBlocks; }

Expand Down Expand Up @@ -79,6 +80,8 @@ class OrderByKeyEncoder {
return *(strBuffer + 13) == (isAsc ? UINT8_MAX : 0);
}

static uint32_t getNumBytesPerTuple(const vector<shared_ptr<ValueVector>>& keyVectors);

static uint32_t getEncodingSize(const DataType& dataType);

void encodeKeys();
Expand Down
27 changes: 14 additions & 13 deletions src/include/processor/operator/order_by/order_by_merge.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,17 @@ class OrderByMerge : public Sink, public SourceOperator {
// This constructor will only be called by the mapper when constructing the orderByMerge
// operator, because the mapper doesn't know the existence of keyBlockMergeTaskDispatcher
OrderByMerge(shared_ptr<SharedFactorizedTablesAndSortedKeyBlocks> sharedState,
shared_ptr<KeyBlockMergeTaskDispatcher> sharedDispatcher,
unique_ptr<PhysicalOperator> child, uint32_t id, const string& paramsString)
: Sink{move(child), id, paramsString}, SourceOperator{nullptr},
sharedFactorizedTablesAndSortedKeyBlocks{move(sharedState)},
keyBlockMergeTaskDispatcher{make_shared<KeyBlockMergeTaskDispatcher>()} {}
: Sink{std::move(child), id, paramsString}, SourceOperator{nullptr},
sharedState{std::move(sharedState)}, sharedDispatcher{std::move(sharedDispatcher)} {}

// This constructor is used for cloning only.
OrderByMerge(shared_ptr<SharedFactorizedTablesAndSortedKeyBlocks> sharedState,
shared_ptr<KeyBlockMergeTaskDispatcher> keyBlockMergeTaskDispatcher,
unique_ptr<PhysicalOperator> child, uint32_t id, const string& paramsString)
: Sink{move(child), id, paramsString}, SourceOperator{nullptr},
sharedFactorizedTablesAndSortedKeyBlocks{move(sharedState)},
keyBlockMergeTaskDispatcher{move(keyBlockMergeTaskDispatcher)} {}
shared_ptr<KeyBlockMergeTaskDispatcher> sharedDispatcher, uint32_t id,
const string& paramsString)
: Sink{id, paramsString}, SourceOperator{nullptr}, sharedState{std::move(sharedState)},
sharedDispatcher{std::move(sharedDispatcher)} {}

PhysicalOperatorType getOperatorType() override { return ORDER_BY_MERGE; }

Expand All @@ -38,18 +37,20 @@ class OrderByMerge : public Sink, public SourceOperator {
void executeInternal(ExecutionContext* context) override;

unique_ptr<PhysicalOperator> clone() override {
return make_unique<OrderByMerge>(sharedFactorizedTablesAndSortedKeyBlocks,
keyBlockMergeTaskDispatcher, children[0]->clone(), id, paramsString);
return make_unique<OrderByMerge>(sharedState, sharedDispatcher, id, paramsString);
}

inline double getExecutionTime(Profiler& profiler) const override {
return profiler.sumAllTimeMetricsWithKey(getTimeMetricKey());
}

private:
shared_ptr<SharedFactorizedTablesAndSortedKeyBlocks> sharedFactorizedTablesAndSortedKeyBlocks;
unique_ptr<KeyBlockMerger> keyBlockMerger;
shared_ptr<KeyBlockMergeTaskDispatcher> keyBlockMergeTaskDispatcher;
void initGlobalStateInternal(ExecutionContext* context) override;

private:
shared_ptr<SharedFactorizedTablesAndSortedKeyBlocks> sharedState;
unique_ptr<KeyBlockMerger> localMerger;
shared_ptr<KeyBlockMergeTaskDispatcher> sharedDispatcher;
};

} // namespace processor
Expand Down
Loading

0 comments on commit 4d5d975

Please sign in to comment.