From 4bda12c3dc93bc93c37945fb65dccfd3399148f7 Mon Sep 17 00:00:00 2001 From: Benjamin Winger Date: Thu, 19 Oct 2023 09:54:06 -0400 Subject: [PATCH] Rewrite string storage to use index, offset and data columns This sets up the storage layout for compressing strings, and removes the limit on string size in storage. --- .../processor/operator/persistent/copy_node.h | 2 +- src/include/storage/local_table.h | 4 +- src/include/storage/store/column.h | 116 +++++++--- src/include/storage/store/column_chunk.h | 31 +-- src/include/storage/store/compression.h | 42 ++-- src/include/storage/store/string_column.h | 32 ++- .../storage/store/string_column_chunk.h | 29 ++- .../operator/persistent/copy_node.cpp | 19 +- src/storage/local_table.cpp | 38 +++- .../stats/table_statistics_collection.cpp | 10 +- src/storage/storage_structure/disk_array.cpp | 3 - src/storage/store/column.cpp | 212 ++++++++++++++---- src/storage/store/column_chunk.cpp | 36 +-- src/storage/store/compression.cpp | 96 ++++---- src/storage/store/string_column.cpp | 201 +++++++++-------- src/storage/store/string_column_chunk.cpp | 104 +++++---- src/storage/store/var_list_column.cpp | 7 +- test/storage/compression_test.cpp | 2 +- 18 files changed, 629 insertions(+), 355 deletions(-) diff --git a/src/include/processor/operator/persistent/copy_node.h b/src/include/processor/operator/persistent/copy_node.h index cd000d04d3f..1773af9c885 100644 --- a/src/include/processor/operator/persistent/copy_node.h +++ b/src/include/processor/operator/persistent/copy_node.h @@ -119,7 +119,7 @@ template<> uint64_t CopyNode::appendToPKIndex(storage::PrimaryKeyIndexBuilder* pkIndex, storage::ColumnChunk* chunk, common::offset_t startOffset, common::offset_t numNodes); template<> -uint64_t CopyNode::appendToPKIndex(storage::PrimaryKeyIndexBuilder* pkIndex, +uint64_t CopyNode::appendToPKIndex(storage::PrimaryKeyIndexBuilder* pkIndex, storage::ColumnChunk* chunk, common::offset_t startOffset, common::offset_t numNodes); } // namespace 
processor diff --git a/src/include/storage/local_table.h b/src/include/storage/local_table.h index 8186dbb663f..08ff0f25e4d 100644 --- a/src/include/storage/local_table.h +++ b/src/include/storage/local_table.h @@ -35,13 +35,13 @@ class LocalVector { class StringLocalVector : public LocalVector { public: explicit StringLocalVector(MemoryManager* mm) - : LocalVector{common::LogicalType(common::LogicalTypeID::STRING), mm}, ovfStringLength{ + : LocalVector{common::LogicalType(common::LogicalTypeID::STRING), mm}, totalStringLength{ 0} {}; void update(common::sel_t offsetInLocalVector, common::ValueVector* updateVector, common::sel_t offsetInUpdateVector) final; - uint64_t ovfStringLength; + uint64_t totalStringLength; }; class StructLocalVector : public LocalVector { diff --git a/src/include/storage/store/column.h b/src/include/storage/store/column.h index 7097f3aea20..8c66864f756 100644 --- a/src/include/storage/store/column.h +++ b/src/include/storage/store/column.h @@ -16,6 +16,9 @@ using read_values_to_vector_func_t = std::function; using write_values_from_vector_func_t = std::function; +using write_values_func_t = std::function; using read_values_to_page_func_t = std::functionget(nodeGroupIdx, transaction); } - virtual void write(common::offset_t nodeOffset, common::ValueVector* vectorToWriteFrom, - uint32_t posInVectorToWriteFrom); + virtual void scan(common::node_group_idx_t nodeGroupIdx, ColumnChunk* columnChunk); + virtual void scan(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx, + common::offset_t startOffsetInGroup, common::offset_t endOffsetInGroup, uint8_t* result); protected: - virtual void scanInternal(transaction::Transaction* transaction, - common::ValueVector* nodeIDVector, common::ValueVector* resultVector); void scanUnfiltered(transaction::Transaction* transaction, PageElementCursor& pageCursor, uint64_t numValuesToScan, common::ValueVector* resultVector, const CompressionMetadata& compMeta, uint64_t startPosInVector = 
0); void scanFiltered(transaction::Transaction* transaction, PageElementCursor& pageCursor, common::ValueVector* nodeIDVector, common::ValueVector* resultVector, - const CompressionMetadata& compMeta); - virtual void lookupInternal(transaction::Transaction* transaction, - common::ValueVector* nodeIDVector, common::ValueVector* resultVector); - virtual void lookupValue(transaction::Transaction* transaction, common::offset_t nodeOffset, - common::ValueVector* resultVector, uint32_t posInVector); + const CompressionMetadata& chunkMeta); void readFromPage(transaction::Transaction* transaction, common::page_idx_t pageIdx, const std::function& func); - virtual void writeValue(common::offset_t nodeOffset, common::ValueVector* vectorToWriteFrom, - uint32_t posInVectorToWriteFrom); - - PageElementCursor getPageCursorForOffset( - transaction::TransactionType transactionType, common::offset_t nodeOffset); - WALPageIdxPosInPageAndFrame createWALVersionOfPageForValue(common::offset_t nodeOffset); + // Produces a page cursor for the offset relative to the given node group + PageElementCursor getPageCursorForOffsetInGroup(transaction::TransactionType transactionType, + common::offset_t nodeOffset, common::node_group_idx_t nodeGroupIdx); private: // check if val is in range [start, end) @@ -119,12 +101,66 @@ class Column { std::unique_ptr nullColumn; read_values_to_vector_func_t readToVectorFunc; write_values_from_vector_func_t writeFromVectorFunc; + write_values_func_t writeFunc; read_values_to_page_func_t readToPageFunc; batch_lookup_func_t batchLookupFunc; RWPropertyStats propertyStatistics; bool enableCompression; }; +// Column where we assume the underlying storage always stores NodeGroupSize values +// Data is indexed using a global offset (which is internally used to find the node group via the +// node group size) +class Column : public BaseColumn { +public: + Column(common::LogicalType dataType, const MetadataDAHInfo& metaDAHeaderInfo, + BMFileHandle* dataFH, 
BMFileHandle* metadataFH, BufferManager* bufferManager, WAL* wal, + transaction::Transaction* transaction, RWPropertyStats propertyStatistics, + bool enableCompression, bool requireNullColumn = true) + : BaseColumn{std::move(dataType), metaDAHeaderInfo, dataFH, metadataFH, bufferManager, wal, + transaction, propertyStatistics, enableCompression, requireNullColumn} {} + + // Expose for feature store + virtual void batchLookup(transaction::Transaction* transaction, + const common::offset_t* nodeOffsets, size_t size, uint8_t* result); + + virtual void scan(transaction::Transaction* transaction, common::ValueVector* nodeIDVector, + common::ValueVector* resultVector); + virtual void scan(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx, + common::offset_t startOffsetInGroup, common::offset_t endOffsetInGroup, + common::ValueVector* resultVector, uint64_t offsetInVector); + inline void scan(common::node_group_idx_t nodeGroupIdx, ColumnChunk* columnChunk) override { + BaseColumn::scan(nodeGroupIdx, columnChunk); + } + virtual void lookup(transaction::Transaction* transaction, common::ValueVector* nodeIDVector, + common::ValueVector* resultVector); + + virtual void write(common::offset_t nodeOffset, common::ValueVector* vectorToWriteFrom, + uint32_t posInVectorToWriteFrom); + +protected: + virtual void writeValue(const ColumnChunkMetadata& chunkMeta, common::offset_t nodeOffset, + common::ValueVector* vectorToWriteFrom, uint32_t posInVectorToWriteFrom); + virtual void writeValue( + const ColumnChunkMetadata& chunkMeta, common::offset_t nodeOffset, const uint8_t* data); + + virtual void scanInternal(transaction::Transaction* transaction, + common::ValueVector* nodeIDVector, common::ValueVector* resultVector); + virtual void lookupInternal(transaction::Transaction* transaction, + common::ValueVector* nodeIDVector, common::ValueVector* resultVector); + virtual void lookupValue(transaction::Transaction* transaction, common::offset_t nodeOffset, + 
common::ValueVector* resultVector, uint32_t posInVector); + + WALPageIdxPosInPageAndFrame createWALVersionOfPageForValue(common::offset_t nodeOffset); + + // Produces a page cursor for the absolute node offset + PageElementCursor getPageCursorForOffset( + transaction::TransactionType transactionType, common::offset_t nodeOffset); + // Produces a page cursor for the absolute node offset, given a node group + PageElementCursor getPageCursorForOffsetAndGroup(transaction::TransactionType transactionType, + common::offset_t nodeOffset, common::node_group_idx_t nodeGroupIdx); +}; + class InternalIDColumn : public Column { public: InternalIDColumn(const MetadataDAHInfo& metaDAHeaderInfo, BMFileHandle* dataFH, @@ -161,6 +197,24 @@ class InternalIDColumn : public Column { common::table_id_t commonTableID; }; +// Column for data adjacent to a NodeGroup +// Data is indexed using the node group identifier and the offset within the node group +class AuxiliaryColumn : public BaseColumn { +public: + AuxiliaryColumn(common::LogicalType dataType, const MetadataDAHInfo& metaDAHeaderInfo, + BMFileHandle* dataFH, BMFileHandle* metadataFH, BufferManager* bufferManager, WAL* wal, + transaction::Transaction* transaction, RWPropertyStats propertyStatistics, + bool enableCompression, bool requireNullColumn = true) + : BaseColumn{std::move(dataType), metaDAHeaderInfo, dataFH, metadataFH, bufferManager, wal, + transaction, propertyStatistics, enableCompression, requireNullColumn} {} + + // Append values to the end of the node group, resizing it if necessary + common::offset_t appendValues( + common::node_group_idx_t nodeGroupIdx, const uint8_t* data, common::offset_t numValues); + +protected: +}; + struct ColumnFactory { static std::unique_ptr createColumn(const common::LogicalType& dataType, const MetadataDAHInfo& metaDAHeaderInfo, BMFileHandle* dataFH, BMFileHandle* metadataFH, diff --git a/src/include/storage/store/column_chunk.h b/src/include/storage/store/column_chunk.h index 
0b17db95589..b4d83c326d5 100644 --- a/src/include/storage/store/column_chunk.h +++ b/src/include/storage/store/column_chunk.h @@ -2,6 +2,7 @@ #include +#include "common/constants.h" #include "common/types/types.h" #include "common/vector/value_vector.h" #include "compression.h" @@ -13,36 +14,16 @@ namespace storage { class NullColumnChunk; class CompressionAlg; -struct BaseColumnChunkMetadata { +struct ColumnChunkMetadata { common::page_idx_t pageIdx; common::page_idx_t numPages; - - BaseColumnChunkMetadata() - : BaseColumnChunkMetadata{common::INVALID_PAGE_IDX, 0 /* numPages */} {} - BaseColumnChunkMetadata(common::page_idx_t pageIdx, common::page_idx_t numPages) - : pageIdx(pageIdx), numPages(numPages) {} - virtual ~BaseColumnChunkMetadata() = default; -}; - -struct ColumnChunkMetadata : public BaseColumnChunkMetadata { uint64_t numValues; CompressionMetadata compMeta; - ColumnChunkMetadata() : BaseColumnChunkMetadata(), numValues{UINT64_MAX} {} + ColumnChunkMetadata() : pageIdx{common::INVALID_PAGE_IDX}, numPages{0}, numValues{UINT64_MAX} {} ColumnChunkMetadata(common::page_idx_t pageIdx, common::page_idx_t numPages, - uint64_t numNodesInChunk, CompressionMetadata compMeta) - : BaseColumnChunkMetadata{pageIdx, numPages}, numValues(numNodesInChunk), - compMeta(compMeta) {} -}; - -struct OverflowColumnChunkMetadata : public BaseColumnChunkMetadata { - common::offset_t lastOffsetInPage; - - OverflowColumnChunkMetadata() - : BaseColumnChunkMetadata(), lastOffsetInPage{common::INVALID_OFFSET} {} - OverflowColumnChunkMetadata( - common::page_idx_t pageIdx, common::page_idx_t numPages, common::offset_t lastOffsetInPage) - : BaseColumnChunkMetadata{pageIdx, numPages}, lastOffsetInPage(lastOffsetInPage) {} + uint64_t numNodesInChunk, const CompressionMetadata& compMeta) + : pageIdx(pageIdx), numPages(numPages), numValues(numNodesInChunk), compMeta(compMeta) {} }; // Base data segment covers all fixed-sized data types. 
@@ -118,8 +99,6 @@ class ColumnChunk { private: uint64_t getBufferSize() const; - // Returns the size of the data type in bytes - static uint32_t getDataTypeSizeInChunk(common::LogicalType& dataType); protected: common::LogicalType dataType; diff --git a/src/include/storage/store/compression.h b/src/include/storage/store/compression.h index 7f332ee9ed4..1c70cbcffe3 100644 --- a/src/include/storage/store/compression.h +++ b/src/include/storage/store/compression.h @@ -52,8 +52,8 @@ class CompressionAlg { // Takes a single uncompressed value from the srcBuffer and compresses it into the dstBuffer // Offsets refer to value offsets, not byte offsets - virtual void setValueFromUncompressed(uint8_t* srcBuffer, common::offset_t posInSrc, - uint8_t* dstBuffer, common::offset_t posInDst, + virtual void setValuesFromUncompressed(const uint8_t* srcBuffer, common::offset_t srcOffset, + uint8_t* dstBuffer, common::offset_t dstOffset, common::offset_t numValues, const CompressionMetadata& metadata) const = 0; // Reads a value from the buffer at the given position and stores it at the given memory address @@ -102,11 +102,11 @@ class Uncompressed : public CompressionAlg { Uncompressed(const Uncompressed&) = default; - inline void setValueFromUncompressed(uint8_t* srcBuffer, common::offset_t posInSrc, - uint8_t* dstBuffer, common::offset_t posInDst, + inline void setValuesFromUncompressed(const uint8_t* srcBuffer, common::offset_t srcOffset, + uint8_t* dstBuffer, common::offset_t dstOffset, common::offset_t numValues, const CompressionMetadata& /*metadata*/) const final { - memcpy(dstBuffer + posInDst * numBytesPerValue, srcBuffer + posInSrc * numBytesPerValue, - numBytesPerValue); + memcpy(dstBuffer + dstOffset * numBytesPerValue, srcBuffer + srcOffset * numBytesPerValue, + numBytesPerValue * numValues); } inline void getValue(const uint8_t* buffer, common::offset_t posInBuffer, uint8_t* dst, @@ -185,8 +185,9 @@ class IntegerBitpacking : public CompressionAlg { IntegerBitpacking() 
= default; IntegerBitpacking(const IntegerBitpacking&) = default; - void setValueFromUncompressed(uint8_t* srcBuffer, common::offset_t posInSrc, uint8_t* dstBuffer, - common::offset_t posInDst, const CompressionMetadata& metadata) const final; + void setValuesFromUncompressed(const uint8_t* srcBuffer, common::offset_t srcOffset, + uint8_t* dstBuffer, common::offset_t dstOffset, common::offset_t numValues, + const CompressionMetadata& metadata) const final; // Read a single value from the buffer void getValue(const uint8_t* buffer, common::offset_t posInBuffer, uint8_t* dst, @@ -241,8 +242,9 @@ class BooleanBitpacking : public CompressionAlg { BooleanBitpacking() = default; BooleanBitpacking(const BooleanBitpacking&) = default; - void setValueFromUncompressed(uint8_t* srcBuffer, common::offset_t posInSrc, uint8_t* dstBuffer, - common::offset_t posInDst, const CompressionMetadata& metadata) const final; + void setValuesFromUncompressed(const uint8_t* srcBuffer, common::offset_t srcOffset, + uint8_t* dstBuffer, common::offset_t dstOffset, common::offset_t numValues, + const CompressionMetadata& metadata) const final; void getValue(const uint8_t* buffer, common::offset_t posInBuffer, uint8_t* dst, common::offset_t posInDst, const CompressionMetadata& metadata) const final; @@ -294,14 +296,28 @@ class ReadCompressedValuesFromPage : public CompressedFunctor { uint32_t startPosInResult, uint64_t numValuesToRead, const CompressionMetadata& metadata); }; -class WriteCompressedValueToPage : public CompressedFunctor { +class WriteCompressedValuesToPage : public CompressedFunctor { public: - explicit WriteCompressedValueToPage(const common::LogicalType& logicalType) + explicit WriteCompressedValuesToPage(const common::LogicalType& logicalType) : CompressedFunctor(logicalType) {} - WriteCompressedValueToPage(const WriteCompressedValueToPage&) = default; + WriteCompressedValuesToPage(const WriteCompressedValuesToPage&) = default; + + void operator()(uint8_t* frame, uint16_t 
posInFrame, const uint8_t* data, + common::offset_t dataOffset, common::offset_t numValues, + const CompressionMetadata& metadata); +}; + +class WriteCompressedValueToPageFromVector { +public: + explicit WriteCompressedValueToPageFromVector(const common::LogicalType& logicalType) + : writeFunc(logicalType) {} + WriteCompressedValueToPageFromVector(const WriteCompressedValueToPageFromVector&) = default; void operator()(uint8_t* frame, uint16_t posInFrame, common::ValueVector* vector, uint32_t posInVector, const CompressionMetadata& metadata); + +private: + WriteCompressedValuesToPage writeFunc; }; } // namespace storage diff --git a/src/include/storage/store/string_column.h b/src/include/storage/store/string_column.h index cf676893b48..a67827c770c 100644 --- a/src/include/storage/store/string_column.h +++ b/src/include/storage/store/string_column.h @@ -7,6 +7,8 @@ namespace storage { class StringColumn : public Column { public: + using string_offset_t = uint64_t; + using string_index_t = uint32_t; StringColumn(common::LogicalType dataType, const MetadataDAHInfo& metaDAHeaderInfo, BMFileHandle* dataFH, BMFileHandle* metadataFH, BufferManager* bufferManager, WAL* wal, transaction::Transaction* transaction, RWPropertyStats propertyStatistics); @@ -18,12 +20,11 @@ class StringColumn : public Column { void append(ColumnChunk* columnChunk, common::node_group_idx_t nodeGroupIdx) final; - void writeValue(common::offset_t nodeOffset, common::ValueVector* vectorToWriteFrom, - uint32_t posInVectorToWriteFrom) final; + void writeValue(const ColumnChunkMetadata& chunkMeta, common::offset_t nodeOffset, + common::ValueVector* vectorToWriteFrom, uint32_t posInVectorToWriteFrom) final; - inline InMemDiskArray* getOverflowMetadataDA() { - return overflowMetadataDA.get(); - } + inline AuxiliaryColumn* getDataColumn() { return dataColumn.get(); } + inline AuxiliaryColumn* getOffsetColumn() { return offsetColumn.get(); } void checkpointInMemory() final; void rollbackInMemory() final; @@ 
-31,15 +32,34 @@ class StringColumn : public Column { protected: void scanInternal(transaction::Transaction* transaction, common::ValueVector* nodeIDVector, common::ValueVector* resultVector) final; + void scanUnfiltered(transaction::Transaction* transaction, + common::node_group_idx_t nodeGroupIdx, common::offset_t startOffsetInGroup, + common::offset_t endOffsetInGroup, common::ValueVector* resultVector, + uint64_t startPosInVector = 0); + void scanFiltered(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx, + common::offset_t startOffsetInGroup, common::ValueVector* nodeIDVector, + common::ValueVector* resultVector); + void lookupInternal(transaction::Transaction* transaction, common::ValueVector* nodeIDVector, common::ValueVector* resultVector) final; + void scanValueToVector(transaction::Transaction* transaction, + common::node_group_idx_t nodeGroupIdx, uint64_t startOffset, uint64_t endOffset, + common::ValueVector* resultVector, uint64_t offsetInVector); + void scanOffsets(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx, + uint64_t* offsets, uint64_t index); + private: void readStringValueFromOvf(transaction::Transaction* transaction, common::ku_string_t& kuStr, common::ValueVector* resultVector, common::page_idx_t overflowPageIdx); private: - std::unique_ptr> overflowMetadataDA; + // Main column stores indices of values in the dictionary + // The offset column stores the offsets for each index, and the data column stores the data in + // order. Values are never removed from the dictionary during in-place updates, only appended to + // the end. 
+ std::unique_ptr dataColumn; + std::unique_ptr offsetColumn; }; } // namespace storage diff --git a/src/include/storage/store/string_column_chunk.h b/src/include/storage/store/string_column_chunk.h index e64f93bda4b..f1262b0629d 100644 --- a/src/include/storage/store/string_column_chunk.h +++ b/src/include/storage/store/string_column_chunk.h @@ -1,13 +1,15 @@ #pragma once #include "common/exception/not_implemented.h" -#include "storage/storage_structure/in_mem_file.h" #include "storage/store/column_chunk.h" namespace kuzu { namespace storage { class StringColumnChunk : public ColumnChunk { + using string_offset_t = uint64_t; + using string_index_t = uint32_t; + public: explicit StringColumnChunk(common::LogicalType dataType, uint64_t capacity); @@ -21,13 +23,22 @@ class StringColumnChunk : public ColumnChunk { template T getValue(common::offset_t /*pos*/) const { - throw common::NotImplementedException("VarSizedColumnChunk::getValue"); + throw common::NotImplementedException("StringColumnChunk::getValue"); } - common::page_idx_t flushOverflowBuffer(BMFileHandle* dataFH, common::page_idx_t startPageIdx); + uint64_t getStringLength(common::offset_t pos) const { + auto index = ColumnChunk::getValue(pos); + if (index + 1 < offsetChunk->getNumValues()) { + assert(offsetChunk->getValue(index + 1) >= + offsetChunk->getValue(index)); + return offsetChunk->getValue(index + 1) - + offsetChunk->getValue(index); + } + return stringDataChunk->getNumValues() - offsetChunk->getValue(index); + } - inline InMemOverflowFile* getOverflowFile() { return overflowFile.get(); } - inline common::offset_t getLastOffsetInPage() const { return overflowCursor.offsetInPage; } + ColumnChunk* getDataChunk() { return stringDataChunk.get(); } + ColumnChunk* getOffsetChunk() { return offsetChunk.get(); } private: void appendStringColumnChunk(StringColumnChunk* other, common::offset_t startPosInOtherChunk, @@ -36,13 +47,17 @@ class StringColumnChunk : public ColumnChunk { void 
setValueFromString(const char* value, uint64_t length, uint64_t pos); private: - std::unique_ptr overflowFile; - PageByteCursor overflowCursor; + // String data is stored as a UINT8 chunk, using the numValues in the chunk to track the number + // of characters stored. + std::unique_ptr stringDataChunk; + std::unique_ptr offsetChunk; }; // STRING template<> std::string StringColumnChunk::getValue(common::offset_t pos) const; +template<> +std::string_view StringColumnChunk::getValue(common::offset_t pos) const; } // namespace storage } // namespace kuzu diff --git a/src/processor/operator/persistent/copy_node.cpp b/src/processor/operator/persistent/copy_node.cpp index 0b0f60042a9..fee650a6318 100644 --- a/src/processor/operator/persistent/copy_node.cpp +++ b/src/processor/operator/persistent/copy_node.cpp @@ -1,5 +1,7 @@ #include "processor/operator/persistent/copy_node.h" +#include + #include "common/exception/copy.h" #include "common/exception/message.h" #include "common/string_format.h" @@ -94,23 +96,24 @@ void CopyNode::writeAndResetNodeGroup(node_group_idx_t nodeGroupIdx, void CopyNode::populatePKIndex( PrimaryKeyIndexBuilder* pkIndex, ColumnChunk* chunk, offset_t startOffset, offset_t numNodes) { checkNonNullConstraint(chunk->getNullChunk(), numNodes); - std::string errorPKValueStr; + std::optional errorPKValueStr; pkIndex->lock(); try { switch (chunk->getDataType().getPhysicalType()) { case PhysicalTypeID::INT64: { auto numAppendedNodes = appendToPKIndex(pkIndex, chunk, startOffset, numNodes); if (numAppendedNodes < numNodes) { - errorPKValueStr = - std::to_string(chunk->getValue(startOffset + numAppendedNodes)); + // TODO(bmwinger): This should be tested where there are multiple node groups + errorPKValueStr = std::to_string(chunk->getValue(numAppendedNodes)); } } break; case PhysicalTypeID::STRING: { auto numAppendedNodes = - appendToPKIndex(pkIndex, chunk, startOffset, numNodes); + appendToPKIndex(pkIndex, chunk, startOffset, numNodes); if (numAppendedNodes 
< numNodes) { + // TODO(bmwinger): This should be tested where there are multiple node groups errorPKValueStr = - chunk->getValue(startOffset + numAppendedNodes).getAsString(); + static_cast(chunk)->getValue(numAppendedNodes); } } break; default: { @@ -123,8 +126,8 @@ void CopyNode::populatePKIndex( throw; } pkIndex->unlock(); - if (!errorPKValueStr.empty()) { - throw CopyException(ExceptionMessage::existedPKException(errorPKValueStr)); + if (errorPKValueStr) { + throw CopyException(ExceptionMessage::existedPKException(*errorPKValueStr)); } } @@ -169,7 +172,7 @@ uint64_t CopyNode::appendToPKIndex( } template<> -uint64_t CopyNode::appendToPKIndex( +uint64_t CopyNode::appendToPKIndex( PrimaryKeyIndexBuilder* pkIndex, ColumnChunk* chunk, offset_t startOffset, uint64_t numValues) { auto stringColumnChunk = (StringColumnChunk*)chunk; for (auto i = 0u; i < numValues; i++) { diff --git a/src/storage/local_table.cpp b/src/storage/local_table.cpp index ddd537f0a94..8ac4689278a 100644 --- a/src/storage/local_table.cpp +++ b/src/storage/local_table.cpp @@ -45,9 +45,8 @@ void StringLocalVector::update( auto kuStr = updateVector->getValue(offsetInUpdateVector); if (kuStr.len > BufferPoolConstants::PAGE_4KB_SIZE) { throw RuntimeException(ExceptionMessage::overLargeStringValueException(kuStr.len)); - } else if (!ku_string_t::isShortString(kuStr.len)) { - ovfStringLength += kuStr.len; } + totalStringLength += kuStr.len; LocalVector::update(offsetInLocalVector, updateVector, offsetInUpdateVector); } @@ -226,16 +225,35 @@ void StringLocalColumn::prepareCommitForChunk(node_group_idx_t nodeGroupIdx) { assert(chunks.contains(nodeGroupIdx)); auto localChunk = chunks.at(nodeGroupIdx).get(); auto stringColumn = reinterpret_cast(column); - auto overflowMetadata = - stringColumn->getOverflowMetadataDA()->get(nodeGroupIdx, TransactionType::WRITE); - auto ovfStringLengthInChunk = 0u; + auto dataColumnMetadata = + stringColumn->getDataColumn()->getMetadata(nodeGroupIdx, 
TransactionType::WRITE); + auto offsetColumnMetadata = + stringColumn->getOffsetColumn()->getMetadata(nodeGroupIdx, TransactionType::WRITE); + auto totalStringLengthInChunk = 0u; + uint64_t newStrings = 0; for (auto& [_, localVector] : localChunk->vectors) { auto stringLocalVector = reinterpret_cast(localVector.get()); - ovfStringLengthInChunk += stringLocalVector->ovfStringLength; - } - if (overflowMetadata.lastOffsetInPage + ovfStringLengthInChunk <= - BufferPoolConstants::PAGE_4KB_SIZE) { - // Write the updated overflow strings to the overflow string buffer. + totalStringLengthInChunk += stringLocalVector->totalStringLength; + newStrings += localVector->vector->state->selVector->selectedSize; + } + auto offsetCapacity = + offsetColumnMetadata.compMeta.numValues( + BufferPoolConstants::PAGE_4KB_SIZE, stringColumn->getOffsetColumn()->getDataType()) * + offsetColumnMetadata.numPages; + // Write in-place as long as there is sufficient space in the data chunk + if (dataColumnMetadata.numValues + totalStringLengthInChunk <= + dataColumnMetadata.numPages * BufferPoolConstants::PAGE_4KB_SIZE && + offsetColumnMetadata.numValues + newStrings < offsetCapacity + // Indices are limited to 32 bits but in theory could be larger than that since the offset + // column can grow beyond the node group size. + // + // E.g. one big string is written first, followed by NODE_GROUP_SIZE-1 small strings, + // which are all updated in-place many times (which may fit if the first string is large + // enough that 2^n minus the first string's size is large enough to fit the other strings, + // for some n). + // 32 bits should give plenty of space for updates. 
+ && offsetColumnMetadata.numValues + newStrings < + std::numeric_limits::max()) { commitLocalChunkInPlace(nodeGroupIdx, localChunk); } else { commitLocalChunkOutOfPlace(nodeGroupIdx, localChunk); diff --git a/src/storage/stats/table_statistics_collection.cpp b/src/storage/stats/table_statistics_collection.cpp index 691271e9a5e..2ce7ee4306b 100644 --- a/src/storage/stats/table_statistics_collection.cpp +++ b/src/storage/stats/table_statistics_collection.cpp @@ -101,10 +101,12 @@ std::unique_ptr TablesStatistics::createMetadataDAHInfo( createMetadataDAHInfo(*VarListType::getChildType(&dataType), metadataFH, bm, wal)); } break; case PhysicalTypeID::STRING: { - auto childMetadataDAHInfo = std::make_unique(); - childMetadataDAHInfo->dataDAHPageIdx = - InMemDiskArray::addDAHPageToFile(metadataFH, bm, wal); - metadataDAHInfo->childrenInfos.push_back(std::move(childMetadataDAHInfo)); + // Data column + metadataDAHInfo->childrenInfos.push_back( + createMetadataDAHInfo(LogicalType(LogicalTypeID::UINT8), metadataFH, bm, wal)); + // Offset column + metadataDAHInfo->childrenInfos.push_back( + createMetadataDAHInfo(LogicalType(LogicalTypeID::UINT64), metadataFH, bm, wal)); } break; default: { // DO NOTHING. 
diff --git a/src/storage/storage_structure/disk_array.cpp b/src/storage/storage_structure/disk_array.cpp index 6e6e259d034..b14a43abb6f 100644 --- a/src/storage/storage_structure/disk_array.cpp +++ b/src/storage/storage_structure/disk_array.cpp @@ -528,13 +528,11 @@ template class BaseDiskArray>; template class BaseDiskArray>; template class BaseDiskArray; template class BaseDiskArray; -template class BaseDiskArray; template class BaseInMemDiskArray; template class BaseInMemDiskArray>; template class BaseInMemDiskArray>; template class BaseInMemDiskArray; template class BaseInMemDiskArray; -template class BaseInMemDiskArray; template class InMemDiskArrayBuilder; template class InMemDiskArrayBuilder>; template class InMemDiskArrayBuilder>; @@ -545,7 +543,6 @@ template class InMemDiskArray>; template class InMemDiskArray>; template class InMemDiskArray; template class InMemDiskArray; -template class InMemDiskArray; } // namespace storage } // namespace kuzu diff --git a/src/storage/store/column.cpp b/src/storage/store/column.cpp index d93f7f1c3a5..9ef99ce1219 100644 --- a/src/storage/store/column.cpp +++ b/src/storage/store/column.cpp @@ -31,12 +31,21 @@ struct InternalIDColumnFunc { } } - static void writeValueToPage(uint8_t* frame, uint16_t posInFrame, ValueVector* vector, + static void writeValueToPageFromVector(uint8_t* frame, uint16_t posInFrame, ValueVector* vector, uint32_t posInVector, const CompressionMetadata& /*metadata*/) { assert(vector->dataType.getPhysicalType() == PhysicalTypeID::INTERNAL_ID); auto internalID = vector->getValue(posInVector); memcpy(frame + posInFrame * sizeof(offset_t), &internalID.offset, sizeof(offset_t)); } + + static void writeValuesToPage(uint8_t* frame, uint16_t posInFrame, const uint8_t* data, + uint32_t dataOffset, offset_t numValues, const CompressionMetadata& /*metadata*/) { + auto internalIDs = ((internalID_t*)data); + for (int i = 0; i < numValues; i++) { + memcpy(frame + posInFrame * sizeof(offset_t), 
&internalIDs[dataOffset + i].offset, + sizeof(offset_t)); + } + } }; struct NullColumnFunc { @@ -50,12 +59,19 @@ struct NullColumnFunc { (uint64_t*)frame, pageCursor.elemPosInPage, posInVector, numValuesToRead); } - static void writeValueToPage(uint8_t* frame, uint16_t posInFrame, ValueVector* vector, + static void writeValueToPageFromVector(uint8_t* frame, uint16_t posInFrame, ValueVector* vector, uint32_t posInVector, const CompressionMetadata& /*metadata*/) { // Casting to uint64_t should be safe as long as the page size is a multiple of 8 bytes. // Otherwise, it could read off the end of the page. - NullMask::setNull( - (uint64_t*)frame, posInFrame, NullMask::isNull(vector->getNullMaskData(), posInVector)); + NullMask::setNull((uint64_t*)frame, posInFrame, vector->isNull(posInVector)); + } + + static void writeValuesToPage(uint8_t* frame, uint16_t posInFrame, const uint8_t* data, + offset_t dataOffset, offset_t numValues, const CompressionMetadata& /*metadata*/) { + // Casting to uint64_t should be safe as long as the page size is a multiple of 8 bytes. + // Otherwise, it could read off the end of the page. + NullMask::copyNullMask( + (const uint64_t*)data, dataOffset, (uint64_t*)frame, posInFrame, numValues); } }; @@ -74,13 +90,21 @@ struct BoolColumnFunc { } } - static void writeValueToPage(uint8_t* frame, uint16_t posInFrame, ValueVector* vector, + static void writeValueToPageFromVector(uint8_t* frame, uint16_t posInFrame, ValueVector* vector, uint32_t posInVector, const CompressionMetadata& /*metadata*/) { // Casting to uint64_t should be safe as long as the page size is a multiple of 8 bytes. // Otherwise, it could read/write off the end of the page. - NullMask::copyNullMask(vector->getValue(posInVector) ? 
&NullMask::ALL_NULL_ENTRY : - &NullMask::NO_NULL_ENTRY, - posInVector, (uint64_t*)frame, posInFrame, 1); + NullMask::setNull((uint64_t*)frame, posInFrame, vector->getValue(posInVector)); + } + + static void writeValuesToPage(uint8_t* frame, uint16_t posInFrame, const uint8_t* data, + offset_t dataOffset, offset_t numValues, const CompressionMetadata& /*metadata*/) { + // Casting to uint64_t should be safe as long as the page size is a multiple of 8 bytes. + // Otherwise, it could read/write off the end of the page. + for (offset_t i = 0; i < numValues; i++) { + NullMask::setNull( + (uint64_t*)frame, posInFrame, NullMask::isNull((uint64_t*)data, dataOffset)); + } } static void copyValuesFromPage(uint8_t* frame, PageElementCursor& pageCursor, uint8_t* result, @@ -111,7 +135,7 @@ static read_values_to_vector_func_t getReadValuesToVectorFunc(const LogicalType& } } -static read_values_to_page_func_t getWriteValuesToPageFunc(const LogicalType& logicalType) { +static read_values_to_page_func_t getReadValuesToPageFunc(const LogicalType& logicalType) { switch (logicalType.getLogicalTypeID()) { case LogicalTypeID::BOOL: return BoolColumnFunc::copyValuesFromPage; @@ -120,15 +144,25 @@ static read_values_to_page_func_t getWriteValuesToPageFunc(const LogicalType& lo } } -static write_values_from_vector_func_t getWriteValuesFromVectorFunc( - const LogicalType& logicalType) { +static write_values_from_vector_func_t getWriteValueFromVectorFunc(const LogicalType& logicalType) { + switch (logicalType.getLogicalTypeID()) { + case LogicalTypeID::INTERNAL_ID: + return InternalIDColumnFunc::writeValueToPageFromVector; + case LogicalTypeID::BOOL: + return BoolColumnFunc::writeValueToPageFromVector; + default: + return WriteCompressedValueToPageFromVector(logicalType); + } +} + +static write_values_func_t getWriteValuesFunc(const LogicalType& logicalType) { switch (logicalType.getLogicalTypeID()) { case LogicalTypeID::INTERNAL_ID: - return InternalIDColumnFunc::writeValueToPage; + return 
InternalIDColumnFunc::writeValuesToPage; case LogicalTypeID::BOOL: - return BoolColumnFunc::writeValueToPage; + return BoolColumnFunc::writeValuesToPage; default: - return WriteCompressedValueToPage(logicalType); + return WriteCompressedValuesToPage(logicalType); } } @@ -157,7 +191,8 @@ class NullColumn : public Column { metadataFH, bufferManager, wal, transaction, propertyStatistics, enableCompression, false /*requireNullColumn*/} { readToVectorFunc = NullColumnFunc::readValuesFromPageToVector; - writeFromVectorFunc = NullColumnFunc::writeValueToPage; + writeFromVectorFunc = NullColumnFunc::writeValueToPageFromVector; + writeFunc = NullColumnFunc::writeValuesToPage; } void scan( @@ -227,7 +262,9 @@ class NullColumn : public Column { void write(offset_t nodeOffset, ValueVector* vectorToWriteFrom, uint32_t posInVectorToWriteFrom) final { - writeValue(nodeOffset, vectorToWriteFrom, posInVectorToWriteFrom); + auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(nodeOffset); + auto chunkMeta = metadataDA->get(nodeGroupIdx, TransactionType::WRITE); + writeValue(chunkMeta, nodeOffset, vectorToWriteFrom, posInVectorToWriteFrom); if (vectorToWriteFrom->isNull(posInVectorToWriteFrom)) { propertyStatistics.setHasNull(DUMMY_WRITE_TRANSACTION); } @@ -284,8 +321,8 @@ void InternalIDColumn::populateCommonTableID(ValueVector* resultVector) const { } } -Column::Column(LogicalType dataType, const MetadataDAHInfo& metaDAHeaderInfo, BMFileHandle* dataFH, - BMFileHandle* metadataFH, BufferManager* bufferManager, WAL* wal, +BaseColumn::BaseColumn(LogicalType dataType, const MetadataDAHInfo& metaDAHeaderInfo, + BMFileHandle* dataFH, BMFileHandle* metadataFH, BufferManager* bufferManager, WAL* wal, transaction::Transaction* transaction, RWPropertyStats propertyStatistics, bool enableCompression, bool requireNullColumn) : dbFileID{DBFileID::newDataFileID()}, dataType{std::move(dataType)}, dataFH{dataFH}, @@ -296,9 +333,10 @@ Column::Column(LogicalType dataType, const MetadataDAHInfo& 
metaDAHeaderInfo, BM transaction); numBytesPerFixedSizedValue = getDataTypeSizeInChunk(this->dataType); readToVectorFunc = getReadValuesToVectorFunc(this->dataType); - readToPageFunc = getWriteValuesToPageFunc(this->dataType); + readToPageFunc = getReadValuesToPageFunc(this->dataType); batchLookupFunc = getBatchLookupFromPageFunc(this->dataType); - writeFromVectorFunc = getWriteValuesFromVectorFunc(this->dataType); + writeFromVectorFunc = getWriteValueFromVectorFunc(this->dataType); + writeFunc = getWriteValuesFunc(this->dataType); assert(numBytesPerFixedSizedValue <= BufferPoolConstants::PAGE_4KB_SIZE); if (requireNullColumn) { nullColumn = std::make_unique(metaDAHeaderInfo.nullDAHPageIdx, dataFH, @@ -334,15 +372,14 @@ void Column::scan(transaction::Transaction* transaction, node_group_idx_t nodeGr resultVector, offsetInVector); } auto chunkMeta = metadataDA->get(nodeGroupIdx, transaction->getType()); - auto pageCursor = PageUtils::getPageElementCursorForPos(startOffsetInGroup, - chunkMeta.compMeta.numValues(BufferPoolConstants::PAGE_4KB_SIZE, dataType)); - pageCursor.pageIdx += chunkMeta.pageIdx; + auto pageCursor = + getPageCursorForOffsetInGroup(transaction->getType(), startOffsetInGroup, nodeGroupIdx); auto numValuesToScan = endOffsetInGroup - startOffsetInGroup; scanUnfiltered( transaction, pageCursor, numValuesToScan, resultVector, chunkMeta.compMeta, offsetInVector); } -void Column::scan(node_group_idx_t nodeGroupIdx, ColumnChunk* columnChunk) { +void BaseColumn::scan(node_group_idx_t nodeGroupIdx, ColumnChunk* columnChunk) { if (nullColumn) { nullColumn->scan(nodeGroupIdx, columnChunk->getNullChunk()); } @@ -383,7 +420,7 @@ void Column::scanInternal( } } -void Column::scanUnfiltered(Transaction* transaction, PageElementCursor& pageCursor, +void BaseColumn::scanUnfiltered(Transaction* transaction, PageElementCursor& pageCursor, uint64_t numValuesToScan, ValueVector* resultVector, const CompressionMetadata& compMeta, uint64_t startPosInVector) { uint64_t 
numValuesScanned = 0; @@ -401,7 +438,7 @@ void Column::scanUnfiltered(Transaction* transaction, PageElementCursor& pageCur } } -void Column::scanFiltered(Transaction* transaction, PageElementCursor& pageCursor, +void BaseColumn::scanFiltered(Transaction* transaction, PageElementCursor& pageCursor, ValueVector* nodeIDVector, ValueVector* resultVector, const CompressionMetadata& compMeta) { auto numValuesToScan = nodeIDVector->state->getOriginalSize(); auto numValuesScanned = 0u; @@ -459,14 +496,14 @@ void Column::lookupValue(transaction::Transaction* transaction, offset_t nodeOff }); } -void Column::readFromPage( +void BaseColumn::readFromPage( Transaction* transaction, page_idx_t pageIdx, const std::function& func) { auto [fileHandleToPin, pageIdxToPin] = DBFileUtils::getFileHandleAndPhysicalPageIdxToPin( *dataFH, pageIdx, *wal, transaction->getType()); bufferManager->optimisticRead(*fileHandleToPin, pageIdxToPin, func); } -void Column::append(ColumnChunk* columnChunk, uint64_t nodeGroupIdx) { +void BaseColumn::append(ColumnChunk* columnChunk, uint64_t nodeGroupIdx) { // Main column chunk. 
auto preScanMetadata = columnChunk->getMetadataToFlush(); auto startPageIdx = dataFH->addNewPages(preScanMetadata.numPages); @@ -485,17 +522,20 @@ void Column::write( nullColumn->write(nodeOffset, vectorToWriteFrom, posInVectorToWriteFrom); } bool isNull = vectorToWriteFrom->isNull(posInVectorToWriteFrom); - if (isNull) { - return; + auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(nodeOffset); + auto chunkMeta = metadataDA->get(nodeGroupIdx, TransactionType::WRITE); + if (!isNull) { + writeValue(chunkMeta, nodeOffset, vectorToWriteFrom, posInVectorToWriteFrom); + } + if (nodeOffset >= chunkMeta.numValues) { + chunkMeta.numValues = nodeOffset + 1; + metadataDA->update(nodeGroupIdx, chunkMeta); } - writeValue(nodeOffset, vectorToWriteFrom, posInVectorToWriteFrom); } -void Column::writeValue( - offset_t nodeOffset, ValueVector* vectorToWriteFrom, uint32_t posInVectorToWriteFrom) { +void Column::writeValue(const ColumnChunkMetadata& chunkMeta, offset_t nodeOffset, + ValueVector* vectorToWriteFrom, uint32_t posInVectorToWriteFrom) { auto walPageInfo = createWALVersionOfPageForValue(nodeOffset); - auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(nodeOffset); - auto chunkMeta = metadataDA->get(nodeGroupIdx, TransactionType::WRITE); try { writeFromVectorFunc(walPageInfo.frame, walPageInfo.posInPage, vectorToWriteFrom, posInVectorToWriteFrom, chunkMeta.compMeta); @@ -508,6 +548,20 @@ void Column::writeValue( dataFH->releaseWALPageIdxLock(walPageInfo.originalPageIdx); } +void Column::writeValue( + const ColumnChunkMetadata& chunkMeta, offset_t nodeOffset, const uint8_t* data) { + auto walPageInfo = createWALVersionOfPageForValue(nodeOffset); + try { + writeFunc(walPageInfo.frame, walPageInfo.posInPage, data, 0, 1, chunkMeta.compMeta); + } catch (Exception& e) { + bufferManager->unpin(*wal->fileHandle, walPageInfo.pageIdxInWAL); + dataFH->releaseWALPageIdxLock(walPageInfo.originalPageIdx); + throw; + } + bufferManager->unpin(*wal->fileHandle, walPageInfo.pageIdxInWAL); + 
dataFH->releaseWALPageIdxLock(walPageInfo.originalPageIdx); +} + WALPageIdxPosInPageAndFrame Column::createWALVersionOfPageForValue(offset_t nodeOffset) { auto originalPageCursor = getPageCursorForOffset(TransactionType::WRITE, nodeOffset); bool insertingNewPage = false; @@ -521,27 +575,27 @@ WALPageIdxPosInPageAndFrame Column::createWALVersionOfPageForValue(offset_t node return {walPageIdxAndFrame, originalPageCursor.elemPosInPage}; } -void Column::setNull(offset_t nodeOffset) { +void BaseColumn::setNull(offset_t nodeOffset) { if (nullColumn) { nullColumn->setNull(nodeOffset); } } -void Column::checkpointInMemory() { +void BaseColumn::checkpointInMemory() { metadataDA->checkpointInMemoryIfNecessary(); if (nullColumn) { nullColumn->checkpointInMemory(); } } -void Column::rollbackInMemory() { +void BaseColumn::rollbackInMemory() { metadataDA->rollbackInMemoryIfNecessary(); if (nullColumn) { nullColumn->rollbackInMemory(); } } -void Column::populateWithDefaultVal(const Property& property, Column* column, +void BaseColumn::populateWithDefaultVal(const Property& property, Column* column, ValueVector* defaultValueVector, uint64_t numNodeGroups) const { auto columnChunk = ColumnChunkFactory::createColumnChunk(*property.getDataType(), enableCompression); @@ -554,6 +608,11 @@ void Column::populateWithDefaultVal(const Property& property, Column* column, PageElementCursor Column::getPageCursorForOffset( TransactionType transactionType, offset_t nodeOffset) { auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(nodeOffset); + return getPageCursorForOffsetAndGroup(transactionType, nodeOffset, nodeGroupIdx); +} + +PageElementCursor Column::getPageCursorForOffsetAndGroup( + TransactionType transactionType, offset_t nodeOffset, node_group_idx_t nodeGroupIdx) { auto offsetInNodeGroup = nodeOffset - StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx); auto chunkMeta = metadataDA->get(nodeGroupIdx, transactionType); auto pageCursor = 
PageUtils::getPageElementCursorForPos(offsetInNodeGroup, @@ -618,5 +677,80 @@ std::unique_ptr ColumnFactory::createColumn(const common::LogicalType& d } } +PageElementCursor BaseColumn::getPageCursorForOffsetInGroup( + transaction::TransactionType transactionType, common::offset_t nodeOffset, + common::node_group_idx_t nodeGroupIdx) { + auto chunkMeta = metadataDA->get(nodeGroupIdx, transactionType); + auto pageCursor = PageUtils::getPageElementCursorForPos( + nodeOffset, chunkMeta.compMeta.numValues(BufferPoolConstants::PAGE_4KB_SIZE, dataType)); + pageCursor.pageIdx += chunkMeta.pageIdx; + return pageCursor; +} + +void BaseColumn::scan(Transaction* transaction, node_group_idx_t nodeGroupIdx, + offset_t startOffsetInGroup, offset_t endOffsetInGroup, uint8_t* result) { + // Use existing nodeGroupIdx + auto cursor = + getPageCursorForOffsetInGroup(transaction->getType(), startOffsetInGroup, nodeGroupIdx); + auto chunkMeta = metadataDA->get(nodeGroupIdx, transaction->getType()); + auto numValuesToScan = endOffsetInGroup - startOffsetInGroup; + uint64_t numValuesScanned = 0; + auto numValuesPerPage = + chunkMeta.compMeta.numValues(BufferPoolConstants::PAGE_4KB_SIZE, dataType); + while (numValuesScanned < numValuesToScan) { + uint64_t numValuesToScanInPage = std::min( + (uint64_t)numValuesPerPage - cursor.elemPosInPage, numValuesToScan - numValuesScanned); + readFromPage(transaction, cursor.pageIdx, [&](uint8_t* frame) -> void { + readToPageFunc( + frame, cursor, result, numValuesScanned, numValuesToScanInPage, chunkMeta.compMeta); + }); + numValuesScanned += numValuesToScanInPage; + cursor.nextPage(); + } +} + +offset_t AuxiliaryColumn::appendValues( + node_group_idx_t nodeGroupIdx, const uint8_t* data, offset_t numValues) { + auto metadata = metadataDA->get(nodeGroupIdx, TransactionType::WRITE); + auto startOffset = metadata.numValues; + + auto cursor = getPageCursorForOffsetInGroup(TransactionType::WRITE, startOffset, nodeGroupIdx); + offset_t valuesWritten = 0; + 
while (valuesWritten < numValues) { + bool insertingNewPage = false; + if (cursor.pageIdx >= dataFH->getNumPages()) { + assert(cursor.pageIdx == dataFH->getNumPages()); + DBFileUtils::insertNewPage(*dataFH, dbFileID, *bufferManager, *wal); + insertingNewPage = true; + } + + auto numValuesPerPage = + metadata.compMeta.numValues(BufferPoolConstants::PAGE_4KB_SIZE, dataType); + uint64_t numValuesToWriteInPage = + std::min((uint64_t)numValuesPerPage - cursor.elemPosInPage, numValues - valuesWritten); + auto walPageInfo = DBFileUtils::createWALVersionIfNecessaryAndPinPage( + cursor.pageIdx, insertingNewPage, *dataFH, dbFileID, *bufferManager, *wal); + + try { + writeFunc(walPageInfo.frame, cursor.elemPosInPage, data, 0, numValuesToWriteInPage, + metadata.compMeta); + } catch (Exception& e) { + bufferManager->unpin(*wal->fileHandle, walPageInfo.pageIdxInWAL); + dataFH->releaseWALPageIdxLock(walPageInfo.originalPageIdx); + throw; + } + bufferManager->unpin(*wal->fileHandle, walPageInfo.pageIdxInWAL); + dataFH->releaseWALPageIdxLock(walPageInfo.originalPageIdx); + valuesWritten += numValuesToWriteInPage; + if (valuesWritten < numValues) { + cursor.nextPage(); + metadata.numPages++; + } + } + metadata.numValues += numValues; + metadataDA->update(nodeGroupIdx, metadata); + return startOffset; +} + } // namespace storage } // namespace kuzu diff --git a/src/storage/store/column_chunk.cpp b/src/storage/store/column_chunk.cpp index fbcb2bbff0b..3383c29a6c8 100644 --- a/src/storage/store/column_chunk.cpp +++ b/src/storage/store/column_chunk.cpp @@ -1,5 +1,6 @@ #include "storage/store/column_chunk.h" +#include "storage/storage_utils.h" #include "storage/store/compression.h" #include "storage/store/string_column_chunk.h" #include "storage/store/struct_column_chunk.h" @@ -58,10 +59,21 @@ class CompressedFlushBuffer { } else { valuesRemaining -= numValuesPerPage; } + if (compressedSize < BufferPoolConstants::PAGE_4KB_SIZE) { + memset(compressedBuffer.get() + compressedSize, 0, + 
BufferPoolConstants::PAGE_4KB_SIZE - compressedSize); + } FileUtils::writeToFile(dataFH->getFileInfo(), compressedBuffer.get(), compressedSize, (startPageIdx + numPages) * BufferPoolConstants::PAGE_4KB_SIZE); numPages++; } while (valuesRemaining > 0); + // Make sure that the file is the right length + if (numPages < metadata.numPages) { + memset(compressedBuffer.get(), 0, BufferPoolConstants::PAGE_4KB_SIZE); + FileUtils::writeToFile(dataFH->getFileInfo(), compressedBuffer.get(), + BufferPoolConstants::PAGE_4KB_SIZE, + (startPageIdx + metadata.numPages) * BufferPoolConstants::PAGE_4KB_SIZE); + } return ColumnChunkMetadata( startPageIdx, metadata.numPages, metadata.numValues, metadata.compMeta); } @@ -317,30 +329,6 @@ ColumnChunkMetadata ColumnChunk::flushBuffer( return flushBufferFunction(buffer.get(), bufferSize, dataFH, startPageIdx, metadata); } -uint32_t ColumnChunk::getDataTypeSizeInChunk(LogicalType& dataType) { - switch (dataType.getLogicalTypeID()) { - case LogicalTypeID::SERIAL: - case LogicalTypeID::STRUCT: { - return 0; - } - case LogicalTypeID::STRING: { - return sizeof(ku_string_t); - } - case LogicalTypeID::VAR_LIST: - case LogicalTypeID::INTERNAL_ID: { - return sizeof(offset_t); - } - // Used by NodeColumn to represent the de-compressed size of booleans in-memory - // Does not reflect the size of booleans on-disk in BoolColumnChunk - case LogicalTypeID::BOOL: { - return 1; - } - default: { - return StorageUtils::getDataTypeSize(dataType); - } - } -} - uint64_t ColumnChunk::getBufferSize() const { switch (dataType.getLogicalTypeID()) { case LogicalTypeID::BOOL: { diff --git a/src/storage/store/compression.cpp b/src/storage/store/compression.cpp index c478bfb29da..ca26e8b58dc 100644 --- a/src/storage/store/compression.cpp +++ b/src/storage/store/compression.cpp @@ -1,5 +1,6 @@ #include "storage/store/compression.h" +#include #include #include "common/exception/not_implemented.h" @@ -20,19 +21,18 @@ namespace storage { uint32_t 
getDataTypeSizeInChunk(const common::LogicalType& dataType) { using namespace common; switch (dataType.getLogicalTypeID()) { + case LogicalTypeID::SERIAL: case LogicalTypeID::STRUCT: { return 0; } - case LogicalTypeID::STRING: { - return sizeof(ku_string_t); + case LogicalTypeID::STRING: + case LogicalTypeID::BLOB: { + return sizeof(uint32_t); } case LogicalTypeID::VAR_LIST: case LogicalTypeID::INTERNAL_ID: { return sizeof(offset_t); } - case LogicalTypeID::SERIAL: { - return sizeof(int64_t); - } default: { auto size = StorageUtils::getDataTypeSize(dataType); assert(size <= BufferPoolConstants::PAGE_4KB_SIZE); @@ -263,25 +263,34 @@ void fastpack(const T* in, uint8_t* out, uint8_t bitWidth) { } template -void IntegerBitpacking::setValueFromUncompressed(uint8_t* srcBuffer, common::offset_t posInSrc, - uint8_t* dstBuffer, common::offset_t posInDst, const CompressionMetadata& metadata) const { +void IntegerBitpacking::setValuesFromUncompressed(const uint8_t* srcBuffer, offset_t posInSrc, + uint8_t* dstBuffer, offset_t posInDst, offset_t numValues, + const CompressionMetadata& metadata) const { auto header = BitpackHeader::readHeader(metadata.data); // This is a fairly naive implementation which uses fastunpack/fastpack // to modify the data by decompressing/compressing a single chunk of values. // - // TODO(bmwinger): modify the data in-place + // TODO(bmwinger): modify the data in-place when numValues is small (frequently called with + // numValues=1) // // Data can be considered to be stored in aligned chunks of 32 values // with a size of 32 * bitWidth bits, // or bitWidth 32-bit values (we cast the buffer to a uint32_t* later). 
auto chunkStart = (uint8_t*)getChunkStart(dstBuffer, posInDst, header.bitWidth); - auto posInChunk = posInDst % CHUNK_SIZE; - auto value = ((T*)srcBuffer)[posInSrc]; - assert(canUpdateInPlace(value, header)); - + auto startPosInChunk = posInDst % CHUNK_SIZE; U chunk[CHUNK_SIZE]; fastunpack(chunkStart, chunk, header.bitWidth); - chunk[posInChunk] = (U)(value - (T)header.offset); + for (offset_t i = 0; i < numValues; i++) { + auto value = ((T*)srcBuffer)[posInSrc]; + assert(canUpdateInPlace(value, header)); + chunk[startPosInChunk + i] = (U)(value - (T)header.offset); + if (startPosInChunk + i + 1 >= CHUNK_SIZE && i + 1 < numValues) { + fastpack(chunk, chunkStart, header.bitWidth); + chunkStart = (uint8_t*)getChunkStart(dstBuffer, posInDst + i + 1, header.bitWidth); + fastunpack(chunkStart, chunk, header.bitWidth); + startPosInChunk = 0; + } + } fastpack(chunk, chunkStart, header.bitWidth); } @@ -419,10 +428,13 @@ template class IntegerBitpacking; template class IntegerBitpacking; template class IntegerBitpacking; -void BooleanBitpacking::setValueFromUncompressed(uint8_t* srcBuffer, offset_t posInSrc, - uint8_t* dstBuffer, offset_t posInDst, const CompressionMetadata& /*metadata*/) const { - auto val = ((bool*)srcBuffer)[posInSrc]; - common::NullMask::setNull((uint64_t*)dstBuffer, posInDst, val); +void BooleanBitpacking::setValuesFromUncompressed(const uint8_t* srcBuffer, offset_t srcOffset, + uint8_t* dstBuffer, offset_t dstOffset, offset_t numValues, + const CompressionMetadata& /*metadata*/) const { + for (auto i = 0; i < numValues; i++) { + common::NullMask::setNull( + (uint64_t*)dstBuffer, dstOffset + i, ((bool*)srcBuffer)[srcOffset + i]); + } } void BooleanBitpacking::getValue(const uint8_t* buffer, offset_t posInBuffer, uint8_t* dst, @@ -579,46 +591,47 @@ void ReadCompressedValuesFromPage::operator()(uint8_t* frame, PageElementCursor& } } -void WriteCompressedValueToPage::operator()(uint8_t* frame, uint16_t posInFrame, - common::ValueVector* vector, uint32_t 
posInVector, const CompressionMetadata& metadata) { +void WriteCompressedValuesToPage::operator()(uint8_t* frame, uint16_t posInFrame, + const uint8_t* data, offset_t dataOffset, offset_t numValues, + const CompressionMetadata& metadata) { switch (metadata.compression) { case CompressionType::UNCOMPRESSED: - return uncompressed.setValueFromUncompressed( - vector->getData(), posInVector, frame, posInFrame, metadata); + return uncompressed.setValuesFromUncompressed( + data, dataOffset, frame, posInFrame, numValues, metadata); case CompressionType::INTEGER_BITPACKING: { switch (physicalType) { case PhysicalTypeID::INT64: { - return IntegerBitpacking().setValueFromUncompressed( - vector->getData(), posInVector, frame, posInFrame, metadata); + return IntegerBitpacking().setValuesFromUncompressed( + data, dataOffset, frame, posInFrame, numValues, metadata); } case PhysicalTypeID::INT32: { - return IntegerBitpacking().setValueFromUncompressed( - vector->getData(), posInVector, frame, posInFrame, metadata); + return IntegerBitpacking().setValuesFromUncompressed( + data, dataOffset, frame, posInFrame, numValues, metadata); } case PhysicalTypeID::INT16: { - return IntegerBitpacking().setValueFromUncompressed( - vector->getData(), posInVector, frame, posInFrame, metadata); + return IntegerBitpacking().setValuesFromUncompressed( + data, dataOffset, frame, posInFrame, numValues, metadata); } case PhysicalTypeID::INT8: { - return IntegerBitpacking().setValueFromUncompressed( - vector->getData(), posInVector, frame, posInFrame, metadata); + return IntegerBitpacking().setValuesFromUncompressed( + data, dataOffset, frame, posInFrame, numValues, metadata); } case PhysicalTypeID::VAR_LIST: case PhysicalTypeID::UINT64: { - return IntegerBitpacking().setValueFromUncompressed( - vector->getData(), posInVector, frame, posInFrame, metadata); + return IntegerBitpacking().setValuesFromUncompressed( + data, dataOffset, frame, posInFrame, numValues, metadata); } case PhysicalTypeID::UINT32: { 
- return IntegerBitpacking().setValueFromUncompressed( - vector->getData(), posInVector, frame, posInFrame, metadata); + return IntegerBitpacking().setValuesFromUncompressed( + data, dataOffset, frame, posInFrame, numValues, metadata); } case PhysicalTypeID::UINT16: { - return IntegerBitpacking().setValueFromUncompressed( - vector->getData(), posInVector, frame, posInFrame, metadata); + return IntegerBitpacking().setValuesFromUncompressed( + data, dataOffset, frame, posInFrame, numValues, metadata); } case PhysicalTypeID::UINT8: { - return IntegerBitpacking().setValueFromUncompressed( - vector->getData(), posInVector, frame, posInFrame, metadata); + return IntegerBitpacking().setValuesFromUncompressed( + data, dataOffset, frame, posInFrame, numValues, metadata); } default: { throw NotImplementedException("INTEGER_BITPACKING is not implemented for type " + @@ -627,10 +640,15 @@ void WriteCompressedValueToPage::operator()(uint8_t* frame, uint16_t posInFrame, } } case CompressionType::BOOLEAN_BITPACKING: - return booleanBitpacking.setValueFromUncompressed( - vector->getData(), posInVector, frame, posInFrame, metadata); + return booleanBitpacking.setValuesFromUncompressed( + data, dataOffset, frame, posInFrame, numValues, metadata); } } +void WriteCompressedValueToPageFromVector::operator()(uint8_t* frame, uint16_t posInFrame, + common::ValueVector* vector, uint32_t posInVector, const CompressionMetadata& metadata) { + writeFunc(frame, posInFrame, vector->getData(), posInVector, 1, metadata); +} + } // namespace storage } // namespace kuzu diff --git a/src/storage/store/string_column.cpp b/src/storage/store/string_column.cpp index 0c344e54eb1..b12a3584a62 100644 --- a/src/storage/store/string_column.cpp +++ b/src/storage/store/string_column.cpp @@ -10,29 +10,44 @@ using namespace kuzu::transaction; namespace kuzu { namespace storage { -struct StringColumnFunc { - static void writeStringValuesToPage(uint8_t* frame, uint16_t posInFrame, ValueVector* vector, - uint32_t 
posInVector, const CompressionMetadata& /*metadata*/) { - auto kuStrInFrame = (ku_string_t*)(frame + (posInFrame * sizeof(ku_string_t))); - auto kuStrInVector = vector->getValue(posInVector); - memcpy(kuStrInFrame->prefix, kuStrInVector.prefix, - std::min((uint64_t)kuStrInVector.len, ku_string_t::SHORT_STR_LENGTH)); - kuStrInFrame->len = kuStrInVector.len; - kuStrInFrame->overflowPtr = kuStrInVector.overflowPtr; - } -}; - StringColumn::StringColumn(LogicalType dataType, const MetadataDAHInfo& metaDAHeaderInfo, BMFileHandle* dataFH, BMFileHandle* metadataFH, BufferManager* bufferManager, WAL* wal, transaction::Transaction* transaction, RWPropertyStats stats) : Column{std::move(dataType), metaDAHeaderInfo, dataFH, metadataFH, bufferManager, wal, transaction, stats, false /* enableCompression */, true /* requireNullColumn */} { - if (this->dataType.getLogicalTypeID() == LogicalTypeID::STRING) { - writeFromVectorFunc = StringColumnFunc::writeStringValuesToPage; + // TODO(bmwinger): detecting if string child columns must be re-compressed when updating is not + // yet supported + auto enableCompression = false; + dataColumn = std::make_unique(LogicalType(LogicalTypeID::UINT8), + *metaDAHeaderInfo.childrenInfos[0], dataFH, metadataFH, bufferManager, wal, transaction, + stats, enableCompression, false /*requireNullColumn*/); + offsetColumn = std::make_unique(LogicalType(LogicalTypeID::UINT64), + *metaDAHeaderInfo.childrenInfos[1], dataFH, metadataFH, bufferManager, wal, transaction, + stats, enableCompression, false /*requireNullColumn*/); +} + +void StringColumn::scanOffsets(Transaction* transaction, node_group_idx_t nodeGroupIdx, + string_offset_t* offsets, uint64_t index) { + // We either need to read the next value, or store the maximum string offset at the end. + // Otherwise we won't know what the length of the last string is. 
+ auto numValues = offsetColumn->getMetadata(nodeGroupIdx, transaction->getType()).numValues; + if (index < numValues - 1) { + offsetColumn->scan(transaction, nodeGroupIdx, index, index + 2, (uint8_t*)offsets); + } else { + offsetColumn->scan(transaction, nodeGroupIdx, index, index + 1, (uint8_t*)offsets); + offsets[1] = dataColumn->getMetadata(nodeGroupIdx, transaction->getType()).numValues; } - overflowMetadataDA = std::make_unique>(*metadataFH, - DBFileID::newMetadataFileID(), metaDAHeaderInfo.childrenInfos[0]->dataDAHPageIdx, - bufferManager, wal, transaction); +} + +void StringColumn::scanValueToVector(Transaction* transaction, node_group_idx_t nodeGroupIdx, + string_offset_t startOffset, string_offset_t endOffset, ValueVector* resultVector, + uint64_t offsetInVector) { + assert(endOffset >= startOffset); + // TODO: Add string to vector first and read directly instead of using a temporary buffer + std::unique_ptr stringRead = std::make_unique(endOffset - startOffset); + dataColumn->scan(transaction, nodeGroupIdx, startOffset, endOffset, (uint8_t*)stringRead.get()); + StringVector::addString( + resultVector, offsetInVector, stringRead.get(), endOffset - startOffset); } void StringColumn::scan(Transaction* transaction, node_group_idx_t nodeGroupIdx, @@ -40,74 +55,50 @@ void StringColumn::scan(Transaction* transaction, node_group_idx_t nodeGroupIdx, uint64_t offsetInVector) { nullColumn->scan(transaction, nodeGroupIdx, startOffsetInGroup, endOffsetInGroup, resultVector, offsetInVector); - Column::scan(transaction, nodeGroupIdx, startOffsetInGroup, endOffsetInGroup, resultVector, + scanUnfiltered(transaction, nodeGroupIdx, startOffsetInGroup, endOffsetInGroup, resultVector, offsetInVector); - auto numValuesToRead = endOffsetInGroup - startOffsetInGroup; - auto overflowPageIdx = overflowMetadataDA->get(nodeGroupIdx, transaction->getType()).pageIdx; - for (auto i = 0u; i < numValuesToRead; i++) { - auto pos = offsetInVector + i; - if (resultVector->isNull(pos)) { - 
continue; - } - readStringValueFromOvf( - transaction, resultVector->getValue(pos), resultVector, overflowPageIdx); - } } void StringColumn::scan(node_group_idx_t nodeGroupIdx, ColumnChunk* columnChunk) { Column::scan(nodeGroupIdx, columnChunk); auto stringColumnChunk = reinterpret_cast(columnChunk); - auto overflowMetadata = overflowMetadataDA->get(nodeGroupIdx, TransactionType::WRITE); - auto inMemOverflowFile = stringColumnChunk->getOverflowFile(); - inMemOverflowFile->addNewPages(overflowMetadata.numPages); - for (auto i = 0u; i < overflowMetadata.numPages; i++) { - auto pageIdx = overflowMetadata.pageIdx + i; - FileUtils::readFromFile(dataFH->getFileInfo(), inMemOverflowFile->getPage(i)->data, - BufferPoolConstants::PAGE_4KB_SIZE, pageIdx * BufferPoolConstants::PAGE_4KB_SIZE); - } + dataColumn->scan(nodeGroupIdx, stringColumnChunk->getDataChunk()); + offsetColumn->scan(nodeGroupIdx, stringColumnChunk->getOffsetChunk()); } void StringColumn::append(ColumnChunk* columnChunk, node_group_idx_t nodeGroupIdx) { Column::append(columnChunk, nodeGroupIdx); auto stringColumnChunk = reinterpret_cast(columnChunk); - auto startPageIdx = dataFH->addNewPages(stringColumnChunk->getOverflowFile()->getNumPages()); - auto numPagesForOverflow = stringColumnChunk->flushOverflowBuffer(dataFH, startPageIdx); - overflowMetadataDA->resize(nodeGroupIdx + 1); - overflowMetadataDA->update( - nodeGroupIdx, OverflowColumnChunkMetadata{startPageIdx, numPagesForOverflow, - stringColumnChunk->getLastOffsetInPage()}); + dataColumn->append(stringColumnChunk->getDataChunk(), nodeGroupIdx); + offsetColumn->append(stringColumnChunk->getOffsetChunk(), nodeGroupIdx); } -void StringColumn::writeValue( - offset_t nodeOffset, ValueVector* vectorToWriteFrom, uint32_t posInVectorToWriteFrom) { +void StringColumn::writeValue(const ColumnChunkMetadata& chunkMeta, offset_t nodeOffset, + ValueVector* vectorToWriteFrom, uint32_t posInVectorToWriteFrom) { auto& kuStr = 
vectorToWriteFrom->getValue(posInVectorToWriteFrom); - if (!ku_string_t::isShortString(kuStr.len)) { - auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(nodeOffset); - auto overflowMetadata = overflowMetadataDA->get(nodeGroupIdx, TransactionType::WRITE); - auto overflowPageIdxInChunk = overflowMetadata.numPages - 1; - auto walPageIdxAndFrame = DBFileUtils::createWALVersionIfNecessaryAndPinPage( - overflowMetadata.pageIdx + overflowPageIdxInChunk, false /* insertingNewPage */, - *dataFH, dbFileID, *bufferManager, *wal); - memcpy(walPageIdxAndFrame.frame + overflowMetadata.lastOffsetInPage, - reinterpret_cast(kuStr.overflowPtr), kuStr.len); - bufferManager->unpin(*wal->fileHandle, walPageIdxAndFrame.pageIdxInWAL); - dataFH->releaseWALPageIdxLock(walPageIdxAndFrame.originalPageIdx); - TypeUtils::encodeOverflowPtr( - kuStr.overflowPtr, overflowPageIdxInChunk, overflowMetadata.lastOffsetInPage); - overflowMetadata.lastOffsetInPage += kuStr.len; - overflowMetadataDA->update(nodeGroupIdx, overflowMetadata); - } - Column::writeValue(nodeOffset, vectorToWriteFrom, posInVectorToWriteFrom); + // Write string data to end of dataColumn + auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(nodeOffset); + auto startOffset = + dataColumn->appendValues(nodeGroupIdx, (const uint8_t*)kuStr.getData(), kuStr.len); + + // Write offset + string_index_t index = + offsetColumn->appendValues(nodeGroupIdx, (const uint8_t*)&startOffset, 1); + + // Write index to main column + Column::writeValue(chunkMeta, nodeOffset, (uint8_t*)&index); } void StringColumn::checkpointInMemory() { - Column::checkpointInMemory(); - overflowMetadataDA->checkpointInMemoryIfNecessary(); + BaseColumn::checkpointInMemory(); + dataColumn->checkpointInMemory(); + offsetColumn->checkpointInMemory(); } void StringColumn::rollbackInMemory() { - Column::rollbackInMemory(); - overflowMetadataDA->rollbackInMemoryIfNecessary(); + BaseColumn::rollbackInMemory(); + dataColumn->rollbackInMemory(); + 
offsetColumn->rollbackInMemory(); } void StringColumn::scanInternal( @@ -116,15 +107,51 @@ void StringColumn::scanInternal( auto startNodeOffset = nodeIDVector->readNodeOffset(0); assert(startNodeOffset % DEFAULT_VECTOR_CAPACITY == 0); auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(startNodeOffset); - Column::scanInternal(transaction, nodeIDVector, resultVector); - auto overflowPageIdx = overflowMetadataDA->get(nodeGroupIdx, transaction->getType()).pageIdx; + auto startOffsetInGroup = + startNodeOffset - StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx); + if (nodeIDVector->state->selVector->isUnfiltered()) { + scanUnfiltered(transaction, nodeGroupIdx, startOffsetInGroup, + startOffsetInGroup + nodeIDVector->state->selVector->selectedSize, resultVector); + } else { + scanFiltered(transaction, nodeGroupIdx, startOffsetInGroup, nodeIDVector, resultVector); + } +} + +void StringColumn::scanUnfiltered(transaction::Transaction* transaction, + node_group_idx_t nodeGroupIdx, offset_t startOffsetInGroup, offset_t endOffsetInGroup, + common::ValueVector* resultVector, uint64_t startPosInVector) { + auto numValuesToRead = endOffsetInGroup - startOffsetInGroup; + auto indices = std::make_unique(numValuesToRead); + BaseColumn::scan( + transaction, nodeGroupIdx, startOffsetInGroup, endOffsetInGroup, (uint8_t*)indices.get()); + for (auto i = 0u; i < numValuesToRead; i++) { + if (resultVector->isNull(startPosInVector + i)) { + continue; + } + // scan string data from the dataColumn into a temporary string + string_offset_t offsets[2]; + scanOffsets(transaction, nodeGroupIdx, offsets, indices[i]); + scanValueToVector( + transaction, nodeGroupIdx, offsets[0], offsets[1], resultVector, startPosInVector + i); + } +} + +void StringColumn::scanFiltered(transaction::Transaction* transaction, + node_group_idx_t nodeGroupIdx, common::offset_t startOffsetInGroup, + common::ValueVector* nodeIDVector, common::ValueVector* resultVector) { for (auto i = 0u; i < 
nodeIDVector->state->selVector->selectedSize; i++) { auto pos = nodeIDVector->state->selVector->selectedPositions[i]; if (resultVector->isNull(pos)) { + // Ignore positions which were scanned as null continue; } - readStringValueFromOvf( - transaction, resultVector->getValue(pos), resultVector, overflowPageIdx); + auto offsetInGroup = startOffsetInGroup + pos; + string_index_t index; + BaseColumn::scan( + transaction, nodeGroupIdx, offsetInGroup, offsetInGroup + 1, (uint8_t*)&index); + string_offset_t offsets[2]; + scanOffsets(transaction, nodeGroupIdx, offsets, index); + scanValueToVector(transaction, nodeGroupIdx, offsets[0], offsets[1], resultVector, pos); } } @@ -132,35 +159,23 @@ void StringColumn::lookupInternal( Transaction* transaction, ValueVector* nodeIDVector, ValueVector* resultVector) { assert(dataType.getPhysicalType() == PhysicalTypeID::STRING); auto startNodeOffset = nodeIDVector->readNodeOffset(0); - auto overflowPageIdx = - overflowMetadataDA - ->get(StorageUtils::getNodeGroupIdx(startNodeOffset), transaction->getType()) - .pageIdx; - Column::lookupInternal(transaction, nodeIDVector, resultVector); + auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(startNodeOffset); for (auto i = 0u; i < nodeIDVector->state->selVector->selectedSize; i++) { auto pos = resultVector->state->selVector->selectedPositions[i]; - if (!resultVector->isNull(pos)) { - readStringValueFromOvf(transaction, resultVector->getValue(pos), - resultVector, overflowPageIdx); + if (resultVector->isNull(pos)) { + // Ignore positions which were scanned as null + continue; } + string_offset_t offsets[2]; + auto offsetInGroup = + startNodeOffset - StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx) + pos; + string_index_t index; + BaseColumn::scan( + transaction, nodeGroupIdx, offsetInGroup, offsetInGroup + 1, (uint8_t*)&index); + scanOffsets(transaction, nodeGroupIdx, offsets, index); + scanValueToVector(transaction, nodeGroupIdx, offsets[0], offsets[1], resultVector, pos); } } -void 
StringColumn::readStringValueFromOvf(Transaction* transaction, ku_string_t& kuStr, - ValueVector* resultVector, page_idx_t overflowPageIdx) { - if (ku_string_t::isShortString(kuStr.len)) { - return; - } - PageByteCursor cursor; - TypeUtils::decodeOverflowPtr(kuStr.overflowPtr, cursor.pageIdx, cursor.offsetInPage); - cursor.pageIdx += overflowPageIdx; - auto [fileHandleToPin, pageIdxToPin] = DBFileUtils::getFileHandleAndPhysicalPageIdxToPin( - *dataFH, cursor.pageIdx, *wal, transaction->getType()); - bufferManager->optimisticRead(*fileHandleToPin, pageIdxToPin, [&](uint8_t* frame) { - StringVector::addString( - resultVector, kuStr, (const char*)(frame + cursor.offsetInPage), kuStr.len); - }); -} - } // namespace storage } // namespace kuzu diff --git a/src/storage/store/string_column_chunk.cpp b/src/storage/store/string_column_chunk.cpp index d4a27aed8a8..9788ff0aed5 100644 --- a/src/storage/store/string_column_chunk.cpp +++ b/src/storage/store/string_column_chunk.cpp @@ -1,8 +1,6 @@ #include "storage/store/string_column_chunk.h" #include "common/exception/not_implemented.h" -#include "common/type_utils.h" -#include "storage/store/table_copy_utils.h" using namespace kuzu::common; @@ -10,30 +8,42 @@ namespace kuzu { namespace storage { StringColumnChunk::StringColumnChunk(LogicalType dataType, uint64_t capacity) - : ColumnChunk{std::move(dataType), capacity} { - overflowFile = std::make_unique(); - overflowCursor.pageIdx = 0; - overflowCursor.offsetInPage = 0; + : ColumnChunk{std::move(dataType), capacity, false /*enableCompression*/} { + // Bitpacking might save 1 bit per value with regular ascii compared to UTF-8 + // Detecting when we need to re-compress the child chunks is not currently supported. + bool enableCompression = false; + stringDataChunk = + ColumnChunkFactory::createColumnChunk(LogicalType(LogicalTypeID::UINT8), enableCompression); + // The offset chunk is able to grow beyond the node group size. 
+ // We rely on appending to the dictionary when updating, however if the chunk is full, + // there will be no space for in-place updates. + // The data chunk doubles in size on use, but out of place updates will never need the offset + // chunk to be greater than the node group size since they remove unused entries. + // So the chunk is initialized with a size equal to 3/4 the node group size, making sure there + // is always extra space for updates. + offsetChunk = ColumnChunkFactory::createColumnChunk(LogicalType(LogicalTypeID::UINT64), + enableCompression, false /*needFinalize*/, StorageConstants::NODE_GROUP_SIZE * 0.75); } void StringColumnChunk::resetToEmpty() { ColumnChunk::resetToEmpty(); - overflowFile = std::make_unique(); - overflowCursor.resetValue(); + stringDataChunk->resetToEmpty(); + offsetChunk->resetToEmpty(); } void StringColumnChunk::append(ValueVector* vector, offset_t startPosInChunk) { assert(vector->dataType.getPhysicalType() == PhysicalTypeID::STRING); - ColumnChunk::copyVectorToBuffer(vector, startPosInChunk); - auto stringsToSetOverflow = (ku_string_t*)(buffer.get() + startPosInChunk * numBytesPerValue); for (auto i = 0u; i < vector->state->selVector->selectedSize; i++) { - auto& stringToSet = stringsToSetOverflow[i]; - if (!ku_string_t::isShortString(stringToSet.len)) { - overflowFile->copyStringOverflow( - overflowCursor, reinterpret_cast(stringToSet.overflowPtr), &stringToSet); + // index is stored in main chunk, data is stored in the data chunk + auto pos = vector->state->selVector->selectedPositions[i]; + nullChunk->setNull(startPosInChunk + i, vector->isNull(pos)); + if (vector->isNull(pos)) { + numValues++; + continue; } + auto kuString = vector->getValue(pos); + setValueFromString(kuString.getAsString().c_str(), kuString.len, startPosInChunk + i); } - numValues += vector->state->selVector->selectedSize; } void StringColumnChunk::append(ColumnChunk* other, offset_t startPosInOtherChunk, @@ -48,10 +58,9 @@ void 
StringColumnChunk::append(ColumnChunk* other, offset_t startPosInOtherChunk otherChunk, startPosInOtherChunk, startPosInChunk, numValuesToAppend); } break; default: { - throw NotImplementedException("VarSizedColumnChunk::append"); + throw NotImplementedException("StringColumnChunk::append"); } } - numValues += numValuesToAppend; } void StringColumnChunk::write(ValueVector* vector, offset_t startOffsetInChunk) { @@ -77,55 +86,62 @@ void StringColumnChunk::write(ValueVector* valueVector, ValueVector* offsetInChu auto offsetInChunk = offsets[offsetInChunkVector->state->selVector->selectedPositions[i]]; auto offsetInVector = valueVector->state->selVector->selectedPositions[i]; nullChunk->setNull(offsetInChunk, valueVector->isNull(offsetInVector)); + if (offsetInChunk >= numValues) { + numValues = offsetInChunk + 1; + } if (!valueVector->isNull(offsetInVector)) { auto kuStr = valueVector->getValue(offsetInVector); - setValueFromString(kuStr.getAsString().c_str(), kuStr.len, offsetInChunk); + setValueFromString((const char*)kuStr.getData(), kuStr.len, offsetInChunk); } } } -page_idx_t StringColumnChunk::flushOverflowBuffer(BMFileHandle* dataFH, page_idx_t startPageIdx) { - for (auto i = 0u; i < overflowFile->getNumPages(); i++) { - FileUtils::writeToFile(dataFH->getFileInfo(), overflowFile->getPage(i)->data, - BufferPoolConstants::PAGE_4KB_SIZE, startPageIdx * BufferPoolConstants::PAGE_4KB_SIZE); - startPageIdx++; - } - return overflowFile->getNumPages(); -} - void StringColumnChunk::appendStringColumnChunk(StringColumnChunk* other, offset_t startPosInOtherChunk, offset_t startPosInChunk, uint32_t numValuesToAppend) { - auto otherKuVals = reinterpret_cast(other->buffer.get()); - auto kuVals = reinterpret_cast(buffer.get()); + auto indices = reinterpret_cast(buffer.get()); for (auto i = 0u; i < numValuesToAppend; i++) { auto posInChunk = i + startPosInChunk; auto posInOtherChunk = i + startPosInOtherChunk; - kuVals[posInChunk] = otherKuVals[posInOtherChunk]; - if 
(other->nullChunk->isNull(posInOtherChunk) || - otherKuVals[posInOtherChunk].len <= ku_string_t::SHORT_STR_LENGTH) { + if (nullChunk->isNull(posInChunk)) { + indices[posInChunk] = 0; continue; } - PageByteCursor cursorToCopyFrom; - TypeUtils::decodeOverflowPtr(otherKuVals[posInOtherChunk].overflowPtr, - cursorToCopyFrom.pageIdx, cursorToCopyFrom.offsetInPage); - overflowFile->copyStringOverflow(overflowCursor, - other->overflowFile->getPage(cursorToCopyFrom.pageIdx)->data + - cursorToCopyFrom.offsetInPage, - &kuVals[posInChunk]); + auto stringInOtherChunk = other->getValue(posInOtherChunk); + setValueFromString(stringInOtherChunk.data(), stringInOtherChunk.size(), posInChunk); } } void StringColumnChunk::setValueFromString(const char* value, uint64_t length, uint64_t pos) { - TableCopyUtils::validateStrLen(length); - auto val = overflowFile->copyString(value, length, overflowCursor); - setValue(val, pos); + auto space = stringDataChunk->getCapacity() - stringDataChunk->getNumValues(); + if (length > space) { + stringDataChunk->resize(std::bit_ceil(stringDataChunk->getCapacity() + length)); + } + if (pos >= numValues) { + numValues = pos + 1; + } + auto startOffset = stringDataChunk->getNumValues(); + memcpy(stringDataChunk->getData() + startOffset, value, length); + stringDataChunk->setNumValues(startOffset + length); + auto index = offsetChunk->getNumValues(); + + if (index >= offsetChunk->getCapacity()) { + offsetChunk->resize(offsetChunk->getCapacity() * 2); + } + offsetChunk->setValue(startOffset, index); + offsetChunk->setNumValues(index + 1); + ColumnChunk::setValue(index, pos); } // STRING template<> +std::string_view StringColumnChunk::getValue(offset_t pos) const { + auto index = ColumnChunk::getValue(pos); + auto offset = offsetChunk->getValue(index); + return std::string_view((const char*)stringDataChunk->getData() + offset, getStringLength(pos)); +} +template<> std::string StringColumnChunk::getValue(offset_t pos) const { - auto kuStr = 
((ku_string_t*)buffer.get())[pos]; - return overflowFile->readString(&kuStr); + return std::string(getValue(pos)); } } // namespace storage diff --git a/src/storage/store/var_list_column.cpp b/src/storage/store/var_list_column.cpp index c4ba106ce40..52880d80ebf 100644 --- a/src/storage/store/var_list_column.cpp +++ b/src/storage/store/var_list_column.cpp @@ -145,17 +145,16 @@ void VarListColumn::rollbackInMemory() { offset_t VarListColumn::readOffset( Transaction* transaction, node_group_idx_t nodeGroupIdx, offset_t offsetInNodeGroup) { - auto offsetVector = std::make_unique(LogicalTypeID::INT64); - offsetVector->state = DataChunkState::getSingleValueDataChunkState(); auto chunkMeta = metadataDA->get(nodeGroupIdx, transaction->getType()); auto pageCursor = PageUtils::getPageElementCursorForPos(offsetInNodeGroup, chunkMeta.compMeta.numValues(BufferPoolConstants::PAGE_4KB_SIZE, dataType)); pageCursor.pageIdx += chunkMeta.pageIdx; + offset_t value; readFromPage(transaction, pageCursor.pageIdx, [&](uint8_t* frame) -> void { - readToVectorFunc(frame, pageCursor, offsetVector.get(), 0 /* posInVector */, + readToPageFunc(frame, pageCursor, (uint8_t*)&value, 0 /* posInVector */, 1 /* numValuesToRead */, chunkMeta.compMeta); }); - return offsetVector->getValue(0); + return value; } ListOffsetInfoInStorage VarListColumn::getListOffsetInfoInStorage(Transaction* transaction, diff --git a/test/storage/compression_test.cpp b/test/storage/compression_test.cpp index fff580791f4..85b827bcdba 100644 --- a/test/storage/compression_test.cpp +++ b/test/storage/compression_test.cpp @@ -23,7 +23,7 @@ void test_compression(CompressionAlg& alg, std::vector src) { EXPECT_EQ(src, decompressed); // works with all bit widths (but not all offsets) T value = 0; - alg.setValueFromUncompressed((uint8_t*)&value, 0, (uint8_t*)dest.data(), 1, metadata); + alg.setValuesFromUncompressed((uint8_t*)&value, 0, (uint8_t*)dest.data(), 1, 1, metadata); alg.decompressFromPage(dest.data(), 0, 
(uint8_t*)decompressed.data(), 0, src.size(), metadata); src[1] = value; EXPECT_EQ(decompressed, src);