Skip to content

Commit

Permalink
Rewrite string storage to use index, offset and data columns
Browse files Browse the repository at this point in the history
This sets up the storage layout for compressing strings, and removes the
limit on string size in storage.
  • Loading branch information
benjaminwinger committed Nov 1, 2023
1 parent 80620e7 commit 21fb0d9
Show file tree
Hide file tree
Showing 19 changed files with 653 additions and 393 deletions.
2 changes: 1 addition & 1 deletion src/include/processor/operator/persistent/copy_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ template<>
uint64_t CopyNode::appendToPKIndex<int64_t>(storage::PrimaryKeyIndexBuilder* pkIndex,
storage::ColumnChunk* chunk, common::offset_t startOffset, common::offset_t numNodes);
template<>
uint64_t CopyNode::appendToPKIndex<common::ku_string_t>(storage::PrimaryKeyIndexBuilder* pkIndex,
uint64_t CopyNode::appendToPKIndex<std::string>(storage::PrimaryKeyIndexBuilder* pkIndex,
storage::ColumnChunk* chunk, common::offset_t startOffset, common::offset_t numNodes);

} // namespace processor
Expand Down
4 changes: 2 additions & 2 deletions src/include/storage/local_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ class LocalVector {
class StringLocalVector : public LocalVector {
public:
explicit StringLocalVector(MemoryManager* mm)
: LocalVector{common::LogicalType(common::LogicalTypeID::STRING), mm}, ovfStringLength{
: LocalVector{common::LogicalType(common::LogicalTypeID::STRING), mm}, totalStringLength{
0} {};

void update(common::sel_t offsetInLocalVector, common::ValueVector* updateVector,
common::sel_t offsetInUpdateVector) final;

uint64_t ovfStringLength;
uint64_t totalStringLength;
};

class StructLocalVector : public LocalVector {
Expand Down
46 changes: 10 additions & 36 deletions src/include/storage/store/column_chunk.h
Original file line number Diff line number Diff line change
@@ -1,53 +1,29 @@
#pragma once

#include "common/copier_config/copier_config.h"
#include "common/type_utils.h"
#include <functional>

#include "common/constants.h"
#include "common/types/types.h"
#include "common/vector/value_vector.h"
#include "compression.h"
#include "storage/buffer_manager/bm_file_handle.h"
#include "storage/wal/wal.h"
#include "transaction/transaction.h"

namespace arrow {
class Array;
}

namespace kuzu {
namespace storage {

class NullColumnChunk;
class CompressionAlg;

struct BaseColumnChunkMetadata {
struct ColumnChunkMetadata {
common::page_idx_t pageIdx;
common::page_idx_t numPages;

BaseColumnChunkMetadata() : BaseColumnChunkMetadata{common::INVALID_PAGE_IDX, 0} {}
BaseColumnChunkMetadata(common::page_idx_t pageIdx, common::page_idx_t numPages)
: pageIdx(pageIdx), numPages(numPages) {}
virtual ~BaseColumnChunkMetadata() = default;
};

struct ColumnChunkMetadata : public BaseColumnChunkMetadata {
uint64_t numValues;
CompressionMetadata compMeta;

ColumnChunkMetadata() : BaseColumnChunkMetadata(), numValues{UINT64_MAX} {}
ColumnChunkMetadata() : pageIdx{common::INVALID_PAGE_IDX}, numPages{0}, numValues{UINT64_MAX} {}
ColumnChunkMetadata(common::page_idx_t pageIdx, common::page_idx_t numPages,
uint64_t numNodesInChunk, CompressionMetadata compMeta)
: BaseColumnChunkMetadata{pageIdx, numPages}, numValues(numNodesInChunk),
compMeta(compMeta) {}
};

struct OverflowColumnChunkMetadata : public BaseColumnChunkMetadata {
common::offset_t lastOffsetInPage;

OverflowColumnChunkMetadata()
: BaseColumnChunkMetadata(), lastOffsetInPage{common::INVALID_OFFSET} {}
OverflowColumnChunkMetadata(
common::page_idx_t pageIdx, common::page_idx_t numPages, common::offset_t lastOffsetInPage)
: BaseColumnChunkMetadata{pageIdx, numPages}, lastOffsetInPage(lastOffsetInPage) {}
uint64_t numNodesInChunk, const CompressionMetadata& compMeta)
: pageIdx(pageIdx), numPages(numPages), numValues(numNodesInChunk), compMeta(compMeta) {}
};

// Base data segment covers all fixed-sized data types.
Expand All @@ -60,8 +36,9 @@ class ColumnChunk {

// ColumnChunks must be initialized after construction, so this constructor should only be used
// through the ColumnChunkFactory
explicit ColumnChunk(
common::LogicalType dataType, bool enableCompression = true, bool hasNullChunk = true);
explicit ColumnChunk(common::LogicalType dataType, bool enableCompression = true,
bool hasNullChunk = true,
common::offset_t capacity = common::StorageConstants::NODE_GROUP_SIZE);

virtual ~ColumnChunk() = default;

Expand All @@ -88,9 +65,6 @@ class ColumnChunk {
ColumnChunkMetadata flushBuffer(
BMFileHandle* dataFH, common::page_idx_t startPageIdx, const ColumnChunkMetadata& metadata);

// Returns the size of the data type in bytes
static uint32_t getDataTypeSizeInChunk(common::LogicalType& dataType);

static inline common::page_idx_t getNumPagesForBytes(uint64_t numBytes) {
return (numBytes + common::BufferPoolConstants::PAGE_4KB_SIZE - 1) /
common::BufferPoolConstants::PAGE_4KB_SIZE;
Expand Down
43 changes: 29 additions & 14 deletions src/include/storage/store/compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

#include <array>
#include <cstdint>
#include <limits>

#include "common/types/types.h"

Expand Down Expand Up @@ -52,8 +51,8 @@ class CompressionAlg {

// Takes a single uncompressed value from the srcBuffer and compresses it into the dstBuffer
// Offsets refer to value offsets, not byte offsets
virtual void setValueFromUncompressed(uint8_t* srcBuffer, common::offset_t posInSrc,
uint8_t* dstBuffer, common::offset_t posInDst,
virtual void setValuesFromUncompressed(const uint8_t* srcBuffer, common::offset_t srcOffset,
uint8_t* dstBuffer, common::offset_t dstOffset, common::offset_t numValues,
const CompressionMetadata& metadata) const = 0;

// Reads a value from the buffer at the given position and stores it at the given memory address
Expand Down Expand Up @@ -102,11 +101,11 @@ class Uncompressed : public CompressionAlg {

Uncompressed(const Uncompressed&) = default;

inline void setValueFromUncompressed(uint8_t* srcBuffer, common::offset_t posInSrc,
uint8_t* dstBuffer, common::offset_t posInDst,
inline void setValuesFromUncompressed(const uint8_t* srcBuffer, common::offset_t srcOffset,
uint8_t* dstBuffer, common::offset_t dstOffset, common::offset_t numValues,
const CompressionMetadata& /*metadata*/) const final {
memcpy(dstBuffer + posInDst * numBytesPerValue, srcBuffer + posInSrc * numBytesPerValue,
numBytesPerValue);
memcpy(dstBuffer + dstOffset * numBytesPerValue, srcBuffer + srcOffset * numBytesPerValue,
numBytesPerValue * numValues);
}

inline void getValue(const uint8_t* buffer, common::offset_t posInBuffer, uint8_t* dst,
Expand Down Expand Up @@ -185,8 +184,9 @@ class IntegerBitpacking : public CompressionAlg {
IntegerBitpacking() = default;
IntegerBitpacking(const IntegerBitpacking&) = default;

void setValueFromUncompressed(uint8_t* srcBuffer, common::offset_t posInSrc, uint8_t* dstBuffer,
common::offset_t posInDst, const CompressionMetadata& metadata) const final;
void setValuesFromUncompressed(const uint8_t* srcBuffer, common::offset_t srcOffset,
uint8_t* dstBuffer, common::offset_t dstOffset, common::offset_t numValues,
const CompressionMetadata& metadata) const final;

// Read a single value from the buffer
void getValue(const uint8_t* buffer, common::offset_t posInBuffer, uint8_t* dst,
Expand Down Expand Up @@ -241,8 +241,9 @@ class BooleanBitpacking : public CompressionAlg {
BooleanBitpacking() = default;
BooleanBitpacking(const BooleanBitpacking&) = default;

void setValueFromUncompressed(uint8_t* srcBuffer, common::offset_t posInSrc, uint8_t* dstBuffer,
common::offset_t posInDst, const CompressionMetadata& metadata) const final;
void setValuesFromUncompressed(const uint8_t* srcBuffer, common::offset_t srcOffset,
uint8_t* dstBuffer, common::offset_t dstOffset, common::offset_t numValues,
const CompressionMetadata& metadata) const final;

void getValue(const uint8_t* buffer, common::offset_t posInBuffer, uint8_t* dst,
common::offset_t posInDst, const CompressionMetadata& metadata) const final;
Expand Down Expand Up @@ -294,14 +295,28 @@ class ReadCompressedValuesFromPage : public CompressedFunctor {
uint32_t startPosInResult, uint64_t numValuesToRead, const CompressionMetadata& metadata);
};

class WriteCompressedValueToPage : public CompressedFunctor {
class WriteCompressedValuesToPage : public CompressedFunctor {
public:
explicit WriteCompressedValueToPage(const common::LogicalType& logicalType)
explicit WriteCompressedValuesToPage(const common::LogicalType& logicalType)
: CompressedFunctor(logicalType) {}
WriteCompressedValueToPage(const WriteCompressedValueToPage&) = default;
WriteCompressedValuesToPage(const WriteCompressedValuesToPage&) = default;

void operator()(uint8_t* frame, uint16_t posInFrame, const uint8_t* data,
common::offset_t dataOffset, common::offset_t numValues,
const CompressionMetadata& metadata);
};

class WriteCompressedValueToPageFromVector {
public:
explicit WriteCompressedValueToPageFromVector(const common::LogicalType& logicalType)
: writeFunc(logicalType) {}
WriteCompressedValueToPageFromVector(const WriteCompressedValueToPageFromVector&) = default;

void operator()(uint8_t* frame, uint16_t posInFrame, common::ValueVector* vector,
uint32_t posInVector, const CompressionMetadata& metadata);

private:
WriteCompressedValuesToPage writeFunc;
};

} // namespace storage
Expand Down
118 changes: 86 additions & 32 deletions src/include/storage/store/node_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ using read_values_to_vector_func_t = std::function<void(uint8_t* frame,
uint32_t numValuesToRead, const CompressionMetadata& metadata)>;
using write_values_from_vector_func_t = std::function<void(uint8_t* frame, uint16_t posInFrame,
common::ValueVector* vector, uint32_t posInVector, const CompressionMetadata& metadata)>;
using write_values_func_t = std::function<void(uint8_t* frame, uint16_t posInFrame,
const uint8_t* data, common::offset_t dataOffset, common::offset_t numValues,
const CompressionMetadata& metadata)>;

using read_values_to_page_func_t =
std::function<void(uint8_t* frame, PageElementCursor& pageCursor, uint8_t* result,
Expand All @@ -27,26 +30,13 @@ class NullNodeColumn;
class StructNodeColumn;
// TODO(Guodong): This is intentionally duplicated with `Column`, as for now, we don't change rel
// tables. `Column` is used for rel tables only. Eventually, we should remove `Column`.
class NodeColumn {
class BaseNodeColumn {
public:
NodeColumn(common::LogicalType dataType, const MetadataDAHInfo& metaDAHeaderInfo,
BaseNodeColumn(common::LogicalType dataType, const MetadataDAHInfo& metaDAHeaderInfo,
BMFileHandle* dataFH, BMFileHandle* metadataFH, BufferManager* bufferManager, WAL* wal,
transaction::Transaction* transaction, RWPropertyStats PropertyStatistics,
bool enableCompression, bool requireNullColumn = true);
virtual ~NodeColumn() = default;

// Expose for feature store
virtual void batchLookup(transaction::Transaction* transaction,
const common::offset_t* nodeOffsets, size_t size, uint8_t* result);

virtual void scan(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
common::ValueVector* resultVector);
virtual void scan(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx,
common::offset_t startOffsetInGroup, common::offset_t endOffsetInGroup,
common::ValueVector* resultVector, uint64_t offsetInVector);
virtual void scan(common::node_group_idx_t nodeGroupIdx, ColumnChunk* columnChunk);
virtual void lookup(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
common::ValueVector* resultVector);
virtual ~BaseNodeColumn() = default;

virtual void append(ColumnChunk* columnChunk, uint64_t nodeGroupIdx);

Expand All @@ -69,34 +59,24 @@ class NodeColumn {
return metadataDA->get(nodeGroupIdx, transaction);
}

virtual void write(common::offset_t nodeOffset, common::ValueVector* vectorToWriteFrom,
uint32_t posInVectorToWriteFrom);
virtual void scan(common::node_group_idx_t nodeGroupIdx, ColumnChunk* columnChunk);
virtual void scan(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx,
common::offset_t startOffsetInGroup, common::offset_t endOffsetInGroup, uint8_t* result);

protected:
virtual void scanInternal(transaction::Transaction* transaction,
common::ValueVector* nodeIDVector, common::ValueVector* resultVector);
void scanUnfiltered(transaction::Transaction* transaction, PageElementCursor& pageCursor,
uint64_t numValuesToScan, common::ValueVector* resultVector,
const CompressionMetadata& compMeta, uint64_t startPosInVector = 0);
void scanFiltered(transaction::Transaction* transaction, PageElementCursor& pageCursor,
common::ValueVector* nodeIDVector, common::ValueVector* resultVector,
const CompressionMetadata& compMeta);
virtual void lookupInternal(transaction::Transaction* transaction,
common::ValueVector* nodeIDVector, common::ValueVector* resultVector);
virtual void lookupValue(transaction::Transaction* transaction, common::offset_t nodeOffset,
common::ValueVector* resultVector, uint32_t posInVector);

void readFromPage(transaction::Transaction* transaction, common::page_idx_t pageIdx,
const std::function<void(uint8_t*)>& func);

virtual void writeValue(common::offset_t nodeOffset, common::ValueVector* vectorToWriteFrom,
uint32_t posInVectorToWriteFrom);

PageElementCursor getPageCursorForOffset(
transaction::TransactionType transactionType, common::offset_t nodeOffset);
// TODO(Guodong): This is mostly duplicated with
// StorageStructure::createWALVersionOfPageIfNecessaryForElement(). Should be cleared later.
WALPageIdxPosInPageAndFrame createWALVersionOfPageForValue(common::offset_t nodeOffset);
// Produces a page cursor for the offset relative to the given node group
PageElementCursor getPageCursorForOffsetInGroup(transaction::TransactionType transactionType,
common::offset_t nodeOffset, common::node_group_idx_t nodeGroupIdx);

protected:
StorageStructureID storageStructureID;
Expand All @@ -112,12 +92,86 @@ class NodeColumn {
std::unique_ptr<NodeColumn> nullColumn;
read_values_to_vector_func_t readToVectorFunc;
write_values_from_vector_func_t writeFromVectorFunc;
write_values_func_t writeFunc;
read_values_to_page_func_t readToPageFunc;
batch_lookup_func_t batchLookupFunc;
RWPropertyStats propertyStatistics;
bool enableCompression;
};

// NodeColumn where we assume it the underlying storage always stores NodeGroupSize values
// Data is indexed using a global offset (which is internally used to find the node group via the
// node group size)
class NodeColumn : public BaseNodeColumn {

Check warning on line 105 in src/include/storage/store/node_column.h

View check run for this annotation

Codecov / codecov/patch

src/include/storage/store/node_column.h#L105

Added line #L105 was not covered by tests
public:
NodeColumn(common::LogicalType dataType, const MetadataDAHInfo& metaDAHeaderInfo,
BMFileHandle* dataFH, BMFileHandle* metadataFH, BufferManager* bufferManager, WAL* wal,
transaction::Transaction* transaction, RWPropertyStats propertyStatistics,
bool enableCompression, bool requireNullColumn = true)
: BaseNodeColumn{std::move(dataType), metaDAHeaderInfo, dataFH, metadataFH, bufferManager,
wal, transaction, propertyStatistics, enableCompression, requireNullColumn} {}

// Expose for feature store
virtual void batchLookup(transaction::Transaction* transaction,
const common::offset_t* nodeOffsets, size_t size, uint8_t* result);

virtual void scan(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
common::ValueVector* resultVector);
virtual void scan(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx,
common::offset_t startOffsetInGroup, common::offset_t endOffsetInGroup,
common::ValueVector* resultVector, uint64_t offsetInVector);
inline void scan(common::node_group_idx_t nodeGroupIdx, ColumnChunk* columnChunk) override {
BaseNodeColumn::scan(nodeGroupIdx, columnChunk);
}
virtual void lookup(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
common::ValueVector* resultVector);

virtual void write(common::offset_t nodeOffset, common::ValueVector* vectorToWriteFrom,
uint32_t posInVectorToWriteFrom);

protected:
virtual void writeValue(const ColumnChunkMetadata& chunkMeta, common::offset_t nodeOffset,
common::ValueVector* vectorToWriteFrom, uint32_t posInVectorToWriteFrom);
virtual void writeValue(
const ColumnChunkMetadata& chunkMeta, common::offset_t nodeOffset, const uint8_t* data);

virtual void scanInternal(transaction::Transaction* transaction,
common::ValueVector* nodeIDVector, common::ValueVector* resultVector);
virtual void lookupInternal(transaction::Transaction* transaction,
common::ValueVector* nodeIDVector, common::ValueVector* resultVector);
virtual void lookupValue(transaction::Transaction* transaction, common::offset_t nodeOffset,
common::ValueVector* resultVector, uint32_t posInVector);

// TODO(Guodong): This is mostly duplicated with
// StorageStructure::createWALVersionOfPageIfNecessaryForElement(). Should be cleared later.
WALPageIdxPosInPageAndFrame createWALVersionOfPageForValue(common::offset_t nodeOffset);

// Produces a page cursor for the absolute node offset
PageElementCursor getPageCursorForOffset(
transaction::TransactionType transactionType, common::offset_t nodeOffset);
// Produces a page cursor for the absolute node offset, given a node group
PageElementCursor getPageCursorForOffsetAndGroup(transaction::TransactionType transactionType,
common::offset_t nodeOffset, common::node_group_idx_t nodeGroupIdx);
};

// NodeColumn for data adjacent to a NodeGroup
// Data is indexed using the node group identifier and the offset within the node group
class AuxiliaryNodeColumn : public BaseNodeColumn {
public:
AuxiliaryNodeColumn(common::LogicalType dataType, const MetadataDAHInfo& metaDAHeaderInfo,
BMFileHandle* dataFH, BMFileHandle* metadataFH, BufferManager* bufferManager, WAL* wal,
transaction::Transaction* transaction, RWPropertyStats propertyStatistics,
bool enableCompression, bool requireNullColumn = true)
: BaseNodeColumn{std::move(dataType), metaDAHeaderInfo, dataFH, metadataFH, bufferManager,
wal, transaction, propertyStatistics, enableCompression, requireNullColumn} {}

// Append values to the end of the node group, resizing it if necessary
common::offset_t appendValues(
common::node_group_idx_t nodeGroupIdx, const uint8_t* data, common::offset_t numValues);

protected:
};

struct NodeColumnFactory {
static std::unique_ptr<NodeColumn> createNodeColumn(const common::LogicalType& dataType,
const MetadataDAHInfo& metaDAHeaderInfo, BMFileHandle* dataFH, BMFileHandle* metadataFH,
Expand Down
Loading

0 comments on commit 21fb0d9

Please sign in to comment.