Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Add init global state interface #1077

Merged
merged 1 commit into from
Nov 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 22 additions & 18 deletions src/include/processor/operator/hash_join/hash_join_build.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,41 +20,41 @@ namespace processor {
// task/pipeline, and probed by the HashJoinProbe operators.
class HashJoinSharedState {
public:
explicit HashJoinSharedState(vector<DataType> payloadDataTypes)
: payloadDataTypes{move(payloadDataTypes)} {}
HashJoinSharedState() = default;

virtual ~HashJoinSharedState() = default;

virtual void initEmptyHashTableIfNecessary(MemoryManager& memoryManager, uint64_t numKeyColumns,
virtual void initEmptyHashTable(MemoryManager& memoryManager, uint64_t numKeyColumns,
unique_ptr<FactorizedTableSchema> tableSchema);

void mergeLocalHashTable(JoinHashTable& localHashTable);

inline JoinHashTable* getHashTable() { return hashTable.get(); }

inline vector<DataType> getPayloadDataTypes() { return payloadDataTypes; }

protected:
mutex hashJoinSharedStateMutex;
mutex mtx;
unique_ptr<JoinHashTable> hashTable;
vector<DataType> payloadDataTypes;
};

struct BuildDataInfo {

public:
BuildDataInfo(vector<DataPos> keysDataPos, vector<DataPos> payloadsDataPos,
vector<bool> isPayloadsFlat, vector<bool> isPayloadsInKeyChunk)
: keysDataPos{std::move(keysDataPos)}, payloadsDataPos{std::move(payloadsDataPos)},
isPayloadsFlat{move(isPayloadsFlat)}, isPayloadsInKeyChunk{move(isPayloadsInKeyChunk)} {}
BuildDataInfo(vector<pair<DataPos, DataType>> keysPosAndType,
vector<pair<DataPos, DataType>> payloadsPosAndType, vector<bool> isPayloadsFlat,
vector<bool> isPayloadsInKeyChunk)
: keysPosAndType{std::move(keysPosAndType)}, payloadsPosAndType{std::move(
payloadsPosAndType)},
isPayloadsFlat{std::move(isPayloadsFlat)}, isPayloadsInKeyChunk{
std::move(isPayloadsInKeyChunk)} {}

BuildDataInfo(const BuildDataInfo& other)
: BuildDataInfo{other.keysDataPos, other.payloadsDataPos, other.isPayloadsFlat,
: BuildDataInfo{other.keysPosAndType, other.payloadsPosAndType, other.isPayloadsFlat,
other.isPayloadsInKeyChunk} {}

inline uint32_t getNumKeys() const { return keysPosAndType.size(); }

public:
vector<DataPos> keysDataPos;
vector<DataPos> payloadsDataPos;
vector<pair<DataPos, DataType>> keysPosAndType;
vector<pair<DataPos, DataType>> payloadsPosAndType;
vector<bool> isPayloadsFlat;
vector<bool> isPayloadsInKeyChunk;
};
Expand All @@ -63,8 +63,8 @@ class HashJoinBuild : public Sink {
public:
HashJoinBuild(shared_ptr<HashJoinSharedState> sharedState, const BuildDataInfo& buildDataInfo,
unique_ptr<PhysicalOperator> child, uint32_t id, const string& paramsString)
: Sink{move(child), id, paramsString}, sharedState{move(sharedState)}, buildDataInfo{
buildDataInfo} {}
: Sink{std::move(child), id, paramsString}, sharedState{std::move(sharedState)},
buildDataInfo{buildDataInfo} {}
~HashJoinBuild() override = default;

inline PhysicalOperatorType getOperatorType() override { return HASH_JOIN_BUILD; }
Expand All @@ -80,7 +80,11 @@ class HashJoinBuild : public Sink {
}

protected:
virtual void initHashTable(
// TODO(Guodong/Xiyang): construct schema in mapper.
unique_ptr<FactorizedTableSchema> populateTableSchema();
void initGlobalStateInternal(ExecutionContext* context) override;

virtual void initLocalHashTable(
MemoryManager& memoryManager, unique_ptr<FactorizedTableSchema> tableSchema);
inline void appendVectors() { hashTable->append(vectorsToAppend); }

Expand Down
12 changes: 7 additions & 5 deletions src/include/processor/operator/hash_join/hash_join_probe.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,23 @@ struct ProbeState {
};

struct ProbeDataInfo {

public:
ProbeDataInfo(vector<DataPos> keysDataPos, vector<DataPos> payloadsOutputDataPos)
ProbeDataInfo(
vector<DataPos> keysDataPos, vector<pair<DataPos, DataType>> payloadsOutPosAndType)
: keysDataPos{std::move(keysDataPos)},
payloadsOutputDataPos{std::move(payloadsOutputDataPos)}, markDataPos{
payloadsOutPosAndType{std::move(payloadsOutPosAndType)}, markDataPos{
UINT32_MAX, UINT32_MAX} {}

ProbeDataInfo(const ProbeDataInfo& other)
: ProbeDataInfo{other.keysDataPos, other.payloadsOutputDataPos} {
: ProbeDataInfo{other.keysDataPos, other.payloadsOutPosAndType} {
markDataPos = other.markDataPos;
}

inline uint32_t getNumPayloads() const { return payloadsOutPosAndType.size(); }

public:
vector<DataPos> keysDataPos;
vector<DataPos> payloadsOutputDataPos;
vector<pair<DataPos, DataType>> payloadsOutPosAndType;
DataPos markDataPos;
};

Expand Down
7 changes: 3 additions & 4 deletions src/include/processor/operator/intersect/intersect_build.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@ namespace processor {

class IntersectSharedState : public HashJoinSharedState {
public:
explicit IntersectSharedState(vector<DataType> payloadDataTypes)
: HashJoinSharedState{move(payloadDataTypes)} {}
IntersectSharedState() = default;

void initEmptyHashTableIfNecessary(MemoryManager& memoryManager, uint64_t numKeyColumns,
void initEmptyHashTable(MemoryManager& memoryManager, uint64_t numKeyColumns,
unique_ptr<FactorizedTableSchema> tableSchema) override;
};

Expand All @@ -30,7 +29,7 @@ class IntersectBuild : public HashJoinBuild {
}

protected:
void initHashTable(
void initLocalHashTable(
MemoryManager& memoryManager, unique_ptr<FactorizedTableSchema> tableSchema) override;
};

Expand Down
9 changes: 3 additions & 6 deletions src/include/processor/operator/order_by/key_block_merger.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ struct KeyBlockMergeMorsel;
// This struct stores the string key column information. We can utilize the
// pre-computed indexes and offsets to expedite the tuple comparison in merge sort.
struct StrKeyColInfo {
StrKeyColInfo(
uint32_t colOffsetInFT, uint32_t colOffsetInEncodedKeyBlock, bool isAscOrder, bool isStrCol)
StrKeyColInfo(uint32_t colOffsetInFT, uint32_t colOffsetInEncodedKeyBlock, bool isAscOrder)
: colOffsetInFT{colOffsetInFT}, colOffsetInEncodedKeyBlock{colOffsetInEncodedKeyBlock},
isAscOrder{isAscOrder} {}

Expand Down Expand Up @@ -192,17 +191,15 @@ class KeyBlockMergeTaskDispatcher {
void doneMorsel(unique_ptr<KeyBlockMergeMorsel> morsel);

// This function is used to initialize the columns of keyBlockMergeTaskDispatcher based on
// sharedFactorizedTablesAndSortedKeyBlocks. If the class is already initialized, then it
// just returns.
void initIfNecessary(MemoryManager* memoryManager,
// sharedFactorizedTablesAndSortedKeyBlocks.
void init(MemoryManager* memoryManager,
shared_ptr<queue<shared_ptr<MergedKeyBlocks>>> sortedKeyBlocks,
vector<shared_ptr<FactorizedTable>>& factorizedTables,
vector<StrKeyColInfo>& strKeyColsInfo, uint64_t numBytesPerTuple);

private:
mutex mtx;

bool isInitialized = false;
MemoryManager* memoryManager;
shared_ptr<queue<shared_ptr<MergedKeyBlocks>>> sortedKeyBlocks;
vector<shared_ptr<KeyBlockMergeTask>> activeKeyBlockMergeTasks;
Expand Down
65 changes: 32 additions & 33 deletions src/include/processor/operator/order_by/order_by.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,14 @@ class SharedFactorizedTablesAndSortedKeyBlocks {
: nextFactorizedTableIdx{0}, sortedKeyBlocks{
make_shared<queue<shared_ptr<MergedKeyBlocks>>>()} {}

inline DataType getDataType(uint32_t idx) { return dataTypes[idx]; }

uint8_t getNextFactorizedTableIdx() {
unique_lock lck{orderBySharedStateMutex};
unique_lock lck{mtx};
return nextFactorizedTableIdx++;
}

void appendFactorizedTable(
uint8_t factorizedTableIdx, shared_ptr<FactorizedTable> factorizedTable) {
unique_lock lck{orderBySharedStateMutex};
unique_lock lck{mtx};
// If the factorizedTables is full, resize the factorizedTables and
// insert the factorizedTable to the set.
if (factorizedTableIdx >= factorizedTables.size()) {
Expand All @@ -44,18 +42,13 @@ class SharedFactorizedTablesAndSortedKeyBlocks {
}

void appendSortedKeyBlock(shared_ptr<MergedKeyBlocks> mergedDataBlocks) {
unique_lock lck{orderBySharedStateMutex};
unique_lock lck{mtx};
sortedKeyBlocks->emplace(mergedDataBlocks);
}

void setNumBytesPerTuple(uint32_t numBytesPerTuple_) {
unique_lock lck{orderBySharedStateMutex};
this->numBytesPerTuple = numBytesPerTuple_;
}

void setDataTypes(vector<DataType> dataTypes_) {
unique_lock lck{orderBySharedStateMutex};
this->dataTypes = move(dataTypes_);
void setNumBytesPerTuple(uint32_t _numBytesPerTuple) {
assert(numBytesPerTuple == UINT32_MAX);
numBytesPerTuple = _numBytesPerTuple;
}

void combineFTHasNoNullGuarantee() {
Expand All @@ -64,51 +57,53 @@ class SharedFactorizedTablesAndSortedKeyBlocks {
}
}

void setStrKeyColInfo(vector<StrKeyColInfo> strKeyColsInfo) {
unique_lock lck{orderBySharedStateMutex};
this->strKeyColsInfo = move(strKeyColsInfo);
void setStrKeyColInfo(vector<StrKeyColInfo> _strKeyColsInfo) {
assert(strKeyColsInfo.empty());
strKeyColsInfo = std::move(_strKeyColsInfo);
}

private:
mutex orderBySharedStateMutex;
mutex mtx;

public:
vector<shared_ptr<FactorizedTable>> factorizedTables;
uint8_t nextFactorizedTableIdx;
shared_ptr<queue<shared_ptr<MergedKeyBlocks>>> sortedKeyBlocks;

uint32_t numBytesPerTuple;
uint32_t numBytesPerTuple = UINT32_MAX; // encoding size
vector<StrKeyColInfo> strKeyColsInfo;
vector<DataType> dataTypes;
};

struct OrderByDataInfo {

public:
OrderByDataInfo(vector<DataPos> keyDataPoses, vector<DataPos> allDataPoses,
vector<bool> isVectorFlat, vector<bool> isAscOrder)
: keyDataPoses{move(keyDataPoses)}, allDataPoses{move(allDataPoses)},
isVectorFlat{move(isVectorFlat)}, isAscOrder{move(isAscOrder)} {}
OrderByDataInfo(vector<pair<DataPos, DataType>> keysPosAndType,
vector<pair<DataPos, DataType>> payloadsPosAndType, vector<bool> isPayloadFlat,
vector<bool> isAscOrder, bool mayContainUnflatKey)
: keysPosAndType{std::move(keysPosAndType)}, payloadsPosAndType{std::move(
payloadsPosAndType)},
isPayloadFlat{std::move(isPayloadFlat)}, isAscOrder{std::move(isAscOrder)},
mayContainUnflatKey{mayContainUnflatKey} {}

OrderByDataInfo(const OrderByDataInfo& other)
: OrderByDataInfo{
other.keyDataPoses, other.allDataPoses, other.isVectorFlat, other.isAscOrder} {}
: OrderByDataInfo{other.keysPosAndType, other.payloadsPosAndType, other.isPayloadFlat,
other.isAscOrder, other.mayContainUnflatKey} {}

public:
vector<DataPos> keyDataPoses;
vector<DataPos> allDataPoses;
vector<bool> isVectorFlat;
vector<pair<DataPos, DataType>> keysPosAndType;
vector<pair<DataPos, DataType>> payloadsPosAndType;
vector<bool> isPayloadFlat;
vector<bool> isAscOrder;
// TODO(Ziyi): We should figure out unflat keys in a more general way.
bool mayContainUnflatKey;
};

class OrderBy : public Sink {
public:
OrderBy(const OrderByDataInfo& orderByDataInfo,
shared_ptr<SharedFactorizedTablesAndSortedKeyBlocks> sharedState,
unique_ptr<PhysicalOperator> child, uint32_t id, const string& paramsString)
: Sink{move(child), id, paramsString}, orderByDataInfo{orderByDataInfo}, sharedState{move(
sharedState)} {
}
: Sink{std::move(child), id, paramsString}, orderByDataInfo{orderByDataInfo},
sharedState{std::move(sharedState)} {}

PhysicalOperatorType getOperatorType() override { return ORDER_BY; }

Expand All @@ -129,14 +124,18 @@ class OrderBy : public Sink {
orderByDataInfo, sharedState, children[0]->clone(), id, paramsString);
}

private:
unique_ptr<FactorizedTableSchema> populateTableSchema();

void initGlobalStateInternal(ExecutionContext* context) override;

private:
uint8_t factorizedTableIdx;
OrderByDataInfo orderByDataInfo;
unique_ptr<OrderByKeyEncoder> orderByKeyEncoder;
unique_ptr<RadixSort> radixSorter;
vector<shared_ptr<ValueVector>> keyVectors;
vector<shared_ptr<ValueVector>> vectorsToAppend;
vector<StrKeyColInfo> strKeyColInfo;
shared_ptr<SharedFactorizedTablesAndSortedKeyBlocks> sharedState;
shared_ptr<FactorizedTable> localFactorizedTable;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ class OrderByKeyEncoder {

public:
OrderByKeyEncoder(vector<shared_ptr<ValueVector>>& orderByVectors, vector<bool>& isAscOrder,
MemoryManager* memoryManager, uint8_t ftIdx, uint32_t numTuplesPerBlockInFT);
MemoryManager* memoryManager, uint8_t ftIdx, uint32_t numTuplesPerBlockInFT,
uint32_t numBytesPerTuple);

inline vector<shared_ptr<DataBlock>>& getKeyBlocks() { return keyBlocks; }

Expand Down Expand Up @@ -79,6 +80,8 @@ class OrderByKeyEncoder {
return *(strBuffer + 13) == (isAsc ? UINT8_MAX : 0);
}

static uint32_t getNumBytesPerTuple(const vector<shared_ptr<ValueVector>>& keyVectors);

static uint32_t getEncodingSize(const DataType& dataType);

void encodeKeys();
Expand Down
27 changes: 14 additions & 13 deletions src/include/processor/operator/order_by/order_by_merge.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,17 @@ class OrderByMerge : public Sink, public SourceOperator {
// This constructor will only be called by the mapper when constructing the orderByMerge
// operator, because the mapper doesn't know the existence of keyBlockMergeTaskDispatcher
OrderByMerge(shared_ptr<SharedFactorizedTablesAndSortedKeyBlocks> sharedState,
shared_ptr<KeyBlockMergeTaskDispatcher> sharedDispatcher,
unique_ptr<PhysicalOperator> child, uint32_t id, const string& paramsString)
: Sink{move(child), id, paramsString}, SourceOperator{nullptr},
sharedFactorizedTablesAndSortedKeyBlocks{move(sharedState)},
keyBlockMergeTaskDispatcher{make_shared<KeyBlockMergeTaskDispatcher>()} {}
: Sink{std::move(child), id, paramsString}, SourceOperator{nullptr},
sharedState{std::move(sharedState)}, sharedDispatcher{std::move(sharedDispatcher)} {}

// This constructor is used for cloning only.
OrderByMerge(shared_ptr<SharedFactorizedTablesAndSortedKeyBlocks> sharedState,
shared_ptr<KeyBlockMergeTaskDispatcher> keyBlockMergeTaskDispatcher,
unique_ptr<PhysicalOperator> child, uint32_t id, const string& paramsString)
: Sink{move(child), id, paramsString}, SourceOperator{nullptr},
sharedFactorizedTablesAndSortedKeyBlocks{move(sharedState)},
keyBlockMergeTaskDispatcher{move(keyBlockMergeTaskDispatcher)} {}
shared_ptr<KeyBlockMergeTaskDispatcher> sharedDispatcher, uint32_t id,
const string& paramsString)
: Sink{id, paramsString}, SourceOperator{nullptr}, sharedState{std::move(sharedState)},
sharedDispatcher{std::move(sharedDispatcher)} {}

PhysicalOperatorType getOperatorType() override { return ORDER_BY_MERGE; }

Expand All @@ -38,18 +37,20 @@ class OrderByMerge : public Sink, public SourceOperator {
void executeInternal(ExecutionContext* context) override;

unique_ptr<PhysicalOperator> clone() override {
return make_unique<OrderByMerge>(sharedFactorizedTablesAndSortedKeyBlocks,
keyBlockMergeTaskDispatcher, children[0]->clone(), id, paramsString);
return make_unique<OrderByMerge>(sharedState, sharedDispatcher, id, paramsString);
}

inline double getExecutionTime(Profiler& profiler) const override {
return profiler.sumAllTimeMetricsWithKey(getTimeMetricKey());
}

private:
shared_ptr<SharedFactorizedTablesAndSortedKeyBlocks> sharedFactorizedTablesAndSortedKeyBlocks;
unique_ptr<KeyBlockMerger> keyBlockMerger;
shared_ptr<KeyBlockMergeTaskDispatcher> keyBlockMergeTaskDispatcher;
void initGlobalStateInternal(ExecutionContext* context) override;

private:
shared_ptr<SharedFactorizedTablesAndSortedKeyBlocks> sharedState;
unique_ptr<KeyBlockMerger> localMerger;
shared_ptr<KeyBlockMergeTaskDispatcher> sharedDispatcher;
};

} // namespace processor
Expand Down
Loading