Skip to content

Commit

Permalink
Restructure to store compression metadata in columnchunkmetadata
Browse files Browse the repository at this point in the history
  • Loading branch information
benjaminwinger committed Sep 8, 2023
1 parent 8c70dfe commit ac442d8
Show file tree
Hide file tree
Showing 10 changed files with 190 additions and 158 deletions.
7 changes: 4 additions & 3 deletions src/include/storage/copier/column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "common/copier_config/copier_config.h"
#include "common/types/types.h"
#include "common/vector/value_vector.h"
#include "compression.h"
#include "storage/buffer_manager/bm_file_handle.h"
#include "storage/wal/wal.h"
#include "transaction/transaction.h"
Expand All @@ -29,13 +30,13 @@ struct BaseColumnChunkMetadata {

struct ColumnChunkMetadata : public BaseColumnChunkMetadata {
uint64_t numValues;
uint64_t numValuesPerPage;
CompressionMetadata compMeta;

ColumnChunkMetadata() : BaseColumnChunkMetadata(), numValues{UINT64_MAX} {}
ColumnChunkMetadata(common::page_idx_t pageIdx, common::page_idx_t numPages,
uint64_t numNodesInChunk, uint64_t numValuesPerPage)
uint64_t numNodesInChunk, CompressionMetadata compMeta)
: BaseColumnChunkMetadata{pageIdx, numPages}, numValues(numNodesInChunk),
numValuesPerPage(numValuesPerPage) {}
compMeta(compMeta) {}
};

