Skip to content

Commit

Permalink
Re-write partitioner to use ColumnChunks instead of ValueVectors
Browse files Browse the repository at this point in the history
ValueVectors have high memory fragmentation, and allocate strings in
256KB chunks for only 2048 strings.
ColumnChunks can have a much larger capacity, and also support string
de-duplication.
  • Loading branch information
benjaminwinger committed Mar 8, 2024
1 parent 5e598ec commit 38e4398
Show file tree
Hide file tree
Showing 15 changed files with 283 additions and 203 deletions.
20 changes: 15 additions & 5 deletions src/include/processor/operator/partitioner.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#pragma once

#include "common/data_chunk/data_chunk_collection.h"
#include "processor/operator/sink.h"
#include "storage/store/column_chunk.h"

namespace kuzu {
namespace storage {
Expand All @@ -20,7 +20,14 @@ struct PartitionerFunctions {
// partitioning methods. For example, copying rel tables requires partitioning in both the FWD and
// BWD directions. Each partitioning method corresponds to a PartitioningState.
struct PartitioningBuffer {
std::vector<std::unique_ptr<common::DataChunkCollection>> partitions;
using ColumnChunkCollection = std::vector<std::unique_ptr<storage::ColumnChunk>>;
struct Partition {
// One chunk for each column, grouped into a list so that groups from different threads
// can be quickly merged without copying.
// E.g. [(a,b,c), (a,b,c)], where a is a chunk for column a, b is a chunk for column b, etc.
std::vector<ColumnChunkCollection> chunks;
};
std::vector<Partition> partitions;

void merge(std::unique_ptr<PartitioningBuffer> localPartitioningStates);
};
Expand Down Expand Up @@ -49,11 +56,11 @@ struct PartitionerSharedState {
void resetState();
void merge(std::vector<std::unique_ptr<PartitioningBuffer>> localPartitioningStates);

inline common::DataChunkCollection* getPartitionBuffer(
inline PartitioningBuffer::Partition& getPartitionBuffer(
common::vector_idx_t partitioningIdx, common::partition_idx_t partitionIdx) {
KU_ASSERT(partitioningIdx < partitioningBuffers.size());
KU_ASSERT(partitionIdx < partitioningBuffers[partitioningIdx]->partitions.size());
return partitioningBuffers[partitioningIdx]->partitions[partitionIdx].get();
return partitioningBuffers[partitioningIdx]->partitions[partitionIdx];
}
};

Expand Down Expand Up @@ -102,7 +109,7 @@ class Partitioner : public Sink {

static void initializePartitioningStates(
std::vector<std::unique_ptr<PartitioningBuffer>>& partitioningBuffers,
std::vector<common::partition_idx_t> numPartitions, storage::MemoryManager* mm);
std::vector<common::partition_idx_t> numPartitions);

private:
// TODO: For now, RelBatchInsert will guarantee all data are inside one data chunk. Should be
Expand All @@ -111,6 +118,9 @@ class Partitioner : public Sink {
common::partition_idx_t partitioningIdx, common::DataChunk* chunkToCopyFrom);

private:
// Same size as a value vector. Each thread will allocate a chunk for each node group,
// so this should be kept relatively small to avoid allocating more memory than is needed.
static const uint64_t CHUNK_SIZE = 2048;
std::vector<std::unique_ptr<PartitioningInfo>> infos;
std::shared_ptr<PartitionerSharedState> sharedState;
std::unique_ptr<PartitionerLocalState> localState;
Expand Down
9 changes: 5 additions & 4 deletions src/include/processor/operator/persistent/rel_batch_insert.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "common/enums/rel_direction.h"
#include "processor/operator/partitioner.h"
#include "processor/operator/persistent/batch_insert.h"
#include "storage/store/column_chunk.h"
#include "storage/store/node_group.h"

namespace kuzu {
Expand Down Expand Up @@ -60,20 +61,20 @@ class RelBatchInsert final : public BatchInsert {
}

private:
void prepareCSRNodeGroup(common::DataChunkCollection* partition,
void prepareCSRNodeGroup(PartitioningBuffer::Partition& partition,
common::offset_t startNodeOffset, common::vector_idx_t offsetVectorIdx,
common::offset_t numNodes);

static common::length_t getGapSize(common::length_t length);
static std::vector<common::offset_t> populateStartCSROffsetsAndLengths(
storage::CSRHeaderChunks& csrHeader, common::offset_t numNodes,
common::DataChunkCollection* partition, common::vector_idx_t offsetVectorIdx);
PartitioningBuffer::Partition& partition, common::vector_idx_t offsetVectorIdx);
static void populateEndCSROffsets(
storage::CSRHeaderChunks& csrHeader, std::vector<common::offset_t>& gaps);
static void setOffsetToWithinNodeGroup(
common::ValueVector* vector, common::offset_t startOffset);
storage::ColumnChunk& chunk, common::offset_t startOffset);
static void setOffsetFromCSROffsets(
common::ValueVector* offsetVector, storage::ColumnChunk* offsetChunk);
storage::ColumnChunk* nodeOffsetChunk, storage::ColumnChunk* csrOffsetChunk);

// We only check rel multiplicity constraint (MANY_ONE, ONE_ONE) for now.
std::optional<common::offset_t> checkRelMultiplicityConstraint(
Expand Down
13 changes: 8 additions & 5 deletions src/include/storage/store/column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class ColumnChunk {
virtual ColumnChunkMetadata getMetadataToFlush() const;

virtual void append(common::ValueVector* vector);
virtual void appendOne(common::ValueVector* vector, common::vector_idx_t pos);
virtual void append(
ColumnChunk* other, common::offset_t startPosInOtherChunk, uint32_t numValuesToAppend);

Expand All @@ -73,8 +74,7 @@ class ColumnChunk {
// `offsetInVector`, we should flatten the vector to pos at `offsetInVector`.
virtual void write(common::ValueVector* vector, common::offset_t offsetInVector,
common::offset_t offsetInChunk);
virtual void write(
common::ValueVector* vector, common::ValueVector* offsetsInChunk, bool isCSR);
virtual void write(ColumnChunk* chunk, ColumnChunk* offsetsInChunk, bool isCSR);
virtual void write(ColumnChunk* srcChunk, common::offset_t srcOffsetInChunk,
common::offset_t dstOffsetInChunk, common::offset_t numValuesToCopy);
virtual void copy(ColumnChunk* srcChunk, common::offset_t srcOffsetInChunk,
Expand Down Expand Up @@ -151,12 +151,12 @@ class BoolColumnChunk : public ColumnChunk {
enableCompression, hasNullChunk) {}

void append(common::ValueVector* vector) final;
void appendOne(common::ValueVector* vector, common::vector_idx_t pos) final;
void append(ColumnChunk* other, common::offset_t startPosInOtherChunk,
uint32_t numValuesToAppend) override;
void write(common::ValueVector* vector, common::offset_t offsetInVector,
common::offset_t offsetInChunk) override;
void write(common::ValueVector* valueVector, common::ValueVector* offsetInChunkVector,
bool isCSR) final;
void write(ColumnChunk* chunk, ColumnChunk* dstOffsets, bool isCSR) final;
void write(ColumnChunk* srcChunk, common::offset_t srcOffsetInChunk,
common::offset_t dstOffsetInChunk, common::offset_t numValuesToCopy) override;
};
Expand Down Expand Up @@ -206,8 +206,11 @@ class NullColumnChunk final : public BoolColumnChunk {
};

struct ColumnChunkFactory {
// If inMemory is true, string column chunk dictionaries start at zero instead of reserving
// space for values to grow.
static std::unique_ptr<ColumnChunk> createColumnChunk(common::LogicalType dataType,
bool enableCompression, uint64_t capacity = common::StorageConstants::NODE_GROUP_SIZE);
bool enableCompression, uint64_t capacity = common::StorageConstants::NODE_GROUP_SIZE,
bool inMemory = false);

static std::unique_ptr<ColumnChunk> createNullColumnChunk(
bool enableCompression, uint64_t capacity = common::StorageConstants::NODE_GROUP_SIZE) {
Expand Down
15 changes: 6 additions & 9 deletions src/include/storage/store/node_group.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#pragma once

#include "common/column_data_format.h"
#include "common/data_chunk/data_chunk.h"
#include "storage/store/column_chunk.h"

namespace kuzu {
Expand Down Expand Up @@ -32,15 +31,14 @@ class NodeGroup {
uint64_t append(const std::vector<common::ValueVector*>& columnVectors,
common::DataChunkState* columnState, uint64_t numValuesToAppend);
common::offset_t append(NodeGroup* other, common::offset_t offsetInOtherNodeGroup);
void write(common::DataChunk* dataChunk, common::vector_idx_t offsetVector);
void write(std::vector<std::unique_ptr<ColumnChunk>>& data, common::vector_idx_t offsetVector);

void finalize(uint64_t nodeGroupIdx_);

virtual inline void writeToColumnChunk(common::vector_idx_t chunkIdx,
common::vector_idx_t vectorIdx, common::DataChunk* dataChunk,
common::ValueVector* offsetVector) {
chunks[chunkIdx]->write(
dataChunk->getValueVector(vectorIdx).get(), offsetVector, false /* isCSR */);
common::vector_idx_t vectorIdx, std::vector<std::unique_ptr<ColumnChunk>>& data,
ColumnChunk& offsetChunk) {
chunks[chunkIdx]->write(data[vectorIdx].get(), &offsetChunk, false /*isCSR*/);

Check warning on line 41 in src/include/storage/store/node_group.h

View check run for this annotation

Codecov / codecov/patch

src/include/storage/store/node_group.h#L41

Added line #L41 was not covered by tests
}

protected:
Expand Down Expand Up @@ -81,9 +79,8 @@ class CSRNodeGroup : public NodeGroup {
const CSRHeaderChunks& getCSRHeader() const { return csrHeaderChunks; }

inline void writeToColumnChunk(common::vector_idx_t chunkIdx, common::vector_idx_t vectorIdx,
common::DataChunk* dataChunk, common::ValueVector* offsetVector) override {
chunks[chunkIdx]->write(
dataChunk->getValueVector(vectorIdx).get(), offsetVector, true /* isCSR */);
std::vector<std::unique_ptr<ColumnChunk>>& data, ColumnChunk& offsetChunk) override {
chunks[chunkIdx]->write(data[vectorIdx].get(), &offsetChunk, true /* isCSR */);
}

private:
Expand Down
11 changes: 7 additions & 4 deletions src/include/storage/store/string_column_chunk.h
Original file line number Diff line number Diff line change
@@ -1,24 +1,27 @@
#pragma once

#include "common/assert.h"
#include "common/types/types.h"
#include "storage/store/column_chunk.h"
#include "storage/store/dictionary_chunk.h"

namespace kuzu {
namespace storage {

class StringColumnChunk : public ColumnChunk {
public:
StringColumnChunk(common::LogicalType dataType, uint64_t capacity, bool enableCompression);
StringColumnChunk(
common::LogicalType dataType, uint64_t capacity, bool enableCompression, bool inMemory);

void resetToEmpty() final;
void append(common::ValueVector* vector) final;
void appendOne(common::ValueVector* vector, common::vector_idx_t pos) final;
void append(ColumnChunk* other, common::offset_t startPosInOtherChunk,
uint32_t numValuesToAppend) final;

void write(common::ValueVector* vector, common::offset_t offsetInVector,
common::offset_t offsetInChunk) final;
void write(common::ValueVector* valueVector, common::ValueVector* offsetInChunkVector,
bool isCSR) final;
void write(ColumnChunk* chunk, ColumnChunk* dstOffsets, bool isCSR) final;
void write(ColumnChunk* srcChunk, common::offset_t srcOffsetInChunk,
common::offset_t dstOffsetInChunk, common::offset_t numValuesToCopy) override;
void copy(ColumnChunk* srcChunk, common::offset_t srcOffsetInChunk,
Expand All @@ -43,7 +46,7 @@ class StringColumnChunk : public ColumnChunk {
void appendStringColumnChunk(StringColumnChunk* other, common::offset_t startPosInOtherChunk,
uint32_t numValuesToAppend);

void setValueFromString(const char* value, uint64_t length, uint64_t pos);
void setValueFromString(std::string_view value, uint64_t pos);

private:
std::unique_ptr<DictionaryChunk> dictionaryChunk;
Expand Down
9 changes: 6 additions & 3 deletions src/include/storage/store/struct_column_chunk.h
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
#pragma once

#include "common/types/internal_id_t.h"
#include "common/types/types.h"
#include "storage/store/column_chunk.h"

namespace kuzu {
namespace storage {

class StructColumnChunk : public ColumnChunk {
public:
StructColumnChunk(common::LogicalType dataType, uint64_t capacity, bool enableCompression);
StructColumnChunk(
common::LogicalType dataType, uint64_t capacity, bool enableCompression, bool inMemory);

inline ColumnChunk* getChild(common::vector_idx_t childIdx) {
KU_ASSERT(childIdx < childChunks.size());
Expand All @@ -20,11 +23,11 @@ class StructColumnChunk : public ColumnChunk {
void append(ColumnChunk* other, common::offset_t startPosInOtherChunk,
uint32_t numValuesToAppend) final;
void append(common::ValueVector* vector) final;
void appendOne(common::ValueVector* vector, common::vector_idx_t pos) final;

void write(common::ValueVector* vector, common::offset_t offsetInVector,
common::offset_t offsetInChunk) final;
void write(common::ValueVector* valueVector, common::ValueVector* offsetInChunkVector,
bool isCSR) final;
void write(ColumnChunk* chunk, ColumnChunk* dstOffsets, bool isCSR) final;
void write(ColumnChunk* srcChunk, common::offset_t srcOffsetInChunk,
common::offset_t dstOffsetInChunk, common::offset_t numValuesToCopy) override;
void copy(ColumnChunk* srcChunk, common::offset_t srcOffsetInChunk,
Expand Down
7 changes: 4 additions & 3 deletions src/include/storage/store/var_list_column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ struct VarListDataColumnChunk {
class VarListColumnChunk : public ColumnChunk {

public:
VarListColumnChunk(common::LogicalType dataType, uint64_t capacity, bool enableCompression);
VarListColumnChunk(
common::LogicalType dataType, uint64_t capacity, bool enableCompression, bool inMemory);

inline ColumnChunk* getDataColumnChunk() const {
return varListDataColumnChunk->dataColumnChunk.get();
Expand All @@ -36,11 +37,11 @@ class VarListColumnChunk : public ColumnChunk {
void resetToEmpty() final;

void append(common::ValueVector* vector) final;
void appendOne(common::ValueVector* vector, common::vector_idx_t pos) final;
// Note: `write` assumes that no `append` will be called afterward.
void write(common::ValueVector* vector, common::offset_t offsetInVector,
common::offset_t offsetInChunk) final;
void write(common::ValueVector* valueVector, common::ValueVector* offsetInChunkVector,
bool isCSR) final;
void write(ColumnChunk* chunk, ColumnChunk* dstOffsets, bool isCSR) final;
void write(ColumnChunk* srcChunk, common::offset_t srcOffsetInChunk,
common::offset_t dstOffsetInChunk, common::offset_t numValuesToCopy) override;
void copy(ColumnChunk* srcChunk, common::offset_t srcOffsetInChunk,
Expand Down
54 changes: 36 additions & 18 deletions src/processor/operator/partitioner.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
#include "processor/operator/partitioner.h"

#include <cstdint>

#include "common/constants.h"
#include "processor/execution_context.h"
#include "storage/store/column_chunk.h"
#include "storage/store/node_table.h"

using namespace kuzu::common;
Expand Down Expand Up @@ -42,7 +47,7 @@ void PartitionerSharedState::initialize() {
numPartitions.resize(2);
numPartitions[0] = getNumPartitions(maxNodeOffsets[0]);
numPartitions[1] = getNumPartitions(maxNodeOffsets[1]);
Partitioner::initializePartitioningStates(partitioningBuffers, numPartitions, mm);
Partitioner::initializePartitioningStates(partitioningBuffers, numPartitions);
}

partition_idx_t PartitionerSharedState::getNextPartition(vector_idx_t partitioningIdx) {
Expand Down Expand Up @@ -71,9 +76,13 @@ void PartitionerSharedState::merge(
void PartitioningBuffer::merge(std::unique_ptr<PartitioningBuffer> localPartitioningState) {
KU_ASSERT(partitions.size() == localPartitioningState->partitions.size());
for (auto partitionIdx = 0u; partitionIdx < partitions.size(); partitionIdx++) {
auto sharedPartition = partitions[partitionIdx].get();
auto localPartition = localPartitioningState->partitions[partitionIdx].get();
sharedPartition->merge(localPartition);
auto& sharedPartition = partitions[partitionIdx];
auto& localPartition = localPartitioningState->partitions[partitionIdx];
sharedPartition.chunks.reserve(
sharedPartition.chunks.size() + localPartition.chunks.size());
for (auto j = 0u; j < localPartition.chunks.size(); j++) {
sharedPartition.chunks.push_back(std::move(localPartition.chunks[j]));
}
}
}

Expand All @@ -91,10 +100,9 @@ void Partitioner::initGlobalStateInternal(ExecutionContext* /*context*/) {
sharedState->initialize();
}

void Partitioner::initLocalStateInternal(ResultSet* /*resultSet*/, ExecutionContext* context) {
void Partitioner::initLocalStateInternal(ResultSet* /*resultSet*/, ExecutionContext* /*context*/) {
localState = std::make_unique<PartitionerLocalState>();
initializePartitioningStates(localState->partitioningBuffers, sharedState->numPartitions,
context->clientContext->getMemoryManager());
initializePartitioningStates(localState->partitioningBuffers, sharedState->numPartitions);
}

static void constructDataChunk(DataChunk* dataChunk, const std::vector<DataPos>& columnPositions,
Expand All @@ -114,14 +122,14 @@ static void constructDataChunk(DataChunk* dataChunk, const std::vector<DataPos>&

void Partitioner::initializePartitioningStates(
std::vector<std::unique_ptr<PartitioningBuffer>>& partitioningBuffers,
std::vector<common::partition_idx_t> numPartitions, MemoryManager* mm) {
std::vector<common::partition_idx_t> numPartitions) {
partitioningBuffers.resize(numPartitions.size());
for (auto partitioningIdx = 0u; partitioningIdx < numPartitions.size(); partitioningIdx++) {
auto numPartition = numPartitions[partitioningIdx];
auto partitioningBuffer = std::make_unique<PartitioningBuffer>();
partitioningBuffer->partitions.reserve(numPartition);
for (auto i = 0u; i < numPartition; i++) {
partitioningBuffer->partitions.push_back(std::make_unique<DataChunkCollection>(mm));
partitioningBuffer->partitions.emplace_back();
}
partitioningBuffers[partitioningIdx] = std::move(partitioningBuffer);
}
Expand All @@ -146,18 +154,28 @@ void Partitioner::executeInternal(ExecutionContext* context) {

void Partitioner::copyDataToPartitions(
partition_idx_t partitioningIdx, DataChunk* chunkToCopyFrom) {
auto originalChunkState = chunkToCopyFrom->state;
chunkToCopyFrom->state = std::make_shared<DataChunkState>(1 /* capacity */);
chunkToCopyFrom->state->selVector->resetSelectorToValuePosBufferWithSize(1 /* size */);
for (auto i = 0u; i < originalChunkState->selVector->selectedSize; i++) {
auto posToCopyFrom = originalChunkState->selVector->selectedPositions[i];
for (auto i = 0u; i < chunkToCopyFrom->state->selVector->selectedSize; i++) {
auto posToCopyFrom = chunkToCopyFrom->state->selVector->selectedPositions[i];
auto partitionIdx = partitionIdxes->getValue<partition_idx_t>(posToCopyFrom);
KU_ASSERT(
partitionIdx < localState->getPartitioningBuffer(partitioningIdx)->partitions.size());
auto partition =
localState->getPartitioningBuffer(partitioningIdx)->partitions[partitionIdx].get();
chunkToCopyFrom->state->selVector->selectedPositions[0] = posToCopyFrom;
partition->append(*chunkToCopyFrom);
auto& partition =
localState->getPartitioningBuffer(partitioningIdx)->partitions[partitionIdx];
if (partition.chunks.empty() || partition.chunks.back()[0]->getNumValues() + 1 >
partition.chunks.back()[0]->getCapacity()) {
partition.chunks.emplace_back();
partition.chunks.back().reserve(chunkToCopyFrom->getNumValueVectors());
for (auto i = 0u; i < chunkToCopyFrom->getNumValueVectors(); i++) {
partition.chunks.back().emplace_back(ColumnChunkFactory::createColumnChunk(
chunkToCopyFrom->getValueVector(i)->dataType, false /*enableCompression*/,
Partitioner::CHUNK_SIZE));
}
}
KU_ASSERT(partition.chunks.back().size() == chunkToCopyFrom->getNumValueVectors());
for (auto i = 0u; i < chunkToCopyFrom->getNumValueVectors(); i++) {
partition.chunks.back()[i]->appendOne(
chunkToCopyFrom->getValueVector(i).get(), posToCopyFrom);
}
}
}

Expand Down
Loading

0 comments on commit 38e4398

Please sign in to comment.