Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update struct fields in-place or out of place as necessary #2442

Merged
merged 1 commit into from
Nov 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/include/storage/local_storage/local_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ class LocalVectorCollection {
return vectors[vectorIdx].get();
}

std::unique_ptr<LocalVectorCollection> getStructChildVectorCollection(
common::struct_field_idx_t idx);

common::row_idx_t append(common::ValueVector* vector);

private:
Expand Down
2 changes: 1 addition & 1 deletion src/include/storage/store/column.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class Column {
return metadataDA->getNumElements(transaction->getType());
}

void prepareCommitForChunk(transaction::Transaction* transaction,
virtual void prepareCommitForChunk(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, LocalVectorCollection* localColumnChunk,
const offset_to_row_idx_t& insertInfo, const offset_to_row_idx_t& updateInfo,
const offset_set_t& deleteInfo);
Expand Down
23 changes: 14 additions & 9 deletions src/include/storage/store/struct_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,36 +5,41 @@
namespace kuzu {
namespace storage {

class StructColumn : public Column {
class StructColumn final : public Column {
public:
StructColumn(std::unique_ptr<common::LogicalType> dataType,
const MetadataDAHInfo& metaDAHeaderInfo, BMFileHandle* dataFH, BMFileHandle* metadataFH,
BufferManager* bufferManager, WAL* wal, transaction::Transaction* transaction,
RWPropertyStats propertyStatistics, bool enableCompression);

void scan(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx,
ColumnChunk* columnChunk) final;
ColumnChunk* columnChunk) override;
void scan(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx,
common::offset_t startOffsetInGroup, common::offset_t endOffsetInGroup,
common::ValueVector* resultVector, uint64_t offsetInVector) final;
common::ValueVector* resultVector, uint64_t offsetInVector) override;

void append(ColumnChunk* columnChunk, uint64_t nodeGroupIdx) final;
void append(ColumnChunk* columnChunk, uint64_t nodeGroupIdx) override;

void checkpointInMemory() final;
void rollbackInMemory() final;
void checkpointInMemory() override;
void rollbackInMemory() override;

inline Column* getChild(common::vector_idx_t childIdx) {
KU_ASSERT(childIdx < childColumns.size());
return childColumns[childIdx].get();
}
void write(common::offset_t nodeOffset, common::ValueVector* vectorToWriteFrom,
uint32_t posInVectorToWriteFrom) final;
uint32_t posInVectorToWriteFrom) override;

void prepareCommitForChunk(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, LocalVectorCollection* localColumnChunk,
const offset_to_row_idx_t& insertInfo, const offset_to_row_idx_t& updateInfo,
const offset_set_t& deleteInfo) override;

protected:
void scanInternal(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
common::ValueVector* resultVector) final;
common::ValueVector* resultVector) override;
void lookupInternal(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
common::ValueVector* resultVector) final;
common::ValueVector* resultVector) override;

private:
std::vector<std::unique_ptr<Column>> childColumns;
Expand Down
13 changes: 13 additions & 0 deletions src/storage/local_storage/local_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,19 @@
}
}

std::unique_ptr<LocalVectorCollection> LocalVectorCollection::getStructChildVectorCollection(
common::struct_field_idx_t idx) {
auto childCollection = std::make_unique<LocalVectorCollection>(
common::StructType::getField(dataType, idx)->getType(), mm);

for (int i = 0; i < numRows; i++) {
auto fieldVector =
common::StructVector::getFieldVector(getLocalVector(i)->getVector(), idx);
childCollection->append(fieldVector.get());
}
return childCollection;
}

Check warning on line 68 in src/storage/local_storage/local_table.cpp

View check run for this annotation

Codecov / codecov/patch

src/storage/local_storage/local_table.cpp#L68

Added line #L68 was not covered by tests

LocalNodeGroup::LocalNodeGroup(std::vector<LogicalType*> dataTypes, MemoryManager* mm) {
chunks.resize(dataTypes.size());
for (auto i = 0u; i < dataTypes.size(); ++i) {
Expand Down
36 changes: 36 additions & 0 deletions src/storage/store/struct_column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,5 +104,41 @@
}
}

void StructColumn::prepareCommitForChunk(Transaction* transaction, node_group_idx_t nodeGroupIdx,
LocalVectorCollection* localColumnChunk, const offset_to_row_idx_t& insertInfo,
const offset_to_row_idx_t& updateInfo, const offset_set_t& deleteInfo) {
auto currentNumNodeGroups = metadataDA->getNumElements(transaction->getType());
auto isNewNodeGroup = nodeGroupIdx >= currentNumNodeGroups;
if (isNewNodeGroup) {
// If this is a new node group, updateInfo should be empty. We should perform out-of-place
// commit with a new column chunk.
commitLocalChunkOutOfPlace(transaction, nodeGroupIdx, localColumnChunk, isNewNodeGroup,

Check warning on line 115 in src/storage/store/struct_column.cpp

View check run for this annotation

Codecov / codecov/patch

src/storage/store/struct_column.cpp#L115

Added line #L115 was not covered by tests
insertInfo, updateInfo, deleteInfo);
} else {
// Update null data (currently always works in-place)
KU_ASSERT(nullColumn->canCommitInPlace(
transaction, nodeGroupIdx, localColumnChunk, insertInfo, updateInfo));
nullColumn->commitLocalChunkInPlace(
transaction, localColumnChunk, insertInfo, updateInfo, deleteInfo);
// Update each child column separately
for (int i = 0; i < childColumns.size(); i++) {
const auto& childColumn = childColumns[i];
auto childLocalColumnChunk = localColumnChunk->getStructChildVectorCollection(i);

// If this is not a new node group, we should first check if we can perform in-place
// commit.
if (childColumn->canCommitInPlace(transaction, nodeGroupIdx,
childLocalColumnChunk.get(), insertInfo, updateInfo)) {
childColumn->commitLocalChunkInPlace(
transaction, childLocalColumnChunk.get(), insertInfo, updateInfo, deleteInfo);
} else {
childColumn->commitLocalChunkOutOfPlace(transaction, nodeGroupIdx,
childLocalColumnChunk.get(), isNewNodeGroup, insertInfo, updateInfo,
deleteInfo);
}
}
}
}

} // namespace storage
} // namespace kuzu
1 change: 0 additions & 1 deletion test/test_files/update_node/delete_tinysnb.test
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
101||

-CASE MixedDeleteInsertTest
-SKIP
-STATEMENT CREATE (a:organisation {ID:30, mark:3.3})
---- ok
-STATEMENT MATCH (a:organisation) WHERE a.ID = 30 RETURN a.orgCode, a.mark
Expand Down
1 change: 0 additions & 1 deletion tools/python_api/test/test_prepared_statement.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ def test_read(establish_connection):
assert not result.has_next()


@pytest.mark.skip(reason="Failing due to struct out of place update regression")
def test_write(establish_connection):
conn, _ = establish_connection
orgs = [
Expand Down