diff --git a/dataset/copy-fault-tests/duplicate-ids/schema.cypher b/dataset/copy-fault-tests/duplicate-ids/schema.cypher index ad9435d2bf..5cbe4566e5 100644 --- a/dataset/copy-fault-tests/duplicate-ids/schema.cypher +++ b/dataset/copy-fault-tests/duplicate-ids/schema.cypher @@ -1 +1,2 @@ -create node table person (ID INT64, fName STRING, PRIMARY KEY (ID)); \ No newline at end of file +create node table person (ID INT64, fName STRING, PRIMARY KEY (ID)); +create node table org (ID STRING, fName STRING, PRIMARY KEY (ID)); diff --git a/dataset/copy-fault-tests/duplicate-ids/vOrg.csv b/dataset/copy-fault-tests/duplicate-ids/vOrg.csv new file mode 100644 index 0000000000..e6c1fb63e8 --- /dev/null +++ b/dataset/copy-fault-tests/duplicate-ids/vOrg.csv @@ -0,0 +1,4 @@ +10,Guodong +24,Semih +31,Xiyang +10,Ziyi diff --git a/src/common/exception.cpp b/src/common/exception.cpp index ab7b9db39d..060a9f22e4 100644 --- a/src/common/exception.cpp +++ b/src/common/exception.cpp @@ -18,5 +18,10 @@ std::string ExceptionMessage::invalidPKType(const std::string& type) { type); } +std::string ExceptionMessage::overLargeStringValueException(const std::string& length) { + return StringUtils::string_format( + "Maximum length of strings is 4096. Input string's length is {}.", length); +} + } // namespace common } // namespace kuzu diff --git a/src/include/common/exception.h b/src/include/common/exception.h index 894904b93e..e0812fbd53 100644 --- a/src/include/common/exception.h +++ b/src/include/common/exception.h @@ -15,6 +15,7 @@ struct ExceptionMessage { static inline std::string notAllowCopyOnNonEmptyTableException() { return "COPY commands can only be executed once on a table."; } + static std::string overLargeStringValueException(const std::string& length); }; class Exception : public std::exception { diff --git a/src/include/common/types/ku_string.h b/src/include/common/types/ku_string.h index 20d148a648..451e56b20d 100644 --- a/src/include/common/types/ku_string.h +++ b/src/include/common/types/ku_string.h @@ -9,9 +9,9 @@ namespace common { struct ku_string_t { - static const uint64_t PREFIX_LENGTH = 4; - static const uint64_t INLINED_SUFFIX_LENGTH = 8; - static const uint64_t SHORT_STR_LENGTH = PREFIX_LENGTH + INLINED_SUFFIX_LENGTH; + static constexpr uint64_t PREFIX_LENGTH = 4; + static constexpr uint64_t INLINED_SUFFIX_LENGTH = 8; + static constexpr uint64_t SHORT_STR_LENGTH = PREFIX_LENGTH + INLINED_SUFFIX_LENGTH; uint32_t len; uint8_t prefix[PREFIX_LENGTH]; diff --git a/src/include/common/vector/value_vector.h b/src/include/common/vector/value_vector.h index 45cb8f4b4b..6a588f399d 100644 --- a/src/include/common/vector/value_vector.h +++ b/src/include/common/vector/value_vector.h @@ -53,6 +53,7 @@ class ValueVector { inline uint32_t getNumBytesPerValue() const { return numBytesPerValue; } + // TODO(Guodong): Rename this to getValueRef template inline T& getValue(uint32_t pos) const { return ((T*)valueBuffer.get())[pos]; diff --git a/src/include/main/storage_driver.h b/src/include/main/storage_driver.h index efcea03754..64983802a6 100644 --- a/src/include/main/storage_driver.h +++ b/src/include/main/storage_driver.h @@ -25,8 +25,8 @@ class StorageDriver { uint64_t getNumRels(const std::string& relName); private: - void scanColumn( - storage::NodeColumn* column, common::offset_t* offsets, size_t size, uint8_t* result); + void scanColumn(transaction::Transaction* transaction, storage::NodeColumn* column, + common::offset_t* offsets, size_t size, uint8_t* result); private: catalog::Catalog* catalog; diff 
--git a/src/include/processor/operator/persistent/set_executor.h b/src/include/processor/operator/persistent/set_executor.h index 2fcc02f494..d6790d0363 100644 --- a/src/include/processor/operator/persistent/set_executor.h +++ b/src/include/processor/operator/persistent/set_executor.h @@ -18,7 +18,7 @@ class NodeSetExecutor { void init(ResultSet* resultSet, ExecutionContext* context); - virtual void set() = 0; + virtual void set(ExecutionContext* context) = 0; virtual std::unique_ptr copy() const = 0; @@ -37,46 +37,50 @@ class NodeSetExecutor { common::ValueVector* rhsVector = nullptr; }; +struct NodeSetInfo { + storage::NodeTable* table; + common::property_id_t propertyID; +}; + class SingleLabelNodeSetExecutor : public NodeSetExecutor { public: - SingleLabelNodeSetExecutor(storage::NodeColumn* column, const DataPos& nodeIDPos, + SingleLabelNodeSetExecutor(NodeSetInfo setInfo, const DataPos& nodeIDPos, const DataPos& lhsVectorPos, std::unique_ptr evaluator) - : NodeSetExecutor{nodeIDPos, lhsVectorPos, std::move(evaluator)}, column{column} {} + : NodeSetExecutor{nodeIDPos, lhsVectorPos, std::move(evaluator)}, setInfo{setInfo} {} SingleLabelNodeSetExecutor(const SingleLabelNodeSetExecutor& other) : NodeSetExecutor{other.nodeIDPos, other.lhsVectorPos, other.evaluator->clone()}, - column{other.column} {} + setInfo(other.setInfo) {} - void set() final; + void set(ExecutionContext* context) final; inline std::unique_ptr copy() const final { return std::make_unique(*this); } private: - storage::NodeColumn* column; + NodeSetInfo setInfo; }; class MultiLabelNodeSetExecutor : public NodeSetExecutor { public: - MultiLabelNodeSetExecutor( - std::unordered_map tableIDToColumn, + MultiLabelNodeSetExecutor(std::unordered_map tableIDToSetInfo, const DataPos& nodeIDPos, const DataPos& lhsVectorPos, std::unique_ptr evaluator) - : NodeSetExecutor{nodeIDPos, lhsVectorPos, std::move(evaluator)}, tableIDToColumn{std::move( - tableIDToColumn)} {} + : NodeSetExecutor{nodeIDPos, lhsVectorPos, std::move(evaluator)}, + tableIDToSetInfo{std::move(tableIDToSetInfo)} {} MultiLabelNodeSetExecutor(const MultiLabelNodeSetExecutor& other) : NodeSetExecutor{other.nodeIDPos, other.lhsVectorPos, other.evaluator->clone()}, - tableIDToColumn{other.tableIDToColumn} {} + tableIDToSetInfo{other.tableIDToSetInfo} {} - void set() final; + void set(ExecutionContext* context) final; inline std::unique_ptr copy() const final { return std::make_unique(*this); } private: - std::unordered_map tableIDToColumn; + std::unordered_map tableIDToSetInfo; }; class RelSetExecutor { diff --git a/src/include/storage/copier/column_chunk.h b/src/include/storage/copier/column_chunk.h index 0d5aafad47..36fc1fddf2 100644 --- a/src/include/storage/copier/column_chunk.h +++ b/src/include/storage/copier/column_chunk.h @@ -16,6 +16,15 @@ namespace storage { class NullColumnChunk; +struct ColumnChunkMetadata { + common::page_idx_t pageIdx = common::INVALID_PAGE_IDX; + common::page_idx_t numPages = 0; +}; + +struct OverflowColumnChunkMetadata : public ColumnChunkMetadata { + common::offset_t lastOffsetInPage; +}; + // Base data segment covers all fixed-sized data types. // Some template functions are almost duplicated from `InMemColumnChunk`, which is intended. // Currently, `InMemColumnChunk` is used to populate rel columns. Eventually, we will merge them. 
@@ -76,6 +85,8 @@ class ColumnChunk { } inline uint64_t getNumBytesPerValue() const { return numBytesPerValue; } + inline uint64_t getNumBytes() const { return numBytes; } + inline uint8_t* getData() { return buffer.get(); } virtual void write(const common::Value& val, uint64_t posToWrite); diff --git a/src/include/storage/copier/string_column_chunk.h b/src/include/storage/copier/string_column_chunk.h index 880eebeae3..03f862b499 100644 --- a/src/include/storage/copier/string_column_chunk.h +++ b/src/include/storage/copier/string_column_chunk.h @@ -17,6 +17,8 @@ class StringColumnChunk : public ColumnChunk { void append(ColumnChunk* other, common::offset_t startPosInOtherChunk, common::offset_t startPosInChunk, uint32_t numValuesToAppend) final; + virtual void update(common::ValueVector* vector, common::vector_idx_t vectorIdx); + template void setValueFromString(const char* value, uint64_t length, uint64_t pos) { throw common::NotImplementedException("VarSizedColumnChunk::setValueFromString"); @@ -28,11 +30,14 @@ class StringColumnChunk : public ColumnChunk { common::page_idx_t flushOverflowBuffer(BMFileHandle* dataFH, common::page_idx_t startPageIdx); -private: + inline InMemOverflowFile* getOverflowFile() { return overflowFile.get(); } + inline common::offset_t getLastOffsetInPage() { return overflowCursor.offsetInPage; } + inline common::page_idx_t getNumPages() const final { return ColumnChunk::getNumPages() + overflowFile->getNumPages(); } +private: template void templateCopyStringArrowArray( arrow::Array* array, common::offset_t startPosInChunk, uint32_t numValuesToAppend); diff --git a/src/include/storage/local_storage.h b/src/include/storage/local_storage.h new file mode 100644 index 0000000000..c4f2b74b0e --- /dev/null +++ b/src/include/storage/local_storage.h @@ -0,0 +1,39 @@ +#pragma once + +#include "local_table.h" + +namespace kuzu { +namespace storage { +class NodesStore; + +// Data structures in LocalStorage are not thread-safe. +// For now, we only support single thread insertions and updates. Once we optimize them with +// multiple threads, LocalStorage and its related data structures should be reworked to be +// thread-safe. 
+class LocalStorage { +public: + LocalStorage(storage::StorageManager* storageManager, storage::MemoryManager* mm); + + void scan(common::table_id_t tableID, common::ValueVector* nodeIDVector, + const std::vector& columnIDs, + const std::vector& outputVectors); + void lookup(common::table_id_t tableID, common::ValueVector* nodeIDVector, + const std::vector& columnIDs, + const std::vector& outputVectors); + void update(common::table_id_t tableID, common::property_id_t propertyID, + common::ValueVector* nodeIDVector, common::ValueVector* propertyVector); + void update(common::table_id_t tableID, common::property_id_t propertyID, + common::offset_t nodeOffset, common::ValueVector* propertyVector, + common::sel_t posInPropertyVector); + + void prepareCommit(); + void prepareRollback(); + +private: + std::map> tables; + storage::NodesStore* nodesStore; + storage::MemoryManager* mm; +}; + +} // namespace storage +} // namespace kuzu diff --git a/src/include/storage/local_table.h b/src/include/storage/local_table.h new file mode 100644 index 0000000000..cd0bf65b1d --- /dev/null +++ b/src/include/storage/local_table.h @@ -0,0 +1,124 @@ +#pragma once + +#include + +#include "common/vector/value_vector.h" + +namespace kuzu { +namespace storage { +class NodeColumn; +class NodeTable; + +class LocalVector { +public: + LocalVector(const common::LogicalType& logicalType, storage::MemoryManager* mm) { + vector = std::make_unique(logicalType, mm); + vector->setState(std::make_shared()); + vector->state->selVector->resetSelectorToValuePosBuffer(); + } + + virtual ~LocalVector() = default; + + void scan(common::ValueVector* resultVector) const; + void lookup(common::sel_t offsetInLocalVector, common::ValueVector* resultVector, + common::sel_t offsetInResultVector); + virtual void update(common::sel_t offsetInLocalVector, common::ValueVector* updateVector, + common::sel_t offsetInUpdateVector); + + std::unique_ptr vector; + // This mask is mainly to speed the lookup operation up. Otherwise, we have to do binary search + // to check if the value at an offset has been updated or not. 
+ std::bitset validityMask; +}; + +class StringLocalVector : public LocalVector { +public: + explicit StringLocalVector(storage::MemoryManager* mm) + : LocalVector{common::LogicalType(common::LogicalTypeID::STRING), mm}, ovfStringLength{ + 0} {}; + + void update(common::sel_t offsetInLocalVector, common::ValueVector* updateVector, + common::sel_t offsetInUpdateVector) final; + + uint64_t ovfStringLength; +}; + +struct LocalVectorFactory { + static std::unique_ptr createLocalVectorData( + const common::LogicalType& logicalType, storage::MemoryManager* mm); +}; + +class LocalColumnChunk { +public: + explicit LocalColumnChunk(storage::MemoryManager* mm) : mm{mm} {}; + + void scan(common::vector_idx_t vectorIdx, common::ValueVector* resultVector); + void lookup(common::vector_idx_t vectorIdx, common::sel_t offsetInLocalVector, + common::ValueVector* resultVector, common::sel_t offsetInResultVector); + void update(common::vector_idx_t vectorIdx, common::sel_t offsetInVector, + common::ValueVector* vectorToWriteFrom, common::sel_t pos); + + std::map> vectors; + storage::MemoryManager* mm; +}; + +class LocalColumn { +public: + explicit LocalColumn(storage::NodeColumn* column) : column{column} {}; + virtual ~LocalColumn() = default; + + void scan(common::ValueVector* nodeIDVector, common::ValueVector* resultVector); + void lookup(common::ValueVector* nodeIDVector, common::ValueVector* resultVector); + void update(common::ValueVector* nodeIDVector, common::ValueVector* propertyVector, + storage::MemoryManager* mm); + void update(common::offset_t nodeOffset, common::ValueVector* propertyVector, + common::sel_t posInPropertyVector, storage::MemoryManager* mm); + + virtual void prepareCommit(); + +protected: + virtual void prepareCommitForChunk(common::node_group_idx_t nodeGroupIdx); + +protected: + std::map> chunks; + storage::NodeColumn* column; +}; + +class StringLocalColumn : public LocalColumn { +public: + explicit StringLocalColumn(storage::NodeColumn* column) : LocalColumn{column} {}; + +private: + void prepareCommitForChunk(common::node_group_idx_t nodeGroupIdx) final; + void commitLocalChunkOutOfPlace( + common::node_group_idx_t nodeGroupIdx, LocalColumnChunk* localChunk); +}; + +struct LocalColumnFactory { + static std::unique_ptr createLocalColumn(storage::NodeColumn* column); +}; + +class LocalTable { +public: + explicit LocalTable(storage::NodeTable* table) : table{table} {}; + + void scan(common::ValueVector* nodeIDVector, const std::vector& columnIDs, + const std::vector& outputVectors); + void lookup(common::ValueVector* nodeIDVector, + const std::vector& columnIDs, + const std::vector& outputVectors); + void update(common::property_id_t propertyID, common::ValueVector* nodeIDVector, + common::ValueVector* propertyVector, storage::MemoryManager* mm); + void update(common::property_id_t propertyID, common::offset_t nodeOffset, + common::ValueVector* propertyVector, common::sel_t posInPropertyVector, + storage::MemoryManager* mm); + + void prepareCommit(); + +private: + std::map> columns; + storage::NodeTable* table; +}; + +} // namespace storage +} // namespace kuzu diff --git a/src/include/storage/storage_utils.h b/src/include/storage/storage_utils.h index 0862738724..40af7a2e7c 100644 --- a/src/include/storage/storage_utils.h +++ b/src/include/storage/storage_utils.h @@ -86,16 +86,34 @@ struct PageUtils { class StorageUtils { public: - static inline common::offset_t getStartOffsetForNodeGroup( + static inline common::offset_t getStartOffsetOfNodeGroup( common::node_group_idx_t 
nodeGroupIdx) { return nodeGroupIdx << common::StorageConstants::NODE_GROUP_SIZE_LOG2; } - - static inline common::node_group_idx_t getNodeGroupIdxFromNodeOffset( - common::offset_t nodeOffset) { + static inline common::offset_t getStartOffsetOfVector(common::vector_idx_t vectorIdx) { + return vectorIdx << common::DEFAULT_VECTOR_CAPACITY_LOG_2; + } + static inline common::node_group_idx_t getNodeGroupIdx(common::offset_t nodeOffset) { return nodeOffset >> common::StorageConstants::NODE_GROUP_SIZE_LOG2; } + static inline common::vector_idx_t getVectorIdx(common::offset_t offsetInChunk) { + return offsetInChunk >> common::DEFAULT_VECTOR_CAPACITY_LOG_2; + } + static inline common::vector_idx_t getVectorIdxInChunk( + common::offset_t nodeOffset, common::node_group_idx_t nodeGroupIdx) { + return (nodeOffset - StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx)) >> + common::DEFAULT_VECTOR_CAPACITY_LOG_2; + } + static inline std::pair + getVectorIdxInChunkAndOffsetInVector( + common::offset_t nodeOffset, common::node_group_idx_t nodeGroupIdx) { + auto startOffsetOfNodeGroup = StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx); + auto offsetInChunk = nodeOffset - startOffsetOfNodeGroup; + auto vectorIdx = getVectorIdx(offsetInChunk); + return std::make_pair(vectorIdx, offsetInChunk - getStartOffsetOfVector(vectorIdx)); + } + static std::string getNodeIndexFName(const std::string& directory, const common::table_id_t& tableID, common::DBFileType dbFileType); diff --git a/src/include/storage/store/node_column.h b/src/include/storage/store/node_column.h index 9fe81939a8..99fd2e9b2b 100644 --- a/src/include/storage/store/node_column.h +++ b/src/include/storage/store/node_column.h @@ -6,6 +6,10 @@ #include "storage/storage_structure/storage_structure.h" namespace kuzu { +namespace transaction { +class TransactionTests; +} + namespace storage { using read_node_column_func_t = std::function; -struct ColumnChunkMetadata { - common::page_idx_t pageIdx = common::INVALID_PAGE_IDX; - common::page_idx_t numPages = 0; -}; - struct FixedSizedNodeColumnFunc { static void readValuesFromPage(uint8_t* frame, PageElementCursor& pageCursor, common::ValueVector* resultVector, uint32_t posInVector, uint32_t numValuesToRead); @@ -48,6 +47,10 @@ class NullNodeColumn; // TODO(Guodong): This is intentionally duplicated with `Column`, as for now, we don't change rel // tables. `Column` is used for rel tables only. Eventually, we should remove `Column`. 
class NodeColumn { + friend class LocalColumn; + friend class StringLocalColumn; + friend class transaction::TransactionTests; + public: NodeColumn(const catalog::Property& property, BMFileHandle* dataFH, BMFileHandle* metadataFH, BufferManager* bufferManager, WAL* wal, transaction::Transaction* transaction, @@ -58,26 +61,21 @@ class NodeColumn { virtual ~NodeColumn() = default; // Expose for feature store - virtual void batchLookup(const common::offset_t* nodeOffsets, size_t size, uint8_t* result); + virtual void batchLookup(transaction::Transaction* transaction, + const common::offset_t* nodeOffsets, size_t size, uint8_t* result); virtual void scan(transaction::Transaction* transaction, common::ValueVector* nodeIDVector, common::ValueVector* resultVector); virtual void scan(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx, common::offset_t startOffsetInGroup, common::offset_t endOffsetInGroup, common::ValueVector* resultVector, uint64_t offsetInVector = 0); + virtual void scan(common::node_group_idx_t nodeGroupIdx, ColumnChunk* columnChunk); virtual void lookup(transaction::Transaction* transaction, common::ValueVector* nodeIDVector, common::ValueVector* resultVector); virtual common::page_idx_t append( ColumnChunk* columnChunk, common::page_idx_t startPageIdx, uint64_t nodeGroupIdx); - // TODO(Guodong): refactor these write interfaces. - void write(common::ValueVector* nodeIDVector, common::ValueVector* vectorToWriteFrom); - inline void write(common::offset_t nodeOffset, common::ValueVector* vectorToWriteFrom, - uint32_t posInVectorToWriteFrom) { - writeInternal(nodeOffset, vectorToWriteFrom, posInVectorToWriteFrom); - } - virtual void setNull(common::offset_t nodeOffset); inline common::LogicalType getDataType() const { return dataType; } @@ -107,9 +105,14 @@ class NodeColumn { void readFromPage(transaction::Transaction* transaction, common::page_idx_t pageIdx, const std::function& func); + void write(common::ValueVector* nodeIDVector, common::ValueVector* vectorToWriteFrom); + inline void write(common::offset_t nodeOffset, common::ValueVector* vectorToWriteFrom, + uint32_t posInVectorToWriteFrom) { + writeInternal(nodeOffset, vectorToWriteFrom, posInVectorToWriteFrom); + } virtual void writeInternal(common::offset_t nodeOffset, common::ValueVector* vectorToWriteFrom, uint32_t posInVectorToWriteFrom); - void writeValue(common::offset_t nodeOffset, common::ValueVector* vectorToWriteFrom, + virtual void writeValue(common::offset_t nodeOffset, common::ValueVector* vectorToWriteFrom, uint32_t posInVectorToWriteFrom); // TODO(Guodong): This is mostly duplicated with @@ -138,7 +141,8 @@ class BoolNodeColumn : public NodeColumn { BMFileHandle* metadataFH, BufferManager* bufferManager, WAL* wal, transaction::Transaction* transaction, bool requireNullColumn = true); - void batchLookup(const common::offset_t* nodeOffsets, size_t size, uint8_t* result) final; + void batchLookup(transaction::Transaction* transaction, const common::offset_t* nodeOffsets, + size_t size, uint8_t* result) final; }; class NullNodeColumn : public NodeColumn { diff --git a/src/include/storage/store/node_table.h b/src/include/storage/store/node_table.h index fdb85903ae..7a2f9cbb2a 100644 --- a/src/include/storage/store/node_table.h +++ b/src/include/storage/store/node_table.h @@ -41,9 +41,11 @@ class NodeTable { const std::vector& propertyVectors, const std::unordered_map& propertyIDToVectorIdx); - - void update(common::property_id_t propertyID, common::ValueVector* nodeIDVector, - 
common::ValueVector* vectorToWriteFrom); + void update(transaction::Transaction* transaction, common::property_id_t propertyID, + common::ValueVector* nodeIDVector, common::ValueVector* propertyVector); + void update(transaction::Transaction* transaction, common::property_id_t propertyID, + common::offset_t nodeOffset, common::ValueVector* propertyVector, + common::sel_t posInPropertyVector); void delete_(transaction::Transaction* transaction, common::ValueVector* nodeIDVector, DeleteState* deleteState); void append(NodeGroup* nodeGroup); diff --git a/src/include/storage/store/string_node_column.h b/src/include/storage/store/string_node_column.h index 8c6c3d37e5..34470e9b97 100644 --- a/src/include/storage/store/string_node_column.h +++ b/src/include/storage/store/string_node_column.h @@ -19,10 +19,18 @@ class StringNodeColumn : public NodeColumn { void scan(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx, common::offset_t startOffsetInGroup, common::offset_t endOffsetInGroup, common::ValueVector* resultVector, uint64_t offsetInVector = 0) final; + void scan(common::node_group_idx_t nodeGroupIdx, ColumnChunk* columnChunk) final; common::page_idx_t append(ColumnChunk* columnChunk, common::page_idx_t startPageIdx, common::node_group_idx_t nodeGroupIdx) final; + void writeValue(common::offset_t nodeOffset, common::ValueVector* vectorToWriteFrom, + uint32_t posInVectorToWriteFrom) final; + + inline InMemDiskArray* getOverflowMetadataDA() { + return overflowMetadataDA.get(); + } + void checkpointInMemory() final; void rollbackInMemory() final; @@ -33,11 +41,12 @@ class StringNodeColumn : public NodeColumn { common::ValueVector* resultVector) final; private: + void writeOverflow(); void readStringValueFromOvf(transaction::Transaction* transaction, common::ku_string_t& kuStr, common::ValueVector* resultVector, common::page_idx_t overflowPageIdx); private: - std::unique_ptr> overflowMetadataDA; + std::unique_ptr> overflowMetadataDA; }; } // namespace storage diff --git a/src/include/transaction/transaction.h b/src/include/transaction/transaction.h index 0ec0c8126f..5791ce123a 100644 --- a/src/include/transaction/transaction.h +++ b/src/include/transaction/transaction.h @@ -2,6 +2,8 @@ #include +#include "storage/local_storage.h" + namespace kuzu { namespace transaction { @@ -14,27 +16,37 @@ class Transaction { friend class TransactionManager; public: - constexpr Transaction(TransactionType transactionType, uint64_t transactionID) - : type{transactionType}, ID{transactionID} {} + Transaction(TransactionType transactionType, uint64_t transactionID, + storage::StorageManager* storageManager, storage::MemoryManager* mm) + : type{transactionType}, ID{transactionID} { + localStorage = std::make_unique(storageManager, mm); + } + + constexpr explicit Transaction(TransactionType transactionType) + : type{transactionType}, ID{UINT64_MAX} {} public: inline TransactionType getType() const { return type; } inline bool isReadOnly() const { return TransactionType::READ_ONLY == type; } inline bool isWriteTransaction() const { return TransactionType::WRITE == type; } inline uint64_t getID() const { return ID; } + inline storage::LocalStorage* getLocalStorage() { return localStorage.get(); } + static inline std::unique_ptr getDummyWriteTrx() { - return std::make_unique(TransactionType::WRITE, UINT64_MAX); + return std::make_unique(TransactionType::WRITE); } static inline std::unique_ptr getDummyReadOnlyTrx() { - return std::make_unique(TransactionType::READ_ONLY, UINT64_MAX); + return 
std::make_unique(TransactionType::READ_ONLY); } private: TransactionType type; + // TODO(Guodong): add type transaction_id_t. uint64_t ID; + std::unique_ptr localStorage; }; -static Transaction DUMMY_READ_TRANSACTION = Transaction(TransactionType::READ_ONLY, UINT64_MAX); +static Transaction DUMMY_READ_TRANSACTION = Transaction(TransactionType::READ_ONLY); } // namespace transaction } // namespace kuzu diff --git a/src/include/transaction/transaction_manager.h b/src/include/transaction/transaction_manager.h index 98ae3a85c4..4f1028bd95 100644 --- a/src/include/transaction/transaction_manager.h +++ b/src/include/transaction/transaction_manager.h @@ -16,10 +16,12 @@ namespace transaction { class TransactionManager { public: - explicit TransactionManager(storage::WAL& wal) + explicit TransactionManager( + storage::WAL& wal, storage::StorageManager* storageManager, storage::MemoryManager* mm) : logger{common::LoggerUtils::getLogger( common::LoggerConstants::LoggerEnum::TRANSACTION_MANAGER)}, - wal{wal}, activeWriteTransactionID{INT64_MAX}, lastTransactionID{0}, lastCommitID{0} {}; + wal{wal}, storageManager{storageManager}, mm{mm}, activeWriteTransactionID{INT64_MAX}, + lastTransactionID{0}, lastCommitID{0} {}; std::unique_ptr beginWriteTransaction(); std::unique_ptr beginReadOnlyTransaction(); void commit(Transaction* transaction); @@ -63,6 +65,8 @@ class TransactionManager { std::shared_ptr logger; storage::WAL& wal; + storage::StorageManager* storageManager; + storage::MemoryManager* mm; uint64_t activeWriteTransactionID; diff --git a/src/main/database.cpp b/src/main/database.cpp index 0f8127f055..484f96294e 100644 --- a/src/main/database.cpp +++ b/src/main/database.cpp @@ -55,7 +55,8 @@ Database::Database(std::string databasePath, SystemConfig systemConfig) recoverIfNecessary(); catalog = std::make_unique(wal.get()); storageManager = std::make_unique(*catalog, *memoryManager, wal.get()); - transactionManager = std::make_unique(*wal); + transactionManager = std::make_unique( + *wal, storageManager.get(), memoryManager.get()); } Database::~Database() { @@ -119,6 +120,7 @@ void Database::commit(Transaction* transaction, bool skipCheckpointForTestingRec } assert(transaction->isWriteTransaction()); catalog->prepareCommitOrRollback(TransactionAction::COMMIT); + transaction->getLocalStorage()->prepareCommit(); storageManager->prepareCommit(); // Note: It is enough to stop and wait transactions to leave the system instead of // for example checking on the query processor's task scheduler. 
This is because the diff --git a/src/main/storage_driver.cpp b/src/main/storage_driver.cpp index b8d39f4917..4302bba099 100644 --- a/src/main/storage_driver.cpp +++ b/src/main/storage_driver.cpp @@ -3,6 +3,7 @@ #include "storage/storage_manager.h" using namespace kuzu::common; +using namespace kuzu::transaction; namespace kuzu { namespace main { @@ -24,10 +25,11 @@ void StorageDriver::scan(const std::string& nodeName, const std::string& propert std::vector threads; auto numElementsPerThread = size / numThreads + 1; auto sizeLeft = size; + auto dummyReadOnlyTransaction = Transaction::getDummyReadOnlyTrx(); while (sizeLeft > 0) { uint64_t sizeToRead = std::min(numElementsPerThread, sizeLeft); - threads.emplace_back( - &StorageDriver::scanColumn, this, column, offsets, sizeToRead, current_buffer); + threads.emplace_back(&StorageDriver::scanColumn, this, dummyReadOnlyTransaction.get(), + column, offsets, sizeToRead, current_buffer); offsets += sizeToRead; current_buffer += sizeToRead * column->getNumBytesPerValue(); sizeLeft -= sizeToRead; @@ -54,9 +56,9 @@ uint64_t StorageDriver::getNumRels(const std::string& relName) { return relStatistics->getNumTuples(); } -void StorageDriver::scanColumn( - storage::NodeColumn* column, offset_t* offsets, size_t size, uint8_t* result) { - column->batchLookup(offsets, size, result); +void StorageDriver::scanColumn(Transaction* transaction, storage::NodeColumn* column, + offset_t* offsets, size_t size, uint8_t* result) { + column->batchLookup(transaction, offsets, size, result); } } // namespace main diff --git a/src/processor/map/map_set.cpp b/src/processor/map/map_set.cpp index 83baa2ccf8..e28381abd0 100644 --- a/src/processor/map/map_set.cpp +++ b/src/processor/map/map_set.cpp @@ -22,22 +22,23 @@ std::unique_ptr PlanMapper::getNodeSetExecutor(storage::NodesSt } auto evaluator = expressionMapper.mapExpression(info->setItem.second, inSchema); if (node->isMultiLabeled()) { - std::unordered_map tableIDToColumn; + std::unordered_map tableIDToSetInfo; for (auto tableID : node->getTableIDs()) { if (!property->hasPropertyID(tableID)) { continue; } auto propertyID = property->getPropertyID(tableID); - auto column = store->getNodePropertyColumn(tableID, propertyID); - tableIDToColumn.insert({tableID, column}); + auto table = store->getNodeTable(tableID); + tableIDToSetInfo.insert({tableID, NodeSetInfo{table, propertyID}}); } return std::make_unique( - std::move(tableIDToColumn), nodeIDPos, propertyPos, std::move(evaluator)); + std::move(tableIDToSetInfo), nodeIDPos, propertyPos, std::move(evaluator)); } else { auto tableID = node->getSingleTableID(); - auto column = store->getNodePropertyColumn(tableID, property->getPropertyID(tableID)); + auto table = store->getNodeTable(tableID); return std::make_unique( - column, nodeIDPos, propertyPos, std::move(evaluator)); + NodeSetInfo{table, property->getPropertyID(tableID)}, nodeIDPos, propertyPos, + std::move(evaluator)); } } diff --git a/src/processor/operator/copy_from/copy_node.cpp b/src/processor/operator/copy_from/copy_node.cpp index a9ee45ee05..9125bb2a0f 100644 --- a/src/processor/operator/copy_from/copy_node.cpp +++ b/src/processor/operator/copy_from/copy_node.cpp @@ -96,7 +96,7 @@ void CopyNode::executeInternal(ExecutionContext* context) { if (localNodeGroup->isFull()) { node_group_idx_t nodeGroupIdx; if (copyNodeInfo.preservingOrder) { - nodeGroupIdx = StorageUtils::getNodeGroupIdxFromNodeOffset(nodeOffset) - 1; + nodeGroupIdx = StorageUtils::getNodeGroupIdx(nodeOffset) - 1; sharedState->currentNodeGroupIdx++; } 
else { nodeGroupIdx = sharedState->getNextNodeGroupIdx(); @@ -115,7 +115,7 @@ void CopyNode::writeAndResetNodeGroup(node_group_idx_t nodeGroupIdx, PrimaryKeyIndexBuilder* pkIndex, column_id_t pkColumnID, NodeTable* table, NodeGroup* nodeGroup) { nodeGroup->setNodeGroupIdx(nodeGroupIdx); - auto startOffset = StorageUtils::getStartOffsetForNodeGroup(nodeGroupIdx); + auto startOffset = StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx); if (pkIndex) { populatePKIndex(pkIndex, nodeGroup->getColumnChunk(pkColumnID), startOffset, nodeGroup->getNumNodes() /* startPageIdx */); diff --git a/src/processor/operator/persistent/merge.cpp b/src/processor/operator/persistent/merge.cpp index adb9fe98ce..db18d43ddd 100644 --- a/src/processor/operator/persistent/merge.cpp +++ b/src/processor/operator/persistent/merge.cpp @@ -36,7 +36,7 @@ bool Merge::getNextTuplesInternal(ExecutionContext* context) { auto pos = markVector->state->selVector->selectedPositions[0]; if (!markVector->isNull(pos)) { for (auto& executor : onMatchNodeSetExecutors) { - executor->set(); + executor->set(context); } for (auto& executor : onMatchRelSetExecutors) { executor->set(); @@ -46,13 +46,13 @@ bool Merge::getNextTuplesInternal(ExecutionContext* context) { executor->insert(transaction); } for (auto& executor : nodeSetExecutors) { - executor->set(); + executor->set(context); } for (auto& executor : relInsertExecutors) { executor->insert(transaction); } for (auto& executor : onCreateNodeSetExecutors) { - executor->set(); + executor->set(context); } for (auto& executor : onCreateRelSetExecutors) { executor->set(); diff --git a/src/processor/operator/persistent/set.cpp b/src/processor/operator/persistent/set.cpp index 3ab16c61e4..4b70fc469b 100644 --- a/src/processor/operator/persistent/set.cpp +++ b/src/processor/operator/persistent/set.cpp @@ -14,7 +14,7 @@ bool SetNodeProperty::getNextTuplesInternal(ExecutionContext* context) { return false; } for (auto& executor : executors) { - executor->set(); + executor->set(context); } return true; } diff --git a/src/processor/operator/persistent/set_executor.cpp b/src/processor/operator/persistent/set_executor.cpp index 72a5fd3da1..40daeedfae 100644 --- a/src/processor/operator/persistent/set_executor.cpp +++ b/src/processor/operator/persistent/set_executor.cpp @@ -34,39 +34,39 @@ static void writeToPropertyVector(ValueVector* propertyVector, uint32_t property } } -void SingleLabelNodeSetExecutor::set() { +void SingleLabelNodeSetExecutor::set(ExecutionContext* context) { evaluator->evaluate(); + setInfo.table->update(context->transaction, setInfo.propertyID, nodeIDVector, rhsVector); for (auto i = 0u; i < nodeIDVector->state->selVector->selectedSize; ++i) { auto lhsPos = nodeIDVector->state->selVector->selectedPositions[i]; - auto& nodeID = nodeIDVector->getValue(lhsPos); auto rhsPos = lhsPos; if (rhsVector->state->selVector->selectedSize == 1) { rhsPos = rhsVector->state->selVector->selectedPositions[0]; } - column->write(nodeID.offset, rhsVector, rhsPos); if (lhsVector != nullptr) { writeToPropertyVector(lhsVector, lhsPos, rhsVector, rhsPos); } } } -void MultiLabelNodeSetExecutor::set() { +void MultiLabelNodeSetExecutor::set(ExecutionContext* context) { evaluator->evaluate(); for (auto i = 0u; i < nodeIDVector->state->selVector->selectedSize; ++i) { auto lhsPos = nodeIDVector->state->selVector->selectedPositions[i]; auto& nodeID = nodeIDVector->getValue(lhsPos); - if (!tableIDToColumn.contains(nodeID.tableID)) { + if (!tableIDToSetInfo.contains(nodeID.tableID)) { if (lhsVector != 
nullptr) { lhsVector->setNull(lhsPos, true); } continue; } - auto column = tableIDToColumn.at(nodeID.tableID); auto rhsPos = lhsPos; if (rhsVector->state->selVector->selectedSize == 1) { rhsPos = rhsVector->state->selVector->selectedPositions[0]; } - column->write(nodeID.offset, rhsVector, rhsPos); + auto& setInfo = tableIDToSetInfo.at(nodeID.tableID); + setInfo.table->update( + context->transaction, setInfo.propertyID, nodeID.offset, rhsVector, rhsPos); if (lhsVector != nullptr) { writeToPropertyVector(lhsVector, lhsPos, rhsVector, rhsPos); } diff --git a/src/storage/CMakeLists.txt b/src/storage/CMakeLists.txt index eda36a639f..3c8e4f624d 100644 --- a/src/storage/CMakeLists.txt +++ b/src/storage/CMakeLists.txt @@ -9,6 +9,8 @@ add_subdirectory(wal) add_library(kuzu_storage OBJECT file_handle.cpp + local_storage.cpp + local_table.cpp storage_info.cpp storage_manager.cpp storage_utils.cpp diff --git a/src/storage/copier/string_column_chunk.cpp b/src/storage/copier/string_column_chunk.cpp index 11637ed065..0ab135ee89 100644 --- a/src/storage/copier/string_column_chunk.cpp +++ b/src/storage/copier/string_column_chunk.cpp @@ -7,40 +7,11 @@ using namespace kuzu::common; namespace kuzu { namespace storage { -// BLOB -template<> -void StringColumnChunk::setValueFromString( - const char* value, uint64_t length, uint64_t pos) { - if (length > BufferPoolConstants::PAGE_4KB_SIZE) { - length = BufferPoolConstants::PAGE_4KB_SIZE; - } - auto blobBuffer = std::make_unique(length); - auto blobLen = Blob::fromString(value, length, blobBuffer.get()); - auto val = overflowFile->copyString((char*)blobBuffer.get(), blobLen, overflowCursor); - setValue(val, pos); -} - -// STRING -template<> -void StringColumnChunk::setValueFromString( - const char* value, uint64_t length, uint64_t pos) { - if (length > BufferPoolConstants::PAGE_4KB_SIZE) { - length = BufferPoolConstants::PAGE_4KB_SIZE; - } - auto val = overflowFile->copyString(value, length, overflowCursor); - setValue(val, pos); -} - -// STRING -template<> -std::string StringColumnChunk::getValue(offset_t pos) const { - auto kuStr = ((ku_string_t*)buffer.get())[pos]; - return overflowFile->readString(&kuStr); -} - StringColumnChunk::StringColumnChunk(LogicalType dataType, CopyDescription* copyDescription) : ColumnChunk{std::move(dataType), copyDescription} { overflowFile = std::make_unique(); + overflowCursor.pageIdx = 0; + overflowCursor.offsetInPage = 0; } void StringColumnChunk::resetToEmpty() { @@ -97,6 +68,18 @@ void StringColumnChunk::append(ColumnChunk* other, offset_t startPosInOtherChunk } } +void StringColumnChunk::update(ValueVector* vector, vector_idx_t vectorIdx) { + auto startOffsetInChunk = vectorIdx << DEFAULT_VECTOR_CAPACITY_LOG_2; + for (auto i = 0u; i < vector->state->selVector->selectedSize; i++) { + auto pos = vector->state->selVector->selectedPositions[i]; + auto offsetInChunk = startOffsetInChunk + pos; + nullChunk->setNull(offsetInChunk, vector->isNull(pos)); + if (!vector->isNull(pos)) { + auto kuStr = vector->getValue(pos); + setValueFromString(kuStr.getAsString().c_str(), kuStr.len, offsetInChunk); + } + } +} page_idx_t StringColumnChunk::flushOverflowBuffer(BMFileHandle* dataFH, page_idx_t startPageIdx) { for (auto i = 0u; i < overflowFile->getNumPages(); i++) { FileUtils::writeToFile(dataFH->getFileInfo(), overflowFile->getPage(i)->data, @@ -128,6 +111,49 @@ void StringColumnChunk::appendStringColumnChunk(StringColumnChunk* other, } } +void StringColumnChunk::write(const Value& val, uint64_t posToWrite) { + 
assert(val.getDataType()->getPhysicalType() == PhysicalTypeID::STRING); + nullChunk->setNull(posToWrite, val.isNull()); + if (val.isNull()) { + return; + } + auto strVal = val.getValue(); + setValueFromString(strVal.c_str(), strVal.length(), posToWrite); +} + +// BLOB +template<> +void StringColumnChunk::setValueFromString( + const char* value, uint64_t length, uint64_t pos) { + if (length > BufferPoolConstants::PAGE_4KB_SIZE) { + throw CopyException( + ExceptionMessage::overLargeStringValueException(std::to_string(length))); + } + auto blobBuffer = std::make_unique(length); + auto blobLen = Blob::fromString(value, length, blobBuffer.get()); + auto val = overflowFile->copyString((char*)blobBuffer.get(), blobLen, overflowCursor); + setValue(val, pos); +} + +// STRING +template<> +void StringColumnChunk::setValueFromString( + const char* value, uint64_t length, uint64_t pos) { + if (length > BufferPoolConstants::PAGE_4KB_SIZE) { + throw CopyException( + ExceptionMessage::overLargeStringValueException(std::to_string(length))); + } + auto val = overflowFile->copyString(value, length, overflowCursor); + setValue(val, pos); +} + +// STRING +template<> +std::string StringColumnChunk::getValue(offset_t pos) const { + auto kuStr = ((ku_string_t*)buffer.get())[pos]; + return overflowFile->readString(&kuStr); +} + template void StringColumnChunk::templateCopyStringArrowArray( arrow::Array* array, common::offset_t startPosInChunk, uint32_t numValuesToAppend) { @@ -168,15 +194,5 @@ void StringColumnChunk::templateCopyStringValues( } } -void StringColumnChunk::write(const Value& val, uint64_t posToWrite) { - assert(val.getDataType()->getPhysicalType() == PhysicalTypeID::STRING); - nullChunk->setNull(posToWrite, val.isNull()); - if (val.isNull()) { - return; - } - auto strVal = val.getValue(); - setValueFromString(strVal.c_str(), strVal.length(), posToWrite); -} - } // namespace storage } // namespace kuzu diff --git a/src/storage/in_mem_storage_structure/in_mem_column_chunk.cpp b/src/storage/in_mem_storage_structure/in_mem_column_chunk.cpp index 58a6a29bc4..5d74c94082 100644 --- a/src/storage/in_mem_storage_structure/in_mem_column_chunk.cpp +++ b/src/storage/in_mem_storage_structure/in_mem_column_chunk.cpp @@ -339,7 +339,8 @@ template<> void InMemColumnChunkWithOverflow::setValWithOverflow( PageByteCursor& overflowCursor, const char* value, uint64_t length, uint64_t pos) { if (length > BufferPoolConstants::PAGE_4KB_SIZE) { - length = BufferPoolConstants::PAGE_4KB_SIZE; + throw CopyException( + ExceptionMessage::overLargeStringValueException(std::to_string(length))); } auto val = inMemOverflowFile->copyString(value, length, overflowCursor); setValue(val, pos); diff --git a/src/storage/in_mem_storage_structure/in_mem_lists.cpp b/src/storage/in_mem_storage_structure/in_mem_lists.cpp index aa076f3ae1..a1bf390012 100644 --- a/src/storage/in_mem_storage_structure/in_mem_lists.cpp +++ b/src/storage/in_mem_storage_structure/in_mem_lists.cpp @@ -395,7 +395,8 @@ void InMemListsWithOverflow::setValueFromStringWithOverflow( PageByteCursor& overflowCursor, offset_t nodeOffset, uint64_t pos, const char* val, uint64_t length) { if (length > BufferPoolConstants::PAGE_4KB_SIZE) { - length = BufferPoolConstants::PAGE_4KB_SIZE; + throw CopyException( + ExceptionMessage::overLargeStringValueException(std::to_string(length))); } auto stringVal = overflowInMemFile->copyString(val, length, overflowCursor); setValue(nodeOffset, pos, (uint8_t*)&stringVal); diff --git a/src/storage/local_storage.cpp 
b/src/storage/local_storage.cpp new file mode 100644 index 0000000000..0a6d3b3361 --- /dev/null +++ b/src/storage/local_storage.cpp @@ -0,0 +1,59 @@ +#include "storage/local_storage.h" + +#include "storage/storage_manager.h" +#include "storage/storage_utils.h" + +using namespace kuzu::common; +using namespace kuzu::transaction; + +namespace kuzu { +namespace storage { + +LocalStorage::LocalStorage(StorageManager* storageManager, MemoryManager* mm) + : nodesStore{&storageManager->getNodesStore()}, mm{mm} {} + +void LocalStorage::scan(table_id_t tableID, ValueVector* nodeIDVector, + const std::vector& columnIDs, const std::vector& outputVectors) { + if (!tables.contains(tableID)) { + return; + } + tables.at(tableID)->scan(nodeIDVector, columnIDs, outputVectors); +} + +void LocalStorage::lookup(table_id_t tableID, ValueVector* nodeIDVector, + const std::vector& columnIDs, const std::vector& outputVectors) { + if (!tables.contains(tableID)) { + return; + } + tables.at(tableID)->lookup(nodeIDVector, columnIDs, outputVectors); +} + +void LocalStorage::update(table_id_t tableID, property_id_t propertyID, ValueVector* nodeIDVector, + ValueVector* propertyVector) { + if (!tables.contains(tableID)) { + tables.emplace(tableID, std::make_unique(nodesStore->getNodeTable(tableID))); + } + tables.at(tableID)->update(propertyID, nodeIDVector, propertyVector, mm); +} + +void LocalStorage::update(table_id_t tableID, property_id_t propertyID, offset_t nodeOffset, + ValueVector* propertyVector, sel_t posInPropertyVector) { + if (!tables.contains(tableID)) { + tables.emplace(tableID, std::make_unique(nodesStore->getNodeTable(tableID))); + } + tables.at(tableID)->update(propertyID, nodeOffset, propertyVector, posInPropertyVector, mm); +} + +void LocalStorage::prepareCommit() { + for (auto& [_, table] : tables) { + table->prepareCommit(); + } + tables.clear(); +} + +void LocalStorage::prepareRollback() { + tables.clear(); +} + +} // namespace storage +} // namespace kuzu diff --git a/src/storage/local_table.cpp b/src/storage/local_table.cpp new file mode 100644 index 0000000000..2799b87887 --- /dev/null +++ b/src/storage/local_table.cpp @@ -0,0 +1,251 @@ +#include "storage/local_table.h" + +#include "storage/copier/string_column_chunk.h" +#include "storage/store/node_table.h" +#include "storage/store/string_node_column.h" + +using namespace kuzu::common; +using namespace kuzu::transaction; + +namespace kuzu { +namespace storage { + +void LocalVector::scan(ValueVector* resultVector) const { + for (auto i = 0u; i < vector->state->selVector->selectedSize; i++) { + auto posInLocalVector = vector->state->selVector->selectedPositions[i]; + auto posInResultVector = resultVector->state->selVector->selectedPositions[i]; + resultVector->copyFromVectorData(posInResultVector, vector.get(), posInLocalVector); + } +} + +void LocalVector::lookup( + sel_t offsetInLocalVector, ValueVector* resultVector, sel_t offsetInResultVector) { + if (!validityMask[offsetInLocalVector]) { + return; + } + resultVector->copyFromVectorData(offsetInResultVector, vector.get(), offsetInLocalVector); +} + +void LocalVector::update( + sel_t offsetInLocalVector, ValueVector* updateVector, sel_t offsetInUpdateVector) { + vector->copyFromVectorData(offsetInLocalVector, updateVector, offsetInUpdateVector); + if (!validityMask[offsetInLocalVector]) { + vector->state->selVector->selectedPositions[vector->state->selVector->selectedSize++] = + offsetInLocalVector; + validityMask[offsetInLocalVector] = true; + } +} + +void StringLocalVector::update( + sel_t 
offsetInLocalVector, common::ValueVector* updateVector, sel_t offsetInUpdateVector) { + auto kuStr = updateVector->getValue(offsetInUpdateVector); + if (kuStr.len > BufferPoolConstants::PAGE_4KB_SIZE) { + throw RuntimeException( + ExceptionMessage::overLargeStringValueException(std::to_string(kuStr.len))); + } else if (!ku_string_t::isShortString(kuStr.len)) { + ovfStringLength += kuStr.len; + } + LocalVector::update(offsetInLocalVector, updateVector, offsetInUpdateVector); +} + +std::unique_ptr LocalVectorFactory::createLocalVectorData( + const LogicalType& logicalType, MemoryManager* mm) { + switch (logicalType.getPhysicalType()) { + case PhysicalTypeID::STRING: { + return std::make_unique(mm); + } + default: { + return std::make_unique(logicalType, mm); + } + } +} + +void LocalColumnChunk::scan(vector_idx_t vectorIdx, ValueVector* resultVector) { + if (!vectors.contains(vectorIdx)) { + return; + } + vectors.at(vectorIdx)->scan(resultVector); +} + +void LocalColumnChunk::lookup(vector_idx_t vectorIdx, sel_t offsetInLocalVector, + ValueVector* resultVector, sel_t offsetInResultVector) { + if (!vectors.contains(vectorIdx)) { + return; + } + vectors.at(vectorIdx)->lookup(offsetInLocalVector, resultVector, offsetInResultVector); +} + +void LocalColumnChunk::update(vector_idx_t vectorIdx, sel_t offsetInLocalVector, + ValueVector* updateVector, sel_t offsetInUpdateVector) { + if (!vectors.contains(vectorIdx)) { + vectors.emplace( + vectorIdx, LocalVectorFactory::createLocalVectorData(updateVector->dataType, mm)); + } + vectors.at(vectorIdx)->update(offsetInLocalVector, updateVector, offsetInUpdateVector); +} + +void LocalColumn::scan(ValueVector* nodeIDVector, ValueVector* outputVector) { + assert(nodeIDVector->isSequential()); + auto nodeID = nodeIDVector->getValue(0); + auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(nodeID.offset); + if (!chunks.contains(nodeGroupIdx)) { + return; + } + auto vectorIdxInChunk = StorageUtils::getVectorIdxInChunk(nodeID.offset, nodeGroupIdx); + chunks.at(nodeGroupIdx)->scan(vectorIdxInChunk, outputVector); +} + +void LocalColumn::lookup(ValueVector* nodeIDVector, ValueVector* outputVector) { + // TODO(Guodong): Should optimize. Sort nodeIDVector by node group idx. 
+ for (auto i = 0u; i < nodeIDVector->state->selVector->selectedSize; i++) { + auto pos = nodeIDVector->state->selVector->selectedPositions[i]; + auto nodeID = nodeIDVector->getValue(pos); + auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(nodeID.offset); + if (!chunks.contains(nodeGroupIdx)) { + return; + } + auto [vectorIdxInChunk, offsetInVector] = + StorageUtils::getVectorIdxInChunkAndOffsetInVector(nodeID.offset, nodeGroupIdx); + chunks.at(nodeGroupIdx) + ->lookup(vectorIdxInChunk, offsetInVector, outputVector, + outputVector->state->selVector->selectedPositions[i]); + } +} + +void LocalColumn::update( + ValueVector* nodeIDVector, ValueVector* propertyVector, MemoryManager* mm) { + for (auto i = 0u; i < nodeIDVector->state->selVector->selectedSize; i++) { + auto pos = nodeIDVector->state->selVector->selectedPositions[i]; + auto nodeID = nodeIDVector->getValue(pos); + update(nodeID.offset, propertyVector, + propertyVector->state->selVector->selectedPositions[i], mm); + } +} + +void LocalColumn::update(offset_t nodeOffset, ValueVector* propertyVector, + sel_t posInPropertyVector, MemoryManager* mm) { + auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(nodeOffset); + if (!chunks.contains(nodeGroupIdx)) { + chunks.emplace(nodeGroupIdx, std::make_unique(mm)); + } + auto chunk = chunks.at(nodeGroupIdx).get(); + auto [vectorIdxInChunk, offsetInVector] = + StorageUtils::getVectorIdxInChunkAndOffsetInVector(nodeOffset, nodeGroupIdx); + chunk->update(vectorIdxInChunk, offsetInVector, propertyVector, posInPropertyVector); +} + +void LocalColumn::prepareCommit() { + for (auto& [nodeGroupIdx, _] : chunks) { + prepareCommitForChunk(nodeGroupIdx); + } +} + +void LocalColumn::prepareCommitForChunk(node_group_idx_t nodeGroupIdx) { + auto nodeGroupStartOffset = StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx); + auto chunk = chunks.at(nodeGroupIdx).get(); + for (auto& [vectorIdx, vector] : chunk->vectors) { + auto vectorStartOffset = + nodeGroupStartOffset + StorageUtils::getStartOffsetOfVector(vectorIdx); + for (auto i = 0u; i < vector->vector->state->selVector->selectedSize; i++) { + auto pos = vector->vector->state->selVector->selectedPositions[i]; + assert(vector->validityMask[pos]); + column->write(vectorStartOffset + pos, vector->vector.get(), pos); + } + } +} + +void StringLocalColumn::prepareCommitForChunk(node_group_idx_t nodeGroupIdx) { + auto localChunk = chunks.at(nodeGroupIdx).get(); + auto stringColumn = reinterpret_cast(column); + auto overflowMetadata = + stringColumn->getOverflowMetadataDA()->get(nodeGroupIdx, TransactionType::WRITE); + auto ovfStringLengthInChunk = 0u; + for (auto& [_, localVector] : localChunk->vectors) { + auto stringLocalVector = reinterpret_cast(localVector.get()); + ovfStringLengthInChunk += stringLocalVector->ovfStringLength; + } + if (overflowMetadata.lastOffsetInPage + ovfStringLengthInChunk <= + BufferPoolConstants::PAGE_4KB_SIZE) { + // Write the updated overflow strings to the overflow string buffer. + LocalColumn::prepareCommitForChunk(nodeGroupIdx); + } else { + commitLocalChunkOutOfPlace(nodeGroupIdx, localChunk); + } +} + +void StringLocalColumn::commitLocalChunkOutOfPlace( + node_group_idx_t nodeGroupIdx, LocalColumnChunk* localChunk) { + auto stringColumn = reinterpret_cast(column); + // Trigger rewriting the column chunk to another new place. 
+ auto columnChunk = ColumnChunkFactory::createColumnChunk(column->getDataType()); + auto stringColumnChunk = reinterpret_cast(columnChunk.get()); + // First scan the whole column chunk into StringColumnChunk. + stringColumn->scan(nodeGroupIdx, stringColumnChunk); + for (auto& [vectorIdx, vector] : localChunk->vectors) { + stringColumnChunk->update(vector->vector.get(), vectorIdx); + } + // Append the updated StringColumnChunk back to column. + auto numPages = stringColumnChunk->getNumPages(); + auto startPageIdx = column->dataFH->addNewPages(numPages); + column->append(stringColumnChunk, startPageIdx, nodeGroupIdx); +} + +std::unique_ptr LocalColumnFactory::createLocalColumn(NodeColumn* column) { + switch (column->getDataType().getPhysicalType()) { + case PhysicalTypeID::STRING: { + return std::make_unique(column); + } + default: { + return std::make_unique(column); + } + } +} + +void LocalTable::scan(ValueVector* nodeIDVector, const std::vector& columnIDs, + const std::vector& outputVectors) { + for (auto i = 0u; i < columnIDs.size(); i++) { + auto columnID = columnIDs[i]; + if (!columns.contains(columnID)) { + continue; + } + columns.at(columnID)->scan(nodeIDVector, outputVectors[i]); + } +} + +void LocalTable::lookup(ValueVector* nodeIDVector, const std::vector& columnIDs, + const std::vector& outputVectors) { + for (auto i = 0u; i < columnIDs.size(); i++) { + auto columnID = columnIDs[i]; + if (!columns.contains(columnID)) { + continue; + } + columns.at(columnID)->lookup(nodeIDVector, outputVectors[i]); + } +} + +void LocalTable::update(property_id_t propertyID, ValueVector* nodeIDVector, + ValueVector* propertyVector, MemoryManager* mm) { + if (!columns.contains(propertyID)) { + columns.emplace(propertyID, + LocalColumnFactory::createLocalColumn(table->getPropertyColumn(propertyID))); + } + columns.at(propertyID)->update(nodeIDVector, propertyVector, mm); +} + +void LocalTable::update(property_id_t propertyID, offset_t nodeOffset, ValueVector* propertyVector, + sel_t posInPropertyVector, MemoryManager* mm) { + if (!columns.contains(propertyID)) { + columns.emplace(propertyID, + LocalColumnFactory::createLocalColumn(table->getPropertyColumn(propertyID))); + } + columns.at(propertyID)->update(nodeOffset, propertyVector, posInPropertyVector, mm); +} + +void LocalTable::prepareCommit() { + for (auto& [_, column] : columns) { + column->prepareCommit(); + } +} + +} // namespace storage +} // namespace kuzu diff --git a/src/storage/storage_structure/disk_array.cpp b/src/storage/storage_structure/disk_array.cpp index 378f6b1103..605c1052f9 100644 --- a/src/storage/storage_structure/disk_array.cpp +++ b/src/storage/storage_structure/disk_array.cpp @@ -532,11 +532,13 @@ template class BaseDiskArray>; template class BaseDiskArray>; template class BaseDiskArray; template class BaseDiskArray; +template class BaseDiskArray; template class BaseInMemDiskArray; template class BaseInMemDiskArray>; template class BaseInMemDiskArray>; template class BaseInMemDiskArray; template class BaseInMemDiskArray; +template class BaseInMemDiskArray; template class InMemDiskArrayBuilder; template class InMemDiskArrayBuilder>; template class InMemDiskArrayBuilder>; @@ -547,6 +549,7 @@ template class InMemDiskArray>; template class InMemDiskArray>; template class InMemDiskArray; template class InMemDiskArray; +template class InMemDiskArray; } // namespace storage } // namespace kuzu diff --git a/src/storage/store/node_column.cpp b/src/storage/store/node_column.cpp index eef09729a0..0ebed89be0 100644 --- 
a/src/storage/store/node_column.cpp +++ b/src/storage/store/node_column.cpp @@ -112,15 +112,14 @@ NodeColumn::NodeColumn(LogicalType dataType, const MetadataDAHInfo& metaDAHeader } } -void NodeColumn::batchLookup(const offset_t* nodeOffsets, size_t size, uint8_t* result) { - auto dummyReadOnlyTransaction = Transaction::getDummyReadOnlyTrx(); +void NodeColumn::batchLookup( + Transaction* transaction, const offset_t* nodeOffsets, size_t size, uint8_t* result) { for (auto i = 0u; i < size; ++i) { auto nodeOffset = nodeOffsets[i]; - auto nodeGroupIdx = StorageUtils::getNodeGroupIdxFromNodeOffset(nodeOffset); + auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(nodeOffset); auto cursor = PageUtils::getPageElementCursorForPos(nodeOffset, numValuesPerPage); - cursor.pageIdx += - metadataDA->get(nodeGroupIdx, dummyReadOnlyTransaction->getType()).pageIdx; - readFromPage(dummyReadOnlyTransaction.get(), cursor.pageIdx, [&](uint8_t* frame) -> void { + cursor.pageIdx += metadataDA->get(nodeGroupIdx, transaction->getType()).pageIdx; + readFromPage(transaction, cursor.pageIdx, [&](uint8_t* frame) -> void { memcpy(result + i * numBytesPerFixedSizedValue, frame + (cursor.elemPosInPage * numBytesPerFixedSizedValue), numBytesPerFixedSizedValue); @@ -128,15 +127,14 @@ void NodeColumn::batchLookup(const offset_t* nodeOffsets, size_t size, uint8_t* } } -void BoolNodeColumn::batchLookup(const offset_t* nodeOffsets, size_t size, uint8_t* result) { +void BoolNodeColumn::batchLookup( + Transaction* transaction, const offset_t* nodeOffsets, size_t size, uint8_t* result) { for (auto i = 0u; i < size; ++i) { auto nodeOffset = nodeOffsets[i]; - auto nodeGroupIdx = StorageUtils::getNodeGroupIdxFromNodeOffset(nodeOffset); + auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(nodeOffset); auto cursor = PageUtils::getPageElementCursorForPos(nodeOffset, numValuesPerPage); - auto dummyReadOnlyTransaction = Transaction::getDummyReadOnlyTrx(); - cursor.pageIdx += - metadataDA->get(nodeGroupIdx, dummyReadOnlyTransaction->getType()).pageIdx; - readFromPage(dummyReadOnlyTransaction.get(), cursor.pageIdx, [&](uint8_t* frame) -> void { + cursor.pageIdx += metadataDA->get(nodeGroupIdx, transaction->getType()).pageIdx; + readFromPage(transaction, cursor.pageIdx, [&](uint8_t* frame) -> void { // De-compress bitpacked bools result[i] = NullMask::isNull((uint64_t*)frame, cursor.elemPosInPage); }); @@ -149,13 +147,36 @@ void NodeColumn::scan( scanInternal(transaction, nodeIDVector, resultVector); } +void NodeColumn::scan(transaction::Transaction* transaction, node_group_idx_t nodeGroupIdx, + offset_t startOffsetInGroup, offset_t endOffsetInGroup, ValueVector* resultVector, + uint64_t offsetInVector) { + if (nullColumn) { + nullColumn->scan(transaction, nodeGroupIdx, startOffsetInGroup, endOffsetInGroup, + resultVector, offsetInVector); + } + auto pageCursor = PageUtils::getPageElementCursorForPos(startOffsetInGroup, numValuesPerPage); + auto chunkMeta = metadataDA->get(nodeGroupIdx, transaction->getType()); + pageCursor.pageIdx += chunkMeta.pageIdx; + auto numValuesToScan = endOffsetInGroup - startOffsetInGroup; + scanUnfiltered(transaction, pageCursor, numValuesToScan, resultVector, offsetInVector); +} + +void NodeColumn::scan(node_group_idx_t nodeGroupIdx, ColumnChunk* columnChunk) { + if (nullColumn) { + nullColumn->scan(nodeGroupIdx, columnChunk->getNullChunk()); + } + auto chunkMetadata = metadataDA->get(nodeGroupIdx, TransactionType::WRITE); + FileUtils::readFromFile(dataFH->getFileInfo(), columnChunk->getData(), + 
columnChunk->getNumBytes(), chunkMetadata.pageIdx * BufferPoolConstants::PAGE_4KB_SIZE); +} + void NodeColumn::scanInternal( Transaction* transaction, ValueVector* nodeIDVector, ValueVector* resultVector) { auto startNodeOffset = nodeIDVector->readNodeOffset(0); assert(startNodeOffset % DEFAULT_VECTOR_CAPACITY == 0); - auto nodeGroupIdx = StorageUtils::getNodeGroupIdxFromNodeOffset(startNodeOffset); + auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(startNodeOffset); auto offsetInNodeGroup = - startNodeOffset - StorageUtils::getStartOffsetForNodeGroup(nodeGroupIdx); + startNodeOffset - StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx); auto pageCursor = PageUtils::getPageElementCursorForPos(offsetInNodeGroup, numValuesPerPage); auto chunkMeta = metadataDA->get(nodeGroupIdx, transaction->getType()); pageCursor.pageIdx += chunkMeta.pageIdx; @@ -230,7 +251,7 @@ void NodeColumn::lookupInternal( void NodeColumn::lookupValue(transaction::Transaction* transaction, offset_t nodeOffset, ValueVector* resultVector, uint32_t posInVector) { - auto nodeGroupIdx = StorageUtils::getNodeGroupIdxFromNodeOffset(nodeOffset); + auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(nodeOffset); auto pageCursor = PageUtils::getPageElementCursorForPos(nodeOffset, numValuesPerPage); pageCursor.pageIdx += metadataDA->get(nodeGroupIdx, transaction->getType()).pageIdx; readFromPage(transaction, pageCursor.pageIdx, [&](uint8_t* frame) -> void { @@ -246,6 +267,31 @@ void NodeColumn::readFromPage( bufferManager->optimisticRead(*fileHandleToPin, pageIdxToPin, func); } +page_idx_t NodeColumn::append( + ColumnChunk* columnChunk, page_idx_t startPageIdx, uint64_t nodeGroupIdx) { + // Main column chunk. + page_idx_t numPagesFlushed = 0; + auto numPagesForChunk = columnChunk->flushBuffer(dataFH, startPageIdx); + metadataDA->resize(nodeGroupIdx + 1); + metadataDA->update(nodeGroupIdx, ColumnChunkMetadata{startPageIdx, numPagesForChunk}); + numPagesFlushed += numPagesForChunk; + startPageIdx += numPagesForChunk; + // Null column chunk. + auto numPagesForNullChunk = + nullColumn->append(columnChunk->getNullChunk(), startPageIdx, nodeGroupIdx); + numPagesFlushed += numPagesForNullChunk; + startPageIdx += numPagesForNullChunk; + // Children column chunks. + assert(childrenColumns.size() == columnChunk->getNumChildren()); + for (auto i = 0u; i < childrenColumns.size(); i++) { + auto numPagesForChild = + childrenColumns[i]->append(columnChunk->getChild(i), startPageIdx, nodeGroupIdx); + numPagesFlushed += numPagesForChild; + startPageIdx += numPagesForChild; + } + return numPagesFlushed; +} + void NodeColumn::write(ValueVector* nodeIDVector, ValueVector* vectorToWriteFrom) { if (nodeIDVector->state->isFlat() && vectorToWriteFrom->state->isFlat()) { auto nodeOffset = @@ -273,31 +319,6 @@ void NodeColumn::write(ValueVector* nodeIDVector, ValueVector* vectorToWriteFrom } } -page_idx_t NodeColumn::append( - ColumnChunk* columnChunk, page_idx_t startPageIdx, uint64_t nodeGroupIdx) { - // Main column chunk. - page_idx_t numPagesFlushed = 0; - auto numPagesForChunk = columnChunk->flushBuffer(dataFH, startPageIdx); - metadataDA->resize(nodeGroupIdx + 1); - metadataDA->update(nodeGroupIdx, ColumnChunkMetadata{startPageIdx, numPagesForChunk}); - numPagesFlushed += numPagesForChunk; - startPageIdx += numPagesForChunk; - // Null column chunk. 
- auto numPagesForNullChunk = - nullColumn->append(columnChunk->getNullChunk(), startPageIdx, nodeGroupIdx); - numPagesFlushed += numPagesForNullChunk; - startPageIdx += numPagesForNullChunk; - // Children column chunks. - assert(childrenColumns.size() == columnChunk->getNumChildren()); - for (auto i = 0u; i < childrenColumns.size(); i++) { - auto numPagesForChild = - childrenColumns[i]->append(columnChunk->getChild(i), startPageIdx, nodeGroupIdx); - numPagesFlushed += numPagesForChild; - startPageIdx += numPagesForChild; - } - return numPagesFlushed; -} - void NodeColumn::writeInternal( offset_t nodeOffset, ValueVector* vectorToWriteFrom, uint32_t posInVectorToWriteFrom) { nullColumn->writeInternal(nodeOffset, vectorToWriteFrom, posInVectorToWriteFrom); @@ -324,8 +345,9 @@ void NodeColumn::writeValue( } WALPageIdxPosInPageAndFrame NodeColumn::createWALVersionOfPageForValue(offset_t nodeOffset) { - auto nodeGroupIdx = StorageUtils::getNodeGroupIdxFromNodeOffset(nodeOffset); - auto originalPageCursor = PageUtils::getPageElementCursorForPos(nodeOffset, numValuesPerPage); + auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(nodeOffset); + auto originalPageCursor = PageUtils::getPageElementCursorForPos( + nodeOffset - StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx), numValuesPerPage); originalPageCursor.pageIdx += metadataDA->get(nodeGroupIdx, TransactionType::WRITE).pageIdx; bool insertingNewPage = false; if (originalPageCursor.pageIdx >= dataFH->getNumPages()) { @@ -378,27 +400,13 @@ void NodeColumn::populateWithDefaultVal(const catalog::Property& property, } } -void NodeColumn::scan(transaction::Transaction* transaction, node_group_idx_t nodeGroupIdx, - offset_t startOffsetInGroup, offset_t endOffsetInGroup, ValueVector* resultVector, - uint64_t offsetInVector) { - if (nullColumn) { - nullColumn->scan(transaction, nodeGroupIdx, startOffsetInGroup, endOffsetInGroup, - resultVector, offsetInVector); - } - auto pageCursor = PageUtils::getPageElementCursorForPos(startOffsetInGroup, numValuesPerPage); - auto chunkMeta = metadataDA->get(nodeGroupIdx, transaction->getType()); - pageCursor.pageIdx += chunkMeta.pageIdx; - auto numValuesToScan = endOffsetInGroup - startOffsetInGroup; - scanUnfiltered(transaction, pageCursor, numValuesToScan, resultVector, offsetInVector); -} - // Page size must be aligned to 8 byte chunks for the 64-bit NullMask algorithms to work // without the possibility of memory errors from reading/writing off the end of a page. 
static_assert(PageUtils::getNumElementsInAPage(1, false /*requireNullColumn*/) % 8 == 0); -BoolNodeColumn::BoolNodeColumn(const catalog::MetadataDAHInfo& metaDAHeaderInfo, - BMFileHandle* dataFH, BMFileHandle* metadataFH, BufferManager* bufferManager, WAL* wal, - Transaction* transaction, bool requireNullColumn) +BoolNodeColumn::BoolNodeColumn(const MetadataDAHInfo& metaDAHeaderInfo, BMFileHandle* dataFH, + BMFileHandle* metadataFH, BufferManager* bufferManager, WAL* wal, Transaction* transaction, + bool requireNullColumn) : NodeColumn{LogicalType(LogicalTypeID::BOOL), metaDAHeaderInfo, dataFH, metadataFH, bufferManager, wal, transaction, requireNullColumn} { readNodeColumnFunc = BoolNodeColumnFunc::readValuesFromPage; @@ -454,9 +462,8 @@ void NullNodeColumn::writeInternal( writeValue(nodeOffset, vectorToWriteFrom, posInVectorToWriteFrom); } -SerialNodeColumn::SerialNodeColumn(const catalog::MetadataDAHInfo& metaDAHeaderInfo, - BMFileHandle* dataFH, BMFileHandle* metadataFH, BufferManager* bufferManager, WAL* wal, - Transaction* transaction) +SerialNodeColumn::SerialNodeColumn(const MetadataDAHInfo& metaDAHeaderInfo, BMFileHandle* dataFH, + BMFileHandle* metadataFH, BufferManager* bufferManager, WAL* wal, Transaction* transaction) : NodeColumn{LogicalType(LogicalTypeID::SERIAL), metaDAHeaderInfo, dataFH, metadataFH, bufferManager, wal, transaction, false} {} @@ -487,8 +494,8 @@ page_idx_t SerialNodeColumn::append( } std::unique_ptr<NodeColumn> NodeColumnFactory::createNodeColumn(const LogicalType& dataType, - const catalog::MetadataDAHInfo& metaDAHeaderInfo, BMFileHandle* dataFH, - BMFileHandle* metadataFH, BufferManager* bufferManager, WAL* wal, Transaction* transaction) { + const MetadataDAHInfo& metaDAHeaderInfo, BMFileHandle* dataFH, BMFileHandle* metadataFH, + BufferManager* bufferManager, WAL* wal, Transaction* transaction) { switch (dataType.getLogicalTypeID()) { case LogicalTypeID::BOOL: { return std::make_unique<BoolNodeColumn>( diff --git a/src/storage/store/node_table.cpp b/src/storage/store/node_table.cpp index 23f1425e3d..f028994548 100644 --- a/src/storage/store/node_table.cpp +++ b/src/storage/store/node_table.cpp @@ -1,5 +1,7 @@ #include "storage/store/node_table.h" +#include "transaction/transaction.h" + using namespace kuzu::catalog; using namespace kuzu::common; using namespace kuzu::transaction; @@ -56,19 +58,26 @@ void NodeTable::scan(Transaction* transaction, ValueVector* nodeIDVector, propertyColumns.at(columnIds[i])->scan(transaction, nodeIDVector, outputVectors[i]); } } + if (transaction->isWriteTransaction()) { + auto localStorage = transaction->getLocalStorage(); + localStorage->scan(tableID, nodeIDVector, columnIds, outputVectors); + } } void NodeTable::lookup(Transaction* transaction, ValueVector* nodeIDVector, - const std::vector<column_id_t>& columnIds, const std::vector<ValueVector*>& outputVectors) { - assert(columnIds.size() == outputVectors.size()); + const std::vector<column_id_t>& columnIDs, const std::vector<ValueVector*>& outputVectors) { auto pos = nodeIDVector->state->selVector->selectedPositions[0]; - for (auto i = 0u; i < columnIds.size(); i++) { - if (columnIds[i] == INVALID_COLUMN_ID) { + for (auto i = 0u; i < columnIDs.size(); i++) { + auto columnID = columnIDs[i]; + if (columnID == INVALID_COLUMN_ID) { outputVectors[i]->setNull(pos, true); } else { - propertyColumns.at(columnIds[i])->lookup(transaction, nodeIDVector, outputVectors[i]); + propertyColumns.at(columnIDs[i])->lookup(transaction, nodeIDVector, outputVectors[i]); } } + if (transaction->isWriteTransaction()) { + transaction->getLocalStorage()->lookup(tableID,
nodeIDVector, columnIDs, outputVectors); + } } void NodeTable::insert(Transaction* transaction, ValueVector* nodeIDVector, @@ -87,24 +96,33 @@ void NodeTable::insert(Transaction* transaction, ValueVector* nodeIDVector, insertPK(nodeIDVector, propertyVectors[propertyIDToVectorIdx.at(pkPropertyID)]); } auto currentNumNodeGroups = getNumNodeGroups(transaction); - if (lastOffset >= StorageUtils::getStartOffsetForNodeGroup(currentNumNodeGroups)) { + if (lastOffset >= StorageUtils::getStartOffsetOfNodeGroup(currentNumNodeGroups)) { auto newNodeGroup = std::make_unique<NodeGroup>(this); newNodeGroup->setNodeGroupIdx(currentNumNodeGroups); append(newNodeGroup.get()); } for (auto& [propertyID, column] : propertyColumns) { - assert(propertyIDToVectorIdx.contains(propertyID)); - if (column->getDataType().getLogicalTypeID() != LogicalTypeID::SERIAL) { - column->write(nodeIDVector, propertyVectors[propertyIDToVectorIdx.at(propertyID)]); + if (column->getDataType().getLogicalTypeID() == LogicalTypeID::SERIAL) { + continue; } + assert(propertyIDToVectorIdx.contains(propertyID)); + transaction->getLocalStorage()->update(tableID, propertyID, nodeIDVector, + propertyVectors[propertyIDToVectorIdx.at(propertyID)]); } wal->addToUpdatedNodeTables(tableID); } -void NodeTable::update( - property_id_t propertyID, ValueVector* nodeIDVector, ValueVector* vectorToWriteFrom) { +void NodeTable::update(Transaction* transaction, property_id_t propertyID, + ValueVector* nodeIDVector, ValueVector* vectorToWriteFrom) { + assert(propertyColumns.contains(propertyID)); + transaction->getLocalStorage()->update(tableID, propertyID, nodeIDVector, vectorToWriteFrom); +} + +void NodeTable::update(Transaction* transaction, property_id_t propertyID, offset_t nodeOffset, + ValueVector* propertyVector, sel_t posInPropertyVector) { assert(propertyColumns.contains(propertyID)); - propertyColumns.at(propertyID)->write(nodeIDVector, vectorToWriteFrom); + transaction->getLocalStorage()->update( + tableID, propertyID, nodeOffset, propertyVector, posInPropertyVector); } void NodeTable::delete_( diff --git a/src/storage/store/string_node_column.cpp b/src/storage/store/string_node_column.cpp index 6e3a2bd33d..ae0f9b1864 100644 --- a/src/storage/store/string_node_column.cpp +++ b/src/storage/store/string_node_column.cpp @@ -13,11 +13,10 @@ void StringNodeColumnFunc::writeStringValuesToPage( uint8_t* frame, uint16_t posInFrame, ValueVector* vector, uint32_t posInVector) { auto kuStrInFrame = (ku_string_t*)(frame + (posInFrame * sizeof(ku_string_t))); auto kuStrInVector = vector->getValue<ku_string_t>(posInVector); - if (kuStrInVector.len > ku_string_t::SHORT_STR_LENGTH) { - throw NotImplementedException("VarSizedNodeColumnFunc::writeStringValuesToPage"); - } - memcpy(kuStrInFrame->prefix, kuStrInVector.prefix, kuStrInVector.len); + memcpy(kuStrInFrame->prefix, kuStrInVector.prefix, + std::min((uint64_t)kuStrInVector.len, ku_string_t::SHORT_STR_LENGTH)); kuStrInFrame->len = kuStrInVector.len; + kuStrInFrame->overflowPtr = kuStrInVector.overflowPtr; } StringNodeColumn::StringNodeColumn(LogicalType dataType, const MetadataDAHInfo& metaDAHeaderInfo, @@ -28,12 +27,12 @@ StringNodeColumn::StringNodeColumn(LogicalType dataType, const MetadataDAHInfo& if (this->dataType.getLogicalTypeID() == LogicalTypeID::STRING) { writeNodeColumnFunc = StringNodeColumnFunc::writeStringValuesToPage; } - overflowMetadataDA = std::make_unique<InMemDiskArray<ColumnChunkMetadata>>(*metadataFH, + overflowMetadataDA = std::make_unique<InMemDiskArray<OverflowColumnChunkMetadata>>(*metadataFH, StorageStructureID::newMetadataID(), metaDAHeaderInfo.childrenInfos[0]->dataDAHPageIdx,
bufferManager, wal, transaction); } -void StringNodeColumn::scan(transaction::Transaction* transaction, node_group_idx_t nodeGroupIdx, +void StringNodeColumn::scan(Transaction* transaction, node_group_idx_t nodeGroupIdx, offset_t startOffsetInGroup, offset_t endOffsetInGroup, ValueVector* resultVector, uint64_t offsetInVector) { nullColumn->scan(transaction, nodeGroupIdx, startOffsetInGroup, endOffsetInGroup, resultVector, @@ -52,18 +51,54 @@ void StringNodeColumn::scan(transaction::Transaction* transaction, node_group_id } } +void StringNodeColumn::scan(node_group_idx_t nodeGroupIdx, ColumnChunk* columnChunk) { + NodeColumn::scan(nodeGroupIdx, columnChunk); + auto stringColumnChunk = reinterpret_cast<StringColumnChunk*>(columnChunk); + auto overflowMetadata = overflowMetadataDA->get(nodeGroupIdx, TransactionType::WRITE); + auto inMemOverflowFile = stringColumnChunk->getOverflowFile(); + inMemOverflowFile->addNewPages(overflowMetadata.numPages); + for (auto i = 0u; i < overflowMetadata.numPages; i++) { + auto pageIdx = overflowMetadata.pageIdx + i; + FileUtils::readFromFile(dataFH->getFileInfo(), inMemOverflowFile->getPage(i)->data, + BufferPoolConstants::PAGE_4KB_SIZE, pageIdx * BufferPoolConstants::PAGE_4KB_SIZE); + } +} + page_idx_t StringNodeColumn::append( - storage::ColumnChunk* columnChunk, page_idx_t startPageIdx, node_group_idx_t nodeGroupIdx) { + ColumnChunk* columnChunk, page_idx_t startPageIdx, node_group_idx_t nodeGroupIdx) { auto numPagesForMainChunk = NodeColumn::append(columnChunk, startPageIdx, nodeGroupIdx); auto stringColumnChunk = reinterpret_cast<StringColumnChunk*>(columnChunk); auto numPagesForOverflow = stringColumnChunk->flushOverflowBuffer(dataFH, startPageIdx + numPagesForMainChunk); overflowMetadataDA->resize(nodeGroupIdx + 1); - overflowMetadataDA->update(nodeGroupIdx, - ColumnChunkMetadata{startPageIdx + numPagesForMainChunk, numPagesForOverflow}); + overflowMetadataDA->update( + nodeGroupIdx, OverflowColumnChunkMetadata{startPageIdx + numPagesForMainChunk, + numPagesForOverflow, stringColumnChunk->getLastOffsetInPage()}); return numPagesForMainChunk + numPagesForOverflow; } +void StringNodeColumn::writeValue( + offset_t nodeOffset, ValueVector* vectorToWriteFrom, uint32_t posInVectorToWriteFrom) { + auto& kuStr = vectorToWriteFrom->getValue<ku_string_t>(posInVectorToWriteFrom); + if (!ku_string_t::isShortString(kuStr.len)) { + auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(nodeOffset); + auto overflowMetadata = overflowMetadataDA->get(nodeGroupIdx, TransactionType::WRITE); + auto overflowPageIdxInChunk = overflowMetadata.numPages - 1; + auto walPageIdxAndFrame = StorageStructureUtils::createWALVersionIfNecessaryAndPinPage( + overflowMetadata.pageIdx + overflowPageIdxInChunk, false /* insertingNewPage */, + *dataFH, storageStructureID, *bufferManager, *wal); + memcpy(walPageIdxAndFrame.frame + overflowMetadata.lastOffsetInPage, + reinterpret_cast<uint8_t*>(kuStr.overflowPtr), kuStr.len); + bufferManager->unpin(*wal->fileHandle, walPageIdxAndFrame.pageIdxInWAL); + dataFH->releaseWALPageIdxLock(walPageIdxAndFrame.originalPageIdx); + TypeUtils::encodeOverflowPtr( + kuStr.overflowPtr, overflowPageIdxInChunk, overflowMetadata.lastOffsetInPage); + overflowMetadata.lastOffsetInPage += kuStr.len; + overflowMetadataDA->update(nodeGroupIdx, overflowMetadata); + } + NodeColumn::writeValue(nodeOffset, vectorToWriteFrom, posInVectorToWriteFrom); +} + void StringNodeColumn::checkpointInMemory() { NodeColumn::checkpointInMemory(); overflowMetadataDA->checkpointInMemoryIfNecessary(); @@ -79,7 +114,7 @@ void
StringNodeColumn::scanInternal( assert(resultVector->dataType.getPhysicalType() == PhysicalTypeID::STRING); auto startNodeOffset = nodeIDVector->readNodeOffset(0); assert(startNodeOffset % DEFAULT_VECTOR_CAPACITY == 0); - auto nodeGroupIdx = StorageUtils::getNodeGroupIdxFromNodeOffset(startNodeOffset); + auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(startNodeOffset); NodeColumn::scanInternal(transaction, nodeIDVector, resultVector); auto overflowPageIdx = overflowMetadataDA->get(nodeGroupIdx, transaction->getType()).pageIdx; for (auto i = 0u; i < nodeIDVector->state->selVector->selectedSize; i++) { @@ -96,10 +131,10 @@ void StringNodeColumn::lookupInternal( Transaction* transaction, ValueVector* nodeIDVector, ValueVector* resultVector) { assert(dataType.getPhysicalType() == PhysicalTypeID::STRING); auto startNodeOffset = nodeIDVector->readNodeOffset(0); - auto overflowPageIdx = overflowMetadataDA - ->get(StorageUtils::getNodeGroupIdxFromNodeOffset(startNodeOffset), - transaction->getType()) - .pageIdx; + auto overflowPageIdx = + overflowMetadataDA + ->get(StorageUtils::getNodeGroupIdx(startNodeOffset), transaction->getType()) + .pageIdx; NodeColumn::lookupInternal(transaction, nodeIDVector, resultVector); for (auto i = 0u; i < nodeIDVector->state->selVector->selectedSize; i++) { auto pos = resultVector->state->selVector->selectedPositions[i]; diff --git a/src/storage/store/var_list_node_column.cpp b/src/storage/store/var_list_node_column.cpp index fbc112460a..ab69d4beff 100644 --- a/src/storage/store/var_list_node_column.cpp +++ b/src/storage/store/var_list_node_column.cpp @@ -39,9 +39,9 @@ void VarListNodeColumn::scanInternal( Transaction* transaction, ValueVector* nodeIDVector, ValueVector* resultVector) { resultVector->resetAuxiliaryBuffer(); auto startNodeOffset = nodeIDVector->readNodeOffset(0); - auto nodeGroupIdx = StorageUtils::getNodeGroupIdxFromNodeOffset(startNodeOffset); + auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(startNodeOffset); auto startNodeOffsetInGroup = - startNodeOffset - StorageUtils::getStartOffsetForNodeGroup(nodeGroupIdx); + startNodeOffset - StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx); auto listOffsetInfoInStorage = getListOffsetInfoInStorage(transaction, nodeGroupIdx, startNodeOffsetInGroup, startNodeOffsetInGroup + nodeIDVector->state->originalSize, resultVector->state); @@ -54,16 +54,16 @@ void VarListNodeColumn::scanInternal( void VarListNodeColumn::lookupValue(Transaction* transaction, offset_t nodeOffset, ValueVector* resultVector, uint32_t posInVector) { - auto nodeGroupIdx = StorageUtils::getNodeGroupIdxFromNodeOffset(nodeOffset); - auto nodeOffsetInGroup = nodeOffset - StorageUtils::getStartOffsetForNodeGroup(nodeGroupIdx); + auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(nodeOffset); + auto nodeOffsetInGroup = nodeOffset - StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx); auto listOffset = readListOffsetInStorage(transaction, nodeGroupIdx, nodeOffsetInGroup); auto length = readListOffsetInStorage(transaction, nodeGroupIdx, nodeOffsetInGroup + 1) - readListOffsetInStorage(transaction, nodeGroupIdx, nodeOffsetInGroup); auto offsetInVector = posInVector == 0 ? 
0 : resultVector->getValue(posInVector - 1); resultVector->setValue(posInVector, list_entry_t{offsetInVector, length}); ListVector::resizeDataVector(resultVector, offsetInVector + length); - dataNodeColumn->scan(transaction, StorageUtils::getNodeGroupIdxFromNodeOffset(nodeOffset), - listOffset, listOffset + length, ListVector::getDataVector(resultVector), offsetInVector); + dataNodeColumn->scan(transaction, StorageUtils::getNodeGroupIdx(nodeOffset), listOffset, + listOffset + length, ListVector::getDataVector(resultVector), offsetInVector); } page_idx_t VarListNodeColumn::append( diff --git a/src/transaction/transaction_manager.cpp b/src/transaction/transaction_manager.cpp index 684034feea..e356a58775 100644 --- a/src/transaction/transaction_manager.cpp +++ b/src/transaction/transaction_manager.cpp @@ -17,7 +17,8 @@ std::unique_ptr<Transaction> TransactionManager::beginWriteTransaction() { "Cannot start a new write transaction in the system. Only one write transaction at a " "time is allowed in the system."); } - auto transaction = std::make_unique<Transaction>(TransactionType::WRITE, ++lastTransactionID); + auto transaction = std::make_unique<Transaction>( + TransactionType::WRITE, ++lastTransactionID, storageManager, mm); activeWriteTransactionID = lastTransactionID; return transaction; } @@ -27,8 +28,8 @@ std::unique_ptr<Transaction> TransactionManager::beginReadOnlyTransaction() { // ensures calls to other public functions is not restricted. lock_t newTransactionLck{mtxForStartingNewTransactions}; lock_t publicFunctionLck{mtxForSerializingPublicFunctionCalls}; - auto transaction = - std::make_unique<Transaction>(TransactionType::READ_ONLY, ++lastTransactionID); + auto transaction = std::make_unique<Transaction>( + TransactionType::READ_ONLY, ++lastTransactionID, storageManager, mm); activeReadOnlyTransactionIDs.insert(transaction->getID()); return transaction; } diff --git a/test/storage/node_insertion_deletion_test.cpp b/test/storage/node_insertion_deletion_test.cpp index 24f9b229bb..e45120c478 100644 --- a/test/storage/node_insertion_deletion_test.cpp +++ b/test/storage/node_insertion_deletion_test.cpp @@ -23,73 +23,33 @@ class NodeInsertionDeletionTests : public DBTest { void initDBAndConnection() { createDBAndConn(); readConn = std::make_unique<Connection>(database.get()); - table_id_t personTableID = - getCatalog(*database)->getReadOnlyVersion()->getTableID("person"); - personNodeTable = getStorageManager(*database)->getNodesStore().getNodeTable(personTableID); - uint32_t idPropertyID = getCatalog(*database) - ->getReadOnlyVersion() - ->getNodeProperty(personTableID, "ID") - ->getPropertyID(); - idColumn = getStorageManager(*database)->getNodesStore().getNodePropertyColumn( - personTableID, idPropertyID); conn->beginWriteTransaction(); } - offset_t addNode() { - // TODO(Guodong/Semih/Xiyang): Currently it is not clear when and from where the hash index, - // structured columns, adjacency Lists, and adj columns of a - // newly added node should be informed that a new node is being inserted, so these data - // structures either write values or NULLs or empty Lists etc. Within the scope of these - // tests we only have an ID column and we are manually from outside - // NodesStatisticsAndDeletedIDs adding a NULL value for the ID. This should change later.
- offset_t nodeOffset = personNodeTable->getNodeStatisticsAndDeletedIDs()->addNode( - personNodeTable->getTableID()); - auto dataChunk = std::make_shared(2); - // Flatten the data chunk - dataChunk->state->currIdx = 0; - auto nodeIDVector = - std::make_shared(LogicalTypeID::INTERNAL_ID, getMemoryManager(*database)); - dataChunk->insert(0, nodeIDVector); - auto idVector = - std::make_shared(LogicalTypeID::INT64, getMemoryManager(*database)); - dataChunk->insert(1, idVector); - ((nodeID_t*)nodeIDVector->getData())[0].offset = nodeOffset; - idVector->setNull(0, true /* is null */); - idColumn->write(nodeIDVector.get(), idVector.get()); - return nodeOffset; + void deleteNode(offset_t id) { + auto res = conn->query("MATCH (a:person) WHERE a.ID = " + std::to_string(id) + " DELETE a"); + ASSERT_TRUE(res->isSuccess()); + } + + void addNode(offset_t id) { + auto res = conn->query("CREATE (a:person {ID: " + std::to_string(id) + "})"); + ASSERT_TRUE(res->isSuccess()); } public: std::unique_ptr readConn; - NodeTable* personNodeTable; - NodeColumn* idColumn; }; -TEST_F(NodeInsertionDeletionTests, DeletingSameNodeOffsetErrorsTest) { - personNodeTable->getNodeStatisticsAndDeletedIDs()->deleteNode( - personNodeTable->getTableID(), 3 /* person w/ offset/ID 3 */); - try { - personNodeTable->getNodeStatisticsAndDeletedIDs()->deleteNode(personNodeTable->getTableID(), - 3 /* person w/ offset/ID 3 again, which should error */); - FAIL(); - } catch (RuntimeException& e) { - } catch (Exception& e) { FAIL(); } -} - TEST_F(NodeInsertionDeletionTests, DeleteAddMixedTest) { - for (offset_t nodeOffset = 1000; nodeOffset < 9000; ++nodeOffset) { - personNodeTable->getNodeStatisticsAndDeletedIDs()->deleteNode( - personNodeTable->getTableID(), nodeOffset); + for (offset_t nodeOffset = 10; nodeOffset < 90; ++nodeOffset) { + deleteNode(nodeOffset); } - for (int i = 0; i < 8000; ++i) { - auto nodeOffset = addNode(); - ASSERT_TRUE(nodeOffset >= 1000 && nodeOffset < 9000); + for (offset_t i = 10; i < 90; ++i) { + addNode(i); } - - // Add two additional node offsets + // Add additional node offsets for (int i = 0; i < 10; ++i) { - auto nodeOffset = addNode(); - ASSERT_EQ(nodeOffset, 10000 + i); + addNode(10000 + i); } std::string query = "MATCH (a:person) RETURN count(*)"; @@ -100,27 +60,25 @@ TEST_F(NodeInsertionDeletionTests, DeleteAddMixedTest) { ASSERT_EQ(conn->query(query)->getNext()->getValue(0)->getValue(), 10010); ASSERT_EQ(readConn->query(query)->getNext()->getValue(0)->getValue(), 10010); - for (offset_t nodeOffset = 0; nodeOffset < 10010; ++nodeOffset) { - personNodeTable->getNodeStatisticsAndDeletedIDs()->deleteNode( - personNodeTable->getTableID(), nodeOffset); + for (offset_t nodeOffset = 0; nodeOffset < 10; ++nodeOffset) { + deleteNode(nodeOffset); } - ASSERT_EQ(conn->query(query)->getNext()->getValue(0)->getValue(), 0); + ASSERT_EQ(conn->query(query)->getNext()->getValue(0)->getValue(), 10000); ASSERT_EQ(readConn->query(query)->getNext()->getValue(0)->getValue(), 10010); conn->commit(); conn->beginWriteTransaction(); - ASSERT_EQ(conn->query(query)->getNext()->getValue(0)->getValue(), 0); - ASSERT_EQ(readConn->query(query)->getNext()->getValue(0)->getValue(), 0); + ASSERT_EQ(conn->query(query)->getNext()->getValue(0)->getValue(), 10000); + ASSERT_EQ(readConn->query(query)->getNext()->getValue(0)->getValue(), 10000); - for (int i = 0; i < 5000; ++i) { - auto nodeOffset = addNode(); - ASSERT_TRUE(nodeOffset >= 0 && nodeOffset < 10010); + for (int i = 0; i < 5; ++i) { + addNode(i); } - 
ASSERT_EQ(conn->query(query)->getNext()->getValue(0)->getValue(), 5000); - ASSERT_EQ(readConn->query(query)->getNext()->getValue(0)->getValue(), 0); + ASSERT_EQ(conn->query(query)->getNext()->getValue(0)->getValue(), 10005); + ASSERT_EQ(readConn->query(query)->getNext()->getValue(0)->getValue(), 10000); conn->commit(); conn->beginWriteTransaction(); - ASSERT_EQ(conn->query(query)->getNext()->getValue(0)->getValue(), 5000); - ASSERT_EQ(readConn->query(query)->getNext()->getValue(0)->getValue(), 5000); + ASSERT_EQ(conn->query(query)->getNext()->getValue(0)->getValue(), 10005); + ASSERT_EQ(readConn->query(query)->getNext()->getValue(0)->getValue(), 10005); } diff --git a/test/test_files/copy/copy_long_string.test b/test/test_files/copy/copy_long_string.test index f00e5b5d84..b3306dd25a 100644 --- a/test/test_files/copy/copy_long_string.test +++ b/test/test_files/copy/copy_long_string.test @@ -1,5 +1,6 @@ -GROUP CopyLongStringTest -DATASET CSV copy-fault-tests/long-string +-SKIP -- diff --git a/test/test_files/exceptions/copy/duplicated.test b/test/test_files/exceptions/copy/duplicated.test index 6f431bf539..b568909d0d 100644 --- a/test/test_files/exceptions/copy/duplicated.test +++ b/test/test_files/exceptions/copy/duplicated.test @@ -3,7 +3,12 @@ -- --CASE DuplicateIDsError +-CASE DuplicateIntIDsError -STATEMENT COPY person FROM "${KUZU_ROOT_DIRECTORY}/dataset/copy-fault-tests/duplicate-ids/vPerson.csv" ---- error Copy exception: Found duplicated primary key value 10, which violates the uniqueness constraint of the primary key column. + +-CASE DuplicateStringIDsError +-STATEMENT COPY org FROM "${KUZU_ROOT_DIRECTORY}/dataset/copy-fault-tests/duplicate-ids/vOrg.csv" +---- error +Copy exception: Found duplicated primary key value 10, which violates the uniqueness constraint of the primary key column. diff --git a/test/test_files/tinysnb/update_node/create.test b/test/test_files/tinysnb/update_node/create.test index 66b98970d5..a0b68db95b 100644 --- a/test/test_files/tinysnb/update_node/create.test +++ b/test/test_files/tinysnb/update_node/create.test @@ -22,7 +22,6 @@ 9|1980-10-26|10 years 5 months 13:00:00.000024 -CASE InsertNodeWithStringTest --SKIP -STATEMENT CREATE (:person {ID:32, fName:'A'}), (:person {ID:33, fName:'BCD'}), (:person {ID:34, fName:'this is a long name'}) ---- ok -STATEMENT MATCH (a:person) WHERE a.ID > 8 RETURN a.ID, a.fName diff --git a/test/test_files/tinysnb/update_node/set.test b/test/test_files/tinysnb/update_node/set.test index 609d59b46e..44df3dee94 100644 --- a/test/test_files/tinysnb/update_node/set.test +++ b/test/test_files/tinysnb/update_node/set.test @@ -68,7 +68,6 @@ False 22 -CASE SetNodeLongStringPropTest --SKIP -STATEMENT MATCH (a:person) WHERE a.ID=0 SET a.fName='abcdefghijklmnopqrstuvwxyz' ---- ok -STATEMENT MATCH (a:person) WHERE a.ID=0 RETURN a.fName @@ -76,7 +75,6 @@ False abcdefghijklmnopqrstuvwxyz -CASE SetVeryLongListErrorsTest --SKIP -DEFINE STRING_EXCEEDS_OVERFLOW ARANGE 0 5990 -BEGIN_WRITE_TRANSACTION -STATEMENT MATCH (a:person) WHERE a.ID=0 SET a.fName="${STRING_EXCEEDS_OVERFLOW}" @@ -235,3 +233,25 @@ Runtime exception: Maximum length of strings is 4096. 
Input string's length is 2 ---- 2 0|0 1|10 + +-CASE SetNonNullValueWithWriteTransaction +-BEGIN_WRITE_TRANSACTION +-STATEMENT MATCH (a:person) WHERE a.ID=0 RETURN a.age +---- 1 +35 +-STATEMENT MATCH (a:person) WHERE a.ID=0 SET a.age=70 +---- ok +-STATEMENT MATCH (a:person) WHERE a.ID=0 RETURN a.age +---- 1 +70 + +-CASE SetNullValueWithWriteTransaction +-BEGIN_WRITE_TRANSACTION +-STATEMENT MATCH (a:person) WHERE a.ID=0 RETURN a.age +---- 1 +35 +-STATEMENT MATCH (a:person) WHERE a.ID=0 SET a.age=NULL +---- ok +-STATEMENT MATCH (a:person) WHERE a.ID=0 RETURN a.age +---- 1 + diff --git a/test/test_files/tinysnb/update_node/set_read.test b/test/test_files/tinysnb/update_node/set_read.test index e0d9cacbfd..158ab99138 100644 --- a/test/test_files/tinysnb/update_node/set_read.test +++ b/test/test_files/tinysnb/update_node/set_read.test @@ -44,7 +44,6 @@ 6|X -CASE SetReadTest3 --SKIP -STATEMENT MATCH (a:person)-[e:knows]->(b:person) WHERE a.ID=5 RETURN a.fName, b.fName, e.date ---- 3 Dan|Alice|2021-06-30 diff --git a/test/transaction/transaction_manager_test.cpp b/test/transaction/transaction_manager_test.cpp index a8dd2488f7..22189b652c 100644 --- a/test/transaction/transaction_manager_test.cpp +++ b/test/transaction/transaction_manager_test.cpp @@ -14,23 +14,15 @@ class TransactionManagerTest : public EmptyDBTest { void SetUp() override { EmptyDBTest::SetUp(); FileUtils::createDir(databasePath); - LoggerUtils::createLogger(LoggerConstants::LoggerEnum::BUFFER_MANAGER); - LoggerUtils::createLogger(LoggerConstants::LoggerEnum::WAL); - LoggerUtils::createLogger(LoggerConstants::LoggerEnum::TRANSACTION_MANAGER); - LoggerUtils::createLogger(LoggerConstants::LoggerEnum::STORAGE); - bufferManager = std::make_unique( - BufferPoolConstants::DEFAULT_BUFFER_POOL_SIZE_FOR_TESTING); + createDBAndConn(); + bufferManager = getBufferManager(*database); + std::make_unique(BufferPoolConstants::DEFAULT_BUFFER_POOL_SIZE_FOR_TESTING); wal = std::make_unique(databasePath, *bufferManager); - transactionManager = std::make_unique(*wal); + transactionManager = std::make_unique( + *wal, getStorageManager(*database), getMemoryManager(*database)); } - void TearDown() override { - EmptyDBTest::TearDown(); - LoggerUtils::dropLogger(LoggerConstants::LoggerEnum::BUFFER_MANAGER); - LoggerUtils::dropLogger(LoggerConstants::LoggerEnum::WAL); - LoggerUtils::dropLogger(LoggerConstants::LoggerEnum::TRANSACTION_MANAGER); - LoggerUtils::dropLogger(LoggerConstants::LoggerEnum::STORAGE); - } + void TearDown() override { EmptyDBTest::TearDown(); } public: void runTwoCommitRollback(TransactionType type, bool firstIsCommit, bool secondIsCommit) { @@ -53,7 +45,7 @@ class TransactionManagerTest : public EmptyDBTest { std::unique_ptr transactionManager; private: - std::unique_ptr bufferManager; + BufferManager* bufferManager; std::unique_ptr wal; }; diff --git a/test/transaction/transaction_test.cpp b/test/transaction/transaction_test.cpp index 53fc697348..8bb434b9e3 100644 --- a/test/transaction/transaction_test.cpp +++ b/test/transaction/transaction_test.cpp @@ -1,13 +1,16 @@ #include "common/constants.h" #include "graph_test/graph_test.h" #include "storage/storage_manager.h" +#include "storage/store/node_column.h" #include "transaction/transaction_manager.h" using namespace kuzu::common; using namespace kuzu::storage; -using namespace kuzu::transaction; using namespace kuzu::testing; +namespace kuzu { +namespace transaction { + class TransactionTests : public DBTest { public: void SetUp() override { @@ -100,7 +103,8 @@ class 
TransactionTests : public DBTest { std::make_shared(LogicalTypeID::INT64, getMemoryManager(*database)); propertyVectorToWriteDataTo->state = dataChunk->state; if (isNull) { - propertyVectorToWriteDataTo->setNull(dataChunk->state->currIdx, true /* is null */); + propertyVectorToWriteDataTo->setNull(dataChunk->state->currIdx, true /* is null + */); } else { propertyVectorToWriteDataTo->setNull( dataChunk->state->currIdx, false /* is not null */); @@ -119,7 +123,8 @@ class TransactionTests : public DBTest { std::make_shared(LogicalTypeID::DOUBLE, getMemoryManager(*database)); propertyVectorToWriteDataTo->state = dataChunk->state; if (isNull) { - propertyVectorToWriteDataTo->setNull(dataChunk->state->currIdx, true /* is null */); + propertyVectorToWriteDataTo->setNull(dataChunk->state->currIdx, true /* is null + */); } else { propertyVectorToWriteDataTo->setNull( dataChunk->state->currIdx, false /* is not null */); @@ -199,20 +204,6 @@ class TransactionTests : public DBTest { NodeColumn* personEyeSightColumn; }; -TEST_F(TransactionTests, SingleTransactionReadWriteToStructuredNodePropertyNonNullTest) { - readAndAssertAgePropertyNode(0 /* node offset */, writeTrx.get(), 35, false /* is not null */); - writeToAgePropertyNode(0 /* node offset */, 70, false /* is not null */); - readAndAssertAgePropertyNode(0 /* node offset */, writeTrx.get(), 70, false /* is not null */); -} - -TEST_F(TransactionTests, SingleTransactionReadWriteToStructuredNodePropertyNullTest) { - readAndAssertAgePropertyNode(0 /* node offset */, writeTrx.get(), 35, false /* is not null */); - writeToAgePropertyNode( - 0 /* node offset */, 12345 /* this argument is ignored */, true /* is null */); - readAndAssertAgePropertyNode(0 /* node offset */, writeTrx.get(), - 888 /* this argument is ignored */, true /* is null */); -} - TEST_F(TransactionTests, Concurrent1Write1ReadTransactionInTheMiddleOfTransaction) { assertReadBehaviorForBeforeRollbackAndCommitForConcurrent1Write1ReadTransactionTest(); } @@ -282,3 +273,6 @@ TEST_F(TransactionTests, ExecuteWriteQueryInReadOnlyTrx) { ASSERT_EQ( result->getErrorMessage(), "Can't execute a write query inside a read-only transaction."); } + +} // namespace transaction +} // namespace kuzu