Skip to content

Commit

Permalink
Merge pull request #2442 from kuzudb/struct-field-out-of-place-fix
Browse files Browse the repository at this point in the history
Update struct fields in-place or out of place as necessary
  • Loading branch information
benjaminwinger committed Nov 17, 2023
2 parents 0763648 + a53cd01 commit 7ef3d5c
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 12 deletions.
3 changes: 3 additions & 0 deletions src/include/storage/local_storage/local_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ class LocalVectorCollection {
return vectors[vectorIdx].get();
}

std::unique_ptr<LocalVectorCollection> getStructChildVectorCollection(
common::struct_field_idx_t idx);

common::row_idx_t append(common::ValueVector* vector);

private:
Expand Down
2 changes: 1 addition & 1 deletion src/include/storage/store/column.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class Column {
return metadataDA->getNumElements(transaction->getType());
}

void prepareCommitForChunk(transaction::Transaction* transaction,
virtual void prepareCommitForChunk(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, LocalVectorCollection* localColumnChunk,
const offset_to_row_idx_t& insertInfo, const offset_to_row_idx_t& updateInfo,
const offset_set_t& deleteInfo);
Expand Down
23 changes: 14 additions & 9 deletions src/include/storage/store/struct_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,36 +5,41 @@
namespace kuzu {
namespace storage {

class StructColumn : public Column {
class StructColumn final : public Column {
public:
StructColumn(std::unique_ptr<common::LogicalType> dataType,
const MetadataDAHInfo& metaDAHeaderInfo, BMFileHandle* dataFH, BMFileHandle* metadataFH,
BufferManager* bufferManager, WAL* wal, transaction::Transaction* transaction,
RWPropertyStats propertyStatistics, bool enableCompression);

void scan(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx,
ColumnChunk* columnChunk) final;
ColumnChunk* columnChunk) override;
void scan(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx,
common::offset_t startOffsetInGroup, common::offset_t endOffsetInGroup,
common::ValueVector* resultVector, uint64_t offsetInVector) final;
common::ValueVector* resultVector, uint64_t offsetInVector) override;

void append(ColumnChunk* columnChunk, uint64_t nodeGroupIdx) final;
void append(ColumnChunk* columnChunk, uint64_t nodeGroupIdx) override;

void checkpointInMemory() final;
void rollbackInMemory() final;
void checkpointInMemory() override;
void rollbackInMemory() override;

inline Column* getChild(common::vector_idx_t childIdx) {
KU_ASSERT(childIdx < childColumns.size());
return childColumns[childIdx].get();
}
void write(common::offset_t nodeOffset, common::ValueVector* vectorToWriteFrom,
uint32_t posInVectorToWriteFrom) final;
uint32_t posInVectorToWriteFrom) override;

void prepareCommitForChunk(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, LocalVectorCollection* localColumnChunk,
const offset_to_row_idx_t& insertInfo, const offset_to_row_idx_t& updateInfo,
const offset_set_t& deleteInfo) override;

protected:
void scanInternal(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
common::ValueVector* resultVector) final;
common::ValueVector* resultVector) override;
void lookupInternal(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
common::ValueVector* resultVector) final;
common::ValueVector* resultVector) override;

private:
std::vector<std::unique_ptr<Column>> childColumns;
Expand Down
13 changes: 13 additions & 0 deletions src/storage/local_storage/local_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,19 @@ void LocalVectorCollection::prepareAppend() {
}
}

std::unique_ptr<LocalVectorCollection> LocalVectorCollection::getStructChildVectorCollection(
common::struct_field_idx_t idx) {
auto childCollection = std::make_unique<LocalVectorCollection>(
common::StructType::getField(dataType, idx)->getType(), mm);

for (int i = 0; i < numRows; i++) {
auto fieldVector =
common::StructVector::getFieldVector(getLocalVector(i)->getVector(), idx);
childCollection->append(fieldVector.get());
}
return childCollection;
}

LocalNodeGroup::LocalNodeGroup(std::vector<LogicalType*> dataTypes, MemoryManager* mm) {
chunks.resize(dataTypes.size());
for (auto i = 0u; i < dataTypes.size(); ++i) {
Expand Down
36 changes: 36 additions & 0 deletions src/storage/store/struct_column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,5 +104,41 @@ void StructColumn::rollbackInMemory() {
}
}

void StructColumn::prepareCommitForChunk(Transaction* transaction, node_group_idx_t nodeGroupIdx,
LocalVectorCollection* localColumnChunk, const offset_to_row_idx_t& insertInfo,
const offset_to_row_idx_t& updateInfo, const offset_set_t& deleteInfo) {
auto currentNumNodeGroups = metadataDA->getNumElements(transaction->getType());
auto isNewNodeGroup = nodeGroupIdx >= currentNumNodeGroups;
if (isNewNodeGroup) {
// If this is a new node group, updateInfo should be empty. We should perform out-of-place
// commit with a new column chunk.
commitLocalChunkOutOfPlace(transaction, nodeGroupIdx, localColumnChunk, isNewNodeGroup,
insertInfo, updateInfo, deleteInfo);
} else {
// Update null data (currently always works in-place)
KU_ASSERT(nullColumn->canCommitInPlace(
transaction, nodeGroupIdx, localColumnChunk, insertInfo, updateInfo));
nullColumn->commitLocalChunkInPlace(
transaction, localColumnChunk, insertInfo, updateInfo, deleteInfo);
// Update each child column separately
for (int i = 0; i < childColumns.size(); i++) {
const auto& childColumn = childColumns[i];
auto childLocalColumnChunk = localColumnChunk->getStructChildVectorCollection(i);

// If this is not a new node group, we should first check if we can perform in-place
// commit.
if (childColumn->canCommitInPlace(transaction, nodeGroupIdx,
childLocalColumnChunk.get(), insertInfo, updateInfo)) {
childColumn->commitLocalChunkInPlace(
transaction, childLocalColumnChunk.get(), insertInfo, updateInfo, deleteInfo);
} else {
childColumn->commitLocalChunkOutOfPlace(transaction, nodeGroupIdx,
childLocalColumnChunk.get(), isNewNodeGroup, insertInfo, updateInfo,
deleteInfo);
}
}
}
}

} // namespace storage
} // namespace kuzu
1 change: 0 additions & 1 deletion test/test_files/update_node/delete_tinysnb.test
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
101||

-CASE MixedDeleteInsertTest
-SKIP
-STATEMENT CREATE (a:organisation {ID:30, mark:3.3})
---- ok
-STATEMENT MATCH (a:organisation) WHERE a.ID = 30 RETURN a.orgCode, a.mark
Expand Down
1 change: 0 additions & 1 deletion tools/python_api/test/test_prepared_statement.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ def test_read(establish_connection):
assert not result.has_next()


@pytest.mark.skip(reason="Failing due to struct out of place update regression")
def test_write(establish_connection):
conn, _ = establish_connection
orgs = [
Expand Down

0 comments on commit 7ef3d5c

Please sign in to comment.