Skip to content

Commit

Permalink
swap out to trigger fns
Browse files Browse the repository at this point in the history
  • Loading branch information
tantaman committed Sep 26, 2023
1 parent 4bc7a5f commit cf6c194
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 143 deletions.
16 changes: 16 additions & 0 deletions core/rs/core/src/tableinfo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,8 @@ impl TableInfo {
stmt.take();
let mut stmt = self.mark_locally_created_stmt.try_borrow_mut()?;
stmt.take();
let mut stmt = self.mark_locally_updated_stmt.try_borrow_mut()?;
stmt.take();

// primary key columns shouldn't have statements? right?
for col in &self.non_pks {
Expand All @@ -374,6 +376,13 @@ impl TableInfo {
}
}

impl Drop for TableInfo {
fn drop(&mut self) {
// we'll leak rather than panic
let _ = self.clear_stmts();
}
}

pub struct ColumnInfo {
pub cid: i32,
pub name: String,
Expand Down Expand Up @@ -463,6 +472,13 @@ impl ColumnInfo {
}
}

impl Drop for ColumnInfo {
fn drop(&mut self) {
// we'll leak rather than panic
let _ = self.clear_stmts();
}
}

#[no_mangle]
pub extern "C" fn crsql_init_table_info_vec(ext_data: *mut crsql_ExtData) {
let vec: Vec<TableInfo> = vec![];
Expand Down
30 changes: 18 additions & 12 deletions core/rs/core/src/trigger_fns.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use core::ffi::c_char;
use core::ffi::c_int;
use core::mem::ManuallyDrop;

Expand All @@ -13,6 +14,7 @@ use sqlite_nostd as sqlite;

use crate::compare_values::crsql_compare_sqlite_values;
use crate::stmt_cache::reset_cached_stmt;
use crate::tableinfo::crsql_ensure_table_infos_are_up_to_date;
use crate::tableinfo::ColumnInfo;
use crate::{c::crsql_ExtData, tableinfo::TableInfo};

Expand All @@ -21,23 +23,29 @@ pub unsafe extern "C" fn crsql_after_update(
ctx: *mut sqlite::context,
argc: c_int,
argv: *mut *mut sqlite::value,
) -> c_int {
) {
if argc < 1 {
ctx.result_error("expected at least 1 argument");
return ResultCode::ERROR as c_int;
return;
}

let values = sqlite::args!(argc, argv);
let ext_data = sqlite::user_data(ctx) as *mut crsql_ExtData;
let mut inner_err: *mut c_char = core::ptr::null_mut();
let outer_err: *mut *mut c_char = &mut inner_err;

// TODO: check err. if err, return err
crsql_ensure_table_infos_are_up_to_date(ctx.db_handle(), ext_data, outer_err);

let table_infos =
ManuallyDrop::new(Box::from_raw((*ext_data).tableInfos as *mut Vec<TableInfo>));

let table_name = values[0].text();
let table_info = match table_infos.iter().find(|t| t.tbl_name == table_name) {
let table_info = match table_infos.iter().find(|t| &(t.tbl_name) == table_name) {
Some(t) => t,
None => {
ctx.result_error(&format!("table {} not found", table_name));
return ResultCode::ERROR as c_int;
return;
}
};

Expand All @@ -48,7 +56,7 @@ pub unsafe extern "C" fn crsql_after_update(
}
Err(msg) => {
ctx.result_error(&msg);
return ResultCode::ERROR as c_int;
return;
}
};

Expand All @@ -61,12 +69,12 @@ pub unsafe extern "C" fn crsql_after_update(
non_pks_new,
non_pks_old,
) {
Ok(code) => code as c_int,
Ok(code) => ctx.result_int64(1),
Err(msg) => {
ctx.result_error(&msg);
ResultCode::ERROR as c_int
}
}
return;
}

fn partition_values<T>(
Expand Down Expand Up @@ -113,7 +121,8 @@ fn after_update(
after_update__mark_old_pk_row_deleted(db, tbl_info, pks_old, next_db_version, next_seq)?;
after_update__move_non_sentinels(db, tbl_info, pks_new, pks_old)?;
// Record a create of the row identified by the new primary keys
// if no rows were moved.
// if no rows were moved. This is related to the optimization to not save
// sentinels unless required.
if db.changes64() > 0 {
after_update__mark_new_pk_row_created(
db,
Expand All @@ -126,10 +135,7 @@ fn after_update(
}

// now for each non_pk_col we need to do an insert
// where new is not old
// ln 268 in triggers.rs
// note that triggers will pass us cols in table-info order
// when any non_pk is different, run the statement
// where new value is not old value
for ((new, old), col_info) in non_pks_new
.iter()
.zip(non_pks_old.iter())
Expand Down
151 changes: 20 additions & 131 deletions core/rs/core/src/triggers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,146 +156,35 @@ fn create_update_trigger(
) -> Result<ResultCode, ResultCode> {
let table_name = &table_info.tbl_name;
let pk_columns = &table_info.pks;
let pk_list = crate::util::as_identifier_list(pk_columns, None)?;
let non_pk_columns = &table_info.non_pks;
let pk_new_list = crate::util::as_identifier_list(pk_columns, Some("NEW."))?;
let pk_old_list = crate::util::as_identifier_list(pk_columns, Some("OLD."))?;
let pk_where_list = crate::util::pk_where_list(pk_columns, Some("OLD."))?;
let mut any_pk_differs = vec![];
for c in pk_columns {
any_pk_differs.push(format!(
"NEW.\"{col_name}\" IS NOT OLD.\"{col_name}\"",
col_name = crate::util::escape_ident(&c.name)
));
}
let any_pk_differs = any_pk_differs.join(" OR ");

// Changing a primary key to a new value is the same as deleting the row previously
// identified by that primary key. TODO: share this code with `create_delete_trigger`
for col in pk_columns {
let col_name = &col.name;
db.exec_safe(&format!(
"CREATE TRIGGER IF NOT EXISTS \"{tbl_name}_{col_name}__crsql_utrig\"
AFTER UPDATE OF \"{col_name}\" ON \"{tbl_name}\"
WHEN crsql_internal_sync_bit() = 0 AND NEW.\"{col_name}\" IS NOT OLD.\"{col_name}\"
BEGIN
INSERT INTO \"{table_name}__crsql_clock\" (
{pk_list},
__crsql_col_name,
__crsql_col_version,
__crsql_db_version,
__crsql_seq,
__crsql_site_id
) SELECT
{pk_old_list},
'{sentinel}',
2,
crsql_next_db_version(),
crsql_increment_and_get_seq(),
NULL WHERE true
ON CONFLICT DO UPDATE SET
__crsql_col_version = 1 + __crsql_col_version,
__crsql_db_version = crsql_next_db_version(),
__crsql_seq = crsql_get_seq() - 1,
__crsql_site_id = NULL;
DELETE FROM \"{table_name}__crsql_clock\"
WHERE {pk_where_list} AND __crsql_col_name != '{sentinel}';
END;",
tbl_name = crate::util::escape_ident(table_name),
col_name = crate::util::escape_ident(col_name),
pk_list = pk_list,
let trigger_body = if non_pk_columns.is_empty() {
format!(
"SELECT crsql_after_update('{table_name}', {pk_new_list}, {pk_old_list})",
table_name = crate::util::escape_ident_as_value(table_name),
pk_new_list = pk_new_list,
pk_old_list = pk_old_list,
sentinel = crate::c::DELETE_SENTINEL,
))?;
}

let trigger_body =
update_trigger_body(table_info, table_name, pk_list, pk_new_list, any_pk_differs)?;

let create_trigger_sql = format!(
)
} else {
format!(
"SELECT crsql_after_update('{table_name}', {pk_new_list}, {pk_old_list}, {non_pk_new_list}, {non_pk_old_list})",
table_name = crate::util::escape_ident_as_value(table_name),
pk_new_list = pk_new_list,
pk_old_list = pk_old_list,
non_pk_new_list = crate::util::as_identifier_list(non_pk_columns, Some("NEW."))?,
non_pk_old_list = crate::util::as_identifier_list(non_pk_columns, Some("OLD."))?
)
};
db.exec_safe(&format!(
"CREATE TRIGGER IF NOT EXISTS \"{table_name}__crsql_utrig\"
AFTER UPDATE ON \"{table_name}\" WHEN crsql_internal_sync_bit() = 0
BEGIN
{trigger_body}
{trigger_body};
END;",
table_name = crate::util::escape_ident(table_name),
trigger_body = trigger_body
);

db.exec_safe(&create_trigger_sql)
}

fn update_trigger_body(
table_info: &TableInfo,
table_name: &str,
pk_list: String,
pk_new_list: String,
any_pk_differs: String,
) -> Result<String, Utf8Error> {
let non_pk_columns = &table_info.non_pks;
let mut trigger_components = vec![];

// If any PK is different, record a create for the row
// as setting a PK to a _new value_ is like insertion or creating a row.
// If we have CL and we conflict.. and pk is not _dead_, ignore?
trigger_components.push(format!(
"INSERT INTO \"{table_name}__crsql_clock\" (
{pk_list},
__crsql_col_name,
__crsql_col_version,
__crsql_db_version,
__crsql_seq,
__crsql_site_id
) SELECT
{pk_new_list},
'{sentinel}',
1,
crsql_next_db_version(),
crsql_increment_and_get_seq(),
NULL
WHERE {any_pk_differs}
ON CONFLICT DO UPDATE SET
__crsql_col_version = CASE __crsql_col_version % 2 WHEN 0 THEN __crsql_col_version + 1 ELSE __crsql_col_version + 2 END,
__crsql_db_version = crsql_next_db_version(),
__crsql_seq = crsql_get_seq() - 1,
__crsql_site_id = NULL;",
table_name = crate::util::escape_ident(table_name),
pk_list = pk_list,
pk_new_list = pk_new_list,
sentinel = crate::c::INSERT_SENTINEL,
any_pk_differs = any_pk_differs
));

for col in non_pk_columns {
trigger_components.push(format!(
"INSERT INTO \"{table_name}__crsql_clock\" (
{pk_list},
__crsql_col_name,
__crsql_col_version,
__crsql_db_version,
__crsql_seq,
__crsql_site_id
) SELECT
{pk_new_list},
'{col_name_val}',
1,
crsql_next_db_version(),
crsql_increment_and_get_seq(),
NULL
WHERE NEW.\"{col_name_ident}\" IS NOT OLD.\"{col_name_ident}\"
ON CONFLICT DO UPDATE SET
__crsql_col_version = __crsql_col_version + 1,
__crsql_db_version = crsql_next_db_version(),
__crsql_seq = crsql_get_seq() - 1,
__crsql_site_id = NULL;",
table_name = crate::util::escape_ident(table_name),
pk_list = pk_list,
pk_new_list = pk_new_list,
col_name_val = crate::util::escape_ident_as_value(&col.name),
col_name_ident = crate::util::escape_ident(&col.name)
))
}

Ok(trigger_components.join("\n"))
))
}

fn create_delete_trigger(
Expand Down
6 changes: 6 additions & 0 deletions core/rs/integration_check/src/t/tableinfo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,12 @@ fn test_create_clock_table_from_table_info() {
// todo: Check that clock tables have expected schema(s)
}

fn test_leak_condition() {
// updating table infos prepares stements
// re-pulling table infos should finalize those statements
// we do not use `Drop` given we cannot return error conditions from `Drop`
}

pub fn run_suite() {
test_ensure_table_infos_are_up_to_date();
test_pull_table_info();
Expand Down
6 changes: 6 additions & 0 deletions core/src/crsqlite.c
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,12 @@ __declspec(dllexport)
crsqlFinalize, 0, 0);
}

if (rc == SQLITE_OK) {
rc = sqlite3_create_function(db, "crsql_after_update", -1,
SQLITE_UTF8 | SQLITE_INNOCUOUS, pExtData,
crsql_after_update, 0, 0);
}

if (rc == SQLITE_OK) {
rc = sqlite3_create_function(db, "crsql_rows_impacted", 0,
SQLITE_UTF8 | SQLITE_INNOCUOUS, pExtData,
Expand Down
3 changes: 3 additions & 0 deletions core/src/rust.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,7 @@ int crsql_fill_db_version_if_needed(sqlite3 *db, crsql_ExtData *pExtData,
char **errmsg);
sqlite_int64 crsql_next_db_version(sqlite3 *db, crsql_ExtData *pExtData,
sqlite3_int64 mergingVersion, char **errmsg);

void crsql_after_update(sqlite3_context *context, int argc,
sqlite3_value **argv);
#endif

0 comments on commit cf6c194

Please sign in to comment.