Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scan csr local storage #2468

Merged
merged 1 commit into from
Nov 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 29 additions & 9 deletions src/include/storage/local_storage/local_rel_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@ static constexpr common::column_id_t REL_ID_COLUMN_ID = 0;
struct RelNGInfo {
virtual ~RelNGInfo() = default;

- virtual bool insert(common::offset_t srcNodeOffset, common::offset_t relOffset,
+ virtual bool insert(common::offset_t srcOffsetInChunk, common::offset_t relOffset,
  common::row_idx_t adjNodeRowIdx,
  const std::vector<common::row_idx_t>& propertyNodesRowIdx) = 0;
- virtual void update(common::offset_t srcNodeOffset, common::offset_t relOffset,
+ virtual void update(common::offset_t srcOffsetInChunk, common::offset_t relOffset,
  common::column_id_t columnID, common::row_idx_t rowIdx) = 0;
- virtual bool delete_(common::offset_t srcNodeOffset, common::offset_t relOffset) = 0;
+ virtual bool delete_(common::offset_t srcOffsetInChunk, common::offset_t relOffset) = 0;
+
+ virtual uint64_t getNumInsertedTuples(common::offset_t srcOffsetInChunk) = 0;

protected:
inline static bool contains(
Expand All @@ -39,12 +41,14 @@ struct RegularRelNGInfo final : public RelNGInfo {
updateInfoPerChunk.resize(numChunks);
}

- bool insert(common::offset_t srcNodeOffset, common::offset_t relOffset,
+ bool insert(common::offset_t srcOffsetInChunk, common::offset_t relOffset,
  common::row_idx_t adjNodeRowIdx,
  const std::vector<common::row_idx_t>& propertyNodesRowIdx) override;
- void update(common::offset_t srcNodeOffset, common::offset_t relOffset,
+ void update(common::offset_t srcOffsetInChunk, common::offset_t relOffset,
  common::column_id_t columnID, common::row_idx_t rowIdx) override;
- bool delete_(common::offset_t srcNodeOffset, common::offset_t relOffset) final;
+ bool delete_(common::offset_t srcOffsetInChunk, common::offset_t relOffset) override;
+
+ uint64_t getNumInsertedTuples(common::offset_t srcOffsetInChunk) override;
};

// Info of node groups with CSR chunks for rel tables.
Expand All @@ -60,19 +64,28 @@ struct CSRRelNGInfo final : public RelNGInfo {
updateInfoPerChunk.resize(numChunks);
}

- bool insert(common::offset_t srcNodeOffset, common::offset_t relOffset,
+ bool insert(common::offset_t srcOffsetInChunk, common::offset_t relOffset,
  common::row_idx_t adjNodeRowIdx,
  const std::vector<common::row_idx_t>& propertyNodesRowIdx) override;
- void update(common::offset_t srcNodeOffset, common::offset_t relOffset,
+ void update(common::offset_t srcOffsetInChunk, common::offset_t relOffset,
  common::column_id_t columnID, common::row_idx_t rowIdx) override;
- bool delete_(common::offset_t srcNodeOffset, common::offset_t relOffset) override;
+ bool delete_(common::offset_t srcOffsetInChunk, common::offset_t relOffset) override;
+
+ uint64_t getNumInsertedTuples(common::offset_t srcOffsetInChunk) override;
};

class LocalRelNG final : public LocalNodeGroup {
public:
LocalRelNG(common::offset_t nodeGroupStartOffset, common::ColumnDataFormat dataFormat,
std::vector<common::LogicalType*> dataTypes, MemoryManager* mm);

+ common::row_idx_t scanCSR(common::offset_t srcOffsetInChunk,
+ common::offset_t posToReadForOffset, const std::vector<common::column_id_t>& columnIDs,
+ const std::vector<common::ValueVector*>& outputVector);
+ void applyCSRUpdatesAndDeletions(common::offset_t srcOffsetInChunk,
+ const std::vector<common::column_id_t>& columnIDs, common::ValueVector* relIDVector,
+ const std::vector<common::ValueVector*>& outputVector);
+
bool insert(common::ValueVector* srcNodeIDVector, common::ValueVector* dstNodeIDVector,
const std::vector<common::ValueVector*>& propertyVectors);
void update(common::ValueVector* srcNodeIDVector, common::ValueVector* relIDVector,
Expand All @@ -86,6 +99,13 @@ class LocalRelNG final : public LocalNodeGroup {
}
inline RelNGInfo* getRelNGInfo() { return relNGInfo.get(); }

+ private:
+ void applyCSRUpdates(common::offset_t srcOffsetInChunk, common::column_id_t columnID,
+ const offset_to_offset_to_row_idx_t& updateInfo, common::ValueVector* relIDVector,
+ const std::vector<common::ValueVector*>& outputVector);
+ void applyCSRDeletions(common::offset_t srcOffsetInChunk,
+ const offset_to_offset_set_t& deleteInfo, common::ValueVector* relIDVector);
+
private:
std::unique_ptr<LocalVectorCollection> adjChunk;
std::unique_ptr<RelNGInfo> relNGInfo;
Expand Down
27 changes: 16 additions & 11 deletions src/include/storage/store/rel_table_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,35 +7,40 @@
namespace kuzu {
namespace storage {

+ class LocalRelNG;
  struct RelDataReadState : public TableReadState {
  common::RelDataDirection direction;
  common::ColumnDataFormat dataFormat;
- common::offset_t startNodeOffsetInState;
- common::offset_t numNodesInState;
- common::offset_t currentCSRNodeOffset;
+ common::offset_t startNodeOffset;
+ common::offset_t numNodes;
+ common::offset_t currentNodeOffset;
  common::offset_t posInCurrentCSR;
  std::vector<common::list_entry_t> csrListEntries;
  // Temp auxiliary data structure to scan the offset of each CSR node in the offset column chunk.
  std::unique_ptr<ColumnChunk> csrOffsetChunk;
+
+ // Following fields are used for local storage.
+ bool readFromLocalStorage;
+ LocalRelNG* localNodeGroup;

RelDataReadState(common::ColumnDataFormat dataFormat);
inline bool isOutOfRange(common::offset_t nodeOffset) {
- return nodeOffset < startNodeOffsetInState ||
- nodeOffset >= (startNodeOffsetInState + numNodesInState);
- }
- inline bool hasMoreToRead() {
- return dataFormat == common::ColumnDataFormat::CSR &&
- posInCurrentCSR <
- csrListEntries[(currentCSRNodeOffset - startNodeOffsetInState)].size;
+ return nodeOffset < startNodeOffset || nodeOffset >= (startNodeOffset + numNodes);
  }
+ bool hasMoreToRead(transaction::Transaction* transaction);
  void populateCSRListEntries();
  std::pair<common::offset_t, common::offset_t> getStartAndEndOffset();
+
+ inline bool hasMoreToReadInPersistentStorage() {
+ return posInCurrentCSR < csrListEntries[(currentNodeOffset - startNodeOffset)].size;
+ }
+ bool hasMoreToReadFromLocalStorage();
+ bool trySwitchToLocalStorage();
};

  class RelsStoreStats;
  class LocalRelTableData;
  struct CSRRelNGInfo;
- class LocalRelNG;
class RelTableData final : public TableData {
public:
RelTableData(BMFileHandle* dataFH, BMFileHandle* metadataFH, BufferManager* bufferManager,
Expand Down
2 changes: 1 addition & 1 deletion src/processor/operator/scan/scan_multi_rel_tables.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ void RelTableCollectionScanner::init() {
bool RelTableCollectionScanner::scan(ValueVector* inVector,
const std::vector<ValueVector*>& outputVectors, Transaction* transaction) {
while (true) {
- if (readStates[currentTableIdx]->hasMoreToRead()) {
+ if (readStates[currentTableIdx]->hasMoreToRead(transaction)) {
KU_ASSERT(readStates[currentTableIdx]->dataFormat == ColumnDataFormat::CSR);
auto scanInfo = scanInfos[currentTableIdx].get();
scanInfo->table->read(
Expand Down
2 changes: 1 addition & 1 deletion src/processor/operator/scan/scan_rel_csr_columns.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ namespace processor {

bool ScanRelCSRColumns::getNextTuplesInternal(ExecutionContext* context) {
while (true) {
- if (scanState->hasMoreToRead()) {
+ if (scanState->hasMoreToRead(context->clientContext->getActiveTransaction())) {
info->table->read(transaction, *scanState, inVector, outVectors);
return true;
}
Expand Down
Loading