Replaced CompressedColumnChunk with a flushBuffer functor
benjaminwinger committed Sep 18, 2023
1 parent bedb71b commit 5f695d0
Showing 9 changed files with 185 additions and 130 deletions.
src/common/file_utils.cpp (2 changes: 1 addition & 1 deletion)
@@ -73,7 +73,7 @@ std::unique_ptr<FileInfo> FileUtils::openFile(const std::string& path, int flags
}

void FileUtils::writeToFile(
FileInfo* fileInfo, uint8_t* buffer, uint64_t numBytes, uint64_t offset) {
FileInfo* fileInfo, const uint8_t* buffer, uint64_t numBytes, uint64_t offset) {
auto fileSize = fileInfo->getFileSize();
if (fileSize == -1) {
throw Exception(StringUtils::string_format("File {} not open.", fileInfo->path));
src/include/common/file_utils.h (2 changes: 1 addition & 1 deletion)
@@ -41,7 +41,7 @@ class FileUtils {
static void readFromFile(
FileInfo* fileInfo, void* buffer, uint64_t numBytes, uint64_t position);
static void writeToFile(
FileInfo* fileInfo, uint8_t* buffer, uint64_t numBytes, uint64_t offset);
FileInfo* fileInfo, const uint8_t* buffer, uint64_t numBytes, uint64_t offset);
// This function is a no-op if either file, from or to, does not exist.
static void overwriteFile(const std::string& from, const std::string& to);
static void copyFile(const std::string& from, const std::string& to,
src/include/storage/copier/column_chunk.h (19 changes: 4 additions & 15 deletions)
@@ -94,7 +94,7 @@ class ColumnChunk {
virtual void append(
arrow::Array* array, common::offset_t startPosInChunk, uint32_t numValuesToAppend);

virtual ColumnChunkMetadata flushBuffer(BMFileHandle* dataFH, common::page_idx_t startPageIdx);
ColumnChunkMetadata flushBuffer(BMFileHandle* dataFH, common::page_idx_t startPageIdx);

// Returns the size of the data type in bytes
static uint32_t getDataTypeSizeInChunk(common::LogicalType& dataType);
@@ -164,6 +164,9 @@ class ColumnChunk {
std::vector<std::unique_ptr<ColumnChunk>> childrenChunks;
const common::CopyDescription* copyDescription;
uint64_t numValues;
std::function<ColumnChunkMetadata(
const uint8_t*, uint64_t, uint64_t, BMFileHandle*, common::page_idx_t)>
flushBufferFunction;
};
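
The hunk above shows the heart of the change: flush behaviour that used to be supplied through virtual flushBuffer overrides is now a std::function member that each chunk selects at construction time. A minimal, self-contained sketch of that pattern, using simplified stand-in types rather than the real Kuzu classes:

    #include <cstdint>
    #include <functional>
    #include <vector>

    // Simplified stand-ins, not the real Kuzu types.
    struct Metadata {
        uint64_t numPages;
    };

    // Default strategy: in the real code this writes the buffer to the file;
    // here it only computes a placeholder page count.
    Metadata fixedSizedFlush(const uint8_t* /*buffer*/, uint64_t bufferSize, uint64_t /*numValues*/) {
        return Metadata{(bufferSize + 4095) / 4096};
    }

    class Chunk {
    public:
        explicit Chunk(bool isBool) {
            // The constructor picks the flush strategy once, based on the column type,
            // instead of relying on a subclass override.
            if (isBool) {
                flushFunction = [](const uint8_t* buf, uint64_t size, uint64_t n) {
                    // Boolean analogue: same storage in this toy; the real code only
                    // tags the returned metadata differently.
                    return fixedSizedFlush(buf, size, n);
                };
            } else {
                flushFunction = fixedSizedFlush;
            }
        }

        // The former virtual call becomes a plain delegation.
        Metadata flush(const std::vector<uint8_t>& buffer, uint64_t numValues) const {
            return flushFunction(buffer.data(), buffer.size(), numValues);
        }

    private:
        std::function<Metadata(const uint8_t*, uint64_t, uint64_t)> flushFunction;
    };

    int main() {
        Chunk chunk{/*isBool=*/false};
        std::vector<uint8_t> data(8192);
        return chunk.flush(data, data.size()).numPages == 2 ? 0 : 1;
    }

Because the flush strategy is plain data on the object, subclasses such as the removed CompressedColumnChunk are no longer needed just to change how a chunk is written out.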

template<>
Expand All @@ -178,18 +181,6 @@ inline bool ColumnChunk::getValue(common::offset_t pos) const {
return common::NullMask::isNull((uint64_t*)buffer.get(), pos);
}

// Column chunk which is compressed during flushBuffer, but otherwise maintained uncompressed
class CompressedColumnChunk : public ColumnChunk {
public:
explicit CompressedColumnChunk(std::unique_ptr<CompressionAlg> alg,
common::LogicalType dataType, common::CopyDescription* copyDescription,
bool hasNullChunk = true);
ColumnChunkMetadata flushBuffer(BMFileHandle* dataFH, common::page_idx_t startPageIdx) final;

protected:
std::unique_ptr<CompressionAlg> alg;
};

// Stored as bitpacked booleans in-memory and on-disk
class BoolColumnChunk : public ColumnChunk {
public:
@@ -207,8 +198,6 @@ class BoolColumnChunk {

void resize(uint64_t capacity) final;

ColumnChunkMetadata flushBuffer(BMFileHandle* dataFH, common::page_idx_t startPageIdx) final;

protected:
inline uint64_t numBytesForValues(common::offset_t numValues) const {
// 8 values per byte, and we need a buffer size which is a multiple of 8 bytes
src/include/storage/copier/compression.h (53 changes: 30 additions & 23 deletions)
@@ -27,13 +27,16 @@ enum class CompressionType : uint8_t {
};

struct CompressionMetadata {
uint64_t numValuesPerPage;
CompressionType compression;
// Extra data to be used to store codec-specific information
uint8_t data;
explicit CompressionMetadata(uint64_t numValuesPerPage = UINT64_MAX,
explicit CompressionMetadata(
CompressionType compression = CompressionType::UNCOMPRESSED, uint8_t data = 0)
: numValuesPerPage{numValuesPerPage}, compression{compression}, data{data} {}
: compression{compression}, data{data} {}

// Returns the number of values which will be stored in the given data size
// This must be consistent with the compression implementation for the given size
uint64_t numValues(uint64_t dataSize, const common::LogicalType& dataType) const;
};
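
With numValuesPerPage removed from the struct, the count of values that fit in a page is recomputed on demand by the new numValues method, from the compression type, the codec-specific data byte, and the data size. The per-codec formulas appear further down in this header (CopyCompression, IntegerBitpacking, BooleanBitpacking); the sketch below is a simplified, self-contained illustration of that derivation, with placeholder types and block size rather than the real definition, which lives in compression.cpp outside this diff:

    #include <cstdint>
    #include <limits>

    enum class CompressionType : uint8_t { UNCOMPRESSED, BOOLEAN_BITPACKING, INTEGER_BITPACKING };

    struct Metadata {
        CompressionType compression;
        uint8_t data; // codec-specific byte, e.g. the bit width for integer bitpacking

        // Derive how many values fit in dataSize bytes, instead of caching the count.
        uint64_t numValues(uint64_t dataSize, uint64_t bytesPerValue) const {
            switch (compression) {
            case CompressionType::UNCOMPRESSED:
                return dataSize / bytesPerValue;      // cf. CopyCompression::numValues
            case CompressionType::BOOLEAN_BITPACKING:
                return dataSize * 8;                  // cf. BooleanBitpacking::numValues
            case CompressionType::INTEGER_BITPACKING: {
                if (data == 0) {                      // bit width 0: effectively unlimited
                    return std::numeric_limits<uint64_t>::max();
                }
                constexpr uint64_t CHUNK_SIZE = 32;   // placeholder bitpacking block size
                auto values = dataSize * 8 / data;
                return values - values % CHUNK_SIZE;  // round down, cf. IntegerBitpacking::numValues
            }
            }
            return 0;
        }
    };

    int main() {
        Metadata meta{CompressionType::INTEGER_BITPACKING, /*bit width*/ 5};
        return meta.numValues(4096, sizeof(int64_t)) > 0 ? 0 : 1;
    }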

class CompressionAlg {
@@ -57,7 +60,7 @@ class CompressionAlg {
// Returns the number of values per page (currently this must be consistent across a
// ColumnChunk)
virtual CompressionMetadata startCompression(
const uint8_t* srcBuffer, uint64_t numValues, uint64_t pageSize) = 0;
const uint8_t* srcBuffer, uint64_t numValues) const = 0;

// Takes uncompressed data from the srcBuffer and compresses it into the dstBuffer
//
@@ -75,7 +78,8 @@ class CompressionAlg {
// returns the size in bytes of the compressed data within the page (rounded up to the nearest
// byte)
virtual uint64_t compressNextPage(const uint8_t*& srcBuffer, uint64_t numValuesRemaining,
uint8_t* dstBuffer, uint64_t dstBufferSize) = 0;
uint8_t* dstBuffer, uint64_t dstBufferSize,
const struct CompressionMetadata& metadata) const = 0;

// Takes compressed data from the srcBuffer and decompresses it into the dstBuffer
// Offsets refer to value offsets, not byte offsets
@@ -108,13 +112,18 @@ class CopyCompression : public CompressionAlg {
numBytesPerValue);
}

static inline uint64_t numValues(uint64_t dataSize, const common::LogicalType& logicalType) {
return dataSize / getDataTypeSizeInChunk(logicalType);
}

inline CompressionMetadata startCompression(
const uint8_t* srcBuffer, uint64_t numValues, uint64_t pageSize) override {
return CompressionMetadata(pageSize / numBytesPerValue);
const uint8_t* srcBuffer, uint64_t numValues) const override {
return CompressionMetadata();
}

inline uint64_t compressNextPage(const uint8_t*& srcBuffer, uint64_t numValuesRemaining,
uint8_t* dstBuffer, uint64_t dstBufferSize) override {
uint8_t* dstBuffer, uint64_t dstBufferSize,
const struct CompressionMetadata& metadata) const override {
uint64_t numValues = std::min(numValuesRemaining, dstBufferSize / numBytesPerValue);
uint64_t sizeToCopy = numValues * numBytesPerValue;
assert(sizeToCopy <= dstBufferSize);
Expand Down Expand Up @@ -175,8 +184,8 @@ class IntegerBitpacking : public CompressionAlg {

BitpackHeader getBitWidth(const uint8_t* srcBuffer, uint64_t numValues) const;

uint64_t numValuesPerPage(uint8_t bitWidth, uint64_t pageSize) {
auto numValues = pageSize * 8 / bitWidth;
static inline uint64_t numValues(uint64_t dataSize, const BitpackHeader& header) {
auto numValues = dataSize * 8 / header.bitWidth;
// Round down to nearest multiple of CHUNK_SIZE to ensure that we don't write any extra
// values Rounding up could overflow the buffer
// TODO(bmwinger): Pack extra values into the space at the end. This will probably be
@@ -186,18 +195,6 @@
}

CompressionMetadata startCompression(
const uint8_t* srcBuffer, uint64_t numValues, uint64_t pageSize) override {
auto result = getBitWidth(srcBuffer, numValues);
bitWidth = result.bitWidth;
hasNegative = result.hasNegative;
CompressionMetadata metadata{bitWidth == 0 ? std::numeric_limits<uint64_t>::max() :
numValuesPerPage(bitWidth, pageSize),
CompressionType::INTEGER_BITPACKING, result.getDataByte()};
const uint8_t* srcBuffer, uint64_t numValues) const override {
auto header = getBitWidth(srcBuffer, numValues);
CompressionMetadata metadata{CompressionType::INTEGER_BITPACKING, header.getDataByte()};
return metadata;
}

uint64_t compressNextPage(const uint8_t*& srcBuffer, uint64_t numValuesRemaining,
uint8_t* dstBuffer, uint64_t dstBufferSize) final;
uint8_t* dstBuffer, uint64_t dstBufferSize,
const struct CompressionMetadata& metadata) const final;

void decompressFromPage(const uint8_t* srcBuffer, uint64_t srcOffset, uint8_t* dstBuffer,
uint64_t dstOffset, uint64_t numValues,
@@ -214,8 +220,6 @@
}

common::LogicalType mLogicalType;
uint8_t bitWidth;
bool hasNegative;
};

class BooleanBitpacking : public CompressionAlg {
@@ -231,12 +235,15 @@
void getValue(const uint8_t* buffer, common::offset_t posInBuffer, uint8_t* dst,
common::offset_t posInDst, const CompressionMetadata& metadata) const final;

static inline uint64_t numValues(uint64_t dataSize) { return dataSize * 8; }

inline CompressionMetadata startCompression(
const uint8_t* srcBuffer, uint64_t numValues, uint64_t pageSize) override {
return CompressionMetadata{pageSize * 8, CompressionType::BOOLEAN_BITPACKING, 0};
const uint8_t* srcBuffer, uint64_t numValues) const override {
return CompressionMetadata{CompressionType::BOOLEAN_BITPACKING, 0};
}
uint64_t compressNextPage(const uint8_t*& srcBuffer, uint64_t numValuesRemaining,
uint8_t* dstBuffer, uint64_t dstBufferSize) final;
uint8_t* dstBuffer, uint64_t dstBufferSize,
const struct CompressionMetadata& metadata) const final;

void decompressFromPage(const uint8_t* srcBuffer, uint64_t srcOffset, uint8_t* dstBuffer,
uint64_t dstOffset, uint64_t numValues, const CompressionMetadata& metadata) const final;
src/storage/copier/column_chunk.cpp (115 changes: 64 additions & 51 deletions)
@@ -15,12 +15,74 @@ using namespace kuzu::transaction;
namespace kuzu {
namespace storage {

ColumnChunkMetadata fixedSizedFlushBuffer(const uint8_t* buffer, uint64_t bufferSize,
uint64_t numValues, BMFileHandle* dataFH, page_idx_t startPageIdx) {
FileUtils::writeToFile(dataFH->getFileInfo(), buffer, bufferSize,
startPageIdx * BufferPoolConstants::PAGE_4KB_SIZE);
return ColumnChunkMetadata(startPageIdx, ColumnChunk::getNumPagesForBytes(bufferSize),
numValues, CompressionMetadata());
}

ColumnChunkMetadata booleanFlushBuffer(const uint8_t* buffer, uint64_t bufferSize,
uint64_t numValues, BMFileHandle* dataFH, page_idx_t startPageIdx) {
// Since we compress into memory, storage is the same as fixed-sized values,
// but we need to mark it as being boolean compressed.
FileUtils::writeToFile(dataFH->getFileInfo(), buffer, bufferSize,
startPageIdx * BufferPoolConstants::PAGE_4KB_SIZE);
return ColumnChunkMetadata(startPageIdx, ColumnChunk::getNumPagesForBytes(bufferSize),
numValues, CompressionMetadata(CompressionType::BOOLEAN_BITPACKING));
}

class CompressedFlushBuffer {
std::shared_ptr<CompressionAlg> alg;
const LogicalType& dataType;

public:
CompressedFlushBuffer(std::unique_ptr<CompressionAlg> alg, LogicalType& dataType)
: alg{std::move(alg)}, dataType{dataType} {}

CompressedFlushBuffer(const CompressedFlushBuffer& other) = default;

ColumnChunkMetadata operator()(const uint8_t* buffer, uint64_t bufferSize, uint64_t numValues,
BMFileHandle* dataFH, page_idx_t startPageIdx) {
int64_t valuesRemaining = numValues;
const uint8_t* bufferStart = buffer;
auto compressedBuffer = std::make_unique<uint8_t[]>(BufferPoolConstants::PAGE_4KB_SIZE);
auto numPages = 0;
auto metadata = alg->startCompression(buffer, numValues);
auto numValuesPerPage = metadata.numValues(BufferPoolConstants::PAGE_4KB_SIZE, dataType);
do {
auto compressedSize = alg->compressNextPage(bufferStart, valuesRemaining,
compressedBuffer.get(), BufferPoolConstants::PAGE_4KB_SIZE, metadata);
// Avoid underflows
if (numValuesPerPage > valuesRemaining) {
valuesRemaining = 0;
} else {
valuesRemaining -= numValuesPerPage;
}
FileUtils::writeToFile(dataFH->getFileInfo(), compressedBuffer.get(), compressedSize,
(startPageIdx + numPages) * BufferPoolConstants::PAGE_4KB_SIZE);
numPages++;
} while (valuesRemaining > 0);
return ColumnChunkMetadata(startPageIdx, numPages, numValues, metadata);
}
};
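
One detail of CompressedFlushBuffer above: it takes the algorithm as a unique_ptr but stores it in a shared_ptr and keeps a defaulted copy constructor. That is presumably because std::function requires a copy-constructible target, so the functor has to remain copyable even though it owns the compression algorithm. A small self-contained sketch of that constraint, with toy stand-ins rather than the Kuzu types:

    #include <cstdint>
    #include <functional>
    #include <memory>

    struct Alg {};                  // stand-in for CompressionAlg
    struct Meta { uint32_t pages; };

    // Copyable functor: copies made whenever the std::function is copied
    // all share the same Alg instance through the shared_ptr.
    class Flusher {
        std::shared_ptr<Alg> alg;

    public:
        explicit Flusher(std::unique_ptr<Alg> a) : alg{std::move(a)} {}
        Flusher(const Flusher&) = default;

        Meta operator()(const uint8_t*, uint64_t bufferSize, uint64_t) const {
            return Meta{static_cast<uint32_t>(bufferSize / 4096 + 1)};
        }
    };

    int main() {
        std::function<Meta(const uint8_t*, uint64_t, uint64_t)> flush =
            Flusher{std::make_unique<Alg>()};
        auto copy = flush; // fine: Flusher is copyable despite owning the algorithm
        return copy(nullptr, 8192, 0).pages == 3 ? 0 : 1;
    }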

ColumnChunk::ColumnChunk(LogicalType dataType, CopyDescription* copyDescription, bool hasNullChunk)
: dataType{std::move(dataType)}, numBytesPerValue{getDataTypeSizeInChunk(this->dataType)},
copyDescription{copyDescription}, numValues{0} {
if (hasNullChunk) {
nullChunk = std::make_unique<NullColumnChunk>();
}
switch (this->dataType.getPhysicalType()) {
case PhysicalTypeID::BOOL: {
flushBufferFunction = booleanFlushBuffer;
break;
}
default: {
flushBufferFunction = fixedSizedFlushBuffer;
}
}
}

void ColumnChunk::initialize(offset_t capacity) {
@@ -336,21 +398,7 @@ page_idx_t ColumnChunk::getNumPages() const {
}

ColumnChunkMetadata ColumnChunk::flushBuffer(BMFileHandle* dataFH, page_idx_t startPageIdx) {
FileUtils::writeToFile(dataFH->getFileInfo(), buffer.get(), bufferSize,
startPageIdx * BufferPoolConstants::PAGE_4KB_SIZE);
auto numBytesPerFixedSizedValue = getDataTypeSizeInChunk(dataType);
auto numValuesPerPage =
numBytesPerFixedSizedValue == 0 ?
0 :
PageUtils::getNumElementsInAPage(numBytesPerFixedSizedValue, false /*hasNull */);
return ColumnChunkMetadata(
startPageIdx, getNumPagesForBuffer(), numValues, CompressionMetadata(numValuesPerPage));
}

ColumnChunkMetadata BoolColumnChunk::flushBuffer(BMFileHandle* dataFH, page_idx_t startPageIdx) {
auto chunkMeta = ColumnChunk::flushBuffer(dataFH, startPageIdx);
chunkMeta.compMeta.numValuesPerPage = BufferPoolConstants::PAGE_4KB_SIZE * 8;
return chunkMeta;
return flushBufferFunction(buffer.get(), bufferSize, numValues, dataFH, startPageIdx);
}

uint32_t ColumnChunk::getDataTypeSizeInChunk(LogicalType& dataType) {
@@ -479,14 +527,7 @@ std::unique_ptr<ColumnChunk> ColumnChunkFactory::createColumnChunk(
case PhysicalTypeID::INT64:
case PhysicalTypeID::INT32:
case PhysicalTypeID::INT16:
case PhysicalTypeID::INT8: {
if (dataType.getLogicalTypeID() == LogicalTypeID::SERIAL) {
chunk = std::make_unique<SerialColumnChunk>();
} else {
chunk = std::make_unique<CompressedColumnChunk>(
std::make_unique<CopyCompression>(dataType), dataType, copyDescription);
}
} break;
case PhysicalTypeID::INT8:
case PhysicalTypeID::DOUBLE:
case PhysicalTypeID::FLOAT:
case PhysicalTypeID::INTERVAL: {
@@ -601,33 +642,5 @@ void BoolColumnChunk::resize(uint64_t capacity) {
}
}

CompressedColumnChunk::CompressedColumnChunk(std::unique_ptr<CompressionAlg> alg,
LogicalType dataType, common::CopyDescription* copyDescription, bool hasNullChunk)
: ColumnChunk(dataType, copyDescription, hasNullChunk), alg(std::move(alg)) {}

ColumnChunkMetadata CompressedColumnChunk::flushBuffer(
BMFileHandle* dataFH, page_idx_t startPageIdx) {
int64_t valuesRemaining = numValues;
const uint8_t* bufferStart = buffer.get();
auto compressedBuffer = std::make_unique<uint8_t[]>(BufferPoolConstants::PAGE_4KB_SIZE);
auto numPages = 0;
auto metadata =
alg->startCompression(buffer.get(), numValues, BufferPoolConstants::PAGE_4KB_SIZE);
do {
auto compressedSize = alg->compressNextPage(bufferStart, valuesRemaining,
compressedBuffer.get(), BufferPoolConstants::PAGE_4KB_SIZE);
// Avoid underflows
if (metadata.numValuesPerPage > valuesRemaining) {
valuesRemaining = 0;
} else {
valuesRemaining -= metadata.numValuesPerPage;
}
FileUtils::writeToFile(dataFH->getFileInfo(), compressedBuffer.get(), compressedSize,
(startPageIdx + numPages) * BufferPoolConstants::PAGE_4KB_SIZE);
numPages++;
} while (valuesRemaining > 0);
return ColumnChunkMetadata(startPageIdx, numPages, numValues, metadata);
}

} // namespace storage
} // namespace kuzu