Skip to content

Commit

Permalink
Expose table properties collector & reader to C API
Browse files Browse the repository at this point in the history
  • Loading branch information
w41ter committed Jul 3, 2024
1 parent 1f589a3 commit 232953c
Show file tree
Hide file tree
Showing 3 changed files with 456 additions and 0 deletions.
247 changes: 247 additions & 0 deletions db/c.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "rocksdb/statistics.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h"
#include "rocksdb/table_properties.h"
#include "rocksdb/universal_compaction.h"
#include "rocksdb/utilities/backup_engine.h"
#include "rocksdb/utilities/checkpoint.h"
Expand Down Expand Up @@ -74,6 +75,7 @@ using ROCKSDB_NAMESPACE::CuckooTableOptions;
using ROCKSDB_NAMESPACE::DB;
using ROCKSDB_NAMESPACE::DBOptions;
using ROCKSDB_NAMESPACE::DbPath;
using ROCKSDB_NAMESPACE::EntryType;
using ROCKSDB_NAMESPACE::Env;
using ROCKSDB_NAMESPACE::EnvOptions;
using ROCKSDB_NAMESPACE::FileLock;
Expand Down Expand Up @@ -108,6 +110,7 @@ using ROCKSDB_NAMESPACE::Range;
using ROCKSDB_NAMESPACE::RateLimiter;
using ROCKSDB_NAMESPACE::ReadOptions;
using ROCKSDB_NAMESPACE::RestoreOptions;
using ROCKSDB_NAMESPACE::SequenceNumber;
using ROCKSDB_NAMESPACE::SequentialFile;
using ROCKSDB_NAMESPACE::Slice;
using ROCKSDB_NAMESPACE::SliceParts;
Expand All @@ -117,12 +120,16 @@ using ROCKSDB_NAMESPACE::SstFileMetaData;
using ROCKSDB_NAMESPACE::SstFileWriter;
using ROCKSDB_NAMESPACE::Status;
using ROCKSDB_NAMESPACE::StderrLogger;
using ROCKSDB_NAMESPACE::TableProperties;
using ROCKSDB_NAMESPACE::TablePropertiesCollection;
using ROCKSDB_NAMESPACE::TablePropertiesCollector;
using ROCKSDB_NAMESPACE::TablePropertiesCollectorFactory;
using ROCKSDB_NAMESPACE::Transaction;
using ROCKSDB_NAMESPACE::TransactionDB;
using ROCKSDB_NAMESPACE::TransactionDBOptions;
using ROCKSDB_NAMESPACE::TransactionLogIterator;
using ROCKSDB_NAMESPACE::TransactionOptions;
using ROCKSDB_NAMESPACE::UserCollectedProperties;
using ROCKSDB_NAMESPACE::WaitForCompactOptions;
using ROCKSDB_NAMESPACE::WALRecoveryMode;
using ROCKSDB_NAMESPACE::WritableFile;
Expand Down Expand Up @@ -568,6 +575,98 @@ struct rocksdb_callback_logger_t : public Logger {
void* priv_ = nullptr;
};

void AddUserCollectedProperty(void* raw, const char* key_data, size_t key_len,
const char* value_data, size_t value_len) {
UserCollectedProperties* properties =
reinterpret_cast<UserCollectedProperties*>(raw);
std::string key(key_data, key_len);
std::string value(value_data, value_len);
properties->insert_or_assign(std::move(key), std::move(value));
}

