From 0ce0c87113dc920f3a89c6c378e2fd8cfe8301cc Mon Sep 17 00:00:00 2001 From: Guodong Jin Date: Wed, 20 Dec 2023 20:16:44 -0500 Subject: [PATCH] csr header: seprating offset and length --- CMakeLists.txt | 2 +- src/include/common/types/types.h | 1 + src/include/processor/operator/partitioner.h | 7 + .../processor/operator/persistent/copy_rel.h | 38 +- src/include/processor/plan_mapper.h | 2 +- .../storage/local_storage/local_rel_table.h | 1 + .../storage/stats/rel_table_statistics.h | 6 + .../storage/stats/rels_store_statistics.h | 2 + src/include/storage/storage_utils.h | 7 +- src/include/storage/store/node_group.h | 26 +- src/include/storage/store/rel_table_data.h | 100 +++-- .../operator/persistent/copy_rel.cpp | 75 ++-- src/storage/stats/rel_table_statistics.cpp | 15 +- src/storage/stats/rels_store_statistics.cpp | 9 + src/storage/storage_structure/disk_array.cpp | 2 +- src/storage/store/node_group.cpp | 15 + src/storage/store/rel_table.cpp | 22 +- src/storage/store/rel_table_data.cpp | 392 ++++++++++-------- 18 files changed, 429 insertions(+), 293 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 245d6b386a..3f187194bb 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,6 +1,6 @@ cmake_minimum_required(VERSION 3.15) -project(Kuzu VERSION 0.1.0.2 LANGUAGES CXX C) +project(Kuzu VERSION 0.1.0.3 LANGUAGES CXX C) find_package(Threads REQUIRED) diff --git a/src/include/common/types/types.h b/src/include/common/types/types.h index 89ddc2bafc..54c93b66c3 100644 --- a/src/include/common/types/types.h +++ b/src/include/common/types/types.h @@ -45,6 +45,7 @@ using node_group_idx_t = uint64_t; constexpr node_group_idx_t INVALID_NODE_GROUP_IDX = UINT64_MAX; using partition_idx_t = uint64_t; constexpr partition_idx_t INVALID_PARTITION_IDX = UINT64_MAX; +using length_t = uint64_t; // System representation for a variable-sized overflow value. struct overflow_value_t { diff --git a/src/include/processor/operator/partitioner.h b/src/include/processor/operator/partitioner.h index 162911f90f..36edda2284 100644 --- a/src/include/processor/operator/partitioner.h +++ b/src/include/processor/operator/partitioner.h @@ -45,6 +45,13 @@ struct PartitionerSharedState { common::partition_idx_t getNextPartition(common::vector_idx_t partitioningIdx); void resetState(); void merge(std::vector> localPartitioningStates); + + inline common::DataChunkCollection* getPartitionBuffer( + common::vector_idx_t partitioningIdx, common::partition_idx_t partitionIdx) { + KU_ASSERT(partitioningIdx < partitioningBuffers.size()); + KU_ASSERT(partitionIdx < partitioningBuffers[partitioningIdx]->partitions.size()); + return partitioningBuffers[partitioningIdx]->partitions[partitionIdx].get(); + } }; struct PartitionerLocalState { diff --git a/src/include/processor/operator/persistent/copy_rel.h b/src/include/processor/operator/persistent/copy_rel.h index 2e3f2e83e4..8fe5cf50e8 100644 --- a/src/include/processor/operator/persistent/copy_rel.h +++ b/src/include/processor/operator/persistent/copy_rel.h @@ -33,11 +33,14 @@ struct CopyRelInfo { inline std::unique_ptr copy() { return std::make_unique(*this); } }; -class CopyRel; -class CopyRelSharedState { - friend class CopyRel; +struct CopyRelSharedState { + common::table_id_t tableID; + storage::RelTable* table; + std::vector> columnTypes; + storage::RelsStoreStats* relsStatistics; + std::shared_ptr fTable; + std::atomic numRows; -public: CopyRelSharedState(common::table_id_t tableID, storage::RelTable* table, std::vector> columnTypes, storage::RelsStoreStats* relsStatistics, storage::MemoryManager* memoryManager); @@ -47,16 +50,9 @@ class CopyRelSharedState { numRows.fetch_add(numRowsToIncrement); } inline void updateRelsStatistics() { relsStatistics->setNumTuplesForTable(tableID, numRows); } - -public: - std::shared_ptr fTable; - -private: - common::table_id_t tableID; - storage::RelTable* table; - std::vector> columnTypes; - storage::RelsStoreStats* relsStatistics; - std::atomic numRows; + inline common::offset_t getNextRelOffset(transaction::Transaction* transaction) const { + return relsStatistics->getRelStatistics(tableID, transaction)->getNextRelOffset(); + } }; struct CopyRelLocalState { @@ -92,14 +88,16 @@ class CopyRel : public Sink { private: inline bool isCopyAllowed() const { - return sharedState->relsStatistics - ->getRelStatistics( - info->schema->tableID, transaction::Transaction::getDummyReadOnlyTrx().get()) - ->getNextRelOffset() == 0; + return sharedState->getNextRelOffset( + transaction::Transaction::getDummyReadOnlyTrx().get()) == 0; } - static void populateCSROffsets(storage::ColumnChunk* csrOffsetChunk, - common::DataChunkCollection* partition, common::vector_idx_t offsetVectorIdx); + void prepareCSRNodeGroup(common::DataChunkCollection* partition, + common::vector_idx_t offsetVectorIdx, common::offset_t numNodes); + + static void populateCSROffsetsAndLengths(storage::CSRNodeGroup* csrNodeGroup, + common::offset_t numNodes, common::DataChunkCollection* partition, + common::vector_idx_t offsetVectorIdx); static void setOffsetToWithinNodeGroup( common::ValueVector* vector, common::offset_t startOffset); static void setOffsetFromCSROffsets( diff --git a/src/include/processor/plan_mapper.h b/src/include/processor/plan_mapper.h index 3123212261..3412d3bc8b 100644 --- a/src/include/processor/plan_mapper.h +++ b/src/include/processor/plan_mapper.h @@ -24,7 +24,7 @@ class NodeInsertExecutor; class RelInsertExecutor; class NodeSetExecutor; class RelSetExecutor; -class CopyRelSharedState; +struct CopyRelSharedState; struct PartitionerSharedState; class PlanMapper { diff --git a/src/include/storage/local_storage/local_rel_table.h b/src/include/storage/local_storage/local_rel_table.h index 21519d484d..84ad39a1cf 100644 --- a/src/include/storage/local_storage/local_rel_table.h +++ b/src/include/storage/local_storage/local_rel_table.h @@ -134,6 +134,7 @@ class LocalRelNG final : public LocalNodeGroup { class LocalRelTableData final : public LocalTableData { friend class RelTableData; + friend class CSRRelTableData; public: LocalRelTableData(std::vector dataTypes, MemoryManager* mm, diff --git a/src/include/storage/stats/rel_table_statistics.h b/src/include/storage/stats/rel_table_statistics.h index 85352553be..6af43fa474 100644 --- a/src/include/storage/stats/rel_table_statistics.h +++ b/src/include/storage/stats/rel_table_statistics.h @@ -43,6 +43,10 @@ class RelTableStats : public TableStatistics { return direction == common::RelDataDirection::FWD ? fwdCSROffsetMetadataDAHInfo.get() : bwdCSROffsetMetadataDAHInfo.get(); } + inline MetadataDAHInfo* getCSRLengthMetadataDAHInfo(common::RelDataDirection direction) { + return direction == common::RelDataDirection::FWD ? fwdCSRLengthMetadataDAHInfo.get() : + bwdCSRLengthMetadataDAHInfo.get(); + } inline MetadataDAHInfo* getAdjMetadataDAHInfo(common::RelDataDirection direction) { return direction == common::RelDataDirection::FWD ? fwdNbrIDMetadataDAHInfo.get() : bwdNbrIDMetadataDAHInfo.get(); @@ -74,6 +78,8 @@ class RelTableStats : public TableStatistics { // CSROffsetMetadataDAHInfo are only valid for CSRColumns. std::unique_ptr fwdCSROffsetMetadataDAHInfo; std::unique_ptr bwdCSROffsetMetadataDAHInfo; + std::unique_ptr fwdCSRLengthMetadataDAHInfo; + std::unique_ptr bwdCSRLengthMetadataDAHInfo; std::unique_ptr fwdNbrIDMetadataDAHInfo; std::unique_ptr bwdNbrIDMetadataDAHInfo; std::vector> fwdPropertyMetadataDAHInfos; diff --git a/src/include/storage/stats/rels_store_statistics.h b/src/include/storage/stats/rels_store_statistics.h index 461fe5f795..48d7b588b1 100644 --- a/src/include/storage/stats/rels_store_statistics.h +++ b/src/include/storage/stats/rels_store_statistics.h @@ -46,6 +46,8 @@ class RelsStoreStats : public TablesStatistics { void removeMetadataDAHInfo(common::table_id_t tableID, common::column_id_t columnID); MetadataDAHInfo* getCSROffsetMetadataDAHInfo(transaction::Transaction* transaction, common::table_id_t tableID, common::RelDataDirection direction); + MetadataDAHInfo* getCSRLengthMetadataDAHInfo(transaction::Transaction* transaction, + common::table_id_t tableID, common::RelDataDirection direction); MetadataDAHInfo* getAdjMetadataDAHInfo(transaction::Transaction* transaction, common::table_id_t tableID, common::RelDataDirection direction); MetadataDAHInfo* getPropertyMetadataDAHInfo(transaction::Transaction* transaction, diff --git a/src/include/storage/storage_utils.h b/src/include/storage/storage_utils.h index 45036d4f6a..f2b37d065e 100644 --- a/src/include/storage/storage_utils.h +++ b/src/include/storage/storage_utils.h @@ -89,9 +89,10 @@ class StorageUtils { OFFSET = 2, // This is used for offset columns in VAR_LIST and STRING columns. DATA = 3, // This is used for data columns in VAR_LIST and STRING columns. CSR_OFFSET = 4, - ADJ = 5, - STRUCT_CHILD = 6, - NULL_MASK = 7, + CSR_LENGTH = 5, + ADJ = 6, + STRUCT_CHILD = 7, + NULL_MASK = 8, }; static std::string getColumnName( diff --git a/src/include/storage/store/node_group.h b/src/include/storage/store/node_group.h index 22cd2b3d11..ca9e88c0b5 100644 --- a/src/include/storage/store/node_group.h +++ b/src/include/storage/store/node_group.h @@ -51,18 +51,26 @@ class NodeGroup { common::row_idx_t numRows; }; +struct CSRHeaderChunks { + std::unique_ptr offset; + std::unique_ptr length; + + void init(bool enableCompression); +}; + class CSRNodeGroup : public NodeGroup { public: CSRNodeGroup(const std::vector>& columnTypes, - bool enableCompression) - // By default, initialize all column chunks except for the csrOffsetChunk to empty, as they - // should be resized after csr offset calculation (e.g., during CopyRel). - : NodeGroup{columnTypes, enableCompression, 0 /* capacity */} { - csrOffsetChunk = - ColumnChunkFactory::createColumnChunk(common::LogicalType::INT64(), enableCompression); - } + bool enableCompression); - inline ColumnChunk* getCSROffsetChunk() { return csrOffsetChunk.get(); } + inline ColumnChunk* getCSROffsetChunk() const { + KU_ASSERT(csrHeaderChunks.offset != nullptr); + return csrHeaderChunks.offset.get(); + } + inline ColumnChunk* getCSRLengthChunk() const { + KU_ASSERT(csrHeaderChunks.length != nullptr); + return csrHeaderChunks.length.get(); + } inline void writeToColumnChunk(common::vector_idx_t chunkIdx, common::vector_idx_t vectorIdx, common::DataChunk* dataChunk, common::ValueVector* offsetVector) override { @@ -71,7 +79,7 @@ class CSRNodeGroup : public NodeGroup { } private: - std::unique_ptr csrOffsetChunk; + CSRHeaderChunks csrHeaderChunks; }; struct NodeGroupFactory { diff --git a/src/include/storage/store/rel_table_data.h b/src/include/storage/store/rel_table_data.h index b3592f003d..fd8b73b774 100644 --- a/src/include/storage/store/rel_table_data.h +++ b/src/include/storage/store/rel_table_data.h @@ -1,12 +1,17 @@ #pragma once #include "catalog/rel_table_schema.h" -#include "common/cast.h" +#include "storage/store/node_group.h" #include "storage/store/table_data.h" namespace kuzu { namespace storage { +struct CSRHeaderColumns { + std::unique_ptr offset; + std::unique_ptr length; +}; + class LocalRelNG; struct RelDataReadState : public TableReadState { common::RelDataDirection direction; @@ -17,7 +22,7 @@ struct RelDataReadState : public TableReadState { common::offset_t posInCurrentCSR; std::vector csrListEntries; // Temp auxiliary data structure to scan the offset of each CSR node in the offset column chunk. - std::unique_ptr csrOffsetChunk; + CSRHeaderChunks csrHeaderChunks; // Following fields are used for local storage. bool readFromLocalStorage; @@ -41,23 +46,19 @@ struct RelDataReadState : public TableReadState { class RelsStoreStats; class LocalRelTableData; struct CSRRelNGInfo; -class RelTableData final : public TableData { +class RelTableData : public TableData { public: RelTableData(BMFileHandle* dataFH, BMFileHandle* metadataFH, BufferManager* bufferManager, WAL* wal, catalog::RelTableSchema* tableSchema, RelsStoreStats* relsStoreStats, - common::RelDataDirection direction, bool enableCompression); + common::RelDataDirection direction, bool enableCompression, + common::ColumnDataFormat dataFormat = common::ColumnDataFormat::REGULAR); - void initializeReadState(transaction::Transaction* transaction, + virtual void initializeReadState(transaction::Transaction* transaction, std::vector columnIDs, common::ValueVector* inNodeIDVector, RelDataReadState* readState); - inline void scan(transaction::Transaction* transaction, TableReadState& readState, + void scan(transaction::Transaction* transaction, TableReadState& readState, common::ValueVector* inNodeIDVector, - const std::vector& outputVectors) override { - auto& relReadState = common::ku_dynamic_cast(readState); - dataFormat == common::ColumnDataFormat::REGULAR ? - scanRegularColumns(transaction, relReadState, inNodeIDVector, outputVectors) : - scanCSRColumns(transaction, relReadState, inNodeIDVector, outputVectors); - } + const std::vector& outputVectors) override; void lookup(transaction::Transaction* transaction, TableReadState& readState, common::ValueVector* inNodeIDVector, const std::vector& outputVectors) override; @@ -73,10 +74,10 @@ class RelTableData final : public TableData { // we remove the restriction of flatten all tuples. bool delete_(transaction::Transaction* transaction, common::ValueVector* srcNodeIDVector, common::ValueVector* dstNodeIDVector, common::ValueVector* relIDVector); - bool checkIfNodeHasRels( + virtual bool checkIfNodeHasRels( transaction::Transaction* transaction, common::ValueVector* srcNodeIDVector); void append(NodeGroup* nodeGroup) override; - void resizeColumns(common::node_group_idx_t numNodeGroups); + virtual void resizeColumns(common::node_group_idx_t numNodeGroups); inline Column* getAdjColumn() const { return adjColumn.get(); } inline common::ColumnDataFormat getDataFormat() const { return dataFormat; } @@ -91,50 +92,65 @@ class RelTableData final : public TableData { return adjColumn->getNumNodeGroups(transaction); } -private: +protected: LocalRelNG* getLocalNodeGroup( transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx); - void scanRegularColumns(transaction::Transaction* transaction, RelDataReadState& readState, - common::ValueVector* inNodeIDVector, - const std::vector& outputVectors); - void scanCSRColumns(transaction::Transaction* transaction, RelDataReadState& readState, +private: + static inline common::vector_idx_t getDataIdxFromDirection(common::RelDataDirection direction) { + return direction == common::RelDataDirection::FWD ? 0 : 1; + } + +protected: + common::RelDataDirection direction; + std::unique_ptr adjColumn; +}; + +class CSRRelTableData final : public RelTableData { +public: + CSRRelTableData(BMFileHandle* dataFH, BMFileHandle* metadataFH, BufferManager* bufferManager, + WAL* wal, catalog::RelTableSchema* tableSchema, RelsStoreStats* relsStoreStats, + common::RelDataDirection direction, bool enableCompression); + + void initializeReadState(transaction::Transaction* transaction, + std::vector columnIDs, common::ValueVector* inNodeIDVector, + RelDataReadState* readState) override; + void scan(transaction::Transaction* transaction, TableReadState& readState, common::ValueVector* inNodeIDVector, - const std::vector& outputVectors); + const std::vector& outputVectors) override; + + bool checkIfNodeHasRels( + transaction::Transaction* transaction, common::ValueVector* srcNodeIDVector) override; + void append(NodeGroup* nodeGroup) override; + void resizeColumns(common::node_group_idx_t numNodeGroups) override; + + void prepareLocalTableToCommit( + transaction::Transaction* transaction, LocalTableData* localTable) override; - void prepareCommitForRegularColumns( - transaction::Transaction* transaction, LocalRelTableData* localTableData); - void prepareCommitForCSRColumns( - transaction::Transaction* transaction, LocalRelTableData* localTableData); + void checkpointInMemory() override; + void rollbackInMemory() override; +private: void prepareCommitCSRNGWithoutSliding(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx, CSRRelNGInfo* relNodeGroupInfo, - ColumnChunk* csrOffsetChunk, ColumnChunk* relIDChunk, LocalRelNG* localNodeGroup); + ColumnChunk* csrOffsetChunk, ColumnChunk* csrLengthChunk, ColumnChunk* relIDChunk, + LocalRelNG* localNodeGroup); void prepareCommitCSRNGWithSliding(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx, CSRRelNGInfo* relNodeGroupInfo, - ColumnChunk* csrOffsetChunk, ColumnChunk* relIDChunk, LocalRelNG* localNodeGroup); + ColumnChunk* csrOffsetChunk, ColumnChunk* csrLengthChunk, ColumnChunk* relIDChunk, + LocalRelNG* localNodeGroup); - std::unique_ptr slideCSROffsetChunk( - ColumnChunk* csrOffsetChunk, CSRRelNGInfo* relNodeGroupInfo); + std::pair, std::unique_ptr> slideCSRAuxChunks( + ColumnChunk* csrOffsetChunk, ColumnChunk* csrLengthChunk, CSRRelNGInfo* relNodeGroupInfo); std::unique_ptr slideCSRColumnChunk(transaction::Transaction* transaction, - ColumnChunk* csrOffsetChunk, ColumnChunk* slidedCSROffsetChunkForCheck, - ColumnChunk* relIDChunk, const offset_to_offset_to_row_idx_t& insertInfo, + ColumnChunk* csrOffsetChunk, ColumnChunk* csrLengthChunk, + ColumnChunk* slidedCSROffsetChunkForCheck, ColumnChunk* relIDChunk, + const offset_to_offset_to_row_idx_t& insertInfo, const offset_to_offset_to_row_idx_t& updateInfo, const offset_to_offset_set_t& deleteInfo, common::node_group_idx_t nodeGroupIdx, Column* column, LocalVectorCollection* localChunk); - static inline common::ColumnDataFormat getDataFormatFromSchema( - catalog::RelTableSchema* tableSchema, common::RelDataDirection direction) { - return tableSchema->isSingleMultiplicity(direction) ? common::ColumnDataFormat::REGULAR : - common::ColumnDataFormat::CSR; - } - static inline common::vector_idx_t getDataIdxFromDirection(common::RelDataDirection direction) { - return direction == common::RelDataDirection::FWD ? 0 : 1; - } - private: - common::RelDataDirection direction; - std::unique_ptr adjColumn; - std::unique_ptr csrOffsetColumn; + CSRHeaderColumns csrHeaderColumns; }; } // namespace storage diff --git a/src/processor/operator/persistent/copy_rel.cpp b/src/processor/operator/persistent/copy_rel.cpp index 37ae33d1e4..cd51fe3fc8 100644 --- a/src/processor/operator/persistent/copy_rel.cpp +++ b/src/processor/operator/persistent/copy_rel.cpp @@ -51,40 +51,25 @@ void CopyRel::executeInternal(ExecutionContext* /*context*/) { break; } // Read the whole partition, and set to node group. - auto partitioningBuffer = partitionerSharedState->partitioningBuffers[info->partitioningIdx] - ->partitions[localState->currentPartition] - .get(); + auto partitioningBuffer = partitionerSharedState->getPartitionBuffer( + info->partitioningIdx, localState->currentPartition); auto startOffset = StorageUtils::getStartOffsetOfNodeGroup(localState->currentPartition); auto offsetVectorIdx = info->dataDirection == RelDataDirection::FWD ? 0 : 1; - row_idx_t numRels = 0; for (auto dataChunk : partitioningBuffer->getChunks()) { - auto offsetVector = dataChunk->getValueVector(offsetVectorIdx).get(); - setOffsetToWithinNodeGroup(offsetVector, startOffset); - numRels += offsetVector->state->selVector->selectedSize; + setOffsetToWithinNodeGroup( + dataChunk->getValueVector(offsetVectorIdx).get(), startOffset); } - ColumnChunk* csrOffsetChunk = nullptr; // Calculate num of source nodes in this node group. // This will be used to set the num of values of the node group. auto numNodes = std::min(StorageConstants::NODE_GROUP_SIZE, partitionerSharedState->maxNodeOffsets[info->partitioningIdx] - startOffset + 1); if (info->dataFormat == ColumnDataFormat::CSR) { - auto csrNodeGroup = static_cast(localState->nodeGroup.get()); - csrOffsetChunk = csrNodeGroup->getCSROffsetChunk(); - // CSR offset chunk should be aligned with num of source nodes in this node group. - csrOffsetChunk->setNumValues(numNodes); - populateCSROffsets(csrOffsetChunk, partitioningBuffer, offsetVectorIdx); - // Resize csr data column chunks. - localState->nodeGroup->resizeChunks(numRels); + prepareCSRNodeGroup(partitioningBuffer, offsetVectorIdx, numNodes); } else { localState->nodeGroup->setAllNull(); localState->nodeGroup->getColumnChunk(0)->setNumValues(numNodes); } for (auto dataChunk : partitioningBuffer->getChunks()) { - if (info->dataFormat == ColumnDataFormat::CSR) { - KU_ASSERT(csrOffsetChunk); - auto offsetVector = dataChunk->getValueVector(offsetVectorIdx).get(); - setOffsetFromCSROffsets(offsetVector, (offset_t*)csrOffsetChunk->getData()); - } localState->nodeGroup->write(dataChunk, offsetVectorIdx); } localState->nodeGroup->finalize(localState->currentPartition); @@ -95,31 +80,53 @@ void CopyRel::executeInternal(ExecutionContext* /*context*/) { } } -void CopyRel::populateCSROffsets( - ColumnChunk* csrOffsetChunk, DataChunkCollection* partition, vector_idx_t offsetVectorIdx) { +void CopyRel::prepareCSRNodeGroup( + DataChunkCollection* partition, vector_idx_t offsetVectorIdx, offset_t numNodes) { + auto csrNodeGroup = ku_dynamic_cast(localState->nodeGroup.get()); + auto csrOffsetChunk = csrNodeGroup->getCSROffsetChunk(); + csrOffsetChunk->setNumValues(numNodes); + csrNodeGroup->getCSRLengthChunk()->setNumValues(numNodes); + populateCSROffsetsAndLengths(csrNodeGroup, numNodes, partition, offsetVectorIdx); + // Resize csr data column chunks. + offset_t numRels = 0; + for (auto dataChunk : partition->getChunks()) { + numRels += dataChunk->getValueVector(offsetVectorIdx).get()->state->selVector->selectedSize; + } + localState->nodeGroup->resizeChunks(numRels); + for (auto dataChunk : partition->getChunks()) { + auto offsetVector = dataChunk->getValueVector(offsetVectorIdx).get(); + setOffsetFromCSROffsets(offsetVector, (offset_t*)csrOffsetChunk->getData()); + } +} + +void CopyRel::populateCSROffsetsAndLengths(CSRNodeGroup* csrNodeGroup, offset_t numNodes, + common::DataChunkCollection* partition, common::vector_idx_t offsetVectorIdx) { + auto csrOffsetChunk = csrNodeGroup->getCSROffsetChunk(); + csrOffsetChunk->setNumValues(numNodes); + auto csrLengthChunk = csrNodeGroup->getCSRLengthChunk(); + csrLengthChunk->setNumValues(numNodes); auto csrOffsets = (offset_t*)csrOffsetChunk->getData(); + auto csrLengths = (length_t*)csrLengthChunk->getData(); std::fill(csrOffsets, csrOffsets + csrOffsetChunk->getCapacity(), 0); - // Calculate num of tuples for each node. Store the num of tuples of node i at csrOffsets[i+1]. + std::fill(csrLengths, csrLengths + csrLengthChunk->getCapacity(), 0); + // Calculate length for each node. Store the num of tuples of node i at csrOffsets[i]. for (auto chunk : partition->getChunks()) { auto offsetVector = chunk->getValueVector(offsetVectorIdx); for (auto i = 0u; i < offsetVector->state->selVector->selectedSize; i++) { auto pos = offsetVector->state->selVector->selectedPositions[i]; auto nodeOffset = offsetVector->getValue(pos); - if (nodeOffset >= StorageConstants::NODE_GROUP_SIZE - 1) { - continue; - } - csrOffsets[nodeOffset + 1]++; + csrLengths[nodeOffset]++; } } // Calculate starting offset of each node. for (auto i = 1u; i < csrOffsetChunk->getCapacity(); i++) { - csrOffsets[i] = csrOffsets[i] + csrOffsets[i - 1]; + csrOffsets[i] = csrOffsets[i - 1] + csrLengths[i - 1]; } } void CopyRel::setOffsetToWithinNodeGroup(ValueVector* vector, offset_t startOffset) { - KU_ASSERT(vector->dataType.getPhysicalType() == PhysicalTypeID::INT64); - KU_ASSERT(vector->state->selVector->isUnfiltered()); + KU_ASSERT(vector->dataType.getPhysicalType() == PhysicalTypeID::INT64 && + vector->state->selVector->isUnfiltered()); auto offsets = (offset_t*)vector->getData(); for (auto i = 0u; i < vector->state->selVector->selectedSize; i++) { offsets[i] -= startOffset; @@ -127,8 +134,8 @@ void CopyRel::setOffsetToWithinNodeGroup(ValueVector* vector, offset_t startOffs } void CopyRel::setOffsetFromCSROffsets(ValueVector* offsetVector, offset_t* csrOffsets) { - KU_ASSERT(offsetVector->dataType.getPhysicalType() == PhysicalTypeID::INT64); - KU_ASSERT(offsetVector->state->selVector->isUnfiltered()); + KU_ASSERT(offsetVector->dataType.getPhysicalType() == PhysicalTypeID::INT64 && + offsetVector->state->selVector->isUnfiltered()); for (auto i = 0u; i < offsetVector->state->selVector->selectedSize; i++) { auto nodeOffset = offsetVector->getValue(i); offsetVector->setValue(i, csrOffsets[nodeOffset]); @@ -137,14 +144,14 @@ void CopyRel::setOffsetFromCSROffsets(ValueVector* offsetVector, offset_t* csrOf } void CopyRel::finalize(ExecutionContext* context) { - if (info->partitioningIdx == 1) { + if (info->partitioningIdx == partitionerSharedState->partitioningBuffers.size() - 1) { sharedState->updateRelsStatistics(); auto outputMsg = stringFormat("{} number of tuples has been copied to table {}.", sharedState->numRows.load(), info->schema->tableName); FactorizedTableUtils::appendStringToTable( sharedState->fTable.get(), outputMsg, context->memoryManager); } - sharedState->numRows = 0; + sharedState->numRows.store(0); partitionerSharedState->resetState(); partitionerSharedState->partitioningBuffers[info->partitioningIdx].reset(); } diff --git a/src/storage/stats/rel_table_statistics.cpp b/src/storage/stats/rel_table_statistics.cpp index cb3a8f1c41..fcd81a27ee 100644 --- a/src/storage/stats/rel_table_statistics.cpp +++ b/src/storage/stats/rel_table_statistics.cpp @@ -19,10 +19,14 @@ RelTableStats::RelTableStats( if (!relTableSchema.isSingleMultiplicity(RelDataDirection::FWD)) { fwdCSROffsetMetadataDAHInfo = TablesStatistics::createMetadataDAHInfo( LogicalType{LogicalTypeID::INT64}, *metadataFH, bufferManager, wal); + fwdCSRLengthMetadataDAHInfo = TablesStatistics::createMetadataDAHInfo( + LogicalType{LogicalTypeID::INT64}, *metadataFH, bufferManager, wal); } if (!relTableSchema.isSingleMultiplicity(RelDataDirection::BWD)) { bwdCSROffsetMetadataDAHInfo = TablesStatistics::createMetadataDAHInfo( LogicalType{LogicalTypeID::INT64}, *metadataFH, bufferManager, wal); + bwdCSRLengthMetadataDAHInfo = TablesStatistics::createMetadataDAHInfo( + LogicalType{LogicalTypeID::INT64}, *metadataFH, bufferManager, wal); } fwdNbrIDMetadataDAHInfo = TablesStatistics::createMetadataDAHInfo( LogicalType{LogicalTypeID::INTERNAL_ID}, *metadataFH, bufferManager, wal); @@ -44,9 +48,11 @@ RelTableStats::RelTableStats(const RelTableStats& other) : TableStatistics{other nextRelOffset = other.nextRelOffset; if (other.fwdCSROffsetMetadataDAHInfo) { fwdCSROffsetMetadataDAHInfo = other.fwdCSROffsetMetadataDAHInfo->copy(); + fwdCSRLengthMetadataDAHInfo = other.fwdCSRLengthMetadataDAHInfo->copy(); } if (other.bwdCSROffsetMetadataDAHInfo) { bwdCSROffsetMetadataDAHInfo = other.bwdCSROffsetMetadataDAHInfo->copy(); + bwdCSRLengthMetadataDAHInfo = other.bwdCSRLengthMetadataDAHInfo->copy(); } fwdNbrIDMetadataDAHInfo = other.fwdNbrIDMetadataDAHInfo->copy(); bwdNbrIDMetadataDAHInfo = other.bwdNbrIDMetadataDAHInfo->copy(); @@ -66,6 +72,8 @@ void RelTableStats::serializeInternal(Serializer& serializer) { serializer.serializeValue(nextRelOffset); serializer.serializeOptionalValue(fwdCSROffsetMetadataDAHInfo); serializer.serializeOptionalValue(bwdCSROffsetMetadataDAHInfo); + serializer.serializeOptionalValue(fwdCSRLengthMetadataDAHInfo); + serializer.serializeOptionalValue(bwdCSRLengthMetadataDAHInfo); fwdNbrIDMetadataDAHInfo->serialize(serializer); bwdNbrIDMetadataDAHInfo->serialize(serializer); serializer.serializeVectorOfPtrs(fwdPropertyMetadataDAHInfos); @@ -76,9 +84,12 @@ std::unique_ptr RelTableStats::deserialize( uint64_t numRels, table_id_t tableID, Deserializer& deserializer) { offset_t nextRelOffset; deserializer.deserializeValue(nextRelOffset); - std::unique_ptr fwdCSROffsetMetadataDAHInfo, bwdCSROffsetMetadataDAHInfo; + std::unique_ptr fwdCSROffsetMetadataDAHInfo, bwdCSROffsetMetadataDAHInfo, + fwdCSRLengthMetadataDAHInfo, bwdCSRLengthMetadataDAHInfo; deserializer.deserializeOptionalValue(fwdCSROffsetMetadataDAHInfo); deserializer.deserializeOptionalValue(bwdCSROffsetMetadataDAHInfo); + deserializer.deserializeOptionalValue(fwdCSRLengthMetadataDAHInfo); + deserializer.deserializeOptionalValue(bwdCSRLengthMetadataDAHInfo); auto fwdNbrIDMetadataDAHInfo = MetadataDAHInfo::deserialize(deserializer); auto bwdNbrIDMetadataDAHInfo = MetadataDAHInfo::deserialize(deserializer); std::vector> fwdPropertyMetadataDAHInfos; @@ -88,6 +99,8 @@ std::unique_ptr RelTableStats::deserialize( auto result = std::make_unique(numRels, tableID, nextRelOffset); result->fwdCSROffsetMetadataDAHInfo = std::move(fwdCSROffsetMetadataDAHInfo); result->bwdCSROffsetMetadataDAHInfo = std::move(bwdCSROffsetMetadataDAHInfo); + result->fwdCSRLengthMetadataDAHInfo = std::move(fwdCSRLengthMetadataDAHInfo); + result->bwdCSRLengthMetadataDAHInfo = std::move(bwdCSRLengthMetadataDAHInfo); result->fwdNbrIDMetadataDAHInfo = std::move(fwdNbrIDMetadataDAHInfo); result->bwdNbrIDMetadataDAHInfo = std::move(bwdNbrIDMetadataDAHInfo); result->fwdPropertyMetadataDAHInfos = std::move(fwdPropertyMetadataDAHInfos); diff --git a/src/storage/stats/rels_store_statistics.cpp b/src/storage/stats/rels_store_statistics.cpp index 7858933d13..5078232512 100644 --- a/src/storage/stats/rels_store_statistics.cpp +++ b/src/storage/stats/rels_store_statistics.cpp @@ -82,6 +82,15 @@ MetadataDAHInfo* RelsStoreStats::getCSROffsetMetadataDAHInfo( return tableStats->getCSROffsetMetadataDAHInfo(direction); } +MetadataDAHInfo* RelsStoreStats::getCSRLengthMetadataDAHInfo( + Transaction* transaction, table_id_t tableID, RelDataDirection direction) { + if (transaction->isWriteTransaction()) { + initTableStatisticsForWriteTrx(); + } + auto tableStats = getRelStatistics(tableID, transaction); + return tableStats->getCSRLengthMetadataDAHInfo(direction); +} + MetadataDAHInfo* RelsStoreStats::getAdjMetadataDAHInfo( Transaction* transaction, table_id_t tableID, RelDataDirection direction) { if (transaction->isWriteTransaction()) { diff --git a/src/storage/storage_structure/disk_array.cpp b/src/storage/storage_structure/disk_array.cpp index 58374f25e3..a816f63b78 100644 --- a/src/storage/storage_structure/disk_array.cpp +++ b/src/storage/storage_structure/disk_array.cpp @@ -136,7 +136,7 @@ template uint64_t BaseDiskArray::resize(uint64_t newNumElements) { std::unique_lock xLck{diskArraySharedMtx}; hasTransactionalUpdates = true; - auto currentNumElements = getNumElementsNoLock(transaction::TransactionType::WRITE); + auto currentNumElements = getNumElementsNoLock(TransactionType::WRITE); U val{}; while (currentNumElements < newNumElements) { pushBackNoLock(val); diff --git a/src/storage/store/node_group.cpp b/src/storage/store/node_group.cpp index 924a1e6e47..299ac0e0b5 100644 --- a/src/storage/store/node_group.cpp +++ b/src/storage/store/node_group.cpp @@ -110,5 +110,20 @@ void NodeGroup::finalize(uint64_t nodeGroupIdx_) { } } +void CSRHeaderChunks::init(bool enableCompression) { + offset = + ColumnChunkFactory::createColumnChunk(common::LogicalType::UINT64(), enableCompression); + length = + ColumnChunkFactory::createColumnChunk(common::LogicalType::UINT64(), enableCompression); +} + +CSRNodeGroup::CSRNodeGroup( + const std::vector>& columnTypes, bool enableCompression) + // By default, initialize all column chunks except for the csrOffsetChunk to empty, as they + // should be resized after csr offset calculation (e.g., during CopyRel). + : NodeGroup{columnTypes, enableCompression, 0 /* capacity */} { + csrHeaderChunks.init(enableCompression); +} + } // namespace storage } // namespace kuzu diff --git a/src/storage/store/rel_table.cpp b/src/storage/store/rel_table.cpp index 8bd2a14e93..f04db9f50d 100644 --- a/src/storage/store/rel_table.cpp +++ b/src/storage/store/rel_table.cpp @@ -10,6 +10,12 @@ using namespace kuzu::transaction; namespace kuzu { namespace storage { +static inline common::ColumnDataFormat getDataFormatFromSchema( + catalog::RelTableSchema* tableSchema, common::RelDataDirection direction) { + return tableSchema->isSingleMultiplicity(direction) ? common::ColumnDataFormat::REGULAR : + common::ColumnDataFormat::CSR; +} + RelDetachDeleteState::RelDetachDeleteState() { auto tempSharedState = std::make_shared(); dstNodeIDVector = std::make_unique(LogicalType{LogicalTypeID::INTERNAL_ID}); @@ -21,10 +27,18 @@ RelDetachDeleteState::RelDetachDeleteState() { RelTable::RelTable(BMFileHandle* dataFH, BMFileHandle* metadataFH, RelsStoreStats* relsStoreStats, MemoryManager* memoryManager, RelTableSchema* tableSchema, WAL* wal, bool enableCompression) : Table{tableSchema, relsStoreStats, memoryManager, wal} { - fwdRelTableData = std::make_unique(dataFH, metadataFH, bufferManager, wal, - tableSchema, relsStoreStats, RelDataDirection::FWD, enableCompression); - bwdRelTableData = std::make_unique(dataFH, metadataFH, bufferManager, wal, - tableSchema, relsStoreStats, RelDataDirection::BWD, enableCompression); + fwdRelTableData = + getDataFormatFromSchema(tableSchema, RelDataDirection::FWD) == ColumnDataFormat::REGULAR ? + std::make_unique(dataFH, metadataFH, bufferManager, wal, tableSchema, + relsStoreStats, RelDataDirection::FWD, enableCompression) : + std::make_unique(dataFH, metadataFH, bufferManager, wal, tableSchema, + relsStoreStats, RelDataDirection::FWD, enableCompression); + bwdRelTableData = + getDataFormatFromSchema(tableSchema, RelDataDirection::BWD) == ColumnDataFormat::REGULAR ? + std::make_unique(dataFH, metadataFH, bufferManager, wal, tableSchema, + relsStoreStats, RelDataDirection::BWD, enableCompression) : + std::make_unique(dataFH, metadataFH, bufferManager, wal, tableSchema, + relsStoreStats, RelDataDirection::BWD, enableCompression); } void RelTable::read(Transaction* transaction, TableReadState& readState, diff --git a/src/storage/store/rel_table_data.cpp b/src/storage/store/rel_table_data.cpp index 1671699762..d8d082d9bd 100644 --- a/src/storage/store/rel_table_data.cpp +++ b/src/storage/store/rel_table_data.cpp @@ -16,8 +16,7 @@ RelDataReadState::RelDataReadState(ColumnDataFormat dataFormat) : dataFormat{dataFormat}, startNodeOffset{0}, numNodes{0}, currentNodeOffset{0}, posInCurrentCSR{0}, readFromLocalStorage{false}, localNodeGroup{nullptr} { csrListEntries.resize(StorageConstants::NODE_GROUP_SIZE, {0, 0}); - csrOffsetChunk = - ColumnChunkFactory::createColumnChunk(LogicalType::INT64(), false /* enableCompression */); + csrHeaderChunks.init(false /*enableCompression*/); } bool RelDataReadState::hasMoreToReadFromLocalStorage() const { @@ -59,11 +58,13 @@ bool RelDataReadState::hasMoreToRead(transaction::Transaction* transaction) { void RelDataReadState::populateCSRListEntries() { csrListEntries[0].offset = 0; - csrListEntries[0].size = numNodes > 0 ? csrOffsetChunk->getValue(0) : 0; + csrListEntries[0].size = numNodes == 0 ? 0 : csrHeaderChunks.length->getValue(0); for (auto i = 1u; i < numNodes; i++) { - csrListEntries[i].offset = csrOffsetChunk->getValue(i - 1); - csrListEntries[i].size = - csrOffsetChunk->getValue(i) - csrOffsetChunk->getValue(i - 1); + csrListEntries[i].size = csrHeaderChunks.length->getValue(i); + KU_ASSERT(csrListEntries[i].size == csrHeaderChunks.offset->getValue(i) - + csrHeaderChunks.offset->getValue(i - 1)); + csrListEntries[i].offset = + csrHeaderChunks.offset->getValue(i) - csrListEntries[i].size; } } @@ -78,21 +79,11 @@ std::pair RelDataReadState::getStartAndEndOffset() { RelTableData::RelTableData(BMFileHandle* dataFH, BMFileHandle* metadataFH, BufferManager* bufferManager, WAL* wal, RelTableSchema* tableSchema, - RelsStoreStats* relsStoreStats, RelDataDirection direction, bool enableCompression) + RelsStoreStats* relsStoreStats, RelDataDirection direction, bool enableCompression, + ColumnDataFormat dataFormat) : TableData{dataFH, metadataFH, tableSchema->tableID, bufferManager, wal, enableCompression, - getDataFormatFromSchema(tableSchema, direction)}, - direction{direction}, csrOffsetColumn{nullptr} { - if (dataFormat == ColumnDataFormat::CSR) { - auto csrOffsetMetadataDAHInfo = relsStoreStats->getCSROffsetMetadataDAHInfo( - &DUMMY_WRITE_TRANSACTION, tableID, direction); - // No NULL values is allowed for the csr offset column. - auto columnName = StorageUtils::getColumnName("", StorageUtils::ColumnType::CSR_OFFSET, - RelDataDirectionUtils::relDirectionToString(direction)); - csrOffsetColumn = - std::make_unique(columnName, LogicalType::INT64(), *csrOffsetMetadataDAHInfo, - dataFH, metadataFH, bufferManager, wal, &DUMMY_READ_TRANSACTION, - RWPropertyStats::empty(), enableCompression, false /* requireNUllColumn */); - } + dataFormat}, + direction{direction} { auto adjMetadataDAHInfo = relsStoreStats->getAdjMetadataDAHInfo(&DUMMY_WRITE_TRANSACTION, tableID, direction); auto adjColName = StorageUtils::getColumnName( @@ -120,32 +111,11 @@ RelTableData::RelTableData(BMFileHandle* dataFH, BMFileHandle* metadataFH, dynamic_cast(columns[REL_ID_COLUMN_ID].get())->setCommonTableID(tableID); } -void RelTableData::initializeReadState(Transaction* transaction, - std::vector columnIDs, ValueVector* inNodeIDVector, +void RelTableData::initializeReadState(Transaction* /*transaction*/, + std::vector columnIDs, ValueVector* /*inNodeIDVector*/, RelDataReadState* readState) { readState->direction = direction; readState->columnIDs = std::move(columnIDs); - auto nodeOffset = - inNodeIDVector->readNodeOffset(inNodeIDVector->state->selVector->selectedPositions[0]); - auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(nodeOffset); - if (dataFormat == ColumnDataFormat::CSR) { - auto startNodeOffset = StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx); - // Reset to read from beginning for the csr of the new node offset. - readState->posInCurrentCSR = 0; - if (readState->isOutOfRange(nodeOffset)) { - // Scan csr offsets and populate csr list entries for the new node group. - readState->startNodeOffset = startNodeOffset; - csrOffsetColumn->scan(transaction, nodeGroupIdx, readState->csrOffsetChunk.get()); - readState->numNodes = readState->csrOffsetChunk->getNumValues(); - readState->populateCSRListEntries(); - if (transaction->isWriteTransaction()) { - readState->localNodeGroup = getLocalNodeGroup(transaction, nodeGroupIdx); - } - } - if (nodeOffset != readState->currentNodeOffset) { - readState->currentNodeOffset = nodeOffset; - } - } // Reset to read from persistent storage. readState->readFromLocalStorage = false; } @@ -166,7 +136,7 @@ LocalRelNG* RelTableData::getLocalNodeGroup( return localNodeGroup; } -void RelTableData::scanRegularColumns(Transaction* transaction, RelDataReadState& readState, +void RelTableData::scan(Transaction* transaction, TableReadState& readState, ValueVector* inNodeIDVector, const std::vector& outputVectors) { adjColumn->scan(transaction, inNodeIDVector, outputVectors[0]); if (transaction->isReadOnly() && !ValueVector::discardNull(*outputVectors[0])) { @@ -194,48 +164,6 @@ void RelTableData::scanRegularColumns(Transaction* transaction, RelDataReadState } } -void RelTableData::scanCSRColumns(Transaction* transaction, RelDataReadState& readState, - ValueVector* inNodeIDVector, const std::vector& outputVectors) { - KU_ASSERT(dataFormat == ColumnDataFormat::CSR); - if (readState.readFromLocalStorage) { - auto offsetInChunk = readState.currentNodeOffset - readState.startNodeOffset; - auto numValuesRead = readState.localNodeGroup->scanCSR( - offsetInChunk, readState.posInCurrentCSR, readState.columnIDs, outputVectors); - readState.posInCurrentCSR += numValuesRead; - return; - } - auto [startOffset, endOffset] = readState.getStartAndEndOffset(); - auto numRowsToRead = endOffset - startOffset; - outputVectors[0]->state->selVector->resetSelectorToUnselectedWithSize(numRowsToRead); - outputVectors[0]->state->setOriginalSize(numRowsToRead); - auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(readState.currentNodeOffset); - adjColumn->scan(transaction, nodeGroupIdx, startOffset, endOffset, outputVectors[0], - 0 /* offsetInVector */); - auto relIDVectorIdx = INVALID_VECTOR_IDX; - for (auto i = 0u; i < readState.columnIDs.size(); i++) { - auto columnID = readState.columnIDs[i]; - auto outputVectorId = i + 1; // Skip output from adj column. - if (columnID == INVALID_COLUMN_ID) { - outputVectors[outputVectorId]->setAllNull(); - continue; - } - if (columnID == REL_ID_COLUMN_ID) { - relIDVectorIdx = outputVectorId; - } - columns[readState.columnIDs[i]]->scan(transaction, nodeGroupIdx, startOffset, endOffset, - outputVectors[outputVectorId], 0 /* offsetInVector */); - } - if (transaction->isWriteTransaction() && readState.localNodeGroup) { - auto nodeOffset = - inNodeIDVector->readNodeOffset(inNodeIDVector->state->selVector->selectedPositions[0]); - KU_ASSERT(relIDVectorIdx != INVALID_VECTOR_IDX); - auto relIDVector = outputVectors[relIDVectorIdx]; - readState.localNodeGroup->applyLocalChangesForCSRColumns( - nodeOffset - readState.startNodeOffset, readState.columnIDs, relIDVector, - outputVectors); - } -} - void RelTableData::lookup(Transaction* transaction, TableReadState& readState, ValueVector* inNodeIDVector, const std::vector& outputVectors) { KU_ASSERT(dataFormat == ColumnDataFormat::REGULAR); @@ -306,25 +234,10 @@ bool RelTableData::checkIfNodeHasRels(Transaction* transaction, ValueVector* src auto nodeIDPos = srcNodeIDVector->state->selVector->selectedPositions[0]; auto nodeOffset = srcNodeIDVector->getValue(nodeIDPos).offset; auto [nodeGroupIdx, offsetInChunk] = StorageUtils::getNodeGroupIdxAndOffsetInChunk(nodeOffset); - if (dataFormat == ColumnDataFormat::CSR) { - auto readState = csrOffsetColumn->getReadState(transaction->getType(), nodeGroupIdx); - std::vector offsets; - offsets.resize(2); - csrOffsetColumn->scan(transaction, readState, offsetInChunk == 0 ? 0 : offsetInChunk - 1, - offsetInChunk + 1, reinterpret_cast(&offsets[0])); - int64_t csrSize = - offsetInChunk == 0 ? offsets[0] : (int64_t)(offsets[1]) - (int64_t)(offsets[0]); - return csrSize > 0; - } else { - return !adjColumn->isNull(transaction, nodeGroupIdx, offsetInChunk); - } + return !adjColumn->isNull(transaction, nodeGroupIdx, offsetInChunk); } void RelTableData::append(NodeGroup* nodeGroup) { - if (dataFormat == ColumnDataFormat::CSR) { - auto csrNodeGroup = static_cast(nodeGroup); - csrOffsetColumn->append(csrNodeGroup->getCSROffsetChunk(), nodeGroup->getNodeGroupIdx()); - } adjColumn->append(nodeGroup->getColumnChunk(0), nodeGroup->getNodeGroupIdx()); for (auto columnID = 0u; columnID < columns.size(); columnID++) { columns[columnID]->append( @@ -333,11 +246,6 @@ void RelTableData::append(NodeGroup* nodeGroup) { } void RelTableData::resizeColumns(node_group_idx_t numNodeGroups) { - if (dataFormat == ColumnDataFormat::CSR) { - // TODO(Guodong): This is a special logic for regular columns only, and should be organized - // in a better way. - return; - } auto currentNumNodeGroups = adjColumn->getNumNodeGroups(&DUMMY_WRITE_TRANSACTION); if (numNodeGroups < currentNumNodeGroups) { return; @@ -361,18 +269,8 @@ void RelTableData::resizeColumns(node_group_idx_t numNodeGroups) { void RelTableData::prepareLocalTableToCommit( Transaction* transaction, LocalTableData* localTableData) { auto localRelTableData = ku_dynamic_cast(localTableData); - if (dataFormat == ColumnDataFormat::REGULAR) { - prepareCommitForRegularColumns(transaction, localRelTableData); - } else { - prepareCommitForCSRColumns(transaction, localRelTableData); - } -} - -void RelTableData::prepareCommitForRegularColumns( - Transaction* transaction, LocalRelTableData* localTableData) { - for (auto& [nodeGroupIdx, nodeGroup] : localTableData->nodeGroups) { + for (auto& [nodeGroupIdx, nodeGroup] : localRelTableData->nodeGroups) { auto relNG = ku_dynamic_cast(nodeGroup.get()); - KU_ASSERT(relNG); auto relNodeGroupInfo = ku_dynamic_cast(relNG->getRelNGInfo()); adjColumn->prepareCommitForChunk(transaction, nodeGroupIdx, relNG->getAdjChunk(), @@ -385,16 +283,152 @@ void RelTableData::prepareCommitForRegularColumns( } } -void RelTableData::prepareCommitForCSRColumns( - Transaction* transaction, LocalRelTableData* localTableData) { - for (auto& [nodeGroupIdx, nodeGroup] : localTableData->nodeGroups) { +void RelTableData::checkpointInMemory() { + adjColumn->checkpointInMemory(); + TableData::checkpointInMemory(); +} + +void RelTableData::rollbackInMemory() { + adjColumn->rollbackInMemory(); + TableData::rollbackInMemory(); +} + +CSRRelTableData::CSRRelTableData(BMFileHandle* dataFH, BMFileHandle* metadataFH, + BufferManager* bufferManager, WAL* wal, catalog::RelTableSchema* tableSchema, + RelsStoreStats* relsStoreStats, RelDataDirection direction, bool enableCompression) + : RelTableData{dataFH, metadataFH, bufferManager, wal, tableSchema, relsStoreStats, direction, + enableCompression, ColumnDataFormat::CSR} { + // No NULL values is allowed for the csr offset column. + auto csrOffsetMetadataDAHInfo = + relsStoreStats->getCSROffsetMetadataDAHInfo(&DUMMY_WRITE_TRANSACTION, tableID, direction); + auto csrOffsetColumnName = StorageUtils::getColumnName("", StorageUtils::ColumnType::CSR_OFFSET, + RelDataDirectionUtils::relDirectionToString(direction)); + csrHeaderColumns.offset = std::make_unique(csrOffsetColumnName, LogicalType::UINT64(), + *csrOffsetMetadataDAHInfo, dataFH, metadataFH, bufferManager, wal, &DUMMY_READ_TRANSACTION, + RWPropertyStats::empty(), enableCompression, false /* requireNUllColumn */); + auto csrLengthMetadataDAHInfo = + relsStoreStats->getCSRLengthMetadataDAHInfo(&DUMMY_WRITE_TRANSACTION, tableID, direction); + auto csrLengthColumnName = StorageUtils::getColumnName("", StorageUtils::ColumnType::CSR_LENGTH, + RelDataDirectionUtils::relDirectionToString(direction)); + csrHeaderColumns.length = std::make_unique(csrLengthColumnName, LogicalType::UINT64(), + *csrLengthMetadataDAHInfo, dataFH, metadataFH, bufferManager, wal, &DUMMY_READ_TRANSACTION, + RWPropertyStats::empty(), enableCompression, false /* requireNUllColumn */); +} + +void CSRRelTableData::initializeReadState(Transaction* transaction, + std::vector columnIDs, ValueVector* inNodeIDVector, RelDataReadState* readState) { + RelTableData::initializeReadState(transaction, columnIDs, inNodeIDVector, readState); + auto nodeOffset = + inNodeIDVector->readNodeOffset(inNodeIDVector->state->selVector->selectedPositions[0]); + auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(nodeOffset); + auto startNodeOffset = StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx); + // Reset to read from beginning for the csr of the new node offset. + readState->posInCurrentCSR = 0; + if (readState->isOutOfRange(nodeOffset)) { + // Scan csr offsets and populate csr list entries for the new node group. + readState->startNodeOffset = startNodeOffset; + csrHeaderColumns.offset->scan( + transaction, nodeGroupIdx, readState->csrHeaderChunks.offset.get()); + csrHeaderColumns.length->scan( + transaction, nodeGroupIdx, readState->csrHeaderChunks.length.get()); + KU_ASSERT(readState->csrHeaderChunks.offset->getNumValues() == + readState->csrHeaderChunks.length->getNumValues()); + readState->numNodes = readState->csrHeaderChunks.offset->getNumValues(); + readState->populateCSRListEntries(); + if (transaction->isWriteTransaction()) { + readState->localNodeGroup = getLocalNodeGroup(transaction, nodeGroupIdx); + } + } + if (nodeOffset != readState->currentNodeOffset) { + readState->currentNodeOffset = nodeOffset; + } +} + +void CSRRelTableData::scan(Transaction* transaction, TableReadState& readState, + ValueVector* inNodeIDVector, const std::vector& outputVectors) { + auto& relReadState = common::ku_dynamic_cast(readState); + KU_ASSERT(dataFormat == ColumnDataFormat::CSR); + if (relReadState.readFromLocalStorage) { + auto offsetInChunk = relReadState.currentNodeOffset - relReadState.startNodeOffset; + KU_ASSERT(relReadState.localNodeGroup); + auto numValuesRead = relReadState.localNodeGroup->scanCSR( + offsetInChunk, relReadState.posInCurrentCSR, relReadState.columnIDs, outputVectors); + relReadState.posInCurrentCSR += numValuesRead; + return; + } + auto [startOffset, endOffset] = relReadState.getStartAndEndOffset(); + auto numRowsToRead = endOffset - startOffset; + outputVectors[0]->state->selVector->resetSelectorToUnselectedWithSize(numRowsToRead); + outputVectors[0]->state->setOriginalSize(numRowsToRead); + auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(relReadState.currentNodeOffset); + adjColumn->scan(transaction, nodeGroupIdx, startOffset, endOffset, outputVectors[0], + 0 /* offsetInVector */); + auto relIDVectorIdx = INVALID_VECTOR_IDX; + for (auto i = 0u; i < relReadState.columnIDs.size(); i++) { + auto columnID = relReadState.columnIDs[i]; + auto outputVectorId = i + 1; // Skip output from adj column. + if (columnID == INVALID_COLUMN_ID) { + outputVectors[outputVectorId]->setAllNull(); + continue; + } + if (columnID == REL_ID_COLUMN_ID) { + relIDVectorIdx = outputVectorId; + } + columns[relReadState.columnIDs[i]]->scan(transaction, nodeGroupIdx, startOffset, endOffset, + outputVectors[outputVectorId], 0 /* offsetInVector */); + } + if (transaction->isWriteTransaction() && relReadState.localNodeGroup) { + auto nodeOffset = + inNodeIDVector->readNodeOffset(inNodeIDVector->state->selVector->selectedPositions[0]); + KU_ASSERT(relIDVectorIdx != INVALID_VECTOR_IDX); + auto relIDVector = outputVectors[relIDVectorIdx]; + relReadState.localNodeGroup->applyLocalChangesForCSRColumns( + nodeOffset - relReadState.startNodeOffset, relReadState.columnIDs, relIDVector, + outputVectors); + } +} + +bool CSRRelTableData::checkIfNodeHasRels(Transaction* transaction, ValueVector* srcNodeIDVector) { + auto nodeIDPos = srcNodeIDVector->state->selVector->selectedPositions[0]; + auto nodeOffset = srcNodeIDVector->getValue(nodeIDPos).offset; + auto [nodeGroupIdx, offsetInChunk] = StorageUtils::getNodeGroupIdxAndOffsetInChunk(nodeOffset); + auto readState = csrHeaderColumns.offset->getReadState(transaction->getType(), nodeGroupIdx); + std::vector offsets; + offsets.resize(2); + csrHeaderColumns.offset->scan(transaction, readState, + offsetInChunk == 0 ? 0 : offsetInChunk - 1, offsetInChunk + 1, + reinterpret_cast(&offsets[0])); + int64_t csrSize = + offsetInChunk == 0 ? offsets[0] : (int64_t)(offsets[1]) - (int64_t)(offsets[0]); + return csrSize > 0; +} + +void CSRRelTableData::append(NodeGroup* nodeGroup) { + auto csrNodeGroup = static_cast(nodeGroup); + csrHeaderColumns.offset->append( + csrNodeGroup->getCSROffsetChunk(), nodeGroup->getNodeGroupIdx()); + csrHeaderColumns.length->append( + csrNodeGroup->getCSRLengthChunk(), nodeGroup->getNodeGroupIdx()); + RelTableData::append(nodeGroup); +} + +void CSRRelTableData::resizeColumns(node_group_idx_t /*numNodeGroups*/) { + // TODO(Guodong): This is a special logic for regular columns only, and should be organized + // in a better way. + return; +} + +void CSRRelTableData::prepareLocalTableToCommit( + Transaction* transaction, LocalTableData* localTable) { + auto localRelTableData = ku_dynamic_cast(localTable); + for (auto& [nodeGroupIdx, nodeGroup] : localRelTableData->nodeGroups) { auto relNG = ku_dynamic_cast(nodeGroup.get()); - KU_ASSERT(relNG); auto relNodeGroupInfo = ku_dynamic_cast(relNG->getRelNGInfo()); - // First, scan the whole csr offset column chunk, whose size is NODE_GROUP_SIZE. auto csrOffsetChunk = ColumnChunkFactory::createColumnChunk( - LogicalType::INT64(), false /* enableCompression */); - csrOffsetColumn->scan(transaction, nodeGroupIdx, csrOffsetChunk.get()); + LogicalType::UINT64(), false /* enableCompression */); + auto csrLengthChunk = ColumnChunkFactory::createColumnChunk(LogicalType::UINT64(), false); + csrHeaderColumns.offset->scan(transaction, nodeGroupIdx, csrOffsetChunk.get()); + csrHeaderColumns.length->scan(transaction, nodeGroupIdx, csrLengthChunk.get()); // Next, scan the whole relID column chunk. // TODO: We can only scan partial relID column chunk based on csr offset of the max // nodeOffset. @@ -413,22 +447,24 @@ void RelTableData::prepareCommitForCSRColumns( // Thus, we can fall back to directly update the adj column and property columns based // on csr offsets. prepareCommitCSRNGWithoutSliding(transaction, nodeGroupIdx, relNodeGroupInfo, - csrOffsetChunk.get(), relIDChunk.get(), relNG); + csrOffsetChunk.get(), csrLengthChunk.get(), relIDChunk.get(), relNG); } else { // We need to update the csr offset column. Thus, we cannot simply fall back to directly // update the adj column and property columns based on csr offsets. prepareCommitCSRNGWithSliding(transaction, nodeGroupIdx, relNodeGroupInfo, - csrOffsetChunk.get(), relIDChunk.get(), relNG); + csrOffsetChunk.get(), csrLengthChunk.get(), relIDChunk.get(), relNG); } } } -static std::pair getCSRStartAndEndOffset( - offset_t offsetInNodeGroup, const ColumnChunk& csrOffsets) { - return offsetInNodeGroup == 0 ? - std::make_pair((offset_t)0, csrOffsets.getValue(offsetInNodeGroup)) : - std::make_pair(csrOffsets.getValue(offsetInNodeGroup - 1), - csrOffsets.getValue(offsetInNodeGroup)); +static std::pair getCSRStartAndEndOffset(offset_t offsetInNodeGroup, + const ColumnChunk& csrOffsetChunk, const ColumnChunk& csrLengthChunk) { + auto endOffset = csrOffsetChunk.getValue(offsetInNodeGroup); + auto length = csrLengthChunk.getValue(offsetInNodeGroup); + KU_ASSERT(offsetInNodeGroup == 0 ? + endOffset == length : + csrOffsetChunk.getValue(offsetInNodeGroup - 1) + length == endOffset); + return std::make_pair(endOffset - length, endOffset); } static uint64_t findPosOfRelIDFromArray( @@ -441,9 +477,9 @@ static uint64_t findPosOfRelIDFromArray( return UINT64_MAX; } -void RelTableData::prepareCommitCSRNGWithoutSliding(Transaction* transaction, +void CSRRelTableData::prepareCommitCSRNGWithoutSliding(Transaction* transaction, node_group_idx_t nodeGroupIdx, CSRRelNGInfo* relNodeGroupInfo, ColumnChunk* csrOffsetChunk, - ColumnChunk* relIDChunk, LocalRelNG* localNodeGroup) { + ColumnChunk* csrLengthChunk, ColumnChunk* relIDChunk, LocalRelNG* localNodeGroup) { // We can figure out the actual csr offset of each value to be updated based on csr and relID // chunks. auto relIDs = (offset_t*)relIDChunk->getData(); @@ -452,7 +488,7 @@ void RelTableData::prepareCommitCSRNGWithoutSliding(Transaction* transaction, auto& updateInfo = relNodeGroupInfo->updateInfoPerChunk[columnID]; for (auto& [offsetInChunk, relIDToRowIdx] : updateInfo) { auto [startCSROffset, endCSROffset] = - getCSRStartAndEndOffset(offsetInChunk, *csrOffsetChunk); + getCSRStartAndEndOffset(offsetInChunk, *csrOffsetChunk, *csrLengthChunk); for (auto [relID, rowIdx] : relIDToRowIdx) { auto csrOffset = findPosOfRelIDFromArray(relIDs, startCSROffset, endCSROffset, relID); @@ -468,9 +504,9 @@ void RelTableData::prepareCommitCSRNGWithoutSliding(Transaction* transaction, } } -void RelTableData::prepareCommitCSRNGWithSliding(Transaction* transaction, +void CSRRelTableData::prepareCommitCSRNGWithSliding(Transaction* transaction, node_group_idx_t nodeGroupIdx, CSRRelNGInfo* relNodeGroupInfo, ColumnChunk* csrOffsetChunk, - ColumnChunk* relIDChunk, LocalRelNG* localNodeGroup) { + ColumnChunk* csrLengthChunk, ColumnChunk* relIDChunk, LocalRelNG* localNodeGroup) { // We need to update the csr offset column. Thus, we cannot simply fall back to directly update // the adj column and property columns based on csr offsets. Instead, we need to for loop each // node in the node group, slide accordingly, and update the csr offset column, adj column and @@ -479,17 +515,19 @@ void RelTableData::prepareCommitCSRNGWithSliding(Transaction* transaction, // Slide column by column. // First we slide the csr offset column chunk, and keep the slided csr offset column chunk in // memory, so it can be used for assertion checking later. - auto slidedCSROffsetChunk = slideCSROffsetChunk(csrOffsetChunk, relNodeGroupInfo); - csrOffsetColumn->append(slidedCSROffsetChunk.get(), nodeGroupIdx); + auto [slidedCSROffsetChunk, slidedCSRLengthChunk] = + slideCSRAuxChunks(csrOffsetChunk, csrLengthChunk, relNodeGroupInfo); + csrHeaderColumns.offset->append(slidedCSROffsetChunk.get(), nodeGroupIdx); + csrHeaderColumns.length->append(slidedCSRLengthChunk.get(), nodeGroupIdx); // Then we slide the adj column chunk, rel id column chunk, and all property column chunks. - auto slidedAdjColumnChunk = - slideCSRColumnChunk(transaction, csrOffsetChunk, slidedCSROffsetChunk.get(), relIDChunk, - relNodeGroupInfo->adjInsertInfo, {} /* updateInfo */, relNodeGroupInfo->deleteInfo, - nodeGroupStartOffset, adjColumn.get(), localNodeGroup->getAdjChunk()); + auto slidedAdjColumnChunk = slideCSRColumnChunk(transaction, csrOffsetChunk, csrLengthChunk, + slidedCSROffsetChunk.get(), relIDChunk, relNodeGroupInfo->adjInsertInfo, + {} /* updateInfo */, relNodeGroupInfo->deleteInfo, nodeGroupStartOffset, adjColumn.get(), + localNodeGroup->getAdjChunk()); adjColumn->append(slidedAdjColumnChunk.get(), nodeGroupIdx); slidedAdjColumnChunk.reset(); for (auto columnID = 0u; columnID < columns.size(); columnID++) { - auto slidedColumnChunk = slideCSRColumnChunk(transaction, csrOffsetChunk, + auto slidedColumnChunk = slideCSRColumnChunk(transaction, csrOffsetChunk, csrLengthChunk, slidedCSROffsetChunk.get(), relIDChunk, relNodeGroupInfo->insertInfoPerChunk[columnID], relNodeGroupInfo->updateInfoPerChunk[columnID], relNodeGroupInfo->deleteInfo, nodeGroupStartOffset, columns[columnID].get(), @@ -499,18 +537,18 @@ void RelTableData::prepareCommitCSRNGWithSliding(Transaction* transaction, } } -std::unique_ptr RelTableData::slideCSROffsetChunk( - ColumnChunk* csrOffsetChunk, CSRRelNGInfo* relNodeGroupInfo) { - auto slidedCSRChunk = - ColumnChunkFactory::createColumnChunk(LogicalType::INT64(), enableCompression); +std::pair, std::unique_ptr> +CSRRelTableData::slideCSRAuxChunks( + ColumnChunk* csrOffsetChunk, ColumnChunk* csrLengthChunk, CSRRelNGInfo* relNodeGroupInfo) { + auto slidedCSROffsetChunk = + ColumnChunkFactory::createColumnChunk(LogicalType::UINT64(), enableCompression); + auto slidedCSRLengthChunk = + ColumnChunkFactory::createColumnChunk(LogicalType::UINT64(), enableCompression); int64_t currentCSROffset = 0; auto currentNumSrcNodesInNG = csrOffsetChunk->getNumValues(); auto newNumSrcNodesInNG = currentNumSrcNodesInNG; for (auto offsetInNG = 0u; offsetInNG < currentNumSrcNodesInNG; offsetInNG++) { - int64_t numRowsInCSR = offsetInNG == 0 ? - csrOffsetChunk->getValue(offsetInNG) : - csrOffsetChunk->getValue(offsetInNG) - - csrOffsetChunk->getValue(offsetInNG - 1); + int64_t numRowsInCSR = csrLengthChunk->getValue(offsetInNG); KU_ASSERT(numRowsInCSR >= 0); int64_t numDeletions = relNodeGroupInfo->deleteInfo.contains(offsetInNG) ? relNodeGroupInfo->deleteInfo[offsetInNG].size() : @@ -521,7 +559,8 @@ std::unique_ptr RelTableData::slideCSROffsetChunk( int64_t numRowsAfterSlide = numRowsInCSR + numInsertions - numDeletions; KU_ASSERT(numRowsAfterSlide >= 0); currentCSROffset += numRowsAfterSlide; - slidedCSRChunk->setValue(currentCSROffset, offsetInNG); + slidedCSROffsetChunk->setValue(currentCSROffset, offsetInNG); + slidedCSRLengthChunk->setValue(numRowsAfterSlide, offsetInNG); } for (auto offsetInNG = currentNumSrcNodesInNG; offsetInNG < StorageConstants::NODE_GROUP_SIZE; offsetInNG++) { @@ -537,24 +576,26 @@ std::unique_ptr RelTableData::slideCSROffsetChunk( newNumSrcNodesInNG = offsetInNG + 1; } currentCSROffset += numRowsInCSR; - slidedCSRChunk->setValue(currentCSROffset, offsetInNG); + slidedCSROffsetChunk->setValue(currentCSROffset, offsetInNG); + slidedCSRLengthChunk->setValue(numRowsInCSR, offsetInNG); } - slidedCSRChunk->setNumValues(newNumSrcNodesInNG); - return slidedCSRChunk; + slidedCSROffsetChunk->setNumValues(newNumSrcNodesInNG); + slidedCSRLengthChunk->setNumValues(newNumSrcNodesInNG); + return std::make_pair(std::move(slidedCSROffsetChunk), std::move(slidedCSRLengthChunk)); } -std::unique_ptr RelTableData::slideCSRColumnChunk(Transaction* transaction, - ColumnChunk* csrOffsetChunk, ColumnChunk* slidedCSROffsetChunkForCheck, ColumnChunk* relIDChunk, - const offset_to_offset_to_row_idx_t& insertInfo, +std::unique_ptr CSRRelTableData::slideCSRColumnChunk(Transaction* transaction, + ColumnChunk* csrOffsetChunk, ColumnChunk* csrLengthChunk, ColumnChunk* slidedCSROffsetChunk, + ColumnChunk* relIDChunk, const offset_to_offset_to_row_idx_t& insertInfo, const offset_to_offset_to_row_idx_t& updateInfo, const offset_to_offset_set_t& deleteInfo, node_group_idx_t nodeGroupIdx, Column* column, LocalVectorCollection* localChunk) { auto oldCapacity = csrOffsetChunk->getNumValues() == 0 ? 0 : csrOffsetChunk->getValue(csrOffsetChunk->getNumValues() - 1); - auto newCapacity = slidedCSROffsetChunkForCheck->getNumValues() == 0 ? - 0 : - slidedCSROffsetChunkForCheck->getValue( - slidedCSROffsetChunkForCheck->getNumValues() - 1); + auto newCapacity = + slidedCSROffsetChunk->getNumValues() == 0 ? + 0 : + slidedCSROffsetChunk->getValue(slidedCSROffsetChunk->getNumValues() - 1); // TODO: No need to allocate the new column chunk if this is relID. auto columnChunk = ColumnChunkFactory::createColumnChunk( column->getDataType()->copy(), enableCompression, oldCapacity); @@ -564,7 +605,8 @@ std::unique_ptr RelTableData::slideCSRColumnChunk(Transaction* tran auto currentNumSrcNodesInNG = csrOffsetChunk->getNumValues(); auto relIDs = (offset_t*)relIDChunk->getData(); for (auto offsetInNG = 0u; offsetInNG < currentNumSrcNodesInNG; offsetInNG++) { - auto [startCSROffset, endCSROffset] = getCSRStartAndEndOffset(offsetInNG, *csrOffsetChunk); + auto [startCSROffset, endCSROffset] = + getCSRStartAndEndOffset(offsetInNG, *csrOffsetChunk, *csrLengthChunk); auto hasDeletions = deleteInfo.contains(offsetInNG); auto hasUpdates = updateInfo.contains(offsetInNG); auto hasInsertions = insertInfo.contains(offsetInNG); @@ -618,20 +660,16 @@ std::unique_ptr RelTableData::slideCSRColumnChunk(Transaction* tran return newColumnChunk; } -void RelTableData::checkpointInMemory() { - if (csrOffsetColumn) { - csrOffsetColumn->checkpointInMemory(); - } - adjColumn->checkpointInMemory(); - TableData::checkpointInMemory(); +void CSRRelTableData::checkpointInMemory() { + csrHeaderColumns.offset->checkpointInMemory(); + csrHeaderColumns.length->checkpointInMemory(); + RelTableData::checkpointInMemory(); } -void RelTableData::rollbackInMemory() { - if (csrOffsetColumn) { - csrOffsetColumn->rollbackInMemory(); - } - adjColumn->rollbackInMemory(); - TableData::rollbackInMemory(); +void CSRRelTableData::rollbackInMemory() { + csrHeaderColumns.offset->rollbackInMemory(); + csrHeaderColumns.length->rollbackInMemory(); + RelTableData::rollbackInMemory(); } } // namespace storage