Skip to content

Commit

Permalink
Merge pull request #2979 from kuzudb/rel-memory-fix
Browse files Browse the repository at this point in the history
Re-write partitioner to use ColumnChunks instead of ValueVectors
  • Loading branch information
benjaminwinger committed Mar 8, 2024
2 parents 5e598ec + 38e4398 commit b7e3bc7
Show file tree
Hide file tree
Showing 15 changed files with 283 additions and 203 deletions.
20 changes: 15 additions & 5 deletions src/include/processor/operator/partitioner.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#pragma once

#include "common/data_chunk/data_chunk_collection.h"
#include "processor/operator/sink.h"
#include "storage/store/column_chunk.h"

namespace kuzu {
namespace storage {
Expand All @@ -20,7 +20,14 @@ struct PartitionerFunctions {
// partitioning methods. For example, copy of rel tables require partitioning on both FWD and BWD
// direction. Each partitioning method corresponds to a PartitioningState.
// Container for the partitioned data of a single partitioning method; `partitions` holds one
// entry per partition index.
// NOTE(review): this view appears to contain both the earlier DataChunkCollection-based
// `partitions` declaration and the ColumnChunk-based one that replaces it — confirm which
// one is live before relying on either.
struct PartitioningBuffer {
std::vector<std::unique_ptr<common::DataChunkCollection>> partitions;
// A group of column chunks, one ColumnChunk per column.
using ColumnChunkCollection = std::vector<std::unique_ptr<storage::ColumnChunk>>;
struct Partition {
// One chunk for each column, grouped into a list
// so that groups from different threads can be quickly merged without copying
// E.g. [(a,b,c), (a,b,c)] where a is a chunk for column a, b for column b, etc.
std::vector<ColumnChunkCollection> chunks;
};
// Indexed by partition index.
std::vector<Partition> partitions;

// Takes ownership of another buffer and folds its partitions into this one, partition by
// partition.
void merge(std::unique_ptr<PartitioningBuffer> localPartitioningStates);
};
Expand Down Expand Up @@ -49,11 +56,11 @@ struct PartitionerSharedState {
void resetState();
void merge(std::vector<std::unique_ptr<PartitioningBuffer>> localPartitioningStates);

// Returns the partition stored at `partitionIdx` inside the partitioning buffer selected by
// `partitioningIdx`. Both indices are validated with KU_ASSERT before the lookup.
// NOTE(review): two signatures / return statements are visible here (pointer-returning and
// reference-returning) — presumably the old and new form of the same accessor; confirm.
inline common::DataChunkCollection* getPartitionBuffer(
inline PartitioningBuffer::Partition& getPartitionBuffer(
common::vector_idx_t partitioningIdx, common::partition_idx_t partitionIdx) {
KU_ASSERT(partitioningIdx < partitioningBuffers.size());
KU_ASSERT(partitionIdx < partitioningBuffers[partitioningIdx]->partitions.size());
return partitioningBuffers[partitioningIdx]->partitions[partitionIdx].get();
return partitioningBuffers[partitioningIdx]->partitions[partitionIdx];
}
};

Expand Down Expand Up @@ -102,7 +109,7 @@ class Partitioner : public Sink {

static void initializePartitioningStates(
std::vector<std::unique_ptr<PartitioningBuffer>>& partitioningBuffers,
std::vector<common::partition_idx_t> numPartitions, storage::MemoryManager* mm);
std::vector<common::partition_idx_t> numPartitions);

private:
// TODO: For now, RelBatchInsert will guarantee all data are inside one data chunk. Should be
Expand All @@ -111,6 +118,9 @@ class Partitioner : public Sink {
common::partition_idx_t partitioningIdx, common::DataChunk* chunkToCopyFrom);

private:
// Same size as a value vector. Each thread will allocate a chunk for each node group,
// so this should be kept relatively small to avoid allocating more memory than is needed
static const uint64_t CHUNK_SIZE = 2048;
std::vector<std::unique_ptr<PartitioningInfo>> infos;
std::shared_ptr<PartitionerSharedState> sharedState;
std::unique_ptr<PartitionerLocalState> localState;
Expand Down
9 changes: 5 additions & 4 deletions src/include/processor/operator/persistent/rel_batch_insert.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "common/enums/rel_direction.h"
#include "processor/operator/partitioner.h"
#include "processor/operator/persistent/batch_insert.h"
#include "storage/store/column_chunk.h"
#include "storage/store/node_group.h"

namespace kuzu {
Expand Down Expand Up @@ -60,20 +61,20 @@ class RelBatchInsert final : public BatchInsert {
}

private:
void prepareCSRNodeGroup(common::DataChunkCollection* partition,
void prepareCSRNodeGroup(PartitioningBuffer::Partition& partition,
common::offset_t startNodeOffset, common::vector_idx_t offsetVectorIdx,
common::offset_t numNodes);

static common::length_t getGapSize(common::length_t length);
static std::vector<common::offset_t> populateStartCSROffsetsAndLengths(
storage::CSRHeaderChunks& csrHeader, common::offset_t numNodes,
common::DataChunkCollection* partition, common::vector_idx_t offsetVectorIdx);
PartitioningBuffer::Partition& partition, common::vector_idx_t offsetVectorIdx);
static void populateEndCSROffsets(
storage::CSRHeaderChunks& csrHeader, std::vector<common::offset_t>& gaps);
static void setOffsetToWithinNodeGroup(
common::ValueVector* vector, common::offset_t startOffset);
storage::ColumnChunk& chunk, common::offset_t startOffset);
static void setOffsetFromCSROffsets(
common::ValueVector* offsetVector, storage::ColumnChunk* offsetChunk);
storage::ColumnChunk* nodeOffsetChunk, storage::ColumnChunk* csrOffsetChunk);

// We only check rel multiplicity constraint (MANY_ONE, ONE_ONE) for now.
std::optional<common::offset_t> checkRelMultiplicityConstraint(
Expand Down
13 changes: 8 additions & 5 deletions src/include/storage/store/column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class ColumnChunk {
virtual ColumnChunkMetadata getMetadataToFlush() const;

virtual void append(common::ValueVector* vector);
virtual void appendOne(common::ValueVector* vector, common::vector_idx_t pos);
virtual void append(
ColumnChunk* other, common::offset_t startPosInOtherChunk, uint32_t numValuesToAppend);

Expand All @@ -73,8 +74,7 @@ class ColumnChunk {
// `offsetInVector`, we should flatten the vector to pos at `offsetInVector`.
virtual void write(common::ValueVector* vector, common::offset_t offsetInVector,
common::offset_t offsetInChunk);
virtual void write(
common::ValueVector* vector, common::ValueVector* offsetsInChunk, bool isCSR);
virtual void write(ColumnChunk* chunk, ColumnChunk* offsetsInChunk, bool isCSR);
virtual void write(ColumnChunk* srcChunk, common::offset_t srcOffsetInChunk,
common::offset_t dstOffsetInChunk, common::offset_t numValuesToCopy);
virtual void copy(ColumnChunk* srcChunk, common::offset_t srcOffsetInChunk,
Expand Down Expand Up @@ -151,12 +151,12 @@ class BoolColumnChunk : public ColumnChunk {
enableCompression, hasNullChunk) {}

void append(common::ValueVector* vector) final;
void appendOne(common::ValueVector* vector, common::vector_idx_t pos) final;
void append(ColumnChunk* other, common::offset_t startPosInOtherChunk,
uint32_t numValuesToAppend) override;
void write(common::ValueVector* vector, common::offset_t offsetInVector,
common::offset_t offsetInChunk) override;
void write(common::ValueVector* valueVector, common::ValueVector* offsetInChunkVector,
bool isCSR) final;
void write(ColumnChunk* chunk, ColumnChunk* dstOffsets, bool isCSR) final;
void write(ColumnChunk* srcChunk, common::offset_t srcOffsetInChunk,
common::offset_t dstOffsetInChunk, common::offset_t numValuesToCopy) override;
};
Expand Down Expand Up @@ -206,8 +206,11 @@ class NullColumnChunk final : public BoolColumnChunk {
};

struct ColumnChunkFactory {
// inMemory starts string column chunk dictionaries at zero instead of reserving space for
// values to grow
static std::unique_ptr<ColumnChunk> createColumnChunk(common::LogicalType dataType,
bool enableCompression, uint64_t capacity = common::StorageConstants::NODE_GROUP_SIZE);
bool enableCompression, uint64_t capacity = common::StorageConstants::NODE_GROUP_SIZE,
bool inMemory = false);

static std::unique_ptr<ColumnChunk> createNullColumnChunk(
bool enableCompression, uint64_t capacity = common::StorageConstants::NODE_GROUP_SIZE) {
Expand Down
15 changes: 6 additions & 9 deletions src/include/storage/store/node_group.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#pragma once

#include "common/column_data_format.h"
#include "common/data_chunk/data_chunk.h"
#include "storage/store/column_chunk.h"

namespace kuzu {
Expand Down Expand Up @@ -32,15 +31,14 @@ class NodeGroup {
uint64_t append(const std::vector<common::ValueVector*>& columnVectors,
common::DataChunkState* columnState, uint64_t numValuesToAppend);
common::offset_t append(NodeGroup* other, common::offset_t offsetInOtherNodeGroup);
void write(common::DataChunk* dataChunk, common::vector_idx_t offsetVector);
void write(std::vector<std::unique_ptr<ColumnChunk>>& data, common::vector_idx_t offsetVector);

void finalize(uint64_t nodeGroupIdx_);

// Writes one input column into the column chunk at `chunkIdx`, placing values at the
// positions given by the offsets argument. `vectorIdx` selects which input column to read.
// This base implementation always passes isCSR = false to ColumnChunk::write.
// NOTE(review): both a DataChunk/ValueVector parameter list and a ColumnChunk-based one are
// visible below — presumably the old and new form of the same method; confirm.
virtual inline void writeToColumnChunk(common::vector_idx_t chunkIdx,
common::vector_idx_t vectorIdx, common::DataChunk* dataChunk,
common::ValueVector* offsetVector) {
chunks[chunkIdx]->write(
dataChunk->getValueVector(vectorIdx).get(), offsetVector, false /* isCSR */);
common::vector_idx_t vectorIdx, std::vector<std::unique_ptr<ColumnChunk>>& data,
ColumnChunk& offsetChunk) {
chunks[chunkIdx]->write(data[vectorIdx].get(), &offsetChunk, false /*isCSR*/);
}

protected:
Expand Down Expand Up @@ -81,9 +79,8 @@ class CSRNodeGroup : public NodeGroup {
const CSRHeaderChunks& getCSRHeader() const { return csrHeaderChunks; }

// Override of writeToColumnChunk that forwards to ColumnChunk::write with isCSR = true;
// otherwise it reads column `vectorIdx` of the input and writes it into chunk `chunkIdx`
// at the supplied offsets.
// NOTE(review): both a DataChunk/ValueVector parameter list and a ColumnChunk-based one are
// visible below — presumably the old and new form of the same override; confirm.
inline void writeToColumnChunk(common::vector_idx_t chunkIdx, common::vector_idx_t vectorIdx,
common::DataChunk* dataChunk, common::ValueVector* offsetVector) override {
chunks[chunkIdx]->write(
dataChunk->getValueVector(vectorIdx).get(), offsetVector, true /* isCSR */);
std::vector<std::unique_ptr<ColumnChunk>>& data, ColumnChunk& offsetChunk) override {
chunks[chunkIdx]->write(data[vectorIdx].get(), &offsetChunk, true /* isCSR */);
}

private:
Expand Down
11 changes: 7 additions & 4 deletions src/include/storage/store/string_column_chunk.h
Original file line number Diff line number Diff line change
@@ -1,24 +1,27 @@
#pragma once

#include "common/assert.h"
#include "common/types/types.h"
#include "storage/store/column_chunk.h"
#include "storage/store/dictionary_chunk.h"

namespace kuzu {
namespace storage {

class StringColumnChunk : public ColumnChunk {
public:
StringColumnChunk(common::LogicalType dataType, uint64_t capacity, bool enableCompression);
StringColumnChunk(
common::LogicalType dataType, uint64_t capacity, bool enableCompression, bool inMemory);

void resetToEmpty() final;
void append(common::ValueVector* vector) final;
void appendOne(common::ValueVector* vector, common::vector_idx_t pos) final;
void append(ColumnChunk* other, common::offset_t startPosInOtherChunk,
uint32_t numValuesToAppend) final;

void write(common::ValueVector* vector, common::offset_t offsetInVector,
common::offset_t offsetInChunk) final;
void write(common::ValueVector* valueVector, common::ValueVector* offsetInChunkVector,
bool isCSR) final;
void write(ColumnChunk* chunk, ColumnChunk* dstOffsets, bool isCSR) final;
void write(ColumnChunk* srcChunk, common::offset_t srcOffsetInChunk,
common::offset_t dstOffsetInChunk, common::offset_t numValuesToCopy) override;
void copy(ColumnChunk* srcChunk, common::offset_t srcOffsetInChunk,
Expand All @@ -43,7 +46,7 @@ class StringColumnChunk : public ColumnChunk {
void appendStringColumnChunk(StringColumnChunk* other, common::offset_t startPosInOtherChunk,
uint32_t numValuesToAppend);

void setValueFromString(const char* value, uint64_t length, uint64_t pos);
void setValueFromString(std::string_view value, uint64_t pos);

private:
std::unique_ptr<DictionaryChunk> dictionaryChunk;
Expand Down
9 changes: 6 additions & 3 deletions src/include/storage/store/struct_column_chunk.h
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
#pragma once

#include "common/types/internal_id_t.h"
#include "common/types/types.h"
#include "storage/store/column_chunk.h"

namespace kuzu {
namespace storage {

class StructColumnChunk : public ColumnChunk {
public:
StructColumnChunk(common::LogicalType dataType, uint64_t capacity, bool enableCompression);
StructColumnChunk(
common::LogicalType dataType, uint64_t capacity, bool enableCompression, bool inMemory);

inline ColumnChunk* getChild(common::vector_idx_t childIdx) {
KU_ASSERT(childIdx < childChunks.size());
Expand All @@ -20,11 +23,11 @@ class StructColumnChunk : public ColumnChunk {
void append(ColumnChunk* other, common::offset_t startPosInOtherChunk,
uint32_t numValuesToAppend) final;
void append(common::ValueVector* vector) final;
void appendOne(common::ValueVector* vector, common::vector_idx_t pos) final;

void write(common::ValueVector* vector, common::offset_t offsetInVector,
common::offset_t offsetInChunk) final;
void write(common::ValueVector* valueVector, common::ValueVector* offsetInChunkVector,
bool isCSR) final;
void write(ColumnChunk* chunk, ColumnChunk* dstOffsets, bool isCSR) final;
void write(ColumnChunk* srcChunk, common::offset_t srcOffsetInChunk,
common::offset_t dstOffsetInChunk, common::offset_t numValuesToCopy) override;
void copy(ColumnChunk* srcChunk, common::offset_t srcOffsetInChunk,
Expand Down
7 changes: 4 additions & 3 deletions src/include/storage/store/var_list_column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ struct VarListDataColumnChunk {
class VarListColumnChunk : public ColumnChunk {

public:
VarListColumnChunk(common::LogicalType dataType, uint64_t capacity, bool enableCompression);
VarListColumnChunk(
common::LogicalType dataType, uint64_t capacity, bool enableCompression, bool inMemory);

inline ColumnChunk* getDataColumnChunk() const {
return varListDataColumnChunk->dataColumnChunk.get();
Expand All @@ -36,11 +37,11 @@ class VarListColumnChunk : public ColumnChunk {
void resetToEmpty() final;

void append(common::ValueVector* vector) final;
void appendOne(common::ValueVector* vector, common::vector_idx_t pos) final;
// Note: `write` assumes that no `append` will be called afterward.
void write(common::ValueVector* vector, common::offset_t offsetInVector,
common::offset_t offsetInChunk) final;
void write(common::ValueVector* valueVector, common::ValueVector* offsetInChunkVector,
bool isCSR) final;
void write(ColumnChunk* chunk, ColumnChunk* dstOffsets, bool isCSR) final;
void write(ColumnChunk* srcChunk, common::offset_t srcOffsetInChunk,
common::offset_t dstOffsetInChunk, common::offset_t numValuesToCopy) override;
void copy(ColumnChunk* srcChunk, common::offset_t srcOffsetInChunk,
Expand Down
54 changes: 36 additions & 18 deletions src/processor/operator/partitioner.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
#include "processor/operator/partitioner.h"

#include <cstdint>

#include "common/constants.h"
#include "processor/execution_context.h"
#include "storage/store/column_chunk.h"
#include "storage/store/node_table.h"

using namespace kuzu::common;
Expand Down Expand Up @@ -42,7 +47,7 @@ void PartitionerSharedState::initialize() {
numPartitions.resize(2);
numPartitions[0] = getNumPartitions(maxNodeOffsets[0]);
numPartitions[1] = getNumPartitions(maxNodeOffsets[1]);
Partitioner::initializePartitioningStates(partitioningBuffers, numPartitions, mm);
Partitioner::initializePartitioningStates(partitioningBuffers, numPartitions);
}

partition_idx_t PartitionerSharedState::getNextPartition(vector_idx_t partitioningIdx) {
Expand Down Expand Up @@ -71,9 +76,13 @@ void PartitionerSharedState::merge(
// Merges the partitions of `localPartitioningState` into this buffer. Both buffers must have
// the same number of partitions (checked with KU_ASSERT); partition i of the argument is
// merged into partition i of this buffer.
// NOTE(review): two merge strategies are visible below — a pointer-based
// `sharedPartition->merge(localPartition)` and a move of each chunk group — presumably the
// old and new implementation; confirm.
void PartitioningBuffer::merge(std::unique_ptr<PartitioningBuffer> localPartitioningState) {
KU_ASSERT(partitions.size() == localPartitioningState->partitions.size());
for (auto partitionIdx = 0u; partitionIdx < partitions.size(); partitionIdx++) {
auto sharedPartition = partitions[partitionIdx].get();
auto localPartition = localPartitioningState->partitions[partitionIdx].get();
sharedPartition->merge(localPartition);
auto& sharedPartition = partitions[partitionIdx];
auto& localPartition = localPartitioningState->partitions[partitionIdx];
// Reserve up front so the push_backs below do not reallocate repeatedly.
sharedPartition.chunks.reserve(
sharedPartition.chunks.size() + localPartition.chunks.size());
// Move each group of column chunks across; the chunk data itself is not copied.
for (auto j = 0u; j < localPartition.chunks.size(); j++) {
sharedPartition.chunks.push_back(std::move(localPartition.chunks[j]));
}
}
}

Expand All @@ -91,10 +100,9 @@ void Partitioner::initGlobalStateInternal(ExecutionContext* /*context*/) {
sharedState->initialize();
}

// Creates this operator's PartitionerLocalState and sets up its partitioning buffers with
// the partition counts taken from the shared state. The result set is unused.
// NOTE(review): two signatures / initializePartitioningStates calls are visible below (one
// passing the client context's memory manager, one not) — presumably the old and new form;
// confirm.
void Partitioner::initLocalStateInternal(ResultSet* /*resultSet*/, ExecutionContext* context) {
void Partitioner::initLocalStateInternal(ResultSet* /*resultSet*/, ExecutionContext* /*context*/) {
localState = std::make_unique<PartitionerLocalState>();
initializePartitioningStates(localState->partitioningBuffers, sharedState->numPartitions,
context->clientContext->getMemoryManager());
initializePartitioningStates(localState->partitioningBuffers, sharedState->numPartitions);
}

static void constructDataChunk(DataChunk* dataChunk, const std::vector<DataPos>& columnPositions,
Expand All @@ -114,14 +122,14 @@ static void constructDataChunk(DataChunk* dataChunk, const std::vector<DataPos>&

void Partitioner::initializePartitioningStates(
std::vector<std::unique_ptr<PartitioningBuffer>>& partitioningBuffers,
std::vector<common::partition_idx_t> numPartitions, MemoryManager* mm) {
std::vector<common::partition_idx_t> numPartitions) {
partitioningBuffers.resize(numPartitions.size());
for (auto partitioningIdx = 0u; partitioningIdx < numPartitions.size(); partitioningIdx++) {
auto numPartition = numPartitions[partitioningIdx];
auto partitioningBuffer = std::make_unique<PartitioningBuffer>();
partitioningBuffer->partitions.reserve(numPartition);
for (auto i = 0u; i < numPartition; i++) {
partitioningBuffer->partitions.push_back(std::make_unique<DataChunkCollection>(mm));
partitioningBuffer->partitions.emplace_back();
}
partitioningBuffers[partitioningIdx] = std::move(partitioningBuffer);
}
Expand All @@ -146,18 +154,28 @@ void Partitioner::executeInternal(ExecutionContext* context) {

// Distributes the selected rows of `chunkToCopyFrom` into the local partitioning buffer for
// `partitioningIdx`. For each selected position, the target partition is read from
// `partitionIdxes` and the row is appended to that partition.
// NOTE(review): two implementations are interleaved below — one that rewrites the chunk's
// state to a single-row selection and calls `partition->append`, and one that appends into
// per-partition ColumnChunks via `appendOne` — presumably the old and new version; confirm.
void Partitioner::copyDataToPartitions(
partition_idx_t partitioningIdx, DataChunk* chunkToCopyFrom) {
auto originalChunkState = chunkToCopyFrom->state;
chunkToCopyFrom->state = std::make_shared<DataChunkState>(1 /* capacity */);
chunkToCopyFrom->state->selVector->resetSelectorToValuePosBufferWithSize(1 /* size */);
for (auto i = 0u; i < originalChunkState->selVector->selectedSize; i++) {
auto posToCopyFrom = originalChunkState->selVector->selectedPositions[i];
// Walk the selected positions of the input chunk.
for (auto i = 0u; i < chunkToCopyFrom->state->selVector->selectedSize; i++) {
auto posToCopyFrom = chunkToCopyFrom->state->selVector->selectedPositions[i];
// Partition this row belongs to.
auto partitionIdx = partitionIdxes->getValue<partition_idx_t>(posToCopyFrom);
KU_ASSERT(
partitionIdx < localState->getPartitioningBuffer(partitioningIdx)->partitions.size());
auto partition =
localState->getPartitioningBuffer(partitioningIdx)->partitions[partitionIdx].get();
chunkToCopyFrom->state->selVector->selectedPositions[0] = posToCopyFrom;
partition->append(*chunkToCopyFrom);
auto& partition =
localState->getPartitioningBuffer(partitioningIdx)->partitions[partitionIdx];
// Start a new group of column chunks when the partition has none yet, or when the first
// chunk of the latest group cannot hold one more value.
// NOTE(review): indexing `chunks.back()[0]` assumes the input has at least one value
// vector — confirm against callers.
if (partition.chunks.empty() || partition.chunks.back()[0]->getNumValues() + 1 >
partition.chunks.back()[0]->getCapacity()) {
partition.chunks.emplace_back();
partition.chunks.back().reserve(chunkToCopyFrom->getNumValueVectors());
// One uncompressed chunk per input value vector, created with Partitioner::CHUNK_SIZE
// as the capacity argument.
// NOTE(review): this loop's `i` shadows the outer loop's `i`; consider renaming.
for (auto i = 0u; i < chunkToCopyFrom->getNumValueVectors(); i++) {
partition.chunks.back().emplace_back(ColumnChunkFactory::createColumnChunk(
chunkToCopyFrom->getValueVector(i)->dataType, false /*enableCompression*/,
Partitioner::CHUNK_SIZE));
}
}
KU_ASSERT(partition.chunks.back().size() == chunkToCopyFrom->getNumValueVectors());
// Append the value at `posToCopyFrom` of every value vector to the matching column chunk.
// NOTE(review): this loop's `i` also shadows the outer loop's `i`.
for (auto i = 0u; i < chunkToCopyFrom->getNumValueVectors(); i++) {
partition.chunks.back()[i]->appendOne(
chunkToCopyFrom->getValueVector(i).get(), posToCopyFrom);
}
}
}

Expand Down
Loading

0 comments on commit b7e3bc7

Please sign in to comment.