Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Abstract Compression interface and initial integer packing #2004

Merged
merged 1 commit into from
Sep 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ add_subdirectory(transaction)

add_library(kuzu STATIC ${ALL_OBJECT_FILES})
target_link_libraries(kuzu
PUBLIC antlr4_cypher antlr4_runtime utf8proc re2 serd ${PARQUET_LIB} ${ARROW_LIB} Threads::Threads)
PUBLIC antlr4_cypher antlr4_runtime utf8proc re2 serd ${PARQUET_LIB} ${ARROW_LIB} Threads::Threads fastpfor)
target_include_directories(kuzu
PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include> $<INSTALL_INTERFACE:${CMAKE_INSTALL_INCLUDEDIR}>)
add_library(kuzu_shared SHARED ${ALL_OBJECT_FILES})
Expand All @@ -24,6 +24,6 @@ else()
set_target_properties(kuzu_shared PROPERTIES OUTPUT_NAME kuzu)
endif()
target_link_libraries(kuzu_shared
PUBLIC antlr4_cypher antlr4_runtime utf8proc re2 serd ${PARQUET_LIB} ${ARROW_LIB} Threads::Threads)
PUBLIC antlr4_cypher antlr4_runtime utf8proc re2 serd ${PARQUET_LIB} ${ARROW_LIB} Threads::Threads fastpfor)
target_include_directories(kuzu_shared
PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include> $<INSTALL_INTERFACE:${CMAKE_INSTALL_INCLUDEDIR}>)
2 changes: 1 addition & 1 deletion src/common/file_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ std::unique_ptr<FileInfo> FileUtils::openFile(const std::string& path, int flags
}

