Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

⬆️ zb: Swtich to new smol-rs crates #494

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions book/src/blocking.md
Original file line number Diff line number Diff line change
Expand Up @@ -185,13 +185,13 @@ fn main() -> Result<(), Box<dyn Error>> {
name: "GreeterName".to_string(),
done: event_listener::Event::new(),
};
let done_listener = greeter.done.listen();
let mut done_listener = greeter.done.listen();
let _handle = connection::Builder::session()?
.name("org.zbus.MyGreeter")?
.serve_at("/org/zbus/MyGreeter", greeter)?
.build()?;

done_listener.wait();
done_listener.as_mut().wait();

Ok(())
}
Expand Down
4 changes: 2 additions & 2 deletions book/src/server.md
Original file line number Diff line number Diff line change
Expand Up @@ -262,14 +262,14 @@ async fn main() -> Result<()> {
name: "GreeterName".to_string(),
done: event_listener::Event::new(),
};
let done_listener = greeter.done.listen();
let mut done_listener = greeter.done.listen();
let _connection = Builder::session()?
.name("org.zbus.MyGreeter")?
.serve_at("/org/zbus/MyGreeter", greeter)?
.build()
.await?;

done_listener.wait();
done_listener.as_mut().wait();

Ok(())
}
Expand Down
12 changes: 6 additions & 6 deletions zbus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,26 +46,26 @@ zbus_macros = { path = "../zbus_macros", version = "=4.0.0" }
enumflags2 = { version = "0.7.7", features = ["serde"] }
derivative = "2.2"
once_cell = "1.4.0"
async-io = { version = "1.12.0", optional = true }
async-io = { version = "2.0.0", optional = true }
futures-core = "0.3.25"
futures-sink = "0.3.25"
futures-util = { version = "0.3.25", default-features = false, features = [
"sink",
"std",
] }
async-lock = { version = "2.6.0", optional = true }
async-broadcast = "0.5.0"
async-lock = { version = "3.0.0", optional = true }
async-broadcast = "0.6.0"
async-executor = { version = "1.5.0", optional = true }
blocking = { version = "1.0.2", optional = true }
async-task = { version = "4.3.0", optional = true }
hex = "0.4.3"
ordered-stream = "0.2"
rand = "0.8.5"
sha1 = { version = "0.10.5", features = ["std"] }
event-listener = "2.5.3"
event-listener = "3.0.0"
static_assertions = "1.1.0"
async-trait = "0.1.58"
async-fs = { version = "1.6.0", optional = true }
async-fs = { version = "2.0.0", optional = true }
# FIXME: We should only enable process feature for Mac OS. See comment on async-process below for why we can't.
tokio = { version = "1.21.2", optional = true, features = [
"rt",
Expand Down Expand Up @@ -108,7 +108,7 @@ nix = { version = "0.27", default-features = false, features = [
[target.'cfg(target_os = "macos")'.dependencies]
# FIXME: This should only be enabled if async-io feature is enabled but currently
# Cargo doesn't provide a way to do that for only specific target OS: https://github.com/rust-lang/cargo/issues/1197.
async-process = "1.7.0"
async-process = "2.0.0"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did not and can not test this change.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Me neither. :) I think some tests cover this so CI will tell us.


[target.'cfg(any(target_os = "macos", windows))'.dependencies]
async-recursion = "1.0.0"
Expand Down
3 changes: 2 additions & 1 deletion zbus/src/address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ impl Address {
Address::Tcp(addr) => connect_tcp(addr).await.map(Stream::Tcp),

Address::NonceTcp { addr, nonce_file } => {
#[allow(unused_mut)]
let mut stream = connect_tcp(addr).await?;

#[cfg(unix)]
Expand All @@ -352,7 +353,7 @@ impl Address {

while !nonce.is_empty() {
let len = stream
.write_with_mut(|s| std::io::Write::write(s, nonce))
.write_with(|mut s| std::io::Write::write(&mut s, nonce))
.await?;
nonce = &nonce[len..];
}
Expand Down
16 changes: 12 additions & 4 deletions zbus/src/blocking/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,10 @@ mod tests {
});

let c = Builder::unix_stream(p1).p2p().build().unwrap();
let listener = c.monitor_activity();

let mut listener = Box::pin(c.monitor_activity());
listener.as_mut().listen();

let mut s = MessageIterator::from(&c);
tx.send(()).unwrap();
let m = s.next().unwrap().unwrap();
Expand All @@ -326,11 +329,16 @@ mod tests {
assert_eq!(val, "yay");

// there was some activity
listener.wait();
listener.as_mut().wait();
// eventually, nothing happens and it will timeout
loop {
let listener = c.monitor_activity();
if !listener.wait_timeout(std::time::Duration::from_millis(10)) {
let mut listener = Box::pin(c.monitor_activity());
listener.as_mut().listen();
if listener
.as_mut()
.wait_timeout(std::time::Duration::from_millis(10))
.is_none()
{
break;
}
}
Expand Down
4 changes: 2 additions & 2 deletions zbus/src/blocking/object_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,13 @@ where
/// let connection = Connection::session()?;
///
/// let quit_event = Event::new();
/// let quit_listener = quit_event.listen();
/// let mut quit_listener = quit_event.listen();
/// let interface = Example::new(quit_event);
/// connection
/// .object_server()
/// .at("/org/zbus/path", interface)?;
///
/// quit_listener.wait();
/// quit_listener.as_mut().wait();
/// # Ok::<_, Box<dyn Error + Send + Sync>>(())
/// ```
#[derive(Debug)]
Expand Down
2 changes: 1 addition & 1 deletion zbus/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1213,7 +1213,7 @@ impl Connection {
///
/// This function is meant for the caller to implement idle or timeout on inactivity.
pub fn monitor_activity(&self) -> EventListener {
self.inner.activity_event.listen()
EventListener::new(&self.inner.activity_event)
}

/// Returns the peer credentials.
Expand Down
2 changes: 1 addition & 1 deletion zbus/src/connection/socket_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ impl SocketReader {
}
}

if let Err(e) = sender.broadcast(msg.clone()).await {
if let Err(e) = sender.broadcast_direct(msg.clone()).await {
// An error would be due to either of these:
//
// 1. the channel is closed.
Expand Down
2 changes: 1 addition & 1 deletion zbus/src/object_server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -811,7 +811,7 @@ impl<R> ResponseDispatchNotifier<R> {
/// Create a new `NotifyResponse`.
pub fn new(response: R) -> (Self, EventListener) {
let event = Event::new();
let listener = event.listen();
let listener = EventListener::new(&event);
(
Self {
response,
Expand Down
2 changes: 1 addition & 1 deletion zbus/src/proxy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ where
pub struct PropertyStream<'a, T> {
name: &'a str,
proxy: Proxy<'a>,
changed_listener: EventListener,
changed_listener: Pin<Box<EventListener>>,
phantom: std::marker::PhantomData<T>,
}

Expand Down
2 changes: 2 additions & 0 deletions zbus/tests/e2e.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,8 @@ impl MyIfaceImpl {
) -> zbus::fdo::Result<ResponseDispatchNotifier<String>> {
debug!("`TestResponseNotify` called.");
let (response, listener) = ResponseDispatchNotifier::new(String::from("Meaning of life"));
let mut listener = Box::pin(listener);
listener.as_mut().listen();
let ctxt = ctxt.to_owned();
conn.executor()
.spawn(
Expand Down
Loading