Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Frame of reference encoding #2140

Merged
merged 1 commit into from
Oct 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 22 additions & 10 deletions src/include/storage/store/compression.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <array>
#include <cstdint>
#include <limits>

Expand Down Expand Up @@ -27,11 +28,12 @@ enum class CompressionType : uint8_t {
};

struct CompressionMetadata {
static constexpr uint8_t DATA_SIZE = 9;
CompressionType compression;
// Extra data to be used to store codec-specific information
uint8_t data;
explicit CompressionMetadata(
CompressionType compression = CompressionType::UNCOMPRESSED, uint8_t data = 0)
std::array<uint8_t, DATA_SIZE> data;
explicit CompressionMetadata(CompressionType compression = CompressionType::UNCOMPRESSED,
std::array<uint8_t, DATA_SIZE> data = {})
: compression{compression}, data{data} {}

// Returns the number of values which will be stored in the given data size
Expand Down Expand Up @@ -149,20 +151,30 @@ class Uncompressed : public CompressionAlg {
const uint32_t numBytesPerValue;
};

// Serialized as nine bytes.
// In the first byte:
//   Six bits are needed for the bit width (fewer for smaller types, but the header byte is the
//   same for simplicity)
//   One bit (the eighth) is needed to indicate if there are negative values
//   The seventh bit is unused
// The eight remaining bytes are used for the offset
struct BitpackHeader {
    // Number of bits used to store each (offset-adjusted) value.
    uint8_t bitWidth;
    // True if any stored value is negative; packed values then carry one extra
    // sign bit and are sign-extended on decompression.
    bool hasNegative;
    // Offset to apply to all values
    // Stored as unsigned, but for signed variants of IntegerBitpacking
    // it gets cast to a signed type on use, letting it also store negative offsets
    uint64_t offset;
    static const uint8_t NEGATIVE_FLAG = 0b10000000;
    static const uint8_t BITWIDTH_MASK = 0b01111111;

    // Serializes this header into the fixed-size metadata blob stored in
    // CompressionMetadata::data.
    std::array<uint8_t, CompressionMetadata::DATA_SIZE> getData() const;

