diff --git a/src/include/processor/operator/persistent/copy_node.h b/src/include/processor/operator/persistent/copy_node.h index 8d05fa282f6..ad0152e17aa 100644 --- a/src/include/processor/operator/persistent/copy_node.h +++ b/src/include/processor/operator/persistent/copy_node.h @@ -123,7 +123,7 @@ template<> uint64_t CopyNode::appendToPKIndex(storage::PrimaryKeyIndexBuilder* pkIndex, storage::ColumnChunk* chunk, common::offset_t startOffset, common::offset_t numNodes); template<> -uint64_t CopyNode::appendToPKIndex(storage::PrimaryKeyIndexBuilder* pkIndex, +uint64_t CopyNode::appendToPKIndex(storage::PrimaryKeyIndexBuilder* pkIndex, storage::ColumnChunk* chunk, common::offset_t startOffset, common::offset_t numNodes); } // namespace processor diff --git a/src/include/storage/local_table.h b/src/include/storage/local_table.h index be61da4fbe4..c18247cf937 100644 --- a/src/include/storage/local_table.h +++ b/src/include/storage/local_table.h @@ -34,13 +34,13 @@ class LocalVector { class StringLocalVector : public LocalVector { public: explicit StringLocalVector(MemoryManager* mm) - : LocalVector{common::LogicalType(common::LogicalTypeID::STRING), mm}, ovfStringLength{ + : LocalVector{common::LogicalType(common::LogicalTypeID::STRING), mm}, totalStringLength{ 0} {}; void update(common::sel_t offsetInLocalVector, common::ValueVector* updateVector, common::sel_t offsetInUpdateVector) final; - uint64_t ovfStringLength; + uint64_t totalStringLength; }; class StructLocalVector : public LocalVector { diff --git a/src/include/storage/store/column_chunk.h b/src/include/storage/store/column_chunk.h index c697cb29478..81bd5d379fa 100644 --- a/src/include/storage/store/column_chunk.h +++ b/src/include/storage/store/column_chunk.h @@ -1,17 +1,12 @@ #pragma once -#include "common/copier_config/copier_config.h" -#include "common/type_utils.h" +#include + +#include "common/constants.h" #include "common/types/types.h" #include "common/vector/value_vector.h" #include "compression.h" #include "storage/buffer_manager/bm_file_handle.h" -#include "storage/wal/wal.h" -#include "transaction/transaction.h" - -namespace arrow { -class Array; -} namespace kuzu { namespace storage { @@ -19,35 +14,16 @@ namespace storage { class NullColumnChunk; class CompressionAlg; -struct BaseColumnChunkMetadata { +struct ColumnChunkMetadata { common::page_idx_t pageIdx; common::page_idx_t numPages; - - BaseColumnChunkMetadata() : BaseColumnChunkMetadata{common::INVALID_PAGE_IDX, 0} {} - BaseColumnChunkMetadata(common::page_idx_t pageIdx, common::page_idx_t numPages) - : pageIdx(pageIdx), numPages(numPages) {} - virtual ~BaseColumnChunkMetadata() = default; -}; - -struct ColumnChunkMetadata : public BaseColumnChunkMetadata { uint64_t numValues; CompressionMetadata compMeta; - ColumnChunkMetadata() : BaseColumnChunkMetadata(), numValues{UINT64_MAX} {} + ColumnChunkMetadata() : pageIdx{common::INVALID_PAGE_IDX}, numPages{0}, numValues{UINT64_MAX} {} ColumnChunkMetadata(common::page_idx_t pageIdx, common::page_idx_t numPages, - uint64_t numNodesInChunk, CompressionMetadata compMeta) - : BaseColumnChunkMetadata{pageIdx, numPages}, numValues(numNodesInChunk), - compMeta(compMeta) {} -}; - -struct OverflowColumnChunkMetadata : public BaseColumnChunkMetadata { - common::offset_t lastOffsetInPage; - - OverflowColumnChunkMetadata() - : BaseColumnChunkMetadata(), lastOffsetInPage{common::INVALID_OFFSET} {} - OverflowColumnChunkMetadata( - common::page_idx_t pageIdx, common::page_idx_t numPages, common::offset_t lastOffsetInPage) - : BaseColumnChunkMetadata{pageIdx, numPages}, lastOffsetInPage(lastOffsetInPage) {} + uint64_t numNodesInChunk, const CompressionMetadata& compMeta) + : pageIdx(pageIdx), numPages(numPages), numValues(numNodesInChunk), compMeta(compMeta) {} }; // Base data segment covers all fixed-sized data types. @@ -60,8 +36,9 @@ class ColumnChunk { // ColumnChunks must be initialized after construction, so this constructor should only be used // through the ColumnChunkFactory - explicit ColumnChunk( - common::LogicalType dataType, bool enableCompression = true, bool hasNullChunk = true); + explicit ColumnChunk(common::LogicalType dataType, bool enableCompression = true, + bool hasNullChunk = true, + common::offset_t capacity = common::StorageConstants::NODE_GROUP_SIZE); virtual ~ColumnChunk() = default; @@ -88,9 +65,6 @@ class ColumnChunk { ColumnChunkMetadata flushBuffer( BMFileHandle* dataFH, common::page_idx_t startPageIdx, const ColumnChunkMetadata& metadata); - // Returns the size of the data type in bytes - static uint32_t getDataTypeSizeInChunk(common::LogicalType& dataType); - static inline common::page_idx_t getNumPagesForBytes(uint64_t numBytes) { return (numBytes + common::BufferPoolConstants::PAGE_4KB_SIZE - 1) / common::BufferPoolConstants::PAGE_4KB_SIZE; diff --git a/src/include/storage/store/compression.h b/src/include/storage/store/compression.h index 30f64f1f51e..4efc2654095 100644 --- a/src/include/storage/store/compression.h +++ b/src/include/storage/store/compression.h @@ -2,7 +2,6 @@ #include #include -#include #include "common/types/types.h" @@ -52,8 +51,8 @@ class CompressionAlg { // Takes a single uncompressed value from the srcBuffer and compresses it into the dstBuffer // Offsets refer to value offsets, not byte offsets - virtual void setValueFromUncompressed(uint8_t* srcBuffer, common::offset_t posInSrc, - uint8_t* dstBuffer, common::offset_t posInDst, + virtual void setValuesFromUncompressed(const uint8_t* srcBuffer, common::offset_t srcOffset, + uint8_t* dstBuffer, common::offset_t dstOffset, common::offset_t numValues, const CompressionMetadata& metadata) const = 0; // Reads a value from the buffer at the given position and stores it at the given memory address @@ -102,11 +101,11 @@ class Uncompressed : public CompressionAlg { Uncompressed(const Uncompressed&) = default; - inline void setValueFromUncompressed(uint8_t* srcBuffer, common::offset_t posInSrc, - uint8_t* dstBuffer, common::offset_t posInDst, + inline void setValuesFromUncompressed(const uint8_t* srcBuffer, common::offset_t srcOffset, + uint8_t* dstBuffer, common::offset_t dstOffset, common::offset_t numValues, const CompressionMetadata& /*metadata*/) const final { - memcpy(dstBuffer + posInDst * numBytesPerValue, srcBuffer + posInSrc * numBytesPerValue, - numBytesPerValue); + memcpy(dstBuffer + dstOffset * numBytesPerValue, srcBuffer + srcOffset * numBytesPerValue, + numBytesPerValue * numValues); } inline void getValue(const uint8_t* buffer, common::offset_t posInBuffer, uint8_t* dst, @@ -185,8 +184,9 @@ class IntegerBitpacking : public CompressionAlg { IntegerBitpacking() = default; IntegerBitpacking(const IntegerBitpacking&) = default; - void setValueFromUncompressed(uint8_t* srcBuffer, common::offset_t posInSrc, uint8_t* dstBuffer, - common::offset_t posInDst, const CompressionMetadata& metadata) const final; + void setValuesFromUncompressed(const uint8_t* srcBuffer, common::offset_t srcOffset, + uint8_t* dstBuffer, common::offset_t dstOffset, common::offset_t numValues, + const CompressionMetadata& metadata) const final; // Read a single value from the buffer void getValue(const uint8_t* buffer, common::offset_t posInBuffer, uint8_t* dst, @@ -241,8 +241,9 @@ class BooleanBitpacking : public CompressionAlg { BooleanBitpacking() = default; BooleanBitpacking(const BooleanBitpacking&) = default; - void setValueFromUncompressed(uint8_t* srcBuffer, common::offset_t posInSrc, uint8_t* dstBuffer, - common::offset_t posInDst, const CompressionMetadata& metadata) const final; + void setValuesFromUncompressed(const uint8_t* srcBuffer, common::offset_t srcOffset, + uint8_t* dstBuffer, common::offset_t dstOffset, common::offset_t numValues, + const CompressionMetadata& metadata) const final; void getValue(const uint8_t* buffer, common::offset_t posInBuffer, uint8_t* dst, common::offset_t posInDst, const CompressionMetadata& metadata) const final; @@ -294,14 +295,28 @@ class ReadCompressedValuesFromPage : public CompressedFunctor { uint32_t startPosInResult, uint64_t numValuesToRead, const CompressionMetadata& metadata); }; -class WriteCompressedValueToPage : public CompressedFunctor { +class WriteCompressedValuesToPage : public CompressedFunctor { public: - explicit WriteCompressedValueToPage(const common::LogicalType& logicalType) + explicit WriteCompressedValuesToPage(const common::LogicalType& logicalType) : CompressedFunctor(logicalType) {} - WriteCompressedValueToPage(const WriteCompressedValueToPage&) = default; + WriteCompressedValuesToPage(const WriteCompressedValuesToPage&) = default; + + void operator()(uint8_t* frame, uint16_t posInFrame, const uint8_t* data, + common::offset_t dataOffset, common::offset_t numValues, + const CompressionMetadata& metadata); +}; + +class WriteCompressedValueToPageFromVector { +public: + explicit WriteCompressedValueToPageFromVector(const common::LogicalType& logicalType) + : writeFunc(logicalType) {} + WriteCompressedValueToPageFromVector(const WriteCompressedValueToPageFromVector&) = default; void operator()(uint8_t* frame, uint16_t posInFrame, common::ValueVector* vector, uint32_t posInVector, const CompressionMetadata& metadata); + +private: + WriteCompressedValuesToPage writeFunc; }; } // namespace storage diff --git a/src/include/storage/store/node_column.h b/src/include/storage/store/node_column.h index 3e25a4ae15e..b31f765e0e4 100644 --- a/src/include/storage/store/node_column.h +++ b/src/include/storage/store/node_column.h @@ -16,6 +16,9 @@ using read_values_to_vector_func_t = std::function; using write_values_from_vector_func_t = std::function; +using write_values_func_t = std::function; using read_values_to_page_func_t = std::functionget(nodeGroupIdx, transaction); } - virtual void write(common::offset_t nodeOffset, common::ValueVector* vectorToWriteFrom, - uint32_t posInVectorToWriteFrom); + virtual void scan(common::node_group_idx_t nodeGroupIdx, ColumnChunk* columnChunk); + virtual void scan(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx, + common::offset_t startOffsetInGroup, common::offset_t endOffsetInGroup, uint8_t* result); protected: - virtual void scanInternal(transaction::Transaction* transaction, - common::ValueVector* nodeIDVector, common::ValueVector* resultVector); void scanUnfiltered(transaction::Transaction* transaction, PageElementCursor& pageCursor, uint64_t numValuesToScan, common::ValueVector* resultVector, const CompressionMetadata& compMeta, uint64_t startPosInVector = 0); void scanFiltered(transaction::Transaction* transaction, PageElementCursor& pageCursor, common::ValueVector* nodeIDVector, common::ValueVector* resultVector, const CompressionMetadata& compMeta); - virtual void lookupInternal(transaction::Transaction* transaction, - common::ValueVector* nodeIDVector, common::ValueVector* resultVector); - virtual void lookupValue(transaction::Transaction* transaction, common::offset_t nodeOffset, - common::ValueVector* resultVector, uint32_t posInVector); void readFromPage(transaction::Transaction* transaction, common::page_idx_t pageIdx, const std::function& func); - virtual void writeValue(common::offset_t nodeOffset, common::ValueVector* vectorToWriteFrom, - uint32_t posInVectorToWriteFrom); - - PageElementCursor getPageCursorForOffset( - transaction::TransactionType transactionType, common::offset_t nodeOffset); - // TODO(Guodong): This is mostly duplicated with - // StorageStructure::createWALVersionOfPageIfNecessaryForElement(). Should be cleared later. - WALPageIdxPosInPageAndFrame createWALVersionOfPageForValue(common::offset_t nodeOffset); + // Produces a page cursor for the offset relative to the given node group + PageElementCursor getPageCursorForOffsetInGroup(transaction::TransactionType transactionType, + common::offset_t nodeOffset, common::node_group_idx_t nodeGroupIdx); protected: StorageStructureID storageStructureID; @@ -112,12 +92,86 @@ class NodeColumn { std::unique_ptr nullColumn; read_values_to_vector_func_t readToVectorFunc; write_values_from_vector_func_t writeFromVectorFunc; + write_values_func_t writeFunc; read_values_to_page_func_t readToPageFunc; batch_lookup_func_t batchLookupFunc; RWPropertyStats propertyStatistics; bool enableCompression; }; +// NodeColumn where we assume it the underlying storage always stores NodeGroupSize values +// Data is indexed using a global offset (which is internally used to find the node group via the +// node group size) +class NodeColumn : public BaseNodeColumn { +public: + NodeColumn(common::LogicalType dataType, const MetadataDAHInfo& metaDAHeaderInfo, + BMFileHandle* dataFH, BMFileHandle* metadataFH, BufferManager* bufferManager, WAL* wal, + transaction::Transaction* transaction, RWPropertyStats propertyStatistics, + bool enableCompression, bool requireNullColumn = true) + : BaseNodeColumn{std::move(dataType), metaDAHeaderInfo, dataFH, metadataFH, bufferManager, + wal, transaction, propertyStatistics, enableCompression, requireNullColumn} {} + + // Expose for feature store + virtual void batchLookup(transaction::Transaction* transaction, + const common::offset_t* nodeOffsets, size_t size, uint8_t* result); + + virtual void scan(transaction::Transaction* transaction, common::ValueVector* nodeIDVector, + common::ValueVector* resultVector); + virtual void scan(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx, + common::offset_t startOffsetInGroup, common::offset_t endOffsetInGroup, + common::ValueVector* resultVector, uint64_t offsetInVector); + inline void scan(common::node_group_idx_t nodeGroupIdx, ColumnChunk* columnChunk) override { + BaseNodeColumn::scan(nodeGroupIdx, columnChunk); + } + virtual void lookup(transaction::Transaction* transaction, common::ValueVector* nodeIDVector, + common::ValueVector* resultVector); + + virtual void write(common::offset_t nodeOffset, common::ValueVector* vectorToWriteFrom, + uint32_t posInVectorToWriteFrom); + +protected: + virtual void writeValue(const ColumnChunkMetadata& chunkMeta, common::offset_t nodeOffset, + common::ValueVector* vectorToWriteFrom, uint32_t posInVectorToWriteFrom); + virtual void writeValue( + const ColumnChunkMetadata& chunkMeta, common::offset_t nodeOffset, const uint8_t* data); + + virtual void scanInternal(transaction::Transaction* transaction, + common::ValueVector* nodeIDVector, common::ValueVector* resultVector); + virtual void lookupInternal(transaction::Transaction* transaction, + common::ValueVector* nodeIDVector, common::ValueVector* resultVector); + virtual void lookupValue(transaction::Transaction* transaction, common::offset_t nodeOffset, + common::ValueVector* resultVector, uint32_t posInVector); + + // TODO(Guodong): This is mostly duplicated with + // StorageStructure::createWALVersionOfPageIfNecessaryForElement(). Should be cleared later. + WALPageIdxPosInPageAndFrame createWALVersionOfPageForValue(common::offset_t nodeOffset); + + // Produces a page cursor for the absolute node offset + PageElementCursor getPageCursorForOffset( + transaction::TransactionType transactionType, common::offset_t nodeOffset); + // Produces a page cursor for the absolute node offset, given a node group + PageElementCursor getPageCursorForOffsetAndGroup(transaction::TransactionType transactionType, + common::offset_t nodeOffset, common::node_group_idx_t nodeGroupIdx); +}; + +// NodeColumn for data adjacent to a NodeGroup +// Data is indexed using the node group identifier and the offset within the node group +class AuxiliaryNodeColumn : public BaseNodeColumn { +public: + AuxiliaryNodeColumn(common::LogicalType dataType, const MetadataDAHInfo& metaDAHeaderInfo, + BMFileHandle* dataFH, BMFileHandle* metadataFH, BufferManager* bufferManager, WAL* wal, + transaction::Transaction* transaction, RWPropertyStats propertyStatistics, + bool enableCompression, bool requireNullColumn = true) + : BaseNodeColumn{std::move(dataType), metaDAHeaderInfo, dataFH, metadataFH, bufferManager, + wal, transaction, propertyStatistics, enableCompression, requireNullColumn} {} + + // Append values to the end of the node group, resizing it if necessary + common::offset_t appendValues( + common::node_group_idx_t nodeGroupIdx, const uint8_t* data, common::offset_t numValues); + +protected: +}; + struct NodeColumnFactory { static std::unique_ptr createNodeColumn(const common::LogicalType& dataType, const MetadataDAHInfo& metaDAHeaderInfo, BMFileHandle* dataFH, BMFileHandle* metadataFH, diff --git a/src/include/storage/store/string_column_chunk.h b/src/include/storage/store/string_column_chunk.h index 52700b809da..6c19f0ec6ee 100644 --- a/src/include/storage/store/string_column_chunk.h +++ b/src/include/storage/store/string_column_chunk.h @@ -1,13 +1,14 @@ #pragma once -#include "common/types/blob.h" -#include "storage/storage_structure/in_mem_file.h" #include "storage/store/column_chunk.h" namespace kuzu { namespace storage { class StringColumnChunk : public ColumnChunk { + using string_offset_t = uint64_t; + using string_index_t = uint32_t; + public: explicit StringColumnChunk(common::LogicalType dataType); @@ -20,13 +21,22 @@ class StringColumnChunk : public ColumnChunk { template T getValue(common::offset_t /*pos*/) const { - throw common::NotImplementedException("VarSizedColumnChunk::getValue"); + throw common::NotImplementedException("StringColumnChunk::getValue"); } - common::page_idx_t flushOverflowBuffer(BMFileHandle* dataFH, common::page_idx_t startPageIdx); + uint64_t getStringLength(common::offset_t pos) const { + auto index = ColumnChunk::getValue(pos); + if (index + 1 < offsetChunk->getNumValues()) { + assert(offsetChunk->getValue(index + 1) >= + offsetChunk->getValue(index)); + return offsetChunk->getValue(index + 1) - + offsetChunk->getValue(index); + } + return stringDataChunk->getNumValues() - offsetChunk->getValue(index); + } - inline InMemOverflowFile* getOverflowFile() { return overflowFile.get(); } - inline common::offset_t getLastOffsetInPage() const { return overflowCursor.offsetInPage; } + ColumnChunk* getDataChunk() { return stringDataChunk.get(); } + ColumnChunk* getOffsetChunk() { return offsetChunk.get(); } private: void appendStringColumnChunk(StringColumnChunk* other, common::offset_t startPosInOtherChunk, @@ -37,13 +47,17 @@ class StringColumnChunk : public ColumnChunk { void setValueFromString(const char* value, uint64_t length, uint64_t pos); private: - std::unique_ptr overflowFile; - PageByteCursor overflowCursor; + // String data is stored as a UINT8 chunk, using the numValues in the chunk to track the number + // of characters stored. + std::unique_ptr stringDataChunk; + std::unique_ptr offsetChunk; }; // STRING template<> std::string StringColumnChunk::getValue(common::offset_t pos) const; +template<> +std::string_view StringColumnChunk::getValue(common::offset_t pos) const; } // namespace storage } // namespace kuzu diff --git a/src/include/storage/store/string_node_column.h b/src/include/storage/store/string_node_column.h index bf1dd4a08a7..3b7357e1358 100644 --- a/src/include/storage/store/string_node_column.h +++ b/src/include/storage/store/string_node_column.h @@ -1,18 +1,15 @@ #pragma once -#include "storage/stats/table_statistics.h" #include "storage/store/node_column.h" namespace kuzu { namespace storage { -struct StringNodeColumnFunc { - static void writeStringValuesToPage(uint8_t* frame, uint16_t posInFrame, - common::ValueVector* vector, uint32_t posInVector, const CompressionMetadata& metadata); -}; - class StringNodeColumn : public NodeColumn { public: + using string_offset_t = uint64_t; + using string_index_t = uint32_t; + StringNodeColumn(common::LogicalType dataType, const MetadataDAHInfo& metaDAHeaderInfo, BMFileHandle* dataFH, BMFileHandle* metadataFH, BufferManager* bufferManager, WAL* wal, transaction::Transaction* transaction, RWPropertyStats propertyStatistics); @@ -24,12 +21,11 @@ class StringNodeColumn : public NodeColumn { void append(ColumnChunk* columnChunk, common::node_group_idx_t nodeGroupIdx) final; - void writeValue(common::offset_t nodeOffset, common::ValueVector* vectorToWriteFrom, - uint32_t posInVectorToWriteFrom) final; + void writeValue(const ColumnChunkMetadata& chunkMeta, common::offset_t nodeOffset, + common::ValueVector* vectorToWriteFrom, uint32_t posInVectorToWriteFrom) final; - inline InMemDiskArray* getOverflowMetadataDA() { - return overflowMetadataDA.get(); - } + inline AuxiliaryNodeColumn* getDataColumn() { return dataColumn.get(); } + inline AuxiliaryNodeColumn* getOffsetColumn() { return offsetColumn.get(); } void checkpointInMemory() final; void rollbackInMemory() final; @@ -37,15 +33,34 @@ class StringNodeColumn : public NodeColumn { protected: void scanInternal(transaction::Transaction* transaction, common::ValueVector* nodeIDVector, common::ValueVector* resultVector) final; + void scanUnfiltered(transaction::Transaction* transaction, + common::node_group_idx_t nodeGroupIdx, common::offset_t startOffsetInGroup, + common::offset_t endOffsetInGroup, common::ValueVector* resultVector, + uint64_t startPosInVector = 0); + void scanFiltered(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx, + common::offset_t startOffsetInGroup, common::ValueVector* nodeIDVector, + common::ValueVector* resultVector); + void lookupInternal(transaction::Transaction* transaction, common::ValueVector* nodeIDVector, common::ValueVector* resultVector) final; + void scanValueToVector(transaction::Transaction* transaction, + common::node_group_idx_t nodeGroupIdx, uint64_t startOffset, uint64_t endOffset, + common::ValueVector* resultVector, uint64_t offsetInVector); + void scanOffsets(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx, + uint64_t* offsets, uint64_t index); + private: void readStringValueFromOvf(transaction::Transaction* transaction, common::ku_string_t& kuStr, common::ValueVector* resultVector, common::page_idx_t overflowPageIdx); private: - std::unique_ptr> overflowMetadataDA; + // Main column stores indices of values in the dictionary + // The offset column stores the offsets for each index, and the data column stores the data in + // order. Values are never removed from the dictionary during in-place updates, only appended to + // the end. + std::unique_ptr dataColumn; + std::unique_ptr offsetColumn; }; } // namespace storage diff --git a/src/processor/operator/persistent/copy_node.cpp b/src/processor/operator/persistent/copy_node.cpp index 37e0ee67d6e..8b5f6b8d6bc 100644 --- a/src/processor/operator/persistent/copy_node.cpp +++ b/src/processor/operator/persistent/copy_node.cpp @@ -93,23 +93,24 @@ void CopyNode::writeAndResetNodeGroup(node_group_idx_t nodeGroupIdx, void CopyNode::populatePKIndex( PrimaryKeyIndexBuilder* pkIndex, ColumnChunk* chunk, offset_t startOffset, offset_t numNodes) { checkNonNullConstraint(chunk->getNullChunk(), numNodes); - std::string errorPKValueStr; + std::optional errorPKValueStr; pkIndex->lock(); try { switch (chunk->getDataType().getPhysicalType()) { case PhysicalTypeID::INT64: { auto numAppendedNodes = appendToPKIndex(pkIndex, chunk, startOffset, numNodes); if (numAppendedNodes < numNodes) { - errorPKValueStr = - std::to_string(chunk->getValue(startOffset + numAppendedNodes)); + // TODO(bmwinger): This should be tested where there are multiple node groups + errorPKValueStr = std::to_string(chunk->getValue(numAppendedNodes)); } } break; case PhysicalTypeID::STRING: { auto numAppendedNodes = - appendToPKIndex(pkIndex, chunk, startOffset, numNodes); + appendToPKIndex(pkIndex, chunk, startOffset, numNodes); if (numAppendedNodes < numNodes) { + // TODO(bmwinger): This should be tested where there are multiple node groups errorPKValueStr = - chunk->getValue(startOffset + numAppendedNodes).getAsString(); + static_cast(chunk)->getValue(numAppendedNodes); } } break; default: { @@ -122,8 +123,8 @@ void CopyNode::populatePKIndex( throw; } pkIndex->unlock(); - if (!errorPKValueStr.empty()) { - throw CopyException(ExceptionMessage::existedPKException(errorPKValueStr)); + if (errorPKValueStr) { + throw CopyException(ExceptionMessage::existedPKException(*errorPKValueStr)); } } @@ -171,7 +172,7 @@ uint64_t CopyNode::appendToPKIndex( } template<> -uint64_t CopyNode::appendToPKIndex( +uint64_t CopyNode::appendToPKIndex( PrimaryKeyIndexBuilder* pkIndex, ColumnChunk* chunk, offset_t startOffset, uint64_t numValues) { auto stringColumnChunk = (StringColumnChunk*)chunk; for (auto i = 0u; i < numValues; i++) { diff --git a/src/storage/local_table.cpp b/src/storage/local_table.cpp index 6fbeec76ae2..789db9322c7 100644 --- a/src/storage/local_table.cpp +++ b/src/storage/local_table.cpp @@ -3,7 +3,6 @@ #include "common/exception/message.h" #include "common/exception/runtime.h" #include "storage/store/node_table.h" -#include "storage/store/string_column_chunk.h" #include "storage/store/string_node_column.h" #include "storage/store/struct_node_column.h" #include "storage/store/var_list_column_chunk.h" @@ -46,9 +45,8 @@ void StringLocalVector::update( auto kuStr = updateVector->getValue(offsetInUpdateVector); if (kuStr.len > BufferPoolConstants::PAGE_4KB_SIZE) { throw RuntimeException(ExceptionMessage::overLargeStringValueException(kuStr.len)); - } else if (!ku_string_t::isShortString(kuStr.len)) { - ovfStringLength += kuStr.len; } + totalStringLength += kuStr.len; LocalVector::update(offsetInLocalVector, updateVector, offsetInUpdateVector); } @@ -227,16 +225,35 @@ void StringLocalColumn::prepareCommitForChunk(node_group_idx_t nodeGroupIdx) { assert(chunks.contains(nodeGroupIdx)); auto localChunk = chunks.at(nodeGroupIdx).get(); auto stringColumn = reinterpret_cast(column); - auto overflowMetadata = - stringColumn->getOverflowMetadataDA()->get(nodeGroupIdx, TransactionType::WRITE); - auto ovfStringLengthInChunk = 0u; + auto dataColumnMetadata = + stringColumn->getDataColumn()->getMetadata(nodeGroupIdx, TransactionType::WRITE); + auto offsetColumnMetadata = + stringColumn->getOffsetColumn()->getMetadata(nodeGroupIdx, TransactionType::WRITE); + auto totalStringLengthInChunk = 0u; + uint64_t newStrings = 0; for (auto& [_, localVector] : localChunk->vectors) { auto stringLocalVector = reinterpret_cast(localVector.get()); - ovfStringLengthInChunk += stringLocalVector->ovfStringLength; - } - if (overflowMetadata.lastOffsetInPage + ovfStringLengthInChunk <= - BufferPoolConstants::PAGE_4KB_SIZE) { - // Write the updated overflow strings to the overflow string buffer. + totalStringLengthInChunk += stringLocalVector->totalStringLength; + newStrings += localVector->vector->state->selVector->selectedSize; + } + auto offsetCapacity = + offsetColumnMetadata.compMeta.numValues( + BufferPoolConstants::PAGE_4KB_SIZE, stringColumn->getOffsetColumn()->getDataType()) * + offsetColumnMetadata.numPages; + // Write in-place as long as there is sufficient space in the data chunk + if (dataColumnMetadata.numValues + totalStringLengthInChunk <= + dataColumnMetadata.numPages * BufferPoolConstants::PAGE_4KB_SIZE && + offsetColumnMetadata.numValues + newStrings < offsetCapacity + // Indices are limited to 32 bits but in theory could be larger than that since the offset + // column can grow beyond the node group size. + // + // E.g. one big string is written first, followed by NODE_GROUP_SIZE-1 small strings, + // which are all updated in-place many times (which may fit if the first string is large + // enough that 2^n minus the first string's size is large enough to fit the other strings, + // for some n. + // 32 bits should give plenty of space for updates. + && offsetColumnMetadata.numValues + newStrings < + std::numeric_limits::max()) { commitLocalChunkInPlace(nodeGroupIdx, localChunk); } else { commitLocalChunkOutOfPlace(nodeGroupIdx, localChunk); diff --git a/src/storage/stats/table_statistics_collection.cpp b/src/storage/stats/table_statistics_collection.cpp index 77742df2ffe..420f4069d08 100644 --- a/src/storage/stats/table_statistics_collection.cpp +++ b/src/storage/stats/table_statistics_collection.cpp @@ -100,10 +100,12 @@ std::unique_ptr TablesStatistics::createMetadataDAHInfo( createMetadataDAHInfo(*VarListType::getChildType(&dataType), metadataFH, bm, wal)); } break; case PhysicalTypeID::STRING: { - auto childMetadataDAHInfo = std::make_unique(); - childMetadataDAHInfo->dataDAHPageIdx = - InMemDiskArray::addDAHPageToFile(metadataFH, bm, wal); - metadataDAHInfo->childrenInfos.push_back(std::move(childMetadataDAHInfo)); + // Data column + metadataDAHInfo->childrenInfos.push_back( + createMetadataDAHInfo(LogicalType(LogicalTypeID::UINT8), metadataFH, bm, wal)); + // Offset column + metadataDAHInfo->childrenInfos.push_back( + createMetadataDAHInfo(LogicalType(LogicalTypeID::UINT64), metadataFH, bm, wal)); } break; default: { // DO NOTHING. diff --git a/src/storage/storage_structure/disk_array.cpp b/src/storage/storage_structure/disk_array.cpp index 5fe2a9839db..30154d2bd4d 100644 --- a/src/storage/storage_structure/disk_array.cpp +++ b/src/storage/storage_structure/disk_array.cpp @@ -534,13 +534,11 @@ template class BaseDiskArray>; template class BaseDiskArray>; template class BaseDiskArray; template class BaseDiskArray; -template class BaseDiskArray; template class BaseInMemDiskArray; template class BaseInMemDiskArray>; template class BaseInMemDiskArray>; template class BaseInMemDiskArray; template class BaseInMemDiskArray; -template class BaseInMemDiskArray; template class InMemDiskArrayBuilder; template class InMemDiskArrayBuilder>; template class InMemDiskArrayBuilder>; @@ -551,7 +549,6 @@ template class InMemDiskArray>; template class InMemDiskArray>; template class InMemDiskArray; template class InMemDiskArray; -template class InMemDiskArray; } // namespace storage } // namespace kuzu diff --git a/src/storage/store/column_chunk.cpp b/src/storage/store/column_chunk.cpp index 3b020f4a201..86d23e36a22 100644 --- a/src/storage/store/column_chunk.cpp +++ b/src/storage/store/column_chunk.cpp @@ -1,7 +1,7 @@ #include "storage/store/column_chunk.h" #include "common/types/value/nested.h" -#include "storage/storage_structure/storage_structure_utils.h" +#include "common/types/value/value.h" #include "storage/store/compression.h" #include "storage/store/string_column_chunk.h" #include "storage/store/struct_column_chunk.h" @@ -9,7 +9,6 @@ #include "storage/store/var_list_column_chunk.h" using namespace kuzu::common; -using namespace kuzu::transaction; namespace kuzu { namespace storage { @@ -62,10 +61,21 @@ class CompressedFlushBuffer { } else { valuesRemaining -= numValuesPerPage; } + if (compressedSize < BufferPoolConstants::PAGE_4KB_SIZE) { + memset(compressedBuffer.get() + compressedSize, 0, + BufferPoolConstants::PAGE_4KB_SIZE - compressedSize); + } FileUtils::writeToFile(dataFH->getFileInfo(), compressedBuffer.get(), compressedSize, (startPageIdx + numPages) * BufferPoolConstants::PAGE_4KB_SIZE); numPages++; } while (valuesRemaining > 0); + // Make sure that the file is the right length + if (numPages < metadata.numPages) { + memset(compressedBuffer.get(), 0, BufferPoolConstants::PAGE_4KB_SIZE); + FileUtils::writeToFile(dataFH->getFileInfo(), compressedBuffer.get(), + BufferPoolConstants::PAGE_4KB_SIZE, + (startPageIdx + metadata.numPages) * BufferPoolConstants::PAGE_4KB_SIZE); + } return ColumnChunkMetadata( startPageIdx, metadata.numPages, metadata.numValues, metadata.compMeta); } @@ -127,13 +137,14 @@ static std::shared_ptr getCompression( } } -ColumnChunk::ColumnChunk(LogicalType dataType, bool enableCompression, bool hasNullChunk) +ColumnChunk::ColumnChunk( + LogicalType dataType, bool enableCompression, bool hasNullChunk, offset_t capacity) : dataType{std::move(dataType)}, numBytesPerValue{getDataTypeSizeInChunk(this->dataType)}, numValues{0} { if (hasNullChunk) { nullChunk = std::make_unique(); } - initializeBuffer(StorageConstants::NODE_GROUP_SIZE); + initializeBuffer(capacity); initializeFunction(enableCompression); } @@ -344,30 +355,6 @@ ColumnChunkMetadata ColumnChunk::flushBuffer( return flushBufferFunction(buffer.get(), bufferSize, dataFH, startPageIdx, metadata); } -uint32_t ColumnChunk::getDataTypeSizeInChunk(LogicalType& dataType) { - switch (dataType.getLogicalTypeID()) { - case LogicalTypeID::SERIAL: - case LogicalTypeID::STRUCT: { - return 0; - } - case LogicalTypeID::STRING: { - return sizeof(ku_string_t); - } - case LogicalTypeID::VAR_LIST: - case LogicalTypeID::INTERNAL_ID: { - return sizeof(offset_t); - } - // Used by NodeColumn to represent the de-compressed size of booleans in-memory - // Does not reflect the size of booleans on-disk in BoolColumnChunk - case LogicalTypeID::BOOL: { - return 1; - } - default: { - return StorageUtils::getDataTypeSize(dataType); - } - } -} - uint64_t ColumnChunk::getBufferSize() const { switch (dataType.getLogicalTypeID()) { case LogicalTypeID::BOOL: { diff --git a/src/storage/store/compression.cpp b/src/storage/store/compression.cpp index 11b5f7f7906..a78124256f0 100644 --- a/src/storage/store/compression.cpp +++ b/src/storage/store/compression.cpp @@ -1,10 +1,8 @@ #include "storage/store/compression.h" -#include - +#include #include -#include "arrow/array.h" #include "common/exception/not_implemented.h" #include "common/exception/storage.h" #include "common/null_mask.h" @@ -26,19 +24,18 @@ namespace storage { uint32_t getDataTypeSizeInChunk(const common::LogicalType& dataType) { using namespace common; switch (dataType.getLogicalTypeID()) { + case LogicalTypeID::SERIAL: case LogicalTypeID::STRUCT: { return 0; } - case LogicalTypeID::STRING: { - return sizeof(ku_string_t); + case LogicalTypeID::STRING: + case LogicalTypeID::BLOB: { + return sizeof(uint32_t); } case LogicalTypeID::VAR_LIST: case LogicalTypeID::INTERNAL_ID: { return sizeof(offset_t); } - case LogicalTypeID::SERIAL: { - return sizeof(int64_t); - } default: { auto size = StorageUtils::getDataTypeSize(dataType); assert(size <= BufferPoolConstants::PAGE_4KB_SIZE); @@ -269,25 +266,34 @@ void fastpack(const T* in, uint8_t* out, uint8_t bitWidth) { } template -void IntegerBitpacking::setValueFromUncompressed(uint8_t* srcBuffer, common::offset_t posInSrc, - uint8_t* dstBuffer, common::offset_t posInDst, const CompressionMetadata& metadata) const { +void IntegerBitpacking::setValuesFromUncompressed(const uint8_t* srcBuffer, offset_t posInSrc, + uint8_t* dstBuffer, offset_t posInDst, offset_t numValues, + const CompressionMetadata& metadata) const { auto header = BitpackHeader::readHeader(metadata.data); // This is a fairly naive implementation which uses fastunpack/fastpack // to modify the data by decompressing/compressing a single chunk of values. // - // TODO(bmwinger): modify the data in-place + // TODO(bmwinger): modify the data in-place when numValues is small (frequently called with + // numValues=1) // // Data can be considered to be stored in aligned chunks of 32 values // with a size of 32 * bitWidth bits, // or bitWidth 32-bit values (we cast the buffer to a uint32_t* later). auto chunkStart = (uint8_t*)getChunkStart(dstBuffer, posInDst, header.bitWidth); - auto posInChunk = posInDst % CHUNK_SIZE; - auto value = ((T*)srcBuffer)[posInSrc]; - assert(canUpdateInPlace(value, header)); - + auto startPosInChunk = posInDst % CHUNK_SIZE; U chunk[CHUNK_SIZE]; fastunpack(chunkStart, chunk, header.bitWidth); - chunk[posInChunk] = (U)(value - (T)header.offset); + for (offset_t i = 0; i < numValues; i++) { + auto value = ((T*)srcBuffer)[posInSrc]; + assert(canUpdateInPlace(value, header)); + chunk[startPosInChunk + i] = (U)(value - (T)header.offset); + if (startPosInChunk + i + 1 >= CHUNK_SIZE && i + 1 < numValues) { + fastpack(chunk, chunkStart, header.bitWidth); + chunkStart = (uint8_t*)getChunkStart(dstBuffer, posInDst + i + 1, header.bitWidth); + fastunpack(chunkStart, chunk, header.bitWidth); + startPosInChunk = 0; + } + } fastpack(chunk, chunkStart, header.bitWidth); } @@ -425,10 +431,13 @@ template class IntegerBitpacking; template class IntegerBitpacking; template class IntegerBitpacking; -void BooleanBitpacking::setValueFromUncompressed(uint8_t* srcBuffer, offset_t posInSrc, - uint8_t* dstBuffer, offset_t posInDst, const CompressionMetadata& /*metadata*/) const { - auto val = ((bool*)srcBuffer)[posInSrc]; - common::NullMask::setNull((uint64_t*)dstBuffer, posInDst, val); +void BooleanBitpacking::setValuesFromUncompressed(const uint8_t* srcBuffer, offset_t srcOffset, + uint8_t* dstBuffer, offset_t dstOffset, offset_t numValues, + const CompressionMetadata& /*metadata*/) const { + for (auto i = 0; i < numValues; i++) { + common::NullMask::setNull( + (uint64_t*)dstBuffer, dstOffset + i, ((bool*)srcBuffer)[srcOffset + i]); + } } void BooleanBitpacking::getValue(const uint8_t* buffer, offset_t posInBuffer, uint8_t* dst, @@ -585,46 +594,47 @@ void ReadCompressedValuesFromPage::operator()(uint8_t* frame, PageElementCursor& } } -void WriteCompressedValueToPage::operator()(uint8_t* frame, uint16_t posInFrame, - common::ValueVector* vector, uint32_t posInVector, const CompressionMetadata& metadata) { +void WriteCompressedValuesToPage::operator()(uint8_t* frame, uint16_t posInFrame, + const uint8_t* data, offset_t dataOffset, offset_t numValues, + const CompressionMetadata& metadata) { switch (metadata.compression) { case CompressionType::UNCOMPRESSED: - return uncompressed.setValueFromUncompressed( - vector->getData(), posInVector, frame, posInFrame, metadata); + return uncompressed.setValuesFromUncompressed( + data, dataOffset, frame, posInFrame, numValues, metadata); case CompressionType::INTEGER_BITPACKING: { switch (physicalType) { case PhysicalTypeID::INT64: { - return IntegerBitpacking().setValueFromUncompressed( - vector->getData(), posInVector, frame, posInFrame, metadata); + return IntegerBitpacking().setValuesFromUncompressed( + data, dataOffset, frame, posInFrame, numValues, metadata); } case PhysicalTypeID::INT32: { - return IntegerBitpacking().setValueFromUncompressed( - vector->getData(), posInVector, frame, posInFrame, metadata); + return IntegerBitpacking().setValuesFromUncompressed( + data, dataOffset, frame, posInFrame, numValues, metadata); } case PhysicalTypeID::INT16: { - return IntegerBitpacking().setValueFromUncompressed( - vector->getData(), posInVector, frame, posInFrame, metadata); + return IntegerBitpacking().setValuesFromUncompressed( + data, dataOffset, frame, posInFrame, numValues, metadata); } case PhysicalTypeID::INT8: { - return IntegerBitpacking().setValueFromUncompressed( - vector->getData(), posInVector, frame, posInFrame, metadata); + return IntegerBitpacking().setValuesFromUncompressed( + data, dataOffset, frame, posInFrame, numValues, metadata); } case PhysicalTypeID::VAR_LIST: case PhysicalTypeID::UINT64: { - return IntegerBitpacking().setValueFromUncompressed( - vector->getData(), posInVector, frame, posInFrame, metadata); + return IntegerBitpacking().setValuesFromUncompressed( + data, dataOffset, frame, posInFrame, numValues, metadata); } case PhysicalTypeID::UINT32: { - return IntegerBitpacking().setValueFromUncompressed( - vector->getData(), posInVector, frame, posInFrame, metadata); + return IntegerBitpacking().setValuesFromUncompressed( + data, dataOffset, frame, posInFrame, numValues, metadata); } case PhysicalTypeID::UINT16: { - return IntegerBitpacking().setValueFromUncompressed( - vector->getData(), posInVector, frame, posInFrame, metadata); + return IntegerBitpacking().setValuesFromUncompressed( + data, dataOffset, frame, posInFrame, numValues, metadata); } case PhysicalTypeID::UINT8: { - return IntegerBitpacking().setValueFromUncompressed( - vector->getData(), posInVector, frame, posInFrame, metadata); + return IntegerBitpacking().setValuesFromUncompressed( + data, dataOffset, frame, posInFrame, numValues, metadata); } default: { throw NotImplementedException("INTEGER_BITPACKING is not implemented for type " + @@ -633,10 +643,15 @@ void WriteCompressedValueToPage::operator()(uint8_t* frame, uint16_t posInFrame, } } case CompressionType::BOOLEAN_BITPACKING: - return booleanBitpacking.setValueFromUncompressed( - vector->getData(), posInVector, frame, posInFrame, metadata); + return booleanBitpacking.setValuesFromUncompressed( + data, dataOffset, frame, posInFrame, numValues, metadata); } } +void WriteCompressedValueToPageFromVector::operator()(uint8_t* frame, uint16_t posInFrame, + common::ValueVector* vector, uint32_t posInVector, const CompressionMetadata& metadata) { + writeFunc(frame, posInFrame, vector->getData(), posInVector, 1, metadata); +} + } // namespace storage } // namespace kuzu diff --git a/src/storage/store/node_column.cpp b/src/storage/store/node_column.cpp index 069a40db808..3613b8ee2f6 100644 --- a/src/storage/store/node_column.cpp +++ b/src/storage/store/node_column.cpp @@ -30,7 +30,7 @@ struct InternalIDNodeColumnFunc { } } - static void writeValueToPage(uint8_t* frame, uint16_t posInFrame, ValueVector* vector, + static void writeValuesToPage(uint8_t* frame, uint16_t posInFrame, ValueVector* vector, uint32_t posInVector, const CompressionMetadata& /*metadata*/) { auto relID = vector->getValue(posInVector); memcpy(frame + posInFrame * sizeof(offset_t), &relID.offset, sizeof(offset_t)); @@ -48,12 +48,19 @@ struct NullNodeColumnFunc { (uint64_t*)frame, pageCursor.elemPosInPage, posInVector, numValuesToRead); } - static void writeValueToPage(uint8_t* frame, uint16_t posInFrame, ValueVector* vector, + static void writeValueToPageFromVector(uint8_t* frame, uint16_t posInFrame, ValueVector* vector, uint32_t posInVector, const CompressionMetadata& /*metadata*/) { // Casting to uint64_t should be safe as long as the page size is a multiple of 8 bytes. // Otherwise, it could read off the end of the page. - NullMask::setNull( - (uint64_t*)frame, posInFrame, NullMask::isNull(vector->getNullMaskData(), posInVector)); + NullMask::setNull((uint64_t*)frame, posInFrame, vector->isNull(posInVector)); + } + + static void writeValuesToPage(uint8_t* frame, uint16_t posInFrame, const uint8_t* data, + offset_t dataOffset, offset_t numValues, const CompressionMetadata& metadata) { + // Casting to uint64_t should be safe as long as the page size is a multiple of 8 bytes. + // Otherwise, it could read off the end of the page. + NullMask::copyNullMask( + (const uint64_t*)data, dataOffset, (uint64_t*)frame, posInFrame, numValues); } }; @@ -72,13 +79,21 @@ struct BoolNodeColumnFunc { } } - static void writeValueToPage(uint8_t* frame, uint16_t posInFrame, ValueVector* vector, + static void writeValueToPageFromVector(uint8_t* frame, uint16_t posInFrame, ValueVector* vector, uint32_t posInVector, const CompressionMetadata& /*metadata*/) { // Casting to uint64_t should be safe as long as the page size is a multiple of 8 bytes. // Otherwise, it could read/write off the end of the page. - NullMask::copyNullMask(vector->getValue(posInVector) ? &NullMask::ALL_NULL_ENTRY : - &NullMask::NO_NULL_ENTRY, - posInVector, (uint64_t*)frame, posInFrame, 1); + NullMask::setNull((uint64_t*)frame, posInFrame, vector->getValue(posInVector)); + } + + static void writeValuesToPage(uint8_t* frame, uint16_t posInFrame, const uint8_t* data, + offset_t dataOffset, offset_t numValues, const CompressionMetadata& metadata) { + // Casting to uint64_t should be safe as long as the page size is a multiple of 8 bytes. + // Otherwise, it could read/write off the end of the page. + for (offset_t i = 0; i < numValues; i++) { + NullMask::setNull( + (uint64_t*)frame, posInFrame, NullMask::isNull((uint64_t*)data, dataOffset)); + } } static void copyValuesFromPage(uint8_t* frame, PageElementCursor& pageCursor, uint8_t* result, @@ -109,7 +124,7 @@ static read_values_to_vector_func_t getReadValuesToVectorFunc(const LogicalType& } } -static read_values_to_page_func_t getWriteValuesToPageFunc(const LogicalType& logicalType) { +static read_values_to_page_func_t getReadValuesToPageFunc(const LogicalType& logicalType) { switch (logicalType.getLogicalTypeID()) { case LogicalTypeID::BOOL: return BoolNodeColumnFunc::copyValuesFromPage; @@ -118,15 +133,27 @@ static read_values_to_page_func_t getWriteValuesToPageFunc(const LogicalType& lo } } -static write_values_from_vector_func_t getWriteValuesFromVectorFunc( - const LogicalType& logicalType) { +static write_values_from_vector_func_t getWriteValueFromVectorFunc(const LogicalType& logicalType) { switch (logicalType.getLogicalTypeID()) { case LogicalTypeID::INTERNAL_ID: - return InternalIDNodeColumnFunc::writeValueToPage; + throw std::runtime_error("Not implemented"); + // return InternalIDNodeColumnFunc::writeValuesToPage; case LogicalTypeID::BOOL: - return BoolNodeColumnFunc::writeValueToPage; + return BoolNodeColumnFunc::writeValueToPageFromVector; default: - return WriteCompressedValueToPage(logicalType); + return WriteCompressedValueToPageFromVector(logicalType); + } +} + +static write_values_func_t getWriteValuesFunc(const LogicalType& logicalType) { + switch (logicalType.getLogicalTypeID()) { + case LogicalTypeID::INTERNAL_ID: + throw std::runtime_error("Not implemented"); + // return InternalIDNodeColumnFunc::writeValuesToPage; + case LogicalTypeID::BOOL: + return BoolNodeColumnFunc::writeValuesToPage; + default: + return WriteCompressedValuesToPage(logicalType); } } @@ -155,7 +182,8 @@ class NullNodeColumn : public NodeColumn { metadataFH, bufferManager, wal, transaction, propertyStatistics, enableCompression, false /*requireNullColumn*/} { readToVectorFunc = NullNodeColumnFunc::readValuesFromPageToVector; - writeFromVectorFunc = NullNodeColumnFunc::writeValueToPage; + writeFromVectorFunc = NullNodeColumnFunc::writeValueToPageFromVector; + writeFunc = NullNodeColumnFunc::writeValuesToPage; } void scan( @@ -225,7 +253,9 @@ class NullNodeColumn : public NodeColumn { void write(offset_t nodeOffset, ValueVector* vectorToWriteFrom, uint32_t posInVectorToWriteFrom) final { - writeValue(nodeOffset, vectorToWriteFrom, posInVectorToWriteFrom); + auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(nodeOffset); + auto chunkMeta = metadataDA->get(nodeGroupIdx, TransactionType::WRITE); + writeValue(chunkMeta, nodeOffset, vectorToWriteFrom, posInVectorToWriteFrom); if (vectorToWriteFrom->isNull(posInVectorToWriteFrom)) { propertyStatistics.setHasNull(DUMMY_WRITE_TRANSACTION); } @@ -267,7 +297,7 @@ class SerialNodeColumn : public NodeColumn { } }; -NodeColumn::NodeColumn(LogicalType dataType, const MetadataDAHInfo& metaDAHeaderInfo, +BaseNodeColumn::BaseNodeColumn(LogicalType dataType, const MetadataDAHInfo& metaDAHeaderInfo, BMFileHandle* dataFH, BMFileHandle* metadataFH, BufferManager* bufferManager, WAL* wal, transaction::Transaction* transaction, RWPropertyStats propertyStatistics, bool enableCompression, bool requireNullColumn) @@ -279,9 +309,10 @@ NodeColumn::NodeColumn(LogicalType dataType, const MetadataDAHInfo& metaDAHeader transaction); numBytesPerFixedSizedValue = getDataTypeSizeInChunk(this->dataType); readToVectorFunc = getReadValuesToVectorFunc(this->dataType); - readToPageFunc = getWriteValuesToPageFunc(this->dataType); + readToPageFunc = getReadValuesToPageFunc(this->dataType); batchLookupFunc = getBatchLookupFromPageFunc(this->dataType); - writeFromVectorFunc = getWriteValuesFromVectorFunc(this->dataType); + writeFromVectorFunc = getWriteValueFromVectorFunc(this->dataType); + writeFunc = getWriteValuesFunc(this->dataType); assert(numBytesPerFixedSizedValue <= BufferPoolConstants::PAGE_4KB_SIZE); if (requireNullColumn) { nullColumn = std::make_unique(metaDAHeaderInfo.nullDAHPageIdx, dataFH, @@ -308,23 +339,7 @@ void NodeColumn::scan( scanInternal(transaction, nodeIDVector, resultVector); } -void NodeColumn::scan(transaction::Transaction* transaction, node_group_idx_t nodeGroupIdx, - offset_t startOffsetInGroup, offset_t endOffsetInGroup, ValueVector* resultVector, - uint64_t offsetInVector) { - if (nullColumn) { - nullColumn->scan(transaction, nodeGroupIdx, startOffsetInGroup, endOffsetInGroup, - resultVector, offsetInVector); - } - auto chunkMeta = metadataDA->get(nodeGroupIdx, transaction->getType()); - auto pageCursor = PageUtils::getPageElementCursorForPos(startOffsetInGroup, - chunkMeta.compMeta.numValues(BufferPoolConstants::PAGE_4KB_SIZE, dataType)); - pageCursor.pageIdx += chunkMeta.pageIdx; - auto numValuesToScan = endOffsetInGroup - startOffsetInGroup; - scanUnfiltered( - transaction, pageCursor, numValuesToScan, resultVector, chunkMeta.compMeta, offsetInVector); -} - -void NodeColumn::scan(node_group_idx_t nodeGroupIdx, ColumnChunk* columnChunk) { +void BaseNodeColumn::scan(node_group_idx_t nodeGroupIdx, ColumnChunk* columnChunk) { if (nullColumn) { nullColumn->scan(nodeGroupIdx, columnChunk->getNullChunk()); } @@ -365,7 +380,7 @@ void NodeColumn::scanInternal( } } -void NodeColumn::scanUnfiltered(Transaction* transaction, PageElementCursor& pageCursor, +void BaseNodeColumn::scanUnfiltered(Transaction* transaction, PageElementCursor& pageCursor, uint64_t numValuesToScan, ValueVector* resultVector, const CompressionMetadata& compMeta, uint64_t startPosInVector) { uint64_t numValuesScanned = 0; @@ -383,7 +398,7 @@ void NodeColumn::scanUnfiltered(Transaction* transaction, PageElementCursor& pag } } -void NodeColumn::scanFiltered(Transaction* transaction, PageElementCursor& pageCursor, +void BaseNodeColumn::scanFiltered(Transaction* transaction, PageElementCursor& pageCursor, ValueVector* nodeIDVector, ValueVector* resultVector, const CompressionMetadata& compMeta) { auto numValuesToScan = nodeIDVector->state->getOriginalSize(); auto numValuesScanned = 0u; @@ -440,7 +455,7 @@ void NodeColumn::lookupValue(transaction::Transaction* transaction, offset_t nod }); } -void NodeColumn::readFromPage( +void BaseNodeColumn::readFromPage( Transaction* transaction, page_idx_t pageIdx, const std::function& func) { auto [fileHandleToPin, pageIdxToPin] = StorageStructureUtils::getFileHandleAndPhysicalPageIdxToPin( @@ -448,7 +463,7 @@ void NodeColumn::readFromPage( bufferManager->optimisticRead(*fileHandleToPin, pageIdxToPin, func); } -void NodeColumn::append(ColumnChunk* columnChunk, uint64_t nodeGroupIdx) { +void BaseNodeColumn::append(ColumnChunk* columnChunk, uint64_t nodeGroupIdx) { // Main column chunk. auto preScanMetadata = columnChunk->getMetadataToFlush(); auto startPageIdx = dataFH->addNewPages(preScanMetadata.numPages); @@ -456,7 +471,9 @@ void NodeColumn::append(ColumnChunk* columnChunk, uint64_t nodeGroupIdx) { metadataDA->resize(nodeGroupIdx + 1); metadataDA->update(nodeGroupIdx, metadata); // Null column chunk. - nullColumn->append(columnChunk->getNullChunk(), nodeGroupIdx); + if (nullColumn) { + nullColumn->append(columnChunk->getNullChunk(), nodeGroupIdx); + } } void NodeColumn::write( @@ -466,14 +483,19 @@ void NodeColumn::write( if (isNull) { return; } - writeValue(nodeOffset, vectorToWriteFrom, posInVectorToWriteFrom); + auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(nodeOffset); + auto chunkMeta = metadataDA->get(nodeGroupIdx, TransactionType::WRITE); + writeValue(chunkMeta, nodeOffset, vectorToWriteFrom, posInVectorToWriteFrom); + auto offsetInGroup = nodeOffset - StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx); + if (nodeOffset >= chunkMeta.numValues) { + chunkMeta.numValues = nodeOffset + 1; + metadataDA->update(nodeGroupIdx, chunkMeta); + } } -void NodeColumn::writeValue( - offset_t nodeOffset, ValueVector* vectorToWriteFrom, uint32_t posInVectorToWriteFrom) { +void NodeColumn::writeValue(const ColumnChunkMetadata& chunkMeta, offset_t nodeOffset, + ValueVector* vectorToWriteFrom, uint32_t posInVectorToWriteFrom) { auto walPageInfo = createWALVersionOfPageForValue(nodeOffset); - auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(nodeOffset); - auto chunkMeta = metadataDA->get(nodeGroupIdx, TransactionType::WRITE); try { writeFromVectorFunc(walPageInfo.frame, walPageInfo.posInPage, vectorToWriteFrom, posInVectorToWriteFrom, chunkMeta.compMeta); @@ -486,6 +508,20 @@ void NodeColumn::writeValue( dataFH->releaseWALPageIdxLock(walPageInfo.originalPageIdx); } +void NodeColumn::writeValue( + const ColumnChunkMetadata& chunkMeta, offset_t nodeOffset, const uint8_t* data) { + auto walPageInfo = createWALVersionOfPageForValue(nodeOffset); + try { + writeFunc(walPageInfo.frame, walPageInfo.posInPage, data, 0, 1, chunkMeta.compMeta); + } catch (Exception& e) { + bufferManager->unpin(*wal->fileHandle, walPageInfo.pageIdxInWAL); + dataFH->releaseWALPageIdxLock(walPageInfo.originalPageIdx); + throw; + } + bufferManager->unpin(*wal->fileHandle, walPageInfo.pageIdxInWAL); + dataFH->releaseWALPageIdxLock(walPageInfo.originalPageIdx); +} + WALPageIdxPosInPageAndFrame NodeColumn::createWALVersionOfPageForValue(offset_t nodeOffset) { auto originalPageCursor = getPageCursorForOffset(TransactionType::WRITE, nodeOffset); bool insertingNewPage = false; @@ -500,27 +536,27 @@ WALPageIdxPosInPageAndFrame NodeColumn::createWALVersionOfPageForValue(offset_t return {walPageIdxAndFrame, originalPageCursor.elemPosInPage}; } -void NodeColumn::setNull(offset_t nodeOffset) { +void BaseNodeColumn::setNull(offset_t nodeOffset) { if (nullColumn) { nullColumn->setNull(nodeOffset); } } -void NodeColumn::checkpointInMemory() { +void BaseNodeColumn::checkpointInMemory() { metadataDA->checkpointInMemoryIfNecessary(); if (nullColumn) { nullColumn->checkpointInMemory(); } } -void NodeColumn::rollbackInMemory() { +void BaseNodeColumn::rollbackInMemory() { metadataDA->rollbackInMemoryIfNecessary(); if (nullColumn) { nullColumn->rollbackInMemory(); } } -void NodeColumn::populateWithDefaultVal(const Property& property, NodeColumn* nodeColumn, +void BaseNodeColumn::populateWithDefaultVal(const Property& property, NodeColumn* nodeColumn, ValueVector* defaultValueVector, uint64_t numNodeGroups) const { auto columnChunk = ColumnChunkFactory::createColumnChunk(*property.getDataType(), enableCompression); @@ -533,6 +569,11 @@ void NodeColumn::populateWithDefaultVal(const Property& property, NodeColumn* no PageElementCursor NodeColumn::getPageCursorForOffset( TransactionType transactionType, offset_t nodeOffset) { auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(nodeOffset); + return getPageCursorForOffsetAndGroup(transactionType, nodeOffset, nodeGroupIdx); +} + +PageElementCursor NodeColumn::getPageCursorForOffsetAndGroup( + TransactionType transactionType, offset_t nodeOffset, node_group_idx_t nodeGroupIdx) { auto offsetInNodeGroup = nodeOffset - StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx); auto chunkMeta = metadataDA->get(nodeGroupIdx, transactionType); auto pageCursor = PageUtils::getPageElementCursorForPos(offsetInNodeGroup, @@ -591,5 +632,95 @@ std::unique_ptr NodeColumnFactory::createNodeColumn(const LogicalTyp } } +PageElementCursor BaseNodeColumn::getPageCursorForOffsetInGroup( + transaction::TransactionType transactionType, common::offset_t nodeOffset, + common::node_group_idx_t nodeGroupIdx) { + auto chunkMeta = metadataDA->get(nodeGroupIdx, transactionType); + auto pageCursor = PageUtils::getPageElementCursorForPos( + nodeOffset, chunkMeta.compMeta.numValues(BufferPoolConstants::PAGE_4KB_SIZE, dataType)); + pageCursor.pageIdx += chunkMeta.pageIdx; + return pageCursor; +} + +void NodeColumn::scan(transaction::Transaction* transaction, node_group_idx_t nodeGroupIdx, + offset_t startOffsetInGroup, offset_t endOffsetInGroup, ValueVector* resultVector, + uint64_t offsetInVector) { + if (nullColumn) { + nullColumn->scan(transaction, nodeGroupIdx, startOffsetInGroup, endOffsetInGroup, + resultVector, offsetInVector); + } + auto chunkMeta = metadataDA->get(nodeGroupIdx, transaction->getType()); + auto pageCursor = + getPageCursorForOffsetInGroup(transaction->getType(), startOffsetInGroup, nodeGroupIdx); + auto numValuesToScan = endOffsetInGroup - startOffsetInGroup; + scanUnfiltered( + transaction, pageCursor, numValuesToScan, resultVector, chunkMeta.compMeta, offsetInVector); +} + +void BaseNodeColumn::scan(Transaction* transaction, node_group_idx_t nodeGroupIdx, + offset_t startOffsetInGroup, offset_t endOffsetInGroup, uint8_t* result) { + // Use existing nodeGroupIdx + auto cursor = + getPageCursorForOffsetInGroup(transaction->getType(), startOffsetInGroup, nodeGroupIdx); + auto chunkMeta = metadataDA->get(nodeGroupIdx, transaction->getType()); + auto numValuesToScan = endOffsetInGroup - startOffsetInGroup; + uint64_t numValuesScanned = 0; + auto numValuesPerPage = + chunkMeta.compMeta.numValues(BufferPoolConstants::PAGE_4KB_SIZE, dataType); + while (numValuesScanned < numValuesToScan) { + uint64_t numValuesToScanInPage = std::min( + (uint64_t)numValuesPerPage - cursor.elemPosInPage, numValuesToScan - numValuesScanned); + readFromPage(transaction, cursor.pageIdx, [&](uint8_t* frame) -> void { + readToPageFunc( + frame, cursor, result, numValuesScanned, numValuesToScanInPage, chunkMeta.compMeta); + }); + numValuesScanned += numValuesToScanInPage; + cursor.nextPage(); + } +} + +offset_t AuxiliaryNodeColumn::appendValues( + node_group_idx_t nodeGroupIdx, const uint8_t* data, offset_t numValues) { + auto metadata = metadataDA->get(nodeGroupIdx, TransactionType::WRITE); + auto startOffset = metadata.numValues; + + auto cursor = getPageCursorForOffsetInGroup(TransactionType::WRITE, startOffset, nodeGroupIdx); + offset_t valuesWritten = 0; + while (valuesWritten < numValues) { + bool insertingNewPage = false; + if (cursor.pageIdx >= dataFH->getNumPages()) { + assert(cursor.pageIdx == dataFH->getNumPages()); + StorageStructureUtils::insertNewPage(*dataFH, storageStructureID, *bufferManager, *wal); + insertingNewPage = true; + } + + auto numValuesPerPage = + metadata.compMeta.numValues(BufferPoolConstants::PAGE_4KB_SIZE, dataType); + uint64_t numValuesToWriteInPage = + std::min((uint64_t)numValuesPerPage - cursor.elemPosInPage, numValues - valuesWritten); + auto walPageInfo = StorageStructureUtils::createWALVersionIfNecessaryAndPinPage( + cursor.pageIdx, insertingNewPage, *dataFH, storageStructureID, *bufferManager, *wal); + + try { + writeFunc(walPageInfo.frame, cursor.elemPosInPage, data, 0, numValuesToWriteInPage, + metadata.compMeta); + } catch (Exception& e) { + bufferManager->unpin(*wal->fileHandle, walPageInfo.pageIdxInWAL); + dataFH->releaseWALPageIdxLock(walPageInfo.originalPageIdx); + throw; + } + bufferManager->unpin(*wal->fileHandle, walPageInfo.pageIdxInWAL); + dataFH->releaseWALPageIdxLock(walPageInfo.originalPageIdx); + valuesWritten += numValuesToWriteInPage; + if (valuesWritten < numValues) { + cursor.nextPage(); + metadata.numPages++; + } + } + metadata.numValues += numValues; + metadataDA->update(nodeGroupIdx, metadata); + return startOffset; +} + } // namespace storage } // namespace kuzu diff --git a/src/storage/store/string_column_chunk.cpp b/src/storage/store/string_column_chunk.cpp index dff98ec5d90..67f451e7de1 100644 --- a/src/storage/store/string_column_chunk.cpp +++ b/src/storage/store/string_column_chunk.cpp @@ -1,8 +1,7 @@ #include "storage/store/string_column_chunk.h" -#include "common/exception/copy.h" -#include "common/exception/message.h" #include "common/exception/not_implemented.h" +#include "common/types/value/value.h" #include "storage/store/table_copy_utils.h" using namespace kuzu::common; @@ -10,30 +9,43 @@ using namespace kuzu::common; namespace kuzu { namespace storage { -StringColumnChunk::StringColumnChunk(LogicalType dataType) : ColumnChunk{std::move(dataType)} { - overflowFile = std::make_unique(); - overflowCursor.pageIdx = 0; - overflowCursor.offsetInPage = 0; +StringColumnChunk::StringColumnChunk(LogicalType dataType) + : ColumnChunk{std::move(dataType), false /*enableCompression*/} { + // Bitpacking might save 1 bit per value with regular ascii compared to UTF-8 + // Detecting when we need to re-compress the child chunks is not currently supported. + bool enableCompression = false; + stringDataChunk = std::make_unique( + LogicalType(LogicalTypeID::UINT8), enableCompression, false /*hasNullChunk*/); + // The offset chunk is able to grow beyond the node group size. + // We rely on appending to the dictionary when updating, however if the chunk is full, + // there will be no space for in-place updates. + // The data chunk doubles in size on use, but out of place updates will never need the offset + // chunk to be greater than the node group size since they remove unused entries. + // So the chunk is initialized with a size equal to 3/4 the node group size, making sure there + // is always extra space for updates. + offsetChunk = std::make_unique(LogicalType(LogicalTypeID::UINT64), + enableCompression, false /*hasNullChunk*/, StorageConstants::NODE_GROUP_SIZE * 0.75); } void StringColumnChunk::resetToEmpty() { ColumnChunk::resetToEmpty(); - overflowFile = std::make_unique(); - overflowCursor.resetValue(); + stringDataChunk->resetToEmpty(); + offsetChunk->resetToEmpty(); } void StringColumnChunk::append(ValueVector* vector, offset_t startPosInChunk) { assert(vector->dataType.getPhysicalType() == PhysicalTypeID::STRING); - ColumnChunk::copyVectorToBuffer(vector, startPosInChunk); - auto stringsToSetOverflow = (ku_string_t*)(buffer.get() + startPosInChunk * numBytesPerValue); for (auto i = 0u; i < vector->state->selVector->selectedSize; i++) { - auto& stringToSet = stringsToSetOverflow[i]; - if (!ku_string_t::isShortString(stringToSet.len)) { - overflowFile->copyStringOverflow( - overflowCursor, reinterpret_cast(stringToSet.overflowPtr), &stringToSet); + // index is stored in main chunk, data is stored in the data chunk + auto pos = vector->state->selVector->selectedPositions[i]; + nullChunk->setNull(startPosInChunk + i, vector->isNull(pos)); + if (vector->isNull(pos)) { + numValues++; + continue; } + auto kuString = vector->getValue(pos); + setValueFromString(kuString.getAsString().c_str(), kuString.len, startPosInChunk + i); } - numValues += vector->state->selVector->selectedSize; } void StringColumnChunk::append(ColumnChunk* other, offset_t startPosInOtherChunk, @@ -48,10 +60,9 @@ void StringColumnChunk::append(ColumnChunk* other, offset_t startPosInOtherChunk otherChunk, startPosInOtherChunk, startPosInChunk, numValuesToAppend); } break; default: { - throw NotImplementedException("VarSizedColumnChunk::append"); + throw NotImplementedException("StringColumnChunk::append"); } } - numValues += numValuesToAppend; } void StringColumnChunk::update(ValueVector* vector, vector_idx_t vectorIdx) { @@ -66,40 +77,30 @@ void StringColumnChunk::update(ValueVector* vector, vector_idx_t vectorIdx) { } } } -page_idx_t StringColumnChunk::flushOverflowBuffer(BMFileHandle* dataFH, page_idx_t startPageIdx) { - for (auto i = 0u; i < overflowFile->getNumPages(); i++) { - FileUtils::writeToFile(dataFH->getFileInfo(), overflowFile->getPage(i)->data, - BufferPoolConstants::PAGE_4KB_SIZE, startPageIdx * BufferPoolConstants::PAGE_4KB_SIZE); - startPageIdx++; - } - return overflowFile->getNumPages(); -} void StringColumnChunk::appendStringColumnChunk(StringColumnChunk* other, offset_t startPosInOtherChunk, offset_t startPosInChunk, uint32_t numValuesToAppend) { - auto otherKuVals = reinterpret_cast(other->buffer.get()); - auto kuVals = reinterpret_cast(buffer.get()); + auto otherOffsets = reinterpret_cast(other->offsetChunk->getData()); + auto otherIndices = reinterpret_cast(other->getData()); + auto indices = reinterpret_cast(buffer.get()); for (auto i = 0u; i < numValuesToAppend; i++) { auto posInChunk = i + startPosInChunk; auto posInOtherChunk = i + startPosInOtherChunk; - kuVals[posInChunk] = otherKuVals[posInOtherChunk]; - if (other->nullChunk->isNull(posInOtherChunk) || - otherKuVals[posInOtherChunk].len <= ku_string_t::SHORT_STR_LENGTH) { + if (nullChunk->isNull(posInChunk)) { + indices[posInChunk] = 0; continue; } - PageByteCursor cursorToCopyFrom; - TypeUtils::decodeOverflowPtr(otherKuVals[posInOtherChunk].overflowPtr, - cursorToCopyFrom.pageIdx, cursorToCopyFrom.offsetInPage); - overflowFile->copyStringOverflow(overflowCursor, - other->overflowFile->getPage(cursorToCopyFrom.pageIdx)->data + - cursorToCopyFrom.offsetInPage, - &kuVals[posInChunk]); + auto stringInOtherChunk = other->getValue(posInOtherChunk); + setValueFromString(stringInOtherChunk.data(), stringInOtherChunk.size(), posInChunk); } } void StringColumnChunk::write(const Value& val, uint64_t posToWrite) { assert(val.getDataType()->getPhysicalType() == PhysicalTypeID::STRING); nullChunk->setNull(posToWrite, val.isNull()); + if (posToWrite >= numValues) { + numValues = posToWrite + 1; + } if (val.isNull()) { return; } @@ -108,16 +109,36 @@ void StringColumnChunk::write(const Value& val, uint64_t posToWrite) { } void StringColumnChunk::setValueFromString(const char* value, uint64_t length, uint64_t pos) { - TableCopyUtils::validateStrLen(length); - auto val = overflowFile->copyString(value, length, overflowCursor); - setValue(val, pos); + auto space = stringDataChunk->getCapacity() - stringDataChunk->getNumValues(); + if (length > space) { + stringDataChunk->resize(std::bit_ceil(stringDataChunk->getCapacity() + length)); + } + if (pos >= numValues) { + numValues = pos + 1; + } + auto startOffset = stringDataChunk->getNumValues(); + memcpy(stringDataChunk->getData() + startOffset, value, length); + stringDataChunk->setNumValues(startOffset + length); + auto index = offsetChunk->getNumValues(); + + if (index >= offsetChunk->getCapacity()) { + offsetChunk->resize(offsetChunk->getCapacity() * 2); + } + offsetChunk->setValue(startOffset, index); + offsetChunk->setNumValues(index + 1); + ColumnChunk::setValue(index, pos); } // STRING template<> +std::string_view StringColumnChunk::getValue(offset_t pos) const { + auto index = ColumnChunk::getValue(pos); + auto offset = offsetChunk->getValue(index); + return std::string_view((const char*)stringDataChunk->getData() + offset, getStringLength(pos)); +} +template<> std::string StringColumnChunk::getValue(offset_t pos) const { - auto kuStr = ((ku_string_t*)buffer.get())[pos]; - return overflowFile->readString(&kuStr); + return std::string(getValue(pos)); } } // namespace storage diff --git a/src/storage/store/string_node_column.cpp b/src/storage/store/string_node_column.cpp index ff99e81e766..88b92837cc6 100644 --- a/src/storage/store/string_node_column.cpp +++ b/src/storage/store/string_node_column.cpp @@ -9,27 +9,44 @@ using namespace kuzu::transaction; namespace kuzu { namespace storage { -void StringNodeColumnFunc::writeStringValuesToPage(uint8_t* frame, uint16_t posInFrame, - ValueVector* vector, uint32_t posInVector, const CompressionMetadata& /*metadata*/) { - auto kuStrInFrame = (ku_string_t*)(frame + (posInFrame * sizeof(ku_string_t))); - auto kuStrInVector = vector->getValue(posInVector); - memcpy(kuStrInFrame->prefix, kuStrInVector.prefix, - std::min((uint64_t)kuStrInVector.len, ku_string_t::SHORT_STR_LENGTH)); - kuStrInFrame->len = kuStrInVector.len; - kuStrInFrame->overflowPtr = kuStrInVector.overflowPtr; -} - StringNodeColumn::StringNodeColumn(LogicalType dataType, const MetadataDAHInfo& metaDAHeaderInfo, BMFileHandle* dataFH, BMFileHandle* metadataFH, BufferManager* bufferManager, WAL* wal, transaction::Transaction* transaction, RWPropertyStats stats) : NodeColumn{std::move(dataType), metaDAHeaderInfo, dataFH, metadataFH, bufferManager, wal, transaction, stats, false /* enableCompression */, true /* requireNullColumn */} { - if (this->dataType.getLogicalTypeID() == LogicalTypeID::STRING) { - writeFromVectorFunc = StringNodeColumnFunc::writeStringValuesToPage; + // TODO(bmwinger): detecting if string child columns must be re-compressed when updating is not + // yet supported + auto enableCompression = false; + dataColumn = std::make_unique(LogicalType(LogicalTypeID::UINT8), + *metaDAHeaderInfo.childrenInfos[0], dataFH, metadataFH, bufferManager, wal, transaction, + stats, enableCompression, false /*requireNullColumn*/); + offsetColumn = std::make_unique(LogicalType(LogicalTypeID::UINT64), + *metaDAHeaderInfo.childrenInfos[1], dataFH, metadataFH, bufferManager, wal, transaction, + stats, enableCompression, false /*requireNullColumn*/); +} + +void StringNodeColumn::scanOffsets(Transaction* transaction, node_group_idx_t nodeGroupIdx, + string_offset_t* offsets, uint64_t index) { + // We either need to read the next value, or store the maximum string offset at the end. + // Otherwise we won't know what the length of the last string is. + auto numValues = offsetColumn->getMetadata(nodeGroupIdx, transaction->getType()).numValues; + if (index < numValues - 1) { + offsetColumn->scan(transaction, nodeGroupIdx, index, index + 2, (uint8_t*)offsets); + } else { + offsetColumn->scan(transaction, nodeGroupIdx, index, index + 1, (uint8_t*)offsets); + offsets[1] = dataColumn->getMetadata(nodeGroupIdx, transaction->getType()).numValues; } - overflowMetadataDA = std::make_unique>(*metadataFH, - StorageStructureID::newMetadataID(), metaDAHeaderInfo.childrenInfos[0]->dataDAHPageIdx, - bufferManager, wal, transaction); +} + +void StringNodeColumn::scanValueToVector(Transaction* transaction, node_group_idx_t nodeGroupIdx, + string_offset_t startOffset, string_offset_t endOffset, ValueVector* resultVector, + uint64_t offsetInVector) { + assert(endOffset >= startOffset); + // TODO: Add string to vector first and read directly instead of using a temporary buffer + std::unique_ptr stringRead = std::make_unique(endOffset - startOffset); + dataColumn->scan(transaction, nodeGroupIdx, startOffset, endOffset, (uint8_t*)stringRead.get()); + StringVector::addString( + resultVector, offsetInVector, stringRead.get(), endOffset - startOffset); } void StringNodeColumn::scan(Transaction* transaction, node_group_idx_t nodeGroupIdx, @@ -37,74 +54,51 @@ void StringNodeColumn::scan(Transaction* transaction, node_group_idx_t nodeGroup uint64_t offsetInVector) { nullColumn->scan(transaction, nodeGroupIdx, startOffsetInGroup, endOffsetInGroup, resultVector, offsetInVector); - NodeColumn::scan(transaction, nodeGroupIdx, startOffsetInGroup, endOffsetInGroup, resultVector, + + scanUnfiltered(transaction, nodeGroupIdx, startOffsetInGroup, endOffsetInGroup, resultVector, offsetInVector); - auto numValuesToRead = endOffsetInGroup - startOffsetInGroup; - auto overflowPageIdx = overflowMetadataDA->get(nodeGroupIdx, transaction->getType()).pageIdx; - for (auto i = 0u; i < numValuesToRead; i++) { - auto pos = offsetInVector + i; - if (resultVector->isNull(pos)) { - continue; - } - readStringValueFromOvf( - transaction, resultVector->getValue(pos), resultVector, overflowPageIdx); - } } void StringNodeColumn::scan(node_group_idx_t nodeGroupIdx, ColumnChunk* columnChunk) { NodeColumn::scan(nodeGroupIdx, columnChunk); auto stringColumnChunk = reinterpret_cast(columnChunk); - auto overflowMetadata = overflowMetadataDA->get(nodeGroupIdx, TransactionType::WRITE); - auto inMemOverflowFile = stringColumnChunk->getOverflowFile(); - inMemOverflowFile->addNewPages(overflowMetadata.numPages); - for (auto i = 0u; i < overflowMetadata.numPages; i++) { - auto pageIdx = overflowMetadata.pageIdx + i; - FileUtils::readFromFile(dataFH->getFileInfo(), inMemOverflowFile->getPage(i)->data, - BufferPoolConstants::PAGE_4KB_SIZE, pageIdx * BufferPoolConstants::PAGE_4KB_SIZE); - } + dataColumn->scan(nodeGroupIdx, stringColumnChunk->getDataChunk()); + offsetColumn->scan(nodeGroupIdx, stringColumnChunk->getOffsetChunk()); } void StringNodeColumn::append(ColumnChunk* columnChunk, node_group_idx_t nodeGroupIdx) { NodeColumn::append(columnChunk, nodeGroupIdx); auto stringColumnChunk = reinterpret_cast(columnChunk); - auto startPageIdx = dataFH->addNewPages(stringColumnChunk->getOverflowFile()->getNumPages()); - auto numPagesForOverflow = stringColumnChunk->flushOverflowBuffer(dataFH, startPageIdx); - overflowMetadataDA->resize(nodeGroupIdx + 1); - overflowMetadataDA->update( - nodeGroupIdx, OverflowColumnChunkMetadata{startPageIdx, numPagesForOverflow, - stringColumnChunk->getLastOffsetInPage()}); + dataColumn->append(stringColumnChunk->getDataChunk(), nodeGroupIdx); + offsetColumn->append(stringColumnChunk->getOffsetChunk(), nodeGroupIdx); } -void StringNodeColumn::writeValue( - offset_t nodeOffset, ValueVector* vectorToWriteFrom, uint32_t posInVectorToWriteFrom) { +void StringNodeColumn::writeValue(const ColumnChunkMetadata& chunkMeta, offset_t nodeOffset, + ValueVector* vectorToWriteFrom, uint32_t posInVectorToWriteFrom) { auto& kuStr = vectorToWriteFrom->getValue(posInVectorToWriteFrom); - if (!ku_string_t::isShortString(kuStr.len)) { - auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(nodeOffset); - auto overflowMetadata = overflowMetadataDA->get(nodeGroupIdx, TransactionType::WRITE); - auto overflowPageIdxInChunk = overflowMetadata.numPages - 1; - auto walPageIdxAndFrame = StorageStructureUtils::createWALVersionIfNecessaryAndPinPage( - overflowMetadata.pageIdx + overflowPageIdxInChunk, false /* insertingNewPage */, - *dataFH, storageStructureID, *bufferManager, *wal); - memcpy(walPageIdxAndFrame.frame + overflowMetadata.lastOffsetInPage, - reinterpret_cast(kuStr.overflowPtr), kuStr.len); - bufferManager->unpin(*wal->fileHandle, walPageIdxAndFrame.pageIdxInWAL); - dataFH->releaseWALPageIdxLock(walPageIdxAndFrame.originalPageIdx); - TypeUtils::encodeOverflowPtr( - kuStr.overflowPtr, overflowPageIdxInChunk, overflowMetadata.lastOffsetInPage); - overflowMetadata.lastOffsetInPage += kuStr.len; - overflowMetadataDA->update(nodeGroupIdx, overflowMetadata); - } - NodeColumn::writeValue(nodeOffset, vectorToWriteFrom, posInVectorToWriteFrom); + // Write string data to end of dataColumn + auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(nodeOffset); + auto startOffset = + dataColumn->appendValues(nodeGroupIdx, (const uint8_t*)kuStr.getData(), kuStr.len); + + // Write offset + string_index_t index = + offsetColumn->appendValues(nodeGroupIdx, (const uint8_t*)&startOffset, 1); + + // Write index to main column + NodeColumn::writeValue(chunkMeta, nodeOffset, (uint8_t*)&index); } void StringNodeColumn::checkpointInMemory() { - NodeColumn::checkpointInMemory(); - overflowMetadataDA->checkpointInMemoryIfNecessary(); + BaseNodeColumn::checkpointInMemory(); + dataColumn->checkpointInMemory(); + offsetColumn->checkpointInMemory(); } void StringNodeColumn::rollbackInMemory() { - NodeColumn::rollbackInMemory(); - overflowMetadataDA->rollbackInMemoryIfNecessary(); + BaseNodeColumn::rollbackInMemory(); + dataColumn->rollbackInMemory(); + offsetColumn->rollbackInMemory(); } void StringNodeColumn::scanInternal( @@ -113,15 +107,54 @@ void StringNodeColumn::scanInternal( auto startNodeOffset = nodeIDVector->readNodeOffset(0); assert(startNodeOffset % DEFAULT_VECTOR_CAPACITY == 0); auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(startNodeOffset); - NodeColumn::scanInternal(transaction, nodeIDVector, resultVector); - auto overflowPageIdx = overflowMetadataDA->get(nodeGroupIdx, transaction->getType()).pageIdx; + auto startOffsetInGroup = + startNodeOffset - StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx); + if (nodeIDVector->state->selVector->isUnfiltered()) { + scanUnfiltered(transaction, nodeGroupIdx, startOffsetInGroup, + startOffsetInGroup + nodeIDVector->state->selVector->selectedSize, resultVector); + } else { + scanFiltered(transaction, nodeGroupIdx, startOffsetInGroup, nodeIDVector, resultVector); + } +} + +void StringNodeColumn::scanUnfiltered(transaction::Transaction* transaction, + node_group_idx_t nodeGroupIdx, offset_t startOffsetInGroup, offset_t endOffsetInGroup, + common::ValueVector* resultVector, uint64_t startPosInVector) { + auto numValuesToRead = endOffsetInGroup - startOffsetInGroup; + // scan offsets into temporary buffer + // TODO: Would it be more efficient to reuse a small stack allocated buffer multiple times + // instead of a large heap-allocated buffer? + auto indices = std::make_unique(numValuesToRead); + BaseNodeColumn::scan( + transaction, nodeGroupIdx, startOffsetInGroup, endOffsetInGroup, (uint8_t*)indices.get()); + for (auto i = 0u; i < numValuesToRead; i++) { + if (resultVector->isNull(startPosInVector + i)) { + continue; + } + // scan string data from the dataColumn into a temporary string + string_offset_t offsets[2]; + scanOffsets(transaction, nodeGroupIdx, offsets, indices[i]); + scanValueToVector( + transaction, nodeGroupIdx, offsets[0], offsets[1], resultVector, startPosInVector + i); + } +} + +void StringNodeColumn::scanFiltered(transaction::Transaction* transaction, + node_group_idx_t nodeGroupIdx, common::offset_t startOffsetInGroup, + common::ValueVector* nodeIDVector, common::ValueVector* resultVector) { for (auto i = 0u; i < nodeIDVector->state->selVector->selectedSize; i++) { auto pos = nodeIDVector->state->selVector->selectedPositions[i]; if (resultVector->isNull(pos)) { + // Ignore positions which were scanned as null continue; } - readStringValueFromOvf( - transaction, resultVector->getValue(pos), resultVector, overflowPageIdx); + auto offsetInGroup = startOffsetInGroup + pos; + string_index_t index; + BaseNodeColumn::scan( + transaction, nodeGroupIdx, offsetInGroup, offsetInGroup + 1, (uint8_t*)&index); + string_offset_t offsets[2]; + scanOffsets(transaction, nodeGroupIdx, offsets, index); + scanValueToVector(transaction, nodeGroupIdx, offsets[0], offsets[1], resultVector, pos); } } @@ -129,36 +162,23 @@ void StringNodeColumn::lookupInternal( Transaction* transaction, ValueVector* nodeIDVector, ValueVector* resultVector) { assert(dataType.getPhysicalType() == PhysicalTypeID::STRING); auto startNodeOffset = nodeIDVector->readNodeOffset(0); - auto overflowPageIdx = - overflowMetadataDA - ->get(StorageUtils::getNodeGroupIdx(startNodeOffset), transaction->getType()) - .pageIdx; - NodeColumn::lookupInternal(transaction, nodeIDVector, resultVector); + auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(startNodeOffset); for (auto i = 0u; i < nodeIDVector->state->selVector->selectedSize; i++) { auto pos = resultVector->state->selVector->selectedPositions[i]; - if (!resultVector->isNull(pos)) { - readStringValueFromOvf(transaction, resultVector->getValue(pos), - resultVector, overflowPageIdx); + if (resultVector->isNull(pos)) { + // Ignore positions which were scanned as null + continue; } + string_offset_t offsets[2]; + auto offsetInGroup = + startNodeOffset - StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx) + pos; + string_index_t index; + BaseNodeColumn::scan( + transaction, nodeGroupIdx, offsetInGroup, offsetInGroup + 1, (uint8_t*)&index); + scanOffsets(transaction, nodeGroupIdx, offsets, index); + scanValueToVector(transaction, nodeGroupIdx, offsets[0], offsets[1], resultVector, pos); } } -void StringNodeColumn::readStringValueFromOvf(Transaction* transaction, ku_string_t& kuStr, - ValueVector* resultVector, page_idx_t overflowPageIdx) { - if (ku_string_t::isShortString(kuStr.len)) { - return; - } - PageByteCursor cursor; - TypeUtils::decodeOverflowPtr(kuStr.overflowPtr, cursor.pageIdx, cursor.offsetInPage); - cursor.pageIdx += overflowPageIdx; - auto [fileHandleToPin, pageIdxToPin] = - StorageStructureUtils::getFileHandleAndPhysicalPageIdxToPin( - *dataFH, cursor.pageIdx, *wal, transaction->getType()); - bufferManager->optimisticRead(*fileHandleToPin, pageIdxToPin, [&](uint8_t* frame) { - StringVector::addString( - resultVector, kuStr, (const char*)(frame + cursor.offsetInPage), kuStr.len); - }); -} - } // namespace storage } // namespace kuzu diff --git a/src/storage/store/struct_column_chunk.cpp b/src/storage/store/struct_column_chunk.cpp index 24a29934b4a..79b9e0e43e9 100644 --- a/src/storage/store/struct_column_chunk.cpp +++ b/src/storage/store/struct_column_chunk.cpp @@ -1,10 +1,8 @@ #include "storage/store/struct_column_chunk.h" -#include "common/exception/not_implemented.h" #include "common/types/value/nested.h" -#include "storage/store/string_column_chunk.h" +#include "common/types/value/value.h" #include "storage/store/table_copy_utils.h" -#include "storage/store/var_list_column_chunk.h" using namespace kuzu::common; diff --git a/src/storage/store/var_list_node_column.cpp b/src/storage/store/var_list_node_column.cpp index ed79ec9badf..436f35cb15d 100644 --- a/src/storage/store/var_list_node_column.cpp +++ b/src/storage/store/var_list_node_column.cpp @@ -146,17 +146,16 @@ void VarListNodeColumn::rollbackInMemory() { offset_t VarListNodeColumn::readOffset( Transaction* transaction, node_group_idx_t nodeGroupIdx, offset_t offsetInNodeGroup) { - auto offsetVector = std::make_unique(LogicalTypeID::INT64); - offsetVector->state = DataChunkState::getSingleValueDataChunkState(); auto chunkMeta = metadataDA->get(nodeGroupIdx, transaction->getType()); auto pageCursor = PageUtils::getPageElementCursorForPos(offsetInNodeGroup, chunkMeta.compMeta.numValues(BufferPoolConstants::PAGE_4KB_SIZE, dataType)); pageCursor.pageIdx += chunkMeta.pageIdx; + offset_t value; readFromPage(transaction, pageCursor.pageIdx, [&](uint8_t* frame) -> void { - readToVectorFunc(frame, pageCursor, offsetVector.get(), 0 /* posInVector */, + readToPageFunc(frame, pageCursor, (uint8_t*)&value, 0 /* posInVector */, 1 /* numValuesToRead */, chunkMeta.compMeta); }); - return offsetVector->getValue(0); + return value; } ListOffsetInfoInStorage VarListNodeColumn::getListOffsetInfoInStorage(Transaction* transaction, diff --git a/test/storage/compression_test.cpp b/test/storage/compression_test.cpp index 0ff4c69cb65..5fdb659a865 100644 --- a/test/storage/compression_test.cpp +++ b/test/storage/compression_test.cpp @@ -24,7 +24,7 @@ void test_compression(CompressionAlg& alg, std::vector src) { EXPECT_EQ(src, decompressed); // works with all bit widths (but not all offsets) T value = 0; - alg.setValueFromUncompressed((uint8_t*)&value, 0, (uint8_t*)dest.data(), 1, metadata); + alg.setValuesFromUncompressed((uint8_t*)&value, 0, (uint8_t*)dest.data(), 1, 1, metadata); alg.decompressFromPage(dest.data(), 0, (uint8_t*)decompressed.data(), 0, src.size(), metadata); src[1] = value; EXPECT_EQ(decompressed, src);