struct rocksdb_table_properties_collector_t : public TablePropertiesCollector {
void* state_;
void (*destructor_)(void*);
const char* (*name_)(void*);
void (*add_user_key_)(void*, const char* key_data, size_t key_len,
const char* value_data, size_t value_len,
int entry_type, uint64_t seq, uint64_t file_size);
void (*block_add_)(void*, uint64_t block_uncomp_bytes,
uint64_t block_compressed_bytes_fast,
uint64_t block_compressed_bytes_slow);
void (*finish_properties_)(void*, void* properties,
rocksdb_add_user_collected_properties adder);
void (*get_readable_properties_)(void*, void* properties,
rocksdb_add_user_collected_properties adder);

~rocksdb_table_properties_collector_t() override { (*destructor_)(state_); }

Status AddUserKey(const Slice& key, const Slice& value, EntryType type,
SequenceNumber seq, uint64_t file_size) override {
add_user_key_(state_, key.data(), key.size(), value.data(), value.size(),
type, seq, file_size);
return Status::OK();
}

void BlockAdd(uint64_t block_uncomp_bytes,
uint64_t block_compressed_bytes_fast,
uint64_t block_compressed_bytes_slow) override {
(*block_add_)(state_, block_uncomp_bytes, block_compressed_bytes_fast,
block_compressed_bytes_slow);
}

Status Finish(UserCollectedProperties* properties) override {
(*finish_properties_)(state_, properties, AddUserCollectedProperty);
return Status::OK();
}

UserCollectedProperties GetReadableProperties() const override {
UserCollectedProperties properties;
(*get_readable_properties_)(state_, &properties, AddUserCollectedProperty);
return properties;
}

const char* Name() const override { return (*name_)(state_); };
};

struct rocksdb_table_properties_collector_factory_context_t
: public TablePropertiesCollectorFactory::Context {};

struct TablePropertiesCollectorFactoryWrapper
: public TablePropertiesCollectorFactory {
void* state_;
void (*destructor_)(void*);
rocksdb_table_properties_collector_t* (*create_table_properties_collector_)(
void*, const rocksdb_table_properties_collector_factory_context_t* ctx);
const char* (*name_)(void*);

~TablePropertiesCollectorFactoryWrapper() override { (*destructor_)(state_); }

TablePropertiesCollector* CreateTablePropertiesCollector(
TablePropertiesCollectorFactory::Context arg) override {
rocksdb_table_properties_collector_factory_context_t context;
context.column_family_id = arg.column_family_id;
context.level_at_creation = arg.level_at_creation;
return create_table_properties_collector_(state_, &context);
}

const char* Name() const override { return (*name_)(state_); };
};

struct rocksdb_table_properties_collector_factory_t {
std::shared_ptr<TablePropertiesCollectorFactoryWrapper> rep;
};

struct rocksdb_table_properties_collection_t {
TablePropertiesCollection rep;
TablePropertiesCollection::iterator it;
};

struct rocksdb_table_properties_t {
std::string name;
std::shared_ptr<const TableProperties> properties;
};

