diff --git a/Cargo.lock b/Cargo.lock index e3ce0b2..2c6ecdf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10,6 +10,7 @@ dependencies = [ "once_cell", "tracing", "tracing-subscriber", + "vector-map", ] [[package]] @@ -36,6 +37,28 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "contracts" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c9424f2ca1e42776615720e5746eed6efa19866fdbaac2923ab51c294ac4d1f2" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + +[[package]] +name = "getrandom" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8fc3cb4d91f53b50155bdcfd23f6a4c39ae1969c2ae85982b135750cccaf5fce" +dependencies = [ + "cfg-if", + "libc", + "wasi", +] + [[package]] name = "hashbrown" version = "0.14.3" @@ -52,6 +75,12 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" +[[package]] +name = "libc" +version = "0.2.153" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd" + [[package]] name = "log" version = "0.4.20" @@ -86,6 +115,12 @@ version = "0.2.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58" +[[package]] +name = "ppv-lite86" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" + [[package]] name = "proc-macro2" version = "1.0.76" @@ -104,6 +139,47 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "rand" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03" +dependencies = [ + "getrandom", + "libc", + "rand_chacha", + "rand_core", + "rand_hc", +] + +[[package]] +name = "rand_chacha" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19" +dependencies = [ + "getrandom", +] + +[[package]] +name = "rand_hc" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c" +dependencies = [ + "rand_core", +] + [[package]] name = "sharded-slab" version = "0.1.7" @@ -119,6 +195,17 @@ version = "1.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6ecd384b10a64542d77071bd64bd7b231f4ed5940fba55e98c3de13824cf3d7" +[[package]] +name = "syn" +version = "1.0.109" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + [[package]] name = "syn" version = "2.0.48" @@ -159,7 +246,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.48", ] [[package]] @@ -209,12 +296,26 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" +[[package]] +name = "vector-map" +version = "1.0.1" +dependencies = [ + "contracts", + "rand", +] + [[package]] name = "version_check" version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "wasi" +version = "0.9.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519" + [[package]] name = "winapi" version = "0.3.9" @@ -254,5 +355,5 @@ checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.48", ] diff --git a/Cargo.toml b/Cargo.toml index be3a0cc..bd7bc34 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,15 @@ edition = "2021" hashbrown = { version = "0.14.3", features = ["raw"] } tracing = "0.1.40" +# dev dependencies are not allowed to be optional +# kani doesnt use dev-dependencies :( +# https://github.com/model-checking/kani/issues/585 +# vector-map = { version = "1.0.1", optional = true } +vector-map = { path = "vec-map-rs", optional = true } + [dev-dependencies] once_cell = "1.19.0" tracing-subscriber = "0.3.18" + +[features] +verification = ["vector-map"] diff --git a/clippy.toml b/clippy.toml new file mode 100644 index 0000000..f933afc --- /dev/null +++ b/clippy.toml @@ -0,0 +1,6 @@ +disallowed-types = [ + { path = "std::collections::HashMap", reason = "Unsuitable for verification use crate::collections:Map that cfg's into the right version" }, + { path = "std::collections::HashSet", reason = "Unsuitable for verification use crate::collections:Set that cfg's into the right version" }, + { path = "hashbrown::HashMap", reason = "Unsuitable for verification use crate::collections:Map that cfg's into the right version" }, + { path = "hashbrown::HashSet", reason = "Unsuitable for verification use crate::collections:Set that cfg's into the right version" }, +] diff --git a/src/collections/mod.rs b/src/collections/mod.rs new file mode 100644 index 0000000..cdc8788 --- /dev/null +++ b/src/collections/mod.rs @@ -0,0 +1,224 @@ +#[allow(clippy::disallowed_types)] +#[cfg(not(kani))] +mod default { + use hashbrown::hash_map::Entry; + use hashbrown::raw::Bucket; + use hashbrown::{Equivalent, HashMap, HashSet}; + use std::hash::{BuildHasher, Hash}; + use std::marker::PhantomData; + + pub type Set = HashSet; + pub type Map = HashMap; + pub type MapEntry<'a, K, V, S, A> = Entry<'a, K, V, S, A>; + + // The problem this object aims to solve is that we cant mutably borrow different hashmap entities at the same time + // because rustc cant see that they are different thus it guards us from mutably borrowing one entity several times. + // Since we can guarantee that we wont borrow the same entity twice we can safely mutate different ones at the same time. + // TODO: HashMap during lookup can touch keys other than the passed one, thus it can dereference root key while searching + // for other keys, thus creating a shared borrow in addition to mutable one + // TODO + pub struct Lens<'a, 'b, K, V> { + root_key: K, + other_keys: &'a Set, + source: &'b mut Map, + } + + impl<'a, 'b, K, V> Lens<'a, 'b, K, V> + where + K: Eq + Hash, + { + // TODO there can be a trait that provides a method that returns id and a unique sequence that are guaranteed to be unique together + // I e Apply message can easily implement it, because transaction cant depend on itself thus Apply.txn_id + // and Apply.dependencies are guaranteed to be unique together + // TODO clarify that guards are needed because if these methods exist on Lens itself we must have shared reference as self parameter + // which is wrong because this way we'd be able to invoke several for_each_mut iterators inside of each other which would end + // up in multiple mutable references existing for the same memory location which violates safety + pub fn zoom( + root_key: K, + other_keys: &'a Set, + source: &'b mut Map, + ) -> Option<(LensRootGuard<'b, K, V>, LensIterGuard<'a, 'b, K, V>)> { + if other_keys.contains(&root_key) { + return None; + } + + let hash = source.hasher().hash_one(&root_key); + let root_bucket = source + .raw_table() + .find(hash, |(k, _)| k.equivalent(&root_key)) + .expect("TODO"); + + Some(( + LensRootGuard { + bucket: root_bucket, + _phantom: PhantomData, + }, + LensIterGuard { + other_keys, + source, + root_key, + }, + )) + } + } + + pub struct LensRootGuard<'a, K, V> { + bucket: Bucket<(K, V)>, + _phantom: PhantomData<&'a ()>, + } + + impl<'b, K, V> LensRootGuard<'b, K, V> + where + K: Eq + Hash, + { + pub fn with_mut(&mut self, f: impl FnOnce(&mut V) -> U) -> U { + unsafe { + let root = self.bucket.as_mut(); + f(&mut root.1) + } + } + } + + pub struct LensIterGuard<'a, 'b, K, V> { + root_key: K, + other_keys: &'a Set, + source: &'b mut Map, + } + impl<'a, 'b, K, V> LensIterGuard<'a, 'b, K, V> + where + K: Eq + Hash, + { + pub fn for_each_mut(&self, mut f: impl FnMut(&K, &mut V)) { + for key in self.other_keys { + let hash = self.source.hasher().hash_one(key); + let entry = self + .source + .raw_table() + .find(hash, |(k, _)| k.equivalent(key)) + .expect("TODO"); + + unsafe { + let (k, v) = entry.as_mut(); + f(k, v) + } + } + } + + pub fn exchange(self, other_keys: &'a Set) -> Option { + if other_keys.contains(&self.root_key) { + return None; + } + + Some(Self { + root_key: self.root_key, + other_keys, + source: self.source, + }) + } + } +} + +// #[cfg(test)] +#[cfg(kani)] +mod verification { + use std::marker::PhantomData; + + use vector_map::{set::VecSet, Entry, VecMap}; + + pub type Set = VecSet; + pub type Map = VecMap; + pub type MapEntry<'a, K, V> = Entry<'a, K, V>; + + pub struct Lens<'a, 'b, K, V> { + root_key: K, + other_keys: &'a Set, + source: &'b mut Map, + } + + impl<'a, 'b, K, V> Lens<'a, 'b, K, V> + where + K: Eq, + { + pub fn zoom( + root_key: K, + other_keys: &'a Set, + source: &'b mut Map, + ) -> Option<(LensRootGuard<'b, K, V>, LensIterGuard<'a, 'b, K, V>)> { + if other_keys.contains(&root_key) { + return None; + } + + let root_position = source.position(&root_key).unwrap(); + let root_ptr = unsafe { source.as_values_ptr().add(root_position) }; + + Some(( + LensRootGuard { + root_ptr, + _phantom: PhantomData, + }, + LensIterGuard { + other_keys, + source, + root_key, + }, + )) + } + } + + pub struct LensRootGuard<'a, K, V> { + root_ptr: *const V, + _phantom: PhantomData<&'a (K, V)>, + } + + impl<'b, K, V> LensRootGuard<'b, K, V> + where + K: Eq, + { + pub fn with_mut(&mut self, f: impl FnOnce(&mut V) -> U) -> U { + unsafe { + // This cant be unsound: https://github.com/rust-lang/rust/issues/66136 + let mut root = &mut *(self.root_ptr as *mut V); + f(&mut root) + } + } + } + + pub struct LensIterGuard<'a, 'b, K, V> { + root_key: K, + other_keys: &'a Set, + source: &'b mut Map, + } + impl<'a, 'b, K, V> LensIterGuard<'a, 'b, K, V> + where + K: Eq, + { + pub fn for_each_mut(&self, mut f: impl FnMut(&K, &mut V)) { + for key in self.other_keys { + let position = self.source.position(key).unwrap(); + unsafe { + let ptr = self.source.as_values_ptr().add(position); + let v = &mut *(ptr as *mut V); + f(key, v) + } + } + } + + pub fn exchange(self, other_keys: &'a Set) -> Option { + if other_keys.contains(&self.root_key) { + return None; + } + + Some(Self { + root_key: self.root_key, + other_keys, + source: self.source, + }) + } + } +} + +#[cfg(not(kani))] +pub use default::*; + +#[cfg(kani)] +pub use verification::*; diff --git a/src/lib.rs b/src/lib.rs index 777e3ce..84218a0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,479 +1,2 @@ -use std::collections::HashSet; -use std::hash::{BuildHasher, Hash}; -use std::marker::PhantomData; - -use hashbrown::raw::Bucket; -use hashbrown::{Equivalent, HashMap}; - -mod coordinator; -pub mod messages; -pub mod node; -pub mod quorum_tracker; -mod replica; -pub mod timestamp; -pub mod topology; -pub mod transaction; - -#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] -pub struct NodeId(u16); - -impl std::fmt::Display for NodeId { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.write_fmt(format_args!("N{}", self.0)) - } -} - -// The problem this object aims to solve is that we cant mutably borrow different hashmap entities at the same time -// because rustc cant see that they are different thus it guards us from mutably borrowing one entity several times. -// Since we can guarantee that we wont borrow the same entity twice we can safely mutate different ones at the same time. -struct Lens<'a, 'b, K, V> { - root_key: K, - other_keys: &'a HashSet, - source: &'b mut HashMap, -} - -impl<'a, 'b, K, V> Lens<'a, 'b, K, V> -where - K: Eq + Hash, -{ - // TODO there can be a trait that provides a method that returns id and a unique sequence that are guaranteed to be unique together - // I e Apply message can easily implement it, because transaction cant depend on itself thus Apply.txn_id - // and Apply.dependencies are guaranteed to be unique together - // TODO clarify that guards are needed because if these methods exist on Lens itself we must have shared reference as self parameter - // which is wrong because this way we'd be able to invoke several for_each_mut iterators inside of each other which would end - // up in multiple mutable references existing for the same memory location which violates safety - fn new( - root_key: K, - other_keys: &'a HashSet, - source: &'b mut HashMap, - ) -> Option<(LensRootGuard<'b, K, V>, LensIterGuard<'a, 'b, K, V>)> { - if other_keys.contains(&root_key) { - return None; - } - - let hash = source.hasher().hash_one(&root_key); - let root_bucket = source - .raw_table() - .find(hash, |(k, _)| k.equivalent(&root_key)) - .expect("TODO"); - - Some(( - LensRootGuard { - bucket: root_bucket, - _phantom: PhantomData, - }, - LensIterGuard { - other_keys, - source, - root_key, - }, - )) - } -} - -struct LensRootGuard<'a, K, V> { - bucket: Bucket<(K, V)>, - _phantom: PhantomData<&'a ()>, -} - -impl<'b, K, V> LensRootGuard<'b, K, V> -where - K: Eq + Hash, -{ - fn with_mut(&mut self, f: impl FnOnce(&mut V) -> U) -> U { - unsafe { - let root = self.bucket.as_mut(); - f(&mut root.1) - } - } -} - -struct LensIterGuard<'a, 'b, K, V> { - root_key: K, - other_keys: &'a HashSet, - source: &'b mut HashMap, -} -impl<'a, 'b, K, V> LensIterGuard<'a, 'b, K, V> -where - K: Eq + Hash, -{ - fn for_each_mut(&self, mut f: impl FnMut(&K, &mut V)) { - for key in self.other_keys { - let hash = self.source.hasher().hash_one(key); - let entry = self - .source - .raw_table() - .find(hash, |(k, _)| k.equivalent(key)) - .expect("TODO"); - - unsafe { - let (k, v) = entry.as_mut(); - f(k, v) - } - } - } - - fn exchange(self, other_keys: &'a HashSet) -> Option { - if other_keys.contains(&self.root_key) { - return None; - } - - Some(Self { - root_key: self.root_key, - other_keys: other_keys, - source: self.source, - }) - } -} - -#[cfg(any(test, kani))] -mod harness { - use std::collections::{HashMap, HashSet}; - - use once_cell::sync::OnceCell; - use tracing::info; - - pub use crate::{ - coordinator::Coordinator, - messages::{ - Accept, AcceptOk, Apply, Commit, CommitAndRead, EitherCommitOrAccept, NewTransaction, - PreAccept, PreAcceptOk, Read, ReadOk, - }, - node::{DataStore, Node}, - replica::Replica, - timestamp::TimestampProvider, - topology::{KeyRange, Shard, ShardId, Topology}, - transaction::{Key, TransactionBody, Value}, - NodeId, - }; - - static LOG_HANDLE: OnceCell<()> = OnceCell::new(); - - #[derive(Debug)] - pub enum Message { - NewTransaction(NewTransaction), - PreAccept(PreAccept), - PreAcceptOk(PreAcceptOk), - Commit(Commit), - Accept(Accept), - AcceptOk(AcceptOk), - Read(Read), - ReadOk(ReadOk), - Apply(Apply), - } - - pub struct Harness { - pub nodes: HashMap, - pub topology: Topology, - } - - impl Harness { - pub fn new() -> Self { - let shards = vec![ - Shard { - range: KeyRange { - lo: Key(0), - hi: Key(10), - }, - node_ids: HashSet::from([NodeId(1), NodeId(2), NodeId(3)]), - }, - Shard { - range: KeyRange { - lo: Key(10), - hi: Key(20), - }, - node_ids: HashSet::from([NodeId(4), NodeId(5), NodeId(6)]), - }, - ]; - - let topology = Topology::new( - shards, - HashMap::from([ - (NodeId(1), vec![ShardId(0)]), - (NodeId(2), vec![ShardId(0)]), - (NodeId(3), vec![ShardId(0)]), - (NodeId(4), vec![ShardId(1)]), - (NodeId(5), vec![ShardId(1)]), - (NodeId(6), vec![ShardId(1)]), - ]), - ); - - let mut nodes = HashMap::new(); - for id in 1..=6 { - let timestamp_provider = TimestampProvider::new(NodeId(id)); - - let node = Node::new( - NodeId(id), - topology.clone(), - timestamp_provider, - Coordinator::default(), - Replica::default(), - DataStore::default(), - ); - nodes.insert(NodeId(id), node); - } - - #[cfg(not(kani))] - LOG_HANDLE.get_or_init(|| { - tracing_subscriber::fmt::init(); - }); - - Harness { nodes, topology } - } - - fn decompose_commit_and_read( - messages: &mut Vec<(NodeId, NodeId, Message)>, - mut commit_and_read: CommitAndRead, - src_node: NodeId, - ) { - for (node_id, commit) in commit_and_read.commits.drain() { - messages.push((src_node, node_id, Message::Commit(commit))) - } - - for (node_id, read) in commit_and_read.reads.drain(..) { - messages.push((src_node, node_id, Message::Read(read))) - } - } - - pub fn run(&mut self, initial_messages: Vec<(NodeId, NodeId, Message)>) { - let mut messages = initial_messages; - - while !messages.is_empty() { - let mut new_messages = vec![]; - - for (src_node, dst_node, message) in messages.drain(..) { - info!("{:?} --> {:?}: {:?}", src_node, dst_node, message); - let node = self.nodes.get_mut(&dst_node).expect("cant be missing"); - match message { - Message::NewTransaction(new_transaction) => { - for (node_id, message) in node.receive_new_transaction(new_transaction) - { - new_messages.push((dst_node, node_id, Message::PreAccept(message))) - } - } - Message::PreAccept(pre_accept) => { - new_messages.push(( - dst_node, - src_node, - Message::PreAcceptOk(node.receive_pre_accept(pre_accept)), - )); - } - Message::PreAcceptOk(pre_accept_ok) => { - if let Some(reply) = node.receive_pre_accept_ok(src_node, pre_accept_ok) - { - match reply { - EitherCommitOrAccept::Commit(commit_and_read) => { - Self::decompose_commit_and_read( - &mut new_messages, - commit_and_read, - dst_node, - ) - } - EitherCommitOrAccept::Accept(mut accept) => { - for (node_id, commit) in accept.drain() { - new_messages.push(( - dst_node, - node_id, - Message::Accept(commit), - )) - } - } - } - } - } - Message::Commit(commit) => node.receive_commit(commit), - Message::Accept(accept) => new_messages.push(( - dst_node, - src_node, - Message::AcceptOk(node.receive_accept(accept)), - )), - Message::AcceptOk(accept_ok) => { - if let Some(commit_and_read) = - node.receive_accept_ok(src_node, accept_ok) - { - Self::decompose_commit_and_read( - &mut new_messages, - commit_and_read, - dst_node, - ) - } - } - Message::Read(read) => { - if let Some(read_ok) = node.receive_read(src_node, read) { - new_messages.push((dst_node, src_node, Message::ReadOk(read_ok))) - } - } - Message::ReadOk(read_ok) => { - if let Some(mut apply) = node.receive_read_ok(src_node, read_ok) { - { - let (_, apply) = apply.first().unwrap(); - info!( - "Responding to client: {:?}: {:?}", - apply.txn_id, apply.result - ); - } - for (node_id, apply) in apply.drain(..) { - new_messages.push((dst_node, node_id, Message::Apply(apply))) - } - } - } - Message::Apply(apply) => { - for (node_id, read_ok) in node.receive_apply(src_node, apply) { - new_messages.push((dst_node, node_id, Message::ReadOk(read_ok))) - } - } - } - } - - messages = new_messages; - } - } - } -} - -#[cfg(test)] -mod tests { - use std::collections::HashSet; - - use super::harness::*; - - #[test] - fn one_transaction() { - let mut harness = Harness::new(); - for node_id in &harness.topology.shard_for_key(&Key(1)).node_ids { - harness - .nodes - .get_mut(node_id) - .unwrap() - .data_store_mut() - .insert(Key(1), Value(1)); - } - - for node_id in &harness.topology.shard_for_key(&Key(11)).node_ids { - harness - .nodes - .get_mut(node_id) - .unwrap() - .data_store_mut() - .insert(Key(11), Value(11)); - } - - harness.run(vec![( - NodeId(0), - NodeId(1), - Message::NewTransaction(NewTransaction { - body: TransactionBody { - keys: HashSet::from([Key(1), Key(11)]), - }, - }), - )]); - } - - #[test] - fn two_transactions() { - let mut harness = Harness::new(); - for node_id in &harness.topology.shard_for_key(&Key(1)).node_ids { - let data_store = harness.nodes.get_mut(node_id).unwrap().data_store_mut(); - data_store.insert(Key(1), Value(1)); - data_store.insert(Key(2), Value(2)); - data_store.insert(Key(3), Value(3)); - } - - for node_id in &harness.topology.shard_for_key(&Key(11)).node_ids { - harness - .nodes - .get_mut(node_id) - .unwrap() - .data_store_mut() - .insert(Key(11), Value(11)); - } - - harness.run(vec![ - ( - NodeId(0), - NodeId(1), - Message::NewTransaction(NewTransaction { - body: TransactionBody { - keys: HashSet::from([Key(1), Key(11)]), - }, - }), - ), - ( - NodeId(0), - NodeId(4), - Message::NewTransaction(NewTransaction { - body: TransactionBody { - keys: HashSet::from([Key(2), Key(11)]), - }, - }), - ), - ]); - } -} - -#[cfg(kani)] -mod verification { - use std::collections::{BTreeMap, BTreeSet}; - - use super::harness::*; - - #[kani::proof] - #[kani::unwind(1)] - fn one_transaction() { - let mut harness = Harness::new(); - for node_id in &harness.topology.shard_for_key(&Key(1)).node_ids { - harness - .nodes - .get_mut(node_id) - .unwrap() - .data_store_mut() - .insert(Key(1), Value(1)); - } - - for node_id in &harness.topology.shard_for_key(&Key(11)).node_ids { - harness - .nodes - .get_mut(node_id) - .unwrap() - .data_store_mut() - .insert(Key(11), Value(11)); - } - - harness.run(vec![( - NodeId(0), - NodeId(1), - Message::NewTransaction(NewTransaction { - body: TransactionBody { - keys: BTreeSet::from([Key(1), Key(11)]), - }, - }), - )]); - } - - #[kani::proof] - #[kani::unwind(100)] - fn new_transaction() { - let node_id = NodeId(1); - - let shards = vec![Shard { - range: KeyRange { - lo: Key(0), - hi: Key(10), - }, - node_ids: BTreeSet::from([node_id]), - }]; - - let topology = Topology::new(shards, BTreeMap::from([(node_id, vec![ShardId(0)])])); - - let timestamp_provider = TimestampProvider::new(node_id); - - let node = Node::new( - node_id, - topology.clone(), - timestamp_provider, - Coordinator::default(), - Replica::default(), - DataStore::default(), - ); - } -} +mod collections; +pub mod protocol; diff --git a/src/coordinator.rs b/src/protocol/coordinator.rs similarity index 90% rename from src/coordinator.rs rename to src/protocol/coordinator.rs index 797d309..c01b91d 100644 --- a/src/coordinator.rs +++ b/src/protocol/coordinator.rs @@ -1,18 +1,18 @@ -use std::{ - cmp, - collections::{HashMap, HashSet}, -}; +use std::cmp; use crate::{ - messages::{ - Accept, AcceptOk, Apply, Commit, CommitAndRead, EitherCommitOrAccept, NewTransaction, - PreAccept, PreAcceptOk, Read, ReadOk, + collections::{Map, Set}, + protocol::{ + messages::{ + Accept, AcceptOk, Apply, Commit, CommitAndRead, EitherCommitOrAccept, NewTransaction, + PreAccept, PreAcceptOk, Read, ReadOk, + }, + quorum_tracker::{Outcome, QuorumTracker, ShardQuorumTracker}, + timestamp::{Timestamp, TimestampProvider, TxnId}, + topology::{ShardId, Topology}, + transaction::{execute, Key, TransactionBody, Value}, + NodeId, }, - quorum_tracker::{Outcome, QuorumTracker, ShardQuorumTracker}, - timestamp::{Timestamp, TimestampProvider, TxnId}, - topology::{ShardId, Topology}, - transaction::{execute, Key, TransactionBody, Value}, - NodeId, }; #[derive(Debug, Clone)] @@ -22,16 +22,16 @@ struct StageConsensus { // tracks progress across participating shards in PreAccept and AcceptRounds quorum_tracker: ShardQuorumTracker, // Transactions this one depends from, gathered from participating shards - dependencies: HashSet, + dependencies: Set, // Participating shards, keep them to avoid recalculation each time we need // to send a message to nodes from every shard - participating_shards: HashSet, + participating_shards: Set, // The logical core of the transaction. For now this is just the keys it accesses body: TransactionBody, } impl StageConsensus { - fn combine_dependencies(&mut self, dependencies: HashSet) { + fn combine_dependencies(&mut self, dependencies: Set) { // TODO (perf): does extend(foo.iter()) invoke .reserve? self.dependencies.reserve(dependencies.len()); for dep in dependencies { @@ -45,13 +45,13 @@ struct StageRead { // reads gathered from participating shards execute_at: Timestamp, // Transactions this one depends from, gathered from participating shards - dependencies: HashSet, + dependencies: Set, // KV pairs fetched as part of transaction Read interest reads: Vec<(Key, Value)>, // number of pending read requests pending_reads: u8, - participating_shards: HashSet, + participating_shards: Set, body: TransactionBody, } @@ -115,7 +115,7 @@ impl TransactionProgress { #[derive(Default)] pub struct Coordinator { - transactions: HashMap, + transactions: Map, } impl Coordinator { @@ -126,11 +126,11 @@ impl Coordinator { timestamp_provider: &mut TimestampProvider, topology: &Topology, ) -> Vec<(NodeId, PreAccept)> { - let txn_id = TxnId::from(timestamp_provider.next()); + let txn_id = TxnId::from(timestamp_provider.tick_next()); // TODO (feature): in the paper it is union of fast path electorates for all participating shards, simplify with simple quorum for now // TODO (clarity): more comments - let mut participating_nodes: HashSet = HashSet::new(); - let mut participating_shards: HashSet = HashSet::new(); + let mut participating_nodes: Set = Set::new(); + let mut participating_shards: Set = Set::new(); let mut quorum_tracker = ShardQuorumTracker::default(); for key in &txn.body.keys { @@ -169,7 +169,7 @@ impl Coordinator { TransactionProgress::PreAccept(StageConsensus { execute_at: Timestamp::from(txn_id), quorum_tracker, - dependencies: HashSet::new(), + dependencies: Set::new(), body: txn.body, participating_shards, }), @@ -183,7 +183,7 @@ impl Coordinator { txn_id: TxnId, topology: &Topology, ) -> CommitAndRead { - let mut commits = HashMap::new(); + let mut commits = Map::new(); let mut reads = vec![]; @@ -259,7 +259,7 @@ impl Coordinator { let have_quorum = pre_accept_stage .quorum_tracker - .record_outcome(src_node, &topology, Outcome::Success); + .record_outcome(src_node, topology, Outcome::Success); if !have_quorum { return None; @@ -268,7 +268,7 @@ impl Coordinator { if pre_accept_stage.execute_at == Timestamp::from(txn_id) { // we've reached fast path decision there were no conflicts so our initial // timestamp becomes the transaction execution timestamp - let commit_and_read = Self::make_commit_and_read(pre_accept_stage, txn_id, &topology); + let commit_and_read = Self::make_commit_and_read(pre_accept_stage, txn_id, topology); *progress = TransactionProgress::Read(StageRead::from_stage_consensus_and_commit( pre_accept_stage, @@ -279,7 +279,7 @@ impl Coordinator { } else { // we didnt reach fast path decision, we now need to broadcast new timestamp // for a transaction that is a maximum among all ones we've received from all participating nodes - let mut accepts = HashMap::new(); + let mut accepts = Map::new(); for key in &pre_accept_stage.body.keys { let shard = &topology.shard_for_key(key); @@ -306,7 +306,7 @@ impl Coordinator { *progress = TransactionProgress::Accept(StageConsensus { execute_at: pre_accept_stage.execute_at, quorum_tracker, - dependencies: HashSet::new(), + dependencies: Set::new(), participating_shards, body, }); @@ -331,13 +331,13 @@ impl Coordinator { let have_quorum = accept_stage .quorum_tracker - .record_outcome(src_node, &topology, Outcome::Success); + .record_outcome(src_node, topology, Outcome::Success); if !have_quorum { return None; } - let commit_and_read = Self::make_commit_and_read(accept_stage, accept_ok.txn_id, &topology); + let commit_and_read = Self::make_commit_and_read(accept_stage, accept_ok.txn_id, topology); *progress = TransactionProgress::Read(StageRead::from_stage_consensus_and_commit( accept_stage, diff --git a/src/messages.rs b/src/protocol/messages.rs similarity index 79% rename from src/messages.rs rename to src/protocol/messages.rs index 5e1f64d..221133f 100644 --- a/src/messages.rs +++ b/src/protocol/messages.rs @@ -1,6 +1,6 @@ -use std::collections::{HashMap, HashSet}; +use crate::collections::{Map, Set}; -use crate::{ +use crate::protocol::{ timestamp::{Timestamp, TxnId}, transaction::{Key, TransactionBody, Value}, NodeId, @@ -21,21 +21,21 @@ pub struct PreAccept { pub struct PreAcceptOk { pub txn_id: TxnId, pub execute_at: Timestamp, - pub dependencies: HashSet, + pub dependencies: Set, } #[derive(Debug)] pub struct Commit { pub txn_id: TxnId, pub execute_at: Timestamp, - pub dependencies: HashSet, + pub dependencies: Set, } #[derive(Debug)] pub struct Accept { pub txn_id: TxnId, pub execute_at: Timestamp, - pub dependencies: HashSet, + pub dependencies: Set, // TODO (correctness): at this point we've already sent body in PreAccept, why send it second time? Check with java version pub body: TransactionBody, } @@ -43,27 +43,27 @@ pub struct Accept { #[derive(Debug)] pub enum EitherCommitOrAccept { Commit(CommitAndRead), - Accept(HashMap), + Accept(Map), } #[derive(Debug)] pub struct AcceptOk { pub txn_id: TxnId, - pub dependencies: HashSet, + pub dependencies: Set, } #[derive(Debug)] pub struct Read { pub txn_id: TxnId, pub execute_at: Timestamp, - pub dependencies: HashSet, - pub keys: HashSet, + pub dependencies: Set, + pub keys: Set, } // TODO (perf): coalesce Commit and Read coming to one node into one network packet #[derive(Debug)] pub struct CommitAndRead { - pub commits: HashMap, + pub commits: Map, pub reads: Vec<(NodeId, Read)>, } @@ -77,7 +77,7 @@ pub struct ReadOk { pub struct Apply { pub txn_id: TxnId, pub execute_at: Timestamp, - pub dependencies: HashSet, + pub dependencies: Set, /// Result of the computation for transaction. /// For now we just take sum of the keys that were read and add it to one of the keys value pub result: (Key, Value), diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs new file mode 100644 index 0000000..87beab6 --- /dev/null +++ b/src/protocol/mod.rs @@ -0,0 +1,390 @@ +mod coordinator; +pub mod messages; +pub mod node; +pub mod quorum_tracker; +mod replica; +pub mod timestamp; +pub mod topology; +pub mod transaction; + +use std::hash::Hash; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct NodeId(u16); + +impl std::fmt::Display for NodeId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_fmt(format_args!("N{}", self.0)) + } +} + +#[cfg(any(test, kani))] +mod harness { + use std::hash::Hash; + + use tracing::info; + + use crate::collections::{Map, Set}; + + pub use super::{ + coordinator::Coordinator, + messages::{ + Accept, AcceptOk, Apply, Commit, CommitAndRead, EitherCommitOrAccept, NewTransaction, + PreAccept, PreAcceptOk, Read, ReadOk, + }, + node::{DataStore, Node}, + replica::Replica, + timestamp::TimestampProvider, + topology::{KeyRange, Shard, ShardId, Topology}, + transaction::{Key, TransactionBody, Value}, + NodeId, + }; + + #[cfg(not(kani))] + mod log { + use once_cell::sync::OnceCell; + pub static LOG_HANDLE: OnceCell<()> = OnceCell::new(); + } + + #[derive(Debug)] + pub enum Message { + NewTransaction(NewTransaction), + PreAccept(PreAccept), + PreAcceptOk(PreAcceptOk), + Commit(Commit), + Accept(Accept), + AcceptOk(AcceptOk), + Read(Read), + ReadOk(ReadOk), + Apply(Apply), + } + + pub struct Harness { + pub nodes: Map, + pub topology: Topology, + } + + // vector map doesnt have corresponding from impl :( + pub fn set_from_iter(a: [T; N]) -> Set { + let mut s = Set::new(); + for t in a { + s.insert(t); + } + s + } + + impl Harness { + pub fn new() -> Self { + let shards = vec![ + Shard { + range: KeyRange { + lo: Key(0), + hi: Key(10), + }, + node_ids: set_from_iter([NodeId(1), NodeId(2), NodeId(3)]), + }, + Shard { + range: KeyRange { + lo: Key(10), + hi: Key(20), + }, + node_ids: set_from_iter([NodeId(4), NodeId(5), NodeId(6)]), + }, + ]; + + let mut node_to_shards = Map::new(); + for (node, shard) in [ + (NodeId(1), vec![ShardId(0)]), + (NodeId(2), vec![ShardId(0)]), + (NodeId(3), vec![ShardId(0)]), + (NodeId(4), vec![ShardId(1)]), + (NodeId(5), vec![ShardId(1)]), + (NodeId(6), vec![ShardId(1)]), + ] { + node_to_shards.insert(node, shard); + } + + let topology = Topology::new(shards, node_to_shards); + + let mut nodes = Map::new(); + for id in 1..=6 { + let timestamp_provider = TimestampProvider::new(NodeId(id)); + + let node = Node::new( + NodeId(id), + topology.clone(), + timestamp_provider, + Coordinator::default(), + Replica::default(), + DataStore::default(), + ); + nodes.insert(NodeId(id), node); + } + + #[cfg(not(kani))] + log::LOG_HANDLE.get_or_init(|| { + tracing_subscriber::fmt::init(); + }); + + Harness { nodes, topology } + } + + fn decompose_commit_and_read( + messages: &mut Vec<(NodeId, NodeId, Message)>, + mut commit_and_read: CommitAndRead, + src_node: NodeId, + ) { + for (node_id, commit) in commit_and_read.commits.drain() { + messages.push((src_node, node_id, Message::Commit(commit))) + } + + for (node_id, read) in commit_and_read.reads.drain(..) { + messages.push((src_node, node_id, Message::Read(read))) + } + } + + pub fn run(&mut self, initial_messages: Vec<(NodeId, NodeId, Message)>) { + let mut messages = initial_messages; + + while !messages.is_empty() { + let mut new_messages = vec![]; + + for (src_node, dst_node, message) in messages.drain(..) { + info!("{:?} --> {:?}: {:?}", src_node, dst_node, message); + let node = self.nodes.get_mut(&dst_node).expect("cant be missing"); + match message { + Message::NewTransaction(new_transaction) => { + for (node_id, message) in node.receive_new_transaction(new_transaction) + { + new_messages.push((dst_node, node_id, Message::PreAccept(message))) + } + } + Message::PreAccept(pre_accept) => { + new_messages.push(( + dst_node, + src_node, + Message::PreAcceptOk(node.receive_pre_accept(pre_accept)), + )); + } + Message::PreAcceptOk(pre_accept_ok) => { + if let Some(reply) = node.receive_pre_accept_ok(src_node, pre_accept_ok) + { + match reply { + EitherCommitOrAccept::Commit(commit_and_read) => { + Self::decompose_commit_and_read( + &mut new_messages, + commit_and_read, + dst_node, + ) + } + EitherCommitOrAccept::Accept(mut accept) => { + for (node_id, commit) in accept.drain() { + new_messages.push(( + dst_node, + node_id, + Message::Accept(commit), + )) + } + } + } + } + } + Message::Commit(commit) => node.receive_commit(commit), + Message::Accept(accept) => new_messages.push(( + dst_node, + src_node, + Message::AcceptOk(node.receive_accept(accept)), + )), + Message::AcceptOk(accept_ok) => { + if let Some(commit_and_read) = + node.receive_accept_ok(src_node, accept_ok) + { + Self::decompose_commit_and_read( + &mut new_messages, + commit_and_read, + dst_node, + ) + } + } + Message::Read(read) => { + if let Some(read_ok) = node.receive_read(src_node, read) { + new_messages.push((dst_node, src_node, Message::ReadOk(read_ok))) + } + } + Message::ReadOk(read_ok) => { + if let Some(mut apply) = node.receive_read_ok(src_node, read_ok) { + { + let (_, apply) = apply.first().unwrap(); + info!( + "Responding to client: {:?}: {:?}", + apply.txn_id, apply.result + ); + } + for (node_id, apply) in apply.drain(..) { + new_messages.push((dst_node, node_id, Message::Apply(apply))) + } + } + } + Message::Apply(apply) => { + for (node_id, read_ok) in node.receive_apply(src_node, apply) { + new_messages.push((dst_node, node_id, Message::ReadOk(read_ok))) + } + } + } + } + + messages = new_messages; + } + } + } +} + +#[cfg(test)] +mod tests { + use crate::collections::Set; + + use super::harness::*; + + #[test] + fn one_transaction() { + let mut harness = Harness::new(); + for node_id in &harness.topology.shard_for_key(&Key(1)).node_ids { + harness + .nodes + .get_mut(node_id) + .unwrap() + .data_store_mut() + .insert(Key(1), Value(1)); + } + + for node_id in &harness.topology.shard_for_key(&Key(11)).node_ids { + harness + .nodes + .get_mut(node_id) + .unwrap() + .data_store_mut() + .insert(Key(11), Value(11)); + } + + harness.run(vec![( + NodeId(0), + NodeId(1), + Message::NewTransaction(NewTransaction { + body: TransactionBody { + keys: Set::from([Key(1), Key(11)]), + }, + }), + )]); + } + + #[test] + fn two_transactions() { + let mut harness = Harness::new(); + for node_id in &harness.topology.shard_for_key(&Key(1)).node_ids { + let data_store = harness.nodes.get_mut(node_id).unwrap().data_store_mut(); + data_store.insert(Key(1), Value(1)); + data_store.insert(Key(2), Value(2)); + data_store.insert(Key(3), Value(3)); + } + + for node_id in &harness.topology.shard_for_key(&Key(11)).node_ids { + harness + .nodes + .get_mut(node_id) + .unwrap() + .data_store_mut() + .insert(Key(11), Value(11)); + } + + harness.run(vec![ + ( + NodeId(0), + NodeId(1), + Message::NewTransaction(NewTransaction { + body: TransactionBody { + keys: Set::from([Key(1), Key(11)]), + }, + }), + ), + ( + NodeId(0), + NodeId(4), + Message::NewTransaction(NewTransaction { + body: TransactionBody { + keys: Set::from([Key(2), Key(11)]), + }, + }), + ), + ]); + } +} + +#[cfg(kani)] +mod verification { + use crate::collections::{Map, Set}; + + use super::harness::*; + + #[kani::proof] + #[kani::unwind(7)] + fn one_transaction() { + let mut harness = Harness::new(); + for node_id in &harness.topology.shard_for_key(&Key(1)).node_ids { + harness + .nodes + .get_mut(node_id) + .unwrap() + .data_store_mut() + .insert(Key(1), Value(1)); + } + + for node_id in &harness.topology.shard_for_key(&Key(11)).node_ids { + harness + .nodes + .get_mut(node_id) + .unwrap() + .data_store_mut() + .insert(Key(11), Value(11)); + } + + harness.run(vec![( + NodeId(0), + NodeId(1), + Message::NewTransaction(NewTransaction { + body: TransactionBody { + keys: set_from_iter([Key(1), Key(11)]), + }, + }), + )]); + } + + #[kani::proof] + // #[kani::unwind(100)] + fn new_transaction() { + let node_id = NodeId(1); + + let shards = vec![Shard { + range: KeyRange { + lo: Key(0), + hi: Key(10), + }, + node_ids: set_from_iter([node_id]), + }]; + + let mut nodes = Map::new(); + nodes.insert(node_id, vec![ShardId(0)]); + + let topology = Topology::new(shards, nodes); + + let timestamp_provider = TimestampProvider::new(node_id); + + let node = Node::new( + node_id, + topology.clone(), + timestamp_provider, + Coordinator::default(), + Replica::default(), + DataStore::default(), + ); + } +} diff --git a/src/node.rs b/src/protocol/node.rs similarity index 89% rename from src/node.rs rename to src/protocol/node.rs index bf80e97..1c0f0d1 100644 --- a/src/node.rs +++ b/src/protocol/node.rs @@ -1,21 +1,22 @@ -use std::collections::HashMap; - use crate::{ - coordinator::Coordinator, - messages::{ - Accept, AcceptOk, Apply, Commit, CommitAndRead, EitherCommitOrAccept, NewTransaction, - PreAccept, PreAcceptOk, Read, ReadOk, + collections::Map, + protocol::{ + coordinator::Coordinator, + messages::{ + Accept, AcceptOk, Apply, Commit, CommitAndRead, EitherCommitOrAccept, NewTransaction, + PreAccept, PreAcceptOk, Read, ReadOk, + }, + replica::Replica, + timestamp::TimestampProvider, + topology::Topology, + transaction::{Key, Value}, + NodeId, }, - replica::Replica, - timestamp::TimestampProvider, - topology::Topology, - transaction::{Key, Value}, - NodeId, }; #[derive(Default)] pub struct DataStore { - data: HashMap, + data: Map, } impl DataStore { diff --git a/src/quorum_tracker.rs b/src/protocol/quorum_tracker.rs similarity index 92% rename from src/quorum_tracker.rs rename to src/protocol/quorum_tracker.rs index fe49401..46a3f3d 100644 --- a/src/quorum_tracker.rs +++ b/src/protocol/quorum_tracker.rs @@ -1,8 +1,9 @@ -use std::collections::HashMap; - use crate::{ - topology::{KeyRange, Topology}, - NodeId, + collections::Map, + protocol::{ + topology::{KeyRange, Topology}, + NodeId, + }, }; #[derive(Debug, Clone, Copy)] @@ -44,7 +45,7 @@ impl QuorumTracker { #[derive(Debug, Default, Clone)] pub struct ShardQuorumTracker { // TODO use shard_id? - pub shards: HashMap, + pub shards: Map, } impl ShardQuorumTracker { diff --git a/src/replica.rs b/src/protocol/replica.rs similarity index 92% rename from src/replica.rs rename to src/protocol/replica.rs index bd80ff1..d5321cc 100644 --- a/src/replica.rs +++ b/src/protocol/replica.rs @@ -1,13 +1,14 @@ -use std::{cmp, collections::HashSet}; - -use hashbrown::{hash_map::Entry, HashMap}; +use std::cmp; use crate::{ - messages::{Accept, AcceptOk, Apply, Commit, PreAccept, PreAcceptOk, Read, ReadOk}, - node::DataStore, - timestamp::{Timestamp, TxnId}, - transaction::{self, Key, TransactionBody, Value}, - Lens, LensIterGuard, NodeId, + collections::{Lens, LensIterGuard, Map, MapEntry, Set}, + protocol::{ + messages::{Accept, AcceptOk, Apply, Commit, PreAccept, PreAcceptOk, Read, ReadOk}, + node::DataStore, + timestamp::{Timestamp, TxnId}, + transaction::{self, Key, TransactionBody, Value}, + NodeId, + }, }; #[derive(Debug, Clone, Copy)] @@ -22,7 +23,7 @@ struct StageConsensus { // dependencies waiting on this transaction to become committed / applied // TODO describe how this can get populated during consensus stage - dependencies_waiting: HashSet, + dependencies_waiting: Set, body: TransactionBody, } @@ -37,9 +38,9 @@ struct StageExecution { // additionally it compresses txids into ints by maintaining a separate mapping in a vec. Consider doing that too // I havent seen the reverse mapping on java side though. // Also look at updateDependencyAndMaybeExecute - pending_dependencies: HashMap, + pending_dependencies: Map, // dependencies waiting on this transaction to become committed / applied - dependencies_waiting: HashSet, + dependencies_waiting: Set, body: TransactionBody, } @@ -49,7 +50,7 @@ impl From<&mut StageConsensus> for StageExecution { StageExecution { execute_at: stage_consensus.execute_at, max_witnessed_at: stage_consensus.max_witnessed_at, - pending_dependencies: HashMap::new(), + pending_dependencies: Map::new(), dependencies_waiting: std::mem::take(&mut stage_consensus.dependencies_waiting), body: std::mem::take(&mut stage_consensus.body), } @@ -95,6 +96,7 @@ impl Committed { ReplicaTransactionProgress::Applied } + #[allow(clippy::wrong_self_convention)] fn into_pending_apply(&mut self, result: (Key, Value)) -> ReplicaTransactionProgress { ReplicaTransactionProgress::CommittedApplyPending(CommittedApplyPending { stage_execution: StageExecution::from(&mut self.stage_execution), @@ -184,7 +186,7 @@ impl ReplicaTransactionProgress { } } - fn pending_dependencies(&mut self) -> &mut HashMap { + fn pending_dependencies(&mut self) -> &mut Map { use ReplicaTransactionProgress::*; match self { @@ -199,7 +201,7 @@ impl ReplicaTransactionProgress { fn mark_dependency_committed(&mut self, dep_id: TxnId) { match self.pending_dependencies().entry(dep_id) { - Entry::Occupied(mut o) => { + MapEntry::Occupied(mut o) => { let dep = o.get_mut(); // TODO: // replicas wait to answer this message until every such dependency has @@ -212,7 +214,7 @@ impl ReplicaTransactionProgress { WaitingOn::Apply => unreachable!("cant be applied before committed"), } } - Entry::Vacant(_) => panic!("TODO: dependency must exist"), + MapEntry::Vacant(_) => panic!("TODO: dependency must exist"), }; } @@ -220,8 +222,8 @@ impl ReplicaTransactionProgress { let pending_dependencies = self.pending_dependencies(); let dep = match pending_dependencies.entry(dep_id) { - Entry::Occupied(o) => o, - Entry::Vacant(_) => panic!("TODO: dependency must exist"), + MapEntry::Occupied(o) => o, + MapEntry::Vacant(_) => panic!("TODO: dependency must exist"), }; match dep.get() { @@ -236,7 +238,7 @@ impl ReplicaTransactionProgress { #[derive(Default)] pub struct Replica { - transactions: HashMap, + transactions: Map, } impl Replica { @@ -245,7 +247,7 @@ impl Replica { pub fn receive_pre_accept(&mut self, pre_accept: PreAccept, node_id: NodeId) -> PreAcceptOk { let initial_timestamp = Timestamp::from(pre_accept.txn_id); // t0 in the paper let mut max_conflicting_timestamp = initial_timestamp; - let mut dependencies = HashSet::new(); + let mut dependencies = Set::new(); for (txn_id, transaction) in self .transactions @@ -288,7 +290,7 @@ impl Replica { ReplicaTransactionProgress::PreAccepted(StageConsensus { execute_at, max_witnessed_at: execute_at, - dependencies_waiting: HashSet::new(), + dependencies_waiting: Set::new(), body: pre_accept.body, }), ) @@ -319,7 +321,7 @@ impl Replica { }); // TODO (clarity) extract into function. - let mut dependencies = HashSet::new(); + let mut dependencies = Set::new(); for (txn_id, transaction) in self .transactions .iter() @@ -347,9 +349,9 @@ impl Replica { pub fn receive_commit(&mut self, commit: Commit) { // TODO (correctness) store full dependency set, transaction body etc // TODO (feature) arm recovery timer - let dummy = HashSet::new(); + let dummy = Set::new(); let (mut root_guard, dummy_iter_guard) = - Lens::new(commit.txn_id, &dummy, &mut self.transactions).expect("TODO"); + Lens::zoom(commit.txn_id, &dummy, &mut self.transactions).expect("TODO"); root_guard.with_mut(|progress| { let stage_consensus = progress.as_mut_pre_accepted_or_accepted().expect("TODO"); @@ -371,10 +373,10 @@ impl Replica { fn register_pending_dependencies( txn_id: TxnId, deps_iter_guard: &mut LensIterGuard, - ) -> HashMap { + ) -> Map { // Register this transaction in each of its dependencies so once they're // committed/applied this transaction can move forward too - let mut pending_dependencies = HashMap::new(); + let mut pending_dependencies = Map::new(); // TODO it is actually OK that dependency is not found because we send full set of dependencies deps_iter_guard.for_each_mut(|dep_id, dep| { @@ -423,7 +425,7 @@ impl Replica { data_store: &DataStore, ) -> Option { let (mut root_guard, mut deps_iter_guard) = - Lens::new(read.txn_id, &read.dependencies, &mut self.transactions).expect("TODO"); + Lens::zoom(read.txn_id, &read.dependencies, &mut self.transactions).expect("TODO"); root_guard.with_mut(|progress| { let stage_committed = progress.as_mut_committed().expect("TODO"); @@ -457,9 +459,9 @@ impl Replica { }) } - fn propagate_apply_to_waiting_dependencies<'a, 'b>( + fn propagate_apply_to_waiting_dependencies( txn_id: TxnId, - dependencies_waiting_iter_guard: LensIterGuard<'a, 'b, TxnId, ReplicaTransactionProgress>, + dependencies_waiting_iter_guard: LensIterGuard<'_, '_, TxnId, ReplicaTransactionProgress>, res: &mut Vec<(NodeId, ReadOk)>, data_store: &mut DataStore, ) { @@ -504,12 +506,12 @@ impl Replica { &mut self, _src_node: NodeId, apply: Apply, - mut data_store: &mut DataStore, + data_store: &mut DataStore, ) -> Vec<(NodeId, ReadOk)> { use ReplicaTransactionProgress::*; let (mut root_guard, mut pending_deps_iter_guard) = - Lens::new(apply.txn_id, &apply.dependencies, &mut self.transactions).expect("TODO"); + Lens::zoom(apply.txn_id, &apply.dependencies, &mut self.transactions).expect("TODO"); let mut res = vec![]; @@ -537,7 +539,7 @@ impl Replica { // waiting on us are disjoint. In other words there cant be a cycle. .expect("failed to exchange for deps_waiting_guard"), &mut res, - &mut data_store, + data_store, ); let (key, value) = apply.result; diff --git a/src/timestamp.rs b/src/protocol/timestamp.rs similarity index 96% rename from src/timestamp.rs rename to src/protocol/timestamp.rs index 99e6d2b..7cad249 100644 --- a/src/timestamp.rs +++ b/src/protocol/timestamp.rs @@ -1,4 +1,4 @@ -use crate::NodeId; +use crate::protocol::NodeId; #[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct Timestamp { @@ -72,7 +72,7 @@ impl TimestampProvider { } } - pub fn next(&mut self) -> Timestamp { + pub fn tick_next(&mut self) -> Timestamp { self.counter += 1; Timestamp { time: self.counter, diff --git a/src/topology.rs b/src/protocol/topology.rs similarity index 80% rename from src/topology.rs rename to src/protocol/topology.rs index 1eb5572..a62f1b6 100644 --- a/src/topology.rs +++ b/src/protocol/topology.rs @@ -1,15 +1,14 @@ -use std::{ - cmp::Ordering, - collections::{HashMap, HashSet}, - ops::Deref, -}; +use std::{cmp::Ordering, ops::Deref}; -use crate::{transaction::Key, NodeId}; +use crate::{ + collections::{Map, Set}, + protocol::{transaction::Key, NodeId}, +}; #[derive(Debug, Clone)] pub struct Shard { pub range: KeyRange, - pub node_ids: HashSet, + pub node_ids: Set, } #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] @@ -28,7 +27,7 @@ pub struct Topology { // contains sorted non intersecting ranges shards: Vec, // Here Vec contains indexes into shard Vec above - node_to_shards: HashMap>, + node_to_shards: Map>, } pub struct ShardRef<'a> { @@ -51,7 +50,7 @@ impl<'a> Deref for ShardRef<'a> { } impl Topology { - pub fn new(shards: Vec, node_to_shards: HashMap>) -> Self { + pub fn new(shards: Vec, node_to_shards: Map>) -> Self { Self { shards, node_to_shards, @@ -62,17 +61,14 @@ impl Topology { let idx = self .shards .binary_search_by(|shard| { - let o = if &shard.range.lo <= key && key < &shard.range.hi { + if &shard.range.lo <= key && key < &shard.range.hi { Ordering::Equal } else if key < &shard.range.lo { Ordering::Greater } else { // key >= range.hi Ordering::Less - }; - - // dbg!(&shard.range.lo, key, &shard.range.hi, o); - o + } }) .expect("we cover whole key range from min to max"); @@ -107,11 +103,9 @@ impl Topology { #[cfg(test)] mod tests { - use std::collections::{HashMap, HashSet}; - - use crate::{transaction::Key, NodeId}; + use crate::collections::{Map, Set}; - use super::{KeyRange, Shard, ShardId, Topology}; + use super::{Key, KeyRange, NodeId, Shard, ShardId, Topology}; #[test] fn key_to_shard() { @@ -121,20 +115,20 @@ mod tests { lo: Key(0), hi: Key(10), }, - node_ids: HashSet::from([NodeId(1), NodeId(2), NodeId(3)]), + node_ids: Set::from([NodeId(1), NodeId(2), NodeId(3)]), }, Shard { range: KeyRange { lo: Key(10), hi: Key(20), }, - node_ids: HashSet::from([NodeId(4), NodeId(5), NodeId(6)]), + node_ids: Set::from([NodeId(4), NodeId(5), NodeId(6)]), }, ]; let topology = Topology::new( shards, - HashMap::from([ + Map::from([ (NodeId(1), vec![ShardId(0)]), (NodeId(2), vec![ShardId(0)]), (NodeId(3), vec![ShardId(0)]), diff --git a/src/transaction.rs b/src/protocol/transaction.rs similarity index 92% rename from src/transaction.rs rename to src/protocol/transaction.rs index 667d3fc..29a890c 100644 --- a/src/transaction.rs +++ b/src/protocol/transaction.rs @@ -1,6 +1,4 @@ -use std::collections::HashSet; - -use crate::node::DataStore; +use crate::{collections::Set, protocol::node::DataStore}; #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct Key(pub usize); @@ -10,14 +8,14 @@ pub struct Value(pub usize); #[derive(Debug, Clone, PartialEq, Eq, Default)] pub struct TransactionBody { - pub keys: HashSet, + pub keys: Set, } trait TransactionBehavior { type ExecutionResult; /// Returns set of keys that transaction wants to read from or write to - fn read_write_set(&self) -> HashSet; + fn read_write_set(&self) -> Set; /// Executes core transaction logic by transforming read set into write set. /// Returned values are inserted during as part of apply phase.