Skip to content

Commit

Permalink
enhancement: add find_many (#1469)
Browse files Browse the repository at this point in the history
  • Loading branch information
lostman authored Nov 29, 2023
1 parent fbe5355 commit ba28add
Show file tree
Hide file tree
Showing 8 changed files with 265 additions and 68 deletions.
4 changes: 2 additions & 2 deletions packages/fuel-indexer-api-server/src/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ pub(crate) fn check_wasm_toolchain_version(data: Vec<u8>) -> anyhow::Result<Stri
Function::new_typed(&mut store, |_: i64, _: i32, _: i32| 0i32),
);
exports.insert(
"ff_single_select".to_string(),
Function::new_typed(&mut store, |_: i64, _: i32, _: i32| 0i32),
"ff_find_many".to_string(),
Function::new_typed(&mut store, |_: i64, _: i64, _: i32, _: i32| 0i32),
);
exports.insert(
"ff_early_exit".to_string(),
Expand Down
13 changes: 13 additions & 0 deletions packages/fuel-indexer-database/postgres/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,19 @@ pub async fn get_object(
Ok(row.get(0))
}

/// Fetch multiple blobs of serialized `FtColumn`s from the database.
#[cfg_attr(feature = "metrics", metrics)]
pub async fn get_objects(
conn: &mut PoolConnection<Postgres>,
query: String,
) -> sqlx::Result<Vec<Vec<u8>>> {
let mut builder = sqlx::QueryBuilder::new(query);
let query = builder.build();
let rows = query.fetch_all(conn).await?;
let objects = rows.iter().map(|r| r.get(0)).collect::<Vec<Vec<u8>>>();
Ok(objects)
}

/// Run database migrations.
#[cfg_attr(feature = "metrics", metrics)]
pub async fn run_migration(conn: &mut PoolConnection<Postgres>) -> sqlx::Result<()> {
Expand Down
10 changes: 10 additions & 0 deletions packages/fuel-indexer-database/src/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,16 @@ pub async fn get_object(
}
}

/// Fetch multiple blobs of serialized `FtColumns` from the database.
pub async fn get_objects(
conn: &mut IndexerConnection,
query: String,
) -> sqlx::Result<Vec<Vec<u8>>> {
match conn {
IndexerConnection::Postgres(ref mut c) => postgres::get_objects(c, query).await,
}
}

/// Run an arbitrary query and fetch all results.
///
/// Note that if the results of the query can't be converted to `JsonValue`, this function
Expand Down
196 changes: 158 additions & 38 deletions packages/fuel-indexer-plugin/src/find.rs
Original file line number Diff line number Diff line change
@@ -1,68 +1,134 @@
use fuel_indexer_types::scalar::{Boolean, UID};
use sqlparser::ast as sql;

/// Represents a filter that returns a single results.
pub struct SingleFilter<T> {
filter: String,
phantom: std::marker::PhantomData<T>,
}

impl<T> std::fmt::Display for SingleFilter<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{} LIMIT 1", self.filter)?;
Ok(())
}
}
/// Represents a filter with a an optional LIMIT clause that returns many
/// results.
pub struct ManyFilter<T> {
filter: String,
limit: Option<usize>,
phantom: std::marker::PhantomData<T>,
}

impl<T> ManyFilter<T> {
pub fn limit(&self) -> Option<usize> {
self.limit
}
}

impl<T> std::fmt::Display for ManyFilter<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.filter)?;
if let Some(limit) = self.limit {
write!(f, " LIMIT {limit}")?;
}
Ok(())
}
}