static bool SaveError(char** errptr, const Status& s) {
assert(errptr != nullptr);
if (s.ok()) {
Expand Down Expand Up @@ -6937,6 +7036,12 @@ void rocksdb_options_set_avoid_unnecessary_blocking_io(rocksdb_options_t* opt,
opt->rep.avoid_unnecessary_blocking_io = val;
}

void rocksdb_options_add_table_properties_collector_factory(
rocksdb_options_t* opt,
rocksdb_table_properties_collector_factory_t* factory) {
opt->rep.table_properties_collector_factories.push_back(factory->rep);
}

unsigned char rocksdb_options_get_avoid_unnecessary_blocking_io(
rocksdb_options_t* opt) {
return opt->rep.avoid_unnecessary_blocking_io;
Expand Down Expand Up @@ -7014,6 +7119,148 @@ double rocksdb_statistics_histogram_data_get_min(
return data->rep.min;
}

rocksdb_table_properties_collector_factory_t*
rocksdb_table_properties_collector_factory_create(
void* state, void (*destructor)(void*),
rocksdb_table_properties_collector_t* (*create_table_properties_collector)(
void*, const rocksdb_table_properties_collector_factory_context_t* ctx),
const char* (*name)(void*)) {
auto* factory = new rocksdb_table_properties_collector_factory_t;
factory->rep = std::make_shared<TablePropertiesCollectorFactoryWrapper>();
factory->rep->name_ = name;
factory->rep->state_ = state;
factory->rep->destructor_ = destructor;
factory->rep->create_table_properties_collector_ =
create_table_properties_collector;
return factory;
}

void rocksdb_table_properties_collector_factory_destroy(
rocksdb_table_properties_collector_factory_t* factory) {
delete factory;
}

int rocksdb_table_properties_collector_factory_context_level_at_creation(
const rocksdb_table_properties_collector_factory_context_t* ctx) {
return ctx->level_at_creation;
}

rocksdb_table_properties_collector_t* rocksdb_table_properties_collector_create(
void* state, void (*destructor)(void*), const char* (*name)(void*),
void (*add_user_key)(void*, const char* key_data, size_t key_len,
const char* value_data, size_t value_len,
int entry_type, uint64_t seq, uint64_t file_size),
void (*block_add)(void*, uint64_t block_uncomp_bytes,
uint64_t block_compressed_bytes_fast,
uint64_t block_compressed_bytes_slow),
void (*finish_properties)(void*, void* properties,
rocksdb_add_user_collected_properties adder),
void (*get_readable_properties)(
void*, void* properties, rocksdb_add_user_collected_properties adder)) {
auto* collector = new rocksdb_table_properties_collector_t;
collector->state_ = state;
collector->destructor_ = destructor;
collector->add_user_key_ = add_user_key;
collector->block_add_ = block_add;
collector->finish_properties_ = finish_properties;
collector->get_readable_properties_ = get_readable_properties;
collector->name_ = name;
return collector;
}

void rocksdb_table_properties_collector_destroy(
rocksdb_table_properties_collector_t* collector) {
delete collector;
}

rocksdb_table_properties_collection_t* rocksdb_get_properties_of_all_range(
rocksdb_t* db, rocksdb_column_family_handle_t* cf, char** errptr) {
TablePropertiesCollection collection;
if (SaveError(errptr, db->rep->GetPropertiesOfAllTables(
cf ? cf->rep : db->rep->DefaultColumnFamily(),
&collection))) {
return nullptr;
}

auto* ret = new rocksdb_table_properties_collection_t;
ret->rep = std::move(collection);
ret->it = ret->rep.begin();
return ret;
}

rocksdb_table_properties_collection_t*
rocksdb_get_properties_of_tables_in_range(
rocksdb_t* db, rocksdb_column_family_handle_t* cf, size_t num_ranges,
const char* const* range_start_key, const size_t* range_start_key_len,
const char* const* range_limit_key, const size_t* range_limit_key_len,
char** errptr) {
std::vector<Range> ranges;
for (size_t i = 0; i < num_ranges; ++i) {
ranges.push_back(Range(Slice(range_start_key[i], range_start_key_len[i]),
Slice(range_limit_key[i], range_limit_key_len[i])));
}

TablePropertiesCollection collection;
if (SaveError(errptr, db->rep->GetPropertiesOfTablesInRange(
cf ? cf->rep : db->rep->DefaultColumnFamily(),
ranges.data(), ranges.size(), &collection))) {
return nullptr;
}

auto* ret = new rocksdb_table_properties_collection_t;
ret->rep = std::move(collection);
ret->it = ret->rep.begin();
return ret;
}

void rocksdb_table_properties_collection_destroy(
rocksdb_table_properties_collection_t* collection) {
delete collection;
}

rocksdb_table_properties_t* rocksdb_table_properties_collection_next(
rocksdb_table_properties_collection_t* collection) {
if (collection->it == collection->rep.end()) {
return nullptr;
}

auto* properties = new rocksdb_table_properties_t;
properties->name = collection->it->first;
properties->properties = collection->it->second;

collection->it++;

return properties;
}

void rocksdb_table_properties_destroy(rocksdb_table_properties_t* properties) {
delete properties;
}

void rocksdb_table_properties_user_collected(
rocksdb_table_properties_t* table_properties, void* state,
void (*property_reader)(void* state, const char* key_data, size_t key_ken,
const char* value_data, size_t value_len)) {
for (auto&& it : table_properties->properties->user_collected_properties) {
property_reader(state, it.first.data(), it.first.size(), it.second.data(),
it.second.size());
}
}

void rocksdb_table_properties_readable(
rocksdb_table_properties_t* table_properties, void* state,
void (*property_reader)(void* state, const char* key_data, size_t key_ken,
const char* value_data, size_t value_len)) {
for (auto&& it : table_properties->properties->readable_properties) {
property_reader(state, it.first.data(), it.first.size(), it.second.data(),
it.second.size());
}
}
const char* rocksdb_table_properties_table_name(
rocksdb_table_properties_t* table_properties) {
return table_properties->name.c_str();
}

void rocksdb_wait_for_compact(rocksdb_t* db,
rocksdb_wait_for_compact_options_t* options,
char** errptr) {
Expand Down
Loading

0 comments on commit 232953c

Please sign in to comment.