Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use standalone runtime for RPC service #4459

Merged
merged 2 commits into from
Jun 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ckb-bin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ fn run_app_inner(
matches: &ArgMatches,
) -> Result<(), ExitCode> {
let is_silent_logging = is_silent_logging(cmd);
let (mut handle, mut handle_stop_rx, _runtime) = new_global_runtime();
let (mut handle, mut handle_stop_rx, _runtime) = new_global_runtime(None);
let setup = Setup::from_matches(bin_name, cmd, matches)?;
let _guard = SetupGuard::from_setup(&setup, &version, handle.clone(), is_silent_logging)?;

Expand Down
16 changes: 14 additions & 2 deletions ckb-bin/src/subcommand/run.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::thread::available_parallelism;

use crate::helper::deadlock_detection;
use ckb_app_config::{ExitCode, RunArgs};
use ckb_async_runtime::Handle;
use ckb_async_runtime::{new_global_runtime, Handle};
use ckb_build_info::Version;
use ckb_launcher::Launcher;
use ckb_logger::info;
Expand All @@ -11,8 +13,11 @@ use ckb_types::core::cell::setup_system_cell_cache;
pub fn run(args: RunArgs, version: Version, async_handle: Handle) -> Result<(), ExitCode> {
deadlock_detection();

let rpc_threads_num = calc_rpc_threads_num(&args);
info!("ckb version: {}", version);
let mut launcher = Launcher::new(args, version, async_handle);
info!("run rpc server with {} threads", rpc_threads_num);
let (mut rpc_handle, _rpc_stop_rx, _runtime) = new_global_runtime(Some(rpc_threads_num));
let mut launcher = Launcher::new(args, version, async_handle, rpc_handle.clone());

let block_assembler_config = launcher.sanitize_block_assembler_config()?;
let miner_enable = block_assembler_config.is_some();
Expand Down Expand Up @@ -63,7 +68,14 @@ pub fn run(args: RunArgs, version: Version, async_handle: Handle) -> Result<(),
})
.expect("Error setting Ctrl-C handler");

rpc_handle.drop_guard();
wait_all_ckb_services_exit();

Ok(())
}

fn calc_rpc_threads_num(args: &RunArgs) -> usize {
let system_parallelism: usize = available_parallelism().unwrap().into();
let default_num = usize::max(system_parallelism, 1);
args.config.rpc.threads.unwrap_or(default_num)
}
5 changes: 3 additions & 2 deletions rpc/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ impl RpcServer {
};

let tcp_address = if let Some(addr) = config.tcp_listen_address {
let local_addr = handler.block_on(Self::start_tcp_server(rpc, addr));
let local_addr = handler.block_on(Self::start_tcp_server(rpc, addr, handler.clone()));
if let Ok(addr) = &local_addr {
info!("Listen TCP RPCServer on address: {}", addr);
};
Expand Down Expand Up @@ -137,11 +137,12 @@ impl RpcServer {
async fn start_tcp_server(
rpc: Arc<MetaIoHandler<Option<Session>>>,
tcp_listen_address: String,
handler: Handle,
) -> Result<SocketAddr, AnyError> {
// TCP server with line delimited json codec.
let listener = TcpListener::bind(tcp_listen_address).await?;
let tcp_address = listener.local_addr()?;
tokio::spawn(async move {
handler.spawn(async move {
let codec = LinesCodec::new_with_max_length(2 * 1024 * 1024);
let stream_config = StreamServerConfig::default()
.with_channel_size(4)
Expand Down
2 changes: 1 addition & 1 deletion test/src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl Net {
)
})
.collect();
let (async_handle, _handle_recv, async_runtime) = new_global_runtime();
let (async_handle, _handle_recv, async_runtime) = new_global_runtime(None);
let controller = NetworkService::new(
Arc::clone(&network_state),
ckb_protocols,
Expand Down
3 changes: 3 additions & 0 deletions test/template/ckb.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ reject_ill_transactions = true
# By default deprecated rpc methods are disabled.
enable_deprecated_rpc = true

# threads number for RPC service, default value is the number of CPU cores
# threads = 4

[tx_pool]
max_tx_pool_size = 180_000_000 # 180mb
min_fee_rate = 0 # Here fee_rate are calculated directly using size in units of shannons/KB
Expand Down
8 changes: 5 additions & 3 deletions util/launcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,18 @@ pub struct Launcher {
pub version: Version,
/// ckb global runtime handle
pub async_handle: Handle,
/// rpc global runtime handle
pub rpc_handle: Handle,
}

impl Launcher {
/// Construct new Launcher from cli args
pub fn new(args: RunArgs, version: Version, async_handle: Handle) -> Self {
pub fn new(args: RunArgs, version: Version, async_handle: Handle, rpc_handle: Handle) -> Self {
Launcher {
args,
version,
async_handle,
rpc_handle,
}
}

Expand Down Expand Up @@ -427,8 +430,7 @@ impl Launcher {
builder.enable_subscription(shared.clone());
let io_handler = builder.build();

let async_handle = shared.async_handle();
let _rpc = RpcServer::new(rpc_config, io_handler, async_handle.clone());
let _rpc = RpcServer::new(rpc_config, io_handler, self.rpc_handle.clone());

network_controller
}
Expand Down
11 changes: 6 additions & 5 deletions util/runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use ckb_spawn::Spawn;
use core::future::Future;
use std::sync::atomic::{AtomicU32, Ordering};

use std::thread::available_parallelism;
use tokio::runtime::Builder;
use tokio::runtime::Handle as TokioHandle;

Expand Down Expand Up @@ -88,9 +88,10 @@ impl Handle {
}

/// Create a new runtime with unique name.
fn new_runtime() -> Runtime {
fn new_runtime(worker_num: Option<usize>) -> Runtime {
Builder::new_multi_thread()
.enable_all()
.worker_threads(worker_num.unwrap_or_else(|| available_parallelism().unwrap().into()))
.thread_name_fn(|| {
static ATOMIC_ID: AtomicU32 = AtomicU32::new(0);
let id = ATOMIC_ID
Expand Down Expand Up @@ -121,8 +122,8 @@ fn new_runtime() -> Runtime {
}

/// Create new threaded_scheduler tokio Runtime, return `Runtime`
pub fn new_global_runtime() -> (Handle, Receiver<()>, Runtime) {
let runtime = new_runtime();
pub fn new_global_runtime(worker_num: Option<usize>) -> (Handle, Receiver<()>, Runtime) {
let runtime = new_runtime(worker_num);
let handle = runtime.handle().clone();
let (guard, handle_stop_rx): (Sender<()>, Receiver<()>) = tokio::sync::mpsc::channel::<()>(1);

Expand All @@ -132,7 +133,7 @@ pub fn new_global_runtime() -> (Handle, Receiver<()>, Runtime) {
/// Create new threaded_scheduler tokio Runtime, return `Handle` and background thread join handle,
/// NOTICE: This is only used in testing
pub fn new_background_runtime() -> Handle {
let runtime = new_runtime();
let runtime = new_runtime(None);
let handle = runtime.handle().clone();

let (guard, mut handle_stop_rx): (Sender<()>, Receiver<()>) =
Expand Down
2 changes: 1 addition & 1 deletion util/stop-handler/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ impl TestStopMemo {

#[test]
fn basic() {
let (mut handle, mut stop_recv, _runtime) = new_global_runtime();
let (mut handle, mut stop_recv, _runtime) = new_global_runtime(None);

ctrlc::set_handler(move || {
broadcast_exit_signals();
Expand Down
Loading