From a92953d7e9c5d86eca9f91fed7f9f1d4f399b16d Mon Sep 17 00:00:00 2001 From: Benjamin Winger Date: Thu, 19 Oct 2023 09:54:06 -0400 Subject: [PATCH] Rewrite string storage to use index, offset and data columns This sets up the storage layout for compressing strings, and removes the limit on string size in storage. --- CMakeLists.txt | 2 +- src/common/vector/value_vector.cpp | 11 + src/include/common/constants.h | 2 +- src/include/common/vector/value_vector.h | 3 + .../processor/operator/persistent/copy_node.h | 2 +- src/include/storage/compression/compression.h | 44 ++- src/include/storage/storage_info.h | 5 +- src/include/storage/store/column.h | 34 ++- src/include/storage/store/column_chunk.h | 31 +- src/include/storage/store/string_column.h | 34 ++- .../storage/store/string_column_chunk.h | 29 +- .../operator/persistent/copy_node.cpp | 19 +- src/storage/compression/compression.cpp | 135 +++++---- .../stats/table_statistics_collection.cpp | 12 +- src/storage/storage_structure/disk_array.cpp | 3 - src/storage/store/column.cpp | 198 ++++++++++--- src/storage/store/column_chunk.cpp | 40 ++- src/storage/store/string_column.cpp | 266 ++++++++++++------ src/storage/store/string_column_chunk.cpp | 105 ++++--- src/storage/store/var_list_column.cpp | 7 +- test/storage/compression_test.cpp | 2 +- test/test_files/tinysnb/call/call.test | 2 +- 22 files changed, 654 insertions(+), 332 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 0d09e98bfa..5af2a2ba22 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,6 +1,6 @@ cmake_minimum_required(VERSION 3.11) -project(Kuzu VERSION 0.0.12.1 LANGUAGES CXX C) +project(Kuzu VERSION 0.0.12.2 LANGUAGES CXX C) find_package(Threads REQUIRED) diff --git a/src/common/vector/value_vector.cpp b/src/common/vector/value_vector.cpp index a3bf0dcf28..3d7b9182b2 100644 --- a/src/common/vector/value_vector.cpp +++ b/src/common/vector/value_vector.cpp @@ -445,6 +445,17 @@ void StringVector::addString( } } +ku_string_t& StringVector::reserveString(ValueVector* vector, uint32_t vectorPos, uint64_t length) { + KU_ASSERT(vector->dataType.getPhysicalType() == PhysicalTypeID::STRING); + auto stringBuffer = reinterpret_cast(vector->auxiliaryBuffer.get()); + auto& dstStr = vector->getValue(vectorPos); + dstStr.len = length; + if (!ku_string_t::isShortString(length)) { + dstStr.overflowPtr = reinterpret_cast(stringBuffer->allocateOverflow(length)); + } + return dstStr; +} + void StringVector::addString(ValueVector* vector, ku_string_t& dstStr, ku_string_t& srcStr) { KU_ASSERT(vector->dataType.getPhysicalType() == PhysicalTypeID::STRING); auto stringBuffer = reinterpret_cast(vector->auxiliaryBuffer.get()); diff --git a/src/include/common/constants.h b/src/include/common/constants.h index 271e1defbe..5b13e1cd21 100644 --- a/src/include/common/constants.h +++ b/src/include/common/constants.h @@ -5,7 +5,7 @@ namespace kuzu { namespace common { -constexpr char KUZU_VERSION[] = "v0.0.12.1"; +constexpr char KUZU_VERSION[] = "v0.0.12.2"; constexpr uint64_t DEFAULT_VECTOR_CAPACITY_LOG_2 = 11; constexpr uint64_t DEFAULT_VECTOR_CAPACITY = (uint64_t)1 << DEFAULT_VECTOR_CAPACITY_LOG_2; diff --git a/src/include/common/vector/value_vector.h b/src/include/common/vector/value_vector.h index 6286083deb..f769155c6a 100644 --- a/src/include/common/vector/value_vector.h +++ b/src/include/common/vector/value_vector.h @@ -118,6 +118,9 @@ class StringVector { static void addString(ValueVector* vector, uint32_t vectorPos, ku_string_t& srcStr); static void addString( ValueVector* vector, uint32_t vectorPos, const char* srcStr, uint64_t length); + // Add empty string with space reserved for the provided size + // Returned value can be modified to set the string contents + static ku_string_t& reserveString(ValueVector* vector, uint32_t vectorPos, uint64_t length); static void addString(ValueVector* vector, ku_string_t& dstStr, ku_string_t& srcStr); static void addString( ValueVector* vector, ku_string_t& dstStr, const char* srcStr, uint64_t length); diff --git a/src/include/processor/operator/persistent/copy_node.h b/src/include/processor/operator/persistent/copy_node.h index 9c6706e4c9..17b3402e28 100644 --- a/src/include/processor/operator/persistent/copy_node.h +++ b/src/include/processor/operator/persistent/copy_node.h @@ -123,7 +123,7 @@ template<> uint64_t CopyNode::appendToPKIndex(storage::PrimaryKeyIndexBuilder* pkIndex, storage::ColumnChunk* chunk, common::offset_t startOffset, common::offset_t numNodes); template<> -uint64_t CopyNode::appendToPKIndex(storage::PrimaryKeyIndexBuilder* pkIndex, +uint64_t CopyNode::appendToPKIndex(storage::PrimaryKeyIndexBuilder* pkIndex, storage::ColumnChunk* chunk, common::offset_t startOffset, common::offset_t numNodes); } // namespace processor diff --git a/src/include/storage/compression/compression.h b/src/include/storage/compression/compression.h index ff644a9b64..3db122236c 100644 --- a/src/include/storage/compression/compression.h +++ b/src/include/storage/compression/compression.h @@ -42,7 +42,7 @@ struct CompressionMetadata { // Returns true if and only if the provided value within the vector can be updated // in this chunk in-place. bool canUpdateInPlace( - const common::ValueVector& vector, uint32_t pos, common::PhysicalTypeID physicalType) const; + const uint8_t* data, uint32_t pos, common::PhysicalTypeID physicalType) const; bool canAlwaysUpdateInPlace() const; }; @@ -52,8 +52,8 @@ class CompressionAlg { // Takes a single uncompressed value from the srcBuffer and compresses it into the dstBuffer // Offsets refer to value offsets, not byte offsets - virtual void setValueFromUncompressed(uint8_t* srcBuffer, common::offset_t posInSrc, - uint8_t* dstBuffer, common::offset_t posInDst, + virtual void setValuesFromUncompressed(const uint8_t* srcBuffer, common::offset_t srcOffset, + uint8_t* dstBuffer, common::offset_t dstOffset, common::offset_t numValues, const CompressionMetadata& metadata) const = 0; // Reads a value from the buffer at the given position and stores it at the given memory address @@ -102,11 +102,11 @@ class Uncompressed : public CompressionAlg { Uncompressed(const Uncompressed&) = default; - inline void setValueFromUncompressed(uint8_t* srcBuffer, common::offset_t posInSrc, - uint8_t* dstBuffer, common::offset_t posInDst, + inline void setValuesFromUncompressed(const uint8_t* srcBuffer, common::offset_t srcOffset, + uint8_t* dstBuffer, common::offset_t dstOffset, common::offset_t numValues, const CompressionMetadata& /*metadata*/) const final { - memcpy(dstBuffer + posInDst * numBytesPerValue, srcBuffer + posInSrc * numBytesPerValue, - numBytesPerValue); + memcpy(dstBuffer + dstOffset * numBytesPerValue, srcBuffer + srcOffset * numBytesPerValue, + numBytesPerValue * numValues); } inline void getValue(const uint8_t* buffer, common::offset_t posInBuffer, uint8_t* dst, @@ -185,8 +185,9 @@ class IntegerBitpacking : public CompressionAlg { IntegerBitpacking() = default; IntegerBitpacking(const IntegerBitpacking&) = default; - void setValueFromUncompressed(uint8_t* srcBuffer, common::offset_t posInSrc, uint8_t* dstBuffer, - common::offset_t posInDst, const CompressionMetadata& metadata) const final; + void setValuesFromUncompressed(const uint8_t* srcBuffer, common::offset_t srcOffset, + uint8_t* dstBuffer, common::offset_t dstOffset, common::offset_t numValues, + const CompressionMetadata& metadata) const final; // Read a single value from the buffer void getValue(const uint8_t* buffer, common::offset_t posInBuffer, uint8_t* dst, @@ -241,8 +242,9 @@ class BooleanBitpacking : public CompressionAlg { BooleanBitpacking() = default; BooleanBitpacking(const BooleanBitpacking&) = default; - void setValueFromUncompressed(uint8_t* srcBuffer, common::offset_t posInSrc, uint8_t* dstBuffer, - common::offset_t posInDst, const CompressionMetadata& metadata) const final; + void setValuesFromUncompressed(const uint8_t* srcBuffer, common::offset_t srcOffset, + uint8_t* dstBuffer, common::offset_t dstOffset, common::offset_t numValues, + const CompressionMetadata& metadata) const final; void getValue(const uint8_t* buffer, common::offset_t posInBuffer, uint8_t* dst, common::offset_t posInDst, const CompressionMetadata& metadata) const final; @@ -294,14 +296,28 @@ class ReadCompressedValuesFromPage : public CompressedFunctor { uint32_t startPosInResult, uint64_t numValuesToRead, const CompressionMetadata& metadata); }; -class WriteCompressedValueToPage : public CompressedFunctor { +class WriteCompressedValuesToPage : public CompressedFunctor { public: - explicit WriteCompressedValueToPage(const common::LogicalType& logicalType) + explicit WriteCompressedValuesToPage(const common::LogicalType& logicalType) : CompressedFunctor(logicalType) {} - WriteCompressedValueToPage(const WriteCompressedValueToPage&) = default; + WriteCompressedValuesToPage(const WriteCompressedValuesToPage&) = default; + + void operator()(uint8_t* frame, uint16_t posInFrame, const uint8_t* data, + common::offset_t dataOffset, common::offset_t numValues, + const CompressionMetadata& metadata); +}; + +class WriteCompressedValueToPageFromVector { +public: + explicit WriteCompressedValueToPageFromVector(const common::LogicalType& logicalType) + : writeFunc(logicalType) {} + WriteCompressedValueToPageFromVector(const WriteCompressedValueToPageFromVector&) = default; void operator()(uint8_t* frame, uint16_t posInFrame, common::ValueVector* vector, uint32_t posInVector, const CompressionMetadata& metadata); + +private: + WriteCompressedValuesToPage writeFunc; }; } // namespace storage diff --git a/src/include/storage/storage_info.h b/src/include/storage/storage_info.h index b761332a41..6d018d257f 100644 --- a/src/include/storage/storage_info.h +++ b/src/include/storage/storage_info.h @@ -11,8 +11,9 @@ using storage_version_t = uint64_t; struct StorageVersionInfo { static std::unordered_map getStorageVersionInfo() { - return {{"0.0.12.1", 24}, {"0.0.12", 23}, {"0.0.11", 23}, {"0.0.10", 23}, {"0.0.9", 23}, - {"0.0.8", 17}, {"0.0.7", 15}, {"0.0.6", 9}, {"0.0.5", 8}, {"0.0.4", 7}, {"0.0.3", 1}}; + return {{"0.0.12.2", 25}, {"0.0.12.1", 24}, {"0.0.12", 23}, {"0.0.11", 23}, {"0.0.10", 23}, + {"0.0.9", 23}, {"0.0.8", 17}, {"0.0.7", 15}, {"0.0.6", 9}, {"0.0.5", 8}, {"0.0.4", 7}, + {"0.0.3", 1}}; } static storage_version_t getStorageVersion(); diff --git a/src/include/storage/store/column.h b/src/include/storage/store/column.h index 61b3410a93..6229e50974 100644 --- a/src/include/storage/store/column.h +++ b/src/include/storage/store/column.h @@ -16,6 +16,9 @@ using read_values_to_vector_func_t = std::function; using write_values_from_vector_func_t = std::function; +using write_values_func_t = std::function; using read_values_to_page_func_t = std::function* getMetadataDA() const { return metadataDA.get(); } + virtual void scan(transaction::Transaction* transaction, const ReadState& state, + common::offset_t startOffsetInGroup, common::offset_t endOffsetInGroup, uint8_t* result); + virtual void write(common::offset_t nodeOffset, common::ValueVector* vectorToWriteFrom, uint32_t posInVectorToWriteFrom); + // Append values to the end of the node group, resizing it if necessary + common::offset_t appendValues( + common::node_group_idx_t nodeGroupIdx, const uint8_t* data, common::offset_t numValues); + protected: virtual void scanInternal(transaction::Transaction* transaction, common::ValueVector* nodeIDVector, common::ValueVector* resultVector); @@ -96,16 +112,27 @@ class Column { void readFromPage(transaction::Transaction* transaction, common::page_idx_t pageIdx, const std::function& func); - virtual void writeValue(common::offset_t nodeOffset, common::ValueVector* vectorToWriteFrom, - uint32_t posInVectorToWriteFrom); + virtual void writeValue(const ColumnChunkMetadata& chunkMeta, common::offset_t nodeOffset, + common::ValueVector* vectorToWriteFrom, uint32_t posInVectorToWriteFrom); + virtual void writeValue( + const ColumnChunkMetadata& chunkMeta, common::offset_t nodeOffset, const uint8_t* data); + + // Produces a page cursor for the offset relative to the given node group + PageElementCursor getPageCursorForOffsetInGroup( + common::offset_t nodeOffset, const ReadState& state); + + ReadState getReadState( + transaction::TransactionType transactionType, common::node_group_idx_t nodeGroupIdx) const; + // Produces a page cursor for the absolute node offset PageElementCursor getPageCursorForOffset( transaction::TransactionType transactionType, common::offset_t nodeOffset); WALPageIdxPosInPageAndFrame createWALVersionOfPageForValue(common::offset_t nodeOffset); private: static bool containsVarList(common::LogicalType& dataType); - bool canCommitInPlace(common::node_group_idx_t nodeGroupIdx, LocalVectorCollection* localChunk); + virtual bool canCommitInPlace( + common::node_group_idx_t nodeGroupIdx, LocalVectorCollection* localChunk); void commitLocalChunkInPlace(LocalVectorCollection* localChunk); void commitLocalChunkOutOfPlace(common::node_group_idx_t nodeGroupIdx, LocalVectorCollection* localChunk, bool isNewNodeGroup); @@ -135,6 +162,7 @@ class Column { std::unique_ptr nullColumn; read_values_to_vector_func_t readToVectorFunc; write_values_from_vector_func_t writeFromVectorFunc; + write_values_func_t writeFunc; read_values_to_page_func_t readToPageFunc; batch_lookup_func_t batchLookupFunc; RWPropertyStats propertyStatistics; diff --git a/src/include/storage/store/column_chunk.h b/src/include/storage/store/column_chunk.h index 9182a2a43e..8a166df909 100644 --- a/src/include/storage/store/column_chunk.h +++ b/src/include/storage/store/column_chunk.h @@ -2,6 +2,7 @@ #include +#include "common/constants.h" #include "common/types/types.h" #include "common/vector/value_vector.h" #include "storage/buffer_manager/bm_file_handle.h" @@ -13,36 +14,16 @@ namespace storage { class NullColumnChunk; class CompressionAlg; -struct BaseColumnChunkMetadata { +struct ColumnChunkMetadata { common::page_idx_t pageIdx; common::page_idx_t numPages; - - BaseColumnChunkMetadata() - : BaseColumnChunkMetadata{common::INVALID_PAGE_IDX, 0 /* numPages */} {} - BaseColumnChunkMetadata(common::page_idx_t pageIdx, common::page_idx_t numPages) - : pageIdx(pageIdx), numPages(numPages) {} - virtual ~BaseColumnChunkMetadata() = default; -}; - -struct ColumnChunkMetadata : public BaseColumnChunkMetadata { uint64_t numValues; CompressionMetadata compMeta; - ColumnChunkMetadata() : BaseColumnChunkMetadata(), numValues{UINT64_MAX} {} + ColumnChunkMetadata() : pageIdx{common::INVALID_PAGE_IDX}, numPages{0}, numValues{UINT64_MAX} {} ColumnChunkMetadata(common::page_idx_t pageIdx, common::page_idx_t numPages, - uint64_t numNodesInChunk, CompressionMetadata compMeta) - : BaseColumnChunkMetadata{pageIdx, numPages}, numValues(numNodesInChunk), - compMeta(compMeta) {} -}; - -struct OverflowColumnChunkMetadata : public BaseColumnChunkMetadata { - common::offset_t lastOffsetInPage; - - OverflowColumnChunkMetadata() - : BaseColumnChunkMetadata(), lastOffsetInPage{common::INVALID_OFFSET} {} - OverflowColumnChunkMetadata( - common::page_idx_t pageIdx, common::page_idx_t numPages, common::offset_t lastOffsetInPage) - : BaseColumnChunkMetadata{pageIdx, numPages}, lastOffsetInPage(lastOffsetInPage) {} + uint64_t numNodesInChunk, const CompressionMetadata& compMeta) + : pageIdx(pageIdx), numPages(numPages), numValues(numNodesInChunk), compMeta(compMeta) {} }; // Base data segment covers all fixed-sized data types. @@ -120,8 +101,6 @@ class ColumnChunk { private: uint64_t getBufferSize() const; - // Returns the size of the data type in bytes - static uint32_t getDataTypeSizeInChunk(common::LogicalType& dataType); protected: common::LogicalType dataType; diff --git a/src/include/storage/store/string_column.h b/src/include/storage/store/string_column.h index cf676893b4..a3e2bac8c5 100644 --- a/src/include/storage/store/string_column.h +++ b/src/include/storage/store/string_column.h @@ -7,6 +7,8 @@ namespace storage { class StringColumn : public Column { public: + using string_offset_t = uint64_t; + using string_index_t = uint32_t; StringColumn(common::LogicalType dataType, const MetadataDAHInfo& metaDAHeaderInfo, BMFileHandle* dataFH, BMFileHandle* metadataFH, BufferManager* bufferManager, WAL* wal, transaction::Transaction* transaction, RWPropertyStats propertyStatistics); @@ -18,12 +20,11 @@ class StringColumn : public Column { void append(ColumnChunk* columnChunk, common::node_group_idx_t nodeGroupIdx) final; - void writeValue(common::offset_t nodeOffset, common::ValueVector* vectorToWriteFrom, - uint32_t posInVectorToWriteFrom) final; + void writeValue(const ColumnChunkMetadata& chunkMeta, common::offset_t nodeOffset, + common::ValueVector* vectorToWriteFrom, uint32_t posInVectorToWriteFrom) final; - inline InMemDiskArray* getOverflowMetadataDA() { - return overflowMetadataDA.get(); - } + inline Column* getDataColumn() { return dataColumn.get(); } + inline Column* getOffsetColumn() { return offsetColumn.get(); } void checkpointInMemory() final; void rollbackInMemory() final; @@ -31,15 +32,36 @@ class StringColumn : public Column { protected: void scanInternal(transaction::Transaction* transaction, common::ValueVector* nodeIDVector, common::ValueVector* resultVector) final; + void scanUnfiltered(transaction::Transaction* transaction, + common::node_group_idx_t nodeGroupIdx, common::offset_t startOffsetInGroup, + common::offset_t endOffsetInGroup, common::ValueVector* resultVector, + uint64_t startPosInVector = 0); + void scanFiltered(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx, + common::offset_t startOffsetInGroup, common::ValueVector* nodeIDVector, + common::ValueVector* resultVector); + void lookupInternal(transaction::Transaction* transaction, common::ValueVector* nodeIDVector, common::ValueVector* resultVector) final; + void scanValueToVector(transaction::Transaction* transaction, const ReadState& dataState, + uint64_t startOffset, uint64_t endOffset, common::ValueVector* resultVector, + uint64_t offsetInVector); + void scanOffsets(transaction::Transaction* transaction, const ReadState& state, + uint64_t* offsets, uint64_t index, uint64_t dataSize); + private: void readStringValueFromOvf(transaction::Transaction* transaction, common::ku_string_t& kuStr, common::ValueVector* resultVector, common::page_idx_t overflowPageIdx); + bool canCommitInPlace( + common::node_group_idx_t nodeGroupIdx, LocalVectorCollection* localChunk) final; private: - std::unique_ptr> overflowMetadataDA; + // Main column stores indices of values in the dictionary + // The offset column stores the offsets for each index, and the data column stores the data in + // order. Values are never removed from the dictionary during in-place updates, only appended to + // the end. + std::unique_ptr dataColumn; + std::unique_ptr offsetColumn; }; } // namespace storage diff --git a/src/include/storage/store/string_column_chunk.h b/src/include/storage/store/string_column_chunk.h index 466d2add54..bb4ad63560 100644 --- a/src/include/storage/store/string_column_chunk.h +++ b/src/include/storage/store/string_column_chunk.h @@ -1,12 +1,16 @@ #pragma once -#include "storage/storage_structure/in_mem_file.h" +#include "common/assert.h" +#include "common/exception/not_implemented.h" #include "storage/store/column_chunk.h" namespace kuzu { namespace storage { class StringColumnChunk : public ColumnChunk { + using string_offset_t = uint64_t; + using string_index_t = uint32_t; + public: explicit StringColumnChunk(common::LogicalType dataType, uint64_t capacity); @@ -24,10 +28,19 @@ class StringColumnChunk : public ColumnChunk { KU_UNREACHABLE; } - common::page_idx_t flushOverflowBuffer(BMFileHandle* dataFH, common::page_idx_t startPageIdx); + uint64_t getStringLength(common::offset_t pos) const { + auto index = ColumnChunk::getValue(pos); + if (index + 1 < offsetChunk->getNumValues()) { + KU_ASSERT(offsetChunk->getValue(index + 1) >= + offsetChunk->getValue(index)); + return offsetChunk->getValue(index + 1) - + offsetChunk->getValue(index); + } + return stringDataChunk->getNumValues() - offsetChunk->getValue(index); + } - inline InMemOverflowFile* getOverflowFile() { return overflowFile.get(); } - inline common::offset_t getLastOffsetInPage() const { return overflowCursor.offsetInPage; } + ColumnChunk* getDataChunk() { return stringDataChunk.get(); } + ColumnChunk* getOffsetChunk() { return offsetChunk.get(); } private: void appendStringColumnChunk(StringColumnChunk* other, common::offset_t startPosInOtherChunk, @@ -36,13 +49,17 @@ class StringColumnChunk : public ColumnChunk { void setValueFromString(const char* value, uint64_t length, uint64_t pos); private: - std::unique_ptr overflowFile; - PageByteCursor overflowCursor; + // String data is stored as a UINT8 chunk, using the numValues in the chunk to track the number + // of characters stored. + std::unique_ptr stringDataChunk; + std::unique_ptr offsetChunk; }; // STRING template<> std::string StringColumnChunk::getValue(common::offset_t pos) const; +template<> +std::string_view StringColumnChunk::getValue(common::offset_t pos) const; } // namespace storage } // namespace kuzu diff --git a/src/processor/operator/persistent/copy_node.cpp b/src/processor/operator/persistent/copy_node.cpp index c3860a8d73..74894de399 100644 --- a/src/processor/operator/persistent/copy_node.cpp +++ b/src/processor/operator/persistent/copy_node.cpp @@ -1,5 +1,7 @@ #include "processor/operator/persistent/copy_node.h" +#include + #include "common/exception/copy.h" #include "common/exception/message.h" #include "common/string_format.h" @@ -94,23 +96,24 @@ void CopyNode::writeAndResetNodeGroup(node_group_idx_t nodeGroupIdx, void CopyNode::populatePKIndex( PrimaryKeyIndexBuilder* pkIndex, ColumnChunk* chunk, offset_t startOffset, offset_t numNodes) { checkNonNullConstraint(chunk->getNullChunk(), numNodes); - std::string errorPKValueStr; + std::optional errorPKValueStr; pkIndex->lock(); try { switch (chunk->getDataType().getPhysicalType()) { case PhysicalTypeID::INT64: { auto numAppendedNodes = appendToPKIndex(pkIndex, chunk, startOffset, numNodes); if (numAppendedNodes < numNodes) { - errorPKValueStr = - std::to_string(chunk->getValue(startOffset + numAppendedNodes)); + // TODO(bmwinger): This should be tested where there are multiple node groups + errorPKValueStr = std::to_string(chunk->getValue(numAppendedNodes)); } } break; case PhysicalTypeID::STRING: { auto numAppendedNodes = - appendToPKIndex(pkIndex, chunk, startOffset, numNodes); + appendToPKIndex(pkIndex, chunk, startOffset, numNodes); if (numAppendedNodes < numNodes) { + // TODO(bmwinger): This should be tested where there are multiple node groups errorPKValueStr = - chunk->getValue(startOffset + numAppendedNodes).getAsString(); + static_cast(chunk)->getValue(numAppendedNodes); } } break; default: { @@ -122,8 +125,8 @@ void CopyNode::populatePKIndex( throw; } pkIndex->unlock(); - if (!errorPKValueStr.empty()) { - throw CopyException(ExceptionMessage::existedPKException(errorPKValueStr)); + if (errorPKValueStr) { + throw CopyException(ExceptionMessage::existedPKException(*errorPKValueStr)); } } @@ -168,7 +171,7 @@ uint64_t CopyNode::appendToPKIndex( } template<> -uint64_t CopyNode::appendToPKIndex( +uint64_t CopyNode::appendToPKIndex( PrimaryKeyIndexBuilder* pkIndex, ColumnChunk* chunk, offset_t startOffset, uint64_t numValues) { auto stringColumnChunk = (StringColumnChunk*)chunk; for (auto i = 0u; i < numValues; i++) { diff --git a/src/storage/compression/compression.cpp b/src/storage/compression/compression.cpp index 2fedfc36d1..38549b494e 100644 --- a/src/storage/compression/compression.cpp +++ b/src/storage/compression/compression.cpp @@ -1,5 +1,6 @@ #include "storage/compression/compression.h" +#include #include #include "common/exception/not_implemented.h" @@ -20,19 +21,18 @@ namespace storage { uint32_t getDataTypeSizeInChunk(const common::LogicalType& dataType) { using namespace common; switch (dataType.getLogicalTypeID()) { + case LogicalTypeID::SERIAL: case LogicalTypeID::STRUCT: { return 0; } - case LogicalTypeID::STRING: { - return sizeof(ku_string_t); + case LogicalTypeID::STRING: + case LogicalTypeID::BLOB: { + return sizeof(uint32_t); } case LogicalTypeID::VAR_LIST: case LogicalTypeID::INTERNAL_ID: { return sizeof(offset_t); } - case LogicalTypeID::SERIAL: { - return sizeof(int64_t); - } default: { auto size = StorageUtils::getDataTypeSize(dataType); KU_ASSERT(size <= BufferPoolConstants::PAGE_4KB_SIZE); @@ -57,7 +57,7 @@ bool CompressionMetadata::canAlwaysUpdateInPlace() const { } } bool CompressionMetadata::canUpdateInPlace( - const ValueVector& vector, uint32_t pos, PhysicalTypeID physicalType) const { + const uint8_t* data, uint32_t pos, PhysicalTypeID physicalType) const { if (canAlwaysUpdateInPlace()) { return true; } @@ -69,45 +69,46 @@ bool CompressionMetadata::canUpdateInPlace( case CompressionType::INTEGER_BITPACKING: { switch (physicalType) { case PhysicalTypeID::INT64: { - auto value = vector.getValue(pos); + auto value = ((int64_t*)data)[pos]; return IntegerBitpacking::canUpdateInPlace( - value, BitpackHeader::readHeader(data)); + value, BitpackHeader::readHeader(this->data)); } case PhysicalTypeID::INT32: { - auto value = vector.getValue(pos); + auto value = ((int32_t*)data)[pos]; return IntegerBitpacking::canUpdateInPlace( - value, BitpackHeader::readHeader(data)); + value, BitpackHeader::readHeader(this->data)); } case PhysicalTypeID::INT16: { - auto value = vector.getValue(pos); + auto value = ((int16_t*)data)[pos]; return IntegerBitpacking::canUpdateInPlace( - value, BitpackHeader::readHeader(data)); + value, BitpackHeader::readHeader(this->data)); } case PhysicalTypeID::INT8: { - auto value = vector.getValue(pos); + auto value = ((int8_t*)data)[pos]; return IntegerBitpacking::canUpdateInPlace( - value, BitpackHeader::readHeader(data)); + value, BitpackHeader::readHeader(this->data)); } case PhysicalTypeID::VAR_LIST: case PhysicalTypeID::UINT64: { - auto value = vector.getValue(pos); + auto value = ((uint64_t*)data)[pos]; return IntegerBitpacking::canUpdateInPlace( - value, BitpackHeader::readHeader(data)); + value, BitpackHeader::readHeader(this->data)); } + case PhysicalTypeID::STRING: case PhysicalTypeID::UINT32: { - auto value = vector.getValue(pos); + auto value = ((uint32_t*)data)[pos]; return IntegerBitpacking::canUpdateInPlace( - value, BitpackHeader::readHeader(data)); + value, BitpackHeader::readHeader(this->data)); } case PhysicalTypeID::UINT16: { - auto value = vector.getValue(pos); + auto value = ((uint16_t*)data)[pos]; return IntegerBitpacking::canUpdateInPlace( - value, BitpackHeader::readHeader(data)); + value, BitpackHeader::readHeader(this->data)); } case PhysicalTypeID::UINT8: { - auto value = vector.getValue(pos); + auto value = ((uint8_t*)data)[pos]; return IntegerBitpacking::canUpdateInPlace( - value, BitpackHeader::readHeader(data)); + value, BitpackHeader::readHeader(this->data)); } default: { throw common::StorageException( @@ -143,6 +144,7 @@ uint64_t CompressionMetadata::numValues(uint64_t pageSize, const LogicalType& da case PhysicalTypeID::UINT64: return IntegerBitpacking::numValues( pageSize, BitpackHeader::readHeader(data)); + case PhysicalTypeID::STRING: case PhysicalTypeID::UINT32: return IntegerBitpacking::numValues( pageSize, BitpackHeader::readHeader(data)); @@ -263,25 +265,34 @@ void fastpack(const T* in, uint8_t* out, uint8_t bitWidth) { } template -void IntegerBitpacking::setValueFromUncompressed(uint8_t* srcBuffer, common::offset_t posInSrc, - uint8_t* dstBuffer, common::offset_t posInDst, const CompressionMetadata& metadata) const { +void IntegerBitpacking::setValuesFromUncompressed(const uint8_t* srcBuffer, offset_t posInSrc, + uint8_t* dstBuffer, offset_t posInDst, offset_t numValues, + const CompressionMetadata& metadata) const { auto header = BitpackHeader::readHeader(metadata.data); // This is a fairly naive implementation which uses fastunpack/fastpack // to modify the data by decompressing/compressing a single chunk of values. // - // TODO(bmwinger): modify the data in-place + // TODO(bmwinger): modify the data in-place when numValues is small (frequently called with + // numValues=1) // // Data can be considered to be stored in aligned chunks of 32 values // with a size of 32 * bitWidth bits, // or bitWidth 32-bit values (we cast the buffer to a uint32_t* later). auto chunkStart = (uint8_t*)getChunkStart(dstBuffer, posInDst, header.bitWidth); - auto posInChunk = posInDst % CHUNK_SIZE; - auto value = ((T*)srcBuffer)[posInSrc]; - KU_ASSERT(canUpdateInPlace(value, header)); - + auto startPosInChunk = posInDst % CHUNK_SIZE; U chunk[CHUNK_SIZE]; fastunpack(chunkStart, chunk, header.bitWidth); - chunk[posInChunk] = (U)(value - (T)header.offset); + for (offset_t i = 0; i < numValues; i++) { + auto value = ((T*)srcBuffer)[posInSrc]; + KU_ASSERT(canUpdateInPlace(value, header)); + chunk[startPosInChunk + i] = (U)(value - (T)header.offset); + if (startPosInChunk + i + 1 >= CHUNK_SIZE && i + 1 < numValues) { + fastpack(chunk, chunkStart, header.bitWidth); + chunkStart = (uint8_t*)getChunkStart(dstBuffer, posInDst + i + 1, header.bitWidth); + fastunpack(chunkStart, chunk, header.bitWidth); + startPosInChunk = 0; + } + } fastpack(chunk, chunkStart, header.bitWidth); } @@ -419,10 +430,13 @@ template class IntegerBitpacking; template class IntegerBitpacking; template class IntegerBitpacking; -void BooleanBitpacking::setValueFromUncompressed(uint8_t* srcBuffer, offset_t posInSrc, - uint8_t* dstBuffer, offset_t posInDst, const CompressionMetadata& /*metadata*/) const { - auto val = ((bool*)srcBuffer)[posInSrc]; - common::NullMask::setNull((uint64_t*)dstBuffer, posInDst, val); +void BooleanBitpacking::setValuesFromUncompressed(const uint8_t* srcBuffer, offset_t srcOffset, + uint8_t* dstBuffer, offset_t dstOffset, offset_t numValues, + const CompressionMetadata& /*metadata*/) const { + for (auto i = 0; i < numValues; i++) { + common::NullMask::setNull( + (uint64_t*)dstBuffer, dstOffset + i, ((bool*)srcBuffer)[srcOffset + i]); + } } void BooleanBitpacking::getValue(const uint8_t* buffer, offset_t posInBuffer, uint8_t* dst, @@ -501,6 +515,7 @@ void ReadCompressedValuesFromPageToVector::operator()(uint8_t* frame, PageElemen return IntegerBitpacking().decompressFromPage(frame, pageCursor.elemPosInPage, resultVector->getData(), posInVector, numValuesToRead, metadata); } + case PhysicalTypeID::STRING: case PhysicalTypeID::UINT32: { return IntegerBitpacking().decompressFromPage(frame, pageCursor.elemPosInPage, resultVector->getData(), posInVector, numValuesToRead, metadata); @@ -555,6 +570,7 @@ void ReadCompressedValuesFromPage::operator()(uint8_t* frame, PageElementCursor& return IntegerBitpacking().decompressFromPage(frame, pageCursor.elemPosInPage, result, startPosInResult, numValuesToRead, metadata); } + case PhysicalTypeID::STRING: case PhysicalTypeID::UINT32: { return IntegerBitpacking().decompressFromPage(frame, pageCursor.elemPosInPage, result, startPosInResult, numValuesToRead, metadata); @@ -579,46 +595,48 @@ void ReadCompressedValuesFromPage::operator()(uint8_t* frame, PageElementCursor& } } -void WriteCompressedValueToPage::operator()(uint8_t* frame, uint16_t posInFrame, - common::ValueVector* vector, uint32_t posInVector, const CompressionMetadata& metadata) { +void WriteCompressedValuesToPage::operator()(uint8_t* frame, uint16_t posInFrame, + const uint8_t* data, offset_t dataOffset, offset_t numValues, + const CompressionMetadata& metadata) { switch (metadata.compression) { case CompressionType::UNCOMPRESSED: - return uncompressed.setValueFromUncompressed( - vector->getData(), posInVector, frame, posInFrame, metadata); + return uncompressed.setValuesFromUncompressed( + data, dataOffset, frame, posInFrame, numValues, metadata); case CompressionType::INTEGER_BITPACKING: { switch (physicalType) { case PhysicalTypeID::INT64: { - return IntegerBitpacking().setValueFromUncompressed( - vector->getData(), posInVector, frame, posInFrame, metadata); + return IntegerBitpacking().setValuesFromUncompressed( + data, dataOffset, frame, posInFrame, numValues, metadata); } case PhysicalTypeID::INT32: { - return IntegerBitpacking().setValueFromUncompressed( - vector->getData(), posInVector, frame, posInFrame, metadata); + return IntegerBitpacking().setValuesFromUncompressed( + data, dataOffset, frame, posInFrame, numValues, metadata); } case PhysicalTypeID::INT16: { - return IntegerBitpacking().setValueFromUncompressed( - vector->getData(), posInVector, frame, posInFrame, metadata); + return IntegerBitpacking().setValuesFromUncompressed( + data, dataOffset, frame, posInFrame, numValues, metadata); } case PhysicalTypeID::INT8: { - return IntegerBitpacking().setValueFromUncompressed( - vector->getData(), posInVector, frame, posInFrame, metadata); + return IntegerBitpacking().setValuesFromUncompressed( + data, dataOffset, frame, posInFrame, numValues, metadata); } case PhysicalTypeID::VAR_LIST: case PhysicalTypeID::UINT64: { - return IntegerBitpacking().setValueFromUncompressed( - vector->getData(), posInVector, frame, posInFrame, metadata); + return IntegerBitpacking().setValuesFromUncompressed( + data, dataOffset, frame, posInFrame, numValues, metadata); } + case PhysicalTypeID::STRING: case PhysicalTypeID::UINT32: { - return IntegerBitpacking().setValueFromUncompressed( - vector->getData(), posInVector, frame, posInFrame, metadata); + return IntegerBitpacking().setValuesFromUncompressed( + data, dataOffset, frame, posInFrame, numValues, metadata); } case PhysicalTypeID::UINT16: { - return IntegerBitpacking().setValueFromUncompressed( - vector->getData(), posInVector, frame, posInFrame, metadata); + return IntegerBitpacking().setValuesFromUncompressed( + data, dataOffset, frame, posInFrame, numValues, metadata); } case PhysicalTypeID::UINT8: { - return IntegerBitpacking().setValueFromUncompressed( - vector->getData(), posInVector, frame, posInFrame, metadata); + return IntegerBitpacking().setValuesFromUncompressed( + data, dataOffset, frame, posInFrame, numValues, metadata); } default: { throw NotImplementedException("INTEGER_BITPACKING is not implemented for type " + @@ -627,10 +645,15 @@ void WriteCompressedValueToPage::operator()(uint8_t* frame, uint16_t posInFrame, } } case CompressionType::BOOLEAN_BITPACKING: - return booleanBitpacking.setValueFromUncompressed( - vector->getData(), posInVector, frame, posInFrame, metadata); + return booleanBitpacking.setValuesFromUncompressed( + data, dataOffset, frame, posInFrame, numValues, metadata); } } +void WriteCompressedValueToPageFromVector::operator()(uint8_t* frame, uint16_t posInFrame, + common::ValueVector* vector, uint32_t posInVector, const CompressionMetadata& metadata) { + writeFunc(frame, posInFrame, vector->getData(), posInVector, 1, metadata); +} + } // namespace storage } // namespace kuzu diff --git a/src/storage/stats/table_statistics_collection.cpp b/src/storage/stats/table_statistics_collection.cpp index 0d8dbf8d91..64ddc3980b 100644 --- a/src/storage/stats/table_statistics_collection.cpp +++ b/src/storage/stats/table_statistics_collection.cpp @@ -101,10 +101,14 @@ std::unique_ptr TablesStatistics::createMetadataDAHInfo( createMetadataDAHInfo(*VarListType::getChildType(&dataType), metadataFH, bm, wal)); } break; case PhysicalTypeID::STRING: { - auto childMetadataDAHInfo = std::make_unique(); - childMetadataDAHInfo->dataDAHPageIdx = - InMemDiskArray::addDAHPageToFile(metadataFH, bm, wal); - metadataDAHInfo->childrenInfos.push_back(std::move(childMetadataDAHInfo)); + auto dataMetadataDAHInfo = std::make_unique(); + auto offsetMetadataDAHInfo = std::make_unique(); + dataMetadataDAHInfo->dataDAHPageIdx = + InMemDiskArray::addDAHPageToFile(metadataFH, bm, wal); + offsetMetadataDAHInfo->dataDAHPageIdx = + InMemDiskArray::addDAHPageToFile(metadataFH, bm, wal); + metadataDAHInfo->childrenInfos.push_back(std::move(dataMetadataDAHInfo)); + metadataDAHInfo->childrenInfos.push_back(std::move(offsetMetadataDAHInfo)); } break; default: { // DO NOTHING. diff --git a/src/storage/storage_structure/disk_array.cpp b/src/storage/storage_structure/disk_array.cpp index 2ca0550416..2bd137a211 100644 --- a/src/storage/storage_structure/disk_array.cpp +++ b/src/storage/storage_structure/disk_array.cpp @@ -528,13 +528,11 @@ template class BaseDiskArray>; template class BaseDiskArray>; template class BaseDiskArray; template class BaseDiskArray; -template class BaseDiskArray; template class BaseInMemDiskArray; template class BaseInMemDiskArray>; template class BaseInMemDiskArray>; template class BaseInMemDiskArray; template class BaseInMemDiskArray; -template class BaseInMemDiskArray; template class InMemDiskArrayBuilder; template class InMemDiskArrayBuilder>; template class InMemDiskArrayBuilder>; @@ -545,7 +543,6 @@ template class InMemDiskArray>; template class InMemDiskArray>; template class InMemDiskArray; template class InMemDiskArray; -template class InMemDiskArray; } // namespace storage } // namespace kuzu diff --git a/src/storage/store/column.cpp b/src/storage/store/column.cpp index 3fd1b4fcee..71310f84a7 100644 --- a/src/storage/store/column.cpp +++ b/src/storage/store/column.cpp @@ -30,12 +30,21 @@ struct InternalIDColumnFunc { } } - static void writeValueToPage(uint8_t* frame, uint16_t posInFrame, ValueVector* vector, + static void writeValueToPageFromVector(uint8_t* frame, uint16_t posInFrame, ValueVector* vector, uint32_t posInVector, const CompressionMetadata& /*metadata*/) { KU_ASSERT(vector->dataType.getPhysicalType() == PhysicalTypeID::INTERNAL_ID); auto internalID = vector->getValue(posInVector); memcpy(frame + posInFrame * sizeof(offset_t), &internalID.offset, sizeof(offset_t)); } + + static void writeValuesToPage(uint8_t* frame, uint16_t posInFrame, const uint8_t* data, + uint32_t dataOffset, offset_t numValues, const CompressionMetadata& /*metadata*/) { + auto internalIDs = ((internalID_t*)data); + for (int i = 0; i < numValues; i++) { + memcpy(frame + posInFrame * sizeof(offset_t), &internalIDs[dataOffset + i].offset, + sizeof(offset_t)); + } + } }; struct NullColumnFunc { @@ -49,12 +58,19 @@ struct NullColumnFunc { (uint64_t*)frame, pageCursor.elemPosInPage, posInVector, numValuesToRead); } - static void writeValueToPage(uint8_t* frame, uint16_t posInFrame, ValueVector* vector, + static void writeValueToPageFromVector(uint8_t* frame, uint16_t posInFrame, ValueVector* vector, uint32_t posInVector, const CompressionMetadata& /*metadata*/) { // Casting to uint64_t should be safe as long as the page size is a multiple of 8 bytes. // Otherwise, it could read off the end of the page. - NullMask::setNull( - (uint64_t*)frame, posInFrame, NullMask::isNull(vector->getNullMaskData(), posInVector)); + NullMask::setNull((uint64_t*)frame, posInFrame, vector->isNull(posInVector)); + } + + static void writeValuesToPage(uint8_t* frame, uint16_t posInFrame, const uint8_t* data, + offset_t dataOffset, offset_t numValues, const CompressionMetadata& /*metadata*/) { + // Casting to uint64_t should be safe as long as the page size is a multiple of 8 bytes. + // Otherwise, it could read off the end of the page. + NullMask::copyNullMask( + (const uint64_t*)data, dataOffset, (uint64_t*)frame, posInFrame, numValues); } }; @@ -73,13 +89,21 @@ struct BoolColumnFunc { } } - static void writeValueToPage(uint8_t* frame, uint16_t posInFrame, ValueVector* vector, + static void writeValueToPageFromVector(uint8_t* frame, uint16_t posInFrame, ValueVector* vector, uint32_t posInVector, const CompressionMetadata& /*metadata*/) { // Casting to uint64_t should be safe as long as the page size is a multiple of 8 bytes. // Otherwise, it could read/write off the end of the page. - NullMask::copyNullMask(vector->getValue(posInVector) ? &NullMask::ALL_NULL_ENTRY : - &NullMask::NO_NULL_ENTRY, - posInVector, (uint64_t*)frame, posInFrame, 1); + NullMask::setNull((uint64_t*)frame, posInFrame, vector->getValue(posInVector)); + } + + static void writeValuesToPage(uint8_t* frame, uint16_t posInFrame, const uint8_t* data, + offset_t dataOffset, offset_t numValues, const CompressionMetadata& /*metadata*/) { + // Casting to uint64_t should be safe as long as the page size is a multiple of 8 bytes. + // Otherwise, it could read/write off the end of the page. + for (offset_t i = 0; i < numValues; i++) { + NullMask::setNull( + (uint64_t*)frame, posInFrame, NullMask::isNull((uint64_t*)data, dataOffset)); + } } static void copyValuesFromPage(uint8_t* frame, PageElementCursor& pageCursor, uint8_t* result, @@ -110,7 +134,7 @@ static read_values_to_vector_func_t getReadValuesToVectorFunc(const LogicalType& } } -static read_values_to_page_func_t getWriteValuesToPageFunc(const LogicalType& logicalType) { +static read_values_to_page_func_t getReadValuesToPageFunc(const LogicalType& logicalType) { switch (logicalType.getLogicalTypeID()) { case LogicalTypeID::BOOL: return BoolColumnFunc::copyValuesFromPage; @@ -119,15 +143,25 @@ static read_values_to_page_func_t getWriteValuesToPageFunc(const LogicalType& lo } } -static write_values_from_vector_func_t getWriteValuesFromVectorFunc( - const LogicalType& logicalType) { +static write_values_from_vector_func_t getWriteValueFromVectorFunc(const LogicalType& logicalType) { + switch (logicalType.getLogicalTypeID()) { + case LogicalTypeID::INTERNAL_ID: + return InternalIDColumnFunc::writeValueToPageFromVector; + case LogicalTypeID::BOOL: + return BoolColumnFunc::writeValueToPageFromVector; + default: + return WriteCompressedValueToPageFromVector(logicalType); + } +} + +static write_values_func_t getWriteValuesFunc(const LogicalType& logicalType) { switch (logicalType.getLogicalTypeID()) { case LogicalTypeID::INTERNAL_ID: - return InternalIDColumnFunc::writeValueToPage; + return InternalIDColumnFunc::writeValuesToPage; case LogicalTypeID::BOOL: - return BoolColumnFunc::writeValueToPage; + return BoolColumnFunc::writeValuesToPage; default: - return WriteCompressedValueToPage(logicalType); + return WriteCompressedValuesToPage(logicalType); } } @@ -156,7 +190,8 @@ class NullColumn : public Column { metadataFH, bufferManager, wal, transaction, propertyStatistics, enableCompression, false /*requireNullColumn*/} { readToVectorFunc = NullColumnFunc::readValuesFromPageToVector; - writeFromVectorFunc = NullColumnFunc::writeValueToPage; + writeFromVectorFunc = NullColumnFunc::writeValueToPageFromVector; + writeFunc = NullColumnFunc::writeValuesToPage; } void scan( @@ -226,7 +261,9 @@ class NullColumn : public Column { void write(offset_t nodeOffset, ValueVector* vectorToWriteFrom, uint32_t posInVectorToWriteFrom) final { - writeValue(nodeOffset, vectorToWriteFrom, posInVectorToWriteFrom); + auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(nodeOffset); + auto chunkMeta = metadataDA->get(nodeGroupIdx, TransactionType::WRITE); + writeValue(chunkMeta, nodeOffset, vectorToWriteFrom, posInVectorToWriteFrom); if (vectorToWriteFrom->isNull(posInVectorToWriteFrom)) { propertyStatistics.setHasNull(DUMMY_WRITE_TRANSACTION); } @@ -295,9 +332,10 @@ Column::Column(LogicalType dataType, const MetadataDAHInfo& metaDAHeaderInfo, BM transaction); numBytesPerFixedSizedValue = getDataTypeSizeInChunk(this->dataType); readToVectorFunc = getReadValuesToVectorFunc(this->dataType); - readToPageFunc = getWriteValuesToPageFunc(this->dataType); + readToPageFunc = getReadValuesToPageFunc(this->dataType); batchLookupFunc = getBatchLookupFromPageFunc(this->dataType); - writeFromVectorFunc = getWriteValuesFromVectorFunc(this->dataType); + writeFromVectorFunc = getWriteValueFromVectorFunc(this->dataType); + writeFunc = getWriteValuesFunc(this->dataType); KU_ASSERT(numBytesPerFixedSizedValue <= BufferPoolConstants::PAGE_4KB_SIZE); if (requireNullColumn) { nullColumn = std::make_unique(metaDAHeaderInfo.nullDAHPageIdx, dataFH, @@ -333,13 +371,11 @@ void Column::scan(transaction::Transaction* transaction, node_group_idx_t nodeGr nullColumn->scan(transaction, nodeGroupIdx, startOffsetInGroup, endOffsetInGroup, resultVector, offsetInVector); } - auto chunkMeta = metadataDA->get(nodeGroupIdx, transaction->getType()); - auto pageCursor = PageUtils::getPageElementCursorForPos(startOffsetInGroup, - chunkMeta.compMeta.numValues(BufferPoolConstants::PAGE_4KB_SIZE, dataType)); - pageCursor.pageIdx += chunkMeta.pageIdx; + auto state = getReadState(transaction->getType(), nodeGroupIdx); + auto pageCursor = getPageCursorForOffsetInGroup(startOffsetInGroup, state); auto numValuesToScan = endOffsetInGroup - startOffsetInGroup; scanUnfiltered( - transaction, pageCursor, numValuesToScan, resultVector, chunkMeta, offsetInVector); + transaction, pageCursor, numValuesToScan, resultVector, state.metadata, offsetInVector); } void Column::scan(node_group_idx_t nodeGroupIdx, ColumnChunk* columnChunk) { @@ -350,6 +386,10 @@ void Column::scan(node_group_idx_t nodeGroupIdx, ColumnChunk* columnChunk) { columnChunk->setNumValues(0); } else { auto chunkMetadata = metadataDA->get(nodeGroupIdx, TransactionType::WRITE); + // Make sure that the chunk is large enough + if (chunkMetadata.numValues > columnChunk->getCapacity()) { + columnChunk->resize(std::bit_ceil(chunkMetadata.numValues)); + } auto cursor = PageElementCursor(chunkMetadata.pageIdx, 0); uint64_t numValuesPerPage = chunkMetadata.compMeta.numValues(BufferPoolConstants::PAGE_4KB_SIZE, dataType); @@ -373,6 +413,7 @@ void Column::scanInternal( Transaction* transaction, ValueVector* nodeIDVector, ValueVector* resultVector) { auto startNodeOffset = nodeIDVector->readNodeOffset(0); KU_ASSERT(startNodeOffset % DEFAULT_VECTOR_CAPACITY == 0); + // TODO: replace with state auto cursor = getPageCursorForOffset(transaction->getType(), startNodeOffset); auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(startNodeOffset); auto chunkMeta = metadataDA->get(nodeGroupIdx, transaction->getType()); @@ -491,17 +532,20 @@ void Column::write( nullColumn->write(nodeOffset, vectorToWriteFrom, posInVectorToWriteFrom); } bool isNull = vectorToWriteFrom->isNull(posInVectorToWriteFrom); - if (isNull) { - return; + auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(nodeOffset); + auto chunkMeta = metadataDA->get(nodeGroupIdx, TransactionType::WRITE); + if (!isNull) { + writeValue(chunkMeta, nodeOffset, vectorToWriteFrom, posInVectorToWriteFrom); + } + if (nodeOffset >= chunkMeta.numValues) { + chunkMeta.numValues = nodeOffset + 1; + metadataDA->update(nodeGroupIdx, chunkMeta); } - writeValue(nodeOffset, vectorToWriteFrom, posInVectorToWriteFrom); } -void Column::writeValue( - offset_t nodeOffset, ValueVector* vectorToWriteFrom, uint32_t posInVectorToWriteFrom) { +void Column::writeValue(const ColumnChunkMetadata& chunkMeta, offset_t nodeOffset, + ValueVector* vectorToWriteFrom, uint32_t posInVectorToWriteFrom) { auto walPageInfo = createWALVersionOfPageForValue(nodeOffset); - auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(nodeOffset); - auto chunkMeta = metadataDA->get(nodeGroupIdx, TransactionType::WRITE); KU_ASSERT( chunkMeta.pageIdx <= walPageInfo.originalPageIdx < chunkMeta.pageIdx + chunkMeta.numPages); try { @@ -516,6 +560,20 @@ void Column::writeValue( dataFH->releaseWALPageIdxLock(walPageInfo.originalPageIdx); } +void Column::writeValue( + const ColumnChunkMetadata& chunkMeta, offset_t nodeOffset, const uint8_t* data) { + auto walPageInfo = createWALVersionOfPageForValue(nodeOffset); + try { + writeFunc(walPageInfo.frame, walPageInfo.posInPage, data, 0, 1, chunkMeta.compMeta); + } catch (Exception& e) { + bufferManager->unpin(*wal->fileHandle, walPageInfo.pageIdxInWAL); + dataFH->releaseWALPageIdxLock(walPageInfo.originalPageIdx); + throw; + } + bufferManager->unpin(*wal->fileHandle, walPageInfo.pageIdxInWAL); + dataFH->releaseWALPageIdxLock(walPageInfo.originalPageIdx); +} + WALPageIdxPosInPageAndFrame Column::createWALVersionOfPageForValue(offset_t nodeOffset) { auto originalPageCursor = getPageCursorForOffset(TransactionType::WRITE, nodeOffset); bool insertingNewPage = false; @@ -529,6 +587,12 @@ WALPageIdxPosInPageAndFrame Column::createWALVersionOfPageForValue(offset_t node return {walPageIdxAndFrame, originalPageCursor.elemPosInPage}; } +ReadState Column::getReadState( + TransactionType transactionType, node_group_idx_t nodeGroupIdx) const { + auto metadata = metadataDA->get(nodeGroupIdx, transactionType); + return {metadata, metadata.compMeta.numValues(BufferPoolConstants::PAGE_4KB_SIZE, dataType)}; +} + void Column::setNull(offset_t nodeOffset) { if (nullColumn) { nullColumn->setNull(nodeOffset); @@ -591,7 +655,7 @@ bool Column::canCommitInPlace(node_group_idx_t nodeGroupIdx, LocalVectorCollecti auto localVector = localChunk->getLocalVector(rowIdx); auto offsetInVector = rowIdx & (DEFAULT_VECTOR_CAPACITY - 1); if (!metadata.compMeta.canUpdateInPlace( - *localVector->getVector(), offsetInVector, dataType.getPhysicalType())) { + localVector->getVector()->getData(), offsetInVector, dataType.getPhysicalType())) { return false; } } @@ -688,12 +752,9 @@ void Column::populateWithDefaultVal(const Property& property, Column* column, PageElementCursor Column::getPageCursorForOffset( TransactionType transactionType, offset_t nodeOffset) { auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(nodeOffset); + auto state = getReadState(transactionType, nodeGroupIdx); auto offsetInNodeGroup = nodeOffset - StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx); - auto chunkMeta = metadataDA->get(nodeGroupIdx, transactionType); - auto pageCursor = PageUtils::getPageElementCursorForPos(offsetInNodeGroup, - chunkMeta.compMeta.numValues(BufferPoolConstants::PAGE_4KB_SIZE, dataType)); - pageCursor.pageIdx += chunkMeta.pageIdx; - return pageCursor; + return getPageCursorForOffsetInGroup(offsetInNodeGroup, state); } std::unique_ptr ColumnFactory::createColumn(const LogicalType& dataType, @@ -750,5 +811,70 @@ std::unique_ptr ColumnFactory::createColumn(const LogicalType& dataType, } } +PageElementCursor Column::getPageCursorForOffsetInGroup( + common::offset_t nodeOffset, const ReadState& state) { + auto pageCursor = PageUtils::getPageElementCursorForPos(nodeOffset, state.numValuesPerPage); + pageCursor.pageIdx += state.metadata.pageIdx; + return pageCursor; +} + +void Column::scan(Transaction* transaction, const ReadState& state, offset_t startOffsetInGroup, + offset_t endOffsetInGroup, uint8_t* result) { + auto cursor = getPageCursorForOffsetInGroup(startOffsetInGroup, state); + auto numValuesToScan = endOffsetInGroup - startOffsetInGroup; + uint64_t numValuesScanned = 0; + while (numValuesScanned < numValuesToScan) { + uint64_t numValuesToScanInPage = + std::min((uint64_t)state.numValuesPerPage - cursor.elemPosInPage, + numValuesToScan - numValuesScanned); + readFromPage(transaction, cursor.pageIdx, [&](uint8_t* frame) -> void { + readToPageFunc(frame, cursor, result, numValuesScanned, numValuesToScanInPage, + state.metadata.compMeta); + }); + numValuesScanned += numValuesToScanInPage; + cursor.nextPage(); + } +} + +offset_t Column::appendValues( + node_group_idx_t nodeGroupIdx, const uint8_t* data, offset_t numValues) { + auto state = getReadState(TransactionType::WRITE, nodeGroupIdx); + auto startOffset = state.metadata.numValues; + auto cursor = getPageCursorForOffsetInGroup(startOffset, state); + offset_t valuesWritten = 0; + while (valuesWritten < numValues) { + bool insertingNewPage = false; + if (cursor.pageIdx >= dataFH->getNumPages()) { + KU_ASSERT(cursor.pageIdx == dataFH->getNumPages()); + DBFileUtils::insertNewPage(*dataFH, dbFileID, *bufferManager, *wal); + insertingNewPage = true; + } + + uint64_t numValuesToWriteInPage = std::min( + (uint64_t)state.numValuesPerPage - cursor.elemPosInPage, numValues - valuesWritten); + auto walPageInfo = DBFileUtils::createWALVersionIfNecessaryAndPinPage( + cursor.pageIdx, insertingNewPage, *dataFH, dbFileID, *bufferManager, *wal); + + try { + writeFunc(walPageInfo.frame, cursor.elemPosInPage, data, 0, numValuesToWriteInPage, + state.metadata.compMeta); + } catch (Exception& e) { + bufferManager->unpin(*wal->fileHandle, walPageInfo.pageIdxInWAL); + dataFH->releaseWALPageIdxLock(walPageInfo.originalPageIdx); + throw; + } + bufferManager->unpin(*wal->fileHandle, walPageInfo.pageIdxInWAL); + dataFH->releaseWALPageIdxLock(walPageInfo.originalPageIdx); + valuesWritten += numValuesToWriteInPage; + if (valuesWritten < numValues) { + cursor.nextPage(); + state.metadata.numPages++; + } + } + state.metadata.numValues += numValues; + metadataDA->update(nodeGroupIdx, state.metadata); + return startOffset; +} + } // namespace storage } // namespace kuzu diff --git a/src/storage/store/column_chunk.cpp b/src/storage/store/column_chunk.cpp index 0566604fff..6752f9b782 100644 --- a/src/storage/store/column_chunk.cpp +++ b/src/storage/store/column_chunk.cpp @@ -1,5 +1,7 @@ #include "storage/store/column_chunk.h" +#include "storage/compression/compression.h" +#include "storage/storage_utils.h" #include "storage/store/string_column_chunk.h" #include "storage/store/struct_column_chunk.h" #include "storage/store/var_list_column_chunk.h" @@ -57,10 +59,21 @@ class CompressedFlushBuffer { } else { valuesRemaining -= numValuesPerPage; } + if (compressedSize < BufferPoolConstants::PAGE_4KB_SIZE) { + memset(compressedBuffer.get() + compressedSize, 0, + BufferPoolConstants::PAGE_4KB_SIZE - compressedSize); + } FileUtils::writeToFile(dataFH->getFileInfo(), compressedBuffer.get(), compressedSize, (startPageIdx + numPages) * BufferPoolConstants::PAGE_4KB_SIZE); numPages++; } while (valuesRemaining > 0); + // Make sure that the file is the right length + if (numPages < metadata.numPages) { + memset(compressedBuffer.get(), 0, BufferPoolConstants::PAGE_4KB_SIZE); + FileUtils::writeToFile(dataFH->getFileInfo(), compressedBuffer.get(), + BufferPoolConstants::PAGE_4KB_SIZE, + (startPageIdx + metadata.numPages - 1) * BufferPoolConstants::PAGE_4KB_SIZE); + } return ColumnChunkMetadata( startPageIdx, metadata.numPages, metadata.numValues, metadata.compMeta); } @@ -107,6 +120,7 @@ static std::shared_ptr getCompression( case PhysicalTypeID::UINT64: { return std::make_shared>(); } + case PhysicalTypeID::STRING: case PhysicalTypeID::UINT32: { return std::make_shared>(); } @@ -152,6 +166,7 @@ void ColumnChunk::initializeFunction(bool enableCompression) { getMetadataFunction = booleanGetMetadata; break; } + case PhysicalTypeID::STRING: case PhysicalTypeID::INT64: case PhysicalTypeID::INT32: case PhysicalTypeID::INT16: @@ -306,6 +321,7 @@ void ColumnChunk::copyVectorToBuffer(ValueVector* vector, offset_t startPosInChu } ColumnChunkMetadata ColumnChunk::getMetadataToFlush() const { + KU_ASSERT(numValues <= capacity); return getMetadataFunction(buffer.get(), bufferSize, capacity, numValues); } @@ -314,30 +330,6 @@ ColumnChunkMetadata ColumnChunk::flushBuffer( return flushBufferFunction(buffer.get(), bufferSize, dataFH, startPageIdx, metadata); } -uint32_t ColumnChunk::getDataTypeSizeInChunk(LogicalType& dataType) { - switch (dataType.getLogicalTypeID()) { - case LogicalTypeID::SERIAL: - case LogicalTypeID::STRUCT: { - return 0; - } - case LogicalTypeID::STRING: { - return sizeof(ku_string_t); - } - case LogicalTypeID::VAR_LIST: - case LogicalTypeID::INTERNAL_ID: { - return sizeof(offset_t); - } - // Used by NodeColumn to represent the de-compressed size of booleans in-memory - // Does not reflect the size of booleans on-disk in BoolColumnChunk - case LogicalTypeID::BOOL: { - return 1; - } - default: { - return StorageUtils::getDataTypeSize(dataType); - } - } -} - uint64_t ColumnChunk::getBufferSize() const { switch (dataType.getLogicalTypeID()) { case LogicalTypeID::BOOL: { diff --git a/src/storage/store/string_column.cpp b/src/storage/store/string_column.cpp index 9ca62810ec..a0e1388ee8 100644 --- a/src/storage/store/string_column.cpp +++ b/src/storage/store/string_column.cpp @@ -1,6 +1,5 @@ #include "storage/store/string_column.h" -#include "common/type_utils.h" #include "storage/store/string_column_chunk.h" using namespace kuzu::catalog; @@ -10,29 +9,43 @@ using namespace kuzu::transaction; namespace kuzu { namespace storage { -struct StringColumnFunc { - static void writeStringValuesToPage(uint8_t* frame, uint16_t posInFrame, ValueVector* vector, - uint32_t posInVector, const CompressionMetadata& /*metadata*/) { - auto kuStrInFrame = (ku_string_t*)(frame + (posInFrame * sizeof(ku_string_t))); - auto kuStrInVector = vector->getValue(posInVector); - memcpy(kuStrInFrame->prefix, kuStrInVector.prefix, - std::min((uint64_t)kuStrInVector.len, ku_string_t::SHORT_STR_LENGTH)); - kuStrInFrame->len = kuStrInVector.len; - kuStrInFrame->overflowPtr = kuStrInVector.overflowPtr; - } -}; - StringColumn::StringColumn(LogicalType dataType, const MetadataDAHInfo& metaDAHeaderInfo, BMFileHandle* dataFH, BMFileHandle* metadataFH, BufferManager* bufferManager, WAL* wal, transaction::Transaction* transaction, RWPropertyStats stats) : Column{std::move(dataType), metaDAHeaderInfo, dataFH, metadataFH, bufferManager, wal, - transaction, stats, false /* enableCompression */, true /* requireNullColumn */} { - if (this->dataType.getLogicalTypeID() == LogicalTypeID::STRING) { - writeFromVectorFunc = StringColumnFunc::writeStringValuesToPage; + transaction, stats, true /* enableCompression */, true /* requireNullColumn */} { + dataColumn = std::make_unique(LogicalType(LogicalTypeID::UINT8), + *metaDAHeaderInfo.childrenInfos[0], dataFH, metadataFH, bufferManager, wal, transaction, + stats, false, false /*requireNullColumn*/); + offsetColumn = std::make_unique(LogicalType(LogicalTypeID::UINT64), + *metaDAHeaderInfo.childrenInfos[1], dataFH, metadataFH, bufferManager, wal, transaction, + stats, enableCompression, false /*requireNullColumn*/); +} + +void StringColumn::scanOffsets(Transaction* transaction, const ReadState& state, + string_offset_t* offsets, uint64_t index, uint64_t dataSize) { + // We either need to read the next value, or store the maximum string offset at the end. + // Otherwise we won't know what the length of the last string is. + if (index < state.metadata.numValues - 1) { + offsetColumn->scan(transaction, state, index, index + 2, (uint8_t*)offsets); + } else { + offsetColumn->scan(transaction, state, index, index + 1, (uint8_t*)offsets); + offsets[1] = dataSize; + } +} + +void StringColumn::scanValueToVector(Transaction* transaction, const ReadState& dataState, + string_offset_t startOffset, string_offset_t endOffset, ValueVector* resultVector, + uint64_t offsetInVector) { + KU_ASSERT(endOffset >= startOffset); + // Add string to vector first and read directly into the vector + auto& kuString = + StringVector::reserveString(resultVector, offsetInVector, endOffset - startOffset); + dataColumn->scan(transaction, dataState, startOffset, endOffset, (uint8_t*)kuString.getData()); + // Update prefix to match the scanned string data + if (!ku_string_t::isShortString(kuString.len)) { + memcpy(kuString.prefix, kuString.getData(), ku_string_t::PREFIX_LENGTH); } - overflowMetadataDA = std::make_unique>(*metadataFH, - DBFileID::newMetadataFileID(), metaDAHeaderInfo.childrenInfos[0]->dataDAHPageIdx, - bufferManager, wal, transaction); } void StringColumn::scan(Transaction* transaction, node_group_idx_t nodeGroupIdx, @@ -40,74 +53,50 @@ void StringColumn::scan(Transaction* transaction, node_group_idx_t nodeGroupIdx, uint64_t offsetInVector) { nullColumn->scan(transaction, nodeGroupIdx, startOffsetInGroup, endOffsetInGroup, resultVector, offsetInVector); - Column::scan(transaction, nodeGroupIdx, startOffsetInGroup, endOffsetInGroup, resultVector, + scanUnfiltered(transaction, nodeGroupIdx, startOffsetInGroup, endOffsetInGroup, resultVector, offsetInVector); - auto numValuesToRead = endOffsetInGroup - startOffsetInGroup; - auto overflowPageIdx = overflowMetadataDA->get(nodeGroupIdx, transaction->getType()).pageIdx; - for (auto i = 0u; i < numValuesToRead; i++) { - auto pos = offsetInVector + i; - if (resultVector->isNull(pos)) { - continue; - } - readStringValueFromOvf( - transaction, resultVector->getValue(pos), resultVector, overflowPageIdx); - } } void StringColumn::scan(node_group_idx_t nodeGroupIdx, ColumnChunk* columnChunk) { Column::scan(nodeGroupIdx, columnChunk); auto stringColumnChunk = reinterpret_cast(columnChunk); - auto overflowMetadata = overflowMetadataDA->get(nodeGroupIdx, TransactionType::WRITE); - auto inMemOverflowFile = stringColumnChunk->getOverflowFile(); - inMemOverflowFile->addNewPages(overflowMetadata.numPages); - for (auto i = 0u; i < overflowMetadata.numPages; i++) { - auto pageIdx = overflowMetadata.pageIdx + i; - FileUtils::readFromFile(dataFH->getFileInfo(), inMemOverflowFile->getPage(i)->data, - BufferPoolConstants::PAGE_4KB_SIZE, pageIdx * BufferPoolConstants::PAGE_4KB_SIZE); - } + dataColumn->scan(nodeGroupIdx, stringColumnChunk->getDataChunk()); + offsetColumn->scan(nodeGroupIdx, stringColumnChunk->getOffsetChunk()); } void StringColumn::append(ColumnChunk* columnChunk, node_group_idx_t nodeGroupIdx) { Column::append(columnChunk, nodeGroupIdx); auto stringColumnChunk = reinterpret_cast(columnChunk); - auto startPageIdx = dataFH->addNewPages(stringColumnChunk->getOverflowFile()->getNumPages()); - auto numPagesForOverflow = stringColumnChunk->flushOverflowBuffer(dataFH, startPageIdx); - overflowMetadataDA->resize(nodeGroupIdx + 1); - overflowMetadataDA->update( - nodeGroupIdx, OverflowColumnChunkMetadata{startPageIdx, numPagesForOverflow, - stringColumnChunk->getLastOffsetInPage()}); + dataColumn->append(stringColumnChunk->getDataChunk(), nodeGroupIdx); + offsetColumn->append(stringColumnChunk->getOffsetChunk(), nodeGroupIdx); } -void StringColumn::writeValue( - offset_t nodeOffset, ValueVector* vectorToWriteFrom, uint32_t posInVectorToWriteFrom) { +void StringColumn::writeValue(const ColumnChunkMetadata& chunkMeta, offset_t nodeOffset, + ValueVector* vectorToWriteFrom, uint32_t posInVectorToWriteFrom) { auto& kuStr = vectorToWriteFrom->getValue(posInVectorToWriteFrom); - if (!ku_string_t::isShortString(kuStr.len)) { - auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(nodeOffset); - auto overflowMetadata = overflowMetadataDA->get(nodeGroupIdx, TransactionType::WRITE); - auto overflowPageIdxInChunk = overflowMetadata.numPages - 1; - auto walPageIdxAndFrame = DBFileUtils::createWALVersionIfNecessaryAndPinPage( - overflowMetadata.pageIdx + overflowPageIdxInChunk, false /* insertingNewPage */, - *dataFH, dbFileID, *bufferManager, *wal); - memcpy(walPageIdxAndFrame.frame + overflowMetadata.lastOffsetInPage, - reinterpret_cast(kuStr.overflowPtr), kuStr.len); - bufferManager->unpin(*wal->fileHandle, walPageIdxAndFrame.pageIdxInWAL); - dataFH->releaseWALPageIdxLock(walPageIdxAndFrame.originalPageIdx); - TypeUtils::encodeOverflowPtr( - kuStr.overflowPtr, overflowPageIdxInChunk, overflowMetadata.lastOffsetInPage); - overflowMetadata.lastOffsetInPage += kuStr.len; - overflowMetadataDA->update(nodeGroupIdx, overflowMetadata); - } - Column::writeValue(nodeOffset, vectorToWriteFrom, posInVectorToWriteFrom); + // Write string data to end of dataColumn + auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(nodeOffset); + auto startOffset = + dataColumn->appendValues(nodeGroupIdx, (const uint8_t*)kuStr.getData(), kuStr.len); + + // Write offset + string_index_t index = + offsetColumn->appendValues(nodeGroupIdx, (const uint8_t*)&startOffset, 1); + + // Write index to main column + Column::writeValue(chunkMeta, nodeOffset, (uint8_t*)&index); } void StringColumn::checkpointInMemory() { Column::checkpointInMemory(); - overflowMetadataDA->checkpointInMemoryIfNecessary(); + dataColumn->checkpointInMemory(); + offsetColumn->checkpointInMemory(); } void StringColumn::rollbackInMemory() { Column::rollbackInMemory(); - overflowMetadataDA->rollbackInMemoryIfNecessary(); + dataColumn->rollbackInMemory(); + offsetColumn->rollbackInMemory(); } void StringColumn::scanInternal( @@ -116,15 +105,56 @@ void StringColumn::scanInternal( auto startNodeOffset = nodeIDVector->readNodeOffset(0); KU_ASSERT(startNodeOffset % DEFAULT_VECTOR_CAPACITY == 0); auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(startNodeOffset); - Column::scanInternal(transaction, nodeIDVector, resultVector); - auto overflowPageIdx = overflowMetadataDA->get(nodeGroupIdx, transaction->getType()).pageIdx; + auto startOffsetInGroup = + startNodeOffset - StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx); + if (nodeIDVector->state->selVector->isUnfiltered()) { + scanUnfiltered(transaction, nodeGroupIdx, startOffsetInGroup, + startOffsetInGroup + nodeIDVector->state->selVector->selectedSize, resultVector); + } else { + scanFiltered(transaction, nodeGroupIdx, startOffsetInGroup, nodeIDVector, resultVector); + } +} + +void StringColumn::scanUnfiltered(transaction::Transaction* transaction, + node_group_idx_t nodeGroupIdx, offset_t startOffsetInGroup, offset_t endOffsetInGroup, + common::ValueVector* resultVector, uint64_t startPosInVector) { + auto numValuesToRead = endOffsetInGroup - startOffsetInGroup; + auto indices = std::make_unique(numValuesToRead); + auto indexState = getReadState(transaction->getType(), nodeGroupIdx); + auto offsetState = offsetColumn->getReadState(transaction->getType(), nodeGroupIdx); + auto dataState = dataColumn->getReadState(transaction->getType(), nodeGroupIdx); + Column::scan( + transaction, indexState, startOffsetInGroup, endOffsetInGroup, (uint8_t*)indices.get()); + for (auto i = 0u; i < numValuesToRead; i++) { + if (resultVector->isNull(startPosInVector + i)) { + continue; + } + string_offset_t offsets[2]; + scanOffsets(transaction, offsetState, offsets, indices[i], dataState.metadata.numValues); + scanValueToVector( + transaction, dataState, offsets[0], offsets[1], resultVector, startPosInVector + i); + } +} + +void StringColumn::scanFiltered(transaction::Transaction* transaction, + node_group_idx_t nodeGroupIdx, common::offset_t startOffsetInGroup, + common::ValueVector* nodeIDVector, common::ValueVector* resultVector) { + + auto indexState = getReadState(transaction->getType(), nodeGroupIdx); + auto offsetState = offsetColumn->getReadState(transaction->getType(), nodeGroupIdx); + auto dataState = dataColumn->getReadState(transaction->getType(), nodeGroupIdx); for (auto i = 0u; i < nodeIDVector->state->selVector->selectedSize; i++) { auto pos = nodeIDVector->state->selVector->selectedPositions[i]; if (resultVector->isNull(pos)) { + // Ignore positions which were scanned as null continue; } - readStringValueFromOvf( - transaction, resultVector->getValue(pos), resultVector, overflowPageIdx); + auto offsetInGroup = startOffsetInGroup + pos; + string_index_t index; + Column::scan(transaction, indexState, offsetInGroup, offsetInGroup + 1, (uint8_t*)&index); + string_offset_t offsets[2]; + scanOffsets(transaction, offsetState, offsets, index, dataState.metadata.numValues); + scanValueToVector(transaction, dataState, offsets[0], offsets[1], resultVector, pos); } } @@ -132,34 +162,90 @@ void StringColumn::lookupInternal( Transaction* transaction, ValueVector* nodeIDVector, ValueVector* resultVector) { KU_ASSERT(dataType.getPhysicalType() == PhysicalTypeID::STRING); auto startNodeOffset = nodeIDVector->readNodeOffset(0); - auto overflowPageIdx = - overflowMetadataDA - ->get(StorageUtils::getNodeGroupIdx(startNodeOffset), transaction->getType()) - .pageIdx; - Column::lookupInternal(transaction, nodeIDVector, resultVector); + auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(startNodeOffset); + + auto indexState = getReadState(transaction->getType(), nodeGroupIdx); + auto offsetState = offsetColumn->getReadState(transaction->getType(), nodeGroupIdx); + auto dataState = dataColumn->getReadState(transaction->getType(), nodeGroupIdx); for (auto i = 0u; i < nodeIDVector->state->selVector->selectedSize; i++) { auto pos = resultVector->state->selVector->selectedPositions[i]; - if (!resultVector->isNull(pos)) { - readStringValueFromOvf(transaction, resultVector->getValue(pos), - resultVector, overflowPageIdx); + if (resultVector->isNull(pos)) { + // Ignore positions which were scanned as null + continue; } + string_offset_t offsets[2]; + auto offsetInGroup = + startNodeOffset - StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx) + pos; + string_index_t index; + Column::scan(transaction, indexState, offsetInGroup, offsetInGroup + 1, (uint8_t*)&index); + scanOffsets(transaction, offsetState, offsets, index, dataState.metadata.numValues); + scanValueToVector(transaction, dataState, offsets[0], offsets[1], resultVector, pos); } } -void StringColumn::readStringValueFromOvf(Transaction* transaction, ku_string_t& kuStr, - ValueVector* resultVector, page_idx_t overflowPageIdx) { - if (ku_string_t::isShortString(kuStr.len)) { - return; +bool StringColumn::canCommitInPlace( + node_group_idx_t nodeGroupIdx, LocalVectorCollection* localChunk) { + std::vector rowIdxesToRead; + for (auto& [nodeOffset, rowIdx] : localChunk->getUpdateInfoRef()) { + rowIdxesToRead.push_back(rowIdx); + } + for (auto& [nodeOffset, rowIdx] : localChunk->getInsertInfoRef()) { + rowIdxesToRead.push_back(rowIdx); + } + std::sort(rowIdxesToRead.begin(), rowIdxesToRead.end()); + auto totalStringLengthToAdd = 0u; + uint64_t newStrings = rowIdxesToRead.size(); + + for (auto rowIdx : rowIdxesToRead) { + auto localVector = localChunk->getLocalVector(rowIdx); + auto offsetInVector = rowIdx & (DEFAULT_VECTOR_CAPACITY - 1); + + auto kuStr = localVector->getVector()->getValue(offsetInVector); + totalStringLengthToAdd += kuStr.len; + } + + // Make sure there is sufficient space in the data chunk (not currently compressed) + auto dataColumnMetadata = getDataColumn()->getMetadata(nodeGroupIdx, TransactionType::WRITE); + auto totalStringDataAfterUpdate = dataColumnMetadata.numValues + totalStringLengthToAdd; + if (totalStringDataAfterUpdate > + dataColumnMetadata.numPages * BufferPoolConstants::PAGE_4KB_SIZE) { + // Data cannot be updated in place + return false; + } + + auto indexColumnMetadata = getMetadata(nodeGroupIdx, TransactionType::WRITE); + auto offsetColumnMetadata = + getOffsetColumn()->getMetadata(nodeGroupIdx, TransactionType::WRITE); + auto offsetCapacity = + offsetColumnMetadata.compMeta.numValues( + BufferPoolConstants::PAGE_4KB_SIZE, getOffsetColumn()->getDataType()) * + offsetColumnMetadata.numPages; + + auto totalStringsAfterUpdate = offsetColumnMetadata.numValues + newStrings; + + // Check if indices can be updated in-place + if (totalStringsAfterUpdate > offsetCapacity || + // If the index column can store the largest new index in-place + !indexColumnMetadata.compMeta.canUpdateInPlace( + (const uint8_t*)&totalStringsAfterUpdate, 0, getDataType().getPhysicalType())) { + return false; + } + // Check if offsets can be updated in-place + // Indices are limited to 32 bits but in theory could be larger than that since the offset + // column can grow beyond the node group size. + // + // E.g. one big string is written first, followed by NODE_GROUP_SIZE-1 small strings, + // which are all updated in-place many times (which may fit if the first string is large + // enough that 2^n minus the first string's size is large enough to fit the other strings, + // for some n. + // 32 bits should give plenty of space for updates. + if (offsetColumnMetadata.numValues + newStrings > + std::numeric_limits::max() || + !offsetColumnMetadata.compMeta.canUpdateInPlace( + (const uint8_t*)&totalStringDataAfterUpdate, 0, getDataType().getPhysicalType())) { + return false; } - PageByteCursor cursor; - TypeUtils::decodeOverflowPtr(kuStr.overflowPtr, cursor.pageIdx, cursor.offsetInPage); - cursor.pageIdx += overflowPageIdx; - auto [fileHandleToPin, pageIdxToPin] = DBFileUtils::getFileHandleAndPhysicalPageIdxToPin( - *dataFH, cursor.pageIdx, *wal, transaction->getType()); - bufferManager->optimisticRead(*fileHandleToPin, pageIdxToPin, [&](uint8_t* frame) { - StringVector::addString( - resultVector, kuStr, (const char*)(frame + cursor.offsetInPage), kuStr.len); - }); + return true; } } // namespace storage diff --git a/src/storage/store/string_column_chunk.cpp b/src/storage/store/string_column_chunk.cpp index 5458d3397f..ab513bbeff 100644 --- a/src/storage/store/string_column_chunk.cpp +++ b/src/storage/store/string_column_chunk.cpp @@ -1,38 +1,46 @@ #include "storage/store/string_column_chunk.h" -#include "common/type_utils.h" -#include "storage/store/table_copy_utils.h" - using namespace kuzu::common; namespace kuzu { namespace storage { StringColumnChunk::StringColumnChunk(LogicalType dataType, uint64_t capacity) - : ColumnChunk{std::move(dataType), capacity} { - overflowFile = std::make_unique(); - overflowCursor.pageIdx = 0; - overflowCursor.offsetInPage = 0; + : ColumnChunk{std::move(dataType), capacity, true /*enableCompression*/} { + bool enableCompression = true; + // Bitpacking might save 1 bit per value with regular ascii compared to UTF-8 + stringDataChunk = + ColumnChunkFactory::createColumnChunk(LogicalType(LogicalTypeID::UINT8), false); + // The offset chunk is able to grow beyond the node group size. + // We rely on appending to the dictionary when updating, however if the chunk is full, + // there will be no space for in-place updates. + // The data chunk doubles in size on use, but out of place updates will never need the offset + // chunk to be greater than the node group size since they remove unused entries. + // So the chunk is initialized with a size equal to 3/4 the node group size, making sure there + // is always extra space for updates. + offsetChunk = ColumnChunkFactory::createColumnChunk(LogicalType(LogicalTypeID::UINT64), + enableCompression, StorageConstants::NODE_GROUP_SIZE * 0.75); } void StringColumnChunk::resetToEmpty() { ColumnChunk::resetToEmpty(); - overflowFile = std::make_unique(); - overflowCursor.resetValue(); + stringDataChunk->resetToEmpty(); + offsetChunk->resetToEmpty(); } void StringColumnChunk::append(ValueVector* vector) { KU_ASSERT(vector->dataType.getPhysicalType() == PhysicalTypeID::STRING); - ColumnChunk::copyVectorToBuffer(vector, numValues); - auto stringsToSetOverflow = (ku_string_t*)(buffer.get() + numValues * numBytesPerValue); for (auto i = 0u; i < vector->state->selVector->selectedSize; i++) { - auto& stringToSet = stringsToSetOverflow[i]; - if (!ku_string_t::isShortString(stringToSet.len)) { - overflowFile->copyStringOverflow( - overflowCursor, reinterpret_cast(stringToSet.overflowPtr), &stringToSet); + // index is stored in main chunk, data is stored in the data chunk + auto pos = vector->state->selVector->selectedPositions[i]; + nullChunk->setNull(numValues, vector->isNull(pos)); + if (vector->isNull(pos)) { + numValues++; + continue; } + auto kuString = vector->getValue(pos); + setValueFromString(kuString.getAsString().c_str(), kuString.len, numValues); } - numValues += vector->state->selVector->selectedSize; } void StringColumnChunk::append( @@ -48,7 +56,6 @@ void StringColumnChunk::append( KU_UNREACHABLE; } } - numValues += numValuesToAppend; } void StringColumnChunk::write( @@ -71,55 +78,63 @@ void StringColumnChunk::write(ValueVector* valueVector, ValueVector* offsetInChu auto offsetInChunk = offsets[offsetInChunkVector->state->selVector->selectedPositions[i]]; auto offsetInVector = valueVector->state->selVector->selectedPositions[i]; nullChunk->setNull(offsetInChunk, valueVector->isNull(offsetInVector)); + if (offsetInChunk >= numValues) { + numValues = offsetInChunk + 1; + } if (!valueVector->isNull(offsetInVector)) { auto kuStr = valueVector->getValue(offsetInVector); - setValueFromString(kuStr.getAsString().c_str(), kuStr.len, offsetInChunk); + setValueFromString((const char*)kuStr.getData(), kuStr.len, offsetInChunk); } } } -page_idx_t StringColumnChunk::flushOverflowBuffer(BMFileHandle* dataFH, page_idx_t startPageIdx) { - for (auto i = 0u; i < overflowFile->getNumPages(); i++) { - FileUtils::writeToFile(dataFH->getFileInfo(), overflowFile->getPage(i)->data, - BufferPoolConstants::PAGE_4KB_SIZE, startPageIdx * BufferPoolConstants::PAGE_4KB_SIZE); - startPageIdx++; - } - return overflowFile->getNumPages(); -} - void StringColumnChunk::appendStringColumnChunk( StringColumnChunk* other, offset_t startPosInOtherChunk, uint32_t numValuesToAppend) { - auto otherKuVals = reinterpret_cast(other->buffer.get()); - auto kuVals = reinterpret_cast(buffer.get()); + auto indices = reinterpret_cast(buffer.get()); for (auto i = 0u; i < numValuesToAppend; i++) { - auto posInChunk = i + numValues; + auto posInChunk = numValues; auto posInOtherChunk = i + startPosInOtherChunk; - kuVals[posInChunk] = otherKuVals[posInOtherChunk]; - if (other->nullChunk->isNull(posInOtherChunk) || - otherKuVals[posInOtherChunk].len <= ku_string_t::SHORT_STR_LENGTH) { + if (nullChunk->isNull(posInChunk)) { + indices[posInChunk] = 0; + numValues++; continue; } - PageByteCursor cursorToCopyFrom; - TypeUtils::decodeOverflowPtr(otherKuVals[posInOtherChunk].overflowPtr, - cursorToCopyFrom.pageIdx, cursorToCopyFrom.offsetInPage); - overflowFile->copyStringOverflow(overflowCursor, - other->overflowFile->getPage(cursorToCopyFrom.pageIdx)->data + - cursorToCopyFrom.offsetInPage, - &kuVals[posInChunk]); + auto stringInOtherChunk = other->getValue(posInOtherChunk); + setValueFromString(stringInOtherChunk.data(), stringInOtherChunk.size(), posInChunk); } } void StringColumnChunk::setValueFromString(const char* value, uint64_t length, uint64_t pos) { - TableCopyUtils::validateStrLen(length); - auto val = overflowFile->copyString(value, length, overflowCursor); - setValue(val, pos); + auto space = stringDataChunk->getCapacity() - stringDataChunk->getNumValues(); + if (length > space) { + stringDataChunk->resize(std::bit_ceil(stringDataChunk->getCapacity() + length)); + } + if (pos >= numValues) { + numValues = pos + 1; + } + auto startOffset = stringDataChunk->getNumValues(); + memcpy(stringDataChunk->getData() + startOffset, value, length); + stringDataChunk->setNumValues(startOffset + length); + auto index = offsetChunk->getNumValues(); + + if (index >= offsetChunk->getCapacity()) { + offsetChunk->resize(offsetChunk->getCapacity() * 2); + } + offsetChunk->setValue(startOffset, index); + offsetChunk->setNumValues(index + 1); + ColumnChunk::setValue(index, pos); } // STRING template<> +std::string_view StringColumnChunk::getValue(offset_t pos) const { + auto index = ColumnChunk::getValue(pos); + auto offset = offsetChunk->getValue(index); + return std::string_view((const char*)stringDataChunk->getData() + offset, getStringLength(pos)); +} +template<> std::string StringColumnChunk::getValue(offset_t pos) const { - auto kuStr = ((ku_string_t*)buffer.get())[pos]; - return overflowFile->readString(&kuStr); + return std::string(getValue(pos)); } } // namespace storage diff --git a/src/storage/store/var_list_column.cpp b/src/storage/store/var_list_column.cpp index 55313b78bc..4b897fe838 100644 --- a/src/storage/store/var_list_column.cpp +++ b/src/storage/store/var_list_column.cpp @@ -145,17 +145,16 @@ void VarListColumn::rollbackInMemory() { offset_t VarListColumn::readOffset( Transaction* transaction, node_group_idx_t nodeGroupIdx, offset_t offsetInNodeGroup) { - auto offsetVector = std::make_unique(LogicalTypeID::INT64); - offsetVector->state = DataChunkState::getSingleValueDataChunkState(); auto chunkMeta = metadataDA->get(nodeGroupIdx, transaction->getType()); auto pageCursor = PageUtils::getPageElementCursorForPos(offsetInNodeGroup, chunkMeta.compMeta.numValues(BufferPoolConstants::PAGE_4KB_SIZE, dataType)); pageCursor.pageIdx += chunkMeta.pageIdx; + offset_t value; readFromPage(transaction, pageCursor.pageIdx, [&](uint8_t* frame) -> void { - readToVectorFunc(frame, pageCursor, offsetVector.get(), 0 /* posInVector */, + readToPageFunc(frame, pageCursor, (uint8_t*)&value, 0 /* posInVector */, 1 /* numValuesToRead */, chunkMeta.compMeta); }); - return offsetVector->getValue(0); + return value; } ListOffsetInfoInStorage VarListColumn::getListOffsetInfoInStorage(Transaction* transaction, diff --git a/test/storage/compression_test.cpp b/test/storage/compression_test.cpp index 4c889bffa1..078292701a 100644 --- a/test/storage/compression_test.cpp +++ b/test/storage/compression_test.cpp @@ -23,7 +23,7 @@ void test_compression(CompressionAlg& alg, std::vector src) { EXPECT_EQ(src, decompressed); // works with all bit widths (but not all offsets) T value = 0; - alg.setValueFromUncompressed((uint8_t*)&value, 0, (uint8_t*)dest.data(), 1, metadata); + alg.setValuesFromUncompressed((uint8_t*)&value, 0, (uint8_t*)dest.data(), 1, 1, metadata); alg.decompressFromPage(dest.data(), 0, (uint8_t*)decompressed.data(), 0, src.size(), metadata); src[1] = value; EXPECT_EQ(decompressed, src); diff --git a/test/test_files/tinysnb/call/call.test b/test/test_files/tinysnb/call/call.test index c1b534b65d..d0afa6decb 100644 --- a/test/test_files/tinysnb/call/call.test +++ b/test/test_files/tinysnb/call/call.test @@ -144,7 +144,7 @@ height -LOG ReturnDBVersion -STATEMENT CALL db_version() RETURN version ---- 1 -v0.0.12.1 +v0.0.12.2 -LOG ReturnTableConnection -STATEMENT CALL show_connection('knows') RETURN *