Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CSR header: separating offset and length #2601

Merged
merged 1 commit into from
Dec 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
cmake_minimum_required(VERSION 3.15)

project(Kuzu VERSION 0.1.0.2 LANGUAGES CXX C)
project(Kuzu VERSION 0.1.0.3 LANGUAGES CXX C)

find_package(Threads REQUIRED)

Expand Down
1 change: 1 addition & 0 deletions src/include/common/types/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ using node_group_idx_t = uint64_t;
constexpr node_group_idx_t INVALID_NODE_GROUP_IDX = UINT64_MAX;
using partition_idx_t = uint64_t;
constexpr partition_idx_t INVALID_PARTITION_IDX = UINT64_MAX;
using length_t = uint64_t;

// System representation for a variable-sized overflow value.
struct overflow_value_t {
Expand Down
7 changes: 7 additions & 0 deletions src/include/processor/operator/partitioner.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ struct PartitionerSharedState {
common::partition_idx_t getNextPartition(common::vector_idx_t partitioningIdx);
void resetState();
void merge(std::vector<std::unique_ptr<PartitioningBuffer>> localPartitioningStates);

// Returns a non-owning pointer to the partition buffer stored at
// partitioningBuffers[partitioningIdx]->partitions[partitionIdx].
// Both indices are bounds-checked with KU_ASSERT before the lookup.
inline common::DataChunkCollection* getPartitionBuffer(
    common::vector_idx_t partitioningIdx, common::partition_idx_t partitionIdx) {
    KU_ASSERT(partitioningIdx < partitioningBuffers.size());
    KU_ASSERT(partitionIdx < partitioningBuffers[partitioningIdx]->partitions.size());
    // Bind the owning slot by reference first; ownership stays with the shared state.
    auto& partitionSlot = partitioningBuffers[partitioningIdx]->partitions[partitionIdx];
    return partitionSlot.get();
}
};

struct PartitionerLocalState {
Expand Down
38 changes: 18 additions & 20 deletions src/include/processor/operator/persistent/copy_rel.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,14 @@ struct CopyRelInfo {
inline std::unique_ptr<CopyRelInfo> copy() { return std::make_unique<CopyRelInfo>(*this); }
};

class CopyRel;
class CopyRelSharedState {
friend class CopyRel;
struct CopyRelSharedState {
common::table_id_t tableID;
storage::RelTable* table;
std::vector<std::unique_ptr<common::LogicalType>> columnTypes;
storage::RelsStoreStats* relsStatistics;
std::shared_ptr<FactorizedTable> fTable;
std::atomic<common::row_idx_t> numRows;

public:
CopyRelSharedState(common::table_id_t tableID, storage::RelTable* table,
std::vector<std::unique_ptr<common::LogicalType>> columnTypes,
storage::RelsStoreStats* relsStatistics, storage::MemoryManager* memoryManager);
Expand All @@ -47,16 +50,9 @@ class CopyRelSharedState {
numRows.fetch_add(numRowsToIncrement);
}
inline void updateRelsStatistics() { relsStatistics->setNumTuplesForTable(tableID, numRows); }

public:
std::shared_ptr<FactorizedTable> fTable;

private:
common::table_id_t tableID;
storage::RelTable* table;
std::vector<std::unique_ptr<common::LogicalType>> columnTypes;
storage::RelsStoreStats* relsStatistics;
std::atomic<common::row_idx_t> numRows;
// Looks up the statistics entry for this shared state's tableID under the given
// transaction and returns the next rel offset reported by that entry.
inline common::offset_t getNextRelOffset(transaction::Transaction* transaction) const {
    auto&& relTableStats = relsStatistics->getRelStatistics(tableID, transaction);
    return relTableStats->getNextRelOffset();
}
};

struct CopyRelLocalState {
Expand Down Expand Up @@ -92,14 +88,16 @@ class CopyRel : public Sink {

private:
inline bool isCopyAllowed() const {
return sharedState->relsStatistics
->getRelStatistics(
info->schema->tableID, transaction::Transaction::getDummyReadOnlyTrx().get())
->getNextRelOffset() == 0;
return sharedState->getNextRelOffset(
transaction::Transaction::getDummyReadOnlyTrx().get()) == 0;
}

static void populateCSROffsets(storage::ColumnChunk* csrOffsetChunk,
common::DataChunkCollection* partition, common::vector_idx_t offsetVectorIdx);
void prepareCSRNodeGroup(common::DataChunkCollection* partition,
common::vector_idx_t offsetVectorIdx, common::offset_t numNodes);

static void populateCSROffsetsAndLengths(storage::CSRNodeGroup* csrNodeGroup,
common::offset_t numNodes, common::DataChunkCollection* partition,
common::vector_idx_t offsetVectorIdx);
static void setOffsetToWithinNodeGroup(
common::ValueVector* vector, common::offset_t startOffset);
static void setOffsetFromCSROffsets(
Expand Down
2 changes: 1 addition & 1 deletion src/include/processor/plan_mapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class NodeInsertExecutor;
class RelInsertExecutor;
class NodeSetExecutor;
class RelSetExecutor;
class CopyRelSharedState;
struct CopyRelSharedState;
struct PartitionerSharedState;

class PlanMapper {
Expand Down
1 change: 1 addition & 0 deletions src/include/storage/local_storage/local_rel_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ class LocalRelNG final : public LocalNodeGroup {

class LocalRelTableData final : public LocalTableData {
friend class RelTableData;
friend class CSRRelTableData;

public:
LocalRelTableData(std::vector<common::LogicalType*> dataTypes, MemoryManager* mm,
Expand Down
6 changes: 6 additions & 0 deletions src/include/storage/stats/rel_table_statistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ class RelTableStats : public TableStatistics {
return direction == common::RelDataDirection::FWD ? fwdCSROffsetMetadataDAHInfo.get() :
bwdCSROffsetMetadataDAHInfo.get();
}
// Returns a non-owning pointer to the CSR length metadata info for the requested
// direction: the forward one for FWD, the backward one for any other direction.
inline MetadataDAHInfo* getCSRLengthMetadataDAHInfo(common::RelDataDirection direction) {
    if (direction == common::RelDataDirection::FWD) {
        return fwdCSRLengthMetadataDAHInfo.get();
    }
    return bwdCSRLengthMetadataDAHInfo.get();
}
inline MetadataDAHInfo* getAdjMetadataDAHInfo(common::RelDataDirection direction) {
return direction == common::RelDataDirection::FWD ? fwdNbrIDMetadataDAHInfo.get() :
bwdNbrIDMetadataDAHInfo.get();
Expand Down Expand Up @@ -74,6 +78,8 @@ class RelTableStats : public TableStatistics {
// CSROffsetMetadataDAHInfo are only valid for CSRColumns.
std::unique_ptr<MetadataDAHInfo> fwdCSROffsetMetadataDAHInfo;
std::unique_ptr<MetadataDAHInfo> bwdCSROffsetMetadataDAHInfo;
std::unique_ptr<MetadataDAHInfo> fwdCSRLengthMetadataDAHInfo;
std::unique_ptr<MetadataDAHInfo> bwdCSRLengthMetadataDAHInfo;
std::unique_ptr<MetadataDAHInfo> fwdNbrIDMetadataDAHInfo;
std::unique_ptr<MetadataDAHInfo> bwdNbrIDMetadataDAHInfo;
std::vector<std::unique_ptr<MetadataDAHInfo>> fwdPropertyMetadataDAHInfos;
Expand Down
2 changes: 2 additions & 0 deletions src/include/storage/stats/rels_store_statistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ class RelsStoreStats : public TablesStatistics {
void removeMetadataDAHInfo(common::table_id_t tableID, common::column_id_t columnID);
MetadataDAHInfo* getCSROffsetMetadataDAHInfo(transaction::Transaction* transaction,
common::table_id_t tableID, common::RelDataDirection direction);
MetadataDAHInfo* getCSRLengthMetadataDAHInfo(transaction::Transaction* transaction,
common::table_id_t tableID, common::RelDataDirection direction);
MetadataDAHInfo* getAdjMetadataDAHInfo(transaction::Transaction* transaction,
common::table_id_t tableID, common::RelDataDirection direction);
MetadataDAHInfo* getPropertyMetadataDAHInfo(transaction::Transaction* transaction,
Expand Down
7 changes: 4 additions & 3 deletions src/include/storage/storage_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,10 @@ class StorageUtils {
OFFSET = 2, // This is used for offset columns in VAR_LIST and STRING columns.
DATA = 3, // This is used for data columns in VAR_LIST and STRING columns.
CSR_OFFSET = 4,
ADJ = 5,
STRUCT_CHILD = 6,
NULL_MASK = 7,
CSR_LENGTH = 5,
ADJ = 6,
STRUCT_CHILD = 7,
NULL_MASK = 8,
};

static std::string getColumnName(
Expand Down
26 changes: 17 additions & 9 deletions src/include/storage/store/node_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,26 @@ class NodeGroup {
common::row_idx_t numRows;
};

// Bundles the two column chunks of a CSR header: one holding offsets and one holding lengths.
// Both chunks are owned by this struct through unique_ptr.
struct CSRHeaderChunks {
// Owned chunk for the CSR offsets.
std::unique_ptr<ColumnChunk> offset;
// Owned chunk for the CSR lengths.
std::unique_ptr<ColumnChunk> length;

// Declared here; the definition is not visible in this header.
// NOTE(review): presumably creates both chunks, honoring enableCompression - confirm.
void init(bool enableCompression);
};

class CSRNodeGroup : public NodeGroup {
public:
CSRNodeGroup(const std::vector<std::unique_ptr<common::LogicalType>>& columnTypes,
bool enableCompression)
// By default, initialize all column chunks except for the csrOffsetChunk to empty, as they
// should be resized after csr offset calculation (e.g., during CopyRel).
: NodeGroup{columnTypes, enableCompression, 0 /* capacity */} {
csrOffsetChunk =
ColumnChunkFactory::createColumnChunk(common::LogicalType::INT64(), enableCompression);
}
bool enableCompression);

inline ColumnChunk* getCSROffsetChunk() { return csrOffsetChunk.get(); }
// Returns a non-owning pointer to the offset chunk held in csrHeaderChunks.
// Asserts (via KU_ASSERT) that the chunk has been set before handing it out.
inline ColumnChunk* getCSROffsetChunk() const {
KU_ASSERT(csrHeaderChunks.offset != nullptr);
return csrHeaderChunks.offset.get();
}
// Returns a non-owning pointer to the length chunk held in csrHeaderChunks.
// Asserts (via KU_ASSERT) that the chunk has been set before handing it out.
inline ColumnChunk* getCSRLengthChunk() const {
KU_ASSERT(csrHeaderChunks.length != nullptr);
return csrHeaderChunks.length.get();
}

inline void writeToColumnChunk(common::vector_idx_t chunkIdx, common::vector_idx_t vectorIdx,
common::DataChunk* dataChunk, common::ValueVector* offsetVector) override {
Expand All @@ -71,7 +79,7 @@ class CSRNodeGroup : public NodeGroup {
}

private:
std::unique_ptr<ColumnChunk> csrOffsetChunk;
CSRHeaderChunks csrHeaderChunks;
};

struct NodeGroupFactory {
Expand Down
100 changes: 58 additions & 42 deletions src/include/storage/store/rel_table_data.h
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
#pragma once

#include "catalog/rel_table_schema.h"
#include "common/cast.h"
#include "storage/store/node_group.h"
#include "storage/store/table_data.h"

namespace kuzu {
namespace storage {

// Bundles the two columns of a CSR header: one for offsets and one for lengths.
// Both columns are owned by this struct through unique_ptr.
struct CSRHeaderColumns {
// Owned column for the CSR offsets.
std::unique_ptr<Column> offset;
// Owned column for the CSR lengths.
std::unique_ptr<Column> length;
};

class LocalRelNG;
struct RelDataReadState : public TableReadState {
common::RelDataDirection direction;
Expand All @@ -17,7 +22,7 @@ struct RelDataReadState : public TableReadState {
common::offset_t posInCurrentCSR;
std::vector<common::list_entry_t> csrListEntries;
// Temp auxiliary data structure to scan the offset of each CSR node in the offset column chunk.
std::unique_ptr<ColumnChunk> csrOffsetChunk;
CSRHeaderChunks csrHeaderChunks;

// Following fields are used for local storage.
bool readFromLocalStorage;
Expand All @@ -41,23 +46,19 @@ struct RelDataReadState : public TableReadState {
class RelsStoreStats;
class LocalRelTableData;
struct CSRRelNGInfo;
class RelTableData final : public TableData {
class RelTableData : public TableData {
public:
RelTableData(BMFileHandle* dataFH, BMFileHandle* metadataFH, BufferManager* bufferManager,
WAL* wal, catalog::RelTableSchema* tableSchema, RelsStoreStats* relsStoreStats,
common::RelDataDirection direction, bool enableCompression);
common::RelDataDirection direction, bool enableCompression,
common::ColumnDataFormat dataFormat = common::ColumnDataFormat::REGULAR);

void initializeReadState(transaction::Transaction* transaction,
virtual void initializeReadState(transaction::Transaction* transaction,
std::vector<common::column_id_t> columnIDs, common::ValueVector* inNodeIDVector,
RelDataReadState* readState);
inline void scan(transaction::Transaction* transaction, TableReadState& readState,
void scan(transaction::Transaction* transaction, TableReadState& readState,
common::ValueVector* inNodeIDVector,
const std::vector<common::ValueVector*>& outputVectors) override {
auto& relReadState = common::ku_dynamic_cast<TableReadState&, RelDataReadState&>(readState);
dataFormat == common::ColumnDataFormat::REGULAR ?
scanRegularColumns(transaction, relReadState, inNodeIDVector, outputVectors) :
scanCSRColumns(transaction, relReadState, inNodeIDVector, outputVectors);
}
const std::vector<common::ValueVector*>& outputVectors) override;
void lookup(transaction::Transaction* transaction, TableReadState& readState,
common::ValueVector* inNodeIDVector,
const std::vector<common::ValueVector*>& outputVectors) override;
Expand All @@ -73,10 +74,10 @@ class RelTableData final : public TableData {
// we remove the restriction of flatten all tuples.
bool delete_(transaction::Transaction* transaction, common::ValueVector* srcNodeIDVector,
common::ValueVector* dstNodeIDVector, common::ValueVector* relIDVector);
bool checkIfNodeHasRels(
virtual bool checkIfNodeHasRels(
transaction::Transaction* transaction, common::ValueVector* srcNodeIDVector);
void append(NodeGroup* nodeGroup) override;
void resizeColumns(common::node_group_idx_t numNodeGroups);
virtual void resizeColumns(common::node_group_idx_t numNodeGroups);

inline Column* getAdjColumn() const { return adjColumn.get(); }
inline common::ColumnDataFormat getDataFormat() const { return dataFormat; }
Expand All @@ -91,50 +92,65 @@ class RelTableData final : public TableData {
return adjColumn->getNumNodeGroups(transaction);
}

private:
protected:
LocalRelNG* getLocalNodeGroup(
transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx);

void scanRegularColumns(transaction::Transaction* transaction, RelDataReadState& readState,
common::ValueVector* inNodeIDVector,
const std::vector<common::ValueVector*>& outputVectors);
void scanCSRColumns(transaction::Transaction* transaction, RelDataReadState& readState,
private:
// Maps a rel data direction to an index: 0 for FWD, 1 for any other direction.
static inline common::vector_idx_t getDataIdxFromDirection(common::RelDataDirection direction) {
    if (direction == common::RelDataDirection::FWD) {
        return 0;
    }
    return 1;
}

protected:
common::RelDataDirection direction;
std::unique_ptr<Column> adjColumn;
};

class CSRRelTableData final : public RelTableData {
public:
CSRRelTableData(BMFileHandle* dataFH, BMFileHandle* metadataFH, BufferManager* bufferManager,
WAL* wal, catalog::RelTableSchema* tableSchema, RelsStoreStats* relsStoreStats,
common::RelDataDirection direction, bool enableCompression);

void initializeReadState(transaction::Transaction* transaction,
std::vector<common::column_id_t> columnIDs, common::ValueVector* inNodeIDVector,
RelDataReadState* readState) override;
void scan(transaction::Transaction* transaction, TableReadState& readState,
common::ValueVector* inNodeIDVector,
const std::vector<common::ValueVector*>& outputVectors);
const std::vector<common::ValueVector*>& outputVectors) override;

bool checkIfNodeHasRels(
transaction::Transaction* transaction, common::ValueVector* srcNodeIDVector) override;
void append(NodeGroup* nodeGroup) override;
void resizeColumns(common::node_group_idx_t numNodeGroups) override;

void prepareLocalTableToCommit(
transaction::Transaction* transaction, LocalTableData* localTable) override;

void prepareCommitForRegularColumns(
transaction::Transaction* transaction, LocalRelTableData* localTableData);
void prepareCommitForCSRColumns(
transaction::Transaction* transaction, LocalRelTableData* localTableData);
void checkpointInMemory() override;
void rollbackInMemory() override;

private:
void prepareCommitCSRNGWithoutSliding(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, CSRRelNGInfo* relNodeGroupInfo,
ColumnChunk* csrOffsetChunk, ColumnChunk* relIDChunk, LocalRelNG* localNodeGroup);
ColumnChunk* csrOffsetChunk, ColumnChunk* csrLengthChunk, ColumnChunk* relIDChunk,
LocalRelNG* localNodeGroup);
void prepareCommitCSRNGWithSliding(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, CSRRelNGInfo* relNodeGroupInfo,
ColumnChunk* csrOffsetChunk, ColumnChunk* relIDChunk, LocalRelNG* localNodeGroup);
ColumnChunk* csrOffsetChunk, ColumnChunk* csrLengthChunk, ColumnChunk* relIDChunk,
LocalRelNG* localNodeGroup);

std::unique_ptr<ColumnChunk> slideCSROffsetChunk(
ColumnChunk* csrOffsetChunk, CSRRelNGInfo* relNodeGroupInfo);
std::pair<std::unique_ptr<ColumnChunk>, std::unique_ptr<ColumnChunk>> slideCSRAuxChunks(
ColumnChunk* csrOffsetChunk, ColumnChunk* csrLengthChunk, CSRRelNGInfo* relNodeGroupInfo);
std::unique_ptr<ColumnChunk> slideCSRColumnChunk(transaction::Transaction* transaction,
ColumnChunk* csrOffsetChunk, ColumnChunk* slidedCSROffsetChunkForCheck,
ColumnChunk* relIDChunk, const offset_to_offset_to_row_idx_t& insertInfo,
ColumnChunk* csrOffsetChunk, ColumnChunk* csrLengthChunk,
ColumnChunk* slidedCSROffsetChunkForCheck, ColumnChunk* relIDChunk,
const offset_to_offset_to_row_idx_t& insertInfo,
const offset_to_offset_to_row_idx_t& updateInfo, const offset_to_offset_set_t& deleteInfo,
common::node_group_idx_t nodeGroupIdx, Column* column, LocalVectorCollection* localChunk);

// Picks the column data format for one direction of a rel table:
// REGULAR when the schema reports single multiplicity for that direction, CSR otherwise.
static inline common::ColumnDataFormat getDataFormatFromSchema(
    catalog::RelTableSchema* tableSchema, common::RelDataDirection direction) {
    if (tableSchema->isSingleMultiplicity(direction)) {
        return common::ColumnDataFormat::REGULAR;
    }
    return common::ColumnDataFormat::CSR;
}
// Maps a rel data direction to an index: 0 for FWD, 1 for any other direction.
static inline common::vector_idx_t getDataIdxFromDirection(common::RelDataDirection direction) {
    if (direction == common::RelDataDirection::FWD) {
        return 0;
    }
    return 1;
}

private:
common::RelDataDirection direction;
std::unique_ptr<Column> adjColumn;
std::unique_ptr<Column> csrOffsetColumn;
CSRHeaderColumns csrHeaderColumns;
};

} // namespace storage
Expand Down
Loading