Skip to content

Commit

Permalink
Merge pull request #1942 from kuzudb/struct-local-storage
Browse files Browse the repository at this point in the history
Support struct update
  • Loading branch information
ray6080 committed Aug 16, 2023
2 parents 5fc4852 + 50e6a1d commit 06cbd43
Show file tree
Hide file tree
Showing 7 changed files with 255 additions and 33 deletions.
83 changes: 56 additions & 27 deletions src/include/storage/local_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,16 @@ class NodeTable;

class LocalVector {
public:
LocalVector(const common::LogicalType& logicalType, storage::MemoryManager* mm) {
vector = std::make_unique<common::ValueVector>(logicalType, mm);
LocalVector(const common::LogicalType& dataType, MemoryManager* mm) {
vector = std::make_unique<common::ValueVector>(dataType, mm);
vector->setState(std::make_shared<common::DataChunkState>());
vector->state->selVector->resetSelectorToValuePosBuffer();
}

virtual ~LocalVector() = default;

void scan(common::ValueVector* resultVector) const;
void lookup(common::sel_t offsetInLocalVector, common::ValueVector* resultVector,
virtual void scan(common::ValueVector* resultVector) const;
virtual void lookup(common::sel_t offsetInLocalVector, common::ValueVector* resultVector,
common::sel_t offsetInResultVector);
virtual void update(common::sel_t offsetInLocalVector, common::ValueVector* updateVector,
common::sel_t offsetInUpdateVector);
Expand All @@ -33,7 +33,7 @@ class LocalVector {

class StringLocalVector : public LocalVector {
public:
explicit StringLocalVector(storage::MemoryManager* mm)
explicit StringLocalVector(MemoryManager* mm)
: LocalVector{common::LogicalType(common::LogicalTypeID::STRING), mm}, ovfStringLength{
0} {};

Expand All @@ -43,14 +43,29 @@ class StringLocalVector : public LocalVector {
uint64_t ovfStringLength;
};

// Local vector used for STRUCT-typed columns. The underlying ValueVector is created with a
// STRUCT logical type carrying an empty StructTypeInfo, and the three data-access entry points
// of LocalVector (scan / lookup / update) are overridden and sealed here.
class StructLocalVector : public LocalVector {
public:
    // `mm` is forwarded unchanged to the LocalVector base constructor.
    explicit StructLocalVector(MemoryManager* mm)
        : LocalVector{common::LogicalType{common::LogicalTypeID::STRUCT,
                          std::make_unique<common::StructTypeInfo>()},
              mm} {}

    // Bulk read of this local vector into `resultVector`.
    void scan(common::ValueVector* resultVector) const final;

    // Point read: local position `offsetInLocalVector` -> `offsetInResultVector` of
    // `resultVector`.
    void lookup(common::sel_t offsetInLocalVector, common::ValueVector* resultVector,
        common::sel_t offsetInResultVector) final;

    // Point write: position `offsetInUpdateVector` of `updateVector` -> local position
    // `offsetInLocalVector`.
    void update(common::sel_t offsetInLocalVector, common::ValueVector* updateVector,
        common::sel_t offsetInUpdateVector) final;
};

struct LocalVectorFactory {
static std::unique_ptr<LocalVector> createLocalVectorData(
const common::LogicalType& logicalType, storage::MemoryManager* mm);
const common::LogicalType& logicalType, MemoryManager* mm);
};

class LocalColumnChunk {
public:
explicit LocalColumnChunk(storage::MemoryManager* mm) : mm{mm} {};
explicit LocalColumnChunk(common::LogicalType& dataType, MemoryManager* mm)
: dataType{dataType}, mm{mm} {};

void scan(common::vector_idx_t vectorIdx, common::ValueVector* resultVector);
void lookup(common::vector_idx_t vectorIdx, common::sel_t offsetInLocalVector,
Expand All @@ -59,73 +74,87 @@ class LocalColumnChunk {
common::ValueVector* vectorToWriteFrom, common::sel_t pos);

std::map<common::vector_idx_t, std::unique_ptr<LocalVector>> vectors;
storage::MemoryManager* mm;
common::LogicalType dataType;
MemoryManager* mm;
};

class LocalColumn {
public:
explicit LocalColumn(storage::NodeColumn* column) : column{column} {};
explicit LocalColumn(NodeColumn* column) : column{column} {};
virtual ~LocalColumn() = default;

void scan(common::ValueVector* nodeIDVector, common::ValueVector* resultVector);
void lookup(common::ValueVector* nodeIDVector, common::ValueVector* resultVector);
void update(common::ValueVector* nodeIDVector, common::ValueVector* propertyVector,
storage::MemoryManager* mm);
void update(common::offset_t nodeOffset, common::ValueVector* propertyVector,
common::sel_t posInPropertyVector, storage::MemoryManager* mm);
virtual void scan(common::ValueVector* nodeIDVector, common::ValueVector* resultVector);
virtual void lookup(common::ValueVector* nodeIDVector, common::ValueVector* resultVector);
virtual void update(
common::ValueVector* nodeIDVector, common::ValueVector* propertyVector, MemoryManager* mm);
virtual void update(common::offset_t nodeOffset, common::ValueVector* propertyVector,
common::sel_t posInPropertyVector, MemoryManager* mm);

virtual void prepareCommit();

protected:
virtual void prepareCommitForChunk(common::node_group_idx_t nodeGroupIdx);

protected:
std::map<common::node_group_idx_t, std::unique_ptr<LocalColumnChunk>> chunks;
storage::NodeColumn* column;
NodeColumn* column;
};

class StringLocalColumn : public LocalColumn {
public:
explicit StringLocalColumn(storage::NodeColumn* column) : LocalColumn{column} {};
explicit StringLocalColumn(NodeColumn* column) : LocalColumn{column} {};

private:
void prepareCommitForChunk(common::node_group_idx_t nodeGroupIdx) final;
void commitLocalChunkOutOfPlace(
common::node_group_idx_t nodeGroupIdx, LocalColumnChunk* localChunk);
};

class VarListLocalColumn : public LocalColumn {
public:
explicit VarListLocalColumn(storage::NodeColumn* column) : LocalColumn{column} {};
explicit VarListLocalColumn(NodeColumn* column) : LocalColumn{column} {};

private:
void prepareCommitForChunk(common::node_group_idx_t nodeGroupIdx) final;
};

// Local column for STRUCT-typed node columns. In addition to the state inherited from
// LocalColumn it owns one LocalColumn per struct field (`fields`), and every overridden
// operation is sealed with `final`.
class StructLocalColumn : public LocalColumn {
public:
    explicit StructLocalColumn(NodeColumn* column);

    // Read paths.
    void scan(common::ValueVector* nodeIDVector, common::ValueVector* resultVector) final;
    void lookup(common::ValueVector* nodeIDVector, common::ValueVector* resultVector) final;

    // Write paths: vectorized (one entry per node ID) and single-offset.
    void update(common::ValueVector* nodeIDVector, common::ValueVector* propertyVector,
        MemoryManager* mm) final;
    void update(common::offset_t nodeOffset, common::ValueVector* propertyVector,
        common::sel_t posInPropertyVector, MemoryManager* mm) final;

    // Commit preparation for a single node group.
    void prepareCommitForChunk(common::node_group_idx_t nodeGroupIdx) final;

private:
    // Owned local columns, one per struct field.
    std::vector<std::unique_ptr<LocalColumn>> fields;
};

struct LocalColumnFactory {
static std::unique_ptr<LocalColumn> createLocalColumn(storage::NodeColumn* column);
static std::unique_ptr<LocalColumn> createLocalColumn(NodeColumn* column);
};

class LocalTable {
public:
explicit LocalTable(storage::NodeTable* table) : table{table} {};
explicit LocalTable(NodeTable* table) : table{table} {};

void scan(common::ValueVector* nodeIDVector, const std::vector<common::column_id_t>& columnIDs,
const std::vector<common::ValueVector*>& outputVectors);
void lookup(common::ValueVector* nodeIDVector,
const std::vector<common::column_id_t>& columnIDs,
const std::vector<common::ValueVector*>& outputVectors);
void update(common::property_id_t propertyID, common::ValueVector* nodeIDVector,
common::ValueVector* propertyVector, storage::MemoryManager* mm);
common::ValueVector* propertyVector, MemoryManager* mm);
void update(common::property_id_t propertyID, common::offset_t nodeOffset,
common::ValueVector* propertyVector, common::sel_t posInPropertyVector,
storage::MemoryManager* mm);
common::ValueVector* propertyVector, common::sel_t posInPropertyVector, MemoryManager* mm);

void prepareCommit();

private:
std::map<common::property_id_t, std::unique_ptr<LocalColumn>> columns;
storage::NodeTable* table;
NodeTable* table;
};

} // namespace storage
Expand Down
8 changes: 8 additions & 0 deletions src/include/storage/store/node_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,15 @@ struct BoolNodeColumnFunc {
};

class NullNodeColumn;
class StructNodeColumn;
// TODO(Guodong): This class intentionally duplicates `Column` because rel tables are left
// unchanged for now. `Column` is used for rel tables only; eventually `Column` should be removed.
class NodeColumn {
friend class LocalColumn;
friend class StringLocalColumn;
friend class VarListLocalColumn;
friend class transaction::TransactionTests;
friend class StructNodeColumn;

public:
NodeColumn(const catalog::Property& property, BMFileHandle* dataFH, BMFileHandle* metadataFH,
Expand Down Expand Up @@ -84,6 +86,10 @@ class NodeColumn {
// Returns the element count reported by `metadataDA` for the type of the given transaction.
inline uint64_t getNumNodeGroups(transaction::Transaction* transaction) const {
    const auto trxType = transaction->getType();
    return metadataDA->getNumElements(trxType);
}
// Returns a non-owning pointer to the child column stored at `childIdx` in `childrenColumns`.
// The index is only checked by an assert, so it must be in range.
inline NodeColumn* getChildColumn(common::vector_idx_t childIdx) {
    assert(childIdx < childrenColumns.size());
    auto& childColumn = childrenColumns[childIdx];
    return childColumn.get();
}

virtual void checkpointInMemory();
virtual void rollbackInMemory();
Expand Down Expand Up @@ -147,6 +153,8 @@ class BoolNodeColumn : public NodeColumn {
};

class NullNodeColumn : public NodeColumn {
friend StructNodeColumn;

public:
NullNodeColumn(common::page_idx_t metaDAHPageIdx, BMFileHandle* dataFH,
BMFileHandle* metadataFH, BufferManager* bufferManager, WAL* wal,
Expand Down
2 changes: 2 additions & 0 deletions src/include/storage/store/struct_node_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ class StructNodeColumn : public NodeColumn {
common::ValueVector* resultVector) final;
void lookupInternal(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
common::ValueVector* resultVector) final;
void writeInternal(common::offset_t nodeOffset, common::ValueVector* vectorToWriteFrom,
uint32_t posInVectorToWriteFrom) final;
};

} // namespace storage
Expand Down
100 changes: 95 additions & 5 deletions src/storage/local_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "storage/copier/var_list_column_chunk.h"
#include "storage/store/node_table.h"
#include "storage/store/string_node_column.h"
#include "storage/store/struct_node_column.h"
#include "storage/store/var_list_node_column.h"

using namespace kuzu::common;
Expand Down Expand Up @@ -39,7 +40,7 @@ void LocalVector::update(
}

void StringLocalVector::update(
sel_t offsetInLocalVector, common::ValueVector* updateVector, sel_t offsetInUpdateVector) {
sel_t offsetInLocalVector, ValueVector* updateVector, sel_t offsetInUpdateVector) {
auto kuStr = updateVector->getValue<ku_string_t>(offsetInUpdateVector);
if (kuStr.len > BufferPoolConstants::PAGE_4KB_SIZE) {
throw RuntimeException(
Expand All @@ -50,12 +51,41 @@ void StringLocalVector::update(
LocalVector::update(offsetInLocalVector, updateVector, offsetInUpdateVector);
}

// Copies null flags from this local vector into `resultVector`.
// The i-th selected position of the local vector is paired with the i-th selected position of
// `resultVector`; only the null flag is transferred, no value payload is touched here.
void StructLocalVector::scan(ValueVector* resultVector) const {
    const auto& localSel = vector->state->selVector;
    const auto& resultSel = resultVector->state->selVector;
    for (auto idx = 0u; idx < localSel->selectedSize; ++idx) {
        const auto srcPos = localSel->selectedPositions[idx];
        const auto dstPos = resultSel->selectedPositions[idx];
        resultVector->setNull(dstPos, vector->isNull(srcPos));
    }
}

// Copies the null flag at `offsetInLocalVector` to `offsetInResultVector` of `resultVector`.
// When `validityMask` is not set for the local offset, `resultVector` is left untouched.
void StructLocalVector::lookup(
    sel_t offsetInLocalVector, ValueVector* resultVector, sel_t offsetInResultVector) {
    if (validityMask[offsetInLocalVector]) {
        const auto isNullLocally = vector->isNull(offsetInLocalVector);
        resultVector->setNull(offsetInResultVector, isNullLocally);
    }
}

// Records the null flag of `updateVector[offsetInUpdateVector]` at `offsetInLocalVector`.
// The first time a local offset is written it is also appended to the selected positions of
// the local vector and flagged in `validityMask`; later writes only refresh the null flag.
void StructLocalVector::update(
    sel_t offsetInLocalVector, ValueVector* updateVector, sel_t offsetInUpdateVector) {
    vector->setNull(offsetInLocalVector, updateVector->isNull(offsetInUpdateVector));
    if (validityMask[offsetInLocalVector]) {
        // Offset already tracked: nothing more to register.
        return;
    }
    auto& selVector = vector->state->selVector;
    selVector->selectedPositions[selVector->selectedSize++] = offsetInLocalVector;
    validityMask[offsetInLocalVector] = true;
}

std::unique_ptr<LocalVector> LocalVectorFactory::createLocalVectorData(
const LogicalType& logicalType, MemoryManager* mm) {
switch (logicalType.getPhysicalType()) {
case PhysicalTypeID::STRING: {
return std::make_unique<StringLocalVector>(mm);
}
case PhysicalTypeID::STRUCT: {
return std::make_unique<StructLocalVector>(mm);
}
default: {
return std::make_unique<LocalVector>(logicalType, mm);
}
Expand All @@ -80,8 +110,7 @@ void LocalColumnChunk::lookup(vector_idx_t vectorIdx, sel_t offsetInLocalVector,
// Applies a single-value update to the local vector identified by `vectorIdx`.
// If no local vector exists for `vectorIdx` yet, one is created through LocalVectorFactory and
// stored in `vectors`; the write itself is then delegated to that vector's update().
void LocalColumnChunk::update(vector_idx_t vectorIdx, sel_t offsetInLocalVector,
ValueVector* updateVector, sel_t offsetInUpdateVector) {
if (!vectors.contains(vectorIdx)) {
// NOTE(review): two emplace statements for the same key are listed here, one typed from
// updateVector->dataType and one from the chunk's own dataType. They look like the removed
// and added versions of one statement in this diff view -- confirm that only the
// dataType-based call exists in the actual file.
vectors.emplace(
vectorIdx, LocalVectorFactory::createLocalVectorData(updateVector->dataType, mm));
vectors.emplace(vectorIdx, LocalVectorFactory::createLocalVectorData(dataType, mm));
}
vectors.at(vectorIdx)->update(offsetInLocalVector, updateVector, offsetInUpdateVector);
}
Expand Down Expand Up @@ -128,7 +157,7 @@ void LocalColumn::update(offset_t nodeOffset, ValueVector* propertyVector,
sel_t posInPropertyVector, MemoryManager* mm) {
auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(nodeOffset);
if (!chunks.contains(nodeGroupIdx)) {
chunks.emplace(nodeGroupIdx, std::make_unique<LocalColumnChunk>(mm));
chunks.emplace(nodeGroupIdx, std::make_unique<LocalColumnChunk>(column->dataType, mm));
}
auto chunk = chunks.at(nodeGroupIdx).get();
auto [vectorIdxInChunk, offsetInVector] =
Expand All @@ -144,6 +173,7 @@ void LocalColumn::prepareCommit() {

void LocalColumn::prepareCommitForChunk(node_group_idx_t nodeGroupIdx) {
auto nodeGroupStartOffset = StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx);
assert(chunks.contains(nodeGroupIdx));
auto chunk = chunks.at(nodeGroupIdx).get();
for (auto& [vectorIdx, vector] : chunk->vectors) {
auto vectorStartOffset =
Expand All @@ -157,6 +187,7 @@ void LocalColumn::prepareCommitForChunk(node_group_idx_t nodeGroupIdx) {
}

void StringLocalColumn::prepareCommitForChunk(node_group_idx_t nodeGroupIdx) {
assert(chunks.contains(nodeGroupIdx));
auto localChunk = chunks.at(nodeGroupIdx).get();
auto stringColumn = reinterpret_cast<StringNodeColumn*>(column);
auto overflowMetadata =
Expand Down Expand Up @@ -193,12 +224,13 @@ void StringLocalColumn::commitLocalChunkOutOfPlace(
}

void VarListLocalColumn::prepareCommitForChunk(node_group_idx_t nodeGroupIdx) {
assert(chunks.contains(nodeGroupIdx));
auto chunk = chunks.at(nodeGroupIdx).get();
auto varListColumn = reinterpret_cast<VarListNodeColumn*>(column);
auto listColumnChunkInStorage = ColumnChunkFactory::createColumnChunk(column->getDataType());
auto columnChunkToUpdate = ColumnChunkFactory::createColumnChunk(column->getDataType());
varListColumn->scan(nodeGroupIdx, listColumnChunkInStorage.get());
common::offset_t nextOffsetToWrite = 0;
offset_t nextOffsetToWrite = 0;
auto numNodesInGroup =
column->metadataDA->get(nodeGroupIdx, TransactionType::READ_ONLY).numValues;
for (auto& [vectorIdx, localVector] : chunk->vectors) {
Expand Down Expand Up @@ -236,11 +268,69 @@ void VarListLocalColumn::prepareCommitForChunk(node_group_idx_t nodeGroupIdx) {
dataColumnChunk, startPageIdx, nodeGroupIdx);
}

// Builds the local column for a STRUCT node column and creates one child local column per
// struct field, each bound to the matching child of `column` (via getChildColumn).
StructLocalColumn::StructLocalColumn(NodeColumn* column) : LocalColumn{column} {
    assert(column->getDataType().getLogicalTypeID() == LogicalTypeID::STRUCT);
    auto structType = column->getDataType();
    const auto numFields = StructType::getFields(&structType).size();
    fields.resize(numFields);
    for (auto fieldIdx = 0u; fieldIdx < numFields; ++fieldIdx) {
        auto* childColumn = column->getChildColumn(fieldIdx);
        fields[fieldIdx] = LocalColumnFactory::createLocalColumn(childColumn);
    }
}

// Scans locally stored struct data for the nodes in `nodeIDVector` into `resultVector`.
// The struct-level scan is done by LocalColumn::scan; afterwards each field's local column is
// scanned into the corresponding field vector of `resultVector`.
//
// @param nodeIDVector  node IDs to scan; passed unchanged to the base class and every field.
// @param resultVector  STRUCT vector to fill; must have exactly one field vector per entry in
//                      `fields` (checked by assert only).
void StructLocalColumn::scan(ValueVector* nodeIDVector, ValueVector* resultVector) {
    LocalColumn::scan(nodeIDVector, resultVector);
    // Bind by const reference so the container of field vectors is not copied on every call.
    const auto& fieldVectors = StructVector::getFieldVectors(resultVector);
    assert(fieldVectors.size() == fields.size());
    for (auto i = 0u; i < fields.size(); i++) {
        fields[i]->scan(nodeIDVector, fieldVectors[i].get());
    }
}

// Looks up locally stored struct data for the nodes in `nodeIDVector` into `resultVector`.
// The struct-level lookup is done by LocalColumn::lookup; afterwards each field's local column
// performs its own lookup into the corresponding field vector of `resultVector`.
//
// @param nodeIDVector  node IDs to look up; passed unchanged to the base class and every field.
// @param resultVector  STRUCT vector to fill; must have exactly one field vector per entry in
//                      `fields` (checked by assert only).
void StructLocalColumn::lookup(ValueVector* nodeIDVector, ValueVector* resultVector) {
    LocalColumn::lookup(nodeIDVector, resultVector);
    // Bind by const reference so the container of field vectors is not copied on every call.
    const auto& fieldVectors = StructVector::getFieldVectors(resultVector);
    assert(fieldVectors.size() == fields.size());
    for (auto i = 0u; i < fields.size(); i++) {
        fields[i]->lookup(nodeIDVector, fieldVectors[i].get());
    }
}

// Vectorized update: records new struct values for the nodes in `nodeIDVector`.
// The struct-level update is done by LocalColumn::update; afterwards each field vector of
// `propertyVector` is forwarded to the local column of the matching field.
//
// @param nodeIDVector    node IDs being updated; passed unchanged to the base class and fields.
// @param propertyVector  STRUCT vector holding the new values; must have exactly one field
//                        vector per entry in `fields` (checked by assert only).
// @param mm              memory manager forwarded to the base class and every field update.
void StructLocalColumn::update(
    ValueVector* nodeIDVector, ValueVector* propertyVector, MemoryManager* mm) {
    LocalColumn::update(nodeIDVector, propertyVector, mm);
    // Bind by const reference so the container of field vectors is not copied on every call.
    const auto& propertyFieldVectors = StructVector::getFieldVectors(propertyVector);
    assert(propertyFieldVectors.size() == fields.size());
    for (auto i = 0u; i < fields.size(); i++) {
        fields[i]->update(nodeIDVector, propertyFieldVectors[i].get(), mm);
    }
}

// Single-offset update: records the struct value at `posInPropertyVector` of `propertyVector`
// for the node at `nodeOffset`.
// The struct-level update is done by LocalColumn::update; afterwards the same position of each
// field vector is forwarded to the local column of the matching field.
//
// @param nodeOffset           offset of the node being updated.
// @param propertyVector       STRUCT vector holding the new value; must have exactly one field
//                             vector per entry in `fields` (checked by assert only).
// @param posInPropertyVector  position read from `propertyVector` and from each field vector.
// @param mm                   memory manager forwarded to the base class and every field update.
void StructLocalColumn::update(offset_t nodeOffset, ValueVector* propertyVector,
    sel_t posInPropertyVector, MemoryManager* mm) {
    LocalColumn::update(nodeOffset, propertyVector, posInPropertyVector, mm);
    // Bind by const reference so the container of field vectors is not copied on every call.
    const auto& propertyFieldVectors = StructVector::getFieldVectors(propertyVector);
    assert(propertyFieldVectors.size() == fields.size());
    for (auto i = 0u; i < fields.size(); i++) {
        fields[i]->update(nodeOffset, propertyFieldVectors[i].get(), posInPropertyVector, mm);
    }
}

// Prepares the commit of node group `nodeGroupIdx`: first through the LocalColumn base
// implementation, then through every field's local column in order.
void StructLocalColumn::prepareCommitForChunk(node_group_idx_t nodeGroupIdx) {
    LocalColumn::prepareCommitForChunk(nodeGroupIdx);
    for (auto fieldIdx = 0u; fieldIdx < fields.size(); ++fieldIdx) {
        fields[fieldIdx]->prepareCommitForChunk(nodeGroupIdx);
    }
}

std::unique_ptr<LocalColumn> LocalColumnFactory::createLocalColumn(NodeColumn* column) {
switch (column->getDataType().getPhysicalType()) {
case PhysicalTypeID::STRING: {
return std::make_unique<StringLocalColumn>(column);
}
case PhysicalTypeID::STRUCT: {
return std::make_unique<StructLocalColumn>(column);
}
case PhysicalTypeID::VAR_LIST: {
return std::make_unique<VarListLocalColumn>(column);
}
Expand Down
Loading

0 comments on commit 06cbd43

Please sign in to comment.