Skip to content

Commit

Permalink
database: actually implement upserts
Browse files Browse the repository at this point in the history
Fixes #55 - it was the code that should have been there
in the first place, but I forgot to `git add`...
  • Loading branch information
psarna committed Jun 13, 2023
1 parent ec64508 commit d7c59bd
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 25 deletions.
54 changes: 30 additions & 24 deletions mvcc-rs/src/database/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ impl<Clock: LogicalClock> Database<Clock> {
.unwrap_or(0);
if versions.len() - position > 3 {
tracing::debug!(
"Inserting an element {} positions from the end",
"Inserting a row version {} positions from the end",
versions.len() - position
);
}
Expand All @@ -339,7 +339,7 @@ impl<Clock: LogicalClock> Database<Clock> {
.get(&tx_id)
.ok_or(DatabaseError::NoSuchTransactionID(tx_id))?;
let mut tx = tx.value().write().unwrap();
assert!(tx.state == TransactionState::Active);
assert_eq!(tx.state, TransactionState::Active);
let id = row.id;
let row_version = RowVersion {
begin: TxTimestampOrID::TxID(tx.tx_id),
Expand Down Expand Up @@ -379,9 +379,9 @@ impl<Clock: LogicalClock> Database<Clock> {
}

/// Inserts a row in the database with new values, previously deleting
/// any old data if it existed.
/// any old data if it existed. Bails on a delete error, e.g. write-write conflict.
pub fn upsert(&self, tx_id: TxID, row: Row) -> Result<()> {
self.delete(tx_id, row.id).ok();
self.delete(tx_id, row.id)?;
self.insert(tx_id, row)
}

Expand Down Expand Up @@ -409,7 +409,7 @@ impl<Clock: LogicalClock> Database<Clock> {
.get(&tx_id)
.ok_or(DatabaseError::NoSuchTransactionID(tx_id))?;
let tx = tx.value().read().unwrap();
assert!(tx.state == TransactionState::Active);
assert_eq!(tx.state, TransactionState::Active);
if is_write_write_conflict(&self.txs, &tx, rv) {
drop(row_versions);
drop(row_versions_opt);
Expand Down Expand Up @@ -452,7 +452,7 @@ impl<Clock: LogicalClock> Database<Clock> {
pub fn read(&self, tx_id: TxID, id: RowID) -> Result<Option<Row>> {
let tx = self.txs.get(&tx_id).unwrap();
let tx = tx.value().read().unwrap();
assert!(tx.state == TransactionState::Active);
assert_eq!(tx.state, TransactionState::Active);
if let Some(row_versions) = self.rows.get(&id) {
let row_versions = row_versions.value().read().unwrap();
for rv in row_versions.iter().rev() {
Expand Down Expand Up @@ -520,7 +520,7 @@ impl<Clock: LogicalClock> Database<Clock> {
match tx.state.load() {
TransactionState::Terminated => return Err(DatabaseError::TxTerminated),
_ => {
assert!(tx.state == TransactionState::Active);
assert_eq!(tx.state, TransactionState::Active);
}
}
tx.state.store(TransactionState::Preparing);
Expand Down Expand Up @@ -660,7 +660,7 @@ impl<Clock: LogicalClock> Database<Clock> {
pub fn rollback_tx(&self, tx_id: TxID) {
let tx_unlocked = self.txs.get(&tx_id).unwrap();
let tx = tx_unlocked.value().write().unwrap();
assert!(tx.state == TransactionState::Active);
assert_eq!(tx.state, TransactionState::Active);
tx.state.store(TransactionState::Aborted);
tracing::trace!("ABORT {tx}");
let write_set: Vec<RowID> = tx.write_set.iter().map(|v| *v.value()).collect();
Expand All @@ -677,6 +677,9 @@ impl<Clock: LogicalClock> Database<Clock> {
let tx = tx_unlocked.value().write().unwrap();
tx.state.store(TransactionState::Terminated);
tracing::trace!("TERMINATE {tx}");
// FIXME: verify that we can already remove the transaction here!
// Maybe it's fine for snapshot isolation, but too early for serializable?
self.txs.remove(&tx_id);
}

/// Generates next unique transaction id
Expand All @@ -693,34 +696,41 @@ impl<Clock: LogicalClock> Database<Clock> {
/// which sometimes leaves versions intact for too long.
/// Returns the number of removed versions.
pub fn drop_unused_row_versions(&self) -> usize {
tracing::debug!(
"transactions: {}; rows: {}",
tracing::trace!(
"Dropping unused row versions. Database stats: transactions: {}; rows: {}",
self.txs.len(),
self.rows.len()
);
let mut dropped = 0;
let mut to_remove = Vec::new();
for entry in self.rows.iter() {
let mut row_versions = entry.value().write().unwrap();
tracing::debug!("versions: {}", row_versions.len());
row_versions.retain(|rv| {
tracing::debug!("inspecting {rv:?}");
// FIXME: should take rv.begin into account as well
let should_stay = match rv.end {
Some(TxTimestampOrID::Timestamp(version_end_ts)) => {
// a transaction started before this row version ended,
// ergo row version is needed
// a transaction started before this row version ended, ergo row version is needed
// NOTICE: O(row_versions x transactions), but also lock-free, so sounds acceptable
self.txs
.iter()
.any(|tx| version_end_ts >= tx.value().read().unwrap().begin_ts)
self.txs.iter().any(|tx| {
let tx = tx.value().read().unwrap();
// FIXME: verify!
match tx.state.load() {
TransactionState::Active | TransactionState::Preparing => {
version_end_ts > tx.begin_ts
}
_ => false,
}
})
}
// Let's skip potentially complex logic if the transaction is still
// Let's skip potentially complex logic if the transaction is still
// active/tracked. We will drop the row version when the transaction
// gets garbage-collected itself, it will always happen eventually.
Some(TxTimestampOrID::TxID(tx_id)) => !self.txs.contains_key(&tx_id),
// this row version is current, ergo visible
None => true,
};
if !should_stay {
dropped += 1;
tracing::trace!(
"Dropping row version {:?} {:?}-{:?}",
entry.key(),
Expand All @@ -734,7 +744,6 @@ impl<Clock: LogicalClock> Database<Clock> {
to_remove.push(*entry.key());
}
}
let dropped = to_remove.len();
for id in to_remove {
self.rows.remove(&id);
}
Expand Down Expand Up @@ -766,11 +775,8 @@ pub(crate) fn is_write_write_conflict(
let te = txs.get(&rv_end).unwrap();
let te = te.value().read().unwrap();
match te.state.load() {
TransactionState::Active => tx.tx_id != te.tx_id,
TransactionState::Preparing => todo!(),
TransactionState::Committed(_end_ts) => todo!(),
TransactionState::Aborted => todo!(),
TransactionState::Terminated => todo!(),
TransactionState::Active | TransactionState::Preparing => tx.tx_id != te.tx_id,
_ => false,
}
}
Some(TxTimestampOrID::Timestamp(_)) => false,
Expand Down
11 changes: 10 additions & 1 deletion mvcc-rs/tests/concurrency_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ fn test_overlapping_concurrent_inserts_read_your_writes() {
let work = |prefix: &'static str| {
let db = db.clone();
std::thread::spawn(move || {
let mut failed_upserts = 0;
for i in 0..iterations {
if i % 1000 == 0 {
tracing::debug!("{prefix}: {i}");
Expand All @@ -100,11 +101,19 @@ fn test_overlapping_concurrent_inserts_read_your_writes() {
id,
data: format!("{prefix} @{tx}"),
};
db.upsert(tx, row.clone()).unwrap();
if let Err(e) = db.upsert(tx, row.clone()) {
tracing::trace!("upsert failed: {e}");
failed_upserts += 1;
continue;
}
let committed_row = db.read(tx, id).unwrap();
db.commit_tx(tx).unwrap();
assert_eq!(committed_row, Some(row));
}
tracing::info!(
"{prefix}'s failed upserts: {failed_upserts}/{iterations} {:.2}%",
(failed_upserts * 100) as f64 / iterations as f64
);
})
};

Expand Down

0 comments on commit d7c59bd

Please sign in to comment.