Skip to content

Commit

Permalink
Merge pull request #95 from objectbox/data-visitor
Browse files Browse the repository at this point in the history
Data visitor
  • Loading branch information
vaind committed Mar 5, 2020
2 parents 17fc5e1 + 713d59a commit 315de60
Show file tree
Hide file tree
Showing 7 changed files with 194 additions and 23 deletions.
8 changes: 8 additions & 0 deletions lib/src/bindings/bindings.dart
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class _ObjectBoxBindings {
// common functions
void Function(Pointer<Int32> major, Pointer<Int32> minor, Pointer<Int32> patch) obx_version;
Pointer<Utf8> Function() obx_version_string;
int Function() obx_supports_bytes_array;

obx_free_dart_t<OBX_bytes_array> obx_bytes_array_free;
obx_free_dart_t<OBX_id_array> obx_id_array_free;
Expand Down Expand Up @@ -68,6 +69,10 @@ class _ObjectBoxBindings {
int Function(Pointer<Void> box, int id, Pointer<Pointer<Uint8>> data, Pointer<IntPtr> size) obx_box_get;
Pointer<OBX_bytes_array> Function(Pointer<Void> box, Pointer<OBX_id_array> ids) obx_box_get_many;
Pointer<OBX_bytes_array> Function(Pointer<Void> box) obx_box_get_all;
int Function(Pointer<Void> box, Pointer<OBX_id_array> ids, Pointer<NativeFunction<obx_data_visitor_native_t>> visitor,
Pointer<Void> user_data) obx_box_visit_many;
int Function(Pointer<Void> box, Pointer<NativeFunction<obx_data_visitor_native_t>> visitor, Pointer<Void> user_data)
obx_box_visit_all;
int Function(Pointer<Void> box, int id_or_zero) obx_box_id_for_put;
int Function(Pointer<Void> box, int count, Pointer<Uint64> out_first_id) obx_box_ids_for_put;
int Function(Pointer<Void> box, int id, Pointer<Uint8> data, int size, int mode) obx_box_put;
Expand Down Expand Up @@ -162,6 +167,7 @@ class _ObjectBoxBindings {
// common functions
obx_version = _fn<obx_version_native_t>("obx_version").asFunction();
obx_version_string = _fn<obx_version_string_native_t>("obx_version_string").asFunction();
obx_supports_bytes_array = _fn<obx_supports_bytes_array_native_t>("obx_supports_bytes_array").asFunction();
obx_bytes_array_free = _fn<obx_free_native_t<Pointer<OBX_bytes_array>>>("obx_bytes_array_free").asFunction();
obx_id_array_free = _fn<obx_free_native_t<Pointer<OBX_id_array>>>("obx_id_array_free").asFunction();
// obx_string_array_free = _fn<obx_free_native_t<Pointer<>>>("obx_string_array_free").asFunction();
Expand Down Expand Up @@ -215,6 +221,8 @@ class _ObjectBoxBindings {
obx_box_get = _fn<obx_box_get_native_t>("obx_box_get").asFunction();
obx_box_get_many = _fn<obx_box_get_many_native_t>("obx_box_get_many").asFunction();
obx_box_get_all = _fn<obx_box_get_all_native_t>("obx_box_get_all").asFunction();
obx_box_visit_many = _fn<obx_box_visit_many_native_t>("obx_box_visit_many").asFunction();
obx_box_visit_all = _fn<obx_box_visit_all_native_t>("obx_box_visit_all").asFunction();
obx_box_id_for_put = _fn<obx_box_id_for_put_native_t>("obx_box_id_for_put").asFunction();
obx_box_ids_for_put = _fn<obx_box_ids_for_put_native_t>("obx_box_ids_for_put").asFunction();
obx_box_put = _fn<obx_box_put_native_t>("obx_box_put").asFunction();
Expand Down
72 changes: 72 additions & 0 deletions lib/src/bindings/data_visitor.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import 'dart:ffi';
import 'signatures.dart';
import "package:ffi/ffi.dart" show allocate, free;

/// This file implements C call forwarding using a trampoline approach.
///
/// When you want to pass a dart callback to a C function you cannot use lambdas and instead the callback must be
/// a static function, otherwise `Pointer.fromFunction()` called with your function won't compile.
/// Since static functions don't have any state, you must either rely on a global state or use a "userData" pointer
/// pass-through functionality provided by a C function.
///
/// The DataVisitor class tries to alleviate the burden of managing this and instead allows using lambdas from
/// user-code, internally mapping the C calls to the appropriate lambda.
///
/// Sample usage:
/// final results = <T>[];
/// final visitor = DataVisitor((Pointer<Uint8> dataPtr, int length) {
/// final bytes = dataPtr.asTypedList(length);
/// results.add(_fbManager.unmarshal(bytes));
/// return true; // return value usually indicates to the C function whether it should continue.
/// });
///
/// final err = bindings.obx_query_visit(_cQuery, visitor.fn, visitor.userData, offset, limit);
/// visitor.close(); // make sure to close the visitor, unregistering the callback it from the forwarder
/// checkObx(err);
int _lastId = 0;
final _callbacks = <int, bool Function(Pointer<Uint8> dataPtr, int length)>{};

// called from C, forwards calls to the actual callback registered at the given ID
int _forwarder(Pointer<Void> callbackId, Pointer<Uint8> dataPtr, int size) {
if (callbackId == null || callbackId.address == 0) {
throw Exception("Data-visitor callback issued with NULL user_data (callback ID)");
}

return _callbacks[callbackId.cast<Int64>().value](dataPtr, size) ? 1 : 0;
}

/// A data visitor wrapper/forwarder to be used where obx_data_visitor is expected.
class DataVisitor {
int _id;
Pointer<Int64> _idPtr;

Pointer<NativeFunction<obx_data_visitor_native_t>> get fn => Pointer.fromFunction(_forwarder, 0);

Pointer<Void> get userData => _idPtr.cast<Void>();

DataVisitor(bool Function(Pointer<Uint8> dataPtr, int length) callback) {
// cycle through ids until we find an empty slot
_lastId++;
var initialId = _lastId;
while (_callbacks.containsKey(_lastId)) {
_lastId++;

if (initialId == _lastId) {
throw Exception("Data-visitor callbacks queue full - can't allocate another");
}
}
// register the visitor
_id = _lastId;
_callbacks[_id] = callback;

_idPtr = allocate<Int64>();
_idPtr.value = _id;
}

void close() {
// unregister the visitor
_callbacks.remove(_id);
free(_idPtr);
}
}
17 changes: 12 additions & 5 deletions lib/src/bindings/signatures.dart
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@ import 'package:ffi/ffi.dart';
// common functions
typedef obx_version_native_t = Void Function(Pointer<Int32> major, Pointer<Int32> minor, Pointer<Int32> patch);
typedef obx_version_string_native_t = Pointer<Utf8> Function();
typedef obx_supports_bytes_array_native_t = Uint8 Function();

typedef obx_free_dart_t<T extends NativeType> = void Function(Pointer<T> ptr);
typedef obx_free_native_t<T extends NativeType> = Void Function(T ptr); // no Pointer<T>, code analysis fails on usage

typedef obx_data_visitor_native_t = Uint8 Function(Pointer<Void> user_data, Pointer<Uint8> data, IntPtr size);

// error info
typedef obx_last_error_code_native_t = Int32 Function();
typedef obx_last_error_message_native_t = Pointer<Utf8> Function();
Expand Down Expand Up @@ -58,6 +61,10 @@ typedef obx_box_get_native_t = Int32 Function(
Pointer<Void> box, Uint64 id, Pointer<Pointer<Uint8>> data, Pointer<IntPtr> size);
typedef obx_box_get_many_native_t = Pointer<OBX_bytes_array> Function(Pointer<Void> box, Pointer<OBX_id_array> ids);
typedef obx_box_get_all_native_t = Pointer<OBX_bytes_array> Function(Pointer<Void> box);
typedef obx_box_visit_many_native_t = Int32 Function(Pointer<Void> box, Pointer<OBX_id_array> ids,
Pointer<NativeFunction<obx_data_visitor_native_t>> visitor, Pointer<Void> user_data);
typedef obx_box_visit_all_native_t = Int32 Function(
Pointer<Void> box, Pointer<NativeFunction<obx_data_visitor_native_t>> visitor, Pointer<Void> user_data);
typedef obx_box_id_for_put_native_t = Uint64 Function(Pointer<Void> box, Uint64 id_or_zero);
typedef obx_box_ids_for_put_native_t = Int32 Function(Pointer<Void> box, Uint64 count, Pointer<Uint64> out_first_id);
typedef obx_box_put_native_t = Int32 Function(
Expand All @@ -76,7 +83,7 @@ typedef obx_box_is_empty_native_t = Int32 Function(Pointer<Void> box, Pointer<Ui
// typedef Pointer<Int8> -> char[]
// typedef Pointer<Int32> -> int (e.g. obx_qb_cond);

// query builider
// query builder
typedef obx_query_builder_native_t = Pointer<Void> Function(Pointer<Void> store, Uint32 entity_id);
typedef obx_query_builder_dart_t = Pointer<Void> Function(Pointer<Void> store, int entity_id);

Expand Down Expand Up @@ -147,10 +154,10 @@ typedef obx_query_count_dart_t = int Function(Pointer<Void> query, Pointer<Uint6

typedef obx_query_describe_t = Pointer<Utf8> Function(Pointer<Void> query);

typedef obx_query_visit_native_t = Int32 Function(
Pointer<Void> query, Pointer<Void> visitor, Pointer<Void> user_data, Uint64 offset, Uint64 limit);
typedef obx_query_visit_dart_t = int Function(
Pointer<Void> query, Pointer<Void> visitor, Pointer<Void> user_data, int offset, int limit);
typedef obx_query_visit_native_t = Int32 Function(Pointer<Void> query,
Pointer<NativeFunction<obx_data_visitor_native_t>> visitor, Pointer<Void> user_data, Uint64 offset, Uint64 limit);
typedef obx_query_visit_dart_t = int Function(Pointer<Void> query,
Pointer<NativeFunction<obx_data_visitor_native_t>> visitor, Pointer<Void> user_data, int offset, int limit);

// Utilities

Expand Down
55 changes: 43 additions & 12 deletions lib/src/box.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import 'common.dart';
import "store.dart";
import "bindings/bindings.dart";
import "bindings/constants.dart";
import "bindings/data_visitor.dart";
import "bindings/flatbuffers.dart";
import "bindings/helpers.dart";
import "bindings/structs.dart";
Expand All @@ -24,8 +25,9 @@ class Box<T> {
ModelEntity _modelEntity;
ObjectReader<T> _entityReader;
OBXFlatbuffersManager<T> _fbManager;
final bool _supportsBytesArrays;

Box(this._store) {
Box(this._store) : _supportsBytesArrays = bindings.obx_supports_bytes_array() == 1 {
EntityDefinition<T> entityDefs = _store.entityDef<T>();
_modelEntity = entityDefs.model;
_entityReader = entityDefs.reader;
Expand Down Expand Up @@ -160,34 +162,63 @@ class Box<T> {
}
}

List<T> _getMany(bool allowMissing, Pointer<OBX_bytes_array> Function() cCall) {
List<T> _getMany(
bool allowMissing, Pointer<OBX_bytes_array> Function() cGetArray, void Function(DataVisitor) cVisit) {
return _store.runInTransaction(TxMode.Read, () {
final bytesArray = cCall();
try {
return _fbManager.unmarshalArray(bytesArray, allowMissing: allowMissing);
} finally {
bindings.obx_bytes_array_free(bytesArray);
if (_supportsBytesArrays) {
final bytesArray = cGetArray();
try {
return _fbManager.unmarshalArray(bytesArray, allowMissing: allowMissing);
} finally {
bindings.obx_bytes_array_free(bytesArray);
}
} else {
final results = <T>[];
final visitor = DataVisitor((Pointer<Uint8> dataPtr, int length) {
if (dataPtr == null || dataPtr.address == 0 || length == 0) {
if (allowMissing) {
results.add(null);
return true;
} else {
throw Exception('Object not found');
}
}
final bytes = dataPtr.asTypedList(length);
results.add(_fbManager.unmarshal(bytes));
return true;
});

try {
cVisit(visitor);
} finally {
visitor.close();
}
return results;
}
});
}

/// Returns a list of [ids.length] Objects of type T, each corresponding to the location of its ID in [ids].
/// Non-existant IDs become null.
/// Non-existent IDs become null.
List<T> getMany(List<int> ids) {
if (ids.isEmpty) return [];

const bool allowMissing = true; // returns null if null is encountered in the data found
const bool allowMissing = true; // result includes null if an object is missing
return OBX_id_array.executeWith(
ids,
(ptr) => _getMany(allowMissing,
() => checkObxPtr(bindings.obx_box_get_many(_cBox, ptr), "failed to get many objects from box")));
(ptr) => _getMany(
allowMissing,
() => checkObxPtr(bindings.obx_box_get_many(_cBox, ptr), "failed to get many objects from box"),
(DataVisitor visitor) => checkObx(bindings.obx_box_visit_many(_cBox, ptr, visitor.fn, visitor.userData))));
}

/// Returns all stored objects in this Box.
List<T> getAll() {
const bool allowMissing = false; // throw if null is encountered in the data found
return _getMany(
allowMissing, () => checkObxPtr(bindings.obx_box_get_all(_cBox), "failed to get all objects from box"));
allowMissing,
() => checkObxPtr(bindings.obx_box_get_all(_cBox), "failed to get all objects from box"),
(DataVisitor visitor) => checkObx(bindings.obx_box_visit_all(_cBox, visitor.fn, visitor.userData)));
}

/// Returns a builder to create queries for Object matching supplied criteria.
Expand Down
25 changes: 20 additions & 5 deletions lib/src/query/query.dart
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import "../store.dart";
import "../common.dart";
import "../bindings/bindings.dart";
import "../bindings/constants.dart";
import "../bindings/data_visitor.dart";
import "../bindings/flatbuffers.dart";
import "../bindings/helpers.dart";
import "../bindings/structs.dart";
Expand Down Expand Up @@ -571,11 +572,25 @@ class Query<T> {

List<T> find({int offset = 0, int limit = 0}) {
return _store.runInTransaction(TxMode.Read, () {
final bytesArray = checkObxPtr(bindings.obx_query_find(_cQuery, offset, limit), "find");
try {
return _fbManager.unmarshalArray(bytesArray);
} finally {
bindings.obx_bytes_array_free(bytesArray);
if (bindings.obx_supports_bytes_array() == 1) {
final bytesArray = checkObxPtr(bindings.obx_query_find(_cQuery, offset, limit), "find");
try {
return _fbManager.unmarshalArray(bytesArray);
} finally {
bindings.obx_bytes_array_free(bytesArray);
}
} else {
final results = <T>[];
final visitor = DataVisitor((Pointer<Uint8> dataPtr, int length) {
final bytes = dataPtr.asTypedList(length);
results.add(_fbManager.unmarshal(bytes));
return true;
});

final err = bindings.obx_query_visit(_cQuery, visitor.fn, visitor.userData, offset, limit);
visitor.close();
checkObx(err);
return results;
}
});
}
Expand Down
24 changes: 23 additions & 1 deletion test/box_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,29 @@ void main() {
}
});

test(".getMany correctly handles non-existant items", () {
test(".getAll/getMany works on large arrays", () {
// This would fail on 32-bit system if objectbox-c obx_supports_bytes_array() wasn't respected
final length = 10 * 1000;
final largeString = 'A' * length;
expect(largeString.length, length);

box.put(TestEntity(tString: largeString));
box.put(TestEntity(tString: largeString));

List<TestEntity> items = box.getAll();
expect(items.length, 2);
expect(items[0].tString, largeString);
expect(items[1].tString, largeString);

box.put(TestEntity(tString: largeString));

items = box.getMany([1, 2]);
expect(items.length, 2);
expect(items[0].tString, largeString);
expect(items[1].tString, largeString);
});

test(".getMany correctly handles non-existent items", () {
final List<TestEntity> items = ["One", "Two"].map((s) => TestEntity(tString: s)).toList();
final List<int> ids = box.putMany(items);
int otherId = 1;
Expand Down
16 changes: 16 additions & 0 deletions test/query_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,22 @@ void main() {
q.close();
});

test(".find works on large arrays", () {
// This would fail on 32-bit system if objectbox-c obx_supports_bytes_array() wasn't respected
final length = 10 * 1000;
final largeString = 'A' * length;
expect(largeString.length, length);

box.put(TestEntity(tString: largeString));
box.put(TestEntity(tString: largeString));
box.put(TestEntity(tString: largeString));

List<TestEntity> items = box.query(TestEntity_.id.lessThan(3)).build().find();
expect(items.length, 2);
expect(items[0].tString, largeString);
expect(items[1].tString, largeString);
});

test(".count items after grouping with and/or", () {
box.put(TestEntity(tString: "Hello"));
box.put(TestEntity(tString: "Goodbye"));
Expand Down

0 comments on commit 315de60

Please sign in to comment.