From e9093b81213699f1043d7a7333febca6f0fa1084 Mon Sep 17 00:00:00 2001 From: yukang Date: Fri, 10 May 2024 17:06:50 +0800 Subject: [PATCH 1/2] add new runtime for rpc --- ckb-bin/src/lib.rs | 2 +- ckb-bin/src/subcommand/run.rs | 16 ++++++++++++++-- rpc/src/server.rs | 5 +++-- test/src/net.rs | 2 +- test/template/ckb.toml | 4 ++++ util/launcher/src/lib.rs | 8 +++++--- util/runtime/src/lib.rs | 11 ++++++----- util/stop-handler/src/tests.rs | 2 +- 8 files changed, 35 insertions(+), 15 deletions(-) diff --git a/ckb-bin/src/lib.rs b/ckb-bin/src/lib.rs index d632352e2c..23c09d238b 100644 --- a/ckb-bin/src/lib.rs +++ b/ckb-bin/src/lib.rs @@ -121,7 +121,7 @@ fn run_app_inner( matches: &ArgMatches, ) -> Result<(), ExitCode> { let is_silent_logging = is_silent_logging(cmd); - let (mut handle, mut handle_stop_rx, _runtime) = new_global_runtime(); + let (mut handle, mut handle_stop_rx, _runtime) = new_global_runtime(None); let setup = Setup::from_matches(bin_name, cmd, matches)?; let _guard = SetupGuard::from_setup(&setup, &version, handle.clone(), is_silent_logging)?; diff --git a/ckb-bin/src/subcommand/run.rs b/ckb-bin/src/subcommand/run.rs index 094b29bbb3..0659724ae2 100644 --- a/ckb-bin/src/subcommand/run.rs +++ b/ckb-bin/src/subcommand/run.rs @@ -1,6 +1,8 @@ +use std::thread::available_parallelism; + use crate::helper::deadlock_detection; use ckb_app_config::{ExitCode, RunArgs}; -use ckb_async_runtime::Handle; +use ckb_async_runtime::{new_global_runtime, Handle}; use ckb_build_info::Version; use ckb_launcher::Launcher; use ckb_logger::info; @@ -11,8 +13,11 @@ use ckb_types::core::cell::setup_system_cell_cache; pub fn run(args: RunArgs, version: Version, async_handle: Handle) -> Result<(), ExitCode> { deadlock_detection(); + let rpc_threads_num = calc_rpc_threads_num(&args); info!("ckb version: {}", version); - let mut launcher = Launcher::new(args, version, async_handle); + info!("run rpc server with {} threads", rpc_threads_num); + let (mut rpc_handle, _rpc_stop_rx, _runtime) = new_global_runtime(Some(rpc_threads_num)); + let mut launcher = Launcher::new(args, version, async_handle, rpc_handle.clone()); let block_assembler_config = launcher.sanitize_block_assembler_config()?; let miner_enable = block_assembler_config.is_some(); @@ -63,7 +68,14 @@ pub fn run(args: RunArgs, version: Version, async_handle: Handle) -> Result<(), }) .expect("Error setting Ctrl-C handler"); + rpc_handle.drop_guard(); wait_all_ckb_services_exit(); Ok(()) } + +fn calc_rpc_threads_num(args: &RunArgs) -> usize { + let system_parallelism: usize = available_parallelism().unwrap().into(); + let default_num = usize::max(system_parallelism - 1, 1); + args.config.rpc.threads.unwrap_or(default_num) +} diff --git a/rpc/src/server.rs b/rpc/src/server.rs index 9577e33d4f..13fd55e37c 100644 --- a/rpc/src/server.rs +++ b/rpc/src/server.rs @@ -64,7 +64,7 @@ impl RpcServer { }; let tcp_address = if let Some(addr) = config.tcp_listen_address { - let local_addr = handler.block_on(Self::start_tcp_server(rpc, addr)); + let local_addr = handler.block_on(Self::start_tcp_server(rpc, addr, handler.clone())); if let Ok(addr) = &local_addr { info!("Listen TCP RPCServer on address: {}", addr); }; @@ -137,11 +137,12 @@ impl RpcServer { async fn start_tcp_server( rpc: Arc>>, tcp_listen_address: String, + handler: Handle, ) -> Result { // TCP server with line delimited json codec. let listener = TcpListener::bind(tcp_listen_address).await?; let tcp_address = listener.local_addr()?; - tokio::spawn(async move { + handler.spawn(async move { let codec = LinesCodec::new_with_max_length(2 * 1024 * 1024); let stream_config = StreamServerConfig::default() .with_channel_size(4) diff --git a/test/src/net.rs b/test/src/net.rs index 56c4f5676e..ae60192c3f 100644 --- a/test/src/net.rs +++ b/test/src/net.rs @@ -63,7 +63,7 @@ impl Net { ) }) .collect(); - let (async_handle, _handle_recv, async_runtime) = new_global_runtime(); + let (async_handle, _handle_recv, async_runtime) = new_global_runtime(None); let controller = NetworkService::new( Arc::clone(&network_state), ckb_protocols, diff --git a/test/template/ckb.toml b/test/template/ckb.toml index 0eea3eb7b1..bfcacd8594 100644 --- a/test/template/ckb.toml +++ b/test/template/ckb.toml @@ -76,6 +76,10 @@ reject_ill_transactions = true # By default deprecated rpc methods are disabled. enable_deprecated_rpc = true +# threads number for RPC service, +# default it's equal to the number of CPU cores minus one +# threads = 4 + [tx_pool] max_tx_pool_size = 180_000_000 # 180mb min_fee_rate = 0 # Here fee_rate are calculated directly using size in units of shannons/KB diff --git a/util/launcher/src/lib.rs b/util/launcher/src/lib.rs index 0370339a54..469ef2b81d 100644 --- a/util/launcher/src/lib.rs +++ b/util/launcher/src/lib.rs @@ -43,15 +43,18 @@ pub struct Launcher { pub version: Version, /// ckb global runtime handle pub async_handle: Handle, + /// rpc global runtime handle + pub rpc_handle: Handle, } impl Launcher { /// Construct new Launcher from cli args - pub fn new(args: RunArgs, version: Version, async_handle: Handle) -> Self { + pub fn new(args: RunArgs, version: Version, async_handle: Handle, rpc_handle: Handle) -> Self { Launcher { args, version, async_handle, + rpc_handle, } } @@ -427,8 +430,7 @@ impl Launcher { builder.enable_subscription(shared.clone()); let io_handler = builder.build(); - let async_handle = shared.async_handle(); - let _rpc = RpcServer::new(rpc_config, io_handler, async_handle.clone()); + let _rpc = RpcServer::new(rpc_config, io_handler, self.rpc_handle.clone()); network_controller } diff --git a/util/runtime/src/lib.rs b/util/runtime/src/lib.rs index a5f6965645..5b9809bea0 100644 --- a/util/runtime/src/lib.rs +++ b/util/runtime/src/lib.rs @@ -3,7 +3,7 @@ use ckb_spawn::Spawn; use core::future::Future; use std::sync::atomic::{AtomicU32, Ordering}; - +use std::thread::available_parallelism; use tokio::runtime::Builder; use tokio::runtime::Handle as TokioHandle; @@ -88,9 +88,10 @@ impl Handle { } /// Create a new runtime with unique name. -fn new_runtime() -> Runtime { +fn new_runtime(worker_num: Option) -> Runtime { Builder::new_multi_thread() .enable_all() + .worker_threads(worker_num.unwrap_or_else(|| available_parallelism().unwrap().into())) .thread_name_fn(|| { static ATOMIC_ID: AtomicU32 = AtomicU32::new(0); let id = ATOMIC_ID @@ -121,8 +122,8 @@ fn new_runtime() -> Runtime { } /// Create new threaded_scheduler tokio Runtime, return `Runtime` -pub fn new_global_runtime() -> (Handle, Receiver<()>, Runtime) { - let runtime = new_runtime(); +pub fn new_global_runtime(worker_num: Option) -> (Handle, Receiver<()>, Runtime) { + let runtime = new_runtime(worker_num); let handle = runtime.handle().clone(); let (guard, handle_stop_rx): (Sender<()>, Receiver<()>) = tokio::sync::mpsc::channel::<()>(1); @@ -132,7 +133,7 @@ pub fn new_global_runtime() -> (Handle, Receiver<()>, Runtime) { /// Create new threaded_scheduler tokio Runtime, return `Handle` and background thread join handle, /// NOTICE: This is only used in testing pub fn new_background_runtime() -> Handle { - let runtime = new_runtime(); + let runtime = new_runtime(None); let handle = runtime.handle().clone(); let (guard, mut handle_stop_rx): (Sender<()>, Receiver<()>) = diff --git a/util/stop-handler/src/tests.rs b/util/stop-handler/src/tests.rs index f45c200a83..fa747dca2d 100644 --- a/util/stop-handler/src/tests.rs +++ b/util/stop-handler/src/tests.rs @@ -113,7 +113,7 @@ impl TestStopMemo { #[test] fn basic() { - let (mut handle, mut stop_recv, _runtime) = new_global_runtime(); + let (mut handle, mut stop_recv, _runtime) = new_global_runtime(None); ctrlc::set_handler(move || { broadcast_exit_signals(); From 07da76d08c8e4c16d4434e5f881e67f6c9faf551 Mon Sep 17 00:00:00 2001 From: yukang Date: Tue, 21 May 2024 15:05:16 +0800 Subject: [PATCH 2/2] use full cpu cores for RPC service --- ckb-bin/src/subcommand/run.rs | 2 +- test/template/ckb.toml | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/ckb-bin/src/subcommand/run.rs b/ckb-bin/src/subcommand/run.rs index 0659724ae2..8d5ff1b22d 100644 --- a/ckb-bin/src/subcommand/run.rs +++ b/ckb-bin/src/subcommand/run.rs @@ -76,6 +76,6 @@ pub fn run(args: RunArgs, version: Version, async_handle: Handle) -> Result<(), fn calc_rpc_threads_num(args: &RunArgs) -> usize { let system_parallelism: usize = available_parallelism().unwrap().into(); - let default_num = usize::max(system_parallelism - 1, 1); + let default_num = usize::max(system_parallelism, 1); args.config.rpc.threads.unwrap_or(default_num) } diff --git a/test/template/ckb.toml b/test/template/ckb.toml index bfcacd8594..f713d47abc 100644 --- a/test/template/ckb.toml +++ b/test/template/ckb.toml @@ -76,8 +76,7 @@ reject_ill_transactions = true # By default deprecated rpc methods are disabled. enable_deprecated_rpc = true -# threads number for RPC service, -# default it's equal to the number of CPU cores minus one +# threads number for RPC service, default value is the number of CPU cores # threads = 4 [tx_pool]