Skip to content

Commit

Permalink
scan csr local storage
Browse files (browse the repository at this point in the history)
  • Loading branch information
ray6080 committed Nov 20, 2023
1 parent bf50060 commit 638b802
Show file tree
Hide file tree
Showing 17 changed files with 367 additions and 126 deletions.
38 changes: 29 additions & 9 deletions src/include/storage/local_storage/local_rel_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@ static constexpr common::column_id_t REL_ID_COLUMN_ID = 0;
struct RelNGInfo {
virtual ~RelNGInfo() = default;

virtual bool insert(common::offset_t srcNodeOffset, common::offset_t relOffset,
virtual bool insert(common::offset_t srcOffsetInChunk, common::offset_t relOffset,
common::row_idx_t adjNodeRowIdx,
const std::vector<common::row_idx_t>& propertyNodesRowIdx) = 0;
virtual void update(common::offset_t srcNodeOffset, common::offset_t relOffset,
virtual void update(common::offset_t srcOffsetInChunk, common::offset_t relOffset,
common::column_id_t columnID, common::row_idx_t rowIdx) = 0;
virtual bool delete_(common::offset_t srcNodeOffset, common::offset_t relOffset) = 0;
virtual bool delete_(common::offset_t srcOffsetInChunk, common::offset_t relOffset) = 0;

virtual uint64_t getNumInsertedTuples(common::offset_t srcOffsetInChunk) = 0;

protected:
inline static bool contains(
Expand All @@ -39,12 +41,14 @@ struct RegularRelNGInfo final : public RelNGInfo {
updateInfoPerChunk.resize(numChunks);
}

bool insert(common::offset_t srcNodeOffset, common::offset_t relOffset,
bool insert(common::offset_t srcOffsetInChunk, common::offset_t relOffset,
common::row_idx_t adjNodeRowIdx,
const std::vector<common::row_idx_t>& propertyNodesRowIdx) override;
void update(common::offset_t srcNodeOffset, common::offset_t relOffset,
void update(common::offset_t srcOffsetInChunk, common::offset_t relOffset,
common::column_id_t columnID, common::row_idx_t rowIdx) override;
bool delete_(common::offset_t srcNodeOffset, common::offset_t relOffset) final;
bool delete_(common::offset_t srcOffsetInChunk, common::offset_t relOffset) override;

uint64_t getNumInsertedTuples(common::offset_t srcOffsetInChunk) override;
};

// Info of node groups with CSR chunks for rel tables.
Expand All @@ -60,19 +64,28 @@ struct CSRRelNGInfo final : public RelNGInfo {
updateInfoPerChunk.resize(numChunks);
}

bool insert(common::offset_t srcNodeOffset, common::offset_t relOffset,
bool insert(common::offset_t srcOffsetInChunk, common::offset_t relOffset,
common::row_idx_t adjNodeRowIdx,
const std::vector<common::row_idx_t>& propertyNodesRowIdx) override;
void update(common::offset_t srcNodeOffset, common::offset_t relOffset,
void update(common::offset_t srcOffsetInChunk, common::offset_t relOffset,
common::column_id_t columnID, common::row_idx_t rowIdx) override;
bool delete_(common::offset_t srcNodeOffset, common::offset_t relOffset) override;
bool delete_(common::offset_t srcOffsetInChunk, common::offset_t relOffset) override;

uint64_t getNumInsertedTuples(common::offset_t srcOffsetInChunk) override;
};

class LocalRelNG final : public LocalNodeGroup {
public:
LocalRelNG(common::offset_t nodeGroupStartOffset, common::ColumnDataFormat dataFormat,
std::vector<common::LogicalType*> dataTypes, MemoryManager* mm);

common::row_idx_t scanCSR(common::offset_t srcOffsetInChunk,
common::offset_t posToReadForOffset, const std::vector<common::column_id_t>& columnIDs,
const std::vector<common::ValueVector*>& outputVector);
void applyCSRUpdatesAndDeletions(common::offset_t srcOffsetInChunk,
const std::vector<common::column_id_t>& columnIDs, common::ValueVector* relIDVector,
const std::vector<common::ValueVector*>& outputVector);

bool insert(common::ValueVector* srcNodeIDVector, common::ValueVector* dstNodeIDVector,
const std::vector<common::ValueVector*>& propertyVectors);
void update(common::ValueVector* srcNodeIDVector, common::ValueVector* relIDVector,
Expand All @@ -86,6 +99,13 @@ class LocalRelNG final : public LocalNodeGroup {
}
inline RelNGInfo* getRelNGInfo() { return relNGInfo.get(); }

private:
void applyCSRUpdates(common::offset_t srcOffsetInChunk, common::column_id_t columnID,
const offset_to_offset_to_row_idx_t& updateInfo, common::ValueVector* relIDVector,
const std::vector<common::ValueVector*>& outputVector);
void applyCSRDeletions(common::offset_t srcOffsetInChunk,
const offset_to_offset_set_t& deleteInfo, common::ValueVector* relIDVector);

private:
std::unique_ptr<LocalVectorCollection> adjChunk;
std::unique_ptr<RelNGInfo> relNGInfo;
Expand Down
27 changes: 16 additions & 11 deletions src/include/storage/store/rel_table_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,35 +7,40 @@
namespace kuzu {
namespace storage {

class LocalRelNG;
struct RelDataReadState : public TableReadState {
common::RelDataDirection direction;
common::ColumnDataFormat dataFormat;
common::offset_t startNodeOffsetInState;
common::offset_t numNodesInState;
common::offset_t currentCSRNodeOffset;
common::offset_t startNodeOffset;
common::offset_t numNodes;
common::offset_t currentNodeOffset;
common::offset_t posInCurrentCSR;
std::vector<common::list_entry_t> csrListEntries;
// Temporary auxiliary structure used to scan each CSR node's offset from the offset column chunk.
std::unique_ptr<ColumnChunk> csrOffsetChunk;

// Following fields are used for local storage.
bool readFromLocalStorage;
LocalRelNG* localNodeGroup;

RelDataReadState(common::ColumnDataFormat dataFormat);
inline bool isOutOfRange(common::offset_t nodeOffset) {
return nodeOffset < startNodeOffsetInState ||
nodeOffset >= (startNodeOffsetInState + numNodesInState);
}
inline bool hasMoreToRead() {
return dataFormat == common::ColumnDataFormat::CSR &&
posInCurrentCSR <
csrListEntries[(currentCSRNodeOffset - startNodeOffsetInState)].size;
return nodeOffset < startNodeOffset || nodeOffset >= (startNodeOffset + numNodes);
}
bool hasMoreToRead(transaction::Transaction* transaction);
void populateCSRListEntries();
std::pair<common::offset_t, common::offset_t> getStartAndEndOffset();

inline bool hasMoreToReadInPersistentStorage() {
return posInCurrentCSR < csrListEntries[(currentNodeOffset - startNodeOffset)].size;
}
bool hasMoreToReadFromLocalStorage();
bool trySwitchToLocalStorage();
};

class RelsStoreStats;
class LocalRelTableData;
struct CSRRelNGInfo;
class LocalRelNG;
class RelTableData final : public TableData {
public:
RelTableData(BMFileHandle* dataFH, BMFileHandle* metadataFH, BufferManager* bufferManager,
Expand Down
2 changes: 1 addition & 1 deletion src/processor/operator/scan/scan_multi_rel_tables.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ void RelTableCollectionScanner::init() {
bool RelTableCollectionScanner::scan(ValueVector* inVector,
const std::vector<ValueVector*>& outputVectors, Transaction* transaction) {
while (true) {
if (readStates[currentTableIdx]->hasMoreToRead()) {
if (readStates[currentTableIdx]->hasMoreToRead(transaction)) {
KU_ASSERT(readStates[currentTableIdx]->dataFormat == ColumnDataFormat::CSR);
auto scanInfo = scanInfos[currentTableIdx].get();
scanInfo->table->read(
Expand Down
2 changes: 1 addition & 1 deletion src/processor/operator/scan/scan_rel_csr_columns.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ namespace processor {

bool ScanRelCSRColumns::getNextTuplesInternal(ExecutionContext* context) {
while (true) {
if (scanState->hasMoreToRead()) {
if (scanState->hasMoreToRead(context->clientContext->getActiveTransaction())) {
info->table->read(transaction, *scanState, inVector, outVectors);
return true;
}
Expand Down
Loading

0 comments on commit 638b802

Please sign in to comment.