Skip to content

Commit

Permalink
start going through to convert to causal length rather than delete wins
Browse files Browse the repository at this point in the history
Still need to:
1. Process the `causal_length` value on non-causal length insertions to see if it requires resurrection

We do not need to process `causal_length` values on non-causal length insertions in order to kick off a delete given it would be impossible for these records to exist post deletion of a row. How could a user update a deleted row on their node in order to generate these records?
  • Loading branch information
tantaman committed Jun 27, 2023
1 parent dd8c267 commit 0517657
Show file tree
Hide file tree
Showing 10 changed files with 47 additions and 61 deletions.
11 changes: 7 additions & 4 deletions core/src/changes-vtab-write.c
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ int crsql_checkForLocalDelete(sqlite3 *db, const char *tblName,
char *zSql = sqlite3_mprintf(
"SELECT count(*) FROM \"%s__crsql_clock\" WHERE %s AND "
"__crsql_col_name "
"= %Q",
tblName, pkWhereList, DELETE_CID_SENTINEL);
"= %Q AMD __crsql_col_version % 2 == 0",
tblName, pkWhereList, CAUSAL_LENGTH_COL);
sqlite3_stmt *pStmt;
int rc = sqlite3_prepare(db, zSql, -1, &pStmt, 0);
sqlite3_free(zSql);
Expand Down Expand Up @@ -195,7 +195,7 @@ sqlite3_int64 crsql_mergePkOnlyInsert(sqlite3 *db, crsql_TableInfo *tblInfo,

// TODO: if insert was ignored, no reason to change clock
return crsql_setWinnerClock(db, tblInfo, pkIdentifiers, pkValsStr,
PKS_ONLY_CID_SENTINEL, remoteColVersion,
CAUSAL_LENGTH_COL, remoteColVersion,
remoteDbVersion, remoteSiteId, remoteSiteIdLen);
}

Expand All @@ -221,7 +221,7 @@ sqlite3_int64 crsql_mergeDelete(sqlite3 *db, crsql_TableInfo *tblInfo,
}

return crsql_setWinnerClock(db, tblInfo, pkIdentifiers, pkValsStr,
DELETE_CID_SENTINEL, remoteColVersion,
CAUSAL_LENGTH_COL, remoteColVersion,
remoteDbVersion, remoteSiteId, remoteSiteIdLen);
}

Expand Down Expand Up @@ -301,6 +301,8 @@ int crsql_mergeInsert(sqlite3_vtab *pVTab, int argc, sqlite3_value **argv,
return SQLITE_ERROR;
}

// need to check on the causal length col_versions
// we need to check if we deleted the row already in the given pass?
int isDelete = strcmp(DELETE_CID_SENTINEL, insertColName) == 0;
int isPkOnly = strcmp(PKS_ONLY_CID_SENTINEL, insertColName) == 0;

Expand All @@ -313,6 +315,7 @@ int crsql_mergeInsert(sqlite3_vtab *pVTab, int argc, sqlite3_value **argv,
}