    // Deserializes a header previously produced by getData().
    static BitpackHeader readHeader(
        const std::array<uint8_t, CompressionMetadata::DATA_SIZE>& data);
};

// Augmented with Frame of Reference encoding using an offset stored in the compression metadata
template<typename T>
class IntegerBitpacking : public CompressionAlg {
using U = std::make_unsigned_t<T>;
Expand Down Expand Up @@ -198,7 +210,7 @@ class IntegerBitpacking : public CompressionAlg {
CompressionMetadata getCompressionMetadata(
const uint8_t* srcBuffer, uint64_t numValues) const override {
auto header = getBitWidth(srcBuffer, numValues);
CompressionMetadata metadata{CompressionType::INTEGER_BITPACKING, header.getDataByte()};
CompressionMetadata metadata{CompressionType::INTEGER_BITPACKING, header.getData()};
return metadata;
}

Expand Down Expand Up @@ -239,7 +251,7 @@ class BooleanBitpacking : public CompressionAlg {

inline CompressionMetadata getCompressionMetadata(
const uint8_t* srcBuffer, uint64_t numValues) const override {
return CompressionMetadata{CompressionType::BOOLEAN_BITPACKING, 0};
return CompressionMetadata{CompressionType::BOOLEAN_BITPACKING};
}
uint64_t compressNextPage(const uint8_t*& srcBuffer, uint64_t numValuesRemaining,
uint8_t* dstBuffer, uint64_t dstBufferSize,
Expand Down
120 changes: 86 additions & 34 deletions src/storage/store/compression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,31 +188,49 @@
// Scans numValues values of type T in srcBuffer and computes the bitpacking
// parameters: bit width, sign flag, and frame-of-reference offset.
template<typename T>
BitpackHeader IntegerBitpacking<T>::getBitWidth(
    const uint8_t* srcBuffer, uint64_t numValues) const {
    T max = std::numeric_limits<T>::min(), min = std::numeric_limits<T>::max();
    for (int i = 0; i < numValues; i++) {
        T value = ((T*)srcBuffer)[i];
        if (value > max) {
            max = value;
        }
        if (value < min) {
            min = value;
        }
    }
    bool hasNegative;
    uint64_t offset = 0;
    uint8_t bitWidth;
    // Frame of reference encoding is only used when values are either all positive or all negative,
    // and when we will save at least 1 bit per value.
    if (min > 0 && max > 0 && std::bit_width((U)(max - min)) < std::bit_width((U)max)) {
        offset = min;
        bitWidth = static_cast<uint8_t>(std::bit_width((U)(max - min)));
        hasNegative = false;
    } else if (min < 0 && max < 0 && std::bit_width((U)(min - max)) < std::bit_width((U)max)) {
        // NOTE(review): (U)(min - max) casts a non-positive difference to unsigned, which is a
        // very large value whenever min < max — confirm this branch behaves as intended for
        // non-constant all-negative data (coverage reporting flags these lines as untested).
        offset = (U)max;
        bitWidth = static_cast<uint8_t>(std::bit_width((U)(min - max))) + 1;
        // This is somewhat suboptimal since we know that the values are all negative
        // We could use an offset equal to the minimum, but values which are all negative are
        // probably going to grow in the negative direction, leading to many re-compressions when
        // inserting
        hasNegative = true;
    } else if (min < 0) {
        // Mixed signs or no offset benefit; needs an extra bit for two's complement encoding.
        bitWidth = static_cast<uint8_t>(std::bit_width((U)std::max(abs<T>(min), abs<T>(max)))) + 1;
        hasNegative = true;
    } else {
        bitWidth = static_cast<uint8_t>(std::bit_width((U)std::max(abs<T>(min), abs<T>(max))));
        hasNegative = false;
    }
    return BitpackHeader{bitWidth, hasNegative, offset};
}

template<typename T>
bool IntegerBitpacking<T>::canUpdateInPlace(T value, const BitpackHeader& header) {
T adjustedValue = value - (T)header.offset;
// If there are negatives, the effective bit width is smaller
auto valueSize = std::bit_width((U)abs<T>(value));
if (!header.hasNegative && value < 0) {
auto valueSize = std::bit_width((U)abs<T>(adjustedValue));
if (!header.hasNegative && adjustedValue < 0) {
return false;
}
if ((header.hasNegative && valueSize > header.bitWidth - 1) ||
Expand Down Expand Up @@ -269,7 +287,7 @@

U chunk[CHUNK_SIZE];
fastunpack(chunkStart, chunk, header.bitWidth);
chunk[posInChunk] = (U)value;
chunk[posInChunk] = (U)(value - (T)header.offset);
fastpack(chunk, chunkStart, header.bitWidth);
}

Expand All @@ -284,6 +302,11 @@
if (header.hasNegative) {
SignExtend<T, U, CHUNK_SIZE>((uint8_t*)chunk, header.bitWidth);
}
if (header.offset != 0) {
for (int i = pos; i < pos + numValuesToRead; i++) {
chunk[i] = (U)((T)chunk[i] + (T)header.offset);
}
}
memcpy(dst, &chunk[pos], sizeof(T) * numValuesToRead);
}

Expand Down Expand Up @@ -313,17 +336,38 @@
assert(dstBufferSize >= sizeToCompress);
// This might overflow the source buffer if there are fewer values remaining than the chunk size
// so we stop at the end of the last full chunk and use a temporary array to avoid overflow.
auto lastFullChunkEnd = numValuesToCompress - numValuesToCompress % CHUNK_SIZE;
for (auto i = 0ull; i < lastFullChunkEnd; i += CHUNK_SIZE) {
fastpack((const U*)srcBuffer + i, dstBuffer + i * bitWidth / 8, bitWidth);
}
// Pack last partial chunk, avoiding overflows
if (numValuesToCompress % CHUNK_SIZE > 0) {
// TODO(bmwinger): optimize to remove temporary array
U chunk[CHUNK_SIZE] = {0};
memcpy(chunk, (const U*)srcBuffer + lastFullChunkEnd,
numValuesToCompress % CHUNK_SIZE * sizeof(U));
fastpack(chunk, dstBuffer + lastFullChunkEnd * bitWidth / 8, bitWidth);
if (header.offset == 0) {
auto lastFullChunkEnd = numValuesToCompress - numValuesToCompress % CHUNK_SIZE;
for (auto i = 0ull; i < lastFullChunkEnd; i += CHUNK_SIZE) {
fastpack((const U*)srcBuffer + i, dstBuffer + i * bitWidth / 8, bitWidth);
}
// Pack last partial chunk, avoiding overflows
if (numValuesToCompress % CHUNK_SIZE > 0) {
// TODO(bmwinger): optimize to remove temporary array
U chunk[CHUNK_SIZE] = {0};
memcpy(chunk, (const U*)srcBuffer + lastFullChunkEnd,
numValuesToCompress % CHUNK_SIZE * sizeof(U));
fastpack(chunk, dstBuffer + lastFullChunkEnd * bitWidth / 8, bitWidth);
}
} else {
U tmp[CHUNK_SIZE];
auto lastFullChunkEnd = numValuesToCompress - numValuesToCompress % CHUNK_SIZE;
for (auto i = 0ull; i < lastFullChunkEnd; i += CHUNK_SIZE) {
for (int j = 0; j < CHUNK_SIZE; j++) {
tmp[j] = (U)(((T*)srcBuffer)[i + j] - (T)header.offset);
}
fastpack(tmp, dstBuffer + i * bitWidth / 8, bitWidth);
}
// Pack last partial chunk, avoiding overflows
auto remainingValues = numValuesToCompress % CHUNK_SIZE;
if (remainingValues > 0) {
memcpy(tmp, (const U*)srcBuffer + lastFullChunkEnd, remainingValues * sizeof(U));
for (int i = 0; i < remainingValues; i++) {
tmp[i] = (U)((T)tmp[i] - (T)header.offset);
}
memset(tmp + remainingValues, 0, CHUNK_SIZE - remainingValues);
fastpack(tmp, dstBuffer + lastFullChunkEnd * bitWidth / 8, bitWidth);
}
}
srcBuffer += numValuesToCompress * sizeof(U);
return sizeToCompress;
Expand Down Expand Up @@ -358,6 +402,11 @@
if (header.hasNegative) {
SignExtend<T, U, CHUNK_SIZE>(dstBuffer + dstIndex * sizeof(U), header.bitWidth);
}
if (header.offset != 0) {
for (int i = 0; i < CHUNK_SIZE; i++) {
((T*)dstBuffer)[dstIndex + i] += (T)header.offset;
}
}
srcCursor += bytesPerChunk;
}
// Copy remaining values from within the last chunk.
Expand Down Expand Up @@ -409,18 +458,21 @@
}
}

uint8_t BitpackHeader::getDataByte() const {
uint8_t data = bitWidth;
std::array<uint8_t, CompressionMetadata::DATA_SIZE> BitpackHeader::getData() const {
std::array<uint8_t, CompressionMetadata::DATA_SIZE> data = {bitWidth};
*(uint64_t*)&data[1] = offset;
if (hasNegative) {
data |= NEGATIVE_FLAG;
data[0] |= NEGATIVE_FLAG;
}
return data;
}

// Deserializes a header produced by getData(): bit width and sign flag come
// from byte 0, the offset from bytes 1-8.
BitpackHeader BitpackHeader::readHeader(
    const std::array<uint8_t, CompressionMetadata::DATA_SIZE>& data) {
    BitpackHeader header;
    header.bitWidth = data[0] & BITWIDTH_MASK;
    header.hasNegative = data[0] & NEGATIVE_FLAG;
    // NOTE(review): unaligned load mirroring getData's unaligned store — confirm
    // target platforms permit it.
    header.offset = *(uint64_t*)&data[1];
    return header;
}

Expand Down
32 changes: 24 additions & 8 deletions test/storage/compression_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ using namespace kuzu::storage;

template<typename T>
void test_compression(CompressionAlg& alg, std::vector<T> src) {
// Force offset of 0 for bitpacking
src[0] = 0;
auto pageSize = 4096;
std::vector<uint8_t> dest(pageSize);

Expand All @@ -20,7 +22,7 @@ void test_compression(CompressionAlg& alg, std::vector<T> src) {
std::vector<T> decompressed(src.size());
alg.decompressFromPage(dest.data(), 0, (uint8_t*)decompressed.data(), 0, src.size(), metadata);
EXPECT_EQ(src, decompressed);
// works with all bit widths
// works with all bit widths (but not all offsets)
T value = 0;
alg.setValueFromUncompressed((uint8_t*)&value, 0, (uint8_t*)dest.data(), 1, metadata);
alg.decompressFromPage(dest.data(), 0, (uint8_t*)decompressed.data(), 0, src.size(), metadata);
Expand Down Expand Up @@ -73,8 +75,6 @@ TEST(CompressionTests, IntegerPackingTest32) {
src[i] = i;
}
auto alg = IntegerBitpacking<int32_t>();
ASSERT_EQ(alg.getBitWidth((uint8_t*)src.data(), src.size()).bitWidth,
std::bit_width(static_cast<uint32_t>(length - 1)));
test_compression(alg, src);
}

Expand All @@ -85,31 +85,26 @@ TEST(CompressionTests, IntegerPackingTest32Small) {
src[i] = i;
}
auto alg = IntegerBitpacking<int32_t>();
ASSERT_EQ(alg.getBitWidth((uint8_t*)src.data(), src.size()).bitWidth,
std::bit_width(static_cast<uint32_t>(length - 1)));
test_compression(alg, src);
}

// Round-trips a constant int64 vector through integer bitpacking.
TEST(CompressionTests, IntegerPackingTest64) {
    std::vector<int64_t> src(128, 6);
    auto alg = IntegerBitpacking<int64_t>();
    test_compression(alg, src);
}

// Round-trips mixed-sign int32 data (mostly negative, one positive outlier).
TEST(CompressionTests, IntegerPackingTestNegative32) {
    std::vector<int32_t> src(128, -6);
    src[5] = 20;
    auto alg = IntegerBitpacking<int32_t>();
    test_compression(alg, src);
}

// Round-trips mixed-sign int64 data (mostly negative, one positive outlier).
TEST(CompressionTests, IntegerPackingTestNegative64) {
    std::vector<int64_t> src(128, -6);
    src[5] = 20;
    auto alg = IntegerBitpacking<int64_t>();
    test_compression(alg, src);
}

Expand Down Expand Up @@ -268,3 +263,24 @@ TEST(CompressionTests, IntegerPackingMultiPageUnsigned8) {

integerPackingMultiPage(src);
}

// Tests with a large fixed offset
TEST(CompressionTests, OffsetIntegerPackingMultiPageUnsigned64) {
    constexpr int64_t numValues = 100;
    constexpr uint64_t base = 10000000;
    std::vector<uint64_t> src;
    src.reserve(numValues);
    // Consecutive values starting far above zero, so frame-of-reference
    // encoding has a large offset to subtract.
    for (int64_t delta = 0; delta < numValues; delta++) {
        src.push_back(base + delta);
    }

    integerPackingMultiPage(src);
}

// Same large-offset round-trip as above, but with negative signed values.
TEST(CompressionTests, OffsetIntegerPackingMultiPageSigned64) {
    constexpr int64_t numValues = 100;
    constexpr int64_t base = -10000000;
    std::vector<int64_t> src;
    src.reserve(numValues);
    // Consecutive values descending from a large negative starting point.
    for (int64_t delta = 0; delta < numValues; delta++) {
        src.push_back(base - delta);
    }

    integerPackingMultiPage(src);
}