Skip to content

Commit

Permalink
clean
Browse files Browse the repository at this point in the history
  • Loading branch information
andyfengHKU committed Nov 22, 2022
1 parent 18f6f87 commit 6219243
Show file tree
Hide file tree
Showing 13 changed files with 156 additions and 89 deletions.
1 change: 0 additions & 1 deletion src/catalog/include/catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ class CatalogContent {
vector<PropertyNameDataType> structuredPropertyDefinitions,
vector<pair<table_id_t, table_id_t>> srcDstTableIDs);


virtual inline string getNodeTableName(table_id_t tableID) const {
return nodeTableSchemas.at(tableID)->tableName;
}
Expand Down
2 changes: 1 addition & 1 deletion src/parser/query/graph_pattern/include/node_pattern.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class NodePattern {

inline string getVariableName() const { return variableName; }

inline vector<string> getTableNames() const { return tableNames;}
inline vector<string> getTableNames() const { return tableNames; }

inline uint32_t getNumPropertyKeyValPairs() const { return propertyKeyValPairs.size(); }
inline pair<string, ParsedExpression*> getProperty(uint32_t idx) const {
Expand Down
3 changes: 2 additions & 1 deletion src/parser/query/graph_pattern/include/rel_pattern.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ class RelPattern : public NodePattern {
RelPattern(string name, string tableName, string lowerBound, string upperBound,
ArrowDirection arrowDirection,
vector<pair<string, unique_ptr<ParsedExpression>>> propertyKeyValPairs)
: NodePattern{std::move(name), vector<string>{std::move(tableName)}, std::move(propertyKeyValPairs)},
: NodePattern{std::move(name), vector<string>{std::move(tableName)},
std::move(propertyKeyValPairs)},
lowerBound{std::move(lowerBound)}, upperBound{std::move(upperBound)},
arrowDirection{arrowDirection} {}

Expand Down
4 changes: 2 additions & 2 deletions src/parser/transformer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,8 @@ unique_ptr<NodePattern> Transformer::transformNodePattern(
}
auto properties = ctx.kU_Properties() ? transformProperties(*ctx.kU_Properties()) :
vector<pair<string, unique_ptr<ParsedExpression>>>{};
return make_unique<NodePattern>(std::move(variable), std::move(nodeLabels), std::move(properties));
return make_unique<NodePattern>(
std::move(variable), std::move(nodeLabels), std::move(properties));
}

unique_ptr<PatternElementChain> Transformer::transformPatternElementChain(
Expand Down Expand Up @@ -503,7 +504,6 @@ unique_ptr<ParsedExpression> Transformer::transformListOperatorExpression(
CypherParser::OC_ListOperatorExpressionContext& ctx,
unique_ptr<ParsedExpression> propertyExpression) {
auto rawExpression = propertyExpression->getRawName() + " " + ctx.getText();

if (ctx.children[1]->getText() == ":" || ctx.children[2]->getText() == ":") {
auto listSlice =
make_unique<ParsedFunctionExpression>(LIST_SLICE_FUNC_NAME, move(rawExpression));
Expand Down
6 changes: 4 additions & 2 deletions src/planner/join_order_enumerator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -528,8 +528,10 @@ void JoinOrderEnumerator::appendScanNode(shared_ptr<NodeExpression>& node, Logic
scan->computeSchema(*schema);
// update cardinality
auto group = schema->getGroup(node->getIDProperty());
auto numNodes =
nodesStatistics.getNodeStatisticsAndDeletedIDs(node->getTableID())->getNumTuples();
auto numNodes = 0u;
for (auto& tableID : node->getTableIDs()) {
numNodes += nodesStatistics.getNodeStatisticsAndDeletedIDs(tableID)->getNumTuples();
}
group->setMultiplier(numNodes);
plan.setLastOperator(std::move(scan));
}
Expand Down
5 changes: 3 additions & 2 deletions src/processor/mapper/map_hash_join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,14 @@ static void mapASPJoin(NodeExpression* joinNode, HashJoinProbe* hashJoinProbe) {
}
}
assert(scanNodeIDCandidates.size() == 1);
auto sharedState = scanNodeIDCandidates[0]->getSharedState();
// set semi masker
auto tableScan = getTableScanForAccHashJoin(hashJoinProbe);
assert(tableScan->getChild(0)->getChild(0)->getOperatorType() ==
PhysicalOperatorType::SEMI_MASKER);
auto semiMasker = (SemiMasker*)tableScan->getChild(0)->getChild(0);
semiMasker->setSharedState(sharedState);
auto sharedState = scanNodeIDCandidates[0]->getSharedState();
assert(sharedState->getNumTableStates() == 1);
semiMasker->setSharedState(sharedState->getTableState(0));
constructAccPipeline(tableScan, hashJoinProbe);
}

Expand Down
11 changes: 7 additions & 4 deletions src/processor/mapper/map_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,16 @@ unique_ptr<PhysicalOperator> PlanMapper::mapLogicalScanNodeToPhysical(
auto logicalScan = (LogicalScanNode*)logicalOperator;
auto node = logicalScan->getNode();
auto& nodesStore = storageManager.getNodesStore();
auto nodeTable = nodesStore.getNodeTable(node->getTableID());

auto dataPos = mapperContext.getDataPos(node->getIDProperty());
mapperContext.addComputedExpressions(node->getIDProperty());
auto sharedState = make_shared<ScanNodeIDSharedState>(
&nodesStore.getNodesStatisticsAndDeletedIDs(), node->getTableID());
auto sharedState = make_shared<ScanNodeIDSharedState>();
for (auto& tableID : node->getTableIDs()) {
auto nodeTable = nodesStore.getNodeTable(tableID);
sharedState->addTableState(nodeTable);
}
return make_unique<ScanNodeID>(mapperContext.getResultSetDescriptor()->copy(),
node->getUniqueName(), nodeTable, dataPos, sharedState, getOperatorID(),
node->getUniqueName(), dataPos, sharedState, getOperatorID(),
logicalScan->getExpressionsForPrinting());
}

Expand Down
93 changes: 63 additions & 30 deletions src/processor/operator/include/scan_node_id.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,51 +54,85 @@ struct ScanNodeIDSemiMask {
unique_ptr<Mask> morselMask;
};

class TableSharedState {

};

class ScanNodeIDSharedState {
class ScanTableNodeIDSharedState {
public:
explicit ScanNodeIDSharedState(
NodesStatisticsAndDeletedIDs* nodesStatistics, table_id_t tableID)
: initialized{false}, nodesStatistics{nodesStatistics},
tableID{tableID}, maxNodeOffset{UINT64_MAX}, maxMorselIdx{UINT64_MAX},
currentNodeOffset{0}, numMaskers{0}, semiMask{nullptr} {}
ScanTableNodeIDSharedState(NodeTable* table)
: table{table}, maxNodeOffset{UINT64_MAX}, maxMorselIdx{UINT64_MAX}, currentNodeOffset{0},
numMaskers{0}, semiMask{nullptr} {}

void initialize(Transaction* transaction);

pair<uint64_t, uint64_t> getNextRangeToRead();
inline NodeTable* getTable() { return table; }

void initSemiMask(Transaction* transaction);
inline void initialize(Transaction* transaction) {
unique_lock lck{mtx};
maxNodeOffset = table->getMaxNodeOffset(transaction);
maxMorselIdx = maxNodeOffset >> DEFAULT_VECTOR_CAPACITY_LOG_2;
}

inline bool isNodeMaskEnabled() const {
return semiMask != nullptr && semiMask->isNodeMaskEnabled();
inline void initSemiMask(Transaction* transaction) {
unique_lock lck{mtx};
if (semiMask == nullptr) {
semiMask =
make_unique<ScanNodeIDSemiMask>(table->getMaxNodeOffset(transaction), numMaskers);
}
}
inline bool isSemiMaskEnabled() { return semiMask != nullptr && semiMask->isNodeMaskEnabled(); }
inline ScanNodeIDSemiMask* getSemiMask() { return semiMask.get(); }
inline uint8_t getNumMaskers() const { return numMaskers; }
inline void incrementNumMaskers() { numMaskers++; }
inline uint8_t getNumMaskers() {
unique_lock lck{mtx};
return numMaskers;
}
inline void incrementNumMaskers() {
unique_lock lck{mtx};
numMaskers++;
}

pair<node_offset_t, node_offset_t> getNextRangeToRead();

private:
// TODO(Xiyang/Guodong): we should get rid of this mutex when shared state is not being
// initialized repeatedly in each task.
mutex mtx;
bool initialized;
NodesStatisticsAndDeletedIDs* nodesStatistics;
table_id_t tableID;

NodeTable* table;
uint64_t maxNodeOffset;
uint64_t maxMorselIdx;
uint64_t currentNodeOffset;
uint8_t numMaskers;
unique_ptr<ScanNodeIDSemiMask> semiMask;
};

class ScanNodeIDSharedState {
public:
ScanNodeIDSharedState() : initialized{false}, currentStateIdx{0} {};

inline void addTableState(NodeTable* table) {
tableStates.push_back(make_unique<ScanTableNodeIDSharedState>(table));
}
inline uint32_t getNumTableStates() const { return tableStates.size(); }
inline ScanTableNodeIDSharedState* getTableState(uint32_t idx) const {
return tableStates[idx].get();
}

void initialize(Transaction* transaction);

tuple<ScanTableNodeIDSharedState*, node_offset_t, node_offset_t> getNextRangeToRead();

private:
mutex mtx;
bool initialized;

vector<unique_ptr<ScanTableNodeIDSharedState>> tableStates;
uint32_t currentStateIdx;
};

class ScanNodeID : public PhysicalOperator, public SourceOperator {
public:
ScanNodeID(unique_ptr<ResultSetDescriptor> resultSetDescriptor, string nodeName,
NodeTable* nodeTable, const DataPos& outDataPos,
shared_ptr<ScanNodeIDSharedState> sharedState, uint32_t id, const string& paramsString)
const DataPos& outDataPos, shared_ptr<ScanNodeIDSharedState> sharedState, uint32_t id,
const string& paramsString)
: PhysicalOperator{id, paramsString}, SourceOperator{std::move(resultSetDescriptor)},
nodeName{std::move(nodeName)}, nodeTable{nodeTable}, outDataPos{outDataPos},
sharedState{std::move(sharedState)} {}
nodeName{std::move(nodeName)}, outDataPos{outDataPos}, sharedState{
std::move(sharedState)} {}

inline string getNodeName() const { return nodeName; }
inline ScanNodeIDSharedState* getSharedState() const { return sharedState.get(); }
Expand All @@ -110,20 +144,19 @@ class ScanNodeID : public PhysicalOperator, public SourceOperator {
bool getNextTuples() override;

inline unique_ptr<PhysicalOperator> clone() override {
return make_unique<ScanNodeID>(resultSetDescriptor->copy(), nodeName, nodeTable, outDataPos,
sharedState, id, paramsString);
return make_unique<ScanNodeID>(
resultSetDescriptor->copy(), nodeName, outDataPos, sharedState, id, paramsString);
}

private:
void setSelVector(node_offset_t startOffset, node_offset_t endOffset);
void setSelVector(
ScanTableNodeIDSharedState* tableState, node_offset_t startOffset, node_offset_t endOffset);

private:
string nodeName;
NodeTable* nodeTable;
DataPos outDataPos;
shared_ptr<ScanNodeIDSharedState> sharedState;

shared_ptr<DataChunk> outDataChunk;
shared_ptr<ValueVector> outValueVector;
};

Expand Down
14 changes: 7 additions & 7 deletions src/processor/operator/include/semi_masker.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,18 @@ class SemiMasker : public PhysicalOperator {
SemiMasker(const DataPos& keyDataPos, unique_ptr<PhysicalOperator> child, uint32_t id,
const string& paramsString)
: PhysicalOperator{std::move(child), id, paramsString},
keyDataPos{keyDataPos}, maskerIdx{0}, scanNodeIDSharedState{nullptr} {}
keyDataPos{keyDataPos}, maskerIdx{0}, scanTableNodeIDSharedState{nullptr} {}

SemiMasker(const SemiMasker& other)
: PhysicalOperator{other.children[0]->clone(), other.id, other.paramsString},
keyDataPos{other.keyDataPos}, maskerIdx{other.maskerIdx},
scanNodeIDSharedState{other.scanNodeIDSharedState} {}
scanTableNodeIDSharedState{other.scanTableNodeIDSharedState} {}

inline void setSharedState(ScanNodeIDSharedState* sharedState) {
scanNodeIDSharedState = sharedState;
maskerIdx = scanNodeIDSharedState->getNumMaskers();
inline void setSharedState(ScanTableNodeIDSharedState* sharedState) {
scanTableNodeIDSharedState = sharedState;
maskerIdx = scanTableNodeIDSharedState->getNumMaskers();
assert(maskerIdx < UINT8_MAX);
scanNodeIDSharedState->incrementNumMaskers();
scanTableNodeIDSharedState->incrementNumMaskers();
}

inline PhysicalOperatorType getOperatorType() override { return SEMI_MASKER; }
Expand All @@ -42,7 +42,7 @@ class SemiMasker : public PhysicalOperator {
// value in the mask by 1. More details are described in ScanNodeIDSemiMask.
uint8_t maskerIdx;
shared_ptr<ValueVector> keyValueVector;
ScanNodeIDSharedState* scanNodeIDSharedState;
ScanTableNodeIDSharedState* scanTableNodeIDSharedState;
};
} // namespace processor
} // namespace kuzu
84 changes: 49 additions & 35 deletions src/processor/operator/scan_node_id.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,7 @@ void ScanNodeIDSemiMask::setMask(uint64_t nodeOffset, uint8_t maskerIdx) {
morselMask->setMask(nodeOffset >> DEFAULT_VECTOR_CAPACITY_LOG_2, maskerIdx, maskerIdx + 1);
}

void ScanNodeIDSharedState::initialize(kuzu::transaction::Transaction* transaction) {
unique_lock uLck{mtx};
if (initialized) {
return;
}
maxNodeOffset = nodesStatistics->getMaxNodeOffset(transaction, tableID);
maxMorselIdx = maxNodeOffset >> DEFAULT_VECTOR_CAPACITY_LOG_2;
initialized = true;
}

void ScanNodeIDSharedState::initSemiMask(Transaction* transaction) {
unique_lock xLck{mtx};
if (semiMask == nullptr) {
auto maxNodeOffset_ = nodesStatistics->getMaxNodeOffset(transaction, tableID);
semiMask = make_unique<ScanNodeIDSemiMask>(maxNodeOffset_, numMaskers);
}
}

pair<uint64_t, uint64_t> ScanNodeIDSharedState::getNextRangeToRead() {
pair<node_offset_t, node_offset_t> ScanTableNodeIDSharedState::getNextRangeToRead() {
unique_lock lck{mtx};
// Note: we use maxNodeOffset=UINT64_MAX to represent an empty table.
if (currentNodeOffset > maxNodeOffset || maxNodeOffset == UINT64_MAX) {
Expand All @@ -46,10 +28,42 @@ pair<uint64_t, uint64_t> ScanNodeIDSharedState::getNextRangeToRead() {
return make_pair(startOffset, startOffset + range);
}

void ScanNodeIDSharedState::initialize(Transaction* transaction) {
unique_lock lck{mtx};
if (initialized) {
return;
}
for (auto& tableState : tableStates) {
tableState->initialize(transaction);
}
initialized = true;
}

tuple<ScanTableNodeIDSharedState*, node_offset_t, node_offset_t>
ScanNodeIDSharedState::getNextRangeToRead() {
unique_lock lck{mtx};
if (currentStateIdx == tableStates.size()) {
return make_tuple(nullptr, -1, -1);
}
auto [startOffset, endOffset] = tableStates[currentStateIdx]->getNextRangeToRead();
while (startOffset >= endOffset) {
currentStateIdx++;
if (currentStateIdx == tableStates.size()) {
return make_tuple(nullptr, -1, -1);
}
auto [_startOffset, _endOffset] = tableStates[currentStateIdx]->getNextRangeToRead();
startOffset = _startOffset;
endOffset = _endOffset;
}
auto tableState =
currentStateIdx < tableStates.size() ? tableStates[currentStateIdx].get() : nullptr;
return make_tuple(tableState, startOffset, endOffset);
}

shared_ptr<ResultSet> ScanNodeID::init(ExecutionContext* context) {
PhysicalOperator::init(context);
resultSet = populateResultSet();
outDataChunk = resultSet->dataChunks[outDataPos.dataChunkPos];
auto outDataChunk = resultSet->dataChunks[outDataPos.dataChunkPos];
outValueVector = make_shared<ValueVector>(NODE_ID, context->memoryManager);
outValueVector->setSequential();
outDataChunk->insert(outDataPos.valueVectorPos, outValueVector);
Expand All @@ -60,44 +74,44 @@ shared_ptr<ResultSet> ScanNodeID::init(ExecutionContext* context) {
bool ScanNodeID::getNextTuples() {
metrics->executionTime.start();
do {
auto [startOffset, endOffset] = sharedState->getNextRangeToRead();
if (startOffset >= endOffset) {
auto [state, startOffset, endOffset] = sharedState->getNextRangeToRead();
if (state == nullptr) {
metrics->executionTime.stop();
return false;
}
auto nodeIDValues = (nodeID_t*)(outValueVector->getData());
auto size = endOffset - startOffset;
for (auto i = 0u; i < size; ++i) {
nodeIDValues[i].offset = startOffset + i;
nodeIDValues[i].tableID = nodeTable->getTableID();
nodeIDValues[i].tableID = state->getTable()->getTableID();
}
outDataChunk->state->initOriginalAndSelectedSize(size);
setSelVector(startOffset, endOffset);
} while (outDataChunk->state->selVector->selectedSize == 0);
outValueVector->state->initOriginalAndSelectedSize(size);
setSelVector(state, startOffset, endOffset);
} while (outValueVector->state->selVector->selectedSize == 0);
metrics->executionTime.stop();
metrics->numOutputTuple.increase(outValueVector->state->selVector->selectedSize);
return true;
}

void ScanNodeID::setSelVector(node_offset_t startOffset, node_offset_t endOffset) {
if (sharedState->isNodeMaskEnabled()) {
outDataChunk->state->selVector->resetSelectorToValuePosBuffer();
void ScanNodeID::setSelVector(
ScanTableNodeIDSharedState* tableState, node_offset_t startOffset, node_offset_t endOffset) {
if (tableState->isSemiMaskEnabled()) {
outValueVector->state->selVector->resetSelectorToValuePosBuffer();
// Fill selected positions based on node mask for nodes between the given startOffset and
// endOffset. If the node is masked (i.e., valid for read), then it is set to the selected
// positions. Finally, we update the selectedSize for selVector.
sel_t numSelectedValues = 0;
for (auto i = 0u; i < (endOffset - startOffset); i++) {
outDataChunk->state->selVector->selectedPositions[numSelectedValues] = i;
numSelectedValues += sharedState->getSemiMask()->isNodeMasked(i + startOffset);
outValueVector->state->selVector->selectedPositions[numSelectedValues] = i;
numSelectedValues += tableState->getSemiMask()->isNodeMasked(i + startOffset);
}
outDataChunk->state->selVector->selectedSize = numSelectedValues;
outValueVector->state->selVector->selectedSize = numSelectedValues;
} else {
// By default, the selected positions is set to the const incremental pos array.
outDataChunk->state->selVector->resetSelectorToUnselected();
outValueVector->state->selVector->resetSelectorToUnselected();
}
// Apply changes to the selVector from nodes metadata.
nodeTable->getNodeStatisticsAndDeletedIDs()->setDeletedNodeOffsetsForMorsel(
transaction, outValueVector, nodeTable->getTableID());
tableState->getTable()->setSelVectorForDeletedOffsets(transaction, outValueVector);
}

} // namespace processor
Expand Down
Loading

0 comments on commit 6219243

Please sign in to comment.