rc = crsql_checkForLocalDelete(db, tblInfo->tblName, pkWhereList);
// Delete doesn't win anymore. We need to check.
if (rc == DELETED_LOCALLY) {
rc = SQLITE_OK;
// delete wins. we're all done.
Expand Down
13 changes: 4 additions & 9 deletions core/src/changes-vtab.c
Original file line number Diff line number Diff line change
Expand Up @@ -206,11 +206,8 @@ static int changesNext(sqlite3_vtab_cursor *cur) {
return SQLITE_ERROR;
}

if (strcmp(DELETE_CID_SENTINEL, cid) == 0) {
pCur->rowType = ROW_TYPE_DELETE;
return SQLITE_OK;
} else if (strcmp(PKS_ONLY_CID_SENTINEL, cid) == 0) {
pCur->rowType = ROW_TYPE_PKONLY;
if (strcmp(CAUSAL_LENGTH_COL, cid) == 0) {
pCur->rowType = ROW_TYPE_CAUSAL_LENGTH;
return SQLITE_OK;
} else {
pCur->rowType = ROW_TYPE_UPDATE;
Expand Down Expand Up @@ -293,10 +290,8 @@ static int changesColumn(
}
break;
case CHANGES_SINCE_VTAB_CID:
if (pCur->rowType == ROW_TYPE_PKONLY) {
sqlite3_result_text(ctx, PKS_ONLY_CID_SENTINEL, -1, SQLITE_STATIC);
} else if (pCur->rowType == ROW_TYPE_DELETE || pCur->pRowStmt == 0) {
sqlite3_result_text(ctx, DELETE_CID_SENTINEL, -1, SQLITE_STATIC);
if (pCur->rowType == ROW_TYPE_CAUSAL_LENGTH) {
sqlite3_result_text(ctx, CAUSAL_LENGTH_COL, -1, SQLITE_STATIC);
} else {
sqlite3_result_value(ctx,
sqlite3_column_value(pCur->pChangesStmt, CID));
Expand Down
3 changes: 1 addition & 2 deletions core/src/changes-vtab.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,7 @@ struct crsql_Changes_vtab {
* changesOpen and released in changesCrsrFinalize
*/
#define ROW_TYPE_UPDATE 0
#define ROW_TYPE_DELETE 1
#define ROW_TYPE_PKONLY 2
#define ROW_TYPE_CAUSAL_LENGTH 1

typedef struct crsql_Changes_cursor crsql_Changes_cursor;
struct crsql_Changes_cursor {
Expand Down
3 changes: 1 addition & 2 deletions core/src/consts.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@
// NB: crsql_quoteConcat
#define QC_DELIM '|'

#define DELETE_CID_SENTINEL "__crsql_del"
#define PKS_ONLY_CID_SENTINEL "__crsql_pko"
#define CAUSAL_LENGTH_COL "__crsql_cl"

#define CRR_SPACE 0
#define USER_SPACE 1
Expand Down
10 changes: 4 additions & 6 deletions core/src/crsqlite.c
Original file line number Diff line number Diff line change
Expand Up @@ -582,10 +582,8 @@ int crsql_compactPostAlter(sqlite3 *db, const char *tblName,
// First delete entries that no longer have a column
zSql = sqlite3_mprintf(
"DELETE FROM \"%w__crsql_clock\" WHERE \"__crsql_col_name\" NOT IN "
"(SELECT name FROM pragma_table_info(%Q) UNION SELECT '%s' UNION "
"SELECT "
"'%s')",
tblName, tblName, DELETE_CID_SENTINEL, PKS_ONLY_CID_SENTINEL);
"(SELECT name FROM pragma_table_info(%Q) UNION SELECT '%s')",
tblName, tblName, CAUSAL_LENGTH_COL);
rc = sqlite3_exec(db, zSql, 0, 0, errmsg);
sqlite3_free(zSql);
if (rc != SQLITE_OK) {
Expand All @@ -600,9 +598,9 @@ int crsql_compactPostAlter(sqlite3 *db, const char *tblName,
sqlite3_str_appendf(
pDynStr,
"DELETE FROM \"%w__crsql_clock\" WHERE __crsql_col_name != "
"'__crsql_del' AND NOT EXISTS (SELECT 1 FROM "
"'%s' AND NOT EXISTS (SELECT 1 FROM "
"\"%w\" WHERE ",
tblName, tblName);
tblName, CAUSAL_LENGTH_COL, tblName);
// get table info
rc = crsql_ensureTableInfosAreUpToDate(db, pExtData, errmsg);
if (rc != SQLITE_OK) {
Expand Down
4 changes: 2 additions & 2 deletions core/src/rows-impacted.test.c
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ static void testDeleteThatDoesNotChangeAnything() {
rc += sqlite3_exec(db, "BEGIN", 0, 0, 0);
rc += sqlite3_exec(db,
"INSERT INTO crsql_changes VALUES ('foo', 1, "
"'__crsql_del', NULL, 1, 2, NULL)",
"'__crsql_cl', NULL, 2, 2, NULL)",
0, 0, &err);
sqlite3_prepare_v2(db, "SELECT crsql_rows_impacted()", -1, &pStmt, 0);
sqlite3_step(pStmt);
Expand All @@ -262,7 +262,7 @@ static void testDelete() {
rc += sqlite3_exec(db, "BEGIN", 0, 0, 0);
rc += sqlite3_exec(db,
"INSERT INTO crsql_changes VALUES ('foo', 1, "
"'__crsql_del', NULL, 1, 2, NULL)",
"'__crsql_cl', NULL, 2, 2, NULL)",
0, 0, &err);
sqlite3_prepare_v2(db, "SELECT crsql_rows_impacted()", -1, &pStmt, 0);
sqlite3_step(pStmt);
Expand Down
35 changes: 14 additions & 21 deletions core/src/triggers.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,12 @@ int crsql_createInsertTrigger(sqlite3 *db, crsql_TableInfo *tableInfo,

char *crsql_insertTriggerQuery(crsql_TableInfo *tableInfo, char *pkList,
char *pkNewList) {
const int length = tableInfo->nonPksLen == 0 ? 1 : tableInfo->nonPksLen;
const int length = tableInfo->nonPksLen + 1;
char **subTriggers = sqlite3_malloc(length * sizeof(char *));
char *joinedSubTriggers;

// We need a CREATE_SENTINEL to stand in for the create event so we can
// replicate PKs If we have a create sentinel how will we insert the created
// rows without a requirement of nullability on every column? Keep some
// event data for create that represents the initial state of the row?
// Future improvement.
if (tableInfo->nonPksLen == 0) {
subTriggers[0] = sqlite3_mprintf(
"INSERT INTO \"%s__crsql_clock\" (\
subTriggers[0] = sqlite3_mprintf(
"INSERT INTO \"%s__crsql_clock\" (\
%s,\
__crsql_col_name,\
__crsql_col_version,\
Expand All @@ -71,8 +65,8 @@ char *crsql_insertTriggerQuery(crsql_TableInfo *tableInfo, char *pkList,
__crsql_db_version = crsql_nextdbversion(),\
__crsql_seq = crsql_get_seq() - 1,\
__crsql_site_id = NULL;\n",
tableInfo->tblName, pkList, pkNewList, PKS_ONLY_CID_SENTINEL);
}
tableInfo->tblName, pkList, pkNewList, CAUSAL_LENGTH_COL);

for (int i = 0; i < tableInfo->nonPksLen; ++i) {
subTriggers[i] = sqlite3_mprintf(
"INSERT INTO \"%s__crsql_clock\" (\
Expand Down Expand Up @@ -123,7 +117,7 @@ int crsql_createUpdateTrigger(sqlite3 *db, crsql_TableInfo *tableInfo,
char *pkList = 0;
char *pkNewList = 0;
int rc = SQLITE_OK;
const int length = tableInfo->nonPksLen == 0 ? 1 : tableInfo->nonPksLen;
const int length = tableInfo->nonPksLen + 1;
char **subTriggers = sqlite3_malloc(length * sizeof(char *));
char *joinedSubTriggers;

Expand All @@ -141,9 +135,8 @@ int crsql_createUpdateTrigger(sqlite3 *db, crsql_TableInfo *tableInfo,
//
// TODO: Do we not also need to record a creation event
// if a pk was changed for a non pk only table?
if (tableInfo->nonPksLen == 0) {
subTriggers[0] = sqlite3_mprintf(
"INSERT INTO \"%s__crsql_clock\" (\
subTriggers[0] = sqlite3_mprintf(
"INSERT INTO \"%s__crsql_clock\" (\
%s,\
__crsql_col_name,\
__crsql_col_version,\
Expand All @@ -162,10 +155,9 @@ int crsql_createUpdateTrigger(sqlite3 *db, crsql_TableInfo *tableInfo,
__crsql_db_version = crsql_nextdbversion(),\
__crsql_seq = crsql_get_seq() - 1,\
__crsql_site_id = NULL;\n",
tableInfo->tblName, pkList, pkNewList, PKS_ONLY_CID_SENTINEL);
}
tableInfo->tblName, pkList, pkNewList, CAUSAL_LENGTH_COL);

for (int i = 0; i < tableInfo->nonPksLen; ++i) {
for (int i = 1; i < tableInfo->nonPksLen; ++i) {
// updates are conditionally inserted on the new value not being
// the same as the old value.
subTriggers[i] = sqlite3_mprintf(
Expand Down Expand Up @@ -244,7 +236,7 @@ char *crsql_deleteTriggerQuery(crsql_TableInfo *tableInfo) {
) SELECT \
%s,\
%Q,\
1,\
2,\
crsql_nextdbversion(),\
crsql_increment_and_get_seq(),\
NULL\
Expand All @@ -254,10 +246,11 @@ char *crsql_deleteTriggerQuery(crsql_TableInfo *tableInfo) {
__crsql_seq = crsql_get_seq() - 1,\
__crsql_site_id = NULL;\
\
DELETE FROM \"%w__crsql_clock\" WHERE crsql_internal_sync_bit() = 0 AND %s AND __crsql_col_name != '__crsql_del';\
DELETE FROM \"%w__crsql_clock\" WHERE crsql_internal_sync_bit() = 0 AND %s AND __crsql_col_name != '%s';\
END; ",
tableInfo->tblName, tableInfo->tblName, tableInfo->tblName, pkList,
pkOldList, DELETE_CID_SENTINEL, tableInfo->tblName, pkWhereList);
pkOldList, CAUSAL_LENGTH_COL, tableInfo->tblName, pkWhereList,
CAUSAL_LENGTH_COL);

sqlite3_free(pkList);
sqlite3_free(pkOldList);
Expand Down
4 changes: 2 additions & 2 deletions core/src/triggers.test.c
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,14 @@ static void testDeleteTriggerQuery() {
"ON \"foo\" BEGIN INSERT INTO \"foo__crsql_clock\" ( "
"\"a\", __crsql_col_name, __crsql_col_version, "
"__crsql_db_version, __crsql_seq, __crsql_site_id "
") SELECT OLD.\"a\", '__crsql_del', 1, "
") SELECT OLD.\"a\", '__crsql_cl', 2, "
"crsql_nextdbversion(), crsql_increment_and_get_seq(), "
"NULL WHERE crsql_internal_sync_bit() = 0 ON CONFLICT DO UPDATE "
"SET __crsql_col_version = __crsql_col_version + 1, "
"__crsql_db_version = crsql_nextdbversion(), __crsql_seq = "
"crsql_get_seq() - 1, __crsql_site_id = NULL; "
" DELETE FROM \"foo__crsql_clock\" WHERE crsql_internal_sync_bit() "
"= 0 AND \"a\" = OLD.\"a\" AND __crsql_col_name != '__crsql_del'; "
"= 0 AND \"a\" = OLD.\"a\" AND __crsql_col_name != '__crsql_cl'; "
" END; ",
query) == 0);

Expand Down
7 changes: 4 additions & 3 deletions py/correctness/tests/test_schema_modification.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,18 +180,19 @@ def test_delete_sentinels_not_lost():
c = setup_alter_test()
c.execute("DELETE FROM todo WHERE id = 1;")
c.commit()
changes = c.execute(changes_query).fetchall()
changes = c.execute(
"SELECT [table], [pk], [cid], [val], [col_version] FROM crsql_changes").fetchall()

# starting off correctly
assert (changes == [('todo', '1', '__crsql_del', None)])
assert (changes == [('todo', '1', '__crsql_cl', None, 2)])

c.execute("SELECT crsql_begin_alter('todo');")
c.execute("ALTER TABLE todo RENAME name TO task;")
c.execute("SELECT crsql_commit_alter('todo');")
c.commit()

changes = c.execute(changes_query).fetchall()
assert (changes == [('todo', '1', '__crsql_del', None)])
assert (changes == [('todo', '1', '__crsql_cl', None, 2)])


def test_pk_only_sentinels():
Expand Down
18 changes: 8 additions & 10 deletions py/correctness/tests/test_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,24 +117,22 @@ def test_delete():

rows = get_changes_since(db, 1, 'FF')
siteid = None
# Deletes are marked with a sentinel id
assert (rows == [('component', '1', '__crsql_del', None, 1, 2, siteid)])
assert (rows == [('component', '1', '__crsql_cl', None, 2, 2, siteid)])

db.execute("DELETE FROM component")
db.execute("DELETE FROM deck")
db.execute("DELETE FROM slide")
db.commit()

rows = get_changes_since(db, 0, 'FF')
# TODO: should deletes not get a proper version? Would be better for ordering and chunking replications
assert (rows == [('user', '1', 'name', "'Javi'", 1, 1, None),
('component', '1', '__crsql_del', None, 1, 2, None),
('component', '2', '__crsql_del', None, 1, 3, None),
('component', '3', '__crsql_del', None, 1, 3, None),
('deck', '1', '__crsql_del', None, 1, 3, None),
('slide', '1', '__crsql_del', None, 1, 3, None),
('slide', '2', '__crsql_del', None, 1, 3, None),
('slide', '3', '__crsql_del', None, 1, 3, None)])
('component', '1', '__crsql_cl', None, 2, 2, None),
('component', '2', '__crsql_cl', None, 2, 3, None),
('component', '3', '__crsql_cl', None, 2, 3, None),
('deck', '1', '__crsql_cl', None, 2, 3, None),
('slide', '1', '__crsql_cl', None, 2, 3, None),
('slide', '2', '__crsql_cl', None, 2, 3, None),
('slide', '3', '__crsql_cl', None, 2, 3, None)])

# test insert

Expand Down

0 comments on commit 0517657

Please sign in to comment.