Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support struct update #1942

Merged
merged 1 commit into from
Aug 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 56 additions & 27 deletions src/include/storage/local_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,16 @@ class NodeTable;

class LocalVector {
public:
LocalVector(const common::LogicalType& logicalType, storage::MemoryManager* mm) {
vector = std::make_unique<common::ValueVector>(logicalType, mm);
LocalVector(const common::LogicalType& dataType, MemoryManager* mm) {
vector = std::make_unique<common::ValueVector>(dataType, mm);
vector->setState(std::make_shared<common::DataChunkState>());
vector->state->selVector->resetSelectorToValuePosBuffer();
}

virtual ~LocalVector() = default;

void scan(common::ValueVector* resultVector) const;
void lookup(common::sel_t offsetInLocalVector, common::ValueVector* resultVector,
virtual void scan(common::ValueVector* resultVector) const;
virtual void lookup(common::sel_t offsetInLocalVector, common::ValueVector* resultVector,
common::sel_t offsetInResultVector);
virtual void update(common::sel_t offsetInLocalVector, common::ValueVector* updateVector,
common::sel_t offsetInUpdateVector);
Expand All @@ -33,7 +33,7 @@ class LocalVector {

class StringLocalVector : public LocalVector {
public:
explicit StringLocalVector(storage::MemoryManager* mm)
explicit StringLocalVector(MemoryManager* mm)
: LocalVector{common::LogicalType(common::LogicalTypeID::STRING), mm}, ovfStringLength{
0} {};

Expand All @@ -43,14 +43,29 @@ class StringLocalVector : public LocalVector {
uint64_t ovfStringLength;
};

// Local (uncommitted) vector for STRUCT-typed columns.
// As implemented in local_table.cpp, this vector only carries the struct-level null flag for each
// position: scan/lookup/update read or write null bits and never copy field values here.
class StructLocalVector : public LocalVector {
public:
// Constructs the underlying vector with a STRUCT logical type backed by a default-constructed
// (empty) StructTypeInfo.
explicit StructLocalVector(MemoryManager* mm)
: LocalVector{common::LogicalType{common::LogicalTypeID::STRUCT,
std::make_unique<common::StructTypeInfo>()},
mm} {}

// For every position selected in the local vector, copies its null flag into resultVector.
void scan(common::ValueVector* resultVector) const final;
// Copies the null flag at offsetInLocalVector into resultVector at offsetInResultVector;
// does nothing when the local position has not been marked in validityMask.
void lookup(common::sel_t offsetInLocalVector, common::ValueVector* resultVector,
common::sel_t offsetInResultVector) final;
// Stores the null flag of updateVector[offsetInUpdateVector] at offsetInLocalVector and, on the
// first update of that position, appends it to the selection vector and marks it valid.
void update(common::sel_t offsetInLocalVector, common::ValueVector* updateVector,
common::sel_t offsetInUpdateVector) final;
};

struct LocalVectorFactory {
static std::unique_ptr<LocalVector> createLocalVectorData(
const common::LogicalType& logicalType, storage::MemoryManager* mm);
const common::LogicalType& logicalType, MemoryManager* mm);
};

class LocalColumnChunk {
public:
explicit LocalColumnChunk(storage::MemoryManager* mm) : mm{mm} {};
explicit LocalColumnChunk(common::LogicalType& dataType, MemoryManager* mm)
: dataType{dataType}, mm{mm} {};

void scan(common::vector_idx_t vectorIdx, common::ValueVector* resultVector);
void lookup(common::vector_idx_t vectorIdx, common::sel_t offsetInLocalVector,
Expand All @@ -59,73 +74,87 @@ class LocalColumnChunk {
common::ValueVector* vectorToWriteFrom, common::sel_t pos);

std::map<common::vector_idx_t, std::unique_ptr<LocalVector>> vectors;
storage::MemoryManager* mm;
common::LogicalType dataType;
MemoryManager* mm;
};

class LocalColumn {
public:
explicit LocalColumn(storage::NodeColumn* column) : column{column} {};
explicit LocalColumn(NodeColumn* column) : column{column} {};
virtual ~LocalColumn() = default;

void scan(common::ValueVector* nodeIDVector, common::ValueVector* resultVector);
void lookup(common::ValueVector* nodeIDVector, common::ValueVector* resultVector);
void update(common::ValueVector* nodeIDVector, common::ValueVector* propertyVector,
storage::MemoryManager* mm);
void update(common::offset_t nodeOffset, common::ValueVector* propertyVector,
common::sel_t posInPropertyVector, storage::MemoryManager* mm);
virtual void scan(common::ValueVector* nodeIDVector, common::ValueVector* resultVector);
virtual void lookup(common::ValueVector* nodeIDVector, common::ValueVector* resultVector);
virtual void update(
common::ValueVector* nodeIDVector, common::ValueVector* propertyVector, MemoryManager* mm);
virtual void update(common::offset_t nodeOffset, common::ValueVector* propertyVector,
common::sel_t posInPropertyVector, MemoryManager* mm);

virtual void prepareCommit();

protected:
virtual void prepareCommitForChunk(common::node_group_idx_t nodeGroupIdx);

protected:
std::map<common::node_group_idx_t, std::unique_ptr<LocalColumnChunk>> chunks;
storage::NodeColumn* column;
NodeColumn* column;
};

class StringLocalColumn : public LocalColumn {
public:
explicit StringLocalColumn(storage::NodeColumn* column) : LocalColumn{column} {};
explicit StringLocalColumn(NodeColumn* column) : LocalColumn{column} {};

private:
void prepareCommitForChunk(common::node_group_idx_t nodeGroupIdx) final;
void commitLocalChunkOutOfPlace(
common::node_group_idx_t nodeGroupIdx, LocalColumnChunk* localChunk);
};

class VarListLocalColumn : public LocalColumn {
public:
explicit VarListLocalColumn(storage::NodeColumn* column) : LocalColumn{column} {};
explicit VarListLocalColumn(NodeColumn* column) : LocalColumn{column} {};

private:
void prepareCommitForChunk(common::node_group_idx_t nodeGroupIdx) final;
};

// Local column for STRUCT-typed node columns.
// Each operation first runs the base LocalColumn implementation on the struct column itself and
// then forwards the same call to one child LocalColumn per struct field (see local_table.cpp).
class StructLocalColumn : public LocalColumn {
public:
// Creates one child local column per struct field from the corresponding child NodeColumn.
explicit StructLocalColumn(NodeColumn* column);

// Scans the struct column, then scans every field into the matching field vector of
// resultVector.
void scan(common::ValueVector* nodeIDVector, common::ValueVector* resultVector) final;
// Looks up the struct column, then looks up every field into the matching field vector of
// resultVector.
void lookup(common::ValueVector* nodeIDVector, common::ValueVector* resultVector) final;
// Applies the update to the struct column, then to every field using the matching field vector
// of propertyVector.
void update(common::ValueVector* nodeIDVector, common::ValueVector* propertyVector,
MemoryManager* mm) final;
// Single-offset variant of update; the same posInPropertyVector is used for every field vector.
void update(common::offset_t nodeOffset, common::ValueVector* propertyVector,
common::sel_t posInPropertyVector, MemoryManager* mm) final;

// Prepares the commit of the struct column's own chunk, then of every field's chunk, for the
// given node group.
void prepareCommitForChunk(common::node_group_idx_t nodeGroupIdx) final;

private:
// Child local columns, one per struct field, in field order.
std::vector<std::unique_ptr<LocalColumn>> fields;
};

struct LocalColumnFactory {
static std::unique_ptr<LocalColumn> createLocalColumn(storage::NodeColumn* column);
static std::unique_ptr<LocalColumn> createLocalColumn(NodeColumn* column);
};

class LocalTable {
public:
explicit LocalTable(storage::NodeTable* table) : table{table} {};
explicit LocalTable(NodeTable* table) : table{table} {};

void scan(common::ValueVector* nodeIDVector, const std::vector<common::column_id_t>& columnIDs,
const std::vector<common::ValueVector*>& outputVectors);
void lookup(common::ValueVector* nodeIDVector,
const std::vector<common::column_id_t>& columnIDs,
const std::vector<common::ValueVector*>& outputVectors);
void update(common::property_id_t propertyID, common::ValueVector* nodeIDVector,
common::ValueVector* propertyVector, storage::MemoryManager* mm);
common::ValueVector* propertyVector, MemoryManager* mm);
void update(common::property_id_t propertyID, common::offset_t nodeOffset,
common::ValueVector* propertyVector, common::sel_t posInPropertyVector,
storage::MemoryManager* mm);
common::ValueVector* propertyVector, common::sel_t posInPropertyVector, MemoryManager* mm);

void prepareCommit();

private:
std::map<common::property_id_t, std::unique_ptr<LocalColumn>> columns;
storage::NodeTable* table;
NodeTable* table;
};

} // namespace storage
Expand Down
8 changes: 8 additions & 0 deletions src/include/storage/store/node_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,15 @@ struct BoolNodeColumnFunc {
};

class NullNodeColumn;
class StructNodeColumn;
// TODO(Guodong): This is intentionally duplicated with `Column`, as for now, we don't change rel
// tables. `Column` is used for rel tables only. Eventually, we should remove `Column`.
class NodeColumn {
friend class LocalColumn;
friend class StringLocalColumn;
friend class VarListLocalColumn;
friend class transaction::TransactionTests;
friend class StructNodeColumn;

public:
NodeColumn(const catalog::Property& property, BMFileHandle* dataFH, BMFileHandle* metadataFH,
Expand Down Expand Up @@ -84,6 +86,10 @@ class NodeColumn {
inline uint64_t getNumNodeGroups(transaction::Transaction* transaction) const {
return metadataDA->getNumElements(transaction->getType());
}
// Returns a non-owning pointer to the child column at position childIdx.
// childIdx must be smaller than the number of children (checked by assert only).
inline NodeColumn* getChildColumn(common::vector_idx_t childIdx) {
    assert(childIdx < childrenColumns.size());
    auto& childColumn = childrenColumns[childIdx];
    return childColumn.get();
}

virtual void checkpointInMemory();
virtual void rollbackInMemory();
Expand Down Expand Up @@ -147,6 +153,8 @@ class BoolNodeColumn : public NodeColumn {
};

class NullNodeColumn : public NodeColumn {
friend StructNodeColumn;

public:
NullNodeColumn(common::page_idx_t metaDAHPageIdx, BMFileHandle* dataFH,
BMFileHandle* metadataFH, BufferManager* bufferManager, WAL* wal,
Expand Down
2 changes: 2 additions & 0 deletions src/include/storage/store/struct_node_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ class StructNodeColumn : public NodeColumn {
common::ValueVector* resultVector) final;
void lookupInternal(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
common::ValueVector* resultVector) final;
void writeInternal(common::offset_t nodeOffset, common::ValueVector* vectorToWriteFrom,
uint32_t posInVectorToWriteFrom) final;
};

} // namespace storage
Expand Down
100 changes: 95 additions & 5 deletions src/storage/local_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "storage/copier/var_list_column_chunk.h"
#include "storage/store/node_table.h"
#include "storage/store/string_node_column.h"
#include "storage/store/struct_node_column.h"
#include "storage/store/var_list_node_column.h"

using namespace kuzu::common;
Expand Down Expand Up @@ -39,7 +40,7 @@
}

void StringLocalVector::update(
sel_t offsetInLocalVector, common::ValueVector* updateVector, sel_t offsetInUpdateVector) {
sel_t offsetInLocalVector, ValueVector* updateVector, sel_t offsetInUpdateVector) {
auto kuStr = updateVector->getValue<ku_string_t>(offsetInUpdateVector);
if (kuStr.len > BufferPoolConstants::PAGE_4KB_SIZE) {
throw RuntimeException(
Expand All @@ -50,12 +51,41 @@
LocalVector::update(offsetInLocalVector, updateVector, offsetInUpdateVector);
}

// Copies the struct-level null flag of every locally selected position into resultVector.
// The i-th selected local position is written to the i-th selected position of resultVector;
// field values are not touched here.
void StructLocalVector::scan(ValueVector* resultVector) const {
    const auto& localSel = vector->state->selVector;
    const auto& resultSel = resultVector->state->selVector;
    for (auto idx = 0u; idx < localSel->selectedSize; idx++) {
        const auto srcPos = localSel->selectedPositions[idx];
        const auto dstPos = resultSel->selectedPositions[idx];
        resultVector->setNull(dstPos, vector->isNull(srcPos));
    }
}

void StructLocalVector::lookup(

Check warning on line 62 in src/storage/local_table.cpp

View check run for this annotation

Codecov / codecov/patch

src/storage/local_table.cpp#L62

Added line #L62 was not covered by tests
sel_t offsetInLocalVector, ValueVector* resultVector, sel_t offsetInResultVector) {
if (!validityMask[offsetInLocalVector]) {

Check warning on line 64 in src/storage/local_table.cpp

View check run for this annotation

Codecov / codecov/patch

src/storage/local_table.cpp#L64

Added line #L64 was not covered by tests
return;
}
resultVector->setNull(offsetInResultVector, vector->isNull(offsetInLocalVector));

Check warning on line 67 in src/storage/local_table.cpp

View check run for this annotation

Codecov / codecov/patch

src/storage/local_table.cpp#L67

Added line #L67 was not covered by tests
}

// Records the null flag of updateVector[offsetInUpdateVector] at offsetInLocalVector.
// The first time a position is updated it is also appended to the selection vector and marked
// in validityMask; later updates of the same position only overwrite the null flag.
void StructLocalVector::update(
    sel_t offsetInLocalVector, ValueVector* updateVector, sel_t offsetInUpdateVector) {
    const auto isNull = updateVector->isNull(offsetInUpdateVector);
    vector->setNull(offsetInLocalVector, isNull);
    if (validityMask[offsetInLocalVector]) {
        // Position is already tracked; nothing more to register.
        return;
    }
    auto& sel = vector->state->selVector;
    sel->selectedPositions[sel->selectedSize] = offsetInLocalVector;
    sel->selectedSize++;
    validityMask[offsetInLocalVector] = true;
}

std::unique_ptr<LocalVector> LocalVectorFactory::createLocalVectorData(
const LogicalType& logicalType, MemoryManager* mm) {
switch (logicalType.getPhysicalType()) {
case PhysicalTypeID::STRING: {
return std::make_unique<StringLocalVector>(mm);
}
case PhysicalTypeID::STRUCT: {
return std::make_unique<StructLocalVector>(mm);
}
default: {
return std::make_unique<LocalVector>(logicalType, mm);
}
Expand All @@ -80,8 +110,7 @@
void LocalColumnChunk::update(vector_idx_t vectorIdx, sel_t offsetInLocalVector,
ValueVector* updateVector, sel_t offsetInUpdateVector) {
if (!vectors.contains(vectorIdx)) {
vectors.emplace(
vectorIdx, LocalVectorFactory::createLocalVectorData(updateVector->dataType, mm));
vectors.emplace(vectorIdx, LocalVectorFactory::createLocalVectorData(dataType, mm));
}
vectors.at(vectorIdx)->update(offsetInLocalVector, updateVector, offsetInUpdateVector);
}
Expand Down Expand Up @@ -128,7 +157,7 @@
sel_t posInPropertyVector, MemoryManager* mm) {
auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(nodeOffset);
if (!chunks.contains(nodeGroupIdx)) {
chunks.emplace(nodeGroupIdx, std::make_unique<LocalColumnChunk>(mm));
chunks.emplace(nodeGroupIdx, std::make_unique<LocalColumnChunk>(column->dataType, mm));
}
auto chunk = chunks.at(nodeGroupIdx).get();
auto [vectorIdxInChunk, offsetInVector] =
Expand All @@ -144,6 +173,7 @@

void LocalColumn::prepareCommitForChunk(node_group_idx_t nodeGroupIdx) {
auto nodeGroupStartOffset = StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx);
assert(chunks.contains(nodeGroupIdx));
auto chunk = chunks.at(nodeGroupIdx).get();
for (auto& [vectorIdx, vector] : chunk->vectors) {
auto vectorStartOffset =
Expand All @@ -157,6 +187,7 @@
}

void StringLocalColumn::prepareCommitForChunk(node_group_idx_t nodeGroupIdx) {
assert(chunks.contains(nodeGroupIdx));
auto localChunk = chunks.at(nodeGroupIdx).get();
auto stringColumn = reinterpret_cast<StringNodeColumn*>(column);
auto overflowMetadata =
Expand Down Expand Up @@ -193,12 +224,13 @@
}

void VarListLocalColumn::prepareCommitForChunk(node_group_idx_t nodeGroupIdx) {
assert(chunks.contains(nodeGroupIdx));
auto chunk = chunks.at(nodeGroupIdx).get();
auto varListColumn = reinterpret_cast<VarListNodeColumn*>(column);
auto listColumnChunkInStorage = ColumnChunkFactory::createColumnChunk(column->getDataType());
auto columnChunkToUpdate = ColumnChunkFactory::createColumnChunk(column->getDataType());
varListColumn->scan(nodeGroupIdx, listColumnChunkInStorage.get());
common::offset_t nextOffsetToWrite = 0;
offset_t nextOffsetToWrite = 0;
auto numNodesInGroup =
column->metadataDA->get(nodeGroupIdx, TransactionType::READ_ONLY).numValues;
for (auto& [vectorIdx, localVector] : chunk->vectors) {
Expand Down Expand Up @@ -236,11 +268,69 @@
dataColumnChunk, startPageIdx, nodeGroupIdx);
}

// Builds a local column for a STRUCT node column, creating one child local column per struct
// field from the matching child NodeColumn (child i corresponds to field i).
StructLocalColumn::StructLocalColumn(NodeColumn* column) : LocalColumn{column} {
    assert(column->getDataType().getLogicalTypeID() == LogicalTypeID::STRUCT);
    auto structType = column->getDataType();
    const auto numFields = StructType::getFields(&structType).size();
    fields.reserve(numFields);
    for (auto fieldIdx = 0u; fieldIdx < numFields; fieldIdx++) {
        fields.push_back(LocalColumnFactory::createLocalColumn(column->getChildColumn(fieldIdx)));
    }
}

// Scans the struct column itself via the base implementation, then scans each field's local
// column into the corresponding field vector of resultVector.
void StructLocalColumn::scan(ValueVector* nodeIDVector, ValueVector* resultVector) {
    LocalColumn::scan(nodeIDVector, resultVector);
    auto childVectors = StructVector::getFieldVectors(resultVector);
    assert(childVectors.size() == fields.size());
    auto fieldIdx = 0u;
    for (auto& field : fields) {
        field->scan(nodeIDVector, childVectors[fieldIdx].get());
        fieldIdx++;
    }
}

void StructLocalColumn::lookup(ValueVector* nodeIDVector, ValueVector* resultVector) {
LocalColumn::lookup(nodeIDVector, resultVector);
auto fieldVectors = StructVector::getFieldVectors(resultVector);

Check warning on line 292 in src/storage/local_table.cpp

View check run for this annotation

Codecov / codecov/patch

src/storage/local_table.cpp#L290-L292

Added lines #L290 - L292 were not covered by tests
assert(fieldVectors.size() == fields.size());
for (auto i = 0u; i < fields.size(); i++) {
fields[i]->lookup(nodeIDVector, fieldVectors[i].get());

Check warning on line 295 in src/storage/local_table.cpp

View check run for this annotation

Codecov / codecov/patch

src/storage/local_table.cpp#L294-L295

Added lines #L294 - L295 were not covered by tests
}
}

Check warning on line 297 in src/storage/local_table.cpp

View check run for this annotation

Codecov / codecov/patch

src/storage/local_table.cpp#L297

Added line #L297 was not covered by tests

// Applies a vectorized update: first to the struct column itself via the base implementation,
// then to every field's local column using the matching field vector of propertyVector.
void StructLocalColumn::update(
    ValueVector* nodeIDVector, ValueVector* propertyVector, MemoryManager* mm) {
    LocalColumn::update(nodeIDVector, propertyVector, mm);
    auto childVectors = StructVector::getFieldVectors(propertyVector);
    assert(childVectors.size() == fields.size());
    auto fieldIdx = 0u;
    for (auto& field : fields) {
        field->update(nodeIDVector, childVectors[fieldIdx].get(), mm);
        fieldIdx++;
    }
}

// Applies a single-offset update: first to the struct column itself via the base implementation,
// then to every field's local column. The same posInPropertyVector is used to read each field
// vector of propertyVector.
void StructLocalColumn::update(offset_t nodeOffset, ValueVector* propertyVector,
    sel_t posInPropertyVector, MemoryManager* mm) {
    LocalColumn::update(nodeOffset, propertyVector, posInPropertyVector, mm);
    auto childVectors = StructVector::getFieldVectors(propertyVector);
    assert(childVectors.size() == fields.size());
    auto fieldIdx = 0u;
    for (auto& field : fields) {
        field->update(nodeOffset, childVectors[fieldIdx].get(), posInPropertyVector, mm);
        fieldIdx++;
    }
}

// Prepares the commit for one node group: the struct column's own chunk goes first (base
// implementation), followed by each field's chunk in field order.
void StructLocalColumn::prepareCommitForChunk(node_group_idx_t nodeGroupIdx) {
    LocalColumn::prepareCommitForChunk(nodeGroupIdx);
    for (auto fieldIdx = 0u; fieldIdx < fields.size(); fieldIdx++) {
        fields[fieldIdx]->prepareCommitForChunk(nodeGroupIdx);
    }
}

std::unique_ptr<LocalColumn> LocalColumnFactory::createLocalColumn(NodeColumn* column) {
switch (column->getDataType().getPhysicalType()) {
case PhysicalTypeID::STRING: {
return std::make_unique<StringLocalColumn>(column);
}
case PhysicalTypeID::STRUCT: {
return std::make_unique<StructLocalColumn>(column);
}
case PhysicalTypeID::VAR_LIST: {
return std::make_unique<VarListLocalColumn>(column);
}
Expand Down
Loading
Loading