Skip to content

Commit

Permalink
Fix skip leave signal randomly when SnapshotHandle::leave followed …
Browse files Browse the repository at this point in the history
…by a `SnapshotHandle::shutdown` immediately.
  • Loading branch information
al8n committed Apr 15, 2024
1 parent ffaaba2 commit 7198d5d
Show file tree
Hide file tree
Showing 9 changed files with 82 additions and 45 deletions.
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,9 @@ smol_str = "0.2"
smallvec = "1"
rand = "0.8"

# memberlist-types = { version = "0.2", path = "../memberlist/types", default-features = false }
# memberlist-core = { version = "0.2", path = "../memberlist/core", default-features = false }
# memberlist = { version = "0.2", path = "../memberlist/memberlist", default-features = false }

ruserf-core = { path = "core", version = "0.1.0", default-features = false }
ruserf-types = { path = "types", version = "0.1.0", default-features = false }
9 changes: 7 additions & 2 deletions core/src/key_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,12 @@ use super::{
};

/// KeyResponse is used to relay a query for a list of all keys in use.
#[derive(Default)]
#[viewit::viewit(
vis_all = "pub(crate)",
getters(style = "ref", vis_all = "pub"),
setters(skip)
)]
#[derive(Default, Debug)]
pub struct KeyResponse<I> {
/// Map of node id to response message
messages: HashMap<I, SmolStr>,
Expand Down Expand Up @@ -222,7 +227,7 @@ where
resp.num_resp += 1;

// Decode the response
if !r.payload.is_empty() || r.payload[0] != MessageType::KeyResponse as u8 {
if r.payload.is_empty() || r.payload[0] != MessageType::KeyResponse as u8 {
resp.messages.insert(
r.from.id().cheap_clone(),
SmolStr::new(format!(
Expand Down
16 changes: 13 additions & 3 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ fn invalid_data_io_error<E: std::error::Error + Send + Sync + 'static>(e: E) ->
#[cfg(feature = "test")]
#[cfg_attr(docsrs, doc(cfg(feature = "test")))]
pub mod tests {
pub use memberlist_core::tests::{next_socket_addr_v4, next_socket_addr_v6, run, AnyError};
pub use memberlist_core::tests::{next_socket_addr_v4, next_socket_addr_v6, AnyError};
pub use paste;

pub use super::serf::base::tests::{serf::*, *};
Expand Down Expand Up @@ -98,8 +98,8 @@ pub mod tests {
use std::sync::Once;
static TRACE: Once = Once::new();
TRACE.call_once(|| {
let filter =
std::env::var("RUSERF_TESTING_LOG").unwrap_or_else(|_| "ruserf_core=trace".to_owned());
let filter = std::env::var("RUSERF_TESTING_LOG")
.unwrap_or_else(|_| "ruserf_core=info,memberlist_core=debug".to_owned());
memberlist_core::tracing::subscriber::set_global_default(
tracing_subscriber::fmt::fmt()
.without_time()
Expand All @@ -113,4 +113,14 @@ pub mod tests {
.unwrap();
});
}

/// Run the unit test with a given async runtime sequentially.
pub fn run<B, F>(block_on: B, fut: F)
where
B: FnOnce(F) -> F::Output,
F: std::future::Future<Output = ()>,
{
// initialize_tests_tracing();
block_on(fut);
}
}
1 change: 0 additions & 1 deletion core/src/serf/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,6 @@ where
// channel signals that we are cleaned up outside of Serf.
*s = SerfState::Shutdown;
}

self.inner.memberlist.shutdown().await?;
self.inner.shutdown_tx.close();

Expand Down
24 changes: 17 additions & 7 deletions core/src/serf/base/tests/serf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ pub async fn serf_get_queue_max<T>(
let want = 1024;
assert_eq!(got, want);

s.shutdown().await.unwrap();
sn.shutdown().await.unwrap();
<T::Runtime as RuntimeLite>::sleep(Duration::from_secs(2)).await;

// Bring it under the number of nodes, so the calculation based on
Expand Down Expand Up @@ -153,6 +153,8 @@ pub async fn serf_get_queue_max<T>(
let got = snn.get_queue_max().await;
let want = 202;
assert_eq!(got, want);
snn.shutdown().await.unwrap();
drop(snn);
}

/// Unit tests for the update
Expand Down Expand Up @@ -240,6 +242,10 @@ pub async fn serf_update<T, F>(
}
}
assert!(found, "did not found s2 in members");

for s in serfs.iter() {
s.shutdown().await.unwrap();
}
}

/// Unit tests for the role
Expand Down Expand Up @@ -564,7 +570,7 @@ pub async fn serf_coordinates<T>(
}

if start.elapsed() > Duration::from_secs(7) {
panic!("timed out");
panic!("timed out cond1 {} cond2 {} cond3 {} cond4 {}", cond1, cond2, cond3, cond4);
}
}

Expand Down Expand Up @@ -635,8 +641,8 @@ pub async fn serf_coordinates<T>(
break;
}

if start.elapsed() > Duration::from_secs(7) {
panic!("timed out");
if start.elapsed() > Duration::from_secs(14) {
panic!("timed out: cond1 {} cond2 {}", cond1, cond2);
}
}

Expand Down Expand Up @@ -791,18 +797,18 @@ pub async fn serf_write_keyring_file<T>(
let existing_bytes = general_purpose::STANDARD.decode(EXISTING).unwrap();
let sk = memberlist_core::types::SecretKey::try_from(existing_bytes.as_slice()).unwrap();

let s = Serf::<T>::new(
let serf = Serf::<T>::new(
get_transport_opts(sk),
test_config().with_keyring_file(Some(p.clone())),
)
.await
.unwrap();
assert!(
s.encryption_enabled(),
serf.encryption_enabled(),
"write keyring file test only works on encrypted serf"
);

let manager = s.key_manager();
let manager = serf.key_manager();
let new_key = general_purpose::STANDARD.decode(NEW_KEY).unwrap();
let new_sk = memberlist_core::types::SecretKey::try_from(new_key.as_slice()).unwrap();
manager.install_key(new_sk, None).await.unwrap();
Expand Down Expand Up @@ -848,6 +854,10 @@ pub async fn serf_write_keyring_file<T>(
assert_eq!(lines.len(), 3);

assert!(lines[1].contains(NEW_KEY));

let resp = manager.list_keys().await.unwrap();
assert_eq!(resp.primary_keys().len(), 1);
assert_eq!(resp.keys().len(), 1);
}

#[test]
Expand Down
8 changes: 2 additions & 6 deletions core/src/serf/base/tests/serf/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -867,10 +867,6 @@ where
}
assert_eq!(acks.len(), 2, "missing acks {acks:?}");
assert_eq!(responses.len(), 1, "missing responses {responses:?}");

for s in serfs {
let _ = s.shutdown().await;
}
}

/// Unit test for serf query filter
Expand Down Expand Up @@ -993,8 +989,8 @@ pub async fn serf_query_filter<T>(
assert_eq!(acks.len(), 1, "missing acks {acks:?}");
assert_eq!(responses.len(), 1, "missing responses {responses:?}");

for s in serfs {
let _ = s.shutdown().await;
for s in serfs.iter() {
s.shutdown().await.unwrap();
}
}

Expand Down
4 changes: 0 additions & 4 deletions core/src/serf/base/tests/serf/reconnect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,6 @@ pub async fn serf_reconnect_same_ip<T, R, F>(
.collect(),
)
.await;

for s in serfs.iter() {
s.shutdown().await.unwrap();
}
}

#[derive(Clone)]
Expand Down
14 changes: 11 additions & 3 deletions core/src/serf/base/tests/serf/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,9 +360,17 @@ pub async fn snapshoter_leave<T>(
// Open the snapshoter
let (shutdown_tx, shutdown_rx) = async_channel::bounded(1);
let res = open_and_replay_snapshot::<_, _, DefaultDelegate<T>, _>(&p, false).unwrap();
assert!(res.last_clock == 0.into());
assert!(res.last_event_clock == 0.into());
assert!(res.last_query_clock == 0.into());
assert!(res.last_clock == 0.into(), "last_clock: {}", res.last_clock);
assert!(
res.last_event_clock == 0.into(),
"last_event_clock: {}",
res.last_event_clock
);
assert!(
res.last_query_clock == 0.into(),
"last_query_clock: {}",
res.last_query_clock
);
let (out_tx, _out_rx) = async_channel::unbounded();
let (_, alive_nodes, _) = Snapshot::<T, DefaultDelegate<T>>::from_replay_result(
res,
Expand Down
47 changes: 28 additions & 19 deletions core/src/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,26 @@ where
default => break,
}
}
tracing::info!("ruserf: snapshotter tee stream exits");
tracing::debug!("ruserf: snapshotter tee stream exits");
}

fn handle_leave(&mut self) {
self.leaving = true;

// If we plan to re-join, keep our state
if !self.rejoin_after_leave {
self.alive_nodes.clear();
}
self.try_append(SnapshotRecord::Leave);
if let Some(fh) = self.fh.as_mut() {
if let Err(e) = fh.flush() {
tracing::error!(target="ruserf", err=%SnapshotError::Flush(e), "failed to flush leave to snapshot");
}

if let Err(e) = fh.get_mut().sync_all() {
tracing::error!(target="ruserf", err=%SnapshotError::Sync(e), "failed to sync leave to snapshot");
}
}
}

/// Long running routine that is used to handle events
Expand All @@ -559,23 +578,9 @@ where

loop {
futures::select! {
_ = self.leave_rx.recv().fuse() => {
self.leaving = true;

// If we plan to re-join, keep our state
if !self.rejoin_after_leave {
self.alive_nodes.clear();
}
self.try_append(SnapshotRecord::Leave);

if let Some(fh) = self.fh.as_mut() {
if let Err(e) = fh.flush() {
tracing::error!(target="ruserf", err=%SnapshotError::Flush(e), "failed to flush leave to snapshot");
}

if let Err(e) = fh.get_mut().sync_all() {
tracing::error!(target="ruserf", err=%SnapshotError::Sync(e), "failed to sync leave to snapshot");
}
signal = self.leave_rx.recv().fuse() => {
if signal.is_ok() {
self.handle_leave();
}
}
ev = self.stream_rx.recv().fuse() => {
Expand All @@ -594,6 +599,10 @@ where
}
}

if self.leave_rx.try_recv().is_ok() {
self.handle_leave();
}

// Setup a timeout
let flush_timeout = <T::Runtime as RuntimeLite>::sleep(SHUTDOWN_FLUSH_TIMEOUT);
futures::pin_mut!(flush_timeout);
Expand Down Expand Up @@ -632,7 +641,7 @@ where

self.wait_tx.close();
tee_handle.await;
tracing::info!("ruserf: snapshotter stream exits");
tracing::debug!("ruserf: snapshotter stream exits");
}

/// Used to handle a single user event
Expand Down

0 comments on commit 7198d5d

Please sign in to comment.