Skip to content

Commit

Permalink
Refactor Partitioner to use ChunkedNodeGroupCollection (#3123)
Browse files Browse the repository at this point in the history
  • Loading branch information
ray6080 committed Mar 23, 2024
1 parent e60e8cd commit 365815b
Show file tree
Hide file tree
Showing 21 changed files with 191 additions and 161 deletions.
21 changes: 6 additions & 15 deletions src/include/processor/operator/partitioner.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#pragma once

#include "processor/operator/sink.h"
#include "storage/store/column_chunk.h"
#include "storage/store/chunked_node_group_collection.h"

namespace kuzu {
namespace storage {
Expand All @@ -20,21 +20,15 @@ struct PartitionerFunctions {
// partitioning methods. For example, copy of rel tables require partitioning on both FWD and BWD
// direction. Each partitioning method corresponds to a PartitioningState.
struct PartitioningBuffer {
using ColumnChunkCollection = std::vector<std::unique_ptr<storage::ColumnChunk>>;
struct Partition {
// One chunk for each column, grouped into a list
// so that groups from different threads can be quickly merged without copying
// E.g. [(a,b,c), (a,b,c)] where a is a chunk for column a, b for column b, etc.
std::vector<ColumnChunkCollection> chunks;
};
std::vector<Partition> partitions;
std::vector<storage::ChunkedNodeGroupCollection> partitions;

void merge(std::unique_ptr<PartitioningBuffer> localPartitioningStates);
};

// NOTE: Currently, Partitioner is tightly coupled with RelBatchInsert. We should generalize it
// later when necessary. Here, each partition is essentially a node group.
struct BatchInsertSharedState;
struct PartitioningInfo;
struct PartitionerSharedState {
std::mutex mtx;
storage::MemoryManager* mm;
Expand All @@ -51,12 +45,12 @@ struct PartitionerSharedState {

explicit PartitionerSharedState(storage::MemoryManager* mm) : mm{mm} {}

void initialize();
void initialize(std::vector<std::unique_ptr<PartitioningInfo>>& infos);
common::partition_idx_t getNextPartition(common::vector_idx_t partitioningIdx);
void resetState();
void merge(std::vector<std::unique_ptr<PartitioningBuffer>> localPartitioningStates);

inline PartitioningBuffer::Partition& getPartitionBuffer(
inline storage::ChunkedNodeGroupCollection& getPartitionBuffer(
common::vector_idx_t partitioningIdx, common::partition_idx_t partitionIdx) {
KU_ASSERT(partitioningIdx < partitioningBuffers.size());
KU_ASSERT(partitionIdx < partitioningBuffers[partitioningIdx]->partitions.size());
Expand Down Expand Up @@ -107,7 +101,7 @@ class Partitioner : public Sink {

std::unique_ptr<PhysicalOperator> clone() final;

static void initializePartitioningStates(
static void initializePartitioningStates(std::vector<std::unique_ptr<PartitioningInfo>>& infos,
std::vector<std::unique_ptr<PartitioningBuffer>>& partitioningBuffers,
std::vector<common::partition_idx_t> numPartitions);

Expand All @@ -121,9 +115,6 @@ class Partitioner : public Sink {
common::partition_idx_t partitioningIdx, common::DataChunk chunkToCopyFrom);

private:
// Same size as a value vector. Each thread will allocate a chunk for each node group,
// so this should be kept relatively small to avoid allocating more memory than is needed
static const uint64_t CHUNK_SIZE = 2048;
std::vector<std::unique_ptr<PartitioningInfo>> infos;
std::shared_ptr<PartitionerSharedState> sharedState;
std::unique_ptr<PartitionerLocalState> localState;
Expand Down
5 changes: 3 additions & 2 deletions src/include/processor/operator/persistent/index_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,15 +151,16 @@ class IndexBuilder {
IndexBuilder clone() { return IndexBuilder(sharedState); }

void insert(
storage::ColumnChunk* chunk, common::offset_t nodeOffset, common::offset_t numNodes);
const storage::ColumnChunk& chunk, common::offset_t nodeOffset, common::offset_t numNodes);

ProducerToken getProducerToken() const { return ProducerToken(sharedState); }

void finishedProducing();
void finalize(ExecutionContext* context);

private:
void checkNonNullConstraint(storage::NullColumnChunk* nullChunk, common::offset_t numNodes);
void checkNonNullConstraint(
const storage::NullColumnChunk& nullChunk, common::offset_t numNodes);
std::shared_ptr<IndexBuilderSharedState> sharedState;

IndexBuilderLocalBuffers localBuffers;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#include "processor/operator/call/in_query_call.h"
#include "processor/operator/persistent/batch_insert.h"
#include "processor/operator/persistent/index_builder.h"
#include "storage/store/node_group.h"
#include "storage/store/chunked_node_group.h"
#include "storage/store/node_table.h"

namespace kuzu {
Expand Down
20 changes: 10 additions & 10 deletions src/include/processor/operator/persistent/rel_batch_insert.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,27 @@
#include "common/enums/rel_direction.h"
#include "processor/operator/partitioner.h"
#include "processor/operator/persistent/batch_insert.h"
#include "storage/store/chunked_node_group.h"
#include "storage/store/column_chunk.h"
#include "storage/store/node_group.h"

namespace kuzu {
namespace processor {

struct RelBatchInsertInfo final : public BatchInsertInfo {
common::RelDataDirection direction;
uint64_t partitioningIdx;
common::vector_idx_t offsetVectorIdx;
common::column_id_t offsetColumnID;
std::vector<common::LogicalType> columnTypes;

RelBatchInsertInfo(catalog::TableCatalogEntry* tableEntry, bool compressionEnabled,
common::RelDataDirection direction, uint64_t partitioningIdx,
common::vector_idx_t offsetVectorIdx, std::vector<common::LogicalType> columnTypes)
common::column_id_t offsetColumnID, std::vector<common::LogicalType> columnTypes)
: BatchInsertInfo{tableEntry, compressionEnabled}, direction{direction},
partitioningIdx{partitioningIdx}, offsetVectorIdx{offsetVectorIdx}, columnTypes{std::move(
columnTypes)} {}
partitioningIdx{partitioningIdx}, offsetColumnID{offsetColumnID}, columnTypes{std::move(
columnTypes)} {}
RelBatchInsertInfo(const RelBatchInsertInfo& other)
: BatchInsertInfo{other.tableEntry, other.compressionEnabled}, direction{other.direction},
partitioningIdx{other.partitioningIdx}, offsetVectorIdx{other.offsetVectorIdx},
partitioningIdx{other.partitioningIdx}, offsetColumnID{other.offsetColumnID},
columnTypes{common::LogicalType::copy(other.columnTypes)} {}

inline std::unique_ptr<BatchInsertInfo> copy() const override {
Expand Down Expand Up @@ -60,20 +60,20 @@ class RelBatchInsert final : public BatchInsert {
}

private:
void prepareCSRNodeGroup(PartitioningBuffer::Partition& partition,
common::offset_t startNodeOffset, common::vector_idx_t offsetVectorIdx,
void prepareCSRNodeGroup(storage::ChunkedNodeGroupCollection& partition,
common::offset_t startNodeOffset, common::column_id_t offsetColumnID,
common::offset_t numNodes);

static common::length_t getGapSize(common::length_t length);
static std::vector<common::offset_t> populateStartCSROffsetsAndLengths(
storage::ChunkedCSRHeader& csrHeader, common::offset_t numNodes,
PartitioningBuffer::Partition& partition, common::vector_idx_t offsetVectorIdx);
storage::ChunkedNodeGroupCollection& partition, common::column_id_t offsetColumnID);
static void populateEndCSROffsets(
storage::ChunkedCSRHeader& csrHeader, std::vector<common::offset_t>& gaps);
static void setOffsetToWithinNodeGroup(
storage::ColumnChunk& chunk, common::offset_t startOffset);
static void setOffsetFromCSROffsets(
storage::ColumnChunk* nodeOffsetChunk, storage::ColumnChunk* csrOffsetChunk);
storage::ColumnChunk& nodeOffsetChunk, storage::ColumnChunk& csrOffsetChunk);

// We only check rel multiplcity constraint (MANY_ONE, ONE_ONE) for now.
std::optional<common::offset_t> checkRelMultiplicityConstraint(
Expand Down
16 changes: 7 additions & 9 deletions src/include/storage/local_storage/local_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
#include "common/enums/rel_multiplicity.h"
#include "common/enums/table_type.h"
#include "common/vector/value_vector.h"
#include "storage/store/node_group.h"
#include "storage/store/chunked_node_group_collection.h"

namespace kuzu {
namespace storage {
Expand All @@ -22,16 +22,14 @@ using ChunkCollection = std::vector<ColumnChunk*>;

class LocalChunkedGroupCollection {
public:
static constexpr uint64_t CHUNK_CAPACITY = 2048;

explicit LocalChunkedGroupCollection(std::vector<common::LogicalType> dataTypes)
: dataTypes{std::move(dataTypes)}, numRows{0} {}
: dataTypes{std::move(dataTypes)}, chunkedGroups{this->dataTypes}, numRows{0} {}
DELETE_COPY_DEFAULT_MOVE(LocalChunkedGroupCollection);

static inline std::pair<uint32_t, uint64_t> getChunkIdxAndOffsetInChunk(
common::row_idx_t rowIdx) {
return std::make_pair(rowIdx / LocalChunkedGroupCollection::CHUNK_CAPACITY,
rowIdx % LocalChunkedGroupCollection::CHUNK_CAPACITY);
return std::make_pair(rowIdx / ChunkedNodeGroupCollection::CHUNK_CAPACITY,
rowIdx % ChunkedNodeGroupCollection::CHUNK_CAPACITY);
}

inline common::row_idx_t getRowIdxFromOffset(common::offset_t offset) {
Expand Down Expand Up @@ -82,7 +80,7 @@ class LocalChunkedGroupCollection {
inline ChunkCollection getLocalChunk(common::column_id_t columnID) {
ChunkCollection localChunkCollection;
for (auto& chunkedGroup : chunkedGroups.getChunkedGroups()) {
localChunkCollection.push_back(chunkedGroup->getColumnChunkUnsafe(columnID));
localChunkCollection.push_back(&chunkedGroup->getColumnChunkUnsafe(columnID));
}
return localChunkCollection;
}
Expand All @@ -91,10 +89,10 @@ class LocalChunkedGroupCollection {
common::row_idx_t append(std::vector<common::ValueVector*> vectors);

private:
ChunkedNodeGroupCollection chunkedGroups;
std::vector<common::LogicalType> dataTypes;
storage::ChunkedNodeGroupCollection chunkedGroups;
// The offset here can either be nodeOffset ( for node table) or relOffset (for rel table).
offset_to_row_idx_t offsetToRowIdx;
std::vector<common::LogicalType> dataTypes;
common::row_idx_t numRows;

// Only used for rel tables. Should be moved out later.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,11 @@ class ChunkedNodeGroup {
KU_ASSERT(columnID < chunks.size());
return *chunks[columnID];
}
inline ColumnChunk* getColumnChunkUnsafe(common::column_id_t columnID) {
KU_ASSERT(columnID < chunks.size());
return chunks[columnID].get();
}
inline const ColumnChunk& getColumnChunk(common::column_id_t columnID) {
inline ColumnChunk& getColumnChunkUnsafe(common::column_id_t columnID) {
KU_ASSERT(columnID < chunks.size());
return *chunks[columnID];
}
inline std::vector<std::unique_ptr<ColumnChunk>>& getColumnChunksUnsafe() { return chunks; }
inline bool isFull() const { return numRows == common::StorageConstants::NODE_GROUP_SIZE; }

void resetToEmpty();
Expand All @@ -39,7 +36,7 @@ class ChunkedNodeGroup {
void resizeChunks(uint64_t newSize);

uint64_t append(const std::vector<common::ValueVector*>& columnVectors,
common::DataChunkState* columnState, uint64_t numValuesToAppend);
common::SelectionVector& selVector, uint64_t numValuesToAppend);
common::offset_t append(ChunkedNodeGroup* other, common::offset_t offsetInOtherNodeGroup);
void write(std::vector<std::unique_ptr<ColumnChunk>>& data, common::vector_idx_t offsetVector);

Expand Down Expand Up @@ -98,32 +95,6 @@ class ChunkedCSRNodeGroup : public ChunkedNodeGroup {
ChunkedCSRHeader csrHeader;
};

class ChunkedNodeGroupCollection {
public:
ChunkedNodeGroupCollection() {}

inline const std::vector<std::unique_ptr<ChunkedNodeGroup>>& getChunkedGroups() const {
return chunkedGroups;
}
inline const ChunkedNodeGroup* getChunkedGroup(uint64_t groupIdx) const {
KU_ASSERT(groupIdx < chunkedGroups.size());
return chunkedGroups[groupIdx].get();
}
inline ChunkedNodeGroup* getChunkedGroupUnsafe(uint64_t groupIdx) {
KU_ASSERT(groupIdx < chunkedGroups.size());
return chunkedGroups[groupIdx].get();
}
inline uint64_t getNumChunks() const { return chunkedGroups.size(); }
void append(std::unique_ptr<ChunkedNodeGroup> chunkedGroup);

private:
// Assert that all chunked node groups have the same num columns and same data types.
bool sanityCheckForAppend();

private:
std::vector<std::unique_ptr<ChunkedNodeGroup>> chunkedGroups;
};

struct NodeGroupFactory {
static inline std::unique_ptr<ChunkedNodeGroup> createNodeGroup(
common::ColumnDataFormat dataFormat, const std::vector<common::LogicalType>& columnTypes,
Expand Down
40 changes: 40 additions & 0 deletions src/include/storage/store/chunked_node_group_collection.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#pragma once

#include "storage/store/chunked_node_group.h"

namespace kuzu {
namespace storage {

class ChunkedNodeGroupCollection {
public:
static constexpr uint64_t CHUNK_CAPACITY = 2048;

explicit ChunkedNodeGroupCollection(std::vector<common::LogicalType> types)
: types{std::move(types)} {}
DELETE_COPY_DEFAULT_MOVE(ChunkedNodeGroupCollection);

inline const std::vector<std::unique_ptr<ChunkedNodeGroup>>& getChunkedGroups() const {
return chunkedGroups;
}
inline const ChunkedNodeGroup* getChunkedGroup(uint64_t groupIdx) const {
KU_ASSERT(groupIdx < chunkedGroups.size());
return chunkedGroups[groupIdx].get();
}
inline ChunkedNodeGroup* getChunkedGroupUnsafe(uint64_t groupIdx) {
KU_ASSERT(groupIdx < chunkedGroups.size());
return chunkedGroups[groupIdx].get();
}
inline uint64_t getNumChunks() const { return chunkedGroups.size(); }

void append(
const std::vector<common::ValueVector*>& vectors, const common::SelectionVector& selVector);
void append(std::unique_ptr<ChunkedNodeGroup> chunkedGroup);
void merge(ChunkedNodeGroupCollection& chunkedGroupCollection);

private:
std::vector<common::LogicalType> types;
std::vector<std::unique_ptr<ChunkedNodeGroup>> chunkedGroups;
};

} // namespace storage
} // namespace kuzu
1 change: 1 addition & 0 deletions src/include/storage/store/column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class ColumnChunk {
}

inline NullColumnChunk* getNullChunk() { return nullChunk.get(); }
inline const NullColumnChunk& getNullChunk() const { return *nullChunk; }
inline common::LogicalType& getDataType() { return dataType; }
inline const common::LogicalType& getDataType() const { return dataType; }

Expand Down
2 changes: 1 addition & 1 deletion src/include/storage/store/node_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
#include "common/cast.h"
#include "storage/index/hash_index.h"
#include "storage/stats/nodes_store_statistics.h"
#include "storage/store/node_group.h"
#include "storage/store/chunked_node_group.h"
#include "storage/store/node_table_data.h"
#include "storage/store/table.h"

Expand Down
2 changes: 1 addition & 1 deletion src/include/storage/store/rel_table_data.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#pragma once

#include "common/enums/rel_direction.h"
#include "storage/store/node_group.h"
#include "storage/store/chunked_node_group.h"
#include "storage/store/table_data.h"

namespace kuzu {
Expand Down
2 changes: 1 addition & 1 deletion src/include/storage/store/table_data.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#pragma once

#include "storage/store/chunked_node_group.h"
#include "storage/store/column.h"
#include "storage/store/node_group.h"

namespace kuzu {
namespace storage {
Expand Down
Loading

0 comments on commit 365815b

Please sign in to comment.