Skip to content

Commit

Permalink
implemented Abstract Compression interface and initial integer packing
Browse files Browse the repository at this point in the history
  • Loading branch information
benjaminwinger committed Sep 25, 2023
1 parent c22396f commit 4f36e53
Show file tree
Hide file tree
Showing 34 changed files with 4,869 additions and 272 deletions.
4 changes: 2 additions & 2 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ add_subdirectory(transaction)

add_library(kuzu STATIC ${ALL_OBJECT_FILES})
target_link_libraries(kuzu
PUBLIC antlr4_cypher antlr4_runtime utf8proc re2 serd ${PARQUET_LIB} ${ARROW_LIB} Threads::Threads)
PUBLIC antlr4_cypher antlr4_runtime utf8proc re2 serd ${PARQUET_LIB} ${ARROW_LIB} Threads::Threads fastpfor)
target_include_directories(kuzu
PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include> $<INSTALL_INTERFACE:${CMAKE_INSTALL_INCLUDEDIR}>)
add_library(kuzu_shared SHARED ${ALL_OBJECT_FILES})
Expand All @@ -24,6 +24,6 @@ else()
set_target_properties(kuzu_shared PROPERTIES OUTPUT_NAME kuzu)
endif()
target_link_libraries(kuzu_shared
PUBLIC antlr4_cypher antlr4_runtime utf8proc re2 serd ${PARQUET_LIB} ${ARROW_LIB} Threads::Threads)
PUBLIC antlr4_cypher antlr4_runtime utf8proc re2 serd ${PARQUET_LIB} ${ARROW_LIB} Threads::Threads fastpfor)
target_include_directories(kuzu_shared
PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include> $<INSTALL_INTERFACE:${CMAKE_INSTALL_INCLUDEDIR}>)
2 changes: 1 addition & 1 deletion src/common/file_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ std::unique_ptr<FileInfo> FileUtils::openFile(const std::string& path, int flags
}

