Skip to content

Commit

Permalink
Bring down the node if the target block fails to return
Browse files Browse the repository at this point in the history
  • Loading branch information
samelamin committed Jan 13, 2023
1 parent de29579 commit c0ecb22
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 25 deletions.
2 changes: 1 addition & 1 deletion bin/node-template/node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ pub fn new_full(mut config: Configuration) -> Result<TaskManager, ServiceError>
config: &config,
client: client.clone(),
transaction_pool: transaction_pool.clone(),
spawn_handle: task_manager.spawn_handle(),
spawn_handle: Box::new(task_manager.spawn_essential_handle()),
import_queue,
block_announce_validator_builder: None,
warp_sync_params: Some(WarpSyncParams::WithProvider(warp_sync)),
Expand Down
2 changes: 1 addition & 1 deletion bin/node/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ pub fn new_full_base(
config: &config,
client: client.clone(),
transaction_pool: transaction_pool.clone(),
spawn_handle: task_manager.spawn_handle(),
spawn_handle: Box::new(task_manager.spawn_essential_handle()),
import_queue,
block_announce_validator_builder: None,
warp_sync_params: Some(WarpSyncParams::WithProvider(warp_sync)),
Expand Down
10 changes: 8 additions & 2 deletions client/network/sync/src/warp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::{
schema::v1::{StateRequest, StateResponse},
state::{ImportResult, StateSync},
};
use futures::FutureExt;
use futures::{executor::block_on, FutureExt};
use log::error;
use sc_client_api::ProofProvider;
use sc_network_common::sync::{
Expand Down Expand Up @@ -112,7 +112,13 @@ where
if let Poll::Ready(Ok(target_block)) = target_block.poll_unpin(cx) {
Some(Phase::TargetBlock(target_block))
} else if let Poll::Ready(Err(e)) = target_block.poll_unpin(cx) {
error!(target: "sync", "Failed to get target block. Error: {:?}",e);
block_on(async move {
error!(target: "sync", "Failed to get target block. Error: {:?}",e);
// This `return` might seem unnecessary, but we don't want to make it look like
// everything is working as normal even though the target block failed to be
// retrieved
return
});
None
} else {
None
Expand Down
74 changes: 53 additions & 21 deletions client/service/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ use sp_blockchain::{HeaderBackend, HeaderMetadata};
use sp_consensus::block_validation::{
BlockAnnounceValidator, Chain, DefaultBlockAnnounceValidator,
};
use sp_core::traits::{CodeExecutor, SpawnNamed};
use sp_core::traits::{CodeExecutor, SpawnEssentialNamed, SpawnNamed};
use sp_keystore::{CryptoStore, SyncCryptoStore, SyncCryptoStorePtr};
use sp_runtime::traits::{Block as BlockT, BlockIdTo, NumberFor, Zero};
use std::{str::FromStr, sync::Arc, time::SystemTime};
Expand Down Expand Up @@ -751,7 +751,7 @@ pub struct BuildNetworkParams<'a, TBl: BlockT, TExPool, TImpQu, TCl> {
/// A shared transaction pool.
pub transaction_pool: Arc<TExPool>,
/// A handle for spawning tasks.
pub spawn_handle: SpawnTaskHandle,
pub spawn_handle: Box<dyn SpawnEssentialNamed>,
/// An import queue.
pub import_queue: TImpQu,
/// A block announce validator builder.
Expand Down Expand Up @@ -827,7 +827,11 @@ where
config.network.default_peers_set.in_peers as usize +
config.network.default_peers_set.out_peers as usize,
);
spawn_handle.spawn("block-request-handler", Some("networking"), handler.run());
spawn_handle.spawn_essential(
"block-request-handler",
Some("networking"),
Box::pin(handler.run()),
);
protocol_config
};

Expand All @@ -839,7 +843,11 @@ where
client.clone(),
config.network.default_peers_set_num_full as usize,
);
spawn_handle.spawn("state-request-handler", Some("networking"), handler.run());
spawn_handle.spawn_essential(
"state-request-handler",
Some("networking"),
Box::pin(handler.run()),
);
protocol_config
};

Expand All @@ -856,7 +864,11 @@ where
config.chain_spec.fork_id(),
warp_with_provider.clone(),
);
spawn_handle.spawn("warp-sync-request-handler", Some("networking"), handler.run());
spawn_handle.spawn_essential(
"warp-sync-request-handler",
Some("networking"),
Box::pin(handler.run()),
);
Some(protocol_config)
},
_ => None,
Expand All @@ -869,7 +881,11 @@ where
config.chain_spec.fork_id(),
client.clone(),
);
spawn_handle.spawn("light-client-request-handler", Some("networking"), handler.run());
spawn_handle.spawn_essential(
"light-client-request-handler",
Some("networking"),
Box::pin(handler.run()),
);
protocol_config
};

Expand Down Expand Up @@ -898,7 +914,11 @@ where

request_response_protocol_configs.push(config.network.ipfs_server.then(|| {
let (handler, protocol_config) = BitswapRequestHandler::new(client.clone());
spawn_handle.spawn("bitswap-request-handler", Some("networking"), handler.run());
spawn_handle.spawn_essential(
"bitswap-request-handler",
Some("networking"),
Box::pin(handler.run()),
);
protocol_config
}));

Expand All @@ -907,7 +927,7 @@ where
executor: {
let spawn_handle = Clone::clone(&spawn_handle);
Box::new(move |fut| {
spawn_handle.spawn("libp2p-node", Some("networking"), fut);
spawn_handle.spawn_essential("libp2p-node", Some("networking"), fut);
})
},
network_config: config.network.clone(),
Expand Down Expand Up @@ -955,13 +975,21 @@ where
config.prometheus_config.as_ref().map(|config| &config.registry),
)?;

spawn_handle.spawn("network-transactions-handler", Some("networking"), tx_handler.run());
spawn_handle.spawn(
spawn_handle.spawn_essential(
"network-transactions-handler",
Some("networking"),
Box::pin(tx_handler.run()),
);
spawn_handle.spawn_essential(
"chain-sync-network-service-provider",
Some("networking"),
chain_sync_network_provider.run(network.clone()),
Box::pin(chain_sync_network_provider.run(network.clone())),
);
spawn_handle.spawn_essential(
"import-queue",
None,
import_queue.run(Box::new(chain_sync_service)),
);
spawn_handle.spawn("import-queue", None, import_queue.run(Box::new(chain_sync_service)));

let (system_rpc_tx, system_rpc_rx) = tracing_unbounded("mpsc_system_rpc", 10_000);

Expand Down Expand Up @@ -997,18 +1025,22 @@ where
// issue, and ideally we would like to fix the network future to take as little time as
// possible, but we also take the extra harm-prevention measure to execute the networking
// future using `spawn_blocking`.
spawn_handle.spawn_blocking("network-worker", Some("networking"), async move {
if network_start_rx.await.is_err() {
log::warn!(
spawn_handle.spawn_essential_blocking(
"network-worker",
Some("networking"),
Box::pin(async move {
if network_start_rx.await.is_err() {
log::warn!(
"The NetworkStart returned as part of `build_network` has been silently dropped"
);
// This `return` might seem unnecessary, but we don't want to make it look like
// everything is working as normal even though the user is clearly misusing the API.
return
}
// This `return` might seem unnecessary, but we don't want to make it look like
// everything is working as normal even though the user is clearly misusing the API.
return
}

future.await
});
future.await
}),
);

Ok((network, system_rpc_tx, tx_handler_controller, NetworkStarter(network_start_tx)))
}
Expand Down

0 comments on commit c0ecb22

Please sign in to comment.