Skip to content

Commit

Permalink
Rewrite string storage to use index, offset and data columns
Browse files Browse the repository at this point in the history
This sets up the storage layout for compressing strings, and removes the
limit on string size in storage.
  • Loading branch information
benjaminwinger committed Nov 15, 2023
1 parent ba5b927 commit a92953d
Show file tree
Hide file tree
Showing 22 changed files with 654 additions and 332 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
cmake_minimum_required(VERSION 3.11)

project(Kuzu VERSION 0.0.12.1 LANGUAGES CXX C)
project(Kuzu VERSION 0.0.12.2 LANGUAGES CXX C)

find_package(Threads REQUIRED)

Expand Down
11 changes: 11 additions & 0 deletions src/common/vector/value_vector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,17 @@ void StringVector::addString(
}
}

ku_string_t& StringVector::reserveString(ValueVector* vector, uint32_t vectorPos, uint64_t length) {
KU_ASSERT(vector->dataType.getPhysicalType() == PhysicalTypeID::STRING);
auto stringBuffer = reinterpret_cast<StringAuxiliaryBuffer*>(vector->auxiliaryBuffer.get());
auto& dstStr = vector->getValue<ku_string_t>(vectorPos);
dstStr.len = length;
if (!ku_string_t::isShortString(length)) {
dstStr.overflowPtr = reinterpret_cast<uint64_t>(stringBuffer->allocateOverflow(length));
}
return dstStr;
}