void FileUtils::writeToFile(
FileInfo* fileInfo, uint8_t* buffer, uint64_t numBytes, uint64_t offset) {
FileInfo* fileInfo, const uint8_t* buffer, uint64_t numBytes, uint64_t offset) {
auto fileSize = fileInfo->getFileSize();
if (fileSize == -1) {
throw Exception(StringUtils::string_format("File {} not open.", fileInfo->path));
Expand Down
2 changes: 1 addition & 1 deletion src/include/common/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
namespace kuzu {
namespace common {

constexpr char KUZU_VERSION[] = "v0.0.8.7";
constexpr char KUZU_VERSION[] = "v0.0.8.11";

constexpr uint64_t DEFAULT_VECTOR_CAPACITY_LOG_2 = 11;
constexpr uint64_t DEFAULT_VECTOR_CAPACITY = (uint64_t)1 << DEFAULT_VECTOR_CAPACITY_LOG_2;
Expand Down
2 changes: 1 addition & 1 deletion src/include/common/file_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class FileUtils {
static void readFromFile(
FileInfo* fileInfo, void* buffer, uint64_t numBytes, uint64_t position);
static void writeToFile(
FileInfo* fileInfo, uint8_t* buffer, uint64_t numBytes, uint64_t offset);
FileInfo* fileInfo, const uint8_t* buffer, uint64_t numBytes, uint64_t offset);
// This function is a no-op if either file, from or to, does not exist.
static void overwriteFile(const std::string& from, const std::string& to);
static void copyFile(const std::string& from, const std::string& to,
Expand Down
6 changes: 4 additions & 2 deletions src/include/storage/local_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ class LocalColumn {
virtual void prepareCommit();

virtual void prepareCommitForChunk(common::node_group_idx_t nodeGroupIdx);
void commitLocalChunkOutOfPlace(
common::node_group_idx_t nodeGroupIdx, LocalColumnChunk* localChunk);
void commitLocalChunkInPlace(
common::node_group_idx_t nodeGroupIdx, LocalColumnChunk* localChunk);

protected:
std::map<common::node_group_idx_t, std::unique_ptr<LocalColumnChunk>> chunks;
Expand All @@ -104,8 +108,6 @@ class StringLocalColumn : public LocalColumn {
explicit StringLocalColumn(NodeColumn* column) : LocalColumn{column} {};

void prepareCommitForChunk(common::node_group_idx_t nodeGroupIdx) final;
void commitLocalChunkOutOfPlace(
common::node_group_idx_t nodeGroupIdx, LocalColumnChunk* localChunk);
};

class VarListLocalColumn : public LocalColumn {
Expand Down
26 changes: 20 additions & 6 deletions src/include/storage/store/column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "common/type_utils.h"
#include "common/types/types.h"
#include "common/vector/value_vector.h"
#include "compression.h"
#include "storage/buffer_manager/bm_file_handle.h"
#include "storage/wal/wal.h"
#include "transaction/transaction.h"
Expand All @@ -16,6 +17,7 @@ namespace kuzu {
namespace storage {

class NullColumnChunk;
class CompressionAlg;

struct BaseColumnChunkMetadata {
common::page_idx_t pageIdx;
Expand All @@ -29,11 +31,13 @@ struct BaseColumnChunkMetadata {

// Metadata record for one column chunk. Extends BaseColumnChunkMetadata (pageIdx / numPages)
// with the number of values in the chunk and a CompressionMetadata describing the chunk.
// NOTE(review): this text comes from a diff rendering without +/- markers; the three-argument
// constructor below is presumably the pre-change version that the four-argument constructor
// replaces -- confirm against the repository before relying on both overloads existing.
struct ColumnChunkMetadata : public BaseColumnChunkMetadata {
// Number of values in the chunk.
uint64_t numValues;
// Compression metadata associated with the chunk.
CompressionMetadata compMeta;

// Default constructor: numValues is set to UINT64_MAX (presumably an "unset" sentinel -- TODO
// confirm). compMeta is not named in the initializer list, so it is default-initialized.
ColumnChunkMetadata() : BaseColumnChunkMetadata(), numValues{UINT64_MAX} {}
// Constructs metadata from a start page, page count and value count. compMeta is not named in
// the initializer list, so it is default-initialized.
ColumnChunkMetadata(
common::page_idx_t pageIdx, common::page_idx_t numPages, uint64_t numNodesInChunk)
: BaseColumnChunkMetadata{pageIdx, numPages}, numValues(numNodesInChunk) {}
// Constructs metadata from a start page, page count, value count and an explicit
// CompressionMetadata, which is copied into compMeta.
ColumnChunkMetadata(common::page_idx_t pageIdx, common::page_idx_t numPages,
uint64_t numNodesInChunk, CompressionMetadata compMeta)
: BaseColumnChunkMetadata{pageIdx, numPages}, numValues(numNodesInChunk),
compMeta(compMeta) {}
};

struct OverflowColumnChunkMetadata : public BaseColumnChunkMetadata {
Expand Down Expand Up @@ -77,8 +81,8 @@ class ColumnChunk {

virtual void resetToEmpty();

// Include pages for null and children segments.
virtual common::page_idx_t getNumPages() const;
// Note that the startPageIdx is not known, so it will always be common::INVALID_PAGE_IDX
virtual ColumnChunkMetadata getMetadataToFlush() const;

virtual void append(common::ValueVector* vector, common::offset_t startPosInChunk);

Expand All @@ -91,7 +95,8 @@ class ColumnChunk {
virtual void append(
arrow::Array* array, common::offset_t startPosInChunk, uint32_t numValuesToAppend);

virtual common::page_idx_t flushBuffer(BMFileHandle* dataFH, common::page_idx_t startPageIdx);
ColumnChunkMetadata flushBuffer(
BMFileHandle* dataFH, common::page_idx_t startPageIdx, const ColumnChunkMetadata& metadata);

// Returns the size of the data type in bytes
static uint32_t getDataTypeSizeInChunk(common::LogicalType& dataType);
Expand Down Expand Up @@ -128,6 +133,8 @@ class ColumnChunk {

inline void setNumValues(uint64_t numValues_) { this->numValues = numValues_; }

virtual void update(common::ValueVector* vector, common::vector_idx_t vectorIdx);

protected:
// Initializes the data buffer. It is (and should be) called only from the constructor.
virtual void initialize(common::offset_t capacity);
Expand Down Expand Up @@ -156,11 +163,17 @@ class ColumnChunk {
common::LogicalType dataType;
uint32_t numBytesPerValue;
uint64_t bufferSize;
uint64_t capacity;
std::unique_ptr<uint8_t[]> buffer;
std::unique_ptr<NullColumnChunk> nullChunk;
std::vector<std::unique_ptr<ColumnChunk>> childrenChunks;
std::unique_ptr<common::CSVReaderConfig> csvReaderConfig;
uint64_t numValues;
std::function<ColumnChunkMetadata(
const uint8_t*, uint64_t, BMFileHandle*, common::page_idx_t, const ColumnChunkMetadata&)>
flushBufferFunction;
std::function<ColumnChunkMetadata(const uint8_t*, uint64_t, uint64_t, uint64_t)>
getMetadataFunction;
};

template<>
Expand All @@ -175,6 +188,7 @@ inline bool ColumnChunk::getValue(common::offset_t pos) const {
return common::NullMask::isNull((uint64_t*)buffer.get(), pos);
}

// Stored as bit-packed booleans, both in memory and on disk.
class BoolColumnChunk : public ColumnChunk {
public:
BoolColumnChunk(
Expand Down
Loading