Skip to content

Commit

Permalink
use rusty_pool instead of rayon
Browse files Browse the repository at this point in the history
  • Loading branch information
maminrayej authored and theduke committed Nov 28, 2023
1 parent 476d95d commit fae2cd5
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 13 deletions.
25 changes: 24 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions lib/wasix/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ repository.workspace = true
rust-version.workspace = true

[dependencies]
rusty_pool = { version = "0.7.0", optional = true }
cfg-if = "1.0"
thiserror = "1"
tracing = { version = "0.1.37" }
Expand Down Expand Up @@ -66,7 +67,6 @@ tower-http = { version = "0.4.0", features = ["trace", "util", "catch-panic", "c
tower = { version = "0.4.13", features = ["make", "util"], optional = true }
url = "2.3.1"
petgraph = "0.6.3"
rayon = { version = "1.7.0", optional = true }
wasm-bindgen = { version = "0.2.87", optional = true }
js-sys = { version = "0.3.64", optional = true }
wasm-bindgen-futures = { version = "0.4.37", optional = true }
Expand Down Expand Up @@ -118,7 +118,7 @@ webc_runner_rt_emscripten = ["wasmer-emscripten"]
sys = ["webc/mmap", "time", "virtual-mio/sys"]
sys-default = ["sys", "logging", "host-fs", "sys-poll", "sys-thread", "host-vnet", "host-threads", "host-reqwest"]
sys-poll = []
sys-thread = ["tokio/rt", "tokio/time", "tokio/rt-multi-thread", "rayon"]
sys-thread = ["tokio/rt", "tokio/time", "tokio/rt-multi-thread", "rusty_pool"]

# Deprecated. Kept it for compatibility
compiler = []
Expand Down
44 changes: 34 additions & 10 deletions lib/wasix/src/runtime/task_manager/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,34 @@ impl RuntimeOrHandle {
}
}

#[derive(Clone)]
pub struct ThreadPool {
inner: rusty_pool::ThreadPool,
}

impl std::ops::Deref for ThreadPool {
type Target = rusty_pool::ThreadPool;

fn deref(&self) -> &Self::Target {
&self.inner
}
}

impl std::fmt::Debug for ThreadPool {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ThreadPool")
.field("name", &self.get_name())
.field("current_worker_count", &self.get_current_worker_count())
.field("idle_worker_count", &self.get_idle_worker_count())
.finish()
}
}

/// A task manager that uses tokio to spawn tasks.
#[derive(Clone, Debug)]
pub struct TokioTaskManager {
rt: RuntimeOrHandle,
pool: Arc<rayon::ThreadPool>,
pool: Arc<ThreadPool>,
}

impl TokioTaskManager {
Expand All @@ -62,12 +85,13 @@ impl TokioTaskManager {

Self {
rt: rt.into(),
pool: Arc::new(
rayon::ThreadPoolBuilder::new()
.num_threads(max_threads)
.build()
.unwrap(),
),
pool: Arc::new(ThreadPool {
inner: rusty_pool::Builder::new()
.name("TokioTaskManager Thread Pool".to_string())
.core_size(max_threads)
.max_size(max_threads)
.build(),
}),
}
}

Expand Down Expand Up @@ -141,7 +165,7 @@ impl VirtualTaskManager for TokioTaskManager {
self.rt.handle().spawn(async move {
let result = trigger.await;
// Build the task that will go on the callback
pool.spawn(move || {
pool.execute(move || {
// Invoke the callback
run(TaskWasmRunProperties {
ctx,
Expand All @@ -154,7 +178,7 @@ impl VirtualTaskManager for TokioTaskManager {
tracing::trace!("spawning task_wasm in blocking thread");

// Run the callback on a dedicated thread
self.pool.spawn(move || {
self.pool.execute(move || {
tracing::trace!("task_wasm started in blocking thread");

// Invoke the callback
Expand All @@ -173,7 +197,7 @@ impl VirtualTaskManager for TokioTaskManager {
&self,
task: Box<dyn FnOnce() + Send + 'static>,
) -> Result<(), WasiThreadError> {
self.pool.spawn(move || {
self.pool.execute(move || {
task();
});
Ok(())
Expand Down

0 comments on commit fae2cd5

Please sign in to comment.