diff --git a/src/include/processor/operator/partitioner.h b/src/include/processor/operator/partitioner.h index 19a327ee33..98173c035b 100644 --- a/src/include/processor/operator/partitioner.h +++ b/src/include/processor/operator/partitioner.h @@ -1,7 +1,7 @@ #pragma once #include "processor/operator/sink.h" -#include "storage/store/column_chunk.h" +#include "storage/store/chunked_node_group_collection.h" namespace kuzu { namespace storage { @@ -20,14 +20,7 @@ struct PartitionerFunctions { // partitioning methods. For example, copy of rel tables require partitioning on both FWD and BWD // direction. Each partitioning method corresponds to a PartitioningState. struct PartitioningBuffer { - using ColumnChunkCollection = std::vector>; - struct Partition { - // One chunk for each column, grouped into a list - // so that groups from different threads can be quickly merged without copying - // E.g. [(a,b,c), (a,b,c)] where a is a chunk for column a, b for column b, etc. - std::vector chunks; - }; - std::vector partitions; + std::vector partitions; void merge(std::unique_ptr localPartitioningStates); }; @@ -35,6 +28,7 @@ struct PartitioningBuffer { // NOTE: Currently, Partitioner is tightly coupled with RelBatchInsert. We should generalize it // later when necessary. Here, each partition is essentially a node group. struct BatchInsertSharedState; +struct PartitioningInfo; struct PartitionerSharedState { std::mutex mtx; storage::MemoryManager* mm; @@ -51,12 +45,12 @@ struct PartitionerSharedState { explicit PartitionerSharedState(storage::MemoryManager* mm) : mm{mm} {} - void initialize(); + void initialize(std::vector>& infos); common::partition_idx_t getNextPartition(common::vector_idx_t partitioningIdx); void resetState(); void merge(std::vector> localPartitioningStates); - inline PartitioningBuffer::Partition& getPartitionBuffer( + inline storage::ChunkedNodeGroupCollection& getPartitionBuffer( common::vector_idx_t partitioningIdx, common::partition_idx_t partitionIdx) { KU_ASSERT(partitioningIdx < partitioningBuffers.size()); KU_ASSERT(partitionIdx < partitioningBuffers[partitioningIdx]->partitions.size()); @@ -107,7 +101,7 @@ class Partitioner : public Sink { std::unique_ptr clone() final; - static void initializePartitioningStates( + static void initializePartitioningStates(std::vector>& infos, std::vector>& partitioningBuffers, std::vector numPartitions); @@ -121,9 +115,6 @@ class Partitioner : public Sink { common::partition_idx_t partitioningIdx, common::DataChunk chunkToCopyFrom); private: - // Same size as a value vector. Each thread will allocate a chunk for each node group, - // so this should be kept relatively small to avoid allocating more memory than is needed - static const uint64_t CHUNK_SIZE = 2048; std::vector> infos; std::shared_ptr sharedState; std::unique_ptr localState; diff --git a/src/include/processor/operator/persistent/index_builder.h b/src/include/processor/operator/persistent/index_builder.h index 12b5c5222d..74a7eff5db 100644 --- a/src/include/processor/operator/persistent/index_builder.h +++ b/src/include/processor/operator/persistent/index_builder.h @@ -151,7 +151,7 @@ class IndexBuilder { IndexBuilder clone() { return IndexBuilder(sharedState); } void insert( - storage::ColumnChunk* chunk, common::offset_t nodeOffset, common::offset_t numNodes); + const storage::ColumnChunk& chunk, common::offset_t nodeOffset, common::offset_t numNodes); ProducerToken getProducerToken() const { return ProducerToken(sharedState); } @@ -159,7 +159,8 @@ class IndexBuilder { void finalize(ExecutionContext* context); private: - void checkNonNullConstraint(storage::NullColumnChunk* nullChunk, common::offset_t numNodes); + void checkNonNullConstraint( + const storage::NullColumnChunk& nullChunk, common::offset_t numNodes); std::shared_ptr sharedState; IndexBuilderLocalBuffers localBuffers; diff --git a/src/include/processor/operator/persistent/node_batch_insert.h b/src/include/processor/operator/persistent/node_batch_insert.h index bfcaea3de8..2aaebc147d 100644 --- a/src/include/processor/operator/persistent/node_batch_insert.h +++ b/src/include/processor/operator/persistent/node_batch_insert.h @@ -4,7 +4,7 @@ #include "processor/operator/call/in_query_call.h" #include "processor/operator/persistent/batch_insert.h" #include "processor/operator/persistent/index_builder.h" -#include "storage/store/node_group.h" +#include "storage/store/chunked_node_group.h" #include "storage/store/node_table.h" namespace kuzu { diff --git a/src/include/processor/operator/persistent/rel_batch_insert.h b/src/include/processor/operator/persistent/rel_batch_insert.h index 387bfffaed..cc772f2879 100644 --- a/src/include/processor/operator/persistent/rel_batch_insert.h +++ b/src/include/processor/operator/persistent/rel_batch_insert.h @@ -3,8 +3,8 @@ #include "common/enums/rel_direction.h" #include "processor/operator/partitioner.h" #include "processor/operator/persistent/batch_insert.h" +#include "storage/store/chunked_node_group.h" #include "storage/store/column_chunk.h" -#include "storage/store/node_group.h" namespace kuzu { namespace processor { @@ -12,18 +12,18 @@ namespace processor { struct RelBatchInsertInfo final : public BatchInsertInfo { common::RelDataDirection direction; uint64_t partitioningIdx; - common::vector_idx_t offsetVectorIdx; + common::column_id_t offsetColumnID; std::vector columnTypes; RelBatchInsertInfo(catalog::TableCatalogEntry* tableEntry, bool compressionEnabled, common::RelDataDirection direction, uint64_t partitioningIdx, - common::vector_idx_t offsetVectorIdx, std::vector columnTypes) + common::column_id_t offsetColumnID, std::vector columnTypes) : BatchInsertInfo{tableEntry, compressionEnabled}, direction{direction}, - partitioningIdx{partitioningIdx}, offsetVectorIdx{offsetVectorIdx}, columnTypes{std::move( - columnTypes)} {} + partitioningIdx{partitioningIdx}, offsetColumnID{offsetColumnID}, columnTypes{std::move( + columnTypes)} {} RelBatchInsertInfo(const RelBatchInsertInfo& other) : BatchInsertInfo{other.tableEntry, other.compressionEnabled}, direction{other.direction}, - partitioningIdx{other.partitioningIdx}, offsetVectorIdx{other.offsetVectorIdx}, + partitioningIdx{other.partitioningIdx}, offsetColumnID{other.offsetColumnID}, columnTypes{common::LogicalType::copy(other.columnTypes)} {} inline std::unique_ptr copy() const override { @@ -60,20 +60,20 @@ class RelBatchInsert final : public BatchInsert { } private: - void prepareCSRNodeGroup(PartitioningBuffer::Partition& partition, - common::offset_t startNodeOffset, common::vector_idx_t offsetVectorIdx, + void prepareCSRNodeGroup(storage::ChunkedNodeGroupCollection& partition, + common::offset_t startNodeOffset, common::column_id_t offsetColumnID, common::offset_t numNodes); static common::length_t getGapSize(common::length_t length); static std::vector populateStartCSROffsetsAndLengths( storage::ChunkedCSRHeader& csrHeader, common::offset_t numNodes, - PartitioningBuffer::Partition& partition, common::vector_idx_t offsetVectorIdx); + storage::ChunkedNodeGroupCollection& partition, common::column_id_t offsetColumnID); static void populateEndCSROffsets( storage::ChunkedCSRHeader& csrHeader, std::vector& gaps); static void setOffsetToWithinNodeGroup( storage::ColumnChunk& chunk, common::offset_t startOffset); static void setOffsetFromCSROffsets( - storage::ColumnChunk* nodeOffsetChunk, storage::ColumnChunk* csrOffsetChunk); + storage::ColumnChunk& nodeOffsetChunk, storage::ColumnChunk& csrOffsetChunk); // We only check rel multiplcity constraint (MANY_ONE, ONE_ONE) for now. std::optional checkRelMultiplicityConstraint( diff --git a/src/include/storage/local_storage/local_table.h b/src/include/storage/local_storage/local_table.h index 6608a2641b..0bd931a730 100644 --- a/src/include/storage/local_storage/local_table.h +++ b/src/include/storage/local_storage/local_table.h @@ -5,7 +5,7 @@ #include "common/enums/rel_multiplicity.h" #include "common/enums/table_type.h" #include "common/vector/value_vector.h" -#include "storage/store/node_group.h" +#include "storage/store/chunked_node_group_collection.h" namespace kuzu { namespace storage { @@ -22,16 +22,14 @@ using ChunkCollection = std::vector; class LocalChunkedGroupCollection { public: - static constexpr uint64_t CHUNK_CAPACITY = 2048; - explicit LocalChunkedGroupCollection(std::vector dataTypes) - : dataTypes{std::move(dataTypes)}, numRows{0} {} + : dataTypes{std::move(dataTypes)}, chunkedGroups{this->dataTypes}, numRows{0} {} DELETE_COPY_DEFAULT_MOVE(LocalChunkedGroupCollection); static inline std::pair getChunkIdxAndOffsetInChunk( common::row_idx_t rowIdx) { - return std::make_pair(rowIdx / LocalChunkedGroupCollection::CHUNK_CAPACITY, - rowIdx % LocalChunkedGroupCollection::CHUNK_CAPACITY); + return std::make_pair(rowIdx / ChunkedNodeGroupCollection::CHUNK_CAPACITY, + rowIdx % ChunkedNodeGroupCollection::CHUNK_CAPACITY); } inline common::row_idx_t getRowIdxFromOffset(common::offset_t offset) { @@ -82,7 +80,7 @@ class LocalChunkedGroupCollection { inline ChunkCollection getLocalChunk(common::column_id_t columnID) { ChunkCollection localChunkCollection; for (auto& chunkedGroup : chunkedGroups.getChunkedGroups()) { - localChunkCollection.push_back(chunkedGroup->getColumnChunkUnsafe(columnID)); + localChunkCollection.push_back(&chunkedGroup->getColumnChunkUnsafe(columnID)); } return localChunkCollection; } @@ -91,10 +89,10 @@ class LocalChunkedGroupCollection { common::row_idx_t append(std::vector vectors); private: - ChunkedNodeGroupCollection chunkedGroups; + std::vector dataTypes; + storage::ChunkedNodeGroupCollection chunkedGroups; // The offset here can either be nodeOffset ( for node table) or relOffset (for rel table). offset_to_row_idx_t offsetToRowIdx; - std::vector dataTypes; common::row_idx_t numRows; // Only used for rel tables. Should be moved out later. diff --git a/src/include/storage/store/node_group.h b/src/include/storage/store/chunked_node_group.h similarity index 76% rename from src/include/storage/store/node_group.h rename to src/include/storage/store/chunked_node_group.h index 35645ab918..2cac9cc916 100644 --- a/src/include/storage/store/node_group.h +++ b/src/include/storage/store/chunked_node_group.h @@ -23,14 +23,11 @@ class ChunkedNodeGroup { KU_ASSERT(columnID < chunks.size()); return *chunks[columnID]; } - inline ColumnChunk* getColumnChunkUnsafe(common::column_id_t columnID) { - KU_ASSERT(columnID < chunks.size()); - return chunks[columnID].get(); - } - inline const ColumnChunk& getColumnChunk(common::column_id_t columnID) { + inline ColumnChunk& getColumnChunkUnsafe(common::column_id_t columnID) { KU_ASSERT(columnID < chunks.size()); return *chunks[columnID]; } + inline std::vector>& getColumnChunksUnsafe() { return chunks; } inline bool isFull() const { return numRows == common::StorageConstants::NODE_GROUP_SIZE; } void resetToEmpty(); @@ -39,7 +36,7 @@ class ChunkedNodeGroup { void resizeChunks(uint64_t newSize); uint64_t append(const std::vector& columnVectors, - common::DataChunkState* columnState, uint64_t numValuesToAppend); + common::SelectionVector& selVector, uint64_t numValuesToAppend); common::offset_t append(ChunkedNodeGroup* other, common::offset_t offsetInOtherNodeGroup); void write(std::vector>& data, common::vector_idx_t offsetVector); @@ -98,32 +95,6 @@ class ChunkedCSRNodeGroup : public ChunkedNodeGroup { ChunkedCSRHeader csrHeader; }; -class ChunkedNodeGroupCollection { -public: - ChunkedNodeGroupCollection() {} - - inline const std::vector>& getChunkedGroups() const { - return chunkedGroups; - } - inline const ChunkedNodeGroup* getChunkedGroup(uint64_t groupIdx) const { - KU_ASSERT(groupIdx < chunkedGroups.size()); - return chunkedGroups[groupIdx].get(); - } - inline ChunkedNodeGroup* getChunkedGroupUnsafe(uint64_t groupIdx) { - KU_ASSERT(groupIdx < chunkedGroups.size()); - return chunkedGroups[groupIdx].get(); - } - inline uint64_t getNumChunks() const { return chunkedGroups.size(); } - void append(std::unique_ptr chunkedGroup); - -private: - // Assert that all chunked node groups have the same num columns and same data types. - bool sanityCheckForAppend(); - -private: - std::vector> chunkedGroups; -}; - struct NodeGroupFactory { static inline std::unique_ptr createNodeGroup( common::ColumnDataFormat dataFormat, const std::vector& columnTypes, diff --git a/src/include/storage/store/chunked_node_group_collection.h b/src/include/storage/store/chunked_node_group_collection.h new file mode 100644 index 0000000000..9573a80fef --- /dev/null +++ b/src/include/storage/store/chunked_node_group_collection.h @@ -0,0 +1,40 @@ +#pragma once + +#include "storage/store/chunked_node_group.h" + +namespace kuzu { +namespace storage { + +class ChunkedNodeGroupCollection { +public: + static constexpr uint64_t CHUNK_CAPACITY = 2048; + + explicit ChunkedNodeGroupCollection(std::vector types) + : types{std::move(types)} {} + DELETE_COPY_DEFAULT_MOVE(ChunkedNodeGroupCollection); + + inline const std::vector>& getChunkedGroups() const { + return chunkedGroups; + } + inline const ChunkedNodeGroup* getChunkedGroup(uint64_t groupIdx) const { + KU_ASSERT(groupIdx < chunkedGroups.size()); + return chunkedGroups[groupIdx].get(); + } + inline ChunkedNodeGroup* getChunkedGroupUnsafe(uint64_t groupIdx) { + KU_ASSERT(groupIdx < chunkedGroups.size()); + return chunkedGroups[groupIdx].get(); + } + inline uint64_t getNumChunks() const { return chunkedGroups.size(); } + + void append( + const std::vector& vectors, const common::SelectionVector& selVector); + void append(std::unique_ptr chunkedGroup); + void merge(ChunkedNodeGroupCollection& chunkedGroupCollection); + +private: + std::vector types; + std::vector> chunkedGroups; +}; + +} // namespace storage +} // namespace kuzu diff --git a/src/include/storage/store/column_chunk.h b/src/include/storage/store/column_chunk.h index 8eecf3857f..ac1495ccf7 100644 --- a/src/include/storage/store/column_chunk.h +++ b/src/include/storage/store/column_chunk.h @@ -48,6 +48,7 @@ class ColumnChunk { } inline NullColumnChunk* getNullChunk() { return nullChunk.get(); } + inline const NullColumnChunk& getNullChunk() const { return *nullChunk; } inline common::LogicalType& getDataType() { return dataType; } inline const common::LogicalType& getDataType() const { return dataType; } diff --git a/src/include/storage/store/node_table.h b/src/include/storage/store/node_table.h index 320bb11b58..2fa4215262 100644 --- a/src/include/storage/store/node_table.h +++ b/src/include/storage/store/node_table.h @@ -6,7 +6,7 @@ #include "common/cast.h" #include "storage/index/hash_index.h" #include "storage/stats/nodes_store_statistics.h" -#include "storage/store/node_group.h" +#include "storage/store/chunked_node_group.h" #include "storage/store/node_table_data.h" #include "storage/store/table.h" diff --git a/src/include/storage/store/rel_table_data.h b/src/include/storage/store/rel_table_data.h index 2ddb88e043..5815bd97a1 100644 --- a/src/include/storage/store/rel_table_data.h +++ b/src/include/storage/store/rel_table_data.h @@ -1,7 +1,7 @@ #pragma once #include "common/enums/rel_direction.h" -#include "storage/store/node_group.h" +#include "storage/store/chunked_node_group.h" #include "storage/store/table_data.h" namespace kuzu { diff --git a/src/include/storage/store/table_data.h b/src/include/storage/store/table_data.h index 10beea3fa5..1263c3baf8 100644 --- a/src/include/storage/store/table_data.h +++ b/src/include/storage/store/table_data.h @@ -1,7 +1,7 @@ #pragma once +#include "storage/store/chunked_node_group.h" #include "storage/store/column.h" -#include "storage/store/node_group.h" namespace kuzu { namespace storage { diff --git a/src/processor/operator/partitioner.cpp b/src/processor/operator/partitioner.cpp index 51a38c4817..35838ec8af 100644 --- a/src/processor/operator/partitioner.cpp +++ b/src/processor/operator/partitioner.cpp @@ -3,7 +3,6 @@ #include "common/constants.h" #include "common/data_chunk/sel_vector.h" #include "processor/execution_context.h" -#include "storage/store/column_chunk.h" #include "storage/store/node_table.h" using namespace kuzu::common; @@ -37,7 +36,7 @@ static partition_idx_t getNumPartitions(offset_t maxOffset) { return (maxOffset + StorageConstants::NODE_GROUP_SIZE) / StorageConstants::NODE_GROUP_SIZE; } -void PartitionerSharedState::initialize() { +void PartitionerSharedState::initialize(std::vector>& infos) { maxNodeOffsets.resize(2); maxNodeOffsets[0] = srcNodeTable->getMaxNodeOffset(transaction::Transaction::getDummyWriteTrx().get()); @@ -46,7 +45,7 @@ void PartitionerSharedState::initialize() { numPartitions.resize(2); numPartitions[0] = getNumPartitions(maxNodeOffsets[0]); numPartitions[1] = getNumPartitions(maxNodeOffsets[1]); - Partitioner::initializePartitioningStates(partitioningBuffers, numPartitions); + Partitioner::initializePartitioningStates(infos, partitioningBuffers, numPartitions); } partition_idx_t PartitionerSharedState::getNextPartition(vector_idx_t partitioningIdx) { @@ -77,11 +76,7 @@ void PartitioningBuffer::merge(std::unique_ptr localPartitio for (auto partitionIdx = 0u; partitionIdx < partitions.size(); partitionIdx++) { auto& sharedPartition = partitions[partitionIdx]; auto& localPartition = localPartitioningState->partitions[partitionIdx]; - sharedPartition.chunks.reserve( - sharedPartition.chunks.size() + localPartition.chunks.size()); - for (auto j = 0u; j < localPartition.chunks.size(); j++) { - sharedPartition.chunks.push_back(std::move(localPartition.chunks[j])); - } + sharedPartition.merge(localPartition); } } @@ -96,12 +91,13 @@ Partitioner::Partitioner(std::unique_ptr resultSetDescripto } void Partitioner::initGlobalStateInternal(ExecutionContext* /*context*/) { - sharedState->initialize(); + sharedState->initialize(infos); } void Partitioner::initLocalStateInternal(ResultSet* /*resultSet*/, ExecutionContext* /*context*/) { localState = std::make_unique(); - initializePartitioningStates(localState->partitioningBuffers, sharedState->numPartitions); + initializePartitioningStates( + infos, localState->partitioningBuffers, sharedState->numPartitions); } DataChunk Partitioner::constructDataChunk(const std::vector& columnPositions, @@ -122,6 +118,7 @@ DataChunk Partitioner::constructDataChunk(const std::vector& columnPosi } void Partitioner::initializePartitioningStates( + std::vector>& infos, std::vector>& partitioningBuffers, std::vector numPartitions) { partitioningBuffers.resize(numPartitions.size()); @@ -130,7 +127,7 @@ void Partitioner::initializePartitioningStates( auto partitioningBuffer = std::make_unique(); partitioningBuffer->partitions.reserve(numPartition); for (auto i = 0u; i < numPartition; i++) { - partitioningBuffer->partitions.emplace_back(); + partitioningBuffer->partitions.emplace_back(infos[partitioningIdx]->columnTypes); } partitioningBuffers[partitioningIdx] = std::move(partitioningBuffer); } @@ -152,6 +149,11 @@ void Partitioner::executeInternal(ExecutionContext* context) { } void Partitioner::copyDataToPartitions(partition_idx_t partitioningIdx, DataChunk chunkToCopyFrom) { + std::vector vectorsToAppend; + vectorsToAppend.reserve(chunkToCopyFrom.getNumValueVectors()); + for (auto j = 0u; j < chunkToCopyFrom.getNumValueVectors(); j++) { + vectorsToAppend.push_back(chunkToCopyFrom.getValueVector(j).get()); + } SelectionVector selVector(1); selVector.resetSelectorToValuePosBufferWithSize(1); for (auto i = 0u; i < chunkToCopyFrom.state->selVector->selectedSize; i++) { @@ -161,21 +163,8 @@ void Partitioner::copyDataToPartitions(partition_idx_t partitioningIdx, DataChun partitionIdx < localState->getPartitioningBuffer(partitioningIdx)->partitions.size()); auto& partition = localState->getPartitioningBuffer(partitioningIdx)->partitions[partitionIdx]; - if (partition.chunks.empty() || partition.chunks.back()[0]->getNumValues() + 1 > - partition.chunks.back()[0]->getCapacity()) { - partition.chunks.emplace_back(); - partition.chunks.back().reserve(chunkToCopyFrom.getNumValueVectors()); - for (auto j = 0u; j < chunkToCopyFrom.getNumValueVectors(); j++) { - partition.chunks.back().emplace_back( - ColumnChunkFactory::createColumnChunk(infos[partitioningIdx]->columnTypes[j], - false /*enableCompression*/, Partitioner::CHUNK_SIZE)); - } - } - KU_ASSERT(partition.chunks.back().size() == chunkToCopyFrom.getNumValueVectors()); selVector.selectedPositions[0] = posToCopyFrom; - for (auto j = 0u; j < chunkToCopyFrom.getNumValueVectors(); j++) { - partition.chunks.back()[j]->append(chunkToCopyFrom.getValueVector(j).get(), selVector); - } + partition.append(vectorsToAppend, selVector); } } diff --git a/src/processor/operator/persistent/index_builder.cpp b/src/processor/operator/persistent/index_builder.cpp index acb0c313df..82deb37588 100644 --- a/src/processor/operator/persistent/index_builder.cpp +++ b/src/processor/operator/persistent/index_builder.cpp @@ -89,26 +89,27 @@ void IndexBuilderSharedState::quitProducer() { } } -void IndexBuilder::insert(ColumnChunk* chunk, offset_t nodeOffset, offset_t numNodes) { - checkNonNullConstraint(chunk->getNullChunk(), numNodes); +void IndexBuilder::insert(const ColumnChunk& chunk, offset_t nodeOffset, offset_t numNodes) { + checkNonNullConstraint(chunk.getNullChunk(), numNodes); TypeUtils::visit( - chunk->getDataType().getPhysicalType(), + chunk.getDataType().getPhysicalType(), [&](T) { for (auto i = 0u; i < numNodes; i++) { - auto value = chunk->getValue(i); + auto value = chunk.getValue(i); localBuffers.insert(value, nodeOffset + i); } }, [&](ku_string_t) { - auto stringColumnChunk = ku_dynamic_cast(chunk); + auto& stringColumnChunk = + ku_dynamic_cast(chunk); for (auto i = 0u; i < numNodes; i++) { - auto value = stringColumnChunk->getValue(i); + auto value = stringColumnChunk.getValue(i); localBuffers.insert(std::move(value), nodeOffset + i); } }, [&](auto) { - throw CopyException(ExceptionMessage::invalidPKType(chunk->getDataType().toString())); + throw CopyException(ExceptionMessage::invalidPKType(chunk.getDataType().toString())); }); } @@ -129,9 +130,9 @@ void IndexBuilder::finalize(ExecutionContext* /*context*/) { sharedState->flush(); } -void IndexBuilder::checkNonNullConstraint(NullColumnChunk* nullChunk, offset_t numNodes) { +void IndexBuilder::checkNonNullConstraint(const NullColumnChunk& nullChunk, offset_t numNodes) { for (auto i = 0u; i < numNodes; i++) { - if (nullChunk->isNull(i)) { + if (nullChunk.isNull(i)) { throw CopyException(ExceptionMessage::nullPKException()); } } diff --git a/src/processor/operator/persistent/node_batch_insert.cpp b/src/processor/operator/persistent/node_batch_insert.cpp index e9a3e57f03..3aa0d72240 100644 --- a/src/processor/operator/persistent/node_batch_insert.cpp +++ b/src/processor/operator/persistent/node_batch_insert.cpp @@ -158,7 +158,7 @@ void NodeBatchInsert::copyToNodeGroup() { while (numAppendedTuples < numTuplesToAppend) { auto numAppendedTuplesInNodeGroup = nodeLocalState->nodeGroup->append(nodeLocalState->columnVectors, - nodeLocalState->columnState, numTuplesToAppend - numAppendedTuples); + *nodeLocalState->columnState->selVector, numTuplesToAppend - numAppendedTuples); numAppendedTuples += numAppendedTuplesInNodeGroup; if (nodeLocalState->nodeGroup->isFull()) { node_group_idx_t nodeGroupIdx; diff --git a/src/processor/operator/persistent/rel_batch_insert.cpp b/src/processor/operator/persistent/rel_batch_insert.cpp index 9473ef2052..6113407e14 100644 --- a/src/processor/operator/persistent/rel_batch_insert.cpp +++ b/src/processor/operator/persistent/rel_batch_insert.cpp @@ -3,7 +3,6 @@ #include "common/exception/copy.h" #include "common/exception/message.h" #include "common/string_format.h" -#include "processor/operator/partitioner.h" #include "processor/result/factorized_table.h" #include "storage/store/column_chunk.h" #include "storage/store/rel_table.h" @@ -42,17 +41,18 @@ void RelBatchInsert::executeInternal(ExecutionContext* /*context*/) { auto& partitioningBuffer = partitionerSharedState->getPartitionBuffer( relInfo->partitioningIdx, relLocalState->nodeGroupIdx); auto startNodeOffset = StorageUtils::getStartOffsetOfNodeGroup(relLocalState->nodeGroupIdx); - for (auto& columns : partitioningBuffer.chunks) { - setOffsetToWithinNodeGroup(*columns[relInfo->offsetVectorIdx], startNodeOffset); + for (auto& chunkedGroup : partitioningBuffer.getChunkedGroups()) { + setOffsetToWithinNodeGroup( + chunkedGroup->getColumnChunkUnsafe(relInfo->offsetColumnID), startNodeOffset); } // Calculate num of source nodes in this node group. // This will be used to set the num of values of the node group. auto numNodes = std::min(StorageConstants::NODE_GROUP_SIZE, partitionerSharedState->maxNodeOffsets[relInfo->partitioningIdx] - startNodeOffset + 1); - prepareCSRNodeGroup( - partitioningBuffer, startNodeOffset, relInfo->offsetVectorIdx, numNodes); - for (auto& chunk : partitioningBuffer.chunks) { - localState->nodeGroup->write(chunk, relInfo->offsetVectorIdx); + prepareCSRNodeGroup(partitioningBuffer, startNodeOffset, relInfo->offsetColumnID, numNodes); + for (auto& chunkedGroup : partitioningBuffer.getChunkedGroups()) { + localState->nodeGroup->write( + chunkedGroup->getColumnChunksUnsafe(), relInfo->offsetColumnID); } localState->nodeGroup->finalize(relLocalState->nodeGroupIdx); // Flush node group to table. @@ -62,15 +62,15 @@ void RelBatchInsert::executeInternal(ExecutionContext* /*context*/) { } } -void RelBatchInsert::prepareCSRNodeGroup(PartitioningBuffer::Partition& partition, - common::offset_t startNodeOffset, vector_idx_t offsetVectorIdx, offset_t numNodes) { +void RelBatchInsert::prepareCSRNodeGroup(ChunkedNodeGroupCollection& partition, + common::offset_t startNodeOffset, column_id_t offsetColumnID, offset_t numNodes) { auto relInfo = ku_dynamic_cast(info.get()); auto csrNodeGroup = ku_dynamic_cast(localState->nodeGroup.get()); auto& csrHeader = csrNodeGroup->getCSRHeader(); csrHeader.setNumValues(numNodes); // Populate start csr offsets and lengths for each node. - auto gaps = populateStartCSROffsetsAndLengths(csrHeader, numNodes, partition, offsetVectorIdx); + auto gaps = populateStartCSROffsetsAndLengths(csrHeader, numNodes, partition, offsetColumnID); auto invalid = checkRelMultiplicityConstraint(csrHeader); if (invalid.has_value()) { throw CopyException(ExceptionMessage::violateRelMultiplicityConstraint( @@ -81,9 +81,9 @@ void RelBatchInsert::prepareCSRNodeGroup(PartitioningBuffer::Partition& partitio offset_t csrChunkCapacity = csrHeader.getEndCSROffset(numNodes - 1) + csrHeader.getCSRLength(numNodes - 1); localState->nodeGroup->resizeChunks(csrChunkCapacity); - for (auto& dataChunk : partition.chunks) { - auto offsetChunk = dataChunk[offsetVectorIdx].get(); - setOffsetFromCSROffsets(offsetChunk, csrHeader.offset.get()); + for (auto& chunkedGroup : partition.getChunkedGroups()) { + auto& offsetChunk = chunkedGroup->getColumnChunkUnsafe(offsetColumnID); + setOffsetFromCSROffsets(offsetChunk, *csrHeader.offset); } populateEndCSROffsets(csrHeader, gaps); } @@ -106,7 +106,7 @@ length_t RelBatchInsert::getGapSize(length_t length) { } std::vector RelBatchInsert::populateStartCSROffsetsAndLengths(ChunkedCSRHeader& csrHeader, - offset_t numNodes, PartitioningBuffer::Partition& partition, vector_idx_t offsetVectorIdx) { + offset_t numNodes, ChunkedNodeGroupCollection& partition, column_id_t offsetColumnID) { KU_ASSERT(numNodes == csrHeader.length->getNumValues() && numNodes == csrHeader.offset->getNumValues()); std::vector gaps; @@ -115,10 +115,10 @@ std::vector RelBatchInsert::populateStartCSROffsetsAndLengths(ChunkedC auto csrLengths = (length_t*)csrHeader.length->getData(); std::fill(csrLengths, csrLengths + numNodes, 0); // Calculate length for each node. Store the num of tuples of node i at csrLengths[i]. - for (auto& chunk : partition.chunks) { - auto& offsetChunk = chunk[offsetVectorIdx]; - for (auto i = 0u; i < offsetChunk->getNumValues(); i++) { - auto nodeOffset = offsetChunk->getValue(i); + for (auto& chunkedGroup : partition.getChunkedGroups()) { + auto& offsetChunk = chunkedGroup->getColumnChunk(offsetColumnID); + for (auto i = 0u; i < offsetChunk.getNumValues(); i++) { + auto nodeOffset = offsetChunk.getValue(i); KU_ASSERT(nodeOffset < numNodes); csrLengths[nodeOffset]++; } @@ -144,13 +144,13 @@ void RelBatchInsert::setOffsetToWithinNodeGroup(ColumnChunk& chunk, offset_t sta } void RelBatchInsert::setOffsetFromCSROffsets( - ColumnChunk* nodeOffsetChunk, ColumnChunk* csrOffsetChunk) { - KU_ASSERT(nodeOffsetChunk->getDataType().getPhysicalType() == PhysicalTypeID::INTERNAL_ID); - for (auto i = 0u; i < nodeOffsetChunk->getNumValues(); i++) { - auto nodeOffset = nodeOffsetChunk->getValue(i); - auto csrOffset = csrOffsetChunk->getValue(nodeOffset); - nodeOffsetChunk->setValue(csrOffset, i); - csrOffsetChunk->setValue(csrOffset + 1, nodeOffset); + ColumnChunk& nodeOffsetChunk, ColumnChunk& csrOffsetChunk) { + KU_ASSERT(nodeOffsetChunk.getDataType().getPhysicalType() == PhysicalTypeID::INTERNAL_ID); + for (auto i = 0u; i < nodeOffsetChunk.getNumValues(); i++) { + auto nodeOffset = nodeOffsetChunk.getValue(i); + auto csrOffset = csrOffsetChunk.getValue(nodeOffset); + nodeOffsetChunk.setValue(csrOffset, i); + csrOffsetChunk.setValue(csrOffset + 1, nodeOffset); } } diff --git a/src/storage/local_storage/local_table.cpp b/src/storage/local_storage/local_table.cpp index d07cf36bca..2c5a35f562 100644 --- a/src/storage/local_storage/local_table.cpp +++ b/src/storage/local_storage/local_table.cpp @@ -3,7 +3,6 @@ #include "storage/local_storage/local_node_table.h" #include "storage/local_storage/local_rel_table.h" #include "storage/store/column.h" -#include "storage/store/node_group.h" using namespace kuzu::common; @@ -56,9 +55,9 @@ void LocalChunkedGroupCollection::update( KU_ASSERT(offsetToRowIdx.contains(offset)); auto rowIdx = offsetToRowIdx.at(offset); auto [chunkIdx, offsetInChunk] = getChunkIdxAndOffsetInChunk(rowIdx); - auto chunk = chunkedGroups.getChunkedGroupUnsafe(chunkIdx)->getColumnChunkUnsafe(columnID); - chunk->write( - propertyVector, propertyVector->state->selVector->selectedPositions[0], offsetInChunk); + auto& chunk = chunkedGroups.getChunkedGroupUnsafe(chunkIdx)->getColumnChunkUnsafe(columnID); + auto pos = propertyVector->state->selVector->selectedPositions[0]; + chunk.write(propertyVector, pos, offsetInChunk); } void LocalChunkedGroupCollection::remove(offset_t srcNodeOffset, offset_t relOffset) { @@ -76,14 +75,14 @@ row_idx_t LocalChunkedGroupCollection::append(std::vector vectors) KU_ASSERT(vectors.size() == dataTypes.size()); if (chunkedGroups.getNumChunks() == 0 || chunkedGroups.getChunkedGroup(chunkedGroups.getNumChunks() - 1)->getNumRows() == - CHUNK_CAPACITY) { + ChunkedNodeGroupCollection::CHUNK_CAPACITY) { chunkedGroups.append(std::make_unique( - dataTypes, false /*enableCompression*/, CHUNK_CAPACITY)); + dataTypes, false /*enableCompression*/, ChunkedNodeGroupCollection::CHUNK_CAPACITY)); } auto lastChunkGroup = chunkedGroups.getChunkedGroupUnsafe(chunkedGroups.getNumChunks() - 1); for (auto i = 0u; i < vectors.size(); i++) { KU_ASSERT(vectors[i]->state->selVector->selectedSize == 1); - lastChunkGroup->getColumnChunkUnsafe(i)->append(vectors[i], *vectors[i]->state->selVector); + lastChunkGroup->getColumnChunkUnsafe(i).append(vectors[i], *vectors[i]->state->selVector); } lastChunkGroup->setNumValues(lastChunkGroup->getNumRows() + 1); return numRows++; diff --git a/src/storage/store/CMakeLists.txt b/src/storage/store/CMakeLists.txt index 2c579e8276..62de9fd231 100644 --- a/src/storage/store/CMakeLists.txt +++ b/src/storage/store/CMakeLists.txt @@ -1,10 +1,11 @@ add_library(kuzu_storage_store OBJECT + chunked_node_group.cpp + chunked_node_group_collection.cpp column.cpp column_chunk.cpp dictionary_chunk.cpp dictionary_column.cpp - node_group.cpp node_table.cpp node_table_data.cpp null_column.cpp diff --git a/src/storage/store/node_group.cpp b/src/storage/store/chunked_node_group.cpp similarity index 87% rename from src/storage/store/node_group.cpp rename to src/storage/store/chunked_node_group.cpp index 9ddaf8f784..4d08b7fefe 100644 --- a/src/storage/store/node_group.cpp +++ b/src/storage/store/chunked_node_group.cpp @@ -1,4 +1,4 @@ -#include "storage/store/node_group.h" +#include "storage/store/chunked_node_group.h" #include "common/assert.h" #include "common/constants.h" @@ -94,12 +94,12 @@ void ChunkedNodeGroup::resizeChunks(uint64_t newSize) { } uint64_t ChunkedNodeGroup::append(const std::vector& columnVectors, - DataChunkState* columnState, uint64_t numValuesToAppend) { + SelectionVector& selVector, uint64_t numValuesToAppend) { auto numValuesToAppendInChunk = std::min(numValuesToAppend, StorageConstants::NODE_GROUP_SIZE - numRows); auto serialSkip = 0u; - auto originalSize = columnState->selVector->selectedSize; - columnState->selVector->selectedSize = numValuesToAppendInChunk; + auto originalSize = selVector.selectedSize; + selVector.selectedSize = numValuesToAppendInChunk; for (auto i = 0u; i < chunks.size(); i++) { auto chunk = chunks[i].get(); if (chunk->getDataType().getLogicalTypeID() == common::LogicalTypeID::SERIAL) { @@ -109,10 +109,9 @@ uint64_t ChunkedNodeGroup::append(const std::vector& columnVectors } KU_ASSERT((i - serialSkip) < columnVectors.size()); auto columnVector = columnVectors[i - serialSkip]; - KU_ASSERT(chunk->getDataType() == columnVector->dataType); - chunk->append(columnVector, *columnVector->state->selVector); + chunk->append(columnVector, selVector); } - columnState->selVector->selectedSize = originalSize; + selVector.selectedSize = originalSize; numRows += numValuesToAppendInChunk; return numValuesToAppendInChunk; } @@ -153,17 +152,6 @@ void ChunkedNodeGroup::finalize(uint64_t nodeGroupIdx_) { } } -void ChunkedNodeGroupCollection::append(std::unique_ptr chunkedGroup) { - if (chunkedGroups.size() > 1) { - KU_ASSERT(chunkedGroup->getNumColumns() == chunkedGroups[0]->getNumColumns()); - for (auto i = 0u; i < chunkedGroup->getNumColumns(); i++) { - KU_ASSERT(chunkedGroup->getColumnChunk(i).getDataType() == - chunkedGroups[0]->getColumnChunk(i).getDataType()); - } - } - chunkedGroups.push_back(std::move(chunkedGroup)); -} - ChunkedCSRHeader::ChunkedCSRHeader(bool enableCompression, uint64_t capacity) { offset = ColumnChunkFactory::createColumnChunk(*LogicalType::UINT64(), enableCompression, capacity); diff --git a/src/storage/store/chunked_node_group_collection.cpp b/src/storage/store/chunked_node_group_collection.cpp new file mode 100644 index 0000000000..b9119f04b9 --- /dev/null +++ b/src/storage/store/chunked_node_group_collection.cpp @@ -0,0 +1,50 @@ +#include "storage/store/chunked_node_group_collection.h" + +using namespace kuzu::common; + +namespace kuzu { +namespace storage { + +void ChunkedNodeGroupCollection::append( + const std::vector& vectors, const SelectionVector& selVector) { + if (chunkedGroups.empty()) { + chunkedGroups.push_back( + std::make_unique(types, false /*enableCompression*/, CHUNK_CAPACITY)); + } + auto numRowsToAppend = selVector.selectedSize; + row_idx_t numRowsAppended = 0; + SelectionVector tmpSelVector(numRowsToAppend); + while (numRowsAppended < numRowsToAppend) { + auto& lastChunkedGroup = chunkedGroups.back(); + auto numRowsToAppendInGroup = std::min(numRowsToAppend - numRowsAppended, + static_cast(CHUNK_CAPACITY - lastChunkedGroup->getNumRows())); + tmpSelVector.resetSelectorToValuePosBufferWithSize(numRowsToAppendInGroup); + for (auto i = 0u; i < numRowsToAppendInGroup; i++) { + tmpSelVector.selectedPositions[i] = selVector.selectedPositions[numRowsAppended + i]; + } + lastChunkedGroup->append(vectors, tmpSelVector, numRowsToAppendInGroup); + if (lastChunkedGroup->getNumRows() == CHUNK_CAPACITY) { + chunkedGroups.push_back(std::make_unique( + types, false /*enableCompression*/, CHUNK_CAPACITY)); + } + numRowsAppended += numRowsToAppendInGroup; + } +} + +void ChunkedNodeGroupCollection::append(std::unique_ptr chunkedGroup) { + KU_ASSERT(chunkedGroup->getNumColumns() == types.size()); + for (auto i = 0u; i < chunkedGroup->getNumColumns(); i++) { + KU_ASSERT(chunkedGroup->getColumnChunk(i).getDataType() == types[i]); + } + chunkedGroups.push_back(std::move(chunkedGroup)); +} + +void ChunkedNodeGroupCollection::merge(ChunkedNodeGroupCollection& chunkedGroupCollection) { + chunkedGroups.reserve(chunkedGroups.size() + chunkedGroupCollection.getNumChunks()); + for (auto& chunkedGroup : chunkedGroupCollection.chunkedGroups) { + append(std::move(chunkedGroup)); + } +} + +} // namespace storage +} // namespace kuzu diff --git a/src/storage/store/node_table_data.cpp b/src/storage/store/node_table_data.cpp index 4df1b9ad59..aea7577412 100644 --- a/src/storage/store/node_table_data.cpp +++ b/src/storage/store/node_table_data.cpp @@ -99,9 +99,9 @@ void NodeTableData::lookup(Transaction* transaction, TableReadState& readState, void NodeTableData::append(ChunkedNodeGroup* nodeGroup) { for (auto columnID = 0u; columnID < columns.size(); columnID++) { - auto columnChunk = nodeGroup->getColumnChunkUnsafe(columnID); + auto& columnChunk = nodeGroup->getColumnChunkUnsafe(columnID); KU_ASSERT(columnID < columns.size()); - columns[columnID]->append(columnChunk, nodeGroup->getNodeGroupIdx()); + columns[columnID]->append(&columnChunk, nodeGroup->getNodeGroupIdx()); } } diff --git a/src/storage/store/rel_table_data.cpp b/src/storage/store/rel_table_data.cpp index feb03ae34e..adbc76bf1a 100644 --- a/src/storage/store/rel_table_data.cpp +++ b/src/storage/store/rel_table_data.cpp @@ -307,7 +307,7 @@ void RelTableData::append(ChunkedNodeGroup* nodeGroup) { csrHeaderColumns.append(csrNodeGroup->getCSRHeader(), nodeGroup->getNodeGroupIdx()); for (auto columnID = 0u; columnID < columns.size(); columnID++) { getColumn(columnID)->append( - nodeGroup->getColumnChunkUnsafe(columnID), nodeGroup->getNodeGroupIdx()); + &nodeGroup->getColumnChunkUnsafe(columnID), nodeGroup->getNodeGroupIdx()); } }