Skip to content

Commit

Permalink
Add Tokio 1.0 runtime (#33)
Browse files Browse the repository at this point in the history
* feat: add tokio@1 runtime

* doc: add `runtime_tokio1` feature to README
  • Loading branch information
chpio committed Mar 11, 2021
1 parent c672986 commit 16184b4
Show file tree
Hide file tree
Showing 9 changed files with 187 additions and 4 deletions.
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ attributes = ["agnostik-attributes"]
runtime_bastion = ["bastion-executor", "lightproc"]
runtime_asyncstd = ["async_std_crate"]
runtime_tokio = ["tokio_crate"]
runtime_tokio1 = ["tokio1_crate"]
runtime_smol = ["smol_crate"]

[dependencies]
agnostik-attributes = { version = "1.2.0", optional = true }
bastion-executor = { version = "0.4", optional = true }
async_std_crate = { version = "1.7.0", optional = true, features = ["unstable"], package = "async-std" }
tokio_crate = { version = "0.3.4", optional = true, features = ["rt", "rt-multi-thread"], package = "tokio" }
tokio1_crate = { version = "1", optional = true, features = ["rt", "rt-multi-thread"], package = "tokio" }
lightproc = { version = "0.3", optional = true }
smol_crate = { version = "1.2.4", optional = true, package = "smol" }
once_cell = "1.5.2"
Expand All @@ -30,6 +32,7 @@ pin-project = "1.0.2"
[dev-dependencies]
agnostik = { path = ".", features = ["attributes"] }
tokio_crate = { version = "0.3.4", features = ["time"], package = "tokio" }
tokio1_crate = { version = "1", features = ["time"], package = "tokio" }

[build-dependencies]
cfg_aliases = "0.1.1"
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ You can choose the executor, by using cargo features.
There can only be one enabled runtime.
Valid features are:
- `runtime_bastion` to use the [Bastion Executor](https://crates.io/crates/bastion-executor)
- `runtime_tokio` to use the [Tokio](https://tokio.rs) runtime
- `runtime_tokio` to use the [Tokio version >0.3.4](https://tokio.rs) runtime
- `runtime_tokio1` to use the [Tokio version 1.*](https://tokio.rs) runtime
- `runtime_asyncstd` to use the [AsyncStd](https://async.rs) runtime
- `runtime_smol` to use the new and awesome [smol](https://docs.rs/smol) runtime

Expand Down
6 changes: 4 additions & 2 deletions build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::env;
const EXECUTOR_FEATURES: &[&str] = &[
"CARGO_FEATURE_RUNTIME_BASTION",
"CARGO_FEATURE_RUNTIME_TOKIO",
"CARGO_FEATURE_RUNTIME_TOKIO1",
"CARGO_FEATURE_RUNTIME_ASYNCSTD",
"CARGO_FEATURE_RUNTIME_SMOL",
];
Expand All @@ -18,10 +19,11 @@ fn main() {
cfg_aliases! {
bastion: { feature = "runtime_bastion" },
tokio: { feature = "runtime_tokio" },
tokio1: { feature = "runtime_tokio1" },
async_std: { feature = "runtime_asyncstd" },
smol: { feature = "runtime_smol" },

local_spawn: { any(tokio, async_std) },
enable: { any(smol, tokio, async_std, bastion) },
local_spawn: { any(tokio, tokio1, async_std) },
enable: { any(smol, tokio, tokio1, async_std, bastion) },
}
}
2 changes: 2 additions & 0 deletions ci.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ if [ "$1" = "check" ]; then
cargo check --features=runtime_bastion
cargo check --features=runtime_asyncstd
cargo check --features=runtime_tokio
cargo check --features=runtime_tokio1
cargo check --features=runtime_smol
elif [ "$1" = "test" ]; then
cargo test --features=runtime_bastion
cargo test --features=runtime_asyncstd
cargo test --features=runtime_tokio
cargo test --features=runtime_tokio1
cargo test --features=runtime_smol
else
echo "You have to provide either 'check' or 'test' argument"
Expand Down
5 changes: 5 additions & 0 deletions src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ mod tokio;
#[cfg(tokio)]
pub use tokio::*;

#[cfg(tokio1)]
mod tokio1;
#[cfg(tokio1)]
pub use tokio1::*;

#[cfg(smol)]
mod smol;
#[cfg(smol)]
Expand Down
65 changes: 65 additions & 0 deletions src/executor/tokio1.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
use crate::join_handle::{InnerJoinHandle, JoinHandle};
use crate::{AgnostikExecutor, LocalAgnostikExecutor};
use std::future::Future;
use std::sync::Mutex;
use tokio1_crate as tokio;

/// A wrapper around the `tokio` (version 1.*) crate which implements `AgnostikExecutor` and
/// `LocalAgnostikExecutor`.
pub struct Tokio1Executor(Mutex<tokio::runtime::Runtime>);

impl Tokio1Executor {
/// Create a new `Tokio1Executor`.
pub fn new() -> Self {
Self::with_runtime(tokio::runtime::Runtime::new().expect("failed to create runtime"))
}

/// Create a new `TokioExecutor` with a custom runtime.
pub fn with_runtime(runtime: tokio::runtime::Runtime) -> Self {
Tokio1Executor(Mutex::new(runtime))
}

pub(crate) fn set_runtime(&self, runtime: tokio::runtime::Runtime) {
let mut inner = self.0.lock().unwrap();
*inner = runtime;
}
}

impl AgnostikExecutor for Tokio1Executor {
fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
let handle = tokio::task::spawn(future);
JoinHandle(InnerJoinHandle::Tokio1(handle))
}

fn spawn_blocking<F, T>(&self, task: F) -> JoinHandle<T>
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
{
let handle = tokio::task::spawn_blocking(task);
JoinHandle(InnerJoinHandle::Tokio1(handle))
}

fn block_on<F>(&self, future: F) -> F::Output
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
self.0.lock().unwrap().block_on(future)
}
}

impl LocalAgnostikExecutor for Tokio1Executor {
fn spawn_local<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future + 'static,
F::Output: 'static,
{
let handle = tokio::task::spawn_local(future);
JoinHandle(InnerJoinHandle::Tokio1(handle))
}
}
9 changes: 9 additions & 0 deletions src/join_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ use std::{
use async_std_crate::task::JoinHandle as AsyncStdHandle;
#[cfg(bastion)]
use lightproc::recoverable_handle::RecoverableHandle;
#[cfg(tokio1)]
use tokio1_crate::task::JoinHandle as Tokio1Handle;
#[cfg(tokio)]
use tokio_crate::task::JoinHandle as TokioHandle;

Expand All @@ -38,6 +40,9 @@ pub enum InnerJoinHandle<R> {
/// The `JoinHandle` which is used for the tokio runtime.
#[cfg(tokio)]
Tokio(#[pin] TokioHandle<R>),
/// The `JoinHandle` which is used for the tokio runtime.
#[cfg(tokio1)]
Tokio1(#[pin] Tokio1Handle<R>),
/// The `JoinHandle` which is used for the smol runtime.
#[cfg(smol)]
Smol(#[pin] smol_crate::Task<R>),
Expand Down Expand Up @@ -77,6 +82,10 @@ where
JoinHandleProj::Tokio(handle) => handle
.poll(cx)
.map(|val| val.expect("task failed to execute")),
#[cfg(tokio1)]
JoinHandleProj::Tokio1(handle) => handle
.poll(cx)
.map(|val| val.expect("task failed to execute")),
#[cfg(smol)]
JoinHandleProj::Smol(handle) => handle.poll(cx),
JoinHandleProj::__Private(_, _) => unreachable!(),
Expand Down
51 changes: 50 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,9 @@ static EXECUTOR: Lazy<executor::AsyncStdExecutor> = Lazy::new(|| executor::Async
#[cfg(tokio)]
static EXECUTOR: Lazy<executor::TokioExecutor> = Lazy::new(|| executor::TokioExecutor::new());

#[cfg(tokio1)]
static EXECUTOR: Lazy<executor::Tokio1Executor> = Lazy::new(|| executor::Tokio1Executor::new());

#[cfg(smol)]
static EXECUTOR: Lazy<executor::SmolExecutor> = Lazy::new(|| executor::SmolExecutor);

Expand Down Expand Up @@ -248,6 +251,22 @@ impl Agnostik {
executor::TokioExecutor::new()
}

/// Returns an [LocalAgnostikExecutor], that will use the [Tokio] runtime to spawn futures.
///
/// **Attention:** This method will create a new [Runtime] object using the [Runtime::new]
/// method and will panic if it fails to create the [Runtime] object.
/// If you want to use your own [Runtime] object, use [tokio_with_runtime] instead.
///
/// [Tokio]: https://docs.rs/tokio
/// [Runtime]: tokio1_crate::runtime::Runtime
/// [Runtime::new]: tokio1_crate::runtime::Runtime::new
/// [tokio_with_runtime]: Self::tokio_with_runtime
/// [LocalAgnostikExecutor]: LocalAgnostikExecutor
#[cfg(tokio1)]
pub fn tokio() -> impl LocalAgnostikExecutor {
executor::Tokio1Executor::new()
}

/// Returns an [LocalAgnostikExecutor], that will use the [Tokio] runtime to spawn futures.
/// It will use the given [Runtime] object to spawn, and block_on futures. The spawn_blocking method
/// will use the [tokio::task::spawn_blocking] method.
Expand All @@ -264,6 +283,22 @@ impl Agnostik {
executor::TokioExecutor::with_runtime(runtime)
}

/// Returns an [LocalAgnostikExecutor], that will use the [Tokio] runtime to spawn futures.
/// It will use the given [Runtime] object to spawn, and block_on futures. The spawn_blocking method
/// will use the [tokio::task::spawn_blocking] method.
///
/// [tokio::task::spawn_blocking]: tokio1_crate::task::spawn_blocking
/// [Tokio]: https://docs.rs/tokio
/// [Runtime]: tokio1_crate::runtime::Runtime
/// [tokio_with_runtime]: ./fn.tokio_with_runtime.html
/// [LocalAgnostikExecutor]: LocalAgnostikExecutor
#[cfg(tokio1)]
pub fn tokio_with_runtime(
runtime: tokio1_crate::runtime::Runtime,
) -> impl LocalAgnostikExecutor {
executor::Tokio1Executor::with_runtime(runtime)
}

/// Returns an [LocalAgnostikExecutor] that will use the [smol] runtime, to spawn and run futures.
///
/// [smol]: https://docs.rs/smol
Expand Down Expand Up @@ -317,7 +352,7 @@ where

/// This method will set the [`tokio Runtime`] in the global executor.
///
/// [`tokio Runtime`]: https://docs.rs/tokio/0.2.21/tokio/runtime/struct.Runtime.html
/// [`tokio Runtime`]: tokio_crate::runtime::Runtime
#[cfg(tokio)]
pub fn set_runtime(runtime: tokio_crate::runtime::Runtime) {
use std::any::Any;
Expand All @@ -329,6 +364,20 @@ pub fn set_runtime(runtime: tokio_crate::runtime::Runtime) {
}
}

/// This method will set the [`tokio Runtime`] in the global executor.
///
/// [`tokio Runtime`]: tokio1_crate::runtime::Runtime
#[cfg(tokio1)]
pub fn set_runtime(runtime: tokio1_crate::runtime::Runtime) {
use std::any::Any;

let executor = executor() as &dyn Any;
match executor.downcast_ref::<executor::Tokio1Executor>() {
Some(executor) => executor.set_runtime(runtime),
None => unreachable!(),
}
}

/// Returns a reference to the global executor.
#[cfg(not(local_spawn))]
pub fn executor() -> &'static impl AgnostikExecutor {
Expand Down
47 changes: 47 additions & 0 deletions tests/tokio1.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
pub use agnostik::prelude::*;
pub use tokio1_crate as tokio;

#[cfg(feature = "runtime_tokio1")]
mod tokio_tests {
use super::*;
#[test]
fn test_tokio() {
agnostik::block_on(async {
agnostik::spawn(async {
let mut i = 0;
while i < 5 {
println!("Counting from Tokio: {}", i);
i += 1;
}
})
});
}

#[test]
fn test_basic_scheduler() {
let rt = tokio::runtime::Runtime::new().unwrap();
let rt = std::sync::Arc::new(rt);

for _ in 0..100 {
let rt = rt.clone();
std::thread::spawn(move || {
rt.block_on(
async move { tokio::time::sleep(std::time::Duration::from_secs(1)).await },
);
});
}
}
}

#[cfg(feature = "runtime_tokio1")]
#[test]
fn test_tokio_implicit() {
let res = agnostik::block_on(async {
agnostik::spawn(async {
println!("hello world");
1
})
.await
});
assert_eq!(res, 1);
}

0 comments on commit 16184b4

Please sign in to comment.