Skip to content

Commit

Permalink
implemented Abstract Compression interface and initial integer packing
Browse files Browse the repository at this point in the history
  • Loading branch information
benjaminwinger committed Sep 7, 2023
1 parent b7b10b0 commit c0713ec
Show file tree
Hide file tree
Showing 29 changed files with 21,836 additions and 234 deletions.
4 changes: 2 additions & 2 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ add_subdirectory(transaction)

add_library(kuzu STATIC ${ALL_OBJECT_FILES})
target_link_libraries(kuzu
PUBLIC antlr4_cypher antlr4_runtime utf8proc re2 ${PARQUET_LIB} ${ARROW_LIB} Threads::Threads)
PUBLIC antlr4_cypher antlr4_runtime utf8proc re2 ${PARQUET_LIB} ${ARROW_LIB} Threads::Threads fastpfor)
target_include_directories(kuzu
PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include> $<INSTALL_INTERFACE:${CMAKE_INSTALL_INCLUDEDIR}>)
add_library(kuzu_shared SHARED ${ALL_OBJECT_FILES})
Expand All @@ -24,6 +24,6 @@ else()
set_target_properties(kuzu_shared PROPERTIES OUTPUT_NAME kuzu)
endif()
target_link_libraries(kuzu_shared
PUBLIC antlr4_cypher antlr4_runtime utf8proc re2 ${PARQUET_LIB} ${ARROW_LIB} Threads::Threads)
PUBLIC antlr4_cypher antlr4_runtime utf8proc re2 ${PARQUET_LIB} ${ARROW_LIB} Threads::Threads fastpfor)
target_include_directories(kuzu_shared
PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include> $<INSTALL_INTERFACE:${CMAKE_INSTALL_INCLUDEDIR}>)
21 changes: 18 additions & 3 deletions src/include/storage/copier/column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ namespace kuzu {
namespace storage {

class NullColumnChunk;
class CompressionAlg;

struct BaseColumnChunkMetadata {
common::page_idx_t pageIdx;
Expand All @@ -28,11 +29,12 @@ struct BaseColumnChunkMetadata {

struct ColumnChunkMetadata : public BaseColumnChunkMetadata {
uint64_t numValues;
uint64_t numValuesPerPage;

ColumnChunkMetadata() : BaseColumnChunkMetadata(), numValues{UINT64_MAX} {}
ColumnChunkMetadata(
common::page_idx_t pageIdx, common::page_idx_t numPages, uint64_t numNodesInChunk)
: BaseColumnChunkMetadata{pageIdx, numPages}, numValues(numNodesInChunk) {}
common::page_idx_t pageIdx, common::page_idx_t numPages, uint64_t numNodesInChunk, uint64_t numValuesPerPage)
: BaseColumnChunkMetadata{pageIdx, numPages}, numValues(numNodesInChunk), numValuesPerPage(numValuesPerPage) {}
};

struct OverflowColumnChunkMetadata : public BaseColumnChunkMetadata {
Expand Down Expand Up @@ -89,7 +91,7 @@ class ColumnChunk {
virtual void append(
arrow::Array* array, common::offset_t startPosInChunk, uint32_t numValuesToAppend);

virtual common::page_idx_t flushBuffer(BMFileHandle* dataFH, common::page_idx_t startPageIdx);
virtual ColumnChunkMetadata flushBuffer(BMFileHandle* dataFH, common::page_idx_t startPageIdx);

// Returns the size of the data type in bytes
static uint32_t getDataTypeSizeInChunk(common::LogicalType& dataType);
Expand Down Expand Up @@ -173,6 +175,17 @@ inline bool ColumnChunk::getValue(common::offset_t pos) const {
return common::NullMask::isNull((uint64_t*)buffer.get(), pos);
}

// Column chunk which is compressed during flushBuffer, but otherwise maintained uncompressed
class CompressedColumnChunk : public ColumnChunk {
public:
explicit CompressedColumnChunk(std::unique_ptr<CompressionAlg> alg, common::LogicalType dataType,
common::CopyDescription *copyDescription, bool hasNullChunk = true);
ColumnChunkMetadata flushBuffer(BMFileHandle* dataFH, common::page_idx_t startPageIdx) final;
protected:
std::unique_ptr<CompressionAlg> alg;
};

// Stored as bitpacked booleans in-memory and on-disk
class BoolColumnChunk : public ColumnChunk {
public:
BoolColumnChunk(common::CopyDescription* copyDescription, bool hasNullChunk = true)
Expand All @@ -189,6 +202,8 @@ class BoolColumnChunk : public ColumnChunk {

void resize(uint64_t capacity) final;

ColumnChunkMetadata flushBuffer(BMFileHandle* dataFH, common::page_idx_t startPageIdx) final;

protected:
inline uint64_t numBytesForValues(common::offset_t numValues) const {
// 8 values per byte, and we need a buffer size which is a multiple of 8 bytes
Expand Down
201 changes: 201 additions & 0 deletions src/include/storage/copier/compression.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
#pragma once

#include <cstdint>
#include <limits>

#include "common/types/types.h"

namespace kuzu {
namespace storage {

// Returns the size of the data type in bytes
uint32_t getDataTypeSizeInChunk(const common::LogicalType& dataType);

class CompressionAlg {
public:
virtual ~CompressionAlg() = default;

// Takes a single uncompressed value from the srcBuffer and compresses it into the dstBuffer
// Offsets refer to value offsets, not byte offsets
virtual void setValueFromUncompressed(uint8_t* srcBuffer, common::offset_t posInSrc,
uint8_t* dstBuffer, common::offset_t posInDst) = 0;

// Reads a value from the buffer at the given position and stores it at the given memory address
// dst should point to an uncompressed value
virtual inline void getValue(
const uint8_t* buffer, common::offset_t pos, uint8_t* dst) const = 0;

// TODO(bmwinger): this should probably be scoped. E.g. by having a separate class for handling
// compression which is returned by the compress function. Called when compression starts. Will
// always be called before compressNextPage
// Returns the number of values per page (currently this must be consistent across a
// ColumnChunk)
virtual uint64_t startCompression(
const uint8_t* srcBuffer, uint64_t numValues, uint64_t pageSize) = 0;

// Takes uncompressed data from the srcBuffer and compresses it into the dstBuffer
//
// stores only as much data in dstBuffer as will fit, and advances the srcBuffer pointer
// to the beginning of the next value to store.
// (This means that we can't start the next page on an unaligned value.
// Maybe instead we could use value offsets, but the compression algorithms
// usually work on aligned chunks anyway)
//
// dstBufferSize is the size in bytes
// numValuesRemaining is the number of values remaining in the srcBuffer to be compressed.
// compressNextPage must store the least of either the number of values per page
// (as returned by startCompression), or the remaining number of values.
//
// returns the size in bytes of the compressed data within the page (rounded up to the nearest
// byte)
virtual uint64_t compressNextPage(const uint8_t*& srcBuffer, uint64_t numValuesRemaining,
uint8_t* dstBuffer, uint64_t dstBufferSize) = 0;

// Takes compressed data from the srcBuffer and decompresses it into the dstBuffer
// Offsets refer to value offsets, not byte offsets
// srcBuffer points to the beginning of a page
virtual void decompressFromPage(const uint8_t* srcBuffer, uint64_t srcOffset,
uint8_t* dstBuffer, uint64_t dstOffset, uint64_t numValues) = 0;
};

// Compression type is written to the data header both so we can usually catch issues when we
// decompress uncompressed data by mistake, and to allow for runtime-configurable compression in
// future.
enum class CompressionType : uint8_t {
COPY = 1,
INTEGER_BITPACKING = 2,
};

// Compression alg which does not compress values and instead just copies them.
class CopyCompression : public CompressionAlg {
public:
explicit CopyCompression(const common::LogicalType& logicalType)
: logicalType{logicalType}, numBytesPerValue{getDataTypeSizeInChunk(this->logicalType)} {
assert(numBytesPerValue > 0);
}

inline void setValueFromUncompressed(uint8_t* srcBuffer, common::offset_t posInSrc,
uint8_t* dstBuffer, common::offset_t posInDst) final {
memcpy(dstBuffer + 1 /*header byte*/ + posInDst * numBytesPerValue,
srcBuffer + posInSrc * numBytesPerValue, numBytesPerValue);
}

inline void getValue(const uint8_t* buffer, common::offset_t pos, uint8_t* dst) const override {
memcpy(dst, buffer + 1 /*header byte*/ + pos * numBytesPerValue, numBytesPerValue);
}

inline uint64_t startCompression(
const uint8_t* srcBuffer, uint64_t numValues, uint64_t pageSize) override {
return (pageSize - 1) / numBytesPerValue;
}

inline uint64_t compressNextPage(const uint8_t*& srcBuffer, uint64_t numValuesRemaining,
uint8_t* dstBuffer, uint64_t dstBufferSize) override {
uint64_t numValues = std::min(numValuesRemaining, dstBufferSize / numBytesPerValue);
uint64_t sizeToCopy = numValues * numBytesPerValue;
assert(sizeToCopy <= dstBufferSize);
// First byte stores the type of compression
dstBuffer[0] = static_cast<uint8_t>(CompressionType::COPY);
std::memcpy(dstBuffer + 1 /*header byte*/, srcBuffer, sizeToCopy);
srcBuffer += sizeToCopy;
return sizeToCopy;
}

void decompressFromPage(const uint8_t* srcBuffer, uint64_t srcOffset, uint8_t* dstBuffer,
uint64_t dstOffset, uint64_t numValues) override;

protected:
common::LogicalType logicalType;
const uint32_t numBytesPerValue;
};

// Six bits are needed for the bit width (fewer for smaller types, but the header byte is the same
// for simplicity) One bit (the eighth) is needed to indicate if there are negative values The
// seventh bit is unused
struct BitpackHeader {
uint8_t bitWidth;
bool hasNegative;

// Writes the header to the buffer and advances the buffer pointer to the end of the header
void writeHeader(uint8_t*& buffer) const {
*(buffer++) = static_cast<uint8_t>(CompressionType::INTEGER_BITPACKING);
*buffer = bitWidth;
*buffer |= hasNegative << 7;
buffer++;
}

// Reads the header from the buffer and advances the buffer pointer to the end of the header
static BitpackHeader readHeader(const uint8_t*& data);

// Size of the header in bytes
static uint32_t size() { return 2; }
};

template<typename T>
class IntegerBitpacking : public CompressionAlg {
static const common::LogicalType LOGICAL_TYPE;
using U = std::make_unsigned_t<T>;

public:
IntegerBitpacking() = default;

void setValueFromUncompressed(uint8_t* srcBuffer, common::offset_t posInSrc, uint8_t* dstBuffer,
common::offset_t posInDst) final;

void getValue(const uint8_t* buffer, common::offset_t pos, uint8_t* dst) const final;

std::pair<uint8_t, bool> getBitWidth(const uint8_t* srcBuffer, uint64_t numValues) const;

uint64_t numValuesPerPage(uint8_t bitWidth, uint64_t pageSize) {
auto numValues = (pageSize - BitpackHeader::size()) * 8 / bitWidth;
// Round down to nearest multiple of 32 to ensure that we don't write any extra values
// Rounding up could overflow the buffer
// TODO(bmwinger): Pack extra values into the space at the end. This will probably be
// slower, but only needs to be done once.
numValues -= numValues % 32;
return numValues;
}

uint64_t startCompression(
const uint8_t* srcBuffer, uint64_t numValues, uint64_t pageSize) override {
auto result = getBitWidth(srcBuffer, numValues);
bitWidth = result.first;
hasNegative = result.second;
return bitWidth == 0 ? std::numeric_limits<uint64_t>::max() :
numValuesPerPage(bitWidth, pageSize);
}

uint64_t compressNextPage(const uint8_t*& srcBuffer, uint64_t numValuesRemaining,
uint8_t* dstBuffer, uint64_t dstBufferSize) final;

void decompressFromPage(const uint8_t* srcBuffer, uint64_t srcOffset, uint8_t* dstBuffer,
uint64_t dstOffset, uint64_t numValues) final;

protected:
common::LogicalType mLogicalType;
uint8_t bitWidth;
bool hasNegative;
};

class BoolCompression : public CompressionAlg {
static const common::LogicalType LOGICAL_TYPE;

public:
void setValueFromUncompressed(uint8_t* srcBuffer, common::offset_t posInSrc, uint8_t* dstBuffer,
common::offset_t posInDst) final;

void getValue(const uint8_t* buffer, common::offset_t pos, uint8_t* dst) const final;

inline uint64_t startCompression(
const uint8_t* srcBuffer, uint64_t numValues, uint64_t pageSize) override {
return pageSize * 8;
}
uint64_t compressNextPage(const uint8_t*& srcBuffer, uint64_t numValuesRemaining,
uint8_t* dstBuffer, uint64_t dstBufferSize) final;

void decompressFromPage(const uint8_t* srcBuffer, uint64_t srcOffset, uint8_t* dstBuffer,
uint64_t dstOffset, uint64_t numValues) final;
};

} // namespace storage
} // namespace kuzu
2 changes: 2 additions & 0 deletions src/include/storage/copier/var_list_column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ class VarListColumnChunk : public ColumnChunk {
void append(common::ValueVector* vector, common::offset_t startPosInChunk) final;

inline void resizeDataColumnChunk(uint64_t numBytesForBuffer) {
// TODO(bmwinger): This won't work properly for booleans (will be one eighth as many values
// as could fit)
varListDataColumnChunk.resizeBuffer(
numBytesForBuffer / varListDataColumnChunk.dataColumnChunk->getNumBytesPerValue());
}
Expand Down
55 changes: 13 additions & 42 deletions src/include/storage/store/node_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,36 +13,7 @@ class TransactionTests;

namespace storage {

using read_node_column_func_t = std::function<void(uint8_t* frame, PageElementCursor& pageCursor,
common::ValueVector* resultVector, uint32_t posInVector, uint32_t numValuesToRead)>;
using write_node_column_func_t = std::function<void(
uint8_t* frame, uint16_t posInFrame, common::ValueVector* vector, uint32_t posInVector)>;

struct FixedSizedNodeColumnFunc {
static void readValuesFromPage(uint8_t* frame, PageElementCursor& pageCursor,
common::ValueVector* resultVector, uint32_t posInVector, uint32_t numValuesToRead);
static void writeValueToPage(
uint8_t* frame, uint16_t posInFrame, common::ValueVector* vector, uint32_t posInVecto);

static void readInternalIDValuesFromPage(uint8_t* frame, PageElementCursor& pageCursor,
common::ValueVector* resultVector, uint32_t posInVector, uint32_t numValuesToRead);
static void writeInternalIDValueToPage(
uint8_t* frame, uint16_t posInFrame, common::ValueVector* vector, uint32_t posInVecto);
};

struct NullNodeColumnFunc {
static void readValuesFromPage(uint8_t* frame, PageElementCursor& pageCursor,
common::ValueVector* resultVector, uint32_t posInVector, uint32_t numValuesToRead);
static void writeValueToPage(
uint8_t* frame, uint16_t posInFrame, common::ValueVector* vector, uint32_t posInVector);
};

struct BoolNodeColumnFunc {
static void readValuesFromPage(uint8_t* frame, PageElementCursor& pageCursor,
common::ValueVector* resultVector, uint32_t posInVector, uint32_t numValuesToRead);
static void writeValueToPage(
uint8_t* frame, uint16_t posInFrame, common::ValueVector* vector, uint32_t posInVector);
};
class PhysicalMapping;

class NullNodeColumn;
class StructNodeColumn;
Expand All @@ -56,14 +27,12 @@ class NodeColumn {
friend class StructNodeColumn;

public:
NodeColumn(const catalog::Property& property, BMFileHandle* dataFH, BMFileHandle* metadataFH,
BufferManager* bufferManager, WAL* wal, transaction::Transaction* transaction,
RWPropertyStats propertyStatistics, bool requireNullColumn = true);
NodeColumn(common::LogicalType dataType, const catalog::MetadataDAHInfo& metaDAHeaderInfo,
BMFileHandle* dataFH, BMFileHandle* metadataFH, BufferManager* bufferManager, WAL* wal,
NodeColumn(std::unique_ptr<PhysicalMapping> physicalMapping,
const catalog::MetadataDAHInfo& metaDAHeaderInfo, BMFileHandle* dataFH,
BMFileHandle* metadataFH, BufferManager* bufferManager, WAL* wal,
transaction::Transaction* transaction, RWPropertyStats PropertyStatistics,
bool requireNullColumn);
virtual ~NodeColumn() = default;
bool requireNullColumn = true);
virtual ~NodeColumn();

// Expose for feature store
virtual void batchLookup(transaction::Transaction* transaction,
Expand Down Expand Up @@ -103,9 +72,11 @@ class NodeColumn {
virtual void scanInternal(transaction::Transaction* transaction,
common::ValueVector* nodeIDVector, common::ValueVector* resultVector);
void scanUnfiltered(transaction::Transaction* transaction, PageElementCursor& pageCursor,
uint64_t numValuesToScan, common::ValueVector* resultVector, uint64_t startPosInVector = 0);
uint64_t numValuesToScan, common::ValueVector* resultVector, uint64_t numValuesPerPage,
uint64_t startPosInVector = 0);
void scanFiltered(transaction::Transaction* transaction, PageElementCursor& pageCursor,
common::ValueVector* nodeIDVector, common::ValueVector* resultVector);
common::ValueVector* nodeIDVector, common::ValueVector* resultVector,
uint64_t numValuesPerPage);
virtual void lookupInternal(transaction::Transaction* transaction,
common::ValueVector* nodeIDVector, common::ValueVector* resultVector);
virtual void lookupValue(transaction::Transaction* transaction, common::offset_t nodeOffset,
Expand All @@ -131,18 +102,18 @@ class NodeColumn {
protected:
StorageStructureID storageStructureID;
common::LogicalType dataType;
// TODO(bmwinger): Remove. Only used by var_list_column_chunk for something which should be
// rewritten
uint32_t numBytesPerFixedSizedValue;
uint32_t numValuesPerPage;
BMFileHandle* dataFH;
BMFileHandle* metadataFH;
BufferManager* bufferManager;
WAL* wal;
std::unique_ptr<InMemDiskArray<ColumnChunkMetadata>> metadataDA;
std::unique_ptr<NodeColumn> nullColumn;
std::vector<std::unique_ptr<NodeColumn>> childrenColumns;
read_node_column_func_t readNodeColumnFunc;
write_node_column_func_t writeNodeColumnFunc;
RWPropertyStats propertyStatistics;
std::unique_ptr<PhysicalMapping> physicalMapping;
};

class BoolNodeColumn : public NodeColumn {
Expand Down
Loading

0 comments on commit c0713ec

Please sign in to comment.