Skip to content

Commit

Permalink
Function to handle after_update metadata tracking
Browse files Browse the repository at this point in the history
  • Loading branch information
tantaman committed Sep 20, 2023
1 parent 547bf18 commit a252ae6
Show file tree
Hide file tree
Showing 6 changed files with 262 additions and 6 deletions.
28 changes: 23 additions & 5 deletions core/rs/core/src/compare_values.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use alloc::format;
use alloc::string::String;
use core::ffi::c_int;
use sqlite::value;
use sqlite::Value;
use sqlite_nostd as sqlite;

#[no_mangle]
pub extern "C" fn crsql_compare_sqlite_values(
l: *mut sqlite::value,
r: *mut sqlite::value,
) -> c_int {
// TODO: add an integration test that ensures NULL == NULL!
pub fn crsql_compare_sqlite_values(l: *mut sqlite::value, r: *mut sqlite::value) -> c_int {
let l_type = l.value_type();
let r_type = r.value_type();

Expand Down Expand Up @@ -43,3 +43,21 @@ pub extern "C" fn crsql_compare_sqlite_values(
sqlite::ColumnType::Text => l.text().cmp(r.text()) as c_int,
}
}

pub fn any_value_changed(left: &[*mut value], right: &[*mut value]) -> Result<bool, String> {
if left.len() != right.len() {
return Err(format!(
"left and right values must have the same length: {} != {}",
left.len(),
right.len()
));
}

for (l, r) in left.iter().zip(right.iter()) {
if crsql_compare_sqlite_values(*l, *r) != 0 {
return Ok(true);
}
}

Ok(false)
}
1 change: 1 addition & 0 deletions core/rs/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ mod tableinfo;
mod teardown;
#[cfg(feature = "test")]
pub mod test_exports;
mod trigger_fns;
mod triggers;
mod unpack_columns_vtab;
mod util;
Expand Down
43 changes: 43 additions & 0 deletions core/rs/core/src/tableinfo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub struct TableInfo {
pub pks: Vec<ColumnInfo>,
pub non_pks: Vec<ColumnInfo>,

// For merges --
set_winner_clock_stmt: RefCell<Option<ManagedStmt>>,
local_cl_stmt: RefCell<Option<ManagedStmt>>,
col_version_stmt: RefCell<Option<ManagedStmt>>,
Expand All @@ -41,6 +42,9 @@ pub struct TableInfo {
// This also means that col_version is not always >= 1. A resurrected column,
// which missed a delete event, will have a 0 version.
zero_clocks_on_resurrect_stmt: RefCell<Option<ManagedStmt>>,

// For local writes --
insert_or_update_sentinal_stmt: RefCell<Option<ManagedStmt>>,
}

impl TableInfo {
Expand Down Expand Up @@ -183,6 +187,42 @@ impl TableInfo {
Ok(self.zero_clocks_on_resurrect_stmt.try_borrow()?)
}

pub fn get_insert_or_update_sentinel_stmt(
&self,
db: *mut sqlite3,
) -> Result<Ref<Option<ManagedStmt>>, ResultCode> {
if self.insert_or_update_sentinal_stmt.try_borrow()?.is_none() {
let sql = format!(
"INSERT INTO \"{table_name}__crsql_clock\" (
{pk_list},
__crsql_col_name,
__crsql_col_version,
__crsql_db_version,
__crsql_seq,
__crsql_site_id
) SELECT
{pk_bind_slots},
'{sentinel}',
2,
?,
?,
NULL WHERE true
ON CONFLICT DO UPDATE SET
__crsql_col_version = 1 + __crsql_col_version,
__crsql_db_version = ?,
__crsql_seq = ?,
__crsql_site_id = NULL",
table_name = crate::util::escape_ident(&self.tbl_name),
pk_list = crate::util::as_identifier_list(&self.pks, None)?,
pk_bind_slots = crate::util::binding_list(self.pks.len()),
sentinel = crate::c::DELETE_SENTINEL,
);
let ret = db.prepare_v3(&sql, sqlite::PREPARE_PERSISTENT)?;
*self.insert_or_update_sentinal_stmt.try_borrow_mut()? = Some(ret);
}
Ok(self.insert_or_update_sentinal_stmt.try_borrow()?)
}

pub fn get_col_value_stmt(
&self,
db: *mut sqlite3,
Expand Down Expand Up @@ -226,6 +266,8 @@ impl TableInfo {
stmt.take();
let mut stmt = self.zero_clocks_on_resurrect_stmt.try_borrow_mut()?;
stmt.take();
let mut stmt = self.insert_or_update_sentinal_stmt.try_borrow_mut()?;
stmt.take();

// primary key columns shouldn't have statements? right?
for col in &self.non_pks {
Expand Down Expand Up @@ -474,6 +516,7 @@ pub fn pull_table_info(
merge_delete_stmt: RefCell::new(None),
merge_delete_drop_clocks_stmt: RefCell::new(None),
zero_clocks_on_resurrect_stmt: RefCell::new(None),
insert_or_update_sentinal_stmt: RefCell::new(None),
});
}

Expand Down
171 changes: 171 additions & 0 deletions core/rs/core/src/trigger_fns.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
use core::ffi::c_int;
use core::mem::ManuallyDrop;

use alloc::boxed::Box;
use alloc::format;
use alloc::slice;
use alloc::string::String;
use alloc::vec;
use alloc::vec::Vec;
use sqlite::{sqlite3, value, Context, ResultCode, Value};
use sqlite_nostd as sqlite;

use crate::c::crsql_getDbVersion;
use crate::{c::crsql_ExtData, tableinfo::TableInfo};

#[no_mangle]
pub unsafe extern "C" fn crsql_after_update(
ctx: *mut sqlite::context,
argc: c_int,
argv: *mut *mut sqlite::value,
) -> c_int {
if argc < 1 {
ctx.result_error("expected at least 1 argument");
return ResultCode::ERROR as c_int;
}

let values = sqlite::args!(argc, argv);
let ext_data = sqlite::user_data(ctx) as *mut crsql_ExtData;
let table_infos =
ManuallyDrop::new(Box::from_raw((*ext_data).tableInfos as *mut Vec<TableInfo>));

let table_name = values[0].text();
let table_info = match table_infos.iter().find(|t| t.tbl_name == table_name) {
Some(t) => t,
None => {
ctx.result_error(&format!("table {} not found", table_name));
return ResultCode::ERROR as c_int;
}
};

let (pks_new, pks_old, non_pks_new, non_pks_old) =
match partition_values(values, 1, table_info.pks.len(), table_info.non_pks.len()) {
Ok((pks_new, pks_old, non_pks_new, non_pks_old)) => {
(pks_new, pks_old, non_pks_new, non_pks_old)
}
Err(msg) => {
ctx.result_error(&msg);
return ResultCode::ERROR as c_int;
}
};

match after_update(
ctx.db_handle(),
ext_data,
table_info,
pks_new,
pks_old,
non_pks_new,
non_pks_old,
) {
Ok(code) => code as c_int,
Err(msg) => {
ctx.result_error(&msg);
ResultCode::ERROR as c_int
}
}
}

fn partition_values<T>(
values: &[T],
offset: usize,
num_pks: usize,
num_non_pks: usize,
) -> Result<(&[T], &[T], &[T], &[T]), String> {
let expected_len = offset + num_pks * 2 + num_non_pks * 2;
if values.len() != expected_len {
return Err(format!(
"expected {} values, got {}",
expected_len,
values.len()
));
}
Ok((
&values[offset..num_pks + offset],
&values[num_pks + offset..num_pks * 2 + offset],
&values[num_pks * 2 + offset..num_pks * 2 + num_non_pks + offset],
&values[num_pks * 2 + num_non_pks + offset..],
))
}

fn after_update(
db: *mut sqlite3,
ext_data: *mut crsql_ExtData,
tbl_info: &TableInfo,
pks_new: &[*mut value],
pks_old: &[*mut value],
non_pks_new: &[*mut value],
non_pks_old: &[*mut value],
) -> Result<ResultCode, String> {
let seq = increment_and_get_seq();
let db_version = crsql_getDbVersion(db, ext_data, err_msg)
// Check if any PK value changed
// If so,
// 1. insert or update a sentinel for the old thing
// 2. delete all the non senintels
if crate::compare_values::any_value_changed(pks_new, pks_old)? {
// insert_or_update_sentinel(tbl_info, pks_old)?;
// delete_non_sentinels();
}

Ok(ResultCode::OK)
}

fn insert_or_update_sentinel(db: *mut sqlite3, tbl_info: &TableInfo, pks: &[*mut value]) {
let insert_or_update_sentinel_stmt_ref = tbl_info.get_insert_or_update_sentinel_stmt(db)?;
let insert_or_update_sentinel_stmt = insert_or_update_sentinel_stmt_ref
.as_ref()
.ok_or(ResultCode::ERROR)?;
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_partition_values() {
let values1 = vec!["tbl", "pk.new", "pk.old", "c.new", "c.old"];
let values2 = vec!["tbl", "pk.new", "pk.old"];
let values3 = vec!["tbl", "pk1.new", "pk2.new", "pk1.old", "pk2.old"];
let values4 = vec![
"tbl", "pk1.new", "pk2.new", "pk1.old", "pk2.old", "c.new", "d.new", "c.old", "d.old",
];

assert_eq!(
partition_values(&values1, 1, 1, 1),
Ok((
&["pk.new"] as &[&str],
&["pk.old"] as &[&str],
&["c.new"] as &[&str],
&["c.old"] as &[&str]
))
);
assert_eq!(
partition_values(&values2, 1, 1, 0),
Ok((
&["pk.new"] as &[&str],
&["pk.old"] as &[&str],
&[] as &[&str],
&[] as &[&str]
))
);
assert_eq!(
partition_values(&values3, 1, 2, 0),
Ok((
&["pk1.new", "pk2.new"] as &[&str],
&["pk1.old", "pk2.old"] as &[&str],
&[] as &[&str],
&[] as &[&str]
))
);
assert_eq!(
partition_values(&values4, 1, 2, 2),
Ok((
&["pk1.new", "pk2.new"] as &[&str],
&["pk1.old", "pk2.old"] as &[&str],
&["c.new", "d.new"] as &[&str],
&["c.old", "d.old"] as &[&str]
))
);
}
}
1 change: 0 additions & 1 deletion core/src/rust.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
// These are those definitions.

int crsql_is_crr(sqlite3 *db, const char *tblName);
int crsql_compare_sqlite_values(const sqlite3_value *l, const sqlite3_value *r);
int crsql_remove_crr_triggers_if_exist(sqlite3 *db, const char *tblName);

int crsql_init_site_id(sqlite3 *db, unsigned char *ret);
Expand Down
24 changes: 24 additions & 0 deletions notes.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,27 @@ function crsql_next_db_version(arg?) {
```
On commit, pending becomes actual.
# Trigger opt
- function to `insert and save` a lookaside in `insert_trigger`
- function to `get and save` a lookaside in `update_trigger`
- replace these lookups: `(SELECT __crsql_key FROM \"{table_name}__crsql_pks\" WHERE {pk_where_list})`
-

- Test changing pks in update creates key lookaside correctly.

---

Assuming we re-write to use a function...

```ts
function after_update(table, cols_new, cols_old) {
// cols_new and cols_old are _in order_ as according to table_info

// 1. Lookup the old record with `cols_old`
// 2. Do the pk delete stuff if there exists a record with old
// 3. If any pk is different, record a create record (sqlite value compare)
// 4. For each non_pk, record the clock metadata
}
```

0 comments on commit a252ae6

Please sign in to comment.