Skip to content

Commit

Permalink
Add compression option
Browse files Browse the repository at this point in the history
  • Loading branch information
benjaminwinger authored and ray6080 committed Oct 2, 2023
1 parent f1f7b75 commit 5164b3e
Show file tree
Hide file tree
Showing 53 changed files with 417 additions and 260 deletions.
5 changes: 2 additions & 3 deletions examples/c/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#include "c_api/kuzu.h"

int main() {
kuzu_database* db = kuzu_database_init("" /* fill db path */, 0);
kuzu_database* db = kuzu_database_init("" /* fill db path */, kuzu_default_system_config());
kuzu_connection* conn = kuzu_connection_init(db);

kuzu_query_result* result;
Expand All @@ -18,8 +18,7 @@ int main() {
kuzu_query_result_destroy(result);

// Execute a simple query.
result = kuzu_connection_query(
conn, "MATCH (a:Person) RETURN a.name AS NAME, a.age AS AGE;");
result = kuzu_connection_query(conn, "MATCH (a:Person) RETURN a.name AS NAME, a.age AS AGE;");

// Fetch each value.
while (kuzu_query_result_has_next(result)) {
Expand Down
4 changes: 2 additions & 2 deletions examples/rust/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use kuzu::{Connection, Database, Error};
use kuzu::{Connection, Database, Error, SystemConfig};

fn main() -> Result<(), Error> {
let db = Database::new(
std::env::args()
.nth(1)
.expect("The first CLI argument should be the database path"),
0,
SystemConfig::default(),
)?;
let connection = Connection::new(&db)?;

Expand Down
12 changes: 8 additions & 4 deletions src/c_api/database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
using namespace kuzu::main;
using namespace kuzu::common;

kuzu_database* kuzu_database_init(const char* database_path, uint64_t buffer_pool_size) {
kuzu_database* kuzu_database_init(const char* database_path, kuzu_system_config config) {
auto database = (kuzu_database*)malloc(sizeof(kuzu_database));
std::string database_path_str = database_path;
try {
database->_database = buffer_pool_size == 0 ?
new Database(database_path_str) :
new Database(database_path_str, SystemConfig(buffer_pool_size));
database->_database =
new Database(database_path_str, SystemConfig(config.buffer_pool_size,
config.max_num_threads, config.enable_compression));
} catch (Exception& e) {
free(database);
return nullptr;
Expand All @@ -32,3 +32,7 @@ void kuzu_database_destroy(kuzu_database* database) {
// C API entry point for changing the logging level: forwards the given
// C string unchanged to Database::setLoggingLevel.
// NOTE(review): no null check is performed on logging_level here — confirm
// that callers never pass NULL.
void kuzu_database_set_logging_level(const char* logging_level) {
Database::setLoggingLevel(logging_level);
}

// Builds the kuzu_system_config handed out to C API users who do not want to
// tune anything: buffer_pool_size and max_num_threads are 0 and compression
// is switched on.
// NOTE(review): 0 presumably means "let the library choose its own default"
// for both numeric fields — confirm against the SystemConfig constructor.
kuzu_system_config kuzu_default_system_config() {
    kuzu_system_config defaults{};
    defaults.buffer_pool_size = 0;
    defaults.max_num_threads = 0;
    defaults.enable_compression = true;
    return defaults;
}
19 changes: 18 additions & 1 deletion src/include/c_api/kuzu.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,20 @@ struct ArrowArray {
#define KUZU_C_API KUZU_API
#endif

/**
* @brief Stores runtime configuration for creating or opening a Database.
* Passed by value to kuzu_database_init.
*/
KUZU_C_API typedef struct {
// Max size of the buffer pool in bytes.
// The larger the buffer pool, the more data from the database files is kept in memory,
// reducing the amount of file I/O.
// NOTE(review): kuzu_default_system_config() sets this to 0 — presumably "use the
// built-in default"; confirm.
uint64_t buffer_pool_size;
// The maximum number of threads to use during query execution.
uint64_t max_num_threads;
// Whether or not to compress data on-disk for supported types.
bool enable_compression;
} kuzu_system_config;

/**
* @brief kuzu_database manages all database components.
*/
Expand Down Expand Up @@ -235,7 +249,8 @@ KUZU_C_API typedef enum {
* @param system_config The runtime configuration (buffer pool size, max number of threads, compression) used to create or open the database.
* @return The database instance.
*/
KUZU_C_API kuzu_database* kuzu_database_init(const char* database_path, uint64_t buffer_pool_size);
KUZU_C_API kuzu_database* kuzu_database_init(
const char* database_path, kuzu_system_config system_config);
/**
* @brief Destroys the kuzu database instance and frees the allocated memory.
* @param database The database instance to destroy.
Expand All @@ -248,6 +263,8 @@ KUZU_C_API void kuzu_database_destroy(kuzu_database* database);
*/
KUZU_C_API void kuzu_database_set_logging_level(const char* logging_level);

/**
 * @brief Returns a kuzu_system_config populated with the default settings, suitable for passing
 * directly to kuzu_database_init.
 * @return The default system configuration.
 */
KUZU_C_API kuzu_system_config kuzu_default_system_config(void);

// Connection
/**
* @brief Allocates memory and creates a connection to the database. Caller is responsible for
Expand Down
14 changes: 8 additions & 6 deletions src/include/main/database.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,23 @@ namespace kuzu {
namespace main {

/**
* @brief Stores buffer pool size and max number of threads configurations.
* @brief Stores runtime configuration for creating or opening a Database
*/
struct KUZU_API SystemConfig {
/**
* @brief Creates a SystemConfig object with default buffer pool size and max num of threads.
*/
explicit SystemConfig();
/**
* @brief Creates a SystemConfig object.
* @param bufferPoolSize Max size of the buffer pool in bytes.
* The larger the buffer pool, the more data from the database files is kept in memory,
* reducing the amount of File I/O
* @param maxNumThreads The maximum number of threads to use during query execution
* @param enableCompression Whether or not to compress data on-disk for supported types
*/
explicit SystemConfig(uint64_t bufferPoolSize);
explicit SystemConfig(
uint64_t bufferPoolSize = -1u, uint64_t maxNumThreads = 0, bool enableCompression = true);

uint64_t bufferPoolSize;
uint64_t maxNumThreads;
bool enableCompression;
};

/**
Expand Down
4 changes: 2 additions & 2 deletions src/include/processor/operator/persistent/copy_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ class CopyNode : public Sink {
for (auto& arrowColumnPos : copyNodeInfo.dataColumnPoses) {
dataColumnVectors.push_back(resultSet->getValueVector(arrowColumnPos).get());
}
localNodeGroup = std::make_unique<storage::NodeGroup>(
sharedState->tableSchema, sharedState->csvReaderConfig.get());
localNodeGroup = std::make_unique<storage::NodeGroup>(sharedState->tableSchema,
sharedState->csvReaderConfig.get(), sharedState->table->compressionEnabled());
}

// This operator may run in parallel only when copyNodeInfo.containsSerial is not set.
inline bool canParallel() const final {
    if (copyNodeInfo.containsSerial) {
        return false;
    }
    return true;
}
Expand Down
1 change: 1 addition & 0 deletions src/include/storage/local_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class LocalStorage {
std::map<common::table_id_t, std::unique_ptr<LocalTable>> tables;
storage::NodesStore* nodesStore;
storage::MemoryManager* mm;
bool enableCompression;
};

} // namespace storage
Expand Down
19 changes: 13 additions & 6 deletions src/include/storage/local_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ class LocalColumnChunk {

class LocalColumn {
public:
explicit LocalColumn(NodeColumn* column) : column{column} {};
explicit LocalColumn(NodeColumn* column, bool enableCompression)
: column{column}, enableCompression{enableCompression} {};
virtual ~LocalColumn() = default;

virtual void scan(common::ValueVector* nodeIDVector, common::ValueVector* resultVector);
Expand All @@ -101,25 +102,28 @@ class LocalColumn {
protected:
std::map<common::node_group_idx_t, std::unique_ptr<LocalColumnChunk>> chunks;
NodeColumn* column;
bool enableCompression;
};

class StringLocalColumn : public LocalColumn {
public:
explicit StringLocalColumn(NodeColumn* column) : LocalColumn{column} {};
explicit StringLocalColumn(NodeColumn* column, bool enableCompression)
: LocalColumn{column, enableCompression} {};

void prepareCommitForChunk(common::node_group_idx_t nodeGroupIdx) final;
};

class VarListLocalColumn : public LocalColumn {
public:
explicit VarListLocalColumn(NodeColumn* column) : LocalColumn{column} {};
explicit VarListLocalColumn(NodeColumn* column, bool enableCompression)
: LocalColumn{column, enableCompression} {};

void prepareCommitForChunk(common::node_group_idx_t nodeGroupIdx) final;
};

class StructLocalColumn : public LocalColumn {
public:
explicit StructLocalColumn(NodeColumn* column);
explicit StructLocalColumn(NodeColumn* column, bool enableCompression);

void scan(common::ValueVector* nodeIDVector, common::ValueVector* resultVector) final;
void lookup(common::ValueVector* nodeIDVector, common::ValueVector* resultVector) final;
Expand All @@ -135,12 +139,14 @@ class StructLocalColumn : public LocalColumn {
};

struct LocalColumnFactory {
static std::unique_ptr<LocalColumn> createLocalColumn(NodeColumn* column);
static std::unique_ptr<LocalColumn> createLocalColumn(
NodeColumn* column, bool enableCompression);
};

class LocalTable {
public:
explicit LocalTable(NodeTable* table) : table{table} {};
explicit LocalTable(NodeTable* table, bool enableCompression)
: table{table}, enableCompression{enableCompression} {};

void scan(common::ValueVector* nodeIDVector, const std::vector<common::column_id_t>& columnIDs,
const std::vector<common::ValueVector*>& outputVectors);
Expand All @@ -157,6 +163,7 @@ class LocalTable {
private:
std::map<common::column_id_t, std::unique_ptr<LocalColumn>> columns;
NodeTable* table;
bool enableCompression;
};

} // namespace storage
Expand Down
6 changes: 5 additions & 1 deletion src/include/storage/storage_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ namespace storage {

class StorageManager {
public:
StorageManager(catalog::Catalog& catalog, MemoryManager& memoryManager, WAL* wal);
StorageManager(
catalog::Catalog& catalog, MemoryManager& memoryManager, WAL* wal, bool enableCompression);

~StorageManager() = default;

Expand Down Expand Up @@ -39,6 +40,8 @@ class StorageManager {
// Non-owning access to the data file handle held by this StorageManager.
inline BMFileHandle* getDataFH() const {
    BMFileHandle* handle = dataFH.get();
    return handle;
}
// Non-owning access to the metadata file handle held by this StorageManager.
inline BMFileHandle* getMetadataFH() const {
    BMFileHandle* handle = metadataFH.get();
    return handle;
}

inline bool compressionEnabled() const { return enableCompression; }

private:
std::unique_ptr<BMFileHandle> dataFH;
std::unique_ptr<BMFileHandle> metadataFH;
Expand All @@ -47,6 +50,7 @@ class StorageManager {
WAL* wal;
std::unique_ptr<RelsStore> relsStore;
std::unique_ptr<NodesStore> nodesStore;
bool enableCompression;
};

} // namespace storage
Expand Down
20 changes: 12 additions & 8 deletions src/include/storage/store/column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ class ColumnChunk {
// ColumnChunks must be initialized after construction, so this constructor should only be used
// through the ColumnChunkFactory
explicit ColumnChunk(common::LogicalType dataType,
std::unique_ptr<common::CSVReaderConfig> csvReaderConfig, bool hasNullChunk = true);
std::unique_ptr<common::CSVReaderConfig> csvReaderConfig, bool enableCompression = true,
bool hasNullChunk = true);

virtual ~ColumnChunk() = default;

Expand Down Expand Up @@ -176,7 +177,8 @@ class BoolColumnChunk : public ColumnChunk {
BoolColumnChunk(
std::unique_ptr<common::CSVReaderConfig> csvReaderConfig, bool hasNullChunk = true)
: ColumnChunk(common::LogicalType(common::LogicalTypeID::BOOL), std::move(csvReaderConfig),
hasNullChunk) {}
// Booleans are always compressed
false /* enableCompression */, hasNullChunk) {}

void append(common::ValueVector* vector, common::offset_t startPosInChunk) final;

Expand Down Expand Up @@ -232,9 +234,10 @@ class NullColumnChunk : public BoolColumnChunk {

class FixedListColumnChunk : public ColumnChunk {
public:
FixedListColumnChunk(
common::LogicalType dataType, std::unique_ptr<common::CSVReaderConfig> csvReaderConfig)
: ColumnChunk(std::move(dataType), std::move(csvReaderConfig), true /* hasNullChunk */) {}
FixedListColumnChunk(common::LogicalType dataType,
std::unique_ptr<common::CSVReaderConfig> csvReaderConfig, bool enableCompression)
: ColumnChunk(std::move(dataType), std::move(csvReaderConfig), enableCompression,
true /* hasNullChunk */) {}

void append(ColumnChunk* other, common::offset_t startPosInOtherChunk,
common::offset_t startPosInChunk, uint32_t numValuesToAppend) final;
Expand All @@ -251,7 +254,8 @@ class SerialColumnChunk : public ColumnChunk {
public:
SerialColumnChunk()
: ColumnChunk{common::LogicalType(common::LogicalTypeID::SERIAL),
nullptr /* copyDescription */, false /* hasNullChunk */} {}
nullptr /* copyDescription */, false /*enableCompression*/,
false /* hasNullChunk */} {}

inline void initialize(common::offset_t numValues) final {
numBytesPerValue = 0;
Expand All @@ -262,8 +266,8 @@ class SerialColumnChunk : public ColumnChunk {
};

struct ColumnChunkFactory {
static std::unique_ptr<ColumnChunk> createColumnChunk(
const common::LogicalType& dataType, common::CSVReaderConfig* csvReaderConfig = nullptr);
static std::unique_ptr<ColumnChunk> createColumnChunk(const common::LogicalType& dataType,
bool enableCompression, common::CSVReaderConfig* csvReaderConfig = nullptr);
};

// BOOL
Expand Down
10 changes: 6 additions & 4 deletions src/include/storage/store/node_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class NodeColumn {
NodeColumn(common::LogicalType dataType, const MetadataDAHInfo& metaDAHeaderInfo,
BMFileHandle* dataFH, BMFileHandle* metadataFH, BufferManager* bufferManager, WAL* wal,
transaction::Transaction* transaction, RWPropertyStats PropertyStatistics,
bool requireNullColumn = true);
bool enableCompression, bool requireNullColumn = true);
virtual ~NodeColumn() = default;

// Expose for feature store
Expand Down Expand Up @@ -130,14 +130,15 @@ class NodeColumn {
read_values_to_page_func_t readToPageFunc;
batch_lookup_func_t batchLookupFunc;
RWPropertyStats propertyStatistics;
bool enableCompression;
};

class BoolNodeColumn : public NodeColumn {
public:
BoolNodeColumn(const MetadataDAHInfo& metaDAHeaderInfo, BMFileHandle* dataFH,
BMFileHandle* metadataFH, BufferManager* bufferManager, WAL* wal,
transaction::Transaction* transaction, RWPropertyStats propertyStatistics,
bool requireNullColumn = true);
bool enableCompression, bool requireNullColumn = true);
};

class NullNodeColumn : public NodeColumn {
Expand All @@ -146,7 +147,8 @@ class NullNodeColumn : public NodeColumn {
public:
NullNodeColumn(common::page_idx_t metaDAHPageIdx, BMFileHandle* dataFH,
BMFileHandle* metadataFH, BufferManager* bufferManager, WAL* wal,
transaction::Transaction* transaction, RWPropertyStats propertyStatistics);
transaction::Transaction* transaction, RWPropertyStats propertyStatistics,
bool enableCompression);

void scan(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
common::ValueVector* resultVector) final;
Expand Down Expand Up @@ -182,7 +184,7 @@ struct NodeColumnFactory {
static std::unique_ptr<NodeColumn> createNodeColumn(const common::LogicalType& dataType,
const MetadataDAHInfo& metaDAHeaderInfo, BMFileHandle* dataFH, BMFileHandle* metadataFH,
BufferManager* bufferManager, WAL* wal, transaction::Transaction* transaction,
RWPropertyStats propertyStatistics);
RWPropertyStats propertyStatistics, bool enableCompression);
};

} // namespace storage
Expand Down
3 changes: 2 additions & 1 deletion src/include/storage/store/node_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ class TableData;

class NodeGroup {
public:
explicit NodeGroup(catalog::TableSchema* schema, common::CSVReaderConfig* csvReaderConfig);
explicit NodeGroup(catalog::TableSchema* schema, common::CSVReaderConfig* csvReaderConfig,
bool enableCompression);
explicit NodeGroup(TableData* table);

inline void setNodeGroupIdx(uint64_t nodeGroupIdx_) { this->nodeGroupIdx = nodeGroupIdx_; }
Expand Down
4 changes: 3 additions & 1 deletion src/include/storage/store/node_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class NodeTable {
public:
NodeTable(BMFileHandle* dataFH, BMFileHandle* metadataFH,
NodesStoreStatsAndDeletedIDs* nodesStatisticsAndDeletedIDs, BufferManager& bufferManager,
WAL* wal, catalog::NodeTableSchema* nodeTableSchema);
WAL* wal, catalog::NodeTableSchema* nodeTableSchema, bool enableCompression);

void initializePKIndex(catalog::NodeTableSchema* nodeTableSchema);

Expand Down Expand Up @@ -72,6 +72,8 @@ class NodeTable {
void checkpointInMemory();
void rollbackInMemory();

// Reports the compression setting by delegating to the underlying tableData.
inline bool compressionEnabled() const {
    const bool enabled = tableData->compressionEnabled();
    return enabled;
}

private:
void insertPK(common::ValueVector* nodeIDVector, common::ValueVector* primaryKeyVector);
inline uint64_t getNumNodeGroups(transaction::Transaction* transaction) const {
Expand Down
6 changes: 4 additions & 2 deletions src/include/storage/store/nodes_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace storage {
class NodesStore {
public:
NodesStore(BMFileHandle* dataFH, BMFileHandle* metadataFH, const catalog::Catalog& catalog,
BufferManager& bufferManager, WAL* wal);
BufferManager& bufferManager, WAL* wal, bool enableCompression);

// Looks up the node table registered under tableID and returns its primary-key index.
// NOTE(review): the lookup uses operator[] and dereferences the result without a
// check — presumably callers only pass IDs of existing tables; confirm.
inline PrimaryKeyIndex* getPKIndex(common::table_id_t tableID) {
    auto& nodeTable = nodeTables[tableID];
    return nodeTable->getPKIndex();
}
nodeTables[tableID] = std::make_unique<NodeTable>(dataFH, metadataFH,
nodesStatisticsAndDeletedIDs.get(), *bufferManager, wal,
reinterpret_cast<catalog::NodeTableSchema*>(
catalog->getReadOnlyVersion()->getTableSchema(tableID)));
catalog->getReadOnlyVersion()->getTableSchema(tableID)),
enableCompression);
}
inline void removeNodeTable(common::table_id_t tableID) {
nodeTables.erase(tableID);
Expand Down Expand Up @@ -72,6 +73,7 @@ class NodesStore {
WAL* wal;
BMFileHandle* dataFH;
BMFileHandle* metadataFH;
bool enableCompression;
};

} // namespace storage
Expand Down
Loading

0 comments on commit 5164b3e

Please sign in to comment.