struct OverflowColumnChunkMetadata : public BaseColumnChunkMetadata {
Expand Down
92 changes: 54 additions & 38 deletions src/include/storage/copier/compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,45 @@ namespace storage {
// Returns the size of the data type in bytes
uint32_t getDataTypeSizeInChunk(const common::LogicalType& dataType);

// Compression type is written to the data header both so we can usually catch issues when we
// decompress uncompressed data by mistake, and to allow for runtime-configurable compression in
// future.
enum class CompressionType : uint8_t {
COPY = 0,
INTEGER_BITPACKING = 1,
BOOLEAN = 2,
};

struct CompressionMetadata {
uint64_t numValuesPerPage;
CompressionType compression;
// Extra data to be used to store codec-specific information
uint8_t data;
explicit CompressionMetadata(uint64_t numValuesPerPage = UINT64_MAX,
CompressionType compression = CompressionType::COPY, uint8_t data = 0)
: numValuesPerPage{numValuesPerPage}, compression{compression}, data{data} {}
};

class CompressionAlg {
public:
virtual ~CompressionAlg() = default;

// Takes a single uncompressed value from the srcBuffer and compresses it into the dstBuffer
// Offsets refer to value offsets, not byte offsets
virtual void setValueFromUncompressed(uint8_t* srcBuffer, common::offset_t posInSrc,
uint8_t* dstBuffer, common::offset_t posInDst) = 0;
uint8_t* dstBuffer, common::offset_t posInDst, const CompressionMetadata& metadata) = 0;

// Reads a value from the buffer at the given position and stores it at the given memory address
// dst should point to an uncompressed value
virtual inline void getValue(
const uint8_t* buffer, common::offset_t pos, uint8_t* dst) const = 0;
virtual inline void getValue(const uint8_t* buffer, common::offset_t pos, uint8_t* dst,
const CompressionMetadata& metadata) const = 0;

// TODO(bmwinger): this should probably be scoped. E.g. by having a separate class for handling
// compression which is returned by the compress function. Called when compression starts. Will
// always be called before compressNextPage
// Returns the number of values per page (currently this must be consistent across a
// ColumnChunk)
virtual uint64_t startCompression(
virtual CompressionMetadata startCompression(
const uint8_t* srcBuffer, uint64_t numValues, uint64_t pageSize) = 0;

// Takes uncompressed data from the srcBuffer and compresses it into the dstBuffer
Expand All @@ -55,15 +74,8 @@ class CompressionAlg {
// Offsets refer to value offsets, not byte offsets
// srcBuffer points to the beginning of a page
virtual void decompressFromPage(const uint8_t* srcBuffer, uint64_t srcOffset,
uint8_t* dstBuffer, uint64_t dstOffset, uint64_t numValues) = 0;
};

// Compression type is written to the data header both so we can usually catch issues when we
// decompress uncompressed data by mistake, and to allow for runtime-configurable compression in
// future.
enum class CompressionType : uint8_t {
COPY = 1,
INTEGER_BITPACKING = 2,
uint8_t* dstBuffer, uint64_t dstOffset, uint64_t numValues,
const CompressionMetadata& metadata) = 0;
};

// Compression alg which does not compress values and instead just copies them.
Expand All @@ -75,18 +87,19 @@ class CopyCompression : public CompressionAlg {
}

inline void setValueFromUncompressed(uint8_t* srcBuffer, common::offset_t posInSrc,
uint8_t* dstBuffer, common::offset_t posInDst) final {
uint8_t* dstBuffer, common::offset_t posInDst, const CompressionMetadata& metadata) final {
memcpy(dstBuffer + posInDst * numBytesPerValue, srcBuffer + posInSrc * numBytesPerValue,
numBytesPerValue);
}

inline void getValue(const uint8_t* buffer, common::offset_t pos, uint8_t* dst) const override {
inline void getValue(const uint8_t* buffer, common::offset_t pos, uint8_t* dst,
const CompressionMetadata& metadata) const override {
memcpy(dst, buffer + pos * numBytesPerValue, numBytesPerValue);
}

inline uint64_t startCompression(
inline CompressionMetadata startCompression(
const uint8_t* srcBuffer, uint64_t numValues, uint64_t pageSize) override {
return pageSize / numBytesPerValue;
return CompressionMetadata(pageSize / numBytesPerValue);
}

inline uint64_t compressNextPage(const uint8_t*& srcBuffer, uint64_t numValuesRemaining,
Expand All @@ -100,7 +113,7 @@ class CopyCompression : public CompressionAlg {
}

void decompressFromPage(const uint8_t* srcBuffer, uint64_t srcOffset, uint8_t* dstBuffer,
uint64_t dstOffset, uint64_t numValues) override {
uint64_t dstOffset, uint64_t numValues, const CompressionMetadata& metadata) override {
std::memcpy(dstBuffer + dstOffset * numBytesPerValue,
srcBuffer + srcOffset * numBytesPerValue, numValues * numBytesPerValue);
}
Expand All @@ -119,15 +132,14 @@ struct BitpackHeader {
bool hasNegative;

// Writes the header to the buffer and advances the buffer pointer to the end of the header
void writeHeader(uint8_t*& buffer) const {
*(buffer++) = static_cast<uint8_t>(CompressionType::INTEGER_BITPACKING);
*buffer = bitWidth;
*buffer |= hasNegative << 7;
buffer++;
uint8_t getDataByte() const {
uint8_t data = bitWidth;
data |= hasNegative << 7;
return data;
}

// Reads the header from the buffer and advances the buffer pointer to the end of the header
static BitpackHeader readHeader(const uint8_t*& data);
static BitpackHeader readHeader(uint8_t data);

// Size of the header in bytes
static uint32_t size() { return 2; }
Expand All @@ -142,11 +154,12 @@ class IntegerBitpacking : public CompressionAlg {
IntegerBitpacking() = default;

void setValueFromUncompressed(uint8_t* srcBuffer, common::offset_t posInSrc, uint8_t* dstBuffer,
common::offset_t posInDst) final;
common::offset_t posInDst, const CompressionMetadata& metadata) final;

void getValue(const uint8_t* buffer, common::offset_t pos, uint8_t* dst) const final;
void getValue(const uint8_t* buffer, common::offset_t pos, uint8_t* dst,
const CompressionMetadata& metadata) const final;

std::pair<uint8_t, bool> getBitWidth(const uint8_t* srcBuffer, uint64_t numValues) const;
BitpackHeader getBitWidth(const uint8_t* srcBuffer, uint64_t numValues) const;

uint64_t numValuesPerPage(uint8_t bitWidth, uint64_t pageSize) {

Check warning on line 164 in src/include/storage/copier/compression.h

View check run for this annotation

Codecov / codecov/patch

src/include/storage/copier/compression.h#L164

Added line #L164 was not covered by tests
auto numValues = (pageSize - BitpackHeader::size()) * 8 / bitWidth;
Expand All @@ -158,20 +171,22 @@ class IntegerBitpacking : public CompressionAlg {
return numValues;

Check warning on line 171 in src/include/storage/copier/compression.h

View check run for this annotation

Codecov / codecov/patch

src/include/storage/copier/compression.h#L171

Added line #L171 was not covered by tests
}

uint64_t startCompression(
CompressionMetadata startCompression(
const uint8_t* srcBuffer, uint64_t numValues, uint64_t pageSize) override {
auto result = getBitWidth(srcBuffer, numValues);
bitWidth = result.first;
hasNegative = result.second;
return bitWidth == 0 ? std::numeric_limits<uint64_t>::max() :
numValuesPerPage(bitWidth, pageSize);
bitWidth = result.bitWidth;
hasNegative = result.hasNegative;
CompressionMetadata metadata{bitWidth == 0 ? std::numeric_limits<uint64_t>::max() :
numValuesPerPage(bitWidth, pageSize),
CompressionType::INTEGER_BITPACKING, result.getDataByte()};
return metadata;
}

uint64_t compressNextPage(const uint8_t*& srcBuffer, uint64_t numValuesRemaining,
uint8_t* dstBuffer, uint64_t dstBufferSize) final;

void decompressFromPage(const uint8_t* srcBuffer, uint64_t srcOffset, uint8_t* dstBuffer,
uint64_t dstOffset, uint64_t numValues) final;
uint64_t dstOffset, uint64_t numValues, const struct CompressionMetadata& metadata) final;

protected:
common::LogicalType mLogicalType;
Expand All @@ -184,19 +199,20 @@ class BoolCompression : public CompressionAlg {

public:
void setValueFromUncompressed(uint8_t* srcBuffer, common::offset_t posInSrc, uint8_t* dstBuffer,
common::offset_t posInDst) final;
common::offset_t posInDst, const CompressionMetadata& metadata) final;

void getValue(const uint8_t* buffer, common::offset_t pos, uint8_t* dst) const final;
void getValue(const uint8_t* buffer, common::offset_t pos, uint8_t* dst,
const CompressionMetadata& metadata) const final;

inline uint64_t startCompression(
inline CompressionMetadata startCompression(
const uint8_t* srcBuffer, uint64_t numValues, uint64_t pageSize) override {
return pageSize * 8;
return CompressionMetadata{pageSize * 8, CompressionType::BOOLEAN, 0};
}
uint64_t compressNextPage(const uint8_t*& srcBuffer, uint64_t numValuesRemaining,
uint8_t* dstBuffer, uint64_t dstBufferSize) final;

void decompressFromPage(const uint8_t* srcBuffer, uint64_t srcOffset, uint8_t* dstBuffer,
uint64_t dstOffset, uint64_t numValues) final;
uint64_t dstOffset, uint64_t numValues, const CompressionMetadata& metadata) final;
};

} // namespace storage
Expand Down
47 changes: 27 additions & 20 deletions src/include/storage/copier/physical_mapping.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ namespace storage {

class PageElementCursor;
class CompressionAlg;
struct CompressionMetadata;

// Functions to map logical data in a ValueVector to physical data on disk or in a buffer
class PhysicalMapping {
Expand All @@ -27,13 +28,15 @@ class PhysicalMapping {
virtual inline const common::LogicalType& logicalType() const = 0;

virtual void readValuesFromPage(const uint8_t* frame, const PageElementCursor& pageCursor,
common::ValueVector* resultVector, uint32_t posInVector, uint32_t numValuesToRead) = 0;
common::ValueVector* resultVector, uint32_t posInVector, uint32_t numValuesToRead,
const CompressionMetadata& metadata) = 0;

virtual void writeValueToPage(uint8_t* frame, uint16_t posInFrame,
const common::ValueVector* vector, uint32_t posInVector) = 0;
const common::ValueVector* vector, uint32_t posInVector,
const CompressionMetadata& metadata) = 0;

virtual void getValue(
const uint8_t* frame, uint16_t posInFrame, uint8_t* result, uint32_t posInResult) = 0;
virtual void getValue(const uint8_t* frame, uint16_t posInFrame, uint8_t* result,
uint32_t posInResult, const CompressionMetadata& metadata) = 0;
};

class CompressedMapping : public PhysicalMapping {
Expand All @@ -44,13 +47,14 @@ class CompressedMapping : public PhysicalMapping {
inline const common::LogicalType& logicalType() const override { return mLogicalType; }

void readValuesFromPage(const uint8_t* frame, const PageElementCursor& pageCursor,
common::ValueVector* resultVector, uint32_t posInVector, uint32_t numValuesToRead) override;
common::ValueVector* resultVector, uint32_t posInVector, uint32_t numValuesToRead,
const CompressionMetadata& metadata) override;

void writeValueToPage(uint8_t* frame, uint16_t posInFrame, const common::ValueVector* vector,
uint32_t posInVector) override;
uint32_t posInVector, const CompressionMetadata& metadata) override;

void getValue(
const uint8_t* frame, uint16_t posInFrame, uint8_t* result, uint32_t posInResult) override;
void getValue(const uint8_t* frame, uint16_t posInFrame, uint8_t* result, uint32_t posInResult,
const CompressionMetadata& metadata) override;

protected:
std::unique_ptr<CompressionAlg> alg;
Expand All @@ -66,13 +70,14 @@ class NullMapping : public PhysicalMapping {
inline const common::LogicalType& logicalType() const final { return LOGICAL_TYPE; }

void readValuesFromPage(const uint8_t* frame, const PageElementCursor& pageCursor,
common::ValueVector* resultVector, uint32_t posInVector, uint32_t numValuesToRead) override;
common::ValueVector* resultVector, uint32_t posInVector, uint32_t numValuesToRead,
const CompressionMetadata& metadata) override;

void writeValueToPage(uint8_t* frame, uint16_t posInFrame, const common::ValueVector* vector,
uint32_t posInVector) override;
uint32_t posInVector, const CompressionMetadata& metadata) override;

void getValue(
const uint8_t* frame, uint16_t posInFrame, uint8_t* result, uint32_t posInResult) override;
void getValue(const uint8_t* frame, uint16_t posInFrame, uint8_t* result, uint32_t posInResult,
const CompressionMetadata& metadata) override;
};

// Compression alg which does not compress values and instead just copies them.
Expand All @@ -83,13 +88,14 @@ class FixedValueMapping : public PhysicalMapping {
inline const common::LogicalType& logicalType() const override { return mLogicalType; }

void writeValueToPage(uint8_t* frame, uint16_t posInFrame, const common::ValueVector* vector,
uint32_t posInVector) override;
uint32_t posInVector, const CompressionMetadata& metadata) override;

void readValuesFromPage(const uint8_t* frame, const PageElementCursor& pageCursor,
common::ValueVector* resultVector, uint32_t posInVector, uint32_t numValuesToRead) override;
common::ValueVector* resultVector, uint32_t posInVector, uint32_t numValuesToRead,
const CompressionMetadata& metadata) override;

void getValue(
const uint8_t* frame, uint16_t posInFrame, uint8_t* result, uint32_t posInResult) override {
void getValue(const uint8_t* frame, uint16_t posInFrame, uint8_t* result, uint32_t posInResult,
const CompressionMetadata& metadata) override {
memcpy(result + posInResult * numBytesPerFixedSizedValue,
frame + (posInFrame * numBytesPerFixedSizedValue), numBytesPerFixedSizedValue);
}
Expand All @@ -107,13 +113,14 @@ class InternalIDMapping : public PhysicalMapping {
public:
inline const common::LogicalType& logicalType() const final { return LOGICAL_TYPE; }

Check warning on line 114 in src/include/storage/copier/physical_mapping.h

View check run for this annotation

Codecov / codecov/patch

src/include/storage/copier/physical_mapping.h#L114

Added line #L114 was not covered by tests
void readValuesFromPage(const uint8_t* frame, const PageElementCursor& pageCursor,
common::ValueVector* resultVector, uint32_t posInVector, uint32_t numValuesToRead) final;
common::ValueVector* resultVector, uint32_t posInVector, uint32_t numValuesToRead,
const CompressionMetadata& metadata) final;

void writeValueToPage(uint8_t* frame, uint16_t posInFrame, const common::ValueVector* vector,
uint32_t posInVector) final;
uint32_t posInVector, const CompressionMetadata& metadata) final;

inline void getValue(
const uint8_t* frame, uint16_t posInFrame, uint8_t* result, uint32_t posInResult) override {
inline void getValue(const uint8_t* frame, uint16_t posInFrame, uint8_t* result,

Check warning on line 122 in src/include/storage/copier/physical_mapping.h

View check run for this annotation

Codecov / codecov/patch

src/include/storage/copier/physical_mapping.h#L122

Added line #L122 was not covered by tests
uint32_t posInResult, const CompressionMetadata& metadata) override {
memcpy(result + posInResult * numBytesPerFixedSizedValue,
frame + (posInFrame * numBytesPerFixedSizedValue), numBytesPerFixedSizedValue);
}

Check warning on line 126 in src/include/storage/copier/physical_mapping.h

View check run for this annotation

Codecov / codecov/patch

src/include/storage/copier/physical_mapping.h#L124-L126

Added lines #L124 - L126 were not covered by tests
Expand Down
6 changes: 3 additions & 3 deletions src/include/storage/store/node_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,11 @@ class NodeColumn {
virtual void scanInternal(transaction::Transaction* transaction,
common::ValueVector* nodeIDVector, common::ValueVector* resultVector);
void scanUnfiltered(transaction::Transaction* transaction, PageElementCursor& pageCursor,
uint64_t numValuesToScan, common::ValueVector* resultVector, uint64_t numValuesPerPage,
uint64_t startPosInVector = 0);
uint64_t numValuesToScan, common::ValueVector* resultVector,
const CompressionMetadata& compMeta, uint64_t startPosInVector = 0);
void scanFiltered(transaction::Transaction* transaction, PageElementCursor& pageCursor,
common::ValueVector* nodeIDVector, common::ValueVector* resultVector,
uint64_t numValuesPerPage);
const CompressionMetadata& compMeta);
virtual void lookupInternal(transaction::Transaction* transaction,
common::ValueVector* nodeIDVector, common::ValueVector* resultVector);
virtual void lookupValue(transaction::Transaction* transaction, common::offset_t nodeOffset,
Expand Down
18 changes: 12 additions & 6 deletions src/storage/copier/column_chunk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -335,12 +335,13 @@ ColumnChunkMetadata ColumnChunk::flushBuffer(BMFileHandle* dataFH, page_idx_t st
numBytesPerFixedSizedValue == 0 ?
0 :
PageUtils::getNumElementsInAPage(numBytesPerFixedSizedValue, false /*hasNull */);
return ColumnChunkMetadata(startPageIdx, getNumPagesForBuffer(), numValues, numValuesPerPage);
return ColumnChunkMetadata(
startPageIdx, getNumPagesForBuffer(), numValues, CompressionMetadata(numValuesPerPage));
}

ColumnChunkMetadata BoolColumnChunk::flushBuffer(BMFileHandle* dataFH, page_idx_t startPageIdx) {
auto chunkMeta = ColumnChunk::flushBuffer(dataFH, startPageIdx);
chunkMeta.numValuesPerPage = BufferPoolConstants::PAGE_4KB_SIZE * 8;
chunkMeta.compMeta.numValuesPerPage = BufferPoolConstants::PAGE_4KB_SIZE * 8;
return chunkMeta;
}

Expand Down Expand Up @@ -492,7 +493,7 @@ std::unique_ptr<ColumnChunk> ColumnChunkFactory::createColumnChunk(
} else {
chunk =
std::make_unique<CompressedColumnChunk>(std::make_unique<CopyCompression>(dataType),
// std::make_unique<IntegerBitpacking<int16_t, uint16_t>>(),
// std::make_unique<IntegerBitpacking<int16_t>>(),
dataType, copyDescription);
}
} break;
Expand Down Expand Up @@ -620,17 +621,22 @@ ColumnChunkMetadata CompressedColumnChunk::flushBuffer(
const uint8_t* bufferStart = buffer.get();
auto compressedBuffer = std::make_unique<uint8_t[]>(BufferPoolConstants::PAGE_4KB_SIZE);
auto numPages = 0;
auto numValuesPerPage =
auto metadata =
alg->startCompression(buffer.get(), numValues, BufferPoolConstants::PAGE_4KB_SIZE);
do {
auto compressedSize = alg->compressNextPage(bufferStart, valuesRemaining,
compressedBuffer.get(), BufferPoolConstants::PAGE_4KB_SIZE);
valuesRemaining -= numValuesPerPage;
// Avoid underflows
if (metadata.numValuesPerPage > valuesRemaining) {
valuesRemaining = 0;
} else {
valuesRemaining -= metadata.numValuesPerPage;
}
FileUtils::writeToFile(dataFH->getFileInfo(), compressedBuffer.get(), compressedSize,
(startPageIdx + numPages) * BufferPoolConstants::PAGE_4KB_SIZE);
numPages++;
} while (valuesRemaining > 0);
return ColumnChunkMetadata(startPageIdx, numPages, numValues, numValuesPerPage);
return ColumnChunkMetadata(startPageIdx, numPages, numValues, metadata);
}

} // namespace storage
Expand Down
Loading

0 comments on commit ac442d8

Please sign in to comment.