Skip to content

Commit

Permalink
Merge pull request #62 from Berrysoft/fix/netstatus-organize
Browse files Browse the repository at this point in the history
Organize netstatus code.
  • Loading branch information
Berrysoft authored Apr 27, 2023
2 parents 7b3112c + dd9c096 commit 72f0a00
Show file tree
Hide file tree
Showing 14 changed files with 206 additions and 86 deletions.
14 changes: 13 additions & 1 deletion netstatus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,25 @@ repository.workspace = true

[dependencies]
cfg-if = { workspace = true }
tokio-stream = { workspace = true }

[target.'cfg(target_os = "windows")'.dependencies]
windows = { workspace = true, features = ["Networking_Connectivity"] }
windows = { workspace = true, features = [
"Foundation",
"Networking_Connectivity",
] }
futures-util = { workspace = true }

[target.'cfg(target_os = "macos")'.dependencies]
anyhow = { workspace = true }
objc = { workspace = true }
system-configuration = { workspace = true }
core-foundation = { workspace = true }

[target.'cfg(any(target_os = "windows", target_os = "macos"))'.dependencies]
tokio = { workspace = true, features = ["sync"] }
tokio-stream = { workspace = true, features = ["sync"] }
pin-project = "1"

[target.'cfg(target_os = "linux")'.dependencies]
netlink_wi = "0.3"
5 changes: 5 additions & 0 deletions netstatus/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::fmt::{Display, Formatter};
use tokio_stream::Stream;

cfg_if::cfg_if! {
if #[cfg(target_os = "windows")] {
Expand Down Expand Up @@ -28,6 +29,10 @@ impl NetStatus {
pub fn current() -> Self {
platform::current()
}

pub fn watch() -> impl Stream<Item = ()> {
platform::watch()
}
}

impl Display for NetStatus {
Expand Down
4 changes: 4 additions & 0 deletions netstatus/src/netlink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,7 @@ pub fn current() -> NetStatus {
NetStatus::Unknown
})
}

pub fn watch() -> impl Stream<Item = ()> {
tokio_stream::pending()
}
74 changes: 73 additions & 1 deletion netstatus/src/sc.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,24 @@
use crate::*;
use anyhow::{anyhow, Result};
use core_foundation::{
base::TCFType,
runloop::{kCFRunLoopDefaultMode, CFRunLoop, CFRunLoopRef},
};
use objc::{
runtime::{Class, Object},
*,
};
use std::ffi::CStr;
use pin_project::pin_project;
use std::{
ffi::CStr,
os::unix::thread::{JoinHandleExt, RawPthread},
pin::Pin,
task::{Context, Poll},
thread::JoinHandle,
};
use system_configuration::network_reachability::{ReachabilityFlags, SCNetworkReachability};
use tokio::sync::watch;
use tokio_stream::wrappers::WatchStream;

#[link(name = "CoreWLAN", kind = "framework")]
extern "C" {
Expand Down Expand Up @@ -50,3 +64,61 @@ pub fn current() -> NetStatus {
}
NetStatus::Unknown
}

pub fn watch() -> impl Stream<Item = ()> {
let (tx, rx) = watch::channel(());
let loop_thread = std::thread::spawn(move || -> Result<()> {
let host = unsafe { CStr::from_bytes_with_nul_unchecked(b"0.0.0.0\0") };
let mut sc = SCNetworkReachability::from_host(host)
.ok_or_else(|| anyhow!("Cannot get network reachability"))?;
sc.set_callback(move |_| {
tx.send(()).ok();
})?;
unsafe {
sc.schedule_with_runloop(&CFRunLoop::get_current(), kCFRunLoopDefaultMode)?;
}
CFRunLoop::run_current();
Ok(())
});
StatusWatchStream {
s: WatchStream::new(rx),
thread: CFJThread {
handle: Some(loop_thread),
},
}
}

#[pin_project]
struct StatusWatchStream {
#[pin]
s: WatchStream<()>,
thread: CFJThread,
}

impl Stream for StatusWatchStream {
type Item = ();

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.project().s.poll_next(cx)
}
}

struct CFJThread {
handle: Option<JoinHandle<Result<()>>>,
}

impl Drop for CFJThread {
fn drop(&mut self) {
if let Some(handle) = self.handle.take() {
unsafe { CFRunLoop::wrap_under_get_rule(_CFRunLoopGet0(handle.as_pthread_t())) }.stop();
match handle.join() {
Ok(res) => res.unwrap(),
Err(e) => std::panic::resume_unwind(e),
}
}
}
}

extern "C" {
fn _CFRunLoopGet0(thread: RawPthread) -> CFRunLoopRef;
}
4 changes: 4 additions & 0 deletions netstatus/src/stub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,7 @@ use crate::*;
pub fn current() -> NetStatus {
NetStatus::Unknown
}

pub fn watch() -> impl Stream<Item = ()> {
tokio_stream::pending()
}
56 changes: 55 additions & 1 deletion netstatus/src/winrt.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
use crate::*;
use windows::{core::*, Networking::Connectivity::*};
use futures_util::future::Either;
use pin_project::pin_project;
use std::{
pin::Pin,
task::{Context, Poll},
};
use tokio::sync::watch;
use tokio_stream::{pending, wrappers::WatchStream};
use windows::{core::*, Foundation::EventRegistrationToken, Networking::Connectivity::*};

fn current_impl() -> Result<NetStatus> {
let profile = NetworkInformation::GetInternetConnectionProfile()?;
Expand All @@ -24,3 +32,49 @@ pub fn current() -> NetStatus {
NetStatus::Unknown
})
}