/// Represents `filter` and `order_by` parts of the `SELECT object from {table}
/// WHERE {filter} {order_by}` statement that is assembled by the indexer to
/// fetch an object from the database. The table name is not available to the
/// plugin and thus only a part of the statment is generated there. The indexer
/// maps the TYPE_ID to the tale name and assemles the full statemnt.
pub struct QueryFragment<T> {
pub struct OrderedFilter<T> {
filter: Filter<T>,
field: Option<String>,
order_by: Option<sql::OrderByExpr>,
order_by: sql::OrderByExpr,
}

impl<T> QueryFragment<T> {
impl<T> OrderedFilter<T> {
pub fn asc(mut self) -> Self {
if let Some(ref field) = self.field {
self.order_by = Some(sql::OrderByExpr {
expr: sql::Expr::Identifier(sql::Ident::new(field)),
asc: Some(true),
nulls_first: None,
});
}
self.order_by.asc = Some(true);
self
}

pub fn desc(mut self) -> Self {
if let Some(ref field) = self.field {
self.order_by = Some(sql::OrderByExpr {
expr: sql::Expr::Identifier(sql::Ident::new(field)),
asc: Some(false),
nulls_first: None,
});
}
self.order_by.asc = Some(false);
self
}

pub fn limit(self, limit: usize) -> ManyFilter<T> {
ManyFilter {
filter: self.to_string(),
limit: Some(limit),
phantom: std::marker::PhantomData,
}
}
}

/// Convert `QueryFragment` to `String`. `SELECT * from table_name` is later
/// Convert `OrderedFilter` to `String`. `SELECT * from table_name` is later
/// added by the Fuel indexer to generate the entire query.
impl<T> std::fmt::Display for QueryFragment<T> {
impl<T> std::fmt::Display for OrderedFilter<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.filter)?;
if let Some(ref order_by) = self.order_by {
write!(f, " ORDER BY {}", order_by)?;
}
write!(f, "{} ORDER BY {}", self.filter, self.order_by)?;
Ok(())
}
}