void FileUtils::writeToFile(
FileInfo* fileInfo, uint8_t* buffer, uint64_t numBytes, uint64_t offset) {
FileInfo* fileInfo, const uint8_t* buffer, uint64_t numBytes, uint64_t offset) {
auto fileSize = fileInfo->getFileSize();
if (fileSize == -1) {
throw Exception(StringUtils::string_format("File {} not open.", fileInfo->path));
Expand Down
2 changes: 1 addition & 1 deletion src/include/common/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
namespace kuzu {
namespace common {

constexpr char KUZU_VERSION[] = "v0.0.8.7";
constexpr char KUZU_VERSION[] = "v0.0.8.8";

constexpr uint64_t DEFAULT_VECTOR_CAPACITY_LOG_2 = 11;
constexpr uint64_t DEFAULT_VECTOR_CAPACITY = (uint64_t)1 << DEFAULT_VECTOR_CAPACITY_LOG_2;
Expand Down
2 changes: 1 addition & 1 deletion src/include/common/file_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class FileUtils {
static void readFromFile(
FileInfo* fileInfo, void* buffer, uint64_t numBytes, uint64_t position);
static void writeToFile(
FileInfo* fileInfo, uint8_t* buffer, uint64_t numBytes, uint64_t offset);
FileInfo* fileInfo, const uint8_t* buffer, uint64_t numBytes, uint64_t offset);
// This function is a no-op if either file, from or to, does not exist.
static void overwriteFile(const std::string& from, const std::string& to);
static void copyFile(const std::string& from, const std::string& to,
Expand Down
6 changes: 4 additions & 2 deletions src/include/storage/local_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ class LocalColumn {
virtual void prepareCommit();

virtual void prepareCommitForChunk(common::node_group_idx_t nodeGroupIdx);
void commitLocalChunkOutOfPlace(
common::node_group_idx_t nodeGroupIdx, LocalColumnChunk* localChunk);
void commitLocalChunkInPlace(
common::node_group_idx_t nodeGroupIdx, LocalColumnChunk* localChunk);

protected:
std::map<common::node_group_idx_t, std::unique_ptr<LocalColumnChunk>> chunks;
Expand All @@ -104,8 +108,6 @@ class StringLocalColumn : public LocalColumn {
explicit StringLocalColumn(NodeColumn* column) : LocalColumn{column} {};

void prepareCommitForChunk(common::node_group_idx_t nodeGroupIdx) final;
void commitLocalChunkOutOfPlace(
common::node_group_idx_t nodeGroupIdx, LocalColumnChunk* localChunk);
};

class VarListLocalColumn : public LocalColumn {
Expand Down
34 changes: 28 additions & 6 deletions src/include/storage/store/column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "common/type_utils.h"
#include "common/types/types.h"
#include "common/vector/value_vector.h"
#include "compression.h"
#include "storage/buffer_manager/bm_file_handle.h"
#include "storage/wal/wal.h"
#include "transaction/transaction.h"
Expand All @@ -16,6 +17,16 @@ namespace kuzu {
namespace storage {

class NullColumnChunk;
class CompressionAlg;

struct PreliminaryColumnChunkMetadata {
common::page_idx_t numPages;
uint64_t numValues;
CompressionMetadata compMeta;
PreliminaryColumnChunkMetadata(
common::page_idx_t numPages, common::page_idx_t numValues, CompressionMetadata compMeta)
: numPages{numPages}, numValues{numValues}, compMeta{compMeta} {}
};

struct BaseColumnChunkMetadata {
common::page_idx_t pageIdx;
Expand All @@ -29,11 +40,13 @@ struct BaseColumnChunkMetadata {

struct ColumnChunkMetadata : public BaseColumnChunkMetadata {
uint64_t numValues;
CompressionMetadata compMeta;

ColumnChunkMetadata() : BaseColumnChunkMetadata(), numValues{UINT64_MAX} {}
ColumnChunkMetadata(
common::page_idx_t pageIdx, common::page_idx_t numPages, uint64_t numNodesInChunk)
: BaseColumnChunkMetadata{pageIdx, numPages}, numValues(numNodesInChunk) {}
ColumnChunkMetadata(common::page_idx_t pageIdx, common::page_idx_t numPages,
uint64_t numNodesInChunk, CompressionMetadata compMeta)
: BaseColumnChunkMetadata{pageIdx, numPages}, numValues(numNodesInChunk),
compMeta(compMeta) {}
};

struct OverflowColumnChunkMetadata : public BaseColumnChunkMetadata {
Expand Down Expand Up @@ -77,8 +90,7 @@ class ColumnChunk {

virtual void resetToEmpty();

// Include pages for null and children segments.
virtual common::page_idx_t getNumPages() const;
virtual PreliminaryColumnChunkMetadata getMetadataToFlush() const;

virtual void append(common::ValueVector* vector, common::offset_t startPosInChunk);

Expand All @@ -91,7 +103,8 @@ class ColumnChunk {
virtual void append(
arrow::Array* array, common::offset_t startPosInChunk, uint32_t numValuesToAppend);

virtual common::page_idx_t flushBuffer(BMFileHandle* dataFH, common::page_idx_t startPageIdx);
ColumnChunkMetadata flushBuffer(BMFileHandle* dataFH, common::page_idx_t startPageIdx,
const PreliminaryColumnChunkMetadata& metadata);

// Returns the size of the data type in bytes
static uint32_t getDataTypeSizeInChunk(common::LogicalType& dataType);
Expand Down Expand Up @@ -128,6 +141,8 @@ class ColumnChunk {

inline void setNumValues(uint64_t numValues_) { this->numValues = numValues_; }

virtual void update(common::ValueVector* vector, common::vector_idx_t vectorIdx);

protected:
// Initializes the data buffer. Is (and should be) only called in constructor.
virtual void initialize(common::offset_t capacity);
Expand Down Expand Up @@ -156,11 +171,17 @@ class ColumnChunk {
common::LogicalType dataType;
uint32_t numBytesPerValue;
uint64_t bufferSize;
uint64_t capacity;
std::unique_ptr<uint8_t[]> buffer;
std::unique_ptr<NullColumnChunk> nullChunk;
std::vector<std::unique_ptr<ColumnChunk>> childrenChunks;
std::unique_ptr<common::CSVReaderConfig> csvReaderConfig;
uint64_t numValues;
std::function<ColumnChunkMetadata(const uint8_t*, uint64_t, BMFileHandle*, common::page_idx_t,
const PreliminaryColumnChunkMetadata&)>
flushBufferFunction;
std::function<PreliminaryColumnChunkMetadata(const uint8_t*, uint64_t, uint64_t, uint64_t)>
getMetadataFunction;
};

template<>
Expand All @@ -175,6 +196,7 @@ inline bool ColumnChunk::getValue(common::offset_t pos) const {
return common::NullMask::isNull((uint64_t*)buffer.get(), pos);
}

// Stored as bitpacked booleans in-memory and on-disk
class BoolColumnChunk : public ColumnChunk {
public:
BoolColumnChunk(
Expand Down
Loading

0 comments on commit 4f36e53

Please sign in to comment.