Merge pull request #2140 from kuzudb/fixed-delta
Frame of reference encoding
benjaminwinger committed Oct 6, 2023
2 parents eb9ad9a + 717b2d9 commit 2a19754
Showing 3 changed files with 132 additions and 52 deletions.
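For readers unfamiliar with the technique, here is a standalone sketch of frame-of-reference (FOR) encoding, independent of the kuzu implementation in the diff below; the names ForEncoded, forEncode, and forDecode are illustrative, and a real codec would bitpack the deltas rather than store them widened.

// Standalone sketch of frame-of-reference encoding: store each value as a
// small delta from a shared offset (here, the minimum of the input).
#include <algorithm>
#include <bit>
#include <cstdint>
#include <vector>

struct ForEncoded {
    uint64_t offset;              // frame of reference: the minimum value
    uint8_t bitWidth;             // bits needed per delta
    std::vector<uint64_t> deltas; // value - offset; a real codec bitpacks these
};

// Assumes a non-empty input.
ForEncoded forEncode(const std::vector<uint64_t>& values) {
    auto [minIt, maxIt] = std::minmax_element(values.begin(), values.end());
    ForEncoded enc{*minIt, static_cast<uint8_t>(std::bit_width(*maxIt - *minIt)), {}};
    for (auto v : values) {
        enc.deltas.push_back(v - enc.offset);
    }
    return enc;
}

uint64_t forDecode(const ForEncoded& enc, size_t i) {
    return enc.offset + enc.deltas[i];
}

For example, the hundred values 10000000..10000099 need only bit_width(99) = 7 bits per delta instead of 24 bits per raw value, which is exactly the saving the header changes below make room for.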
32 changes: 22 additions & 10 deletions src/include/storage/store/compression.h
@@ -1,5 +1,6 @@
#pragma once

#include <array>
#include <cstdint>
#include <limits>

@@ -27,11 +28,12 @@ enum class CompressionType : uint8_t {
};

struct CompressionMetadata {
static constexpr uint8_t DATA_SIZE = 9;
CompressionType compression;
// Extra data to be used to store codec-specific information
uint8_t data;
explicit CompressionMetadata(
CompressionType compression = CompressionType::UNCOMPRESSED, uint8_t data = 0)
std::array<uint8_t, DATA_SIZE> data;
explicit CompressionMetadata(CompressionType compression = CompressionType::UNCOMPRESSED,
std::array<uint8_t, DATA_SIZE> data = {})
: compression{compression}, data{data} {}

// Returns the number of values which will be stored in the given data size
@@ -149,20 +151,30 @@ class Uncompressed : public CompressionAlg {
const uint32_t numBytesPerValue;
};

// Six bits are needed for the bit width (fewer for smaller types, but the header byte is the same
// for simplicity) One bit (the eighth) is needed to indicate if there are negative values The
// seventh bit is unused
// Serialized as nine bytes.
// In the first byte:
// Six bits are needed for the bit width (fewer for smaller types, but the header byte is the same
// for simplicity).
// One bit (the eighth) indicates whether there are negative values.
// The seventh bit is unused.
// The eight remaining bytes are used for the offset.
struct BitpackHeader {
uint8_t bitWidth;
bool hasNegative;
// Offset to apply to all values
// Stored as unsigned, but for signed variants of IntegerBitpacking
// it gets cast to a signed type on use, letting it also store negative offsets
uint64_t offset;
static const uint8_t NEGATIVE_FLAG = 0b10000000;
static const uint8_t BITWIDTH_MASK = 0b01111111;

uint8_t getDataByte() const;
std::array<uint8_t, CompressionMetadata::DATA_SIZE> getData() const;

static BitpackHeader readHeader(uint8_t data);
static BitpackHeader readHeader(
const std::array<uint8_t, CompressionMetadata::DATA_SIZE>& data);
};
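As a concrete instance of the layout described above (an illustrative computation, not code from this commit):

// Byte 0 for bitWidth = 5 with negative values present:
uint8_t byte0 = uint8_t{5} | BitpackHeader::NEGATIVE_FLAG;   // 0b10000101
uint8_t width = byte0 & BitpackHeader::BITWIDTH_MASK;        // 5
bool negative = (byte0 & BitpackHeader::NEGATIVE_FLAG) != 0; // true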

// Augmented with Frame of Reference encoding using an offset stored in the compression metadata
template<typename T>
class IntegerBitpacking : public CompressionAlg {
using U = std::make_unsigned_t<T>;
@@ -198,7 +210,7 @@ class IntegerBitpacking : public CompressionAlg {
CompressionMetadata getCompressionMetadata(
const uint8_t* srcBuffer, uint64_t numValues) const override {
auto header = getBitWidth(srcBuffer, numValues);
CompressionMetadata metadata{CompressionType::INTEGER_BITPACKING, header.getDataByte()};
CompressionMetadata metadata{CompressionType::INTEGER_BITPACKING, header.getData()};
return metadata;
}

@@ -239,7 +251,7 @@ class BooleanBitpacking : public CompressionAlg {

inline CompressionMetadata getCompressionMetadata(
const uint8_t* srcBuffer, uint64_t numValues) const override {
return CompressionMetadata{CompressionType::BOOLEAN_BITPACKING, 0};
return CompressionMetadata{CompressionType::BOOLEAN_BITPACKING};
}
uint64_t compressNextPage(const uint8_t*& srcBuffer, uint64_t numValuesRemaining,
uint8_t* dstBuffer, uint64_t dstBufferSize,
120 changes: 86 additions & 34 deletions src/storage/store/compression.cpp
@@ -188,31 +188,49 @@ static inline T abs(typename std::enable_if<std::is_signed<T>::value, T>::type v
template<typename T>
BitpackHeader IntegerBitpacking<T>::getBitWidth(
const uint8_t* srcBuffer, uint64_t numValues) const {
U max = 0ull;
auto hasNegative = false;
T max = std::numeric_limits<T>::min(), min = std::numeric_limits<T>::max();
for (int i = 0; i < numValues; i++) {
T value = ((T*)srcBuffer)[i];
U absolute = abs<T>(value);
if (absolute > max) {
max = absolute;
}
if (value < 0) {
hasNegative = true;
}
}
if (hasNegative) {
// Needs an extra bit for two's complement encoding
return BitpackHeader{static_cast<uint8_t>(std::bit_width(max) + 1), true /*hasNegative*/};
if (value > max) {
max = value;
}
if (value < min) {
min = value;
}
}
bool hasNegative;
uint64_t offset = 0;
uint8_t bitWidth;
// Frame of reference encoding is only used when values are either all positive or all negative,
// and when we will save at least 1 bit per value.
if (min > 0 && max > 0 && std::bit_width((U)(max - min)) < std::bit_width((U)max)) {
offset = min;
bitWidth = static_cast<uint8_t>(std::bit_width((U)(max - min)));
hasNegative = false;
} else if (min < 0 && max < 0 && std::bit_width((U)(min - max)) < std::bit_width((U)max)) {
offset = (U)max;
bitWidth = static_cast<uint8_t>(std::bit_width((U)(min - max))) + 1;
// This is somewhat suboptimal since we know that the values are all negative
// We could use an offset equal to the minimum, but values which are all negative are
// probably going to grow in the negative direction, leading to many re-compressions when
// inserting
hasNegative = true;
} else if (min < 0) {
bitWidth = static_cast<uint8_t>(std::bit_width((U)std::max(abs<T>(min), abs<T>(max)))) + 1;
hasNegative = true;
} else {
return BitpackHeader{static_cast<uint8_t>(std::bit_width(max)), false /*hasNegative*/};
bitWidth = static_cast<uint8_t>(std::bit_width((U)std::max(abs<T>(min), abs<T>(max))));
hasNegative = false;
}
return BitpackHeader{bitWidth, hasNegative, offset};
}
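To make the first branch concrete, a worked example with assumed input (the same values as the OffsetIntegerPackingMultiPageUnsigned64 test added below):

// Worked example for the all-positive branch, with T = int64_t and
// src = {10000000, 10000001, ..., 10000099}:
//   min = 10000000, max = 10000099
//   std::bit_width((U)(max - min)) == std::bit_width(99u) == 7
//   std::bit_width((U)max) == 24, so frame-of-reference encoding applies
//   => BitpackHeader{7 /*bitWidth*/, false /*hasNegative*/, 10000000 /*offset*/}
// Without the offset, each value would need the full 24 bits.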

template<typename T>
bool IntegerBitpacking<T>::canUpdateInPlace(T value, const BitpackHeader& header) {
T adjustedValue = value - (T)header.offset;
// If there are negatives, the effective bit width is smaller
auto valueSize = std::bit_width((U)abs<T>(value));
if (!header.hasNegative && value < 0) {
auto valueSize = std::bit_width((U)abs<T>(adjustedValue));
if (!header.hasNegative && adjustedValue < 0) {
return false;
}
if ((header.hasNegative && valueSize > header.bitWidth - 1) ||
@@ -269,7 +287,7 @@ void IntegerBitpacking<T>::setValueFromUncompressed(uint8_t* srcBuffer, common::

U chunk[CHUNK_SIZE];
fastunpack(chunkStart, chunk, header.bitWidth);
chunk[posInChunk] = (U)value;
chunk[posInChunk] = (U)(value - (T)header.offset);
fastpack(chunk, chunkStart, header.bitWidth);
}

@@ -284,6 +302,11 @@ void IntegerBitpacking<T>::getValues(const uint8_t* chunkStart, uint8_t pos, uin
if (header.hasNegative) {
SignExtend<T, U, CHUNK_SIZE>((uint8_t*)chunk, header.bitWidth);
}
if (header.offset != 0) {
for (int i = pos; i < pos + numValuesToRead; i++) {
chunk[i] = (U)((T)chunk[i] + (T)header.offset);
}
}
memcpy(dst, &chunk[pos], sizeof(T) * numValuesToRead);
}
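A small decoding illustration under an assumed header (values not taken from this commit):

// With header {bitWidth = 7, hasNegative = false, offset = 10000000}, a packed
// delta of 99 unpacks into chunk[i] and is restored as (U)((T)99 + (T)10000000),
// i.e. 10000099, before being copied out to dst.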

@@ -313,17 +336,38 @@ uint64_t IntegerBitpacking<T>::compressNextPage(const uint8_t*& srcBuffer,
assert(dstBufferSize >= sizeToCompress);
// This might overflow the source buffer if there are fewer values remaining than the chunk size
// so we stop at the end of the last full chunk and use a temporary array to avoid overflow.
auto lastFullChunkEnd = numValuesToCompress - numValuesToCompress % CHUNK_SIZE;
for (auto i = 0ull; i < lastFullChunkEnd; i += CHUNK_SIZE) {
fastpack((const U*)srcBuffer + i, dstBuffer + i * bitWidth / 8, bitWidth);
}
// Pack last partial chunk, avoiding overflows
if (numValuesToCompress % CHUNK_SIZE > 0) {
// TODO(bmwinger): optimize to remove temporary array
U chunk[CHUNK_SIZE] = {0};
memcpy(chunk, (const U*)srcBuffer + lastFullChunkEnd,
numValuesToCompress % CHUNK_SIZE * sizeof(U));
fastpack(chunk, dstBuffer + lastFullChunkEnd * bitWidth / 8, bitWidth);
if (header.offset == 0) {
auto lastFullChunkEnd = numValuesToCompress - numValuesToCompress % CHUNK_SIZE;
for (auto i = 0ull; i < lastFullChunkEnd; i += CHUNK_SIZE) {
fastpack((const U*)srcBuffer + i, dstBuffer + i * bitWidth / 8, bitWidth);
}
// Pack last partial chunk, avoiding overflows
if (numValuesToCompress % CHUNK_SIZE > 0) {
// TODO(bmwinger): optimize to remove temporary array
U chunk[CHUNK_SIZE] = {0};
memcpy(chunk, (const U*)srcBuffer + lastFullChunkEnd,
numValuesToCompress % CHUNK_SIZE * sizeof(U));
fastpack(chunk, dstBuffer + lastFullChunkEnd * bitWidth / 8, bitWidth);
}
} else {
U tmp[CHUNK_SIZE];
auto lastFullChunkEnd = numValuesToCompress - numValuesToCompress % CHUNK_SIZE;
for (auto i = 0ull; i < lastFullChunkEnd; i += CHUNK_SIZE) {
for (int j = 0; j < CHUNK_SIZE; j++) {
tmp[j] = (U)(((T*)srcBuffer)[i + j] - (T)header.offset);
}
fastpack(tmp, dstBuffer + i * bitWidth / 8, bitWidth);
}
// Pack last partial chunk, avoiding overflows
auto remainingValues = numValuesToCompress % CHUNK_SIZE;
if (remainingValues > 0) {
memcpy(tmp, (const U*)srcBuffer + lastFullChunkEnd, remainingValues * sizeof(U));
for (int i = 0; i < remainingValues; i++) {
tmp[i] = (U)((T)tmp[i] - (T)header.offset);
}
memset(tmp + remainingValues, 0, CHUNK_SIZE - remainingValues);
fastpack(tmp, dstBuffer + lastFullChunkEnd * bitWidth / 8, bitWidth);
}
}
srcBuffer += numValuesToCompress * sizeof(U);
return sizeToCompress;
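A quick check of the destination arithmetic shared by both branches, assuming CHUNK_SIZE is 32 (the usual fastpack block size): chunk i begins at byte i * bitWidth / 8, so with bitWidth = 7 the chunk starting at i = 32 lands at byte 32 * 7 / 8 = 28, i.e. 32 values packed into 28 bytes.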
@@ -358,6 +402,11 @@ void IntegerBitpacking<T>::decompressFromPage(const uint8_t* srcBuffer, uint64_t
if (header.hasNegative) {
SignExtend<T, U, CHUNK_SIZE>(dstBuffer + dstIndex * sizeof(U), header.bitWidth);
}
if (header.offset != 0) {
for (int i = 0; i < CHUNK_SIZE; i++) {
((T*)dstBuffer)[dstIndex + i] += (T)header.offset;
}
}
srcCursor += bytesPerChunk;
}
// Copy remaining values from within the last chunk.
@@ -409,18 +458,21 @@ void BooleanBitpacking::decompressFromPage(const uint8_t* srcBuffer, uint64_t sr
}
}

uint8_t BitpackHeader::getDataByte() const {
uint8_t data = bitWidth;
std::array<uint8_t, CompressionMetadata::DATA_SIZE> BitpackHeader::getData() const {
std::array<uint8_t, CompressionMetadata::DATA_SIZE> data = {bitWidth};
*(uint64_t*)&data[1] = offset;
if (hasNegative) {
data |= NEGATIVE_FLAG;
data[0] |= NEGATIVE_FLAG;
}
return data;
}

BitpackHeader BitpackHeader::readHeader(uint8_t data) {
BitpackHeader BitpackHeader::readHeader(
const std::array<uint8_t, CompressionMetadata::DATA_SIZE>& data) {
BitpackHeader header;
header.bitWidth = data & BITWIDTH_MASK;
header.hasNegative = data & NEGATIVE_FLAG;
header.bitWidth = data[0] & BITWIDTH_MASK;
header.hasNegative = data[0] & NEGATIVE_FLAG;
header.offset = *(uint64_t*)&data[1];
return header;
}
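A minimal round-trip of the serialization above (a usage sketch; the header values and the helper name are illustrative):

#include <cassert>

// Hypothetical sanity check of the nine-byte header round trip.
void headerRoundTripExample() {
    BitpackHeader original{7 /*bitWidth*/, false /*hasNegative*/, 10000000 /*offset*/};
    auto data = original.getData(); // byte 0: bit width and flag; bytes 1-8: offset
    auto decoded = BitpackHeader::readHeader(data);
    assert(decoded.bitWidth == 7);
    assert(!decoded.hasNegative);
    assert(decoded.offset == 10000000);
}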

32 changes: 24 additions & 8 deletions test/storage/compression_test.cpp
@@ -7,6 +7,8 @@ using namespace kuzu::storage;

template<typename T>
void test_compression(CompressionAlg& alg, std::vector<T> src) {
// Force offset of 0 for bitpacking
src[0] = 0;
auto pageSize = 4096;
std::vector<uint8_t> dest(pageSize);

@@ -20,7 +22,7 @@ void test_compression(CompressionAlg& alg, std::vector<T> src) {
std::vector<T> decompressed(src.size());
alg.decompressFromPage(dest.data(), 0, (uint8_t*)decompressed.data(), 0, src.size(), metadata);
EXPECT_EQ(src, decompressed);
// works with all bit widths
// works with all bit widths (but not all offsets)
T value = 0;
alg.setValueFromUncompressed((uint8_t*)&value, 0, (uint8_t*)dest.data(), 1, metadata);
alg.decompressFromPage(dest.data(), 0, (uint8_t*)decompressed.data(), 0, src.size(), metadata);
@@ -73,8 +75,6 @@ TEST(CompressionTests, IntegerPackingTest32) {
src[i] = i;
}
auto alg = IntegerBitpacking<int32_t>();
ASSERT_EQ(alg.getBitWidth((uint8_t*)src.data(), src.size()).bitWidth,
std::bit_width(static_cast<uint32_t>(length - 1)));
test_compression(alg, src);
}

@@ -85,31 +85,26 @@ TEST(CompressionTests, IntegerPackingTest32Small) {
src[i] = i;
}
auto alg = IntegerBitpacking<int32_t>();
ASSERT_EQ(alg.getBitWidth((uint8_t*)src.data(), src.size()).bitWidth,
std::bit_width(static_cast<uint32_t>(length - 1)));
test_compression(alg, src);
}

TEST(CompressionTests, IntegerPackingTest64) {
std::vector<int64_t> src(128, 6);
auto alg = IntegerBitpacking<int64_t>();
ASSERT_EQ(alg.getBitWidth((uint8_t*)src.data(), src.size()).bitWidth, std::bit_width(6u));
test_compression(alg, src);
}

TEST(CompressionTests, IntegerPackingTestNegative32) {
std::vector<int32_t> src(128, -6);
src[5] = 20;
auto alg = IntegerBitpacking<int32_t>();
ASSERT_EQ(alg.getBitWidth((uint8_t*)src.data(), src.size()).bitWidth, std::bit_width(20u) + 1);
test_compression(alg, src);
}

TEST(CompressionTests, IntegerPackingTestNegative64) {
std::vector<int64_t> src(128, -6);
src[5] = 20;
auto alg = IntegerBitpacking<int64_t>();
ASSERT_EQ(alg.getBitWidth((uint8_t*)src.data(), src.size()).bitWidth, std::bit_width(20u) + 1);
test_compression(alg, src);
}

@@ -268,3 +263,24 @@ TEST(CompressionTests, IntegerPackingMultiPageUnsigned8) {

integerPackingMultiPage(src);
}

// Tests with a large fixed offset
TEST(CompressionTests, OffsetIntegerPackingMultiPageUnsigned64) {
int64_t numValues = 100;
std::vector<uint64_t> src(numValues);
for (int i = 0; i < numValues; i++) {
src[i] = 10000000 + i;
}

integerPackingMultiPage(src);
}

TEST(CompressionTests, OffsetIntegerPackingMultiPageSigned64) {
int64_t numValues = 100;
std::vector<int64_t> src(numValues);
for (int i = 0; i < numValues; i++) {
src[i] = -10000000 - i;
}

integerPackingMultiPage(src);
}