fn watch_impl() -> Result<impl Stream<Item = ()>> {
let (tx, rx) = watch::channel(());
let token = NetworkInformation::NetworkStatusChanged(&NetworkStatusChangedEventHandler::new(
move |_| {
tx.send(()).ok();
Ok(())
},
))?;
Ok(StatusWatchStream {
s: WatchStream::new(rx),
token: NetworkStatusChangedToken(token),
})
}

pub fn watch() -> impl Stream<Item = ()> {
watch_impl().map(Either::Left).unwrap_or_else(|e| {
if cfg!(debug_assertions) {
eprintln!("WARNING: {}", e.message());
}
Either::Right(pending())
})
}

#[pin_project]
struct StatusWatchStream {
#[pin]
s: WatchStream<()>,
token: NetworkStatusChangedToken,
}

impl Stream for StatusWatchStream {
type Item = ();

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.project().s.poll_next(cx)
}
}

struct NetworkStatusChangedToken(EventRegistrationToken);

impl Drop for NetworkStatusChangedToken {
fn drop(&mut self) {
NetworkInformation::RemoveNetworkStatusChanged(self.0).unwrap()
}
}
5 changes: 5 additions & 0 deletions tunet-cui/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ impl Event {
}

pub fn start(&self) {
self.spawn_watch_status();
self.spawn_timer();
self.spawn_online();
self.spawn_details();
Expand Down Expand Up @@ -79,6 +80,10 @@ impl Event {
});
}

fn spawn_watch_status(&self) {
self.model.queue(Action::WatchStatus);
}

fn spawn_timer(&self) {
self.model.queue(Action::Timer);
}
Expand Down
6 changes: 3 additions & 3 deletions tunet-gui/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ async fn main() -> Result<()> {
if let Ok(cred) = settings_reader.read_with_password() {
model.queue(Action::Credential(Arc::new(cred)));
}
model.queue(Action::WatchStatus);
model.queue(Action::Timer);

home_model.set_status(model.status.to_string().into());
Expand Down Expand Up @@ -335,11 +336,10 @@ fn update(
UpdateMsg::Status => {
model.queue(Action::State(None));

let status = model.status.clone();
let status = model.status.to_string();
weak_app
.upgrade_in_event_loop(move |app| {
app.global::<HomeModel>()
.set_status(status.to_string().into());
app.global::<HomeModel>().set_status(status.into());
})
.unwrap();
}
Expand Down
32 changes: 25 additions & 7 deletions tunet-model/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#![forbid(unsafe_code)]

use drop_guard::guard;
use futures_util::{pin_mut, TryStreamExt};
use futures_util::{pin_mut, StreamExt, TryStreamExt};
use itertools::Itertools;
use mac_address::*;
use netstatus::*;
Expand Down Expand Up @@ -123,7 +123,7 @@ impl Model {
let http = self.http.clone();
let status = self.status.clone();
tokio::spawn(async move {
let state = suggest::suggest_with_status(&http, status).await;
let state = suggest::suggest_with_status(&http, &status).await;
tx.send(Action::State(Some(state))).await.ok()
});
}
Expand All @@ -133,6 +133,16 @@ impl Model {
}
};
}
Action::WatchStatus => {
self.spawn_watch_status();
}
Action::Status => {
let status = NetStatus::current();
if status != self.status {
self.status = status;
self.update(UpdateMsg::Status);
}
}
Action::Timer => {
self.spawn_timer();
}
Expand All @@ -154,11 +164,6 @@ impl Model {
self.spawn_logout();
}
Action::Flux => {
let status = NetStatus::current();
if status != self.status {
self.status = status;
self.update(UpdateMsg::Status);
}
self.spawn_flux();
}
Action::LoginDone(s) | Action::LogoutDone(s) => {
Expand Down Expand Up @@ -224,6 +229,17 @@ impl Model {
}
}

fn spawn_watch_status(&self) {
let tx = self.tx.clone();
tokio::spawn(async move {
let mut events = NetStatus::watch();
while let Some(()) = events.next().await {
tx.send(Action::Status).await?;
}
Ok::<_, anyhow::Error>(())
});
}

fn spawn_timer(&self) {
let tx = self.tx.clone();
tokio::spawn(async move {
Expand Down Expand Up @@ -363,6 +379,8 @@ pub enum Action {
Credential(Arc<NetCredential>),
UpdateCredential(String, String),
State(Option<NetState>),
WatchStatus,
Status,
Timer,
Tick,
Login,
Expand Down
7 changes: 2 additions & 5 deletions tunet-service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ license.workspace = true
repository.workspace = true

[dependencies]
netstatus = { workspace = true }
tunet-helper = { workspace = true }
tunet-suggest = { workspace = true }
tunet-settings = { workspace = true }
Expand All @@ -18,14 +19,13 @@ humantime = "2"
enum_dispatch = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt", "signal", "time"] }
tokio-stream = { workspace = true, features = ["time"] }
futures-util = { workspace = true }
notify-rust = "4"

[target.'cfg(target_os = "windows")'.dependencies]
tokio = { workspace = true, features = ["sync"] }
tokio-stream = { workspace = true, features = ["sync"] }
windows = { workspace = true, features = [
"Foundation",
"Networking_Connectivity",
"Win32_Foundation",
"Win32_System_Environment",
"Win32_System_RemoteDesktop",
Expand All @@ -37,9 +37,6 @@ windows-service = "0.6"
is_elevated = "0.1"

[target.'cfg(target_os = "macos")'.dependencies]
tokio = { workspace = true, features = ["sync"] }
system-configuration = { workspace = true }
core-foundation = { workspace = true }
dirs = { workspace = true }
serde = { workspace = true, features = ["derive"] }
plist = "1"
Expand Down
Loading

0 comments on commit 72f0a00

Please sign in to comment.