/// Automatic lifting of `Filter` into `QueryFragment` leaving `ORDER BY`
/// unspecified.
impl<T> From<Filter<T>> for QueryFragment<T> {
fn from(filter: Filter<T>) -> Self {
QueryFragment {
filter,
field: None,
order_by: None,
// Conversions between different filter structs.

impl<T> From<Filter<T>> for SingleFilter<T> {
fn from(filter: Filter<T>) -> SingleFilter<T> {
SingleFilter {
filter: filter.to_string(),
phantom: std::marker::PhantomData,
}
}
}

impl<T> From<OrderedFilter<T>> for SingleFilter<T> {
fn from(filter: OrderedFilter<T>) -> SingleFilter<T> {
SingleFilter {
filter: filter.to_string(),
phantom: std::marker::PhantomData,
}
}
}

impl<T> From<Filter<T>> for ManyFilter<T> {
fn from(filter: Filter<T>) -> ManyFilter<T> {
ManyFilter {
filter: filter.to_string(),
limit: None,
phantom: std::marker::PhantomData,
}
}
}

impl<T> From<OrderedFilter<T>> for ManyFilter<T> {
fn from(filter: OrderedFilter<T>) -> ManyFilter<T> {
ManyFilter {
filter: filter.to_string(),
limit: None,
phantom: std::marker::PhantomData,
}
}
}

impl<T> From<SingleFilter<T>> for ManyFilter<T> {
fn from(filter: SingleFilter<T>) -> ManyFilter<T> {
ManyFilter {
filter: filter.filter,
limit: Some(1),
phantom: std::marker::PhantomData,
}
}
}

/// Represents a WHERE clause of the SQL statement. Multiple `Filter`s can be
/// joined with `and` and `or` and also ordered, at which point they become
/// `QueryFragment`s.
/// `OrderedFilter`s.
pub struct Filter<T> {
filter: sql::Expr,
phantom: std::marker::PhantomData<T>,
Expand Down Expand Up @@ -106,18 +172,29 @@ impl<T> Filter<T> {
}
}

pub fn order_by<F>(self, f: Field<T, F>) -> QueryFragment<T> {
QueryFragment {
pub fn order_by<F>(self, f: Field<T, F>) -> OrderedFilter<T> {
OrderedFilter {
filter: self,
field: Some(f.field),
order_by: None,
order_by: sql::OrderByExpr {
expr: sql::Expr::Identifier(sql::Ident::new(f.field)),
asc: None,
nulls_first: None,
},
}
}

pub fn limit(self, limit: usize) -> ManyFilter<T> {
ManyFilter {
filter: self.to_string(),
limit: Some(limit),
phantom: std::marker::PhantomData,
}
}
}

/// A trait used to convert a value of scalar type into `sqlparser::ast::Value`.
/// That is, for injecting a value into the `sqlparser`'s representation which
/// we then use to generate a `QueryFragment`.
/// we then use to generate a `OrderedFilter`.
pub trait ToSQLValue
where
Self: Sized,
Expand Down Expand Up @@ -308,3 +385,46 @@ impl<T, F: ToSQLValue> OptionField<T, F> {
Filter::new(expr)
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_find_query_generation() {
struct MyStruct {}

fn my_field() -> Field<MyStruct, fuel_indexer_types::scalar::I32> {
Field {
field: "my_field".to_string(),
phantom: std::marker::PhantomData,
}
}

let f: Filter<MyStruct> = my_field().gt(7);
assert_eq!(&f.to_string(), "my_field > 7");

let f: OrderedFilter<MyStruct> = my_field().gt(7).order_by(my_field()).asc();
assert_eq!(&f.to_string(), "my_field > 7 ORDER BY my_field ASC");

// Converting to SingleFilter imposes a LIMIT 1
let sf: SingleFilter<MyStruct> =
my_field().gt(7).order_by(my_field()).asc().into();
assert_eq!(
&sf.to_string(),
"my_field > 7 ORDER BY my_field ASC LIMIT 1"
);

// SingleFilter converted to ManyFilter retains the LIMIT 1
let mf: ManyFilter<MyStruct> = sf.into();
assert_eq!(
&mf.to_string(),
"my_field > 7 ORDER BY my_field ASC LIMIT 1"
);

// Converting to ManyFilter does not impose a LIMIT
let mf: ManyFilter<MyStruct> =
my_field().gt(7).order_by(my_field()).desc().into();
assert_eq!(&mf.to_string(), "my_field > 7 ORDER BY my_field DESC");
}
}
37 changes: 25 additions & 12 deletions packages/fuel-indexer-plugin/src/wasm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ pub use hex::FromHex;
pub use sha2::{Digest, Sha256};
pub use std::collections::{HashMap, HashSet};

pub use crate::find::{Field, Filter, OptionField, QueryFragment};
pub use crate::find::{Field, Filter, ManyFilter, OptionField, SingleFilter};

// These are instantiated with functions which return
// `Result<T, WasmIndexerError>`. `wasmer` unwraps the `Result` and uses the
// `Err` variant for ealy exit.
// `Err` variant for early exit.
extern "C" {
fn ff_get_object(type_id: i64, ptr: *const u8, len: *mut u8) -> *mut u8;
fn ff_single_select(type_id: i64, ptr: *const u8, len: *mut u8) -> *mut u8;
fn ff_find_many(type_id: i64, ptr: *const u8, len: *mut u8) -> *mut u8;
fn ff_log_data(ptr: *const u8, len: u32, log_level: u32);
fn ff_put_object(type_id: i64, ptr: *const u8, len: u32);
fn ff_put_many_to_many_record(ptr: *const u8, len: u32);
Expand Down Expand Up @@ -69,7 +69,7 @@ pub trait Entity<'a>: Sized + PartialEq + Eq + std::fmt::Debug {
/// Convert database row representation into an instance of an entity.
fn from_row(vec: Vec<FtColumn>) -> Self;

/// Convert an instance of an entity into row representation for use in a database.
/// Convert an instance of an entity into a row representation for use in a database.
fn to_row(&self) -> Vec<FtColumn>;

/// Returns an entity's internal type ID.
Expand Down Expand Up @@ -128,22 +128,35 @@ pub trait Entity<'a>: Sized + PartialEq + Eq + std::fmt::Debug {
}

/// Finds the first entity that satisfies the given constraints.
fn find(query: impl Into<QueryFragment<Self>>) -> Option<Self> {
let query: QueryFragment<Self> = query.into();
fn find(filter: impl Into<SingleFilter<Self>>) -> Option<Self> {
let result = Self::find_many(filter.into());
result.into_iter().next()
}

/// Finds the entities that satisfy the given constraints.
fn find_many(filter: impl Into<ManyFilter<Self>>) -> Vec<Self> {
unsafe {
let buff = bincode::serialize(&query.to_string()).unwrap();
let filter: ManyFilter<Self> = filter.into();
let buff = bincode::serialize(&filter.to_string())
.expect("Failed to serialize query");
let mut bufflen = (buff.len() as u32).to_le_bytes();

let ptr =
ff_single_select(Self::TYPE_ID, buff.as_ptr(), bufflen.as_mut_ptr());
let ptr = ff_find_many(Self::TYPE_ID, buff.as_ptr(), bufflen.as_mut_ptr());

if !ptr.is_null() {
let len = u32::from_le_bytes(bufflen) as usize;
let bytes = Vec::from_raw_parts(ptr, len, len);
let data = deserialize(&bytes).unwrap();
Some(Self::from_row(data))
let data: Vec<Vec<u8>> =
deserialize(&bytes).expect("Failed to deserialize data");
data.iter()
.map(|x| {
Self::from_row(
deserialize(x).expect("Failed to deserialize data"),
)
})
.collect()
} else {
None
vec![]
}
}
}
Expand Down
Loading

0 comments on commit ba28add

Please sign in to comment.