diff --git a/src/include/storage/compression/compression.h b/src/include/storage/compression/compression.h index 91ab83b0798..3db122236cd 100644 --- a/src/include/storage/compression/compression.h +++ b/src/include/storage/compression/compression.h @@ -42,7 +42,7 @@ struct CompressionMetadata { // Returns true if and only if the provided value within the vector can be updated // in this chunk in-place. bool canUpdateInPlace( - const common::ValueVector& vector, uint32_t pos, common::PhysicalTypeID physicalType) const; + const uint8_t* data, uint32_t pos, common::PhysicalTypeID physicalType) const; bool canAlwaysUpdateInPlace() const; }; diff --git a/src/storage/compression/compression.cpp b/src/storage/compression/compression.cpp index 545b462de7b..38549b494e7 100644 --- a/src/storage/compression/compression.cpp +++ b/src/storage/compression/compression.cpp @@ -57,7 +57,7 @@ bool CompressionMetadata::canAlwaysUpdateInPlace() const { } } bool CompressionMetadata::canUpdateInPlace( - const ValueVector& vector, uint32_t pos, PhysicalTypeID physicalType) const { + const uint8_t* data, uint32_t pos, PhysicalTypeID physicalType) const { if (canAlwaysUpdateInPlace()) { return true; } @@ -69,45 +69,46 @@ bool CompressionMetadata::canUpdateInPlace( case CompressionType::INTEGER_BITPACKING: { switch (physicalType) { case PhysicalTypeID::INT64: { - auto value = vector.getValue(pos); + auto value = ((int64_t*)data)[pos]; return IntegerBitpacking::canUpdateInPlace( - value, BitpackHeader::readHeader(data)); + value, BitpackHeader::readHeader(this->data)); } case PhysicalTypeID::INT32: { - auto value = vector.getValue(pos); + auto value = ((int32_t*)data)[pos]; return IntegerBitpacking::canUpdateInPlace( - value, BitpackHeader::readHeader(data)); + value, BitpackHeader::readHeader(this->data)); } case PhysicalTypeID::INT16: { - auto value = vector.getValue(pos); + auto value = ((int16_t*)data)[pos]; return IntegerBitpacking::canUpdateInPlace( - value, BitpackHeader::readHeader(data)); + value, BitpackHeader::readHeader(this->data)); } case PhysicalTypeID::INT8: { - auto value = vector.getValue(pos); + auto value = ((int8_t*)data)[pos]; return IntegerBitpacking::canUpdateInPlace( - value, BitpackHeader::readHeader(data)); + value, BitpackHeader::readHeader(this->data)); } case PhysicalTypeID::VAR_LIST: case PhysicalTypeID::UINT64: { - auto value = vector.getValue(pos); + auto value = ((uint64_t*)data)[pos]; return IntegerBitpacking::canUpdateInPlace( - value, BitpackHeader::readHeader(data)); + value, BitpackHeader::readHeader(this->data)); } + case PhysicalTypeID::STRING: case PhysicalTypeID::UINT32: { - auto value = vector.getValue(pos); + auto value = ((uint32_t*)data)[pos]; return IntegerBitpacking::canUpdateInPlace( - value, BitpackHeader::readHeader(data)); + value, BitpackHeader::readHeader(this->data)); } case PhysicalTypeID::UINT16: { - auto value = vector.getValue(pos); + auto value = ((uint16_t*)data)[pos]; return IntegerBitpacking::canUpdateInPlace( - value, BitpackHeader::readHeader(data)); + value, BitpackHeader::readHeader(this->data)); } case PhysicalTypeID::UINT8: { - auto value = vector.getValue(pos); + auto value = ((uint8_t*)data)[pos]; return IntegerBitpacking::canUpdateInPlace( - value, BitpackHeader::readHeader(data)); + value, BitpackHeader::readHeader(this->data)); } default: { throw common::StorageException( @@ -143,6 +144,7 @@ uint64_t CompressionMetadata::numValues(uint64_t pageSize, const LogicalType& da case PhysicalTypeID::UINT64: return IntegerBitpacking::numValues( pageSize, BitpackHeader::readHeader(data)); + case PhysicalTypeID::STRING: case PhysicalTypeID::UINT32: return IntegerBitpacking::numValues( pageSize, BitpackHeader::readHeader(data)); @@ -513,6 +515,7 @@ void ReadCompressedValuesFromPageToVector::operator()(uint8_t* frame, PageElemen return IntegerBitpacking().decompressFromPage(frame, pageCursor.elemPosInPage, resultVector->getData(), posInVector, numValuesToRead, metadata); } + case PhysicalTypeID::STRING: case PhysicalTypeID::UINT32: { return IntegerBitpacking().decompressFromPage(frame, pageCursor.elemPosInPage, resultVector->getData(), posInVector, numValuesToRead, metadata); @@ -567,6 +570,7 @@ void ReadCompressedValuesFromPage::operator()(uint8_t* frame, PageElementCursor& return IntegerBitpacking().decompressFromPage(frame, pageCursor.elemPosInPage, result, startPosInResult, numValuesToRead, metadata); } + case PhysicalTypeID::STRING: case PhysicalTypeID::UINT32: { return IntegerBitpacking().decompressFromPage(frame, pageCursor.elemPosInPage, result, startPosInResult, numValuesToRead, metadata); @@ -621,6 +625,7 @@ void WriteCompressedValuesToPage::operator()(uint8_t* frame, uint16_t posInFrame return IntegerBitpacking().setValuesFromUncompressed( data, dataOffset, frame, posInFrame, numValues, metadata); } + case PhysicalTypeID::STRING: case PhysicalTypeID::UINT32: { return IntegerBitpacking().setValuesFromUncompressed( data, dataOffset, frame, posInFrame, numValues, metadata); diff --git a/src/storage/local_table.cpp b/src/storage/local_table.cpp index 7d35e23ad43..31585349fd3 100644 --- a/src/storage/local_table.cpp +++ b/src/storage/local_table.cpp @@ -184,7 +184,7 @@ void LocalColumn::prepareCommitForChunk(node_group_idx_t nodeGroupIdx) { auto pos = vector->vector->state->selVector->selectedPositions[i]; KU_ASSERT(vector->validityMask[pos]); if (!metadata.canUpdateInPlace( - *vector->vector, pos, column->getDataType().getPhysicalType())) { + vector->vector->getData(), pos, column->getDataType().getPhysicalType())) { return commitLocalChunkOutOfPlace(nodeGroupIdx, chunk); } } @@ -225,6 +225,7 @@ void StringLocalColumn::prepareCommitForChunk(node_group_idx_t nodeGroupIdx) { KU_ASSERT(chunks.contains(nodeGroupIdx)); auto localChunk = chunks.at(nodeGroupIdx).get(); auto stringColumn = reinterpret_cast(column); + auto indexColumnMetadata = stringColumn->getMetadata(nodeGroupIdx, TransactionType::WRITE); auto dataColumnMetadata = stringColumn->getDataColumn()->getMetadata(nodeGroupIdx, TransactionType::WRITE); auto offsetColumnMetadata = @@ -240,10 +241,19 @@ void StringLocalColumn::prepareCommitForChunk(node_group_idx_t nodeGroupIdx) { offsetColumnMetadata.compMeta.numValues( BufferPoolConstants::PAGE_4KB_SIZE, stringColumn->getOffsetColumn()->getDataType()) * offsetColumnMetadata.numPages; - // Write in-place as long as there is sufficient space in the data chunk - if (dataColumnMetadata.numValues + totalStringLengthInChunk <= - dataColumnMetadata.numPages * BufferPoolConstants::PAGE_4KB_SIZE && - offsetColumnMetadata.numValues + newStrings < offsetCapacity + + auto totalStringsAfterUpdate = offsetColumnMetadata.numValues + newStrings; + auto totalStringDataAfterUpdate = dataColumnMetadata.numValues + totalStringLengthInChunk; + + // Make sure there is sufficient space in the data chunk (not currently compressed) + bool canUpdateDataInPlace = totalStringDataAfterUpdate <= + dataColumnMetadata.numPages * BufferPoolConstants::PAGE_4KB_SIZE; + bool canUpdateIndicesInPlace = + totalStringsAfterUpdate < offsetCapacity && + // If the index column can store the largest new index in-place + indexColumnMetadata.compMeta.canUpdateInPlace( + (const uint8_t*)&totalStringsAfterUpdate, 0, column->getDataType().getPhysicalType()); + bool canUpdateOffsetsInPlace = // Indices are limited to 32 bits but in theory could be larger than that since the offset // column can grow beyond the node group size. // @@ -252,8 +262,11 @@ void StringLocalColumn::prepareCommitForChunk(node_group_idx_t nodeGroupIdx) { // enough that 2^n minus the first string's size is large enough to fit the other strings, // for some n. // 32 bits should give plenty of space for updates. - && offsetColumnMetadata.numValues + newStrings < - std::numeric_limits::max()) { + offsetColumnMetadata.numValues + newStrings < + std::numeric_limits::max() && + offsetColumnMetadata.compMeta.canUpdateInPlace((const uint8_t*)&totalStringDataAfterUpdate, + 0, column->getDataType().getPhysicalType()); + if (canUpdateDataInPlace && canUpdateIndicesInPlace && canUpdateOffsetsInPlace) { commitLocalChunkInPlace(nodeGroupIdx, localChunk); } else { commitLocalChunkOutOfPlace(nodeGroupIdx, localChunk); diff --git a/src/storage/store/column_chunk.cpp b/src/storage/store/column_chunk.cpp index dafe231f23d..b11e8606e37 100644 --- a/src/storage/store/column_chunk.cpp +++ b/src/storage/store/column_chunk.cpp @@ -120,6 +120,7 @@ static std::shared_ptr getCompression( case PhysicalTypeID::UINT64: { return std::make_shared>(); } + case PhysicalTypeID::STRING: case PhysicalTypeID::UINT32: { return std::make_shared>(); } @@ -165,6 +166,7 @@ void ColumnChunk::initializeFunction(bool enableCompression) { getMetadataFunction = booleanGetMetadata; break; } + case PhysicalTypeID::STRING: case PhysicalTypeID::INT64: case PhysicalTypeID::INT32: case PhysicalTypeID::INT16: diff --git a/src/storage/store/string_column.cpp b/src/storage/store/string_column.cpp index f514240b52d..0becc4ac8f4 100644 --- a/src/storage/store/string_column.cpp +++ b/src/storage/store/string_column.cpp @@ -13,13 +13,10 @@ StringColumn::StringColumn(LogicalType dataType, const MetadataDAHInfo& metaDAHe BMFileHandle* dataFH, BMFileHandle* metadataFH, BufferManager* bufferManager, WAL* wal, transaction::Transaction* transaction, RWPropertyStats stats) : Column{std::move(dataType), metaDAHeaderInfo, dataFH, metadataFH, bufferManager, wal, - transaction, stats, false /* enableCompression */, true /* requireNullColumn */} { - // TODO(bmwinger): detecting if string child columns must be re-compressed when updating is not - // yet supported - auto enableCompression = false; + transaction, stats, true /* enableCompression */, true /* requireNullColumn */} { dataColumn = std::make_unique(LogicalType(LogicalTypeID::UINT8), *metaDAHeaderInfo.childrenInfos[0], dataFH, metadataFH, bufferManager, wal, transaction, - stats, enableCompression, false /*requireNullColumn*/); + stats, false, false /*requireNullColumn*/); offsetColumn = std::make_unique(LogicalType(LogicalTypeID::UINT64), *metaDAHeaderInfo.childrenInfos[1], dataFH, metadataFH, bufferManager, wal, transaction, stats, enableCompression, false /*requireNullColumn*/); diff --git a/src/storage/store/string_column_chunk.cpp b/src/storage/store/string_column_chunk.cpp index 3b2ca4a3a18..2554af3414d 100644 --- a/src/storage/store/string_column_chunk.cpp +++ b/src/storage/store/string_column_chunk.cpp @@ -8,12 +8,11 @@ namespace kuzu { namespace storage { StringColumnChunk::StringColumnChunk(LogicalType dataType, uint64_t capacity) - : ColumnChunk{std::move(dataType), capacity, false /*enableCompression*/} { + : ColumnChunk{std::move(dataType), capacity, true /*enableCompression*/} { + bool enableCompression = true; // Bitpacking might save 1 bit per value with regular ascii compared to UTF-8 - // Detecting when we need to re-compress the child chunks is not currently supported. - bool enableCompression = false; stringDataChunk = - ColumnChunkFactory::createColumnChunk(LogicalType(LogicalTypeID::UINT8), enableCompression); + ColumnChunkFactory::createColumnChunk(LogicalType(LogicalTypeID::UINT8), false); // The offset chunk is able to grow beyond the node group size. // We rely on appending to the dictionary when updating, however if the chunk is full, // there will be no space for in-place updates.