Skip to content

Commit

Permalink
csr header: separating offset and length
Browse files Browse the repository at this point in the history
  • Loading branch information
ray6080 committed Dec 21, 2023
1 parent d591bff commit 0a8bbca
Show file tree
Hide file tree
Showing 18 changed files with 431 additions and 295 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
cmake_minimum_required(VERSION 3.15)

project(Kuzu VERSION 0.1.0.2 LANGUAGES CXX C)
project(Kuzu VERSION 0.1.0.3 LANGUAGES CXX C)

find_package(Threads REQUIRED)

Expand Down
1 change: 1 addition & 0 deletions src/include/common/types/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ using node_group_idx_t = uint64_t;
constexpr node_group_idx_t INVALID_NODE_GROUP_IDX = UINT64_MAX;
using partition_idx_t = uint64_t;
constexpr partition_idx_t INVALID_PARTITION_IDX = UINT64_MAX;
using length_t = uint64_t;

// System representation for a variable-sized overflow value.
struct overflow_value_t {
Expand Down
7 changes: 7 additions & 0 deletions src/include/processor/operator/partitioner.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ struct PartitionerSharedState {
common::partition_idx_t getNextPartition(common::vector_idx_t partitioningIdx);
void resetState();
void merge(std::vector<std::unique_ptr<PartitioningBuffer>> localPartitioningStates);

// Returns a non-owning pointer to the data chunk collection stored for partition
// `partitionIdx` of the partitioning buffer at `partitioningIdx`.
// Both indices are bounds-checked with KU_ASSERT against the respective container sizes.
inline common::DataChunkCollection* getPartitionBuffer(
    common::vector_idx_t partitioningIdx, common::partition_idx_t partitionIdx) {
    KU_ASSERT(partitioningIdx < partitioningBuffers.size());
    auto& partitions = partitioningBuffers[partitioningIdx]->partitions;
    KU_ASSERT(partitionIdx < partitions.size());
    return partitions[partitionIdx].get();
}
};

struct PartitionerLocalState {
Expand Down
38 changes: 18 additions & 20 deletions src/include/processor/operator/persistent/copy_rel.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,14 @@ struct CopyRelInfo {
inline std::unique_ptr<CopyRelInfo> copy() { return std::make_unique<CopyRelInfo>(*this); }
};

class CopyRel;
class CopyRelSharedState {
friend class CopyRel;
struct CopyRelSharedState {
common::table_id_t tableID;
storage::RelTable* table;
std::vector<std::unique_ptr<common::LogicalType>> columnTypes;
storage::RelsStoreStats* relsStatistics;
std::shared_ptr<FactorizedTable> fTable;
std::atomic<common::row_idx_t> numRows;

public:
CopyRelSharedState(common::table_id_t tableID, storage::RelTable* table,
std::vector<std::unique_ptr<common::LogicalType>> columnTypes,
storage::RelsStoreStats* relsStatistics, storage::MemoryManager* memoryManager);
Expand All @@ -47,16 +50,9 @@ class CopyRelSharedState {
numRows.fetch_add(numRowsToIncrement);
}
inline void updateRelsStatistics() { relsStatistics->setNumTuplesForTable(tableID, numRows); }

inline std::shared_ptr<FactorizedTable> getFTable() { return fTable; }

private:
common::table_id_t tableID;
storage::RelTable* table;
std::vector<std::unique_ptr<common::LogicalType>> columnTypes;
storage::RelsStoreStats* relsStatistics;
std::shared_ptr<FactorizedTable> fTable;
std::atomic<common::row_idx_t> numRows;
// Fetches the rel statistics of this table (`tableID`) as seen by `transaction` and returns
// the next rel offset they report.
inline common::offset_t getNextRelOffset(transaction::Transaction* transaction) const {
    auto&& relStats = relsStatistics->getRelStatistics(tableID, transaction);
    return relStats->getNextRelOffset();
}
};

struct CopyRelLocalState {
Expand Down Expand Up @@ -90,14 +86,16 @@ class CopyRel : public Sink {

private:
// Returns true when the next rel offset reported for the table is 0, evaluated under the
// dummy read-only transaction.
// NOTE(review): this span carries both the pre-change and the post-change `return` of the
// diff; both compare the next rel offset against 0, the second one through
// CopyRelSharedState::getNextRelOffset.
inline bool isCopyAllowed() const {
return sharedState->relsStatistics
->getRelStatistics(
info->schema->tableID, transaction::Transaction::getDummyReadOnlyTrx().get())
->getNextRelOffset() == 0;
return sharedState->getNextRelOffset(
transaction::Transaction::getDummyReadOnlyTrx().get()) == 0;
}

static void populateCSROffsets(storage::ColumnChunk* csrOffsetChunk,
common::DataChunkCollection* partition, common::vector_idx_t offsetVectorIdx);
void prepareCSRNodeGroup(common::DataChunkCollection* partition,
common::vector_idx_t offsetVectorIdx, common::offset_t numNodes);

static void populateCSROffsetsAndLengths(storage::CSRNodeGroup* csrNodeGroup,
common::offset_t numNodes, common::DataChunkCollection* partition,
common::vector_idx_t offsetVectorIdx);
static void setOffsetToWithinNodeGroup(
common::ValueVector* vector, common::offset_t startOffset);
static void setOffsetFromCSROffsets(
Expand Down
1 change: 1 addition & 0 deletions src/include/storage/local_storage/local_rel_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ class LocalRelNG final : public LocalNodeGroup {

class LocalRelTableData final : public LocalTableData {
friend class RelTableData;
friend class CSRRelTableData;

public:
LocalRelTableData(std::vector<common::LogicalType*> dataTypes, MemoryManager* mm,
Expand Down
6 changes: 6 additions & 0 deletions src/include/storage/stats/rel_table_statistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ class RelTableStats : public TableStatistics {
return direction == common::RelDataDirection::FWD ? fwdCSROffsetMetadataDAHInfo.get() :
bwdCSROffsetMetadataDAHInfo.get();
}
// Returns a non-owning pointer to the CSR length metadata info for `direction`:
// the forward entry for RelDataDirection::FWD, the backward entry for anything else.
inline MetadataDAHInfo* getCSRLengthMetadataDAHInfo(common::RelDataDirection direction) {
    if (direction == common::RelDataDirection::FWD) {
        return fwdCSRLengthMetadataDAHInfo.get();
    }
    return bwdCSRLengthMetadataDAHInfo.get();
}
inline MetadataDAHInfo* getAdjMetadataDAHInfo(common::RelDataDirection direction) {
return direction == common::RelDataDirection::FWD ? fwdNbrIDMetadataDAHInfo.get() :
bwdNbrIDMetadataDAHInfo.get();
Expand Down Expand Up @@ -74,6 +78,8 @@ class RelTableStats : public TableStatistics {
// CSROffsetMetadataDAHInfo are only valid for CSRColumns.
std::unique_ptr<MetadataDAHInfo> fwdCSROffsetMetadataDAHInfo;
std::unique_ptr<MetadataDAHInfo> bwdCSROffsetMetadataDAHInfo;
std::unique_ptr<MetadataDAHInfo> fwdCSRLengthMetadataDAHInfo;
std::unique_ptr<MetadataDAHInfo> bwdCSRLengthMetadataDAHInfo;
std::unique_ptr<MetadataDAHInfo> fwdNbrIDMetadataDAHInfo;
std::unique_ptr<MetadataDAHInfo> bwdNbrIDMetadataDAHInfo;
std::vector<std::unique_ptr<MetadataDAHInfo>> fwdPropertyMetadataDAHInfos;
Expand Down
2 changes: 2 additions & 0 deletions src/include/storage/stats/rels_store_statistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ class RelsStoreStats : public TablesStatistics {
void removeMetadataDAHInfo(common::table_id_t tableID, common::column_id_t columnID);
MetadataDAHInfo* getCSROffsetMetadataDAHInfo(transaction::Transaction* transaction,
common::table_id_t tableID, common::RelDataDirection direction);
MetadataDAHInfo* getCSRLengthMetadataDAHInfo(transaction::Transaction* transaction,
common::table_id_t tableID, common::RelDataDirection direction);
MetadataDAHInfo* getAdjMetadataDAHInfo(transaction::Transaction* transaction,
common::table_id_t tableID, common::RelDataDirection direction);
MetadataDAHInfo* getPropertyMetadataDAHInfo(transaction::Transaction* transaction,
Expand Down
7 changes: 4 additions & 3 deletions src/include/storage/storage_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,10 @@ class StorageUtils {
OFFSET = 2, // This is used for offset columns in VAR_LIST and STRING columns.
DATA = 3, // This is used for data columns in VAR_LIST and STRING columns.
CSR_OFFSET = 4,
ADJ = 5,
STRUCT_CHILD = 6,
NULL_MASK = 7,
CSR_LENGTH = 5,
ADJ = 6,
STRUCT_CHILD = 7,
NULL_MASK = 8,
};

static std::string getColumnName(
Expand Down
26 changes: 17 additions & 9 deletions src/include/storage/store/node_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,26 @@ class NodeGroup {
common::row_idx_t numRows;
};

// Bundles the two column chunks of a CSR header: one for offsets and one for lengths.
// Both members are owning pointers and are null until a chunk is assigned to them.
struct CSRHeaderChunks {
// Chunk holding the CSR offsets.
std::unique_ptr<ColumnChunk> offset;
// Chunk holding the CSR lengths.
std::unique_ptr<ColumnChunk> length;

// Defined out of line. NOTE(review): presumably creates both chunks, honoring
// `enableCompression` -- confirm against the definition.
void init(bool enableCompression);
};

class CSRNodeGroup : public NodeGroup {
public:
CSRNodeGroup(const std::vector<std::unique_ptr<common::LogicalType>>& columnTypes,
bool enableCompression)
// By default, initialize all column chunks except for the csrOffsetChunk to empty, as they
// should be resized after csr offset calculation (e.g., during CopyRel).
: NodeGroup{columnTypes, enableCompression, 0 /* capacity */} {
csrOffsetChunk =
ColumnChunkFactory::createColumnChunk(common::LogicalType::INT64(), enableCompression);
}
bool enableCompression);

inline ColumnChunk* getCSROffsetChunk() { return csrOffsetChunk.get(); }
// Returns a non-owning pointer to the CSR offset chunk of the header.
// KU_ASSERT checks that the chunk has been set.
inline ColumnChunk* getCSROffsetChunk() const {
    auto* offsetChunk = csrHeaderChunks.offset.get();
    KU_ASSERT(offsetChunk != nullptr);
    return offsetChunk;
}
// Returns a non-owning pointer to the CSR length chunk of the header.
// KU_ASSERT checks that the chunk has been set.
inline ColumnChunk* getCSRLengthChunk() const {
    auto* lengthChunk = csrHeaderChunks.length.get();
    KU_ASSERT(lengthChunk != nullptr);
    return lengthChunk;
}

inline void writeToColumnChunk(common::vector_idx_t chunkIdx, common::vector_idx_t vectorIdx,
common::DataChunk* dataChunk, common::ValueVector* offsetVector) override {
Expand All @@ -71,7 +79,7 @@ class CSRNodeGroup : public NodeGroup {
}

private:
std::unique_ptr<ColumnChunk> csrOffsetChunk;
CSRHeaderChunks csrHeaderChunks;
};

struct NodeGroupFactory {
Expand Down
100 changes: 58 additions & 42 deletions src/include/storage/store/rel_table_data.h
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
#pragma once

#include "catalog/rel_table_schema.h"
#include "common/cast.h"
#include "storage/store/node_group.h"
#include "storage/store/table_data.h"

namespace kuzu {
namespace storage {

// Bundles the two columns of a CSR header: one for offsets and one for lengths.
// Both members are owning pointers and are null until a column is assigned to them.
struct CSRHeaderColumns {
// Column holding the CSR offsets.
std::unique_ptr<Column> offset;
// Column holding the CSR lengths.
std::unique_ptr<Column> length;
};

class LocalRelNG;
struct RelDataReadState : public TableReadState {
common::RelDataDirection direction;
Expand All @@ -17,7 +22,7 @@ struct RelDataReadState : public TableReadState {
common::offset_t posInCurrentCSR;
std::vector<common::list_entry_t> csrListEntries;
// Temp auxiliary data structure to scan the offset of each CSR node in the offset column chunk.
std::unique_ptr<ColumnChunk> csrOffsetChunk;
CSRHeaderChunks csrHeaderChunks;

// Following fields are used for local storage.
bool readFromLocalStorage;
Expand All @@ -41,23 +46,19 @@ struct RelDataReadState : public TableReadState {
class RelsStoreStats;
class LocalRelTableData;
struct CSRRelNGInfo;
class RelTableData final : public TableData {
class RelTableData : public TableData {
public:
RelTableData(BMFileHandle* dataFH, BMFileHandle* metadataFH, BufferManager* bufferManager,
WAL* wal, catalog::RelTableSchema* tableSchema, RelsStoreStats* relsStoreStats,
common::RelDataDirection direction, bool enableCompression);
common::RelDataDirection direction, bool enableCompression,
common::ColumnDataFormat dataFormat = common::ColumnDataFormat::REGULAR);

void initializeReadState(transaction::Transaction* transaction,
virtual void initializeReadState(transaction::Transaction* transaction,
std::vector<common::column_id_t> columnIDs, common::ValueVector* inNodeIDVector,
RelDataReadState* readState);
inline void scan(transaction::Transaction* transaction, TableReadState& readState,
void scan(transaction::Transaction* transaction, TableReadState& readState,
common::ValueVector* inNodeIDVector,
const std::vector<common::ValueVector*>& outputVectors) override {
auto& relReadState = common::ku_dynamic_cast<TableReadState&, RelDataReadState&>(readState);
dataFormat == common::ColumnDataFormat::REGULAR ?
scanRegularColumns(transaction, relReadState, inNodeIDVector, outputVectors) :
scanCSRColumns(transaction, relReadState, inNodeIDVector, outputVectors);
}
const std::vector<common::ValueVector*>& outputVectors) override;
void lookup(transaction::Transaction* transaction, TableReadState& readState,
common::ValueVector* inNodeIDVector,
const std::vector<common::ValueVector*>& outputVectors) override;
Expand All @@ -73,10 +74,10 @@ class RelTableData final : public TableData {
// we remove the restriction of flatten all tuples.
bool delete_(transaction::Transaction* transaction, common::ValueVector* srcNodeIDVector,
common::ValueVector* dstNodeIDVector, common::ValueVector* relIDVector);
bool checkIfNodeHasRels(
virtual bool checkIfNodeHasRels(
transaction::Transaction* transaction, common::ValueVector* srcNodeIDVector);
void append(NodeGroup* nodeGroup) override;
void resizeColumns(common::node_group_idx_t numNodeGroups);
virtual void resizeColumns(common::node_group_idx_t numNodeGroups);

inline Column* getAdjColumn() const { return adjColumn.get(); }
inline common::ColumnDataFormat getDataFormat() const { return dataFormat; }
Expand All @@ -91,50 +92,65 @@ class RelTableData final : public TableData {
return adjColumn->getNumNodeGroups(transaction);
}

private:
protected:
LocalRelNG* getLocalNodeGroup(
transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx);

void scanRegularColumns(transaction::Transaction* transaction, RelDataReadState& readState,
common::ValueVector* inNodeIDVector,
const std::vector<common::ValueVector*>& outputVectors);
void scanCSRColumns(transaction::Transaction* transaction, RelDataReadState& readState,
private:
// Maps a rel data direction to an index: 0 for RelDataDirection::FWD, 1 for anything else.
static inline common::vector_idx_t getDataIdxFromDirection(common::RelDataDirection direction) {
    if (direction == common::RelDataDirection::FWD) {
        return 0;
    }
    return 1;
}

protected:
common::RelDataDirection direction;
std::unique_ptr<Column> adjColumn;
};

class CSRRelTableData final : public RelTableData {
public:
CSRRelTableData(BMFileHandle* dataFH, BMFileHandle* metadataFH, BufferManager* bufferManager,
WAL* wal, catalog::RelTableSchema* tableSchema, RelsStoreStats* relsStoreStats,
common::RelDataDirection direction, bool enableCompression);

void initializeReadState(transaction::Transaction* transaction,
std::vector<common::column_id_t> columnIDs, common::ValueVector* inNodeIDVector,
RelDataReadState* readState) override;
void scan(transaction::Transaction* transaction, TableReadState& readState,
common::ValueVector* inNodeIDVector,
const std::vector<common::ValueVector*>& outputVectors);
const std::vector<common::ValueVector*>& outputVectors) override;

bool checkIfNodeHasRels(
transaction::Transaction* transaction, common::ValueVector* srcNodeIDVector) override;
void append(NodeGroup* nodeGroup) override;
void resizeColumns(common::node_group_idx_t numNodeGroups) override;

void prepareLocalTableToCommit(
transaction::Transaction* transaction, LocalTableData* localTable) override;

void prepareCommitForRegularColumns(
transaction::Transaction* transaction, LocalRelTableData* localTableData);
void prepareCommitForCSRColumns(
transaction::Transaction* transaction, LocalRelTableData* localTableData);
void checkpointInMemory() override;
void rollbackInMemory() override;

private:
void prepareCommitCSRNGWithoutSliding(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, CSRRelNGInfo* relNodeGroupInfo,
ColumnChunk* csrOffsetChunk, ColumnChunk* relIDChunk, LocalRelNG* localNodeGroup);
ColumnChunk* csrOffsetChunk, ColumnChunk* csrLengthChunk, ColumnChunk* relIDChunk,
LocalRelNG* localNodeGroup);
void prepareCommitCSRNGWithSliding(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, CSRRelNGInfo* relNodeGroupInfo,
ColumnChunk* csrOffsetChunk, ColumnChunk* relIDChunk, LocalRelNG* localNodeGroup);
ColumnChunk* csrOffsetChunk, ColumnChunk* csrLengthChunk, ColumnChunk* relIDChunk,
LocalRelNG* localNodeGroup);

std::unique_ptr<ColumnChunk> slideCSROffsetChunk(
ColumnChunk* csrOffsetChunk, CSRRelNGInfo* relNodeGroupInfo);
std::pair<std::unique_ptr<ColumnChunk>, std::unique_ptr<ColumnChunk>> slideCSRAuxChunks(
ColumnChunk* csrOffsetChunk, ColumnChunk* csrLengthChunk, CSRRelNGInfo* relNodeGroupInfo);
std::unique_ptr<ColumnChunk> slideCSRColumnChunk(transaction::Transaction* transaction,
ColumnChunk* csrOffsetChunk, ColumnChunk* slidedCSROffsetChunkForCheck,
ColumnChunk* relIDChunk, const offset_to_offset_to_row_idx_t& insertInfo,
ColumnChunk* csrOffsetChunk, ColumnChunk* csrLengthChunk,
ColumnChunk* slidedCSROffsetChunkForCheck, ColumnChunk* relIDChunk,
const offset_to_offset_to_row_idx_t& insertInfo,
const offset_to_offset_to_row_idx_t& updateInfo, const offset_to_offset_set_t& deleteInfo,
common::node_group_idx_t nodeGroupIdx, Column* column, LocalVectorCollection* localChunk);

// Chooses the column data format for `direction` from the table schema:
// REGULAR when the schema reports single multiplicity for that direction, CSR otherwise.
static inline common::ColumnDataFormat getDataFormatFromSchema(
    catalog::RelTableSchema* tableSchema, common::RelDataDirection direction) {
    if (tableSchema->isSingleMultiplicity(direction)) {
        return common::ColumnDataFormat::REGULAR;
    }
    return common::ColumnDataFormat::CSR;
}
static inline common::vector_idx_t getDataIdxFromDirection(common::RelDataDirection direction) {
return direction == common::RelDataDirection::FWD ? 0 : 1;
}

private:
common::RelDataDirection direction;
std::unique_ptr<Column> adjColumn;
std::unique_ptr<Column> csrOffsetColumn;
CSRHeaderColumns csrHeaderColumns;
};

} // namespace storage
Expand Down
6 changes: 3 additions & 3 deletions src/processor/map/map_copy_from.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,9 +210,9 @@ std::unique_ptr<PhysicalOperator> PlanMapper::mapCopyRelFrom(
auto copyRelBWD =
createCopyRel(partitionerSharedState, copyRelSharedState, copyFrom, RelDataDirection::BWD);
auto outputExpressions = expression_vector{copyFrom->getOutputExpression()->copy()};
auto fTableScan = createFactorizedTableScanAligned(outputExpressions, outFSchema,
copyRelSharedState->getFTable(), DEFAULT_VECTOR_CAPACITY /* maxMorselSize */,
std::move(copyRelBWD));
auto fTableScan =
createFactorizedTableScanAligned(outputExpressions, outFSchema, copyRelSharedState->fTable,
DEFAULT_VECTOR_CAPACITY /* maxMorselSize */, std::move(copyRelBWD));
// Pipelines are scheduled as the order: partitioner -> copyRelFWD -> copyRelBWD.
fTableScan->addChild(std::move(copyRelFWD));
fTableScan->addChild(std::move(prevOperator));
Expand Down
Loading

0 comments on commit 0a8bbca

Please sign in to comment.