Skip to content

Commit

Permalink
clean
Browse files Browse the repository at this point in the history
  • Loading branch information
andyfengHKU committed Nov 29, 2022
1 parent f3e832e commit 5d1ea20
Show file tree
Hide file tree
Showing 10 changed files with 47 additions and 41 deletions.
3 changes: 1 addition & 2 deletions src/include/processor/operator/order_by/key_block_merger.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ struct KeyBlockMergeMorsel;
// This struct stores the string key column information. We can utilize the
// pre-computed indexes and offsets to expedite the tuple comparison in merge sort.
struct StrKeyColInfo {
StrKeyColInfo(
uint32_t colOffsetInFT, uint32_t colOffsetInEncodedKeyBlock, bool isAscOrder)
StrKeyColInfo(uint32_t colOffsetInFT, uint32_t colOffsetInEncodedKeyBlock, bool isAscOrder)
: colOffsetInFT{colOffsetInFT}, colOffsetInEncodedKeyBlock{colOffsetInEncodedKeyBlock},
isAscOrder{isAscOrder} {}

Expand Down
21 changes: 14 additions & 7 deletions src/include/processor/operator/result_collector.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ class FTableSharedState {

inline shared_ptr<FactorizedTable> getTable() { return table; }

inline uint64_t getMaxMorselSize() const {
inline uint64_t getMaxMorselSize() {
lock_guard<mutex> lck{mtx};
return table->hasUnflatCol() ? 1 : DEFAULT_VECTOR_CAPACITY;
}
unique_ptr<FTableScanMorsel> getMorsel(uint64_t maxMorselSize);
Expand All @@ -41,13 +42,13 @@ class FTableSharedState {
};

class ResultCollector : public Sink {

public:
ResultCollector(vector<pair<DataPos, bool>> vectorsToCollectInfo,
ResultCollector(vector<pair<DataPos, DataType>> payloadsPosAndType, vector<bool> isPayloadFlat,
shared_ptr<FTableSharedState> sharedState, unique_ptr<PhysicalOperator> child, uint32_t id,
const string& paramsString)
: Sink{move(child), id, paramsString}, vectorsToCollectInfo{move(vectorsToCollectInfo)},
sharedState{move(sharedState)} {}
: Sink{std::move(child), id, paramsString}, payloadsPosAndType{std::move(
payloadsPosAndType)},
isPayloadFlat{std::move(isPayloadFlat)}, sharedState{std::move(sharedState)} {}

PhysicalOperatorType getOperatorType() override { return RESULT_COLLECTOR; }

Expand All @@ -57,7 +58,7 @@ class ResultCollector : public Sink {

unique_ptr<PhysicalOperator> clone() override {
return make_unique<ResultCollector>(
vectorsToCollectInfo, sharedState, children[0]->clone(), id, paramsString);
payloadsPosAndType, isPayloadFlat, sharedState, children[0]->clone(), id, paramsString);
}

inline shared_ptr<FTableSharedState> getSharedState() { return sharedState; }
Expand All @@ -66,7 +67,13 @@ class ResultCollector : public Sink {
}

private:
vector<pair<DataPos, bool>> vectorsToCollectInfo;
void initGlobalStateInternal(ExecutionContext* context) override;

unique_ptr<FactorizedTableSchema> populateTableSchema();

private:
vector<pair<DataPos, DataType>> payloadsPosAndType;
vector<bool> isPayloadFlat;
vector<shared_ptr<ValueVector>> vectorsToCollect;
shared_ptr<FTableSharedState> sharedState;
unique_ptr<FactorizedTable> localTable;
Expand Down
2 changes: 1 addition & 1 deletion src/include/processor/operator/semi_masker.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class SemiMasker : public PhysicalOperator {
inline unique_ptr<PhysicalOperator> clone() override { return make_unique<SemiMasker>(*this); }

private:
void initGlobalStateInternal(ExecutionContext *context) override;
void initGlobalStateInternal(ExecutionContext* context) override;

private:
DataPos keyDataPos;
Expand Down
1 change: 0 additions & 1 deletion src/include/processor/processor_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ class ProcessorTask : public Task {
void run() override;
void finalizeIfNecessary() override;


private:
Sink* sink;
ExecutionContext* executionContext;
Expand Down
14 changes: 6 additions & 8 deletions src/processor/mapper/plan_mapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,20 +133,18 @@ unique_ptr<PhysicalOperator> PlanMapper::mapLogicalOperatorToPhysical(
unique_ptr<ResultCollector> PlanMapper::appendResultCollector(
const expression_vector& expressionsToCollect, const Schema& schema,
unique_ptr<PhysicalOperator> prevOperator, MapperContext& mapperContext) {
vector<pair<DataPos, bool>> valueVectorsToCollectInfo;
vector<pair<DataPos, DataType>> payloadsPosAndType;
vector<bool> isPayloadFlat;
for (auto& expression : expressionsToCollect) {
auto expressionName = expression->getUniqueName();
auto dataPos = mapperContext.getDataPos(expressionName);
auto isFlat = schema.getGroup(expressionName)->getIsFlat();
valueVectorsToCollectInfo.emplace_back(dataPos, isFlat);
}
auto paramsString = string();
for (auto& expression : expressionsToCollect) {
paramsString += expression->getUniqueName() + ", ";
payloadsPosAndType.emplace_back(dataPos, expression->dataType);
isPayloadFlat.push_back(isFlat);
}
auto sharedState = make_shared<FTableSharedState>();
return make_unique<ResultCollector>(
valueVectorsToCollectInfo, sharedState, move(prevOperator), getOperatorID(), paramsString);
return make_unique<ResultCollector>(payloadsPosAndType, isPayloadFlat, sharedState,
std::move(prevOperator), getOperatorID(), ExpressionUtil::toString(expressionsToCollect));
}

} // namespace processor
Expand Down
4 changes: 2 additions & 2 deletions src/processor/operator/intersect/intersect_build.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
namespace kuzu {
namespace processor {

void IntersectSharedState::initEmptyHashTable(MemoryManager& memoryManager,
uint64_t numKeyColumns, unique_ptr<FactorizedTableSchema> tableSchema) {
void IntersectSharedState::initEmptyHashTable(MemoryManager& memoryManager, uint64_t numKeyColumns,
unique_ptr<FactorizedTableSchema> tableSchema) {
assert(hashTable == nullptr && numKeyColumns == 1);
hashTable = make_unique<IntersectHashTable>(memoryManager, std::move(tableSchema));
}
Expand Down
5 changes: 2 additions & 3 deletions src/processor/operator/order_by/order_by_merge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@ void OrderByMerge::executeInternal(ExecutionContext* context) {

void OrderByMerge::initGlobalStateInternal(ExecutionContext* context) {
// TODO(Ziyi): directly feed sharedState to merger and dispatcher.
sharedDispatcher->init(context->memoryManager,
sharedState->sortedKeyBlocks, sharedState->factorizedTables, sharedState->strKeyColsInfo,
sharedState->numBytesPerTuple);
sharedDispatcher->init(context->memoryManager, sharedState->sortedKeyBlocks,
sharedState->factorizedTables, sharedState->strKeyColsInfo, sharedState->numBytesPerTuple);
}

} // namespace processor
Expand Down
1 change: 0 additions & 1 deletion src/processor/operator/physical_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ shared_ptr<ResultSet> PhysicalOperator::init(ExecutionContext* context) {
}

void PhysicalOperator::initGlobalState(ExecutionContext* context) {
transaction = context->transaction;
for (auto& child : children) {
child->initGlobalState(context);
}
Expand Down
35 changes: 20 additions & 15 deletions src/processor/operator/result_collector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,9 @@ namespace processor {

void FTableSharedState::initTableIfNecessary(
MemoryManager* memoryManager, unique_ptr<FactorizedTableSchema> tableSchema) {
lock_guard<mutex> lck{mtx};
if (table == nullptr) {
nextTupleIdxToScan = 0u;
table = make_unique<FactorizedTable>(memoryManager, move(tableSchema));
}
assert(table == nullptr);
nextTupleIdxToScan = 0u;
table = make_unique<FactorizedTable>(memoryManager, std::move(tableSchema));
}

unique_ptr<FTableScanMorsel> FTableSharedState::getMorsel(uint64_t maxMorselSize) {
Expand All @@ -22,20 +20,12 @@ unique_ptr<FTableScanMorsel> FTableSharedState::getMorsel(uint64_t maxMorselSize

shared_ptr<ResultSet> ResultCollector::init(ExecutionContext* context) {
resultSet = PhysicalOperator::init(context);
unique_ptr<FactorizedTableSchema> tableSchema = make_unique<FactorizedTableSchema>();
for (auto& vectorToCollectInfo : vectorsToCollectInfo) {
auto dataPos = vectorToCollectInfo.first;
for (auto [dataPos, _] : payloadsPosAndType) {
auto vector =
resultSet->dataChunks[dataPos.dataChunkPos]->valueVectors[dataPos.valueVectorPos];
vectorsToCollect.push_back(vector);
tableSchema->appendColumn(
make_unique<ColumnSchema>(!vectorToCollectInfo.second, dataPos.dataChunkPos,
vectorToCollectInfo.second ? vector->getNumBytesPerValue() :
(uint32_t)sizeof(overflow_value_t)));
}
localTable = make_unique<FactorizedTable>(
context->memoryManager, make_unique<FactorizedTableSchema>(*tableSchema));
sharedState->initTableIfNecessary(context->memoryManager, move(tableSchema));
localTable = make_unique<FactorizedTable>(context->memoryManager, populateTableSchema());
return resultSet;
}

Expand All @@ -52,5 +42,20 @@ void ResultCollector::executeInternal(ExecutionContext* context) {
}
}

void ResultCollector::initGlobalStateInternal(ExecutionContext* context) {
sharedState->initTableIfNecessary(context->memoryManager, populateTableSchema());
}

unique_ptr<FactorizedTableSchema> ResultCollector::populateTableSchema() {
unique_ptr<FactorizedTableSchema> tableSchema = make_unique<FactorizedTableSchema>();
for (auto i = 0u; i < payloadsPosAndType.size(); ++i) {
auto [dataPos, dataType] = payloadsPosAndType[i];
tableSchema->appendColumn(make_unique<ColumnSchema>(!isPayloadFlat[i], dataPos.dataChunkPos,
isPayloadFlat[i] ? Types::getDataTypeSize(dataType) :
(uint32_t)sizeof(overflow_value_t)));
}
return tableSchema;
}

} // namespace processor
} // namespace kuzu
2 changes: 1 addition & 1 deletion src/processor/operator/scan_node_id.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ bool ScanNodeID::getNextTuplesInternal() {
}

void ScanNodeID::initGlobalStateInternal(ExecutionContext* context) {
sharedState->initialize(transaction);
sharedState->initialize(context->transaction);
}

void ScanNodeID::setSelVector(
Expand Down

0 comments on commit 5d1ea20

Please sign in to comment.