Skip to content

Commit

Permalink
implemented Abstract Compression interface and initial integer packing
Browse files Browse the repository at this point in the history
  • Loading branch information
benjaminwinger committed Sep 8, 2023
1 parent d03d306 commit 827183c
Show file tree
Hide file tree
Showing 32 changed files with 10,224 additions and 227 deletions.
4 changes: 2 additions & 2 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ add_subdirectory(transaction)

add_library(kuzu STATIC ${ALL_OBJECT_FILES})
target_link_libraries(kuzu
PUBLIC antlr4_cypher antlr4_runtime utf8proc re2 ${PARQUET_LIB} ${ARROW_LIB} Threads::Threads)
PUBLIC antlr4_cypher antlr4_runtime utf8proc re2 ${PARQUET_LIB} ${ARROW_LIB} Threads::Threads fastpfor)
target_include_directories(kuzu
PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include> $<INSTALL_INTERFACE:${CMAKE_INSTALL_INCLUDEDIR}>)
add_library(kuzu_shared SHARED ${ALL_OBJECT_FILES})
Expand All @@ -24,6 +24,6 @@ else()
set_target_properties(kuzu_shared PROPERTIES OUTPUT_NAME kuzu)
endif()
target_link_libraries(kuzu_shared
PUBLIC antlr4_cypher antlr4_runtime utf8proc re2 ${PARQUET_LIB} ${ARROW_LIB} Threads::Threads)
PUBLIC antlr4_cypher antlr4_runtime utf8proc re2 ${PARQUET_LIB} ${ARROW_LIB} Threads::Threads fastpfor)
target_include_directories(kuzu_shared
PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include> $<INSTALL_INTERFACE:${CMAKE_INSTALL_INCLUDEDIR}>)
21 changes: 18 additions & 3 deletions src/include/storage/copier/column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ namespace kuzu {
namespace storage {

class NullColumnChunk;
class CompressionAlg;

struct BaseColumnChunkMetadata {
common::page_idx_t pageIdx;
Expand All @@ -28,11 +29,12 @@ struct BaseColumnChunkMetadata {

// Metadata for a column chunk: the page fields inherited from BaseColumnChunkMetadata plus
// value counts.
struct ColumnChunkMetadata : public BaseColumnChunkMetadata {
// Number of values in the chunk (set from numNodesInChunk by the constructor below).
uint64_t numValues;
// NOTE(review): presumably the per-page value count returned by
// CompressionAlg::startCompression - confirm against the flushBuffer implementations.
uint64_t numValuesPerPage;

// Default state: numValues is set to the UINT64_MAX sentinel.
// NOTE(review): numValuesPerPage is not initialized by this constructor, so reading it from a
// default-constructed instance yields an indeterminate value - confirm callers always assign it.
ColumnChunkMetadata() : BaseColumnChunkMetadata(), numValues{UINT64_MAX} {}
ColumnChunkMetadata(
common::page_idx_t pageIdx, common::page_idx_t numPages, uint64_t numNodesInChunk)
: BaseColumnChunkMetadata{pageIdx, numPages}, numValues(numNodesInChunk) {}
common::page_idx_t pageIdx, common::page_idx_t numPages, uint64_t numNodesInChunk, uint64_t numValuesPerPage)
: BaseColumnChunkMetadata{pageIdx, numPages}, numValues(numNodesInChunk), numValuesPerPage(numValuesPerPage) {}
};

struct OverflowColumnChunkMetadata : public BaseColumnChunkMetadata {
Expand Down Expand Up @@ -89,7 +91,7 @@ class ColumnChunk {
virtual void append(
arrow::Array* array, common::offset_t startPosInChunk, uint32_t numValuesToAppend);

virtual common::page_idx_t flushBuffer(BMFileHandle* dataFH, common::page_idx_t startPageIdx);
virtual ColumnChunkMetadata flushBuffer(BMFileHandle* dataFH, common::page_idx_t startPageIdx);

// Returns the size of the data type in bytes
static uint32_t getDataTypeSizeInChunk(common::LogicalType& dataType);
Expand Down Expand Up @@ -173,6 +175,17 @@ inline bool ColumnChunk::getValue(common::offset_t pos) const {
return common::NullMask::isNull((uint64_t*)buffer.get(), pos);
}

// Column chunk which is compressed during flushBuffer, but otherwise maintained uncompressed
class CompressedColumnChunk : public ColumnChunk {
public:
// alg is passed as a std::unique_ptr by value, so the chunk takes ownership of the
// compression algorithm it is given.
// hasNullChunk defaults to true.
explicit CompressedColumnChunk(std::unique_ptr<CompressionAlg> alg, common::LogicalType dataType,
common::CopyDescription *copyDescription, bool hasNullChunk = true);
// Overrides ColumnChunk::flushBuffer and returns the metadata for the flushed chunk.
// Declared final, so further subclasses cannot override it.
ColumnChunkMetadata flushBuffer(BMFileHandle* dataFH, common::page_idx_t startPageIdx) final;
protected:
// Compression algorithm owned by this chunk.
std::unique_ptr<CompressionAlg> alg;
};

// Stored as bitpacked booleans in-memory and on-disk
class BoolColumnChunk : public ColumnChunk {
public:
BoolColumnChunk(common::CopyDescription* copyDescription, bool hasNullChunk = true)
Expand All @@ -189,6 +202,8 @@ class BoolColumnChunk : public ColumnChunk {

void resize(uint64_t capacity) final;

ColumnChunkMetadata flushBuffer(BMFileHandle* dataFH, common::page_idx_t startPageIdx) final;

protected:
inline uint64_t numBytesForValues(common::offset_t numValues) const {
// 8 values per byte, and we need a buffer size which is a multiple of 8 bytes
Expand Down
203 changes: 203 additions & 0 deletions src/include/storage/copier/compression.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
#pragma once

#include <algorithm>
#include <cassert>
#include <cstdint>
#include <cstring>
#include <limits>
#include <type_traits>
#include <utility>

#include "common/types/types.h"

namespace kuzu {
namespace storage {

// Returns the size of the data type in bytes.
// CopyCompression uses this as the fixed number of bytes occupied by each value.
uint32_t getDataTypeSizeInChunk(const common::LogicalType& dataType);

// Abstract interface for a compression scheme. Implementations provide single-value
// access (setValueFromUncompressed / getValue), page-at-a-time compression
// (startCompression followed by compressNextPage) and bulk decompression (decompressFromPage).
class CompressionAlg {
public:
virtual ~CompressionAlg() = default;

// Takes a single uncompressed value from the srcBuffer and compresses it into the dstBuffer.
// posInSrc and posInDst are value offsets, not byte offsets.
virtual void setValueFromUncompressed(uint8_t* srcBuffer, common::offset_t posInSrc,
uint8_t* dstBuffer, common::offset_t posInDst) = 0;

// Reads the value at position pos from the buffer and stores it at the memory address dst.
// dst should point to storage for one uncompressed value.
virtual inline void getValue(
const uint8_t* buffer, common::offset_t pos, uint8_t* dst) const = 0;

// Called when compression starts. Will always be called before compressNextPage.
//
// Returns the number of values per page (currently this must be consistent across a
// ColumnChunk).
//
// TODO(bmwinger): this should probably be scoped. E.g. by having a separate class for handling
// compression which is returned by the compress function.
virtual uint64_t startCompression(
const uint8_t* srcBuffer, uint64_t numValues, uint64_t pageSize) = 0;

// Takes uncompressed data from the srcBuffer and compresses it into the dstBuffer.
//
// Stores only as much data in dstBuffer as will fit, and advances the srcBuffer pointer
// (passed by reference) to the beginning of the next value to store.
// (This means that we can't start the next page on an unaligned value.
// Maybe instead we could use value offsets, but the compression algorithms
// usually work on aligned chunks anyway.)
//
// dstBufferSize is the size of dstBuffer in bytes.
// numValuesRemaining is the number of values remaining in the srcBuffer to be compressed.
// compressNextPage must store the lesser of the number of values per page
// (as returned by startCompression) and the remaining number of values.
//
// Returns the size in bytes of the compressed data within the page (rounded up to the nearest
// byte).
virtual uint64_t compressNextPage(const uint8_t*& srcBuffer, uint64_t numValuesRemaining,
uint8_t* dstBuffer, uint64_t dstBufferSize) = 0;

// Takes compressed data from the srcBuffer and decompresses numValues values into the
// dstBuffer.
// srcOffset and dstOffset are value offsets, not byte offsets.
// srcBuffer points to the beginning of a page.
virtual void decompressFromPage(const uint8_t* srcBuffer, uint64_t srcOffset,
uint8_t* dstBuffer, uint64_t dstOffset, uint64_t numValues) = 0;
};

// Tag identifying the compression scheme used for stored data.
// The compression type is written to the data header both so that we can usually catch the
// mistake of decompressing data that was never compressed, and to allow for
// runtime-configurable compression in the future.
enum class CompressionType : uint8_t {
// NOTE(review): presumably the tag for uncompressed (CopyCompression) data - confirm; no
// code visible here writes it.
COPY = 1,
// Written by BitpackHeader::writeHeader as the first byte of a bitpacking header.
INTEGER_BITPACKING = 2,
};

// Compression alg which does not compress values and instead just copies them.
//
// Values are laid out back to back with no header, numBytesPerValue bytes each, so the byte
// offset of a value is always (value offset * numBytesPerValue).
class CopyCompression : public CompressionAlg {
public:
    // logicalType determines the fixed per-value size, obtained from getDataTypeSizeInChunk.
    explicit CopyCompression(const common::LogicalType& logicalType)
        : logicalType{logicalType}, numBytesPerValue{getDataTypeSizeInChunk(this->logicalType)} {
        // numBytesPerValue is used as a divisor below, so it must not be zero.
        assert(numBytesPerValue > 0);
    }

    // Copies the value at value offset posInSrc in srcBuffer to value offset posInDst in
    // dstBuffer.
    inline void setValueFromUncompressed(uint8_t* srcBuffer, common::offset_t posInSrc,
        uint8_t* dstBuffer, common::offset_t posInDst) final {
        std::memcpy(dstBuffer + posInDst * numBytesPerValue,
            srcBuffer + posInSrc * numBytesPerValue, numBytesPerValue);
    }

    // Copies the value at value offset pos in buffer to dst, which must have room for
    // numBytesPerValue bytes.
    inline void getValue(const uint8_t* buffer, common::offset_t pos, uint8_t* dst) const override {
        std::memcpy(dst, buffer + pos * numBytesPerValue, numBytesPerValue);
    }

    // Returns the number of whole values that fit in a page of pageSize bytes.
    // srcBuffer and numValues are not inspected: the layout does not depend on the data.
    inline uint64_t startCompression(
        const uint8_t* srcBuffer, uint64_t numValues, uint64_t pageSize) override {
        return pageSize / numBytesPerValue;
    }

    // Copies as many whole values as fit in dstBufferSize bytes (at most numValuesRemaining)
    // from srcBuffer to dstBuffer, advances srcBuffer past the copied values and returns the
    // number of bytes written.
    inline uint64_t compressNextPage(const uint8_t*& srcBuffer, uint64_t numValuesRemaining,
        uint8_t* dstBuffer, uint64_t dstBufferSize) override {
        uint64_t numValues = std::min(numValuesRemaining, dstBufferSize / numBytesPerValue);
        uint64_t sizeToCopy = numValues * numBytesPerValue;
        assert(sizeToCopy <= dstBufferSize);
        std::memcpy(dstBuffer, srcBuffer, sizeToCopy);
        srcBuffer += sizeToCopy;
        return sizeToCopy;
    }

    // Copies numValues values starting at value offset srcOffset in srcBuffer to value offset
    // dstOffset in dstBuffer.
    void decompressFromPage(const uint8_t* srcBuffer, uint64_t srcOffset, uint8_t* dstBuffer,
        uint64_t dstOffset, uint64_t numValues) override {
        std::memcpy(dstBuffer + dstOffset * numBytesPerValue,
            srcBuffer + srcOffset * numBytesPerValue, numValues * numBytesPerValue);
    }

protected:
    // Declared before numBytesPerValue on purpose: the constructor reads this member while
    // initializing numBytesPerValue, and members are initialized in declaration order.
    common::LogicalType logicalType;
    // Size in bytes of one stored value.
    const uint32_t numBytesPerValue;
};

// TODO(bmwinger): Move from data to ColumnChunkMetadata
//
// Two-byte header written in front of bitpacked integer data:
//   byte 0: the CompressionType::INTEGER_BITPACKING tag
//   byte 1: bitWidth, OR-ed with 0x80 (the eighth bit) when hasNegative is set
// The same header byte layout is used for every integer type for simplicity.
// NOTE(review): a bitWidth of 64 would occupy the seventh bit (0x40) - confirm that readHeader
// masks the width accordingly.
struct BitpackHeader {
    uint8_t bitWidth;
    bool hasNegative;

    // Writes the header to the buffer and advances the buffer pointer to the end of the header
    void writeHeader(uint8_t*& buffer) const {
        const auto typeTag = static_cast<uint8_t>(CompressionType::INTEGER_BITPACKING);
        // The negative flag lives in the top bit of the second byte.
        const auto widthAndSign = static_cast<uint8_t>(bitWidth | (hasNegative << 7));
        buffer[0] = typeTag;
        buffer[1] = widthAndSign;
        buffer += size();
    }

    // Reads the header from the buffer and advances the buffer pointer to the end of the header
    static BitpackHeader readHeader(const uint8_t*& data);

    // Size of the header in bytes
    static uint32_t size() { return 2; }
};

// Bitpacking compression for the integer type T.
//
// startCompression inspects the data once (through getBitWidth) and records a single bit width
// and has-negative flag for the whole chunk; the remaining methods are defined out of line.
template<typename T>
class IntegerBitpacking : public CompressionAlg {
    static const common::LogicalType LOGICAL_TYPE;
    using U = std::make_unsigned_t<T>;

public:
    IntegerBitpacking() = default;

    void setValueFromUncompressed(uint8_t* srcBuffer, common::offset_t posInSrc, uint8_t* dstBuffer,
        common::offset_t posInDst) final;

    void getValue(const uint8_t* buffer, common::offset_t pos, uint8_t* dst) const final;

    // Returns {bit width, has-negative flag} for the numValues values in srcBuffer.
    // startCompression stores these as bitWidth and hasNegative.
    std::pair<uint8_t, bool> getBitWidth(const uint8_t* srcBuffer, uint64_t numValues) const;

    // Returns how many values of the given bit width fit in a page of pageSize bytes once room
    // for the BitpackHeader has been set aside.
    //
    // A bit width of 0 needs no storage per value, so the result is the maximum uint64_t.
    // A page too small to hold anything besides the header yields 0.
    uint64_t numValuesPerPage(uint8_t bitWidth, uint64_t pageSize) {
        if (bitWidth == 0) {
            // Avoids dividing by zero below.
            return std::numeric_limits<uint64_t>::max();
        }
        if (pageSize <= BitpackHeader::size()) {
            // Avoids unsigned wrap-around in the subtraction below.
            return 0;
        }
        auto numValues = (pageSize - BitpackHeader::size()) * 8 / bitWidth;
        // Round down to nearest multiple of 32 to ensure that we don't write any extra values
        // Rounding up could overflow the buffer
        // TODO(bmwinger): Pack extra values into the space at the end. This will probably be
        // slower, but only needs to be done once.
        numValues -= numValues % 32;
        return numValues;
    }

    // Determines the bit width and has-negative flag for the chunk, remembers them for the
    // later calls, and returns the number of values per page.
    uint64_t startCompression(
        const uint8_t* srcBuffer, uint64_t numValues, uint64_t pageSize) override {
        auto result = getBitWidth(srcBuffer, numValues);
        bitWidth = result.first;
        hasNegative = result.second;
        return numValuesPerPage(bitWidth, pageSize);
    }

    uint64_t compressNextPage(const uint8_t*& srcBuffer, uint64_t numValuesRemaining,
        uint8_t* dstBuffer, uint64_t dstBufferSize) final;

    void decompressFromPage(const uint8_t* srcBuffer, uint64_t srcOffset, uint8_t* dstBuffer,
        uint64_t dstOffset, uint64_t numValues) final;

protected:
    common::LogicalType mLogicalType;
    // Both are overwritten by startCompression. They are given defined initial values so that
    // an instance that has not started compressing never exposes indeterminate state.
    uint8_t bitWidth{0};
    bool hasNegative{false};
};

// Compression scheme for boolean values.
// startCompression reports pageSize * 8 values per page, i.e. one value per bit of the page;
// the remaining methods are defined out of line.
class BoolCompression : public CompressionAlg {
    static const common::LogicalType LOGICAL_TYPE;

public:
    void setValueFromUncompressed(uint8_t* srcBuffer, common::offset_t posInSrc, uint8_t* dstBuffer,
        common::offset_t posInDst) final;

    void getValue(const uint8_t* buffer, common::offset_t pos, uint8_t* dst) const final;

    // Returns the number of values per page. srcBuffer and numValues are not inspected.
    inline uint64_t startCompression(
        const uint8_t* srcBuffer, uint64_t numValues, uint64_t pageSize) override {
        constexpr uint64_t numBitsPerByte = 8;
        return pageSize * numBitsPerByte;
    }

    uint64_t compressNextPage(const uint8_t*& srcBuffer, uint64_t numValuesRemaining,
        uint8_t* dstBuffer, uint64_t dstBufferSize) final;

    void decompressFromPage(const uint8_t* srcBuffer, uint64_t srcOffset, uint8_t* dstBuffer,
        uint64_t dstOffset, uint64_t numValues) final;
};

} // namespace storage
} // namespace kuzu
Loading

0 comments on commit 827183c

Please sign in to comment.