Skip to content

Commit

Permalink
X
Browse files Browse the repository at this point in the history
  • Loading branch information
andyfengHKU committed May 14, 2023
1 parent ff5180a commit 67f1bb0
Show file tree
Hide file tree
Showing 18 changed files with 188 additions and 206 deletions.
2 changes: 1 addition & 1 deletion src/include/binder/expression/expression.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class Expression : public std::enable_shared_from_this<Expression> {
expressionType, std::move(dataType), expression_vector{}, std::move(uniqueName)} {}

virtual ~Expression() = default;

public:
inline void setAlias(const std::string& name) { alias = name; }

Expand Down
4 changes: 2 additions & 2 deletions src/include/binder/expression/node_expression.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ class NodeExpression : public NodeOrRelExpression {
public:
NodeExpression(
std::string uniqueName, std::string variableName, std::vector<common::table_id_t> tableIDs)
: NodeOrRelExpression{
common::DataType(common::NODE), std::move(uniqueName), std::move(variableName), std::move(tableIDs)} {}
: NodeOrRelExpression{common::DataType(common::NODE), std::move(uniqueName),
std::move(variableName), std::move(tableIDs)} {}

inline void setInternalIDProperty(std::unique_ptr<Expression> expression) {
internalIDExpression = std::move(expression);
Expand Down
4 changes: 2 additions & 2 deletions src/include/binder/expression/node_rel_expression.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ namespace binder {

class NodeOrRelExpression : public Expression {
public:
NodeOrRelExpression(common::DataType dataType, std::string uniqueName,
std::string variableName, std::vector<common::table_id_t> tableIDs)
NodeOrRelExpression(common::DataType dataType, std::string uniqueName, std::string variableName,
std::vector<common::table_id_t> tableIDs)
: Expression{common::VARIABLE, std::move(dataType), std::move(uniqueName)},
variableName(std::move(variableName)), tableIDs{std::move(tableIDs)} {}
virtual ~NodeOrRelExpression() override = default;
Expand Down
31 changes: 25 additions & 6 deletions src/include/processor/operator/recursive_extend/path_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,36 @@ struct FixedSizePathScanner {

std::vector<common::offset_t> path;
std::vector<size_t> cursorPerDepth;
std::vector<std::vector<common::offset_t>*> nbrsPerDepth;
common::offset_t currentDstOffset;

FixedSizePathScanner(const BaseBFSMorsel& bfsMorsel, size_t depth)
: bfsMorsel{bfsMorsel}, depth{depth} {
path.resize(depth + 1);
cursorPerDepth.resize(depth + 1);
std::fill(cursorPerDepth.begin(), cursorPerDepth.end(), 0);
nbrsPerDepth.resize(depth + 1);
resetState();
}

bool getNext();

inline void resetState() {
std::fill(cursorPerDepth.begin(), cursorPerDepth.end(), 0);
currentDstOffset = common::INVALID_OFFSET;
}

private:
bool dfs(size_t currentDepth, const std::vector<common::offset_t>& nbrs);
bool dfs(size_t currentDepth);

void initDfs(common::offset_t offset, size_t currentDepth) {
path[currentDepth] = offset;
if (currentDepth == 0) {
return;
}
nbrsPerDepth[currentDepth - 1] = &bfsMorsel.frontiers[currentDepth]->bwdEdges.at(offset);
cursorPerDepth[currentDepth - 1] = 0;
initDfs(nbrsPerDepth[currentDepth - 1]->at(0), currentDepth - 1);
}
};

struct PathScanner {
Expand All @@ -34,17 +52,18 @@ struct PathScanner {
PathScanner(const BaseBFSMorsel& bfsMorsel, size_t lowerBound, size_t upperBound)
: lowerBound{lowerBound}, upperBound{upperBound}, cursor{0} {
for (auto i = lowerBound; i <= upperBound; ++i) {
scanners.push_back(std::make_unique<FixedSizePathScanner>(bfsMorsel, lowerBound));
scanners.push_back(std::make_unique<FixedSizePathScanner>(bfsMorsel, i));
}
}

bool getNext();

inline FixedSizePathScanner* getCurrentScanner() {
return scanners[cursor].get();
}
inline FixedSizePathScanner* getCurrentScanner() { return scanners[cursor].get(); }
inline void resetState() {
cursor = 0;
for (auto& scanner : scanners) {
scanner->resetState();
}
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class BaseRecursiveJoin : public PhysicalOperator {

void initLocalRecursivePlan(ExecutionContext* context);

virtual bool scanOutput() = 0;
bool scanOutput();

// Compute BFS for a given src node.
void computeBFS(ExecutionContext* context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,44 +11,32 @@ class ShortestPathRecursiveJoin : public BaseRecursiveJoin {
std::shared_ptr<RecursiveJoinSharedState> sharedState,
std::vector<DataPos> vectorsToScanPos, std::vector<ft_col_idx_t> colIndicesToScan,
const DataPos& srcNodeIDVectorPos, const DataPos& pathVectorPos,
const DataPos& dstNodeIDVectorPos, const DataPos& distanceVectorPos,
const DataPos& tmpDstNodeIDVectorPos, std::unique_ptr<PhysicalOperator> child, uint32_t id,
const std::string& paramsString, std::unique_ptr<PhysicalOperator> recursiveRoot)
const DataPos& dstNodeIDVectorPos, const DataPos& tmpDstNodeIDVectorPos,
std::unique_ptr<PhysicalOperator> child, uint32_t id, const std::string& paramsString,
std::unique_ptr<PhysicalOperator> recursiveRoot)
: BaseRecursiveJoin{lowerBound, upperBound, nodeTable, std::move(sharedState),
std::move(vectorsToScanPos), std::move(colIndicesToScan), srcNodeIDVectorPos,
pathVectorPos, dstNodeIDVectorPos, tmpDstNodeIDVectorPos, std::move(child), id,
paramsString, std::move(recursiveRoot)},
distanceVectorPos{distanceVectorPos} {}
paramsString, std::move(recursiveRoot)} {}

ShortestPathRecursiveJoin(uint8_t lowerBound, uint8_t upperBound, storage::NodeTable* nodeTable,
std::shared_ptr<RecursiveJoinSharedState> sharedState,
std::vector<DataPos> vectorsToScanPos, std::vector<ft_col_idx_t> colIndicesToScan,
const DataPos& srcNodeIDVectorPos, const DataPos& pathVectorPos,
const DataPos& dstNodeIDVectorPos, const DataPos& distanceVectorPos,
const DataPos& tmpDstNodeIDVectorPos, uint32_t id, const std::string& paramsString,
std::unique_ptr<PhysicalOperator> recursiveRoot)
const DataPos& dstNodeIDVectorPos, const DataPos& tmpDstNodeIDVectorPos, uint32_t id,
const std::string& paramsString, std::unique_ptr<PhysicalOperator> recursiveRoot)
: BaseRecursiveJoin{lowerBound, upperBound, nodeTable, std::move(sharedState),
std::move(vectorsToScanPos), std::move(colIndicesToScan), srcNodeIDVectorPos,
pathVectorPos, dstNodeIDVectorPos, tmpDstNodeIDVectorPos, id, paramsString,
std::move(recursiveRoot)},
distanceVectorPos{distanceVectorPos} {}
std::move(recursiveRoot)} {}

void initLocalStateInternal(ResultSet* resultSet_, ExecutionContext* context) override;

inline std::unique_ptr<PhysicalOperator> clone() override {
return std::make_unique<ShortestPathRecursiveJoin>(lowerBound, upperBound, nodeTable,
sharedState, vectorsToScanPos, colIndicesToScan, srcNodeIDVectorPos, pathVectorPos,
dstNodeIDVectorPos, distanceVectorPos, tmpDstNodeIDVectorPos, id, paramsString,
recursiveRoot->clone());
dstNodeIDVectorPos, tmpDstNodeIDVectorPos, id, paramsString, recursiveRoot->clone());
}

private:
bool scanOutput() override;

private:
DataPos distanceVectorPos;

std::shared_ptr<common::ValueVector> distanceVector;
};

} // namespace processor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,6 @@ class VariableLengthRecursiveJoin : public BaseRecursiveJoin {
sharedState, vectorsToScanPos, colIndicesToScan, srcNodeIDVectorPos, pathVectorPos,
dstNodeIDVectorPos, tmpDstNodeIDVectorPos, id, paramsString, recursiveRoot->clone());
}

