Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Use SpawnNamed to give tasks names #1379

Merged
merged 2 commits into from
Jul 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions availability-store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
#![warn(missing_docs)]

use futures::prelude::*;
use futures::{channel::{mpsc, oneshot}, task::Spawn};
use futures::channel::{mpsc, oneshot};
use keystore::KeyStorePtr;
use polkadot_primitives::{
Hash, Block,
Expand All @@ -39,6 +39,7 @@ use client::{
};
use sp_api::{ApiExt, ProvideRuntimeApi};
use codec::{Encode, Decode};
use sp_core::traits::SpawnNamed;

use log::warn;

Expand Down Expand Up @@ -174,7 +175,7 @@ impl Store {
&self,
wrapped_block_import: I,
client: Arc<P>,
spawner: impl Spawn,
spawner: impl SpawnNamed,
keystore: KeyStorePtr,
) -> ClientResult<AvailabilityBlockImport<I, P>>
where
Expand Down
11 changes: 5 additions & 6 deletions availability-store/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::sync::Arc;
use std::thread;

use log::{error, info, trace, warn};
use sp_blockchain::{Result as ClientResult};
use sp_blockchain::Result as ClientResult;
use sp_runtime::traits::{Header as HeaderT, Block as BlockT, HashFor, BlakeTwo256};
use sp_api::{ApiExt, ProvideRuntimeApi};
use client::{
Expand All @@ -32,12 +32,13 @@ use consensus_common::{
ImportResult,
import_queue::CacheKeyId,
};
use sp_core::traits::SpawnNamed;
use polkadot_primitives::{Block, BlockId, Hash};
use polkadot_primitives::parachain::{
ParachainHost, ValidatorId, AbridgedCandidateReceipt, AvailableData,
ValidatorPair, ErasureChunk,
};
use futures::{prelude::*, future::select, channel::{mpsc, oneshot}, task::{Spawn, SpawnExt}};
use futures::{prelude::*, future::select, channel::{mpsc, oneshot}};
use futures::future::AbortHandle;
use keystore::KeyStorePtr;

Expand Down Expand Up @@ -641,7 +642,7 @@ impl<I, P> AvailabilityBlockImport<I, P> {
pub(crate) fn new(
client: Arc<P>,
block_import: I,
spawner: impl Spawn,
spawner: impl SpawnNamed,
keystore: KeyStorePtr,
to_worker: mpsc::UnboundedSender<WorkerMsg>,
) -> Self
Expand All @@ -662,9 +663,7 @@ impl<I, P> AvailabilityBlockImport<I, P> {
to_worker.clone(),
));

if let Err(_) = spawner.spawn(prune_available.map(drop)) {
error!(target: LOG_TARGET, "Failed to spawn availability pruning task");
}
spawner.spawn("polkadot-prune-availibility", prune_available.map(drop).boxed());

AvailabilityBlockImport {
client,
Expand Down
7 changes: 4 additions & 3 deletions collator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use std::sync::Arc;
use std::time::Duration;
use std::pin::Pin;

use futures::{future, Future, Stream, FutureExt, StreamExt, task::Spawn};
use futures::{future, Future, Stream, FutureExt, StreamExt};
use log::warn;
use sc_client_api::{StateBackend, BlockchainEvents};
use sp_blockchain::HeaderBackend;
Expand Down Expand Up @@ -82,6 +82,7 @@ use polkadot_service_new::{
Error as ServiceError, FullNodeHandles, PolkadotClient,
};
use sc_service::SpawnTaskHandle;
use sp_core::traits::SpawnNamed;

const COLLATION_TIMEOUT: Duration = Duration::from_secs(30);

Expand Down Expand Up @@ -133,7 +134,7 @@ pub trait BuildParachainContext {
Client::Api: RuntimeApiCollection<Extrinsic>,
<Client::Api as ApiExt<Block>>::StateBackend: StateBackend<HashFor<Block>>,
Extrinsic: codec::Codec + Send + Sync + 'static,
SP: Spawn + Clone + Send + Sync + 'static;
SP: SpawnNamed + Clone + Send + Sync + 'static;
}

/// Parachain context needed for collation.
Expand Down Expand Up @@ -233,7 +234,7 @@ fn build_collator_service<SP, P, C, R, Extrinsic>(
P::ParachainContext: Send + 'static,
<P::ParachainContext as ParachainContext>::ProduceCandidate: Send,
Extrinsic: service::Codec + Send + Sync + 'static,
SP: Spawn + Clone + Send + Sync + 'static,
SP: SpawnNamed + Clone + Send + Sync + 'static,
{
Err("Collator is not functional with the new service yet".into())
}
Expand Down
9 changes: 2 additions & 7 deletions network/src/legacy/gossip/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ pub(crate) fn pov_block_topic(parent_hash: Hash) -> Hash {
pub fn register_validator<C: ChainContext + 'static>(
service: Arc<NetworkService<Block, Hash>>,
chain: C,
executor: &impl futures::task::Spawn,
executor: &impl sp_core::traits::SpawnNamed,
) -> RegisteredMessageValidator
{
let s = service.clone();
Expand Down Expand Up @@ -331,12 +331,7 @@ pub fn register_validator<C: ChainContext + 'static>(
let fut = futures::future::poll_fn(move |cx| {
gossip_engine.lock().poll_unpin(cx)
});
let spawn_res = executor.spawn_obj(futures::task::FutureObj::from(Box::new(fut)));

// Note: we consider the chances of an error to spawn a background task almost null.
if spawn_res.is_err() {
log::error!(target: "polkadot-gossip", "Failed to spawn background task");
}
executor.spawn("polkadot-legacy-gossip-engine", fut.boxed());
}

RegisteredMessageValidator {
Expand Down
176 changes: 97 additions & 79 deletions network/src/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use codec::{Decode, Encode};
use futures::channel::{mpsc, oneshot};
use futures::future::Either;
use futures::prelude::*;
use futures::task::{Spawn, SpawnExt, Context, Poll};
use futures::task::{Context, Poll};
use futures::stream::{FuturesUnordered, StreamFuture};
use log::{debug, trace};

Expand All @@ -44,6 +44,7 @@ use polkadot_validation::{
use sc_network::{ObservedRole, Event, PeerId};
use sp_api::ProvideRuntimeApi;
use sp_runtime::ConsensusEngineId;
use sp_core::traits::SpawnNamed;

use std::collections::{hash_map::{Entry, HashMap}, HashSet};
use std::pin::Pin;
Expand Down Expand Up @@ -126,7 +127,9 @@ enum ServiceToWorkerMsg {
/// Messages from a background task to the main worker task.
enum BackgroundToWorkerMsg {
// Spawn a given future.
Spawn(future::BoxFuture<'static, ()>),
//
// The name is used for the future task.
Spawn(&'static str, future::BoxFuture<'static, ()>),
}

/// Operations that a handle to an underlying network service should provide.
Expand Down Expand Up @@ -221,7 +224,7 @@ pub fn start<C, Api, SP>(
C: ChainContext + 'static,
Api: ProvideRuntimeApi<Block> + Send + Sync + 'static,
Api::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
SP: Spawn + Clone + Send + 'static,
SP: SpawnNamed + Clone + Send + 'static,
{
const SERVICE_TO_WORKER_BUF: usize = 256;

Expand All @@ -234,67 +237,73 @@ pub fn start<C, Api, SP>(
chain_context,
&executor,
);
executor.spawn(worker_loop(
config,
service.clone(),
gossip_validator,
api,
worker_receiver,
executor.clone(),
))?;
executor.spawn(
"polkadot-network-worker",
worker_loop(
config,
service.clone(),
gossip_validator,
api,
worker_receiver,
executor.clone(),
).boxed(),
);

let polkadot_service = Service {
sender: worker_sender.clone(),
network_service: service.clone(),
};

executor.spawn(async move {
while let Some(event) = event_stream.next().await {
let res = match event {
Event::Dht(_) => continue,
Event::NotificationStreamOpened {
remote,
engine_id,
role,
} => {
if engine_id != POLKADOT_ENGINE_ID { continue }

worker_sender.send(ServiceToWorkerMsg::PeerConnected(remote, role)).await
},
Event::NotificationStreamClosed {
remote,
engine_id,
} => {
if engine_id != POLKADOT_ENGINE_ID { continue }

worker_sender.send(ServiceToWorkerMsg::PeerDisconnected(remote)).await
},
Event::NotificationsReceived {
remote,
messages,
} => {
let our_notifications = messages.into_iter()
.filter_map(|(engine, message)| if engine == POLKADOT_ENGINE_ID {
Some(message)
} else {
None
})
.collect();
executor.spawn(
"polkadot-network-notifications",
async move {
while let Some(event) = event_stream.next().await {
let res = match event {
Event::Dht(_) => continue,
Event::NotificationStreamOpened {
remote,
engine_id,
role,
} => {
if engine_id != POLKADOT_ENGINE_ID { continue }

worker_sender.send(ServiceToWorkerMsg::PeerConnected(remote, role)).await
},
Event::NotificationStreamClosed {
remote,
engine_id,
} => {
if engine_id != POLKADOT_ENGINE_ID { continue }

worker_sender.send(
ServiceToWorkerMsg::PeerMessage(remote, our_notifications)
).await
}
};
worker_sender.send(ServiceToWorkerMsg::PeerDisconnected(remote)).await
},
Event::NotificationsReceived {
remote,
messages,
} => {
let our_notifications = messages.into_iter()
.filter_map(|(engine, message)| if engine == POLKADOT_ENGINE_ID {
Some(message)
} else {
None
})
.collect();

worker_sender.send(
ServiceToWorkerMsg::PeerMessage(remote, our_notifications)
).await
}
};

if let Err(e) = res {
// full is impossible here, as we've `await`ed the value being sent.
if e.is_disconnected() {
break
if let Err(e) = res {
// full is impossible here, as we've `await`ed the value being sent.
if e.is_disconnected() {
break
}
}
}
}
})?;
}.boxed(),
);

Ok(polkadot_service)
}
Expand Down Expand Up @@ -845,7 +854,7 @@ struct Worker<Api, Sp, Gossip> {
impl<Api, Sp, Gossip> Worker<Api, Sp, Gossip> where
Api: ProvideRuntimeApi<Block> + Send + Sync + 'static,
Api::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
Sp: Spawn + Clone,
Sp: SpawnNamed + Clone,
Gossip: GossipOps,
{
// spawns a background task to spawn consensus networking.
Expand Down Expand Up @@ -888,14 +897,17 @@ impl<Api, Sp, Gossip> Worker<Api, Sp, Gossip> where

// glue the incoming messages, shared table, and validation
// work together.
let _ = self.executor.spawn(statement_import_loop(
relay_parent,
table,
self.api.clone(),
self.gossip_handle.clone(),
self.background_to_main_sender.clone(),
exit,
));
self.executor.spawn(
"polkadot-statement-import-loop",
statement_import_loop(
relay_parent,
table,
self.api.clone(),
self.gossip_handle.clone(),
self.background_to_main_sender.clone(),
exit,
).boxed(),
);
}

fn handle_service_message(&mut self, message: ServiceToWorkerMsg) {
Expand Down Expand Up @@ -932,12 +944,15 @@ impl<Api, Sp, Gossip> Worker<Api, Sp, Gossip> where
// before placing in the pool, so we can safely check by candidate hash.
let get_msg = fetch_pov_from_gossip(&candidate, &self.gossip_handle);

let _ = self.executor.spawn(async move {
let res = future::select(get_msg, AwaitCanceled { inner: &mut sender }).await;
if let Either::Left((pov_block, _)) = res {
let _ = sender.send(pov_block);
}
});
self.executor.spawn(
"polkadot-fetch-pov-block",
async move {
let res = future::select(get_msg, AwaitCanceled { inner: &mut sender }).await;
if let Either::Left((pov_block, _)) = res {
let _ = sender.send(pov_block);
}
}.boxed(),
);
}
ServiceToWorkerMsg::FetchErasureChunk(candidate_hash, validator_index, mut sender) => {
let topic = crate::erasure_coding_topic(&candidate_hash);
Expand All @@ -963,12 +978,15 @@ impl<Api, Sp, Gossip> Worker<Api, Sp, Gossip> where
"gossip message streams do not conclude early; qed"
));

let _ = self.executor.spawn(async move {
let res = future::select(get_msg, AwaitCanceled { inner: &mut sender }).await;
if let Either::Left((chunk, _)) = res {
let _ = sender.send(chunk);
}
});
self.executor.spawn(
"polkadot-fetch-erasure-chunk",
async move {
let res = future::select(get_msg, AwaitCanceled { inner: &mut sender }).await;
if let Either::Left((chunk, _)) = res {
let _ = sender.send(chunk);
}
}.boxed(),
);
}
ServiceToWorkerMsg::DistributeErasureChunk(candidate_hash, erasure_chunk) => {
let topic = crate::erasure_coding_topic(&candidate_hash);
Expand Down Expand Up @@ -1017,8 +1035,8 @@ impl<Api, Sp, Gossip> Worker<Api, Sp, Gossip> where

fn handle_background_message(&mut self, message: BackgroundToWorkerMsg) {
match message {
BackgroundToWorkerMsg::Spawn(task) => {
let _ = self.executor.spawn(task);
BackgroundToWorkerMsg::Spawn(name, task) => {
let _ = self.executor.spawn(name, task);
}
}
}
Expand Down Expand Up @@ -1068,7 +1086,7 @@ async fn worker_loop<Api, Sp>(
) where
Api: ProvideRuntimeApi<Block> + Send + Sync + 'static,
Api::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
Sp: Spawn + Clone,
Sp: SpawnNamed + Clone,
{
const BACKGROUND_TO_MAIN_BUF: usize = 16;

Expand Down Expand Up @@ -1250,7 +1268,7 @@ async fn statement_import_loop<Api>(

let work = future::select(work.boxed(), exit.clone()).map(drop);
if let Err(_) = to_worker.send(
BackgroundToWorkerMsg::Spawn(work.boxed())
BackgroundToWorkerMsg::Spawn("polkadot-statement-import-loop-sub-task", work.boxed())
).await {
// can fail only if remote has hung up - worker is dead,
// we should die too. this is defensive, since the exit future
Expand Down
Loading