diff --git a/src/include/storage/local_storage/local_table.h b/src/include/storage/local_storage/local_table.h index bdff88340c..39d036f430 100644 --- a/src/include/storage/local_storage/local_table.h +++ b/src/include/storage/local_storage/local_table.h @@ -56,6 +56,9 @@ class LocalVectorCollection { return vectors[vectorIdx].get(); } + std::unique_ptr getStructChildVectorCollection( + common::struct_field_idx_t idx); + common::row_idx_t append(common::ValueVector* vector); private: diff --git a/src/include/storage/store/column.h b/src/include/storage/store/column.h index fd0c5ab808..f74b2782d4 100644 --- a/src/include/storage/store/column.h +++ b/src/include/storage/store/column.h @@ -70,7 +70,7 @@ class Column { return metadataDA->getNumElements(transaction->getType()); } - void prepareCommitForChunk(transaction::Transaction* transaction, + virtual void prepareCommitForChunk(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx, LocalVectorCollection* localColumnChunk, const offset_to_row_idx_t& insertInfo, const offset_to_row_idx_t& updateInfo, const offset_set_t& deleteInfo); diff --git a/src/include/storage/store/struct_column.h b/src/include/storage/store/struct_column.h index 103e5332c5..89a955b93e 100644 --- a/src/include/storage/store/struct_column.h +++ b/src/include/storage/store/struct_column.h @@ -5,7 +5,7 @@ namespace kuzu { namespace storage { -class StructColumn : public Column { +class StructColumn final : public Column { public: StructColumn(std::unique_ptr dataType, const MetadataDAHInfo& metaDAHeaderInfo, BMFileHandle* dataFH, BMFileHandle* metadataFH, @@ -13,28 +13,33 @@ class StructColumn : public Column { RWPropertyStats propertyStatistics, bool enableCompression); void scan(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx, - ColumnChunk* columnChunk) final; + ColumnChunk* columnChunk) override; void scan(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx, common::offset_t startOffsetInGroup, common::offset_t endOffsetInGroup, - common::ValueVector* resultVector, uint64_t offsetInVector) final; + common::ValueVector* resultVector, uint64_t offsetInVector) override; - void append(ColumnChunk* columnChunk, uint64_t nodeGroupIdx) final; + void append(ColumnChunk* columnChunk, uint64_t nodeGroupIdx) override; - void checkpointInMemory() final; - void rollbackInMemory() final; + void checkpointInMemory() override; + void rollbackInMemory() override; inline Column* getChild(common::vector_idx_t childIdx) { KU_ASSERT(childIdx < childColumns.size()); return childColumns[childIdx].get(); } void write(common::offset_t nodeOffset, common::ValueVector* vectorToWriteFrom, - uint32_t posInVectorToWriteFrom) final; + uint32_t posInVectorToWriteFrom) override; + + void prepareCommitForChunk(transaction::Transaction* transaction, + common::node_group_idx_t nodeGroupIdx, LocalVectorCollection* localColumnChunk, + const offset_to_row_idx_t& insertInfo, const offset_to_row_idx_t& updateInfo, + const offset_set_t& deleteInfo) override; protected: void scanInternal(transaction::Transaction* transaction, common::ValueVector* nodeIDVector, - common::ValueVector* resultVector) final; + common::ValueVector* resultVector) override; void lookupInternal(transaction::Transaction* transaction, common::ValueVector* nodeIDVector, - common::ValueVector* resultVector) final; + common::ValueVector* resultVector) override; private: std::vector> childColumns; diff --git a/src/storage/local_storage/local_table.cpp b/src/storage/local_storage/local_table.cpp index 4c9fe68c07..bd031fd142 100644 --- a/src/storage/local_storage/local_table.cpp +++ b/src/storage/local_storage/local_table.cpp @@ -54,6 +54,19 @@ void LocalVectorCollection::prepareAppend() { } } +std::unique_ptr LocalVectorCollection::getStructChildVectorCollection( + common::struct_field_idx_t idx) { + auto childCollection = std::make_unique( + common::StructType::getField(dataType, idx)->getType(), mm); + + for (int i = 0; i < numRows; i++) { + auto fieldVector = + common::StructVector::getFieldVector(getLocalVector(i)->getVector(), idx); + childCollection->append(fieldVector.get()); + } + return childCollection; +} + LocalNodeGroup::LocalNodeGroup(std::vector dataTypes, MemoryManager* mm) { chunks.resize(dataTypes.size()); for (auto i = 0u; i < dataTypes.size(); ++i) { diff --git a/src/storage/store/struct_column.cpp b/src/storage/store/struct_column.cpp index a1e8bd72e6..47ac9f1c98 100644 --- a/src/storage/store/struct_column.cpp +++ b/src/storage/store/struct_column.cpp @@ -104,5 +104,41 @@ void StructColumn::rollbackInMemory() { } } +void StructColumn::prepareCommitForChunk(Transaction* transaction, node_group_idx_t nodeGroupIdx, + LocalVectorCollection* localColumnChunk, const offset_to_row_idx_t& insertInfo, + const offset_to_row_idx_t& updateInfo, const offset_set_t& deleteInfo) { + auto currentNumNodeGroups = metadataDA->getNumElements(transaction->getType()); + auto isNewNodeGroup = nodeGroupIdx >= currentNumNodeGroups; + if (isNewNodeGroup) { + // If this is a new node group, updateInfo should be empty. We should perform out-of-place + // commit with a new column chunk. + commitLocalChunkOutOfPlace(transaction, nodeGroupIdx, localColumnChunk, isNewNodeGroup, + insertInfo, updateInfo, deleteInfo); + } else { + // Update null data (currently always works in-place) + KU_ASSERT(nullColumn->canCommitInPlace( + transaction, nodeGroupIdx, localColumnChunk, insertInfo, updateInfo)); + nullColumn->commitLocalChunkInPlace( + transaction, localColumnChunk, insertInfo, updateInfo, deleteInfo); + // Update each child column separately + for (int i = 0; i < childColumns.size(); i++) { + const auto& childColumn = childColumns[i]; + auto childLocalColumnChunk = localColumnChunk->getStructChildVectorCollection(i); + + // If this is not a new node group, we should first check if we can perform in-place + // commit. + if (childColumn->canCommitInPlace(transaction, nodeGroupIdx, + childLocalColumnChunk.get(), insertInfo, updateInfo)) { + childColumn->commitLocalChunkInPlace( + transaction, childLocalColumnChunk.get(), insertInfo, updateInfo, deleteInfo); + } else { + childColumn->commitLocalChunkOutOfPlace(transaction, nodeGroupIdx, + childLocalColumnChunk.get(), isNewNodeGroup, insertInfo, updateInfo, + deleteInfo); + } + } + } +} + } // namespace storage } // namespace kuzu diff --git a/test/test_files/update_node/delete_tinysnb.test b/test/test_files/update_node/delete_tinysnb.test index efa0643ea4..2ade29b1e0 100644 --- a/test/test_files/update_node/delete_tinysnb.test +++ b/test/test_files/update_node/delete_tinysnb.test @@ -15,7 +15,6 @@ 101|| -CASE MixedDeleteInsertTest --SKIP -STATEMENT CREATE (a:organisation {ID:30, mark:3.3}) ---- ok -STATEMENT MATCH (a:organisation) WHERE a.ID = 30 RETURN a.orgCode, a.mark diff --git a/tools/python_api/test/test_prepared_statement.py b/tools/python_api/test/test_prepared_statement.py index 857eac0004..4abf8f2ccb 100644 --- a/tools/python_api/test/test_prepared_statement.py +++ b/tools/python_api/test/test_prepared_statement.py @@ -30,7 +30,6 @@ def test_read(establish_connection): assert not result.has_next() -@pytest.mark.skip(reason="Failing due to struct out of place update regression") def test_write(establish_connection): conn, _ = establish_connection orgs = [