Skip to content

Commit

Permalink
replace ValueVector with ColumnChunk in LocalStorage
Browse files Browse the repository at this point in the history
  • Loading branch information
ray6080 committed Mar 19, 2024
1 parent e963df1 commit 775d2e6
Show file tree
Hide file tree
Showing 50 changed files with 562 additions and 4,810 deletions.
4 changes: 4 additions & 0 deletions src/common/vector/value_vector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,10 @@ template<>
void ValueVector::setValue(uint32_t pos, std::string val) {
StringVector::addString(this, pos, val.data(), val.length());
}
template<>
void ValueVector::setValue(uint32_t pos, std::string_view val) {
StringVector::addString(this, pos, val.data(), val.length());
}

void ValueVector::setNull(uint32_t pos, bool isNull) {
nullMask->setNull(pos, isNull);
Expand Down
2 changes: 1 addition & 1 deletion src/include/processor/operator/persistent/batch_insert.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ struct BatchInsertSharedState {
};

struct BatchInsertLocalState {
std::unique_ptr<storage::NodeGroup> nodeGroup;
std::unique_ptr<storage::ChunkedNodeGroup> nodeGroup;

virtual ~BatchInsertLocalState() = default;
};
Expand Down
6 changes: 3 additions & 3 deletions src/include/processor/operator/persistent/node_batch_insert.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ struct NodeBatchInsertSharedState final : public BatchInsertSharedState {
uint64_t currentNodeGroupIdx;
// The sharedNodeGroup is to accumulate left data within local node groups in NodeBatchInsert
// ops.
std::unique_ptr<storage::NodeGroup> sharedNodeGroup;
std::unique_ptr<storage::ChunkedNodeGroup> sharedNodeGroup;

NodeBatchInsertSharedState(
storage::Table* table, std::shared_ptr<FactorizedTable> fTable, storage::WAL* wal)
Expand All @@ -60,7 +60,7 @@ struct NodeBatchInsertSharedState final : public BatchInsertSharedState {

inline uint64_t getCurNodeGroupIdx() const { return currentNodeGroupIdx; }

void appendIncompleteNodeGroup(std::unique_ptr<storage::NodeGroup> localNodeGroup,
void appendIncompleteNodeGroup(std::unique_ptr<storage::ChunkedNodeGroup> localNodeGroup,
std::optional<IndexBuilder>& indexBuilder);

inline common::offset_t getNextNodeGroupIdxWithoutLock() { return currentNodeGroupIdx++; }
Expand Down Expand Up @@ -107,7 +107,7 @@ class NodeBatchInsert final : public BatchInsert {

static void writeAndResetNodeGroup(common::node_group_idx_t nodeGroupIdx,
std::optional<IndexBuilder>& indexBuilder, common::column_id_t pkColumnID,
storage::NodeTable* table, storage::NodeGroup* nodeGroup);
storage::NodeTable* table, storage::ChunkedNodeGroup* nodeGroup);

private:
void copyToNodeGroup();
Expand Down
6 changes: 3 additions & 3 deletions src/include/processor/operator/persistent/rel_batch_insert.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,18 +66,18 @@ class RelBatchInsert final : public BatchInsert {

static common::length_t getGapSize(common::length_t length);
static std::vector<common::offset_t> populateStartCSROffsetsAndLengths(
storage::CSRHeaderChunks& csrHeader, common::offset_t numNodes,
storage::ChunkedCSRHeader& csrHeader, common::offset_t numNodes,
PartitioningBuffer::Partition& partition, common::vector_idx_t offsetVectorIdx);
static void populateEndCSROffsets(
storage::CSRHeaderChunks& csrHeader, std::vector<common::offset_t>& gaps);
storage::ChunkedCSRHeader& csrHeader, std::vector<common::offset_t>& gaps);
static void setOffsetToWithinNodeGroup(
storage::ColumnChunk& chunk, common::offset_t startOffset);
static void setOffsetFromCSROffsets(
storage::ColumnChunk* nodeOffsetChunk, storage::ColumnChunk* csrOffsetChunk);

// We only check rel multiplcity constraint (MANY_ONE, ONE_ONE) for now.
std::optional<common::offset_t> checkRelMultiplicityConstraint(
const storage::CSRHeaderChunks& csrHeader);
const storage::ChunkedCSRHeader& csrHeader);

private:
std::shared_ptr<PartitionerSharedState> partitionerSharedState;
Expand Down
12 changes: 7 additions & 5 deletions src/include/storage/local_storage/local_node_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,18 @@

#include <utility>

#include "common/copy_constructors.h"
#include "local_table.h"

namespace kuzu {
namespace storage {

class LocalNodeNG final : public LocalNodeGroup {
public:
LocalNodeNG(common::offset_t nodeGroupStartOffset,
const std::vector<common::LogicalType*>& dataTypes, MemoryManager* mm)
: LocalNodeGroup{nodeGroupStartOffset, dataTypes, mm} {}
LocalNodeNG(
common::offset_t nodeGroupStartOffset, const std::vector<common::LogicalType>& dataTypes)
: LocalNodeGroup{nodeGroupStartOffset, dataTypes} {}
DELETE_COPY_DEFAULT_MOVE(LocalNodeNG);

void scan(common::ValueVector* nodeIDVector, const std::vector<common::column_id_t>& columnIDs,
const std::vector<common::ValueVector*>& outputVectors);
Expand All @@ -35,8 +37,8 @@ class LocalNodeNG final : public LocalNodeGroup {

class LocalNodeTableData final : public LocalTableData {
public:
LocalNodeTableData(std::vector<common::LogicalType*> dataTypes, MemoryManager* mm)
: LocalTableData{std::move(dataTypes), mm} {}
explicit LocalNodeTableData(std::vector<common::LogicalType> dataTypes)
: LocalTableData{std::move(dataTypes)} {}

void scan(common::ValueVector* nodeIDVector, const std::vector<common::column_id_t>& columnIDs,
const std::vector<common::ValueVector*>& outputVectors);
Expand Down
12 changes: 7 additions & 5 deletions src/include/storage/local_storage/local_rel_table.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include "common/copy_constructors.h"
#include "common/enums/rel_multiplicity.h"
#include "common/vector/value_vector.h"
#include "storage/local_storage/local_table.h"
Expand All @@ -14,8 +15,9 @@ class LocalRelNG final : public LocalNodeGroup {
friend class RelTableData;

public:
LocalRelNG(common::offset_t nodeGroupStartOffset, std::vector<common::LogicalType*> dataTypes,
MemoryManager* mm, common::RelMultiplicity multiplicity);
LocalRelNG(common::offset_t nodeGroupStartOffset, std::vector<common::LogicalType> dataTypes,
common::RelMultiplicity multiplicity);
DELETE_COPY_DEFAULT_MOVE(LocalRelNG);

common::row_idx_t scanCSR(common::offset_t srcOffset, common::offset_t posToReadForOffset,
const std::vector<common::column_id_t>& columnIDs,
Expand Down Expand Up @@ -53,9 +55,9 @@ class LocalRelTableData final : public LocalTableData {
friend class RelTableData;

public:
LocalRelTableData(common::RelMultiplicity multiplicity,
std::vector<common::LogicalType*> dataTypes, MemoryManager* mm)
: LocalTableData{std::move(dataTypes), mm}, multiplicity{multiplicity} {}
LocalRelTableData(
common::RelMultiplicity multiplicity, std::vector<common::LogicalType> dataTypes)
: LocalTableData{std::move(dataTypes)}, multiplicity{multiplicity} {}

private:
LocalNodeGroup* getOrCreateLocalNodeGroup(common::ValueVector* nodeIDVector) override;
Expand Down
10 changes: 3 additions & 7 deletions src/include/storage/local_storage/local_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,20 @@

#include <unordered_map>

#include "common/copy_constructors.h"
#include "storage/local_storage/local_table.h"

namespace kuzu {
namespace catalog {
class TableCatalogEntry;
}
namespace storage {

class MemoryManager;

// Data structures in LocalStorage are not thread-safe.
// For now, we only support single thread insertions and updates. Once we optimize them with
// multiple threads, LocalStorage and its related data structures should be reworked to be
// thread-safe.
class LocalStorage {
public:
explicit LocalStorage(storage::MemoryManager* mm);
explicit LocalStorage() {}
DELETE_COPY_AND_MOVE(LocalStorage);

// This function will create the local table data if not exists.
LocalTableData* getOrCreateLocalTableData(common::table_id_t tableID,
Expand All @@ -32,7 +29,6 @@ class LocalStorage {

private:
std::unordered_map<common::table_id_t, std::unique_ptr<LocalTable>> tables;
storage::MemoryManager* mm;
};

} // namespace storage
Expand Down
74 changes: 33 additions & 41 deletions src/include/storage/local_storage/local_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

#include <unordered_map>

#include "common/data_chunk/data_chunk_collection.h"
#include "common/enums/rel_multiplicity.h"
#include "common/enums/table_type.h"
#include "common/vector/value_vector.h"
#include "storage/store/node_group.h"

namespace kuzu {
namespace storage {
Expand All @@ -18,26 +18,21 @@ using offset_set_t = std::unordered_set<common::offset_t>;
static constexpr common::column_id_t NBR_ID_COLUMN_ID = 0;
static constexpr common::column_id_t REL_ID_COLUMN_ID = 1;

struct LocalVectorCollection {
std::vector<common::ValueVector*> vectors;
using ChunkCollection = std::vector<ColumnChunk*>;

static LocalVectorCollection empty() { return LocalVectorCollection{}; }

inline bool isEmpty() const { return vectors.empty(); }
inline void appendVector(common::ValueVector* vector) { vectors.push_back(vector); }
inline common::ValueVector* getLocalVector(common::row_idx_t rowIdx) const {
auto vectorIdx = rowIdx >> common::DEFAULT_VECTOR_CAPACITY_LOG_2;
KU_ASSERT(vectorIdx < vectors.size());
return vectors[vectorIdx];
}
class LocalChunkedGroupCollection {
public:
static constexpr uint64_t CHUNK_CAPACITY = 2048;

LocalVectorCollection getStructChildVectorCollection(common::struct_field_idx_t idx) const;
};
explicit LocalChunkedGroupCollection(std::vector<common::LogicalType> dataTypes)
: dataTypes{std::move(dataTypes)}, numRows{0} {}
DELETE_COPY_DEFAULT_MOVE(LocalChunkedGroupCollection);

class LocalDataChunkCollection {
public:
LocalDataChunkCollection(MemoryManager* mm, std::vector<common::LogicalType> dataTypes)
: dataChunkCollection{mm}, mm{mm}, dataTypes{std::move(dataTypes)}, numRows{0} {}
static inline std::pair<uint32_t, uint64_t> getChunkIdxAndOffsetInChunk(
common::row_idx_t rowIdx) {
return std::make_pair(rowIdx / LocalChunkedGroupCollection::CHUNK_CAPACITY,
rowIdx % LocalChunkedGroupCollection::CHUNK_CAPACITY);
}

inline common::row_idx_t getRowIdxFromOffset(common::offset_t offset) {
KU_ASSERT(offsetToRowIdx.contains(offset));
Expand All @@ -61,12 +56,12 @@ class LocalDataChunkCollection {

bool isEmpty() const { return offsetToRowIdx.empty() && srcNodeOffsetToRelOffsets.empty(); }
void readValueAtRowIdx(common::row_idx_t rowIdx, common::column_id_t columnID,
common::ValueVector* outputVector, common::sel_t posInOutputVector);
common::ValueVector* outputVector, common::sel_t posInOutputVector) const;
bool read(common::offset_t offset, common::column_id_t columnID,
common::ValueVector* outputVector, common::sel_t posInOutputVector);

inline void append(common::offset_t offset, std::vector<common::ValueVector*> vectors) {
offsetToRowIdx[offset] = appendToDataChunkCollection(vectors);
offsetToRowIdx[offset] = append(vectors);
}
// Only used for rel tables. Should be moved out later.
inline void append(common::offset_t nodeOffset, common::offset_t relOffset,
Expand All @@ -84,23 +79,21 @@ class LocalDataChunkCollection {
// Only used for rel tables. Should be moved out later.
void remove(common::offset_t srcNodeOffset, common::offset_t relOffset);

inline LocalVectorCollection getLocalChunk(common::column_id_t columnID) {
LocalVectorCollection localVectorCollection;
for (auto& chunk : dataChunkCollection.getChunksUnsafe()) {
localVectorCollection.appendVector(chunk.getValueVector(columnID).get());
inline ChunkCollection getLocalChunk(common::column_id_t columnID) {
ChunkCollection localChunkCollection;
for (auto& chunkedGroup : chunkedGroups.getChunkedGroups()) {
localChunkCollection.push_back(chunkedGroup->getColumnChunkUnsafe(columnID));
}
return localVectorCollection;
return localChunkCollection;
}

private:
common::row_idx_t appendToDataChunkCollection(std::vector<common::ValueVector*> vectors);
common::DataChunk createNewDataChunk();
common::row_idx_t append(std::vector<common::ValueVector*> vectors);

private:
common::DataChunkCollection dataChunkCollection;
ChunkedNodeGroupCollection chunkedGroups;
// The offset here can either be nodeOffset ( for node table) or relOffset (for rel table).
offset_to_row_idx_t offsetToRowIdx;
storage::MemoryManager* mm;
std::vector<common::LogicalType> dataTypes;
common::row_idx_t numRows;

Expand Down Expand Up @@ -147,8 +140,9 @@ class LocalDeletionInfo {

class LocalNodeGroup {
public:
LocalNodeGroup(common::offset_t nodeGroupStartOffset,
std::vector<common::LogicalType*> dataTypes, MemoryManager* mm);
LocalNodeGroup(
common::offset_t nodeGroupStartOffset, const std::vector<common::LogicalType>& dataTypes);
DELETE_COPY_DEFAULT_MOVE(LocalNodeGroup);
virtual ~LocalNodeGroup() = default;

virtual bool insert(std::vector<common::ValueVector*> nodeIDVectors,
Expand All @@ -157,29 +151,28 @@ class LocalNodeGroup {
common::column_id_t columnID, common::ValueVector* propertyVector) = 0;
virtual bool delete_(common::ValueVector* IDVector, common::ValueVector* extraVector) = 0;

LocalDataChunkCollection& getUpdateChunks(common::column_id_t columnID) {
LocalChunkedGroupCollection& getUpdateChunks(common::column_id_t columnID) {
KU_ASSERT(columnID < updateChunks.size());
return updateChunks[columnID];
}
LocalDataChunkCollection& getInsesrtChunks() { return insertChunks; }
LocalChunkedGroupCollection& getInsesrtChunks() { return insertChunks; }

bool hasUpdatesOrDeletions() const;

protected:
common::offset_t nodeGroupStartOffset;
storage::MemoryManager* mm;

LocalDataChunkCollection insertChunks;
LocalChunkedGroupCollection insertChunks;
LocalDeletionInfo deleteInfo;
std::vector<LocalDataChunkCollection> updateChunks;
std::vector<LocalChunkedGroupCollection> updateChunks;
};

class LocalTableData {
friend class NodeTableData;

public:
LocalTableData(std::vector<common::LogicalType*> dataTypes, MemoryManager* mm)
: dataTypes{std::move(dataTypes)}, mm{mm} {}
explicit LocalTableData(std::vector<common::LogicalType> dataTypes)
: dataTypes{std::move(dataTypes)} {}
virtual ~LocalTableData() = default;

inline void clear() { nodeGroups.clear(); }
Expand All @@ -194,8 +187,7 @@ class LocalTableData {
virtual LocalNodeGroup* getOrCreateLocalNodeGroup(common::ValueVector* nodeIDVector) = 0;

protected:
std::vector<common::LogicalType*> dataTypes;
MemoryManager* mm;
std::vector<common::LogicalType> dataTypes;

std::unordered_map<common::node_group_idx_t, std::unique_ptr<LocalNodeGroup>> nodeGroups;
};
Expand All @@ -206,7 +198,7 @@ class LocalTable {
explicit LocalTable(common::TableType tableType) : tableType{tableType} {};

LocalTableData* getOrCreateLocalTableData(const std::vector<std::unique_ptr<Column>>& columns,
MemoryManager* mm, common::vector_idx_t dataIdx, common::RelMultiplicity multiplicity);
common::vector_idx_t dataIdx, common::RelMultiplicity multiplicity);
inline LocalTableData* getLocalTableData(common::vector_idx_t dataIdx) {
KU_ASSERT(dataIdx < localTableDataCollection.size());
return localTableDataCollection[dataIdx].get();
Expand Down
Loading

0 comments on commit 775d2e6

Please sign in to comment.