private:
bool scanOutput() override;
};

} // namespace processor
Expand Down
4 changes: 1 addition & 3 deletions src/processor/mapper/map_extend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,9 @@ std::unique_ptr<PhysicalOperator> PlanMapper::mapLogicalRecursiveExtendToPhysica
auto sharedState = std::make_shared<RecursiveJoinSharedState>(sharedInputFTable, nodeTable);
switch (rel->getRelType()) {
case common::QueryRelType::SHORTEST: {
// TODO: remove
auto distanceVectorPos = DataPos{0, 0};
return std::make_unique<ShortestPathRecursiveJoin>(rel->getLowerBound(),
rel->getUpperBound(), nodeTable, sharedState, outDataPoses, colIndicesToScan,
inNodeIDVectorPos, pathVectorPos, outNodeIDVectorPos, distanceVectorPos, tmpDstNodePos,
inNodeIDVectorPos, pathVectorPos, outNodeIDVectorPos, tmpDstNodePos,
std::move(resultCollector), getOperatorID(), extend->getExpressionsForPrinting(),
std::move(scanRelTable));
}
Expand Down
2 changes: 1 addition & 1 deletion src/processor/operator/recursive_extend/bfs_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ void BaseBFSMorsel::moveNextLevelAsCurrentLevel() {
currentFrontier = nextFrontier;
currentLevel++;
nextNodeIdxToExtend = 0;
addNextFrontier();
if (currentLevel < upperBound) { // No need to sort if we are not extending further.
addNextFrontier();
std::sort(currentFrontier->nodeOffsets.begin(), currentFrontier->nodeOffsets.end());
}
}
Expand Down
61 changes: 30 additions & 31 deletions src/processor/operator/recursive_extend/path_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,50 +4,49 @@ namespace kuzu {
namespace processor {

bool FixedSizePathScanner::getNext() {
if (bfsMorsel.frontiers.empty()) {
if (depth >= bfsMorsel.frontiers.size()) { // BFS terminate before current depth
return false;
}
auto frontier = bfsMorsel.frontiers[depth].get();
auto& cursor = cursorPerDepth[depth];
while (true) {
if (cursor >= frontier->nodeOffsets.size()) {
return false;
}
auto offset = frontier->nodeOffsets[cursor];
if (dfs(0)) {
return true;
}
while (cursor < frontier->nodeOffsets.size()) {
currentDstOffset = frontier->nodeOffsets[cursor++];
// Skip nodes that is not in semi mask.
if (!bfsMorsel.isAllDstTarget() && !bfsMorsel.targetDstNodeOffsets.contains(offset)) {
cursor++;
if (!bfsMorsel.isAllDstTarget() &&
!bfsMorsel.targetDstNodeOffsets.contains(currentDstOffset)) {
continue;
}
path[depth] = offset;
if (dfs(1 /* currentDepth */, frontier->bwdEdges.at(offset))) { // found a path
return true;
} else {
cursorPerDepth[depth - 1] = 0;
cursor++;
}
initDfs(currentDstOffset, depth);
return true;
}
return false;
}

bool FixedSizePathScanner::dfs(size_t currentDepth, const std::vector<common::offset_t>& nbrs) {
auto reverseDepth = depth - currentDepth;
auto& cursor = cursorPerDepth[reverseDepth];
if (reverseDepth == 0) { // base case
assert(nbrs.size() == 1); // We perform since source BFS.
path[reverseDepth] = nbrs[0];
return cursor++ == 0;
bool FixedSizePathScanner::dfs(size_t currentDepth) {
if (currentDstOffset == common::INVALID_OFFSET || currentDepth >= depth) {
return false;
}
auto frontier = bfsMorsel.frontiers[reverseDepth].get();
common::offset_t offset;
do {
if (cursor >= nbrs.size()) {
auto& cursor = cursorPerDepth[currentDepth];
cursor++;
while (true) {
if (cursor < nbrsPerDepth[currentDepth]->size()) {
auto nbrOffset = nbrsPerDepth[currentDepth]->at(cursor);
path[currentDepth] = nbrOffset;
if (currentDepth > 0) {
assert(bfsMorsel.frontiers[currentDepth]->bwdEdges.contains(nbrOffset));
nbrsPerDepth[currentDepth - 1] =
&bfsMorsel.frontiers[currentDepth]->bwdEdges.at(nbrOffset);
}
return true;
}
cursor = 0;
if (!dfs(currentDepth + 1)) {
return false;
}
offset = nbrs[cursor++];
path[reverseDepth] = offset;
cursorPerDepth[reverseDepth - 1] = 0;
} while (!dfs(currentDepth + 1, frontier->bwdEdges.at(offset)));
return true;
}
}

bool PathScanner::getNext() {
Expand Down
24 changes: 24 additions & 0 deletions src/processor/operator/recursive_extend/recursive_join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,30 @@ bool BaseRecursiveJoin::getNextTuplesInternal(ExecutionContext* context) {
}
}

bool BaseRecursiveJoin::scanOutput() {
auto vectorSize = 0u;
auto dataVector = common::ListVector::getDataVector(pathVector.get());
auto dataVectorSize = 0u;
while (vectorSize != common::DEFAULT_VECTOR_CAPACITY && pathScanner->getNext()) {
auto currentScanner = pathScanner->getCurrentScanner();
auto& path = currentScanner->path;
auto listEntry = common::ListVector::addList(pathVector.get(), path.size());
pathVector->setValue(vectorSize, listEntry);
for (auto i = 0; i < path.size(); ++i) {
dataVector->setValue<common::nodeID_t>(
dataVectorSize++, common::nodeID_t{path[i], nodeTable->getTableID()});
}
dstNodeIDVector->setValue<common::nodeID_t>(
vectorSize, common::nodeID_t{path[path.size() - 1], nodeTable->getTableID()});
vectorSize++;
}
if (vectorSize == 0) {
return false;
}
pathVector->state->initOriginalAndSelectedSize(vectorSize);
return true;
}

void BaseRecursiveJoin::computeBFS(ExecutionContext* context) {
auto nodeID = srcNodeIDVector->getValue<common::nodeID_t>(
srcNodeIDVector->state->selVector->selectedPositions[0]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,10 @@ namespace processor {
void ShortestPathRecursiveJoin::initLocalStateInternal(
ResultSet* resultSet_, ExecutionContext* context) {
BaseRecursiveJoin::initLocalStateInternal(resultSet_, context);
distanceVector = resultSet->getValueVector(distanceVectorPos);
auto maxNodeOffset = nodeTable->getMaxNodeOffset(transaction);
bfsMorsel = std::make_unique<ShortestPathBFSMorsel>(
maxNodeOffset, lowerBound, upperBound, sharedState->semiMask.get());
bfsMorsel->resetState();
}

bool ShortestPathRecursiveJoin::scanOutput() {
auto morsel = (ShortestPathBFSMorsel*)bfsMorsel.get();
// if (outputCursor == morsel->dstNodeOffsets.size()) {
// return false;
// }
// auto vectorSize = 0u;
// while (vectorSize != common::DEFAULT_VECTOR_CAPACITY &&
// outputCursor < morsel->dstNodeOffsets.size()) {
// auto offset = morsel->dstNodeOffsets[outputCursor];
// dstNodeIDVector->setValue<common::nodeID_t>(
// vectorSize, common::nodeID_t{offset, nodeTable->getTableID()});
// distanceVector->setValue<int64_t>(vectorSize,
// morsel->dstNodeOffset2PathLength.at(offset)); vectorSize++; outputCursor++;
// }
// dstNodeIDVector->state->initOriginalAndSelectedSize(vectorSize);
return true;
pathScanner = std::make_unique<PathScanner>(*bfsMorsel, lowerBound, upperBound);
}

} // namespace processor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,29 +12,5 @@ void VariableLengthRecursiveJoin::initLocalStateInternal(
pathScanner = std::make_unique<PathScanner>(*bfsMorsel, lowerBound, upperBound);
}

bool VariableLengthRecursiveJoin::scanOutput() {
auto vectorSize = 0u;
auto dataVector = (common::nodeID_t*)common::ListVector::getDataVector(pathVector.get());
auto dataVectorSize = 0u;
while (pathScanner->getNext() && vectorSize != common::DEFAULT_VECTOR_CAPACITY) {
auto currentScanner = pathScanner->getCurrentScanner();
auto pathLength = currentScanner->path.size();
auto listEntry = common::ListVector::addList(pathVector.get(), pathLength);
pathVector->setValue(vectorSize, listEntry);
for (auto i = 0; i < pathLength; ++i) {
dataVector[dataVectorSize++] =
common::nodeID_t{nodeTable->getTableID(), currentScanner->path[i]};
}
dstNodeIDVector->setValue<common::nodeID_t>(vectorSize,
common::nodeID_t{nodeTable->getTableID(), currentScanner->path[pathLength - 1]});
vectorSize++;
}
if (vectorSize == 0) {
return false;
}
pathVector->state->initOriginalAndSelectedSize(vectorSize);
return true;
}

} // namespace processor
} // namespace kuzu
4 changes: 2 additions & 2 deletions test/runner/e2e_exception_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ class TinySnbExceptionTest : public DBTest {

TEST_F(TinySnbExceptionTest, ReadVarlengthRelPropertyTest1) {
auto result = conn->query("MATCH (a:person)-[e:knows*1..3]->(b:person) RETURN e.age;");
ASSERT_STREQ("Binder exception: Cannot read property of variable length rel e.",
ASSERT_STREQ("Binder exception: e has data type VAR_LIST. (STRUCT,REL,NODE) was expected.",
result->getErrorMessage().c_str());
}

TEST_F(TinySnbExceptionTest, ReadVarlengthRelPropertyTest2) {
auto result =
conn->query("MATCH (a:person)-[e:knows*1..3]->(b:person) WHERE ID(e) = 0 RETURN COUNT(*);");
ASSERT_STREQ("Binder exception: Cannot read property of variable length rel e.",
ASSERT_STREQ("Binder exception: e has data type VAR_LIST. (REL,NODE) was expected.",
result->getErrorMessage().c_str());
}

Expand Down
Loading

0 comments on commit 67f1bb0

Please sign in to comment.