void StringVector::addString(ValueVector* vector, ku_string_t& dstStr, ku_string_t& srcStr) {
KU_ASSERT(vector->dataType.getPhysicalType() == PhysicalTypeID::STRING);
auto stringBuffer = reinterpret_cast<StringAuxiliaryBuffer*>(vector->auxiliaryBuffer.get());
Expand Down
2 changes: 1 addition & 1 deletion src/include/common/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
namespace kuzu {
namespace common {

constexpr char KUZU_VERSION[] = "v0.0.12.1";
constexpr char KUZU_VERSION[] = "v0.0.12.2";

constexpr uint64_t DEFAULT_VECTOR_CAPACITY_LOG_2 = 11;
constexpr uint64_t DEFAULT_VECTOR_CAPACITY = (uint64_t)1 << DEFAULT_VECTOR_CAPACITY_LOG_2;
Expand Down
3 changes: 3 additions & 0 deletions src/include/common/vector/value_vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ class StringVector {
static void addString(ValueVector* vector, uint32_t vectorPos, ku_string_t& srcStr);
static void addString(
ValueVector* vector, uint32_t vectorPos, const char* srcStr, uint64_t length);
// Add empty string with space reserved for the provided size
// Returned value can be modified to set the string contents
static ku_string_t& reserveString(ValueVector* vector, uint32_t vectorPos, uint64_t length);
static void addString(ValueVector* vector, ku_string_t& dstStr, ku_string_t& srcStr);
static void addString(
ValueVector* vector, ku_string_t& dstStr, const char* srcStr, uint64_t length);
Expand Down
2 changes: 1 addition & 1 deletion src/include/processor/operator/persistent/copy_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ template<>
uint64_t CopyNode::appendToPKIndex<int64_t>(storage::PrimaryKeyIndexBuilder* pkIndex,
storage::ColumnChunk* chunk, common::offset_t startOffset, common::offset_t numNodes);
template<>
uint64_t CopyNode::appendToPKIndex<common::ku_string_t>(storage::PrimaryKeyIndexBuilder* pkIndex,
uint64_t CopyNode::appendToPKIndex<std::string>(storage::PrimaryKeyIndexBuilder* pkIndex,
storage::ColumnChunk* chunk, common::offset_t startOffset, common::offset_t numNodes);

} // namespace processor
Expand Down
44 changes: 30 additions & 14 deletions src/include/storage/compression/compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ struct CompressionMetadata {
// Returns true if and only if the provided value within the vector can be updated
// in this chunk in-place.
bool canUpdateInPlace(
const common::ValueVector& vector, uint32_t pos, common::PhysicalTypeID physicalType) const;
const uint8_t* data, uint32_t pos, common::PhysicalTypeID physicalType) const;
bool canAlwaysUpdateInPlace() const;
};

Expand All @@ -52,8 +52,8 @@ class CompressionAlg {

// Takes a single uncompressed value from the srcBuffer and compresses it into the dstBuffer
// Offsets refer to value offsets, not byte offsets
virtual void setValueFromUncompressed(uint8_t* srcBuffer, common::offset_t posInSrc,
uint8_t* dstBuffer, common::offset_t posInDst,
virtual void setValuesFromUncompressed(const uint8_t* srcBuffer, common::offset_t srcOffset,
uint8_t* dstBuffer, common::offset_t dstOffset, common::offset_t numValues,
const CompressionMetadata& metadata) const = 0;

// Reads a value from the buffer at the given position and stores it at the given memory address
Expand Down Expand Up @@ -102,11 +102,11 @@ class Uncompressed : public CompressionAlg {

Uncompressed(const Uncompressed&) = default;

inline void setValueFromUncompressed(uint8_t* srcBuffer, common::offset_t posInSrc,
uint8_t* dstBuffer, common::offset_t posInDst,
inline void setValuesFromUncompressed(const uint8_t* srcBuffer, common::offset_t srcOffset,
uint8_t* dstBuffer, common::offset_t dstOffset, common::offset_t numValues,
const CompressionMetadata& /*metadata*/) const final {
memcpy(dstBuffer + posInDst * numBytesPerValue, srcBuffer + posInSrc * numBytesPerValue,
numBytesPerValue);
memcpy(dstBuffer + dstOffset * numBytesPerValue, srcBuffer + srcOffset * numBytesPerValue,
numBytesPerValue * numValues);
}

inline void getValue(const uint8_t* buffer, common::offset_t posInBuffer, uint8_t* dst,
Expand Down Expand Up @@ -185,8 +185,9 @@ class IntegerBitpacking : public CompressionAlg {
IntegerBitpacking() = default;
IntegerBitpacking(const IntegerBitpacking&) = default;

void setValueFromUncompressed(uint8_t* srcBuffer, common::offset_t posInSrc, uint8_t* dstBuffer,
common::offset_t posInDst, const CompressionMetadata& metadata) const final;
void setValuesFromUncompressed(const uint8_t* srcBuffer, common::offset_t srcOffset,
uint8_t* dstBuffer, common::offset_t dstOffset, common::offset_t numValues,
const CompressionMetadata& metadata) const final;

// Read a single value from the buffer
void getValue(const uint8_t* buffer, common::offset_t posInBuffer, uint8_t* dst,
Expand Down Expand Up @@ -241,8 +242,9 @@ class BooleanBitpacking : public CompressionAlg {
BooleanBitpacking() = default;
BooleanBitpacking(const BooleanBitpacking&) = default;

void setValueFromUncompressed(uint8_t* srcBuffer, common::offset_t posInSrc, uint8_t* dstBuffer,
common::offset_t posInDst, const CompressionMetadata& metadata) const final;
void setValuesFromUncompressed(const uint8_t* srcBuffer, common::offset_t srcOffset,
uint8_t* dstBuffer, common::offset_t dstOffset, common::offset_t numValues,
const CompressionMetadata& metadata) const final;

void getValue(const uint8_t* buffer, common::offset_t posInBuffer, uint8_t* dst,
common::offset_t posInDst, const CompressionMetadata& metadata) const final;
Expand Down Expand Up @@ -294,14 +296,28 @@ class ReadCompressedValuesFromPage : public CompressedFunctor {
uint32_t startPosInResult, uint64_t numValuesToRead, const CompressionMetadata& metadata);
};

class WriteCompressedValueToPage : public CompressedFunctor {
class WriteCompressedValuesToPage : public CompressedFunctor {
public:
explicit WriteCompressedValueToPage(const common::LogicalType& logicalType)
explicit WriteCompressedValuesToPage(const common::LogicalType& logicalType)
: CompressedFunctor(logicalType) {}
WriteCompressedValueToPage(const WriteCompressedValueToPage&) = default;
WriteCompressedValuesToPage(const WriteCompressedValuesToPage&) = default;

void operator()(uint8_t* frame, uint16_t posInFrame, const uint8_t* data,
common::offset_t dataOffset, common::offset_t numValues,
const CompressionMetadata& metadata);
};

class WriteCompressedValueToPageFromVector {
public:
explicit WriteCompressedValueToPageFromVector(const common::LogicalType& logicalType)
: writeFunc(logicalType) {}
WriteCompressedValueToPageFromVector(const WriteCompressedValueToPageFromVector&) = default;

void operator()(uint8_t* frame, uint16_t posInFrame, common::ValueVector* vector,
uint32_t posInVector, const CompressionMetadata& metadata);

private:
WriteCompressedValuesToPage writeFunc;
};

} // namespace storage
Expand Down
5 changes: 3 additions & 2 deletions src/include/storage/storage_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ using storage_version_t = uint64_t;

struct StorageVersionInfo {
static std::unordered_map<std::string, storage_version_t> getStorageVersionInfo() {
return {{"0.0.12.1", 24}, {"0.0.12", 23}, {"0.0.11", 23}, {"0.0.10", 23}, {"0.0.9", 23},
{"0.0.8", 17}, {"0.0.7", 15}, {"0.0.6", 9}, {"0.0.5", 8}, {"0.0.4", 7}, {"0.0.3", 1}};
return {{"0.0.12.2", 25}, {"0.0.12.1", 24}, {"0.0.12", 23}, {"0.0.11", 23}, {"0.0.10", 23},
{"0.0.9", 23}, {"0.0.8", 17}, {"0.0.7", 15}, {"0.0.6", 9}, {"0.0.5", 8}, {"0.0.4", 7},
{"0.0.3", 1}};
}

static storage_version_t getStorageVersion();
Expand Down
34 changes: 31 additions & 3 deletions src/include/storage/store/column.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,27 @@ using read_values_to_vector_func_t = std::function<void(uint8_t* frame,
uint32_t numValuesToRead, const CompressionMetadata& metadata)>;
using write_values_from_vector_func_t = std::function<void(uint8_t* frame, uint16_t posInFrame,
common::ValueVector* vector, uint32_t posInVector, const CompressionMetadata& metadata)>;
using write_values_func_t = std::function<void(uint8_t* frame, uint16_t posInFrame,
const uint8_t* data, common::offset_t dataOffset, common::offset_t numValues,
const CompressionMetadata& metadata)>;

using read_values_to_page_func_t =
std::function<void(uint8_t* frame, PageElementCursor& pageCursor, uint8_t* result,
uint32_t posInResult, uint64_t numValues, const CompressionMetadata& metadata)>;
// This is a special usage for the `batchLookup` interface.
using batch_lookup_func_t = read_values_to_page_func_t;

struct ReadState {
ColumnChunkMetadata metadata;
uint64_t numValuesPerPage;
};

class NullColumn;
class StructColumn;
class Column {
friend class LocalColumn;
friend class StringLocalColumn;
friend class StringColumn;
friend class VarListLocalColumn;
friend class StructColumn;

Expand Down Expand Up @@ -76,9 +85,16 @@ class Column {
}
inline InMemDiskArray<ColumnChunkMetadata>* getMetadataDA() const { return metadataDA.get(); }

virtual void scan(transaction::Transaction* transaction, const ReadState& state,
common::offset_t startOffsetInGroup, common::offset_t endOffsetInGroup, uint8_t* result);

virtual void write(common::offset_t nodeOffset, common::ValueVector* vectorToWriteFrom,
uint32_t posInVectorToWriteFrom);

// Append values to the end of the node group, resizing it if necessary
common::offset_t appendValues(
common::node_group_idx_t nodeGroupIdx, const uint8_t* data, common::offset_t numValues);

protected:
virtual void scanInternal(transaction::Transaction* transaction,
common::ValueVector* nodeIDVector, common::ValueVector* resultVector);
Expand All @@ -96,16 +112,27 @@ class Column {
void readFromPage(transaction::Transaction* transaction, common::page_idx_t pageIdx,
const std::function<void(uint8_t*)>& func);

virtual void writeValue(common::offset_t nodeOffset, common::ValueVector* vectorToWriteFrom,
uint32_t posInVectorToWriteFrom);
virtual void writeValue(const ColumnChunkMetadata& chunkMeta, common::offset_t nodeOffset,
common::ValueVector* vectorToWriteFrom, uint32_t posInVectorToWriteFrom);
virtual void writeValue(
const ColumnChunkMetadata& chunkMeta, common::offset_t nodeOffset, const uint8_t* data);

// Produces a page cursor for the offset relative to the given node group
PageElementCursor getPageCursorForOffsetInGroup(
common::offset_t nodeOffset, const ReadState& state);

ReadState getReadState(
transaction::TransactionType transactionType, common::node_group_idx_t nodeGroupIdx) const;

// Produces a page cursor for the absolute node offset
PageElementCursor getPageCursorForOffset(
transaction::TransactionType transactionType, common::offset_t nodeOffset);
WALPageIdxPosInPageAndFrame createWALVersionOfPageForValue(common::offset_t nodeOffset);

private:
static bool containsVarList(common::LogicalType& dataType);
bool canCommitInPlace(common::node_group_idx_t nodeGroupIdx, LocalVectorCollection* localChunk);
virtual bool canCommitInPlace(
common::node_group_idx_t nodeGroupIdx, LocalVectorCollection* localChunk);
void commitLocalChunkInPlace(LocalVectorCollection* localChunk);
void commitLocalChunkOutOfPlace(common::node_group_idx_t nodeGroupIdx,
LocalVectorCollection* localChunk, bool isNewNodeGroup);
Expand Down Expand Up @@ -135,6 +162,7 @@ class Column {
std::unique_ptr<Column> nullColumn;
read_values_to_vector_func_t readToVectorFunc;
write_values_from_vector_func_t writeFromVectorFunc;
write_values_func_t writeFunc;
read_values_to_page_func_t readToPageFunc;
batch_lookup_func_t batchLookupFunc;
RWPropertyStats propertyStatistics;
Expand Down
31 changes: 5 additions & 26 deletions src/include/storage/store/column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <functional>

#include "common/constants.h"
#include "common/types/types.h"
#include "common/vector/value_vector.h"
#include "storage/buffer_manager/bm_file_handle.h"
Expand All @@ -13,36 +14,16 @@ namespace storage {
class NullColumnChunk;
class CompressionAlg;

struct BaseColumnChunkMetadata {
struct ColumnChunkMetadata {
common::page_idx_t pageIdx;
common::page_idx_t numPages;

BaseColumnChunkMetadata()
: BaseColumnChunkMetadata{common::INVALID_PAGE_IDX, 0 /* numPages */} {}
BaseColumnChunkMetadata(common::page_idx_t pageIdx, common::page_idx_t numPages)
: pageIdx(pageIdx), numPages(numPages) {}
virtual ~BaseColumnChunkMetadata() = default;
};

struct ColumnChunkMetadata : public BaseColumnChunkMetadata {
uint64_t numValues;
CompressionMetadata compMeta;

ColumnChunkMetadata() : BaseColumnChunkMetadata(), numValues{UINT64_MAX} {}
ColumnChunkMetadata() : pageIdx{common::INVALID_PAGE_IDX}, numPages{0}, numValues{UINT64_MAX} {}
ColumnChunkMetadata(common::page_idx_t pageIdx, common::page_idx_t numPages,
uint64_t numNodesInChunk, CompressionMetadata compMeta)
: BaseColumnChunkMetadata{pageIdx, numPages}, numValues(numNodesInChunk),
compMeta(compMeta) {}
};

struct OverflowColumnChunkMetadata : public BaseColumnChunkMetadata {
common::offset_t lastOffsetInPage;

OverflowColumnChunkMetadata()
: BaseColumnChunkMetadata(), lastOffsetInPage{common::INVALID_OFFSET} {}
OverflowColumnChunkMetadata(
common::page_idx_t pageIdx, common::page_idx_t numPages, common::offset_t lastOffsetInPage)
: BaseColumnChunkMetadata{pageIdx, numPages}, lastOffsetInPage(lastOffsetInPage) {}
uint64_t numNodesInChunk, const CompressionMetadata& compMeta)
: pageIdx(pageIdx), numPages(numPages), numValues(numNodesInChunk), compMeta(compMeta) {}
};

// Base data segment covers all fixed-sized data types.
Expand Down Expand Up @@ -120,8 +101,6 @@ class ColumnChunk {

private:
uint64_t getBufferSize() const;
// Returns the size of the data type in bytes
static uint32_t getDataTypeSizeInChunk(common::LogicalType& dataType);

protected:
common::LogicalType dataType;
Expand Down
34 changes: 28 additions & 6 deletions src/include/storage/store/string_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ namespace storage {

class StringColumn : public Column {
public:
using string_offset_t = uint64_t;
using string_index_t = uint32_t;
StringColumn(common::LogicalType dataType, const MetadataDAHInfo& metaDAHeaderInfo,
BMFileHandle* dataFH, BMFileHandle* metadataFH, BufferManager* bufferManager, WAL* wal,
transaction::Transaction* transaction, RWPropertyStats propertyStatistics);
Expand All @@ -18,28 +20,48 @@ class StringColumn : public Column {

void append(ColumnChunk* columnChunk, common::node_group_idx_t nodeGroupIdx) final;

void writeValue(common::offset_t nodeOffset, common::ValueVector* vectorToWriteFrom,
uint32_t posInVectorToWriteFrom) final;
void writeValue(const ColumnChunkMetadata& chunkMeta, common::offset_t nodeOffset,
common::ValueVector* vectorToWriteFrom, uint32_t posInVectorToWriteFrom) final;

inline InMemDiskArray<OverflowColumnChunkMetadata>* getOverflowMetadataDA() {
return overflowMetadataDA.get();
}
inline Column* getDataColumn() { return dataColumn.get(); }
inline Column* getOffsetColumn() { return offsetColumn.get(); }

void checkpointInMemory() final;
void rollbackInMemory() final;

protected:
void scanInternal(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
common::ValueVector* resultVector) final;
void scanUnfiltered(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, common::offset_t startOffsetInGroup,
common::offset_t endOffsetInGroup, common::ValueVector* resultVector,
uint64_t startPosInVector = 0);
void scanFiltered(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx,
common::offset_t startOffsetInGroup, common::ValueVector* nodeIDVector,
common::ValueVector* resultVector);

void lookupInternal(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
common::ValueVector* resultVector) final;

void scanValueToVector(transaction::Transaction* transaction, const ReadState& dataState,
uint64_t startOffset, uint64_t endOffset, common::ValueVector* resultVector,
uint64_t offsetInVector);
void scanOffsets(transaction::Transaction* transaction, const ReadState& state,
uint64_t* offsets, uint64_t index, uint64_t dataSize);

private:
void readStringValueFromOvf(transaction::Transaction* transaction, common::ku_string_t& kuStr,
common::ValueVector* resultVector, common::page_idx_t overflowPageIdx);
bool canCommitInPlace(
common::node_group_idx_t nodeGroupIdx, LocalVectorCollection* localChunk) final;

private:
std::unique_ptr<InMemDiskArray<OverflowColumnChunkMetadata>> overflowMetadataDA;
// Main column stores indices of values in the dictionary
// The offset column stores the offsets for each index, and the data column stores the data in
// order. Values are never removed from the dictionary during in-place updates, only appended to
// the end.
std::unique_ptr<Column> dataColumn;
std::unique_ptr<Column> offsetColumn;
};

} // namespace storage
Expand Down
Loading

0 comments on commit a92953d

Please sign in to comment.