Feature/validator api shutdown #1573

Merged: 12 commits, Aug 30, 2022
3 changes: 2 additions & 1 deletion CHANGELOG.md

@@ -53,7 +53,7 @@ Post 1.0.0 release, the changelog format is based on [Keep a Changelog](https://
 - validator: fixed local docker-compose setup to work on Apple M1 ([#1329])
 - explorer-api: listen out for SIGTERM and SIGQUIT too, making it play nicely as a system service ([#1482]).
 - network-requester: fix filter for suffix-only domains ([#1487])
-- validator-api: listen out for SIGTERM and SIGQUIT too, making it play nicely as a system service ([#1496]).
+- validator-api: listen out for SIGTERM and SIGQUIT too, making it play nicely as a system service; cleaner shutdown, without panics ([#1496], [#1573]).

 ### Changed

@@ -99,6 +99,7 @@ Post 1.0.0 release, the changelog format is based on [Keep a Changelog](https://
 [#1496]: https://github.com/nymtech/nym/pull/1496
 [#1503]: https://github.com/nymtech/nym/pull/1503
 [#1520]: https://github.com/nymtech/nym/pull/1520
+[#1573]: https://github.com/nymtech/nym/pull/1573

 ## [v1.0.1](https://github.com/nymtech/nym/tree/v1.0.1) (2022-05-04)
@@ -133,7 +133,6 @@ where
         let prepared_fragment = self
             .message_preparer
             .prepare_chunk_for_sending(chunk_clone, topology, &self.ack_key, &recipient)
-            .await
             .unwrap();

         real_messages.push(RealMessage::new(
@@ -83,7 +83,6 @@ where
         let prepared_fragment = self
             .message_preparer
             .prepare_chunk_for_sending(chunk_clone, topology_ref, &self.ack_key, packet_recipient)
-            .await
             .unwrap();

         // if we have the ONLY strong reference to the ack data, it means it was removed from the
1 change: 0 additions & 1 deletion clients/webassembly/src/client/mod.rs

@@ -197,7 +197,6 @@ impl NymClient {
         // don't bother with acks etc. for time being
         let prepared_fragment = message_preparer
             .prepare_chunk_for_sending(message_chunk, topology, &self.ack_key, &recipient)
-            .await
             .unwrap();

         console_warn!("packet is going to have round trip time of {:?}, but we're not going to do anything for acks anyway ", prepared_fragment.total_delay);
10 changes: 4 additions & 6 deletions common/nymsphinx/src/preparer/mod.rs

@@ -213,7 +213,7 @@
     /// - compute vk_b = g^x || v_b
     /// - compute sphinx_plaintext = SURB_ACK || g^x || v_b
     /// - compute sphinx_packet = Sphinx(recipient, sphinx_plaintext)
-    pub async fn prepare_chunk_for_sending(
+    pub fn prepare_chunk_for_sending(
         &mut self,
         fragment: Fragment,
         topology: &NymTopology,
@@ -222,8 +222,7 @@
     ) -> Result<PreparedFragment, NymTopologyError> {
         // create an ack
         let (ack_delay, surb_ack_bytes) = self
-            .generate_surb_ack(fragment.fragment_identifier(), topology, ack_key)
-            .await?
+            .generate_surb_ack(fragment.fragment_identifier(), topology, ack_key)?
             .prepare_for_sending();

         // TODO:
@@ -294,7 +293,7 @@ where
     }

     /// Construct an acknowledgement SURB for the given [`FragmentIdentifier`]
-    async fn generate_surb_ack(
+    fn generate_surb_ack(
         &mut self,
         fragment_id: FragmentIdentifier,
         topology: &NymTopology,
@@ -357,8 +356,7 @@
         // gateways could not distinguish reply packets from normal messages due to lack of said acks
         // note: the ack delay is irrelevant since we do not know the delay of actual surb
         let (_, surb_ack_bytes) = self
-            .generate_surb_ack(reply_id, topology, ack_key)
-            .await?
+            .generate_surb_ack(reply_id, topology, ack_key)?
             .prepare_for_sending();

         let zero_pad_len = self.packet_size.plaintext_size()
13 changes: 11 additions & 2 deletions common/task/src/shutdown.rs

@@ -5,13 +5,14 @@ use std::time::Duration;

 use tokio::sync::watch::{self, error::SendError};

-const SHUTDOWN_TIMER_SECS: u64 = 5;
+const DEFAULT_SHUTDOWN_TIMER_SECS: u64 = 5;

 /// Used to notify other tasks to gracefully shutdown
 #[derive(Debug)]
 pub struct ShutdownNotifier {
     notify_tx: watch::Sender<()>,
     notify_rx: Option<watch::Receiver<()>>,
+    shutdown_timer_secs: u64,
 }

 impl Default for ShutdownNotifier {
@@ -20,11 +21,19 @@ impl Default for ShutdownNotifier {
         Self {
             notify_tx,
             notify_rx: Some(notify_rx),
+            shutdown_timer_secs: DEFAULT_SHUTDOWN_TIMER_SECS,
         }
     }
 }

 impl ShutdownNotifier {
+    pub fn new(shutdown_timer_secs: u64) -> Self {
+        Self {
+            shutdown_timer_secs,
+            ..Default::default()
+        }
+    }
+
     pub fn subscribe(&self) -> ShutdownListener {
         ShutdownListener::new(
             self.notify_rx
@@ -50,7 +59,7 @@
             _ = tokio::signal::ctrl_c() => {
                 log::info!("Forcing shutdown");
             }
-            _ = tokio::time::sleep(Duration::from_secs(SHUTDOWN_TIMER_SECS)) => {
+            _ = tokio::time::sleep(Duration::from_secs(self.shutdown_timer_secs)) => {
                 log::info!("Timeout reached, forcing shutdown");
             },
         }
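Taken together, the `shutdown.rs` changes keep the old `Default` behaviour while letting callers pick a longer force-shutdown timer. Below is a minimal sketch of how the pieces fit together, using only the API visible in this diff (`new`, `subscribe`, and the listener's `recv`/`is_shutdown`); the notifier's signal-wait entry point is not part of this diff, so it is left out, and `do_work` is a hypothetical stand-in for a task's real work.

```rust
use std::time::Duration;
use task::{ShutdownListener, ShutdownNotifier};

// Hypothetical stand-in for the task's real work.
async fn do_work() {}

// The consumer side: do work on a timer until a shutdown is signalled.
async fn run_task(mut shutdown: ShutdownListener) {
    let mut interval = tokio::time::interval(Duration::from_secs(60));
    while !shutdown.is_shutdown() {
        tokio::select! {
            _ = interval.tick() => do_work().await,
            _ = shutdown.recv() => {
                log::trace!("run_task: Received shutdown");
            }
        }
    }
}

#[tokio::main]
async fn main() {
    // Force shutdown after 10s instead of the 5s default.
    let shutdown = ShutdownNotifier::new(10);
    tokio::spawn(run_task(shutdown.subscribe()));
    // ... signal shutdown and wait via the notifier (entry point not shown in this diff).
}
```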
22 changes: 15 additions & 7 deletions validator-api/src/contract_cache/mod.rs

@@ -281,13 +281,21 @@ impl<C> ValidatorCacheRefresher<C> {
         while !shutdown.is_shutdown() {
             tokio::select! {
                 _ = interval.tick() => {
-                    if let Err(err) = self.refresh_cache().await {
-                        error!("Failed to refresh validator cache - {}", err);
-                    } else {
-                        // relaxed memory ordering is fine here. worst case scenario network monitor
-                        // will just have to wait for an additional backoff to see the change.
-                        // And so this will not really incur any performance penalties by setting it every loop iteration
-                        self.cache.initialised.store(true, Ordering::Relaxed)
-                    }
+                    tokio::select! {
+                        biased;
+                        _ = shutdown.recv() => {
+                            trace!("ValidatorCacheRefresher: Received shutdown");
+                        }
+                        ret = self.refresh_cache() => {
+                            if let Err(err) = ret {
+                                error!("Failed to refresh validator cache - {}", err);
+                            } else {
+                                // relaxed memory ordering is fine here. worst case scenario network monitor
+                                // will just have to wait for an additional backoff to see the change.
+                                // And so this will not really incur any performance penalties by setting it every loop iteration
+                                self.cache.initialised.store(true, Ordering::Relaxed)
+                            }
+                        }
+                    }
                 }
                 _ = shutdown.recv() => {
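The pattern above recurs in several tasks in this PR: an outer `select!` races the timer against shutdown, and a nested `biased` `select!` lets a shutdown signal pre-empt in-flight work, which tokio cancels by dropping its future. A distilled sketch of the pattern (not the exact validator-api code; `refresh` is a hypothetical stand-in):

```rust
use std::time::Duration;
use log::{error, trace};
use task::ShutdownListener;

// Hypothetical stand-in for the periodic work being guarded.
async fn refresh() -> Result<(), String> {
    Ok(())
}

async fn run_loop(mut shutdown: ShutdownListener) {
    let mut interval = tokio::time::interval(Duration::from_secs(30));
    while !shutdown.is_shutdown() {
        tokio::select! {
            _ = interval.tick() => {
                tokio::select! {
                    // `biased` makes tokio poll branches in order, so the
                    // shutdown branch is always checked before the work.
                    biased;
                    _ = shutdown.recv() => trace!("Received shutdown"),
                    ret = refresh() => {
                        if let Err(err) = ret {
                            error!("refresh failed - {}", err);
                        }
                    }
                }
            }
            _ = shutdown.recv() => trace!("Received shutdown"),
        }
    }
}
```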
6 changes: 4 additions & 2 deletions validator-api/src/main.rs

@@ -568,7 +568,8 @@ async fn run_validator_api(matches: ArgMatches<'static>) -> Result<()> {
     let signing_nymd_client = Client::new_signing(&config);

     let liftoff_notify = Arc::new(Notify::new());
-    let shutdown = ShutdownNotifier::default();
+    // We need a bigger timeout
+    let shutdown = ShutdownNotifier::new(10);

     // let's build our rocket!
     let rocket = setup_rocket(
@@ -609,7 +610,8 @@
         // spawn rewarded set updater
         let mut rewarded_set_updater =
             RewardedSetUpdater::new(signing_nymd_client, validator_cache.clone(), storage).await?;
-        tokio::spawn(async move { rewarded_set_updater.run().await.unwrap() });
+        let shutdown_listener = shutdown.subscribe();
+        tokio::spawn(async move { rewarded_set_updater.run(shutdown_listener).await.unwrap() });

         validator_cache_listener
     } else {
8 changes: 4 additions & 4 deletions validator-api/src/network_monitor/chunker.rs

@@ -12,6 +12,7 @@ use topology::NymTopology;
 const DEFAULT_AVERAGE_PACKET_DELAY: Duration = Duration::from_millis(200);
 const DEFAULT_AVERAGE_ACK_DELAY: Duration = Duration::from_millis(200);

+#[derive(Clone)]
 pub(crate) struct Chunker {
     rng: OsRng,
     message_preparer: MessagePreparer<OsRng>,
@@ -30,7 +31,7 @@ impl Chunker {
         }
     }

-    pub(crate) async fn prepare_packets_from(
+    pub(crate) fn prepare_packets_from(
         &mut self,
         message: Vec<u8>,
         topology: &NymTopology,
@@ -40,10 +41,10 @@
         // but without some significant API changes in the `MessagePreparer` this was the easiest
         // way to being able to have variable sender address.
         self.message_preparer.set_sender_address(packet_sender);
-        self.prepare_packets(message, topology, packet_sender).await
+        self.prepare_packets(message, topology, packet_sender)
     }

-    async fn prepare_packets(
+    fn prepare_packets(
         &mut self,
         message: Vec<u8>,
         topology: &NymTopology,
@@ -62,7 +63,6 @@
             let prepared_fragment = self
                 .message_preparer
                 .prepare_chunk_for_sending(message_chunk, topology, &ack_key, &packet_sender)
-                .await
                 .unwrap();

             mix_packets.push(prepared_fragment.mix_packet);
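With the `async` removed, packet preparation is plain CPU work (chunking plus sphinx packet construction), so callers no longer need to `.await` anywhere in the chain; the new `#[derive(Clone)]` also lets the monitor clone a preparer and move it onto a spawned task, as shown in `monitor/mod.rs` below. A rough usage sketch under the signatures above (`recipient`, `message`, and `topology` are assumed inputs, not from this diff):

```rust
// Build test packets synchronously; no executor context required.
let mut chunker = Chunker::new(recipient);
let mix_packets = chunker.prepare_packets_from(message.to_vec(), &topology, recipient);
assert!(!mix_packets.is_empty());
```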
21 changes: 17 additions & 4 deletions validator-api/src/network_monitor/monitor/gateways_pinger.rs

@@ -7,6 +7,7 @@ use crypto::asymmetric::identity;
 use crypto::asymmetric::identity::PUBLIC_KEY_LENGTH;
 use log::{debug, info, trace, warn};
 use std::time::Duration;
+use task::ShutdownListener;
 use tokio::time::{sleep, Instant};

 // TODO: should it perhaps be moved to config along other timeout values?
@@ -143,10 +144,22 @@ impl GatewayPinger {
         info!("Pinging all active gateways took {:?}", time_taken);
     }

-    pub(crate) async fn run(&self) {
-        loop {
-            sleep(self.pinging_interval).await;
-            self.ping_and_cleanup_all_gateways().await
-        }
+    pub(crate) async fn run(&self, mut shutdown: ShutdownListener) {
+        while !shutdown.is_shutdown() {
+            tokio::select! {
+                _ = sleep(self.pinging_interval) => {
+                    tokio::select! {
+                        biased;
+                        _ = shutdown.recv() => {
+                            trace!("GatewaysPinger: Received shutdown");
+                        }
+                        _ = self.ping_and_cleanup_all_gateways() => (),
+                    }
+                }
+                _ = shutdown.recv() => {
+                    trace!("GatewaysPinger: Received shutdown");
+                }
+            }
+        }
     }
 }
26 changes: 19 additions & 7 deletions validator-api/src/network_monitor/monitor/mod.rs

@@ -122,11 +122,15 @@ impl Monitor {

         let mut packets = Vec::with_capacity(routes.len());
         for route in routes {
-            packets.push(
-                self.packet_preparer
-                    .prepare_test_route_viability_packets(route, self.route_test_packets)
-                    .await,
-            );
+            let mut packet_preparer = self.packet_preparer.clone();
+            let route = route.clone();
+            let route_test_packets = self.route_test_packets;
+            let gateway_packets = tokio::spawn(async move {
+                packet_preparer.prepare_test_route_viability_packets(&route, route_test_packets)
+            })
+            .await
+            .unwrap();
+            packets.push(gateway_packets);
         }

         self.received_processor.set_route_test_nonce().await;
@@ -306,12 +310,20 @@
             .await;

         self.packet_sender
-            .spawn_gateways_pinger(self.gateway_ping_interval);
+            .spawn_gateways_pinger(self.gateway_ping_interval, shutdown.clone());

         let mut run_interval = tokio::time::interval(self.run_interval);
         while !shutdown.is_shutdown() {
             tokio::select! {
-                _ = run_interval.tick() => self.test_run().await,
+                _ = run_interval.tick() => {
+                    tokio::select! {
+                        biased;
+                        _ = shutdown.recv() => {
+                            trace!("UpdateHandler: Received shutdown");
+                        }
+                        _ = self.test_run() => (),
+                    }
+                }
                 _ = shutdown.recv() => {
                     trace!("UpdateHandler: Received shutdown");
                 }
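Since `prepare_test_route_viability_packets` is now synchronous and potentially CPU-heavy, the first hunk above clones the preparer and runs it inside `tokio::spawn`. A hedged alternative sketch of the same step using `tokio::task::spawn_blocking`, which moves CPU-bound work off the async worker threads entirely (this is an editorial suggestion, not the PR's code):

```rust
// Alternative: run the synchronous packet preparation on tokio's blocking pool.
let mut packet_preparer = self.packet_preparer.clone();
let route = route.clone();
let route_test_packets = self.route_test_packets;
let gateway_packets = tokio::task::spawn_blocking(move || {
    packet_preparer.prepare_test_route_viability_packets(&route, route_test_packets)
})
.await
.unwrap(); // JoinError only if the spawned task panicked
packets.push(gateway_packets);
```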
28 changes: 11 additions & 17 deletions validator-api/src/network_monitor/monitor/preparer.rs

@@ -117,6 +117,7 @@ pub(crate) struct PreparedPackets {
     pub(super) invalid_gateways: Vec<InvalidNode>,
 }

+#[derive(Clone)]
 pub(crate) struct PacketPreparer {
     system_version: String,
     chunker: Option<Chunker>,
@@ -151,7 +152,7 @@ impl PacketPreparer {
         }
     }

-    async fn wrap_test_packet(
+    fn wrap_test_packet(
         &mut self,
         packet: &TestPacket,
         topology: &NymTopology,
@@ -162,12 +163,11 @@
         if self.chunker.is_none() {
             self.chunker = Some(Chunker::new(packet_recipient));
         }
-        let mut mix_packets = self
-            .chunker
-            .as_mut()
-            .unwrap()
-            .prepare_packets_from(packet.to_bytes(), topology, packet_recipient)
-            .await;
+        let mut mix_packets = self.chunker.as_mut().unwrap().prepare_packets_from(
+            packet.to_bytes(),
+            topology,
+            packet_recipient,
+        );
         assert_eq!(
             mix_packets.len(),
             1,
@@ -351,7 +351,7 @@
         )
     }

-    pub(crate) async fn prepare_test_route_viability_packets(
+    pub(crate) fn prepare_test_route_viability_packets(
         &mut self,
         route: &TestRoute,
         num: usize,
@@ -360,9 +360,7 @@
         let test_packet = route.self_test_packet();
         let recipient = self.create_packet_sender(route.gateway());
         for _ in 0..num {
-            let mix_packet = self
-                .wrap_test_packet(&test_packet, route.topology(), recipient)
-                .await;
+            let mix_packet = self.wrap_test_packet(&test_packet, route.topology(), recipient);
             mix_packets.push(mix_packet)
         }

@@ -451,9 +449,7 @@
             let topology = test_route.substitute_mix(mixnode);
             // produce n mix packets
             for _ in 0..self.per_node_test_packets {
-                let mix_packet = self
-                    .wrap_test_packet(&test_packet, &topology, recipient)
-                    .await;
+                let mix_packet = self.wrap_test_packet(&test_packet, &topology, recipient);
                 mix_packets.push(mix_packet);
             }
         }
@@ -476,9 +472,7 @@
             let topology = test_route.substitute_gateway(gateway);
             // produce n mix packets
             for _ in 0..self.per_node_test_packets {
-                let mix_packet = self
-                    .wrap_test_packet(&test_packet, &topology, recipient)
-                    .await;
+                let mix_packet = self.wrap_test_packet(&test_packet, &topology, recipient);
                 gateway_mix_packets.push(mix_packet);
             }