Skip to content

Commit

Permalink
Compress string indices and offsets
Browse files Browse the repository at this point in the history
  • Loading branch information
benjaminwinger committed Nov 13, 2023
1 parent ec84643 commit 45535e4
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 34 deletions.
2 changes: 1 addition & 1 deletion src/include/storage/compression/compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ struct CompressionMetadata {
// Returns true if and only if the provided value within the vector can be updated
// in this chunk in-place.
bool canUpdateInPlace(
const common::ValueVector& vector, uint32_t pos, common::PhysicalTypeID physicalType) const;
const uint8_t* data, uint32_t pos, common::PhysicalTypeID physicalType) const;
bool canAlwaysUpdateInPlace() const;
};

Expand Down
39 changes: 22 additions & 17 deletions src/storage/compression/compression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ bool CompressionMetadata::canAlwaysUpdateInPlace() const {
}
}
bool CompressionMetadata::canUpdateInPlace(
const ValueVector& vector, uint32_t pos, PhysicalTypeID physicalType) const {
const uint8_t* data, uint32_t pos, PhysicalTypeID physicalType) const {
if (canAlwaysUpdateInPlace()) {
return true;
}
Expand All @@ -69,45 +69,46 @@ bool CompressionMetadata::canUpdateInPlace(
case CompressionType::INTEGER_BITPACKING: {
switch (physicalType) {
case PhysicalTypeID::INT64: {
auto value = vector.getValue<int64_t>(pos);
auto value = ((int64_t*)data)[pos];
return IntegerBitpacking<int64_t>::canUpdateInPlace(
value, BitpackHeader::readHeader(data));
value, BitpackHeader::readHeader(this->data));
}
case PhysicalTypeID::INT32: {
auto value = vector.getValue<int32_t>(pos);
auto value = ((int32_t*)data)[pos];
return IntegerBitpacking<int32_t>::canUpdateInPlace(
value, BitpackHeader::readHeader(data));
value, BitpackHeader::readHeader(this->data));
}
case PhysicalTypeID::INT16: {
auto value = vector.getValue<int16_t>(pos);
auto value = ((int16_t*)data)[pos];
return IntegerBitpacking<int16_t>::canUpdateInPlace(
value, BitpackHeader::readHeader(data));
value, BitpackHeader::readHeader(this->data));
}
case PhysicalTypeID::INT8: {
auto value = vector.getValue<int8_t>(pos);
auto value = ((int8_t*)data)[pos];
return IntegerBitpacking<int8_t>::canUpdateInPlace(
value, BitpackHeader::readHeader(data));
value, BitpackHeader::readHeader(this->data));
}
case PhysicalTypeID::VAR_LIST:
case PhysicalTypeID::UINT64: {
auto value = vector.getValue<uint64_t>(pos);
auto value = ((uint64_t*)data)[pos];
return IntegerBitpacking<uint64_t>::canUpdateInPlace(
value, BitpackHeader::readHeader(data));
value, BitpackHeader::readHeader(this->data));
}
case PhysicalTypeID::STRING:
case PhysicalTypeID::UINT32: {
auto value = vector.getValue<uint32_t>(pos);
auto value = ((uint32_t*)data)[pos];
return IntegerBitpacking<uint32_t>::canUpdateInPlace(
value, BitpackHeader::readHeader(data));
value, BitpackHeader::readHeader(this->data));
}
case PhysicalTypeID::UINT16: {
auto value = vector.getValue<uint16_t>(pos);
auto value = ((uint16_t*)data)[pos];
return IntegerBitpacking<uint16_t>::canUpdateInPlace(
value, BitpackHeader::readHeader(data));
value, BitpackHeader::readHeader(this->data));
}
case PhysicalTypeID::UINT8: {
auto value = vector.getValue<uint8_t>(pos);
auto value = ((uint8_t*)data)[pos];
return IntegerBitpacking<uint8_t>::canUpdateInPlace(
value, BitpackHeader::readHeader(data));
value, BitpackHeader::readHeader(this->data));
}
default: {
throw common::StorageException(
Expand Down Expand Up @@ -143,6 +144,7 @@ uint64_t CompressionMetadata::numValues(uint64_t pageSize, const LogicalType& da
case PhysicalTypeID::UINT64:
return IntegerBitpacking<uint64_t>::numValues(
pageSize, BitpackHeader::readHeader(data));
case PhysicalTypeID::STRING:
case PhysicalTypeID::UINT32:
return IntegerBitpacking<uint32_t>::numValues(
pageSize, BitpackHeader::readHeader(data));
Expand Down Expand Up @@ -513,6 +515,7 @@ void ReadCompressedValuesFromPageToVector::operator()(uint8_t* frame, PageElemen
return IntegerBitpacking<uint64_t>().decompressFromPage(frame, pageCursor.elemPosInPage,
resultVector->getData(), posInVector, numValuesToRead, metadata);
}
case PhysicalTypeID::STRING:
case PhysicalTypeID::UINT32: {
return IntegerBitpacking<uint32_t>().decompressFromPage(frame, pageCursor.elemPosInPage,
resultVector->getData(), posInVector, numValuesToRead, metadata);
Expand Down Expand Up @@ -567,6 +570,7 @@ void ReadCompressedValuesFromPage::operator()(uint8_t* frame, PageElementCursor&
return IntegerBitpacking<uint64_t>().decompressFromPage(frame, pageCursor.elemPosInPage,
result, startPosInResult, numValuesToRead, metadata);
}
case PhysicalTypeID::STRING:
case PhysicalTypeID::UINT32: {
return IntegerBitpacking<uint32_t>().decompressFromPage(frame, pageCursor.elemPosInPage,
result, startPosInResult, numValuesToRead, metadata);
Expand Down Expand Up @@ -621,6 +625,7 @@ void WriteCompressedValuesToPage::operator()(uint8_t* frame, uint16_t posInFrame
return IntegerBitpacking<uint64_t>().setValuesFromUncompressed(
data, dataOffset, frame, posInFrame, numValues, metadata);
}
case PhysicalTypeID::STRING:
case PhysicalTypeID::UINT32: {
return IntegerBitpacking<uint32_t>().setValuesFromUncompressed(
data, dataOffset, frame, posInFrame, numValues, metadata);
Expand Down
27 changes: 20 additions & 7 deletions src/storage/local_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ void LocalColumn::prepareCommitForChunk(node_group_idx_t nodeGroupIdx) {
auto pos = vector->vector->state->selVector->selectedPositions[i];
KU_ASSERT(vector->validityMask[pos]);
if (!metadata.canUpdateInPlace(
*vector->vector, pos, column->getDataType().getPhysicalType())) {
vector->vector->getData(), pos, column->getDataType().getPhysicalType())) {
return commitLocalChunkOutOfPlace(nodeGroupIdx, chunk);
}
}
Expand Down Expand Up @@ -225,6 +225,7 @@ void StringLocalColumn::prepareCommitForChunk(node_group_idx_t nodeGroupIdx) {
KU_ASSERT(chunks.contains(nodeGroupIdx));
auto localChunk = chunks.at(nodeGroupIdx).get();
auto stringColumn = reinterpret_cast<StringColumn*>(column);
auto indexColumnMetadata = stringColumn->getMetadata(nodeGroupIdx, TransactionType::WRITE);
auto dataColumnMetadata =
stringColumn->getDataColumn()->getMetadata(nodeGroupIdx, TransactionType::WRITE);
auto offsetColumnMetadata =
Expand All @@ -240,10 +241,19 @@ void StringLocalColumn::prepareCommitForChunk(node_group_idx_t nodeGroupIdx) {
offsetColumnMetadata.compMeta.numValues(
BufferPoolConstants::PAGE_4KB_SIZE, stringColumn->getOffsetColumn()->getDataType()) *
offsetColumnMetadata.numPages;
// Write in-place as long as there is sufficient space in the data chunk
if (dataColumnMetadata.numValues + totalStringLengthInChunk <=
dataColumnMetadata.numPages * BufferPoolConstants::PAGE_4KB_SIZE &&
offsetColumnMetadata.numValues + newStrings < offsetCapacity

auto totalStringsAfterUpdate = offsetColumnMetadata.numValues + newStrings;
auto totalStringDataAfterUpdate = dataColumnMetadata.numValues + totalStringLengthInChunk;

// Make sure there is sufficient space in the data chunk (string data is not currently
// compressed, so only the raw page capacity needs to be checked)
bool canUpdateDataInPlace = totalStringDataAfterUpdate <=
dataColumnMetadata.numPages * BufferPoolConstants::PAGE_4KB_SIZE;
bool canUpdateIndicesInPlace =
totalStringsAfterUpdate < offsetCapacity &&
// The index column's compression metadata must also accept the largest new index in-place
indexColumnMetadata.compMeta.canUpdateInPlace(
(const uint8_t*)&totalStringsAfterUpdate, 0, column->getDataType().getPhysicalType());
bool canUpdateOffsetsInPlace =
// Indices are limited to 32 bits but in theory could be larger than that since the offset
// column can grow beyond the node group size.
//
Expand All @@ -252,8 +262,11 @@ void StringLocalColumn::prepareCommitForChunk(node_group_idx_t nodeGroupIdx) {
// enough that 2^n minus the first string's size is large enough to fit the other strings,
// for some n.
// 32 bits should give plenty of space for updates.
&& offsetColumnMetadata.numValues + newStrings <
std::numeric_limits<StringColumn::string_index_t>::max()) {
offsetColumnMetadata.numValues + newStrings <
std::numeric_limits<StringColumn::string_index_t>::max() &&
offsetColumnMetadata.compMeta.canUpdateInPlace((const uint8_t*)&totalStringDataAfterUpdate,
0, column->getDataType().getPhysicalType());
if (canUpdateDataInPlace && canUpdateIndicesInPlace && canUpdateOffsetsInPlace) {
commitLocalChunkInPlace(nodeGroupIdx, localChunk);
} else {
commitLocalChunkOutOfPlace(nodeGroupIdx, localChunk);
Expand Down
2 changes: 2 additions & 0 deletions src/storage/store/column_chunk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ static std::shared_ptr<CompressionAlg> getCompression(
case PhysicalTypeID::UINT64: {
return std::make_shared<IntegerBitpacking<uint64_t>>();
}
case PhysicalTypeID::STRING:
case PhysicalTypeID::UINT32: {
return std::make_shared<IntegerBitpacking<uint32_t>>();
}
Expand Down Expand Up @@ -165,6 +166,7 @@ void ColumnChunk::initializeFunction(bool enableCompression) {
getMetadataFunction = booleanGetMetadata;
break;
}
case PhysicalTypeID::STRING:
case PhysicalTypeID::INT64:
case PhysicalTypeID::INT32:
case PhysicalTypeID::INT16:
Expand Down
7 changes: 2 additions & 5 deletions src/storage/store/string_column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,10 @@ StringColumn::StringColumn(LogicalType dataType, const MetadataDAHInfo& metaDAHe
BMFileHandle* dataFH, BMFileHandle* metadataFH, BufferManager* bufferManager, WAL* wal,
transaction::Transaction* transaction, RWPropertyStats stats)
: Column{std::move(dataType), metaDAHeaderInfo, dataFH, metadataFH, bufferManager, wal,
transaction, stats, false /* enableCompression */, true /* requireNullColumn */} {
// TODO(bmwinger): detecting if string child columns must be re-compressed when updating is not
// yet supported
auto enableCompression = false;
transaction, stats, true /* enableCompression */, true /* requireNullColumn */} {
dataColumn = std::make_unique<AuxiliaryColumn>(LogicalType(LogicalTypeID::UINT8),
*metaDAHeaderInfo.childrenInfos[0], dataFH, metadataFH, bufferManager, wal, transaction,
stats, enableCompression, false /*requireNullColumn*/);
stats, false, false /*requireNullColumn*/);
offsetColumn = std::make_unique<AuxiliaryColumn>(LogicalType(LogicalTypeID::UINT64),
*metaDAHeaderInfo.childrenInfos[1], dataFH, metadataFH, bufferManager, wal, transaction,
stats, enableCompression, false /*requireNullColumn*/);
Expand Down
7 changes: 3 additions & 4 deletions src/storage/store/string_column_chunk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,11 @@ namespace kuzu {
namespace storage {

StringColumnChunk::StringColumnChunk(LogicalType dataType, uint64_t capacity)
: ColumnChunk{std::move(dataType), capacity, false /*enableCompression*/} {
: ColumnChunk{std::move(dataType), capacity, true /*enableCompression*/} {
bool enableCompression = true;
// Bitpacking might save 1 bit per value with regular ascii compared to UTF-8
// Detecting when we need to re-compress the child chunks is not currently supported.
bool enableCompression = false;
stringDataChunk =
ColumnChunkFactory::createColumnChunk(LogicalType(LogicalTypeID::UINT8), enableCompression);
ColumnChunkFactory::createColumnChunk(LogicalType(LogicalTypeID::UINT8), false);
// The offset chunk is able to grow beyond the node group size.
// We rely on appending to the dictionary when updating, however if the chunk is full,
// there will be no space for in-place updates.
Expand Down

0 comments on commit 45535e4

Please sign in to comment.