From 81fb5a4a02b9bf7f89c3e1d4f7dc3cc5b7b17c8a Mon Sep 17 00:00:00 2001 From: Matthew Waters Date: Wed, 24 Jul 2024 20:39:44 +1000 Subject: [PATCH 1/4] implement and expose a librice-io crate for C compatibility Only does io things using some low level async primitives. In the future, may also implement support for more advanced network handling like GRO/GSO/io_uring. --- Cargo.toml | 3 +- librice-io/Cargo.toml | 51 +++ librice-io/cbindgen.toml | 21 ++ librice-io/src/capi.rs | 545 +++++++++++++++++++++++++++++++++ librice-io/src/lib.rs | 10 + librice-proto/Cargo.toml | 16 +- librice-proto/cbindgen.toml | 2 +- librice-proto/src/candidate.rs | 42 +-- librice-proto/src/capi.rs | 393 ++++++++---------------- librice-proto/src/conncheck.rs | 43 ++- librice/Cargo.toml | 2 +- 11 files changed, 813 insertions(+), 315 deletions(-) create mode 100644 librice-io/Cargo.toml create mode 100644 librice-io/cbindgen.toml create mode 100644 librice-io/src/capi.rs create mode 100644 librice-io/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index b6304e2..891f124 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace] -members = ["librice-proto", "librice", "fuzz"] +members = ["librice-proto", "librice", "librice-io", "fuzz"] default-members = ["librice", "librice-proto"] resolver = "2" @@ -17,4 +17,5 @@ get_if_addrs = "0.5" rand = "0.8" tracing = "0.1" tracing-subscriber = "0.3" +librice-proto = { version = "0.0.5", path = "librice-proto" } diff --git a/librice-io/Cargo.toml b/librice-io/Cargo.toml new file mode 100644 index 0000000..d77af00 --- /dev/null +++ b/librice-io/Cargo.toml @@ -0,0 +1,51 @@ +[package] +name = "librice-io" +description = "ICE (RFC8445) implementation protocol" +version.workspace = true +authors = ["Matthew Waters "] +license = "MIT OR Apache-2.0" +keywords = ["STUN", "ICE"] +categories = ["network-programming", ] +#documentation = "https://docs.rs/librice-proto" +edition.workspace = true +repository.workspace = true +rust-version.workspace = true +workspace = ".." + +[features] +capi = ["get_if_addrs", "libc", "librice-proto/capi", "tracing-subscriber", "async-lock", "async-io", "async-task", "flume", "futures-lite"] + +[dependencies] +arbitrary = { workspace = true, optional = true } +async-io = {version = "2", optional = true } +async-lock = {version = "3", optional = true } +async-task = {version = "4", optional = true } +byteorder.workspace = true +flume = { version = "0.11", optional = true } +futures-lite = {version = "2", optional = true } +get_if_addrs = { workspace = true, optional = true } +libc = { version = "0.2", optional = true } +librice-proto.workspace = true +stun-proto.workspace = true +tracing.workspace = true +tracing-subscriber = { workspace = true, optional = true } + +[dev-dependencies] +tracing-subscriber = { workspace = true, features = ["env-filter"] } + +[package.metadata.capi] +min_version = "0.9.21" + +[package.metadata.capi.header] +subdirectory = "rice" +name = "rice-io" + +[package.metadata.capi.library] +name = "rice-io" +version_suffix_components = 1 +rustflags = "-Cpanic=abort" + +[package.metadata.capi.pkg_config] +name = "librice-io" +filename = "rice-io" +requires = "rice-proto" diff --git a/librice-io/cbindgen.toml b/librice-io/cbindgen.toml new file mode 100644 index 0000000..a8a6e3b --- /dev/null +++ b/librice-io/cbindgen.toml @@ -0,0 +1,21 @@ +header = "// SPDX-License-Identifier: MIT OR Apache-2.0" +includes = ["rice-proto.h"] +include_guard = "LIBRICE_IO_H" +tab_width = 4 +language = "C" +cpp_compat = true +usize_is_size_t = true + +[export] +exclude = ["MAGIC_COOKIE", "BINDING", "RTP", "RTCP"] +item_types = ["enums", "structs", "opaque", "functions", "typedefs"] + +[export.rename] +"CandidateType" = "RiceCandidateType" +"ComponentConnectionState" = "RiceComponentConnectionState" + +[fn] +args = "vertical" + +[enum] +rename_variants = "QualifiedScreamingSnakeCase" diff --git a/librice-io/src/capi.rs b/librice-io/src/capi.rs new file mode 100644 index 0000000..03f9c55 --- /dev/null +++ b/librice-io/src/capi.rs @@ -0,0 +1,545 @@ +// Copyright (C) 2024 Matthew Waters +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +// everything will be unsafe since this is a FFI +#![allow(clippy::missing_safety_doc)] +#![deny(improper_ctypes_definitions)] + +use core::ffi::c_void; + +use std::collections::HashMap; +use std::net::{IpAddr, SocketAddr, TcpStream, UdpSocket}; +use std::sync::{Arc, Mutex, Once, OnceLock}; +use std::{panic, thread}; + +use tracing::warn; + +use get_if_addrs::get_if_addrs; + +use async_io::Async; +use async_task::{Runnable, Task}; +use futures_lite::stream::StreamExt; + +use librice_proto::capi::{RiceAddress, RiceError, RiceTransportType}; + +static TRACING: Once = Once::new(); + +fn init_logs() { + TRACING.call_once(|| { + use tracing_subscriber::layer::SubscriberExt; + use tracing_subscriber::Layer; + + let level_filter = std::env::var("RICE_LOG") + .ok() + .and_then(|var| var.parse::().ok()) + .unwrap_or( + tracing_subscriber::filter::Targets::new().with_default(tracing::Level::TRACE), + ); + let registry = tracing_subscriber::registry().with( + tracing_subscriber::fmt::layer() + .with_file(true) + .with_line_number(true) + .with_level(true) + .with_target(false) + .with_test_writer() + .with_filter(level_filter), + ); + let _ = tracing::subscriber::set_global_default(registry); + }); +} + +static RUNNABLE_QUEUE: OnceLock> = OnceLock::new(); + +fn sender() -> &'static flume::Sender { + RUNNABLE_QUEUE.get_or_init(|| { + let (sender, receiver) = flume::unbounded::(); + thread::spawn(|| { + async_io::block_on(async move { + let mut stream = receiver.into_stream(); + while let Some(runnable) = stream.next().await { + if let Err(panic) = panic::catch_unwind(|| runnable.run()) { + warn!("task panic: {panic:?}"); + } + } + }); + }); + sender + }) +} + +fn schedule(runnable: Runnable) { + sender().send(runnable).unwrap(); +} + +/* +/// Connect from the specified interface to the specified address. Reply (success or failure) +/// should be notified using [`rice_agent_handle_tcp_connect`] with the same parameters. +#[derive(Debug)] +#[repr(C)] +pub struct RiceIoTcpConnect { + pub stream_id: usize, + pub component_id: usize, + pub from: *const RiceAddress, + pub to: *const RiceAddress, +} + +impl From for RiceIoTcpConnect { + fn from(value: librice_proto::agent::AgentTcpConnect) -> Self { + Self { + stream_id: value.stream_id, + component_id: value.component_id, + from: Box::into_raw(Box::new(RiceAddress(value.from))), + to: Box::into_raw(Box::new(RiceAddress(value.to))), + } + } +} + +impl From for librice_proto::agent::AgentTcpConnect { + fn from(value: RiceIoTcpConnect) -> Self { + unsafe { + Self { + stream_id: value.stream_id, + component_id: value.component_id, + from: RiceAddress::from_c(value.from).0, + to: RiceAddress::from_c(value.to).0, + } + } + } +} +*/ + +#[derive(Debug)] +pub struct RiceUdpSocket { + socket: Async, +} + +#[no_mangle] +pub unsafe extern "C" fn rice_udp_socket_new(local_addr: *const RiceAddress) -> *mut RiceUdpSocket { + init_logs(); + + let local_addr = Box::from_raw(mut_override(local_addr)); + + let ret = if let Ok(socket) = Async::::bind(**local_addr) { + mut_override(Arc::into_raw(Arc::new(RiceUdpSocket { socket }))) + } else { + core::ptr::null_mut::() + }; + + core::mem::forget(local_addr); + ret +} + +#[no_mangle] +pub unsafe extern "C" fn rice_udp_socket_ref(udp: *mut RiceUdpSocket) -> *mut RiceUdpSocket { + Arc::increment_strong_count(udp); + udp +} + +#[no_mangle] +pub unsafe extern "C" fn rice_udp_socket_unref(udp: *mut RiceUdpSocket) { + Arc::decrement_strong_count(udp); +} + +#[no_mangle] +pub unsafe extern "C" fn rice_udp_socket_local_addr(udp: *const RiceUdpSocket) -> *mut RiceAddress { + let udp = Arc::from_raw(udp); + let ret = match udp.socket.get_ref().local_addr() { + Ok(addr) => mut_override(RiceAddress::new(addr).to_c()), + Err(_) => core::ptr::null_mut(), + }; + core::mem::forget(udp); + ret +} + +#[derive(Debug, Copy, Clone)] +struct IoNotifyData { + io_notify: RiceIoNotify, + io_notify_data: SendPtr, + io_destroy: RiceIoNotify, +} + +#[derive(Debug)] +struct UdpSocketTask { + inner: Arc, + // dropping this stops polling for readable + #[allow(dead_code)] + poll_task: Task<()>, + // blocks the poll task until recv() is called for this socket + semaphore_guard: Arc>>, + io_notify_data: Option>>, +} + +#[derive(Debug)] +pub struct RiceSockets { + inner: Mutex, + notify_data: Arc>>, +} + +impl RiceSockets { + fn set_notify(&self, notify_data: Option) { + let removed_notify = { + let mut our_notify_data = self.notify_data.lock().unwrap(); + let mut notify_data = notify_data.clone(); + std::mem::swap(&mut *our_notify_data, &mut notify_data); + notify_data + }; + + { + let mut inner = self.inner.lock().unwrap(); + let notify_data = notify_data.map(|notify| Arc::new(Mutex::new(notify.clone()))); + for udp in inner.udp_sockets.values_mut() { + let mut notify_data = notify_data.clone(); + std::mem::swap(&mut udp.io_notify_data, &mut notify_data); + } + } + + if let Some(notify_data) = removed_notify { + if let Some(destroy) = notify_data.io_destroy { + destroy(notify_data.io_notify_data.ptr); + } + } + } +} + +impl Drop for RiceSockets { + fn drop(&mut self) { + self.set_notify(None); + } +} + +#[derive(Debug)] +struct RiceSocketsInner { + udp_sockets: HashMap, + tcp_sockets: HashMap<(SocketAddr, SocketAddr), TcpStream>, +} + +pub type RiceIoNotify = Option; +pub type RiceIoDestroy = Option; + +#[derive(Debug, Copy, Clone)] +struct SendPtr { + ptr: *mut c_void, +} + +unsafe impl Send for SendPtr {} + +impl SendPtr { + fn new(val: *mut c_void) -> Self { + Self { ptr: val } + } +} + +#[no_mangle] +pub unsafe extern "C" fn rice_sockets_new_with_notify( + io_notify: RiceIoNotify, + io_data: *mut c_void, + io_destroy: RiceIoDestroy, +) -> *mut RiceSockets { + init_logs(); + let notify_data = io_notify.map(|io_notify| IoNotifyData { + io_notify: Some(io_notify), + io_notify_data: SendPtr::new(io_data), + io_destroy, + }); + let ret = Arc::new(RiceSockets { + inner: Mutex::new(RiceSocketsInner { + udp_sockets: Default::default(), + tcp_sockets: Default::default(), + }), + notify_data: Arc::new(Mutex::new(notify_data)), + }); + + mut_override(Arc::into_raw(ret)) +} + +#[no_mangle] +pub unsafe extern "C" fn rice_sockets_new() -> *mut RiceSockets { + rice_sockets_new_with_notify(None, core::ptr::null_mut(), None) +} + +#[no_mangle] +pub unsafe extern "C" fn rice_sockets_ref(sockets: *mut RiceSockets) -> *mut RiceSockets { + Arc::increment_strong_count(sockets); + sockets +} + +#[no_mangle] +pub unsafe extern "C" fn rice_sockets_unref(sockets: *mut RiceSockets) { + Arc::decrement_strong_count(sockets) +} + +#[no_mangle] +pub unsafe extern "C" fn rice_sockets_set_notify( + sockets: *mut RiceSockets, + io_notify: RiceIoNotify, + io_data: *mut c_void, + io_destroy: RiceIoDestroy, +) { + let sockets = Arc::from_raw(sockets); + let notify_data = io_notify.map(|io_notify| IoNotifyData { + io_notify: Some(io_notify), + io_notify_data: SendPtr::new(io_data), + io_destroy, + }); + + sockets.set_notify(notify_data); + + core::mem::forget(sockets); +} + +#[no_mangle] +pub unsafe extern "C" fn rice_sockets_add_udp( + sockets: *mut RiceSockets, + udp: *mut RiceUdpSocket, +) -> bool { + let sockets = Arc::from_raw(sockets); + let udp = Arc::from_raw(udp); + let notify_data = sockets.notify_data.clone(); + let mut inner = sockets.inner.lock().unwrap(); + + let local_addr = udp.socket.get_ref().local_addr().unwrap(); + let entry = inner.udp_sockets.entry(local_addr); + let ret = match entry { + std::collections::hash_map::Entry::Occupied(_) => false, + std::collections::hash_map::Entry::Vacant(vacant) => { + let udp_clone = udp.clone(); + let semaphore = Arc::new(async_lock::Semaphore::new(1)); + let poll_guard = Arc::new(Mutex::new(None)); + let io_notify_data = notify_data + .lock() + .unwrap() + .map(|notify| Arc::new(Mutex::new(notify.clone()))); + let (runnable, task) = async_task::spawn( + { + let poll_guard = poll_guard.clone(); + let semaphore = semaphore.clone(); + let io_notify_data = io_notify_data.clone(); + async move { + loop { + // Some poll implementations will return readable whenever there is any + // data to read on the socket. However if `rice_sockets_recv()` occurs on + // a different thread, then the notification to the application may take a + // while for the read to be processed and thus cause the IO thread to busy + // loop. This Semaphore is designed to mitigate this by only allowing a + // single poll() and recv() combination for a particular socket to occur + // in lockstep. + let guard = semaphore.acquire_arc().await; + *poll_guard.lock().unwrap() = Some(guard); + if let Err(e) = futures_lite::future::poll_fn(|cx| { + udp_clone.socket.poll_readable(cx) + }) + .await + { + warn!("Failed to poll udp socket: {e}"); + break; + } + if let Some(ref notify_data) = io_notify_data { + let notify_data = notify_data.lock().unwrap(); + if let Some(notify) = notify_data.io_notify.as_ref() { + notify(notify_data.io_notify_data.ptr); + } + } + } + } + }, + schedule, + ); + runnable.run(); + vacant.insert(UdpSocketTask { + inner: udp, + poll_task: task, + semaphore_guard: poll_guard, + io_notify_data, + }); + true + } + }; + drop(inner); + + core::mem::forget(sockets); + ret +} + +fn address_is_ignorable(ip: IpAddr) -> bool { + // TODO: add is_benchmarking() and is_documentation() when they become stable + if ip.is_loopback() || ip.is_unspecified() || ip.is_multicast() { + return true; + } + match ip { + IpAddr::V4(ipv4) => ipv4.is_broadcast() || ipv4.is_link_local(), + IpAddr::V6(_ipv6) => false, + } +} + +#[no_mangle] +pub unsafe extern "C" fn rice_interfaces(ret_len: *mut usize) -> *mut *mut RiceAddress { + init_logs(); + + let Ok(mut ifaces) = get_if_addrs() else { + return mut_override(std::ptr::null()); + }; + // We only care about non-loopback interfaces for now + // TODO: remove 'Deprecated IPv4-compatible IPv6 addresses [RFC4291]' + // TODO: remove 'IPv6 site-local unicast addresses [RFC3879]' + // TODO: remove 'IPv4-mapped IPv6 addresses unless ipv6 only' + // TODO: location tracking Ipv6 address mismatches + ifaces.retain(|e| !address_is_ignorable(e.ip())); + + let ret = ifaces + .iter() + .map(|iface| RiceAddress::to_c(RiceAddress::new(SocketAddr::new(iface.ip(), 0)))) + .collect::>() + .into_boxed_slice(); + *ret_len = ret.len(); + Box::into_raw(ret) as *mut _ +} + +#[no_mangle] +pub unsafe extern "C" fn rice_addresses_free(addresses: *mut *mut RiceAddress, len: usize) { + let addresses = Box::from_raw(core::slice::from_raw_parts_mut(addresses, len)); + for i in 0..len { + let _addr = RiceAddress::from_c(addresses[i]); + } +} + +#[no_mangle] +pub unsafe extern "C" fn rice_sockets_send( + sockets: *mut RiceSockets, + transport: RiceTransportType, + from: *const RiceAddress, + to: *const RiceAddress, + data: *mut u8, + len: usize, +) -> RiceError { + let sockets = Arc::from_raw(sockets); + let from = RiceAddress::from_c(mut_override(from)); + let to = RiceAddress::from_c(mut_override(to)); + let data = core::slice::from_raw_parts_mut(data, len); + let inner = sockets.inner.lock().unwrap(); + let ret = match transport { + RiceTransportType::Udp => { + if let Some(udp) = inner.udp_sockets.get(&**from) { + if udp.inner.socket.get_ref().send_to(data, **to).is_err() { + RiceError::Failed + } else { + RiceError::Success + } + } else { + RiceError::NotFound + } + } + RiceTransportType::Tcp => RiceError::NotFound, // FIXME + }; + + drop(inner); + core::mem::forget(sockets); + core::mem::forget(from); + core::mem::forget(to); + ret +} + +#[repr(C)] +pub struct RiceIoData { + transport: RiceTransportType, + from: *mut RiceAddress, + to: *mut RiceAddress, + len: usize, +} + +#[repr(C)] +pub struct RiceIoClosed { + transport: RiceTransportType, + from: *mut RiceAddress, + to: *mut RiceAddress, +} + +#[repr(C)] +pub enum RiceIoRecv { + WouldBlock, + Data(RiceIoData), + Closed(RiceIoClosed), +} + +#[no_mangle] +pub unsafe extern "C" fn rice_recv_clear(recv: *mut RiceIoRecv) { + if recv.is_null() { + return; + } + match &*recv { + RiceIoRecv::Data(data) => { + let _from = RiceAddress::from_c(data.from); + let _to = RiceAddress::from_c(data.to); + } + RiceIoRecv::Closed(closed) => { + let _from = RiceAddress::from_c(closed.from); + let _to = RiceAddress::from_c(closed.to); + } + RiceIoRecv::WouldBlock => (), + } + *recv = RiceIoRecv::WouldBlock +} + +#[no_mangle] +pub unsafe extern "C" fn rice_sockets_recv( + sockets: *mut RiceSockets, + data: *mut u8, + len: usize, + ret: *mut RiceIoRecv, +) { + let sockets = Arc::from_raw(sockets); + *ret = RiceIoRecv::WouldBlock; + let mut inner = sockets.inner.lock().unwrap(); + let mut data = core::slice::from_raw_parts_mut(data, len); + for (&local_addr, udp) in inner.udp_sockets.iter_mut() { + match udp.inner.socket.get_ref().recv_from(&mut data) { + Ok((len, from)) => { + udp.semaphore_guard.lock().unwrap().take(); + *ret = RiceIoRecv::Data(RiceIoData { + transport: RiceTransportType::Udp, + from: mut_override(RiceAddress::new(from).to_c()), + to: mut_override(RiceAddress::new(local_addr).to_c()), + len, + }); + break; + } + Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => { + continue; + } + Err(e) => { + warn!("Failed to receive data for UDP socket {local_addr:?}: {e}"); + } + } + } + + drop(inner); + core::mem::forget(sockets); +} + +fn mut_override(val: *const T) -> *mut T { + val as *mut T +} + +fn const_override(val: *mut T) -> *const T { + val as *const T +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn rice_address() { + unsafe { + let s = CString::new("127.0.0.1:2000").unwrap(); + let addr = rice_address_new_from_string(s.as_ptr()); + let addr2 = rice_address_copy(addr); + rice_address_free(addr); + rice_address_free(addr2); + } + } +} diff --git a/librice-io/src/lib.rs b/librice-io/src/lib.rs new file mode 100644 index 0000000..8b45ac5 --- /dev/null +++ b/librice-io/src/lib.rs @@ -0,0 +1,10 @@ +// Copyright (C) 2024 Matthew Waters +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +#[cfg(feature = "capi")] +mod capi; diff --git a/librice-proto/Cargo.toml b/librice-proto/Cargo.toml index bfb81cd..b2727da 100644 --- a/librice-proto/Cargo.toml +++ b/librice-proto/Cargo.toml @@ -13,18 +13,16 @@ rust-version.workspace = true workspace = ".." [features] -capi = ["libc", "tracing-subscriber", "get_if_addrs"] +capi = ["libc", "tracing-subscriber"] [dependencies] arbitrary = { workspace = true, optional = true } byteorder.workspace = true -get_if_addrs = { workspace = true, optional = true } +libc = { version = "0.2", optional = true } nom = "7" rand.workspace = true -tracing.workspace = true -libc = { version = "0.2", optional = true } -socket2 = { version = "0.5", optional = true } stun-proto.workspace = true +tracing.workspace = true tracing-subscriber = { workspace = true, optional = true } [dev-dependencies] @@ -33,7 +31,15 @@ tracing-subscriber.workspace = true [package.metadata.capi] min_version = "0.9.21" +[package.metadata.capi.header] +name = "rice-proto" +subdirectory = "rice" + [package.metadata.capi.library] name = "rice-proto" version_suffix_components = 1 rustflags = "-Cpanic=abort" + +[package.metadata.capi.pkg_config] +name = "librice-proto" +filename = "rice-proto" diff --git a/librice-proto/cbindgen.toml b/librice-proto/cbindgen.toml index d9c3bee..099ada8 100644 --- a/librice-proto/cbindgen.toml +++ b/librice-proto/cbindgen.toml @@ -1,5 +1,5 @@ header = "// SPDX-License-Identifier: MIT OR Apache-2.0" -sys_includes = ["sys/socket.h"] +includes = [] include_guard = "LIBRICE_PROTO_H" tab_width = 4 language = "C" diff --git a/librice-proto/src/candidate.rs b/librice-proto/src/candidate.rs index 4eb2ec7..7e5af5f 100644 --- a/librice-proto/src/candidate.rs +++ b/librice-proto/src/candidate.rs @@ -337,34 +337,6 @@ impl Candidate { ret } - pub(crate) fn pair_tcp_type(local: TcpType) -> TcpType { - match local { - TcpType::Active => TcpType::Passive, - TcpType::Passive => TcpType::Active, - TcpType::So => TcpType::So, - } - } - - // can this candidate pair with 'remote' in any way - pub(crate) fn can_pair_with(&self, remote: &Candidate) -> bool { - let address = match self.candidate_type { - CandidateType::Host => self.address, - _ => self.base_address, - }; - if self.transport_type == TransportType::Tcp - && remote.transport_type == TransportType::Tcp - && (self.tcp_type.is_none() - || remote.tcp_type.is_none() - || Candidate::pair_tcp_type(self.tcp_type.unwrap()) != remote.tcp_type.unwrap()) - { - return false; - } - self.transport_type == remote.transport_type - && self.component_id == remote.component_id - && address.is_ipv4() == remote.address.is_ipv4() - && address.is_ipv6() == remote.address.is_ipv6() - } - fn priority_type_preference(ctype: CandidateType) -> u32 { match ctype { CandidateType::Host => 126, @@ -664,11 +636,11 @@ impl CandidatePair { Self { local, remote } } - pub(crate) fn foundation(&self) -> String { + pub fn foundation(&self) -> String { self.local.foundation.to_string() + ":" + &self.remote.foundation } - pub(crate) fn priority(&self, are_controlling: bool) -> u64 { + pub fn priority(&self, are_controlling: bool) -> u64 { let (controlling_priority, controlled_priority) = if are_controlling { (self.local.priority as u64, self.remote.priority as u64) } else { @@ -684,15 +656,6 @@ impl CandidatePair { + extra } - pub(crate) fn construct_valid(&self, mapped_address: SocketAddr) -> Self { - let mut local = self.local.clone(); - local.address = mapped_address; - Self { - local, - remote: self.remote.clone(), - } - } - /// Whether the pair is redundant when combined with the provided list. Returns the existing /// candidate that this pair is redundant with. If redundant, this pair should not be used for /// ICE connectivity checks. @@ -710,6 +673,7 @@ impl CandidatePair { #[cfg(test)] mod tests { use super::*; + use tracing::debug; #[test] fn candidate_pair_redundant_with_itself() { diff --git a/librice-proto/src/capi.rs b/librice-proto/src/capi.rs index b11cc54..d43e3d0 100644 --- a/librice-proto/src/capi.rs +++ b/librice-proto/src/capi.rs @@ -13,23 +13,19 @@ use std::ffi::{CStr, CString}; use std::os::raw::{c_char, c_int}; -use std::collections::HashMap; -use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, TcpStream, UdpSocket}; +use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; use std::str::FromStr; use std::sync::{Arc, Mutex, Once, Weak}; use std::time::{Duration, Instant}; -use get_if_addrs::get_if_addrs; - use crate::agent::Agent; pub use crate::agent::AgentPoll; -use crate::candidate::{Candidate, CandidateType}; +use crate::candidate::{Candidate, CandidateType, TransportType}; pub use crate::component::ComponentConnectionState; use crate::gathering::GatherPoll; use crate::stream::Credentials; use stun_proto::agent::{StunAgent, StunError, Transmit}; use stun_proto::types::data::{Data, DataOwned, DataSlice}; -use stun_proto::types::TransportType; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::Layer; @@ -53,7 +49,7 @@ fn init_logs() { .with_test_writer() .with_filter(level_filter), ); - tracing::subscriber::set_global_default(registry).unwrap(); + let _ = tracing::subscriber::set_global_default(registry); }); } @@ -168,7 +164,7 @@ pub enum RiceAgentPoll { /// Wait until the specified `Instant` has been reached (or an external event) WaitUntilMicros(u64), /// Transmit data using the specified 5-tuple - Transmit(RiceAgentTransmit), + Transmit(RiceTransmit), /// Connect from the specified interface to the specified address. Reply (success or failure) /// should be notified using [`StreamMut::handle_tcp_connect`] with the same parameters. TcpConnect(RiceAgentTcpConnect), @@ -222,7 +218,7 @@ impl RiceDataImpl { core::slice::from_raw_parts_mut(self.ptr, self.size) } - fn borrowed_to_c<'a>(val: &'a [u8]) -> Self { + fn borrowed_to_c(val: &[u8]) -> Self { Self { ptr: mut_override(val.as_ptr()), size: val.len(), @@ -250,10 +246,30 @@ impl<'a> From for Data<'a> { } } +#[no_mangle] +pub unsafe extern "C" fn rice_data_len(data: *const RiceData) -> usize { + let len = match &*data { + RiceData::Borrowed(imp) => imp.size, + RiceData::Owned(imp) => imp.size, + }; + + len +} + +#[no_mangle] +pub unsafe extern "C" fn rice_data_ptr(data: *const RiceData) -> *mut u8 { + let ptr = match &*data { + RiceData::Borrowed(imp) => imp.ptr, + RiceData::Owned(imp) => imp.ptr, + }; + + ptr +} + /// Transmit the data using the specified 5-tuple. #[derive(Debug)] #[repr(C)] -pub struct RiceAgentTransmit { +pub struct RiceTransmit { stream_id: usize, component_id: usize, transport: RiceTransportType, @@ -262,10 +278,10 @@ pub struct RiceAgentTransmit { data: RiceData, } -impl<'a> From> for RiceAgentTransmit { +impl<'a> From> for RiceTransmit { fn from(value: crate::agent::AgentTransmit<'a>) -> Self { - let from = Box::new(RiceAddress(value.transmit.from)); - let to = Box::new(RiceAddress(value.transmit.to)); + let from = Box::new(RiceAddress::new(value.transmit.from)); + let to = Box::new(RiceAddress::new(value.transmit.to)); Self { stream_id: value.stream_id, component_id: value.component_id, @@ -277,21 +293,7 @@ impl<'a> From> for RiceAgentTransmit { } } -impl RiceAgentTransmit { - fn from_rust_gather(stream_id: usize, component_id: usize, transmit: Transmit) -> Self { - let from = Box::new(RiceAddress(transmit.from)); - let to = Box::new(RiceAddress(transmit.to)); - let ret = Self { - stream_id, - component_id, - transport: transmit.transport.into(), - from: Box::into_raw(from), - to: Box::into_raw(to), - data: transmit.data.into(), - }; - ret - } - +impl RiceTransmit { unsafe fn clear_c(self) { let _from = RiceAddress::from_c(self.from); let _to = RiceAddress::from_c(self.to); @@ -299,6 +301,24 @@ impl RiceAgentTransmit { } } +fn transmit_from_rust_gather( + stream_id: usize, + component_id: usize, + transmit: Transmit, +) -> RiceTransmit { + let from = Box::new(RiceAddress::new(transmit.from)); + let to = Box::new(RiceAddress::new(transmit.to)); + let ret = RiceTransmit { + stream_id, + component_id, + transport: transmit.transport.into(), + from: Box::into_raw(from), + to: Box::into_raw(to), + data: transmit.data.into(), + }; + ret +} + /// Connect from the specified interface to the specified address. Reply (success or failure) /// should be notified using [`rice_agent_handle_tcp_connect`] with the same parameters. #[derive(Debug)] @@ -315,8 +335,8 @@ impl From for RiceAgentTcpConnect { Self { stream_id: value.stream_id, component_id: value.component_id, - from: Box::into_raw(Box::new(RiceAddress(value.from))), - to: Box::into_raw(Box::new(RiceAddress(value.to))), + from: Box::into_raw(Box::new(RiceAddress::new(value.from))), + to: Box::into_raw(Box::new(RiceAddress::new(value.to))), } } } @@ -327,8 +347,8 @@ impl From for crate::agent::AgentTcpConnect { Self { stream_id: value.stream_id, component_id: value.component_id, - from: RiceAddress::from_c(value.from).0, - to: RiceAddress::from_c(value.to).0, + from: **RiceAddress::from_c(value.from), + to: **RiceAddress::from_c(value.to), } } } @@ -351,8 +371,8 @@ impl From for RiceAgentSelectedPair { stream_id: value.stream_id, component_id: value.component_id, transport: value.selected.candidate_pair().local.transport_type.into(), - from: RiceAddress(value.selected.candidate_pair().local.base_address).to_c(), - to: RiceAddress(value.selected.candidate_pair().remote.address).to_c(), + from: RiceAddress::new(value.selected.candidate_pair().local.base_address).to_c(), + to: RiceAddress::new(value.selected.candidate_pair().remote.address).to_c(), } } } @@ -444,7 +464,7 @@ pub unsafe extern "C" fn rice_agent_add_stun_server( let agent = Arc::from_raw(agent); let addr = Box::from_raw(mut_override(addr)); let mut inner = agent.inner.lock().unwrap(); - inner.stun_servers.push((transport.into(), (*addr).0)); + inner.stun_servers.push((transport.into(), **addr)); drop(inner); core::mem::forget(addr); core::mem::forget(agent); @@ -545,8 +565,8 @@ pub unsafe extern "C" fn rice_stream_now(stream: *mut RiceStream) -> u64 { #[derive(Debug)] pub struct RiceCredentials { - ufrag: *mut c_char, - passwd: *mut c_char, + pub ufrag: *mut c_char, + pub passwd: *mut c_char, } #[no_mangle] @@ -564,7 +584,7 @@ pub unsafe extern "C" fn rice_credentials_free(credentials: *mut RiceCredentials let _passwd = CString::from_raw(creds.passwd); } -fn credentials_to_c(credentials: Credentials) -> *mut RiceCredentials { +pub fn credentials_to_c(credentials: Credentials) -> *mut RiceCredentials { let creds = Box::new(RiceCredentials { ufrag: CString::new(credentials.ufrag).unwrap().into_raw(), passwd: CString::new(credentials.passwd).unwrap().into_raw(), @@ -647,6 +667,7 @@ pub unsafe extern "C" fn rice_stream_set_remote_credentials( } #[derive(Debug, Copy, Clone, PartialEq, Eq)] +#[repr(u32)] pub enum RiceTcpType { None, Active, @@ -676,10 +697,46 @@ impl From for Option { } } +/// The type of the candidate +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[repr(u32)] +pub enum RiceCandidateType { + /// The candidate is a local network interface + Host, + /// The candidate was discovered from incoming data + PeerReflexive, + /// The candidate was discovered by asking an external server (STUN/TURN) + ServerReflexive, + /// The candidate will relay all data through an external server (TURN). + Relayed, +} + +impl From for RiceCandidateType { + fn from(value: CandidateType) -> Self { + match value { + CandidateType::Host => RiceCandidateType::Host, + CandidateType::PeerReflexive => RiceCandidateType::PeerReflexive, + CandidateType::ServerReflexive => RiceCandidateType::ServerReflexive, + CandidateType::Relayed => RiceCandidateType::Relayed, + } + } +} +impl From for CandidateType { + fn from(value: RiceCandidateType) -> Self { + match value { + RiceCandidateType::Host => CandidateType::Host, + RiceCandidateType::PeerReflexive => CandidateType::PeerReflexive, + RiceCandidateType::ServerReflexive => CandidateType::ServerReflexive, + RiceCandidateType::Relayed => CandidateType::Relayed, + } + } +} + #[derive(Debug)] +#[repr(C)] pub struct RiceCandidate { component_id: usize, - candidate_type: CandidateType, + candidate_type: RiceCandidateType, transport_type: RiceTransportType, foundation: *const c_char, priority: u32, @@ -687,7 +744,7 @@ pub struct RiceCandidate { base_address: *const RiceAddress, related_address: *const RiceAddress, tcp_type: RiceTcpType, - extensions: *const *const c_char, + extensions: *mut *mut c_char, extensions_len: usize, } @@ -711,7 +768,7 @@ impl From for RiceCandidate { related_address, tcp_type: value.tcp_type.into(), // FIXME - extensions: std::ptr::null(), + extensions: std::ptr::null_mut(), extensions_len: 0, } } @@ -839,7 +896,7 @@ pub unsafe extern "C" fn rice_stream_end_of_remote_candidates(stream: *mut RiceS #[repr(C)] pub enum RiceGatherPoll { NeedAgent(RiceGatherPollNeedAgent), - SendData(RiceAgentTransmit), + SendData(RiceTransmit), WaitUntilMicros(u64), NewCandidate(*mut RiceCandidate), Complete, @@ -879,8 +936,8 @@ impl RiceGatherPollNeedAgent { Self { component_id, transport: transport.into(), - from: Box::into_raw(Box::new(RiceAddress(from))), - to: Box::into_raw(Box::new(RiceAddress(to))), + from: Box::into_raw(Box::new(RiceAddress::new(from))), + to: Box::into_raw(Box::new(RiceAddress::new(to))), } } @@ -900,9 +957,9 @@ impl RiceGatherPoll { GatherPoll::NeedAgent(component_id, transport, from, to) => Self::NeedAgent( RiceGatherPollNeedAgent::from_rust(component_id, transport, from, to), ), - GatherPoll::SendData(component_id, transmit) => Self::SendData( - RiceAgentTransmit::from_rust_gather(stream_id, component_id, transmit), - ), + GatherPoll::SendData(component_id, transmit) => { + Self::SendData(transmit_from_rust_gather(stream_id, component_id, transmit)) + } GatherPoll::NewCandidate(cand) => { Self::NewCandidate(Box::into_raw(Box::new(cand.into()))) } @@ -952,7 +1009,7 @@ pub unsafe extern "C" fn rice_stream_handle_gather_tcp_connect( Ok(Box::from_raw(stun_agent).0) }; - proto_stream.handle_gather_tcp_connect(component_id, from.0, to.0, stun_agent); + proto_stream.handle_gather_tcp_connect(component_id, **from, **to, stun_agent); drop(proto_agent); core::mem::forget(from); @@ -1005,8 +1062,8 @@ pub unsafe extern "C" fn rice_stream_handle_incoming_data( let transmit = Transmit { transport: transport.into(), - from: from.0, - to: to.0, + from: **from, + to: **to, data: Data::Borrowed(DataSlice::from(std::slice::from_raw_parts(data, data_len))), }; core::mem::forget(from); @@ -1019,7 +1076,7 @@ pub unsafe extern "C" fn rice_stream_handle_incoming_data( let data_len = ret.data.len(); let data_data_lens = ret.data.iter().map(|d| d.len()).collect::>(); let data_data_lens = Box::into_raw(data_data_lens.into_boxed_slice()) as *const _; - let mut data = ret + let data = ret .data .into_iter() .map(|d| Box::into_raw(d.into_boxed_slice()) as *const _) @@ -1135,7 +1192,7 @@ pub unsafe extern "C" fn rice_component_gather_candidates( .zip(sockets_addr.iter()) .map(|(&transport, addr)| { let addr = RiceAddress::from_c(*addr); - let socket_addr = addr.0; + let socket_addr = **addr; core::mem::forget(addr); (transport.into(), socket_addr) }) @@ -1159,15 +1216,19 @@ pub unsafe extern "C" fn rice_component_gather_candidates( pub struct RiceAddress(SocketAddr); impl RiceAddress { - fn to_c(self) -> *const RiceAddress { + pub fn new(addr: SocketAddr) -> Self { + Self(addr) + } + + pub fn to_c(self) -> *const RiceAddress { const_override(Box::into_raw(Box::new(self))) } - unsafe fn from_c(value: *const RiceAddress) -> Box { + pub unsafe fn from_c(value: *const RiceAddress) -> Box { Box::from_raw(mut_override(value)) } - unsafe fn from_c_none(value: *const RiceAddress) -> Self { + pub unsafe fn from_c_none(value: *const RiceAddress) -> Self { let boxed = Box::from_raw(mut_override(value)); let ret = *boxed; core::mem::forget(boxed); @@ -1175,6 +1236,13 @@ impl RiceAddress { } } +impl std::ops::Deref for RiceAddress { + type Target = SocketAddr; + fn deref(&self) -> &Self::Target { + &self.0 + } +} + #[no_mangle] pub unsafe extern "C" fn rice_address_new_from_string(string: *const c_char) -> *mut RiceAddress { let Ok(string) = CStr::from_ptr(string).to_str() else { @@ -1297,12 +1365,12 @@ pub unsafe extern "C" fn rice_stun_agent_new( remote_addr: *const RiceAddress, ) -> *mut RiceStunAgent { let local_addr = RiceAddress::from_c(local_addr); - let mut builder = StunAgent::builder(transport.into(), local_addr.0); + let mut builder = StunAgent::builder(transport.into(), **local_addr); core::mem::forget(local_addr); if !remote_addr.is_null() { let remote_addr = RiceAddress::from_c(remote_addr); - builder = builder.remote_addr(remote_addr.0); + builder = builder.remote_addr(**remote_addr); core::mem::forget(remote_addr); } @@ -1311,211 +1379,6 @@ pub unsafe extern "C" fn rice_stun_agent_new( Box::into_raw(ret) } -#[derive(Debug)] -pub struct RiceUdpSocket { - socket: Arc, - in_recv: bool, -} - -#[no_mangle] -pub unsafe extern "C" fn rice_udp_socket_new(local_addr: *const RiceAddress) -> *mut RiceUdpSocket { - let local_addr = Box::from_raw(mut_override(local_addr)); - - let ret = if let Ok(socket) = UdpSocket::bind((*local_addr).0) { - Box::into_raw(Box::new(RiceUdpSocket { - socket: Arc::new(socket), - in_recv: false, - })) - } else { - mut_override(core::ptr::null::()) - }; - - core::mem::forget(local_addr); - ret -} - -#[no_mangle] -pub unsafe extern "C" fn rice_udp_socket_free(udp: *mut RiceUdpSocket) { - let _udp = Box::from_raw(udp); -} - -#[derive(Debug, Default)] -pub struct RiceSockets { - inner: Mutex, -} - -#[derive(Debug, Default)] -struct RiceSocketsInner { - udp_sockets: HashMap, - tcp_sockets: HashMap<(SocketAddr, SocketAddr), TcpStream>, -} - -#[no_mangle] -pub unsafe extern "C" fn rice_sockets_new() -> *mut RiceSockets { - mut_override(Arc::into_raw(Arc::new(RiceSockets::default()))) -} - -#[no_mangle] -pub unsafe extern "C" fn rice_sockets_ref(sockets: *mut RiceSockets) -> *mut RiceSockets { - Arc::increment_strong_count(sockets); - sockets -} - -#[no_mangle] -pub unsafe extern "C" fn rice_sockets_unref(sockets: *mut RiceSockets) { - Arc::decrement_strong_count(sockets) -} - -#[no_mangle] -pub unsafe extern "C" fn rice_sockets_add_udp( - sockets: *mut RiceSockets, - udp: *mut RiceUdpSocket, -) -> bool { - let sockets = Arc::from_raw(sockets); - let udp = Box::from_raw(udp); - let mut inner = sockets.inner.lock().unwrap(); - - let local_addr = udp.socket.local_addr().unwrap(); - let entry = inner.udp_sockets.entry(local_addr); - let ret = match entry { - std::collections::hash_map::Entry::Occupied(_) => false, - std::collections::hash_map::Entry::Vacant(vacant) => { - vacant.insert(*udp); - true - } - }; - drop(inner); - - core::mem::forget(sockets); - ret -} - -fn address_is_ignorable(ip: IpAddr) -> bool { - // TODO: add is_benchmarking() and is_documentation() when they become stable - if ip.is_loopback() || ip.is_unspecified() || ip.is_multicast() { - return true; - } - match ip { - IpAddr::V4(ipv4) => ipv4.is_broadcast() || ipv4.is_link_local(), - IpAddr::V6(_ipv6) => false, - } -} - -#[no_mangle] -pub unsafe extern "C" fn rice_interfaces(ret_len: *mut usize) -> *mut *mut RiceAddress { - let Ok(mut ifaces) = get_if_addrs() else { - return mut_override(std::ptr::null()); - }; - // We only care about non-loopback interfaces for now - // TODO: remove 'Deprecated IPv4-compatible IPv6 addresses [RFC4291]' - // TODO: remove 'IPv6 site-local unicast addresses [RFC3879]' - // TODO: remove 'IPv4-mapped IPv6 addresses unless ipv6 only' - // TODO: location tracking Ipv6 address mismatches - ifaces.retain(|e| !address_is_ignorable(e.ip())); - - let ret = ifaces - .iter() - .map(|iface| RiceAddress::to_c(RiceAddress(SocketAddr::new(iface.ip(), 0)))) - .collect::>() - .into_boxed_slice(); - *ret_len = ret.len(); - Box::into_raw(ret) as *mut _ -} - -#[no_mangle] -pub unsafe extern "C" fn rice_addresses_free(addresses: *mut *mut RiceAddress, len: usize) { - let addresses = Box::from_raw(core::slice::from_raw_parts_mut(addresses, len)); - for i in 0..len { - let _addr = RiceAddress::from_c(addresses[i]); - } -} - -#[no_mangle] -pub unsafe extern "C" fn rice_sockets_send( - sockets: *mut RiceSockets, - transport: RiceTransportType, - from: *const RiceAddress, - to: *const RiceAddress, - data: *mut u8, - len: usize, -) -> RiceError { - let sockets = Arc::from_raw(sockets); - let from = RiceAddress::from_c(mut_override(from)); - let to = RiceAddress::from_c(mut_override(to)); - let data = core::slice::from_raw_parts_mut(data, len); - let inner = sockets.inner.lock().unwrap(); - let ret = match transport { - RiceTransportType::Udp => { - if let Some(socket) = inner.udp_sockets.get(&from.0) { - if socket.socket.send_to(data, to.0).is_err() { - RiceError::Failed - } else { - RiceError::Success - } - } else { - RiceError::NotFound - } - } - RiceTransportType::Tcp => RiceError::NotFound, // FIXME - }; - - drop(inner); - core::mem::forget(sockets); - core::mem::forget(from); - core::mem::forget(to); - ret -} - -#[no_mangle] -pub unsafe extern "C" fn rice_sockets_recv( - sockets: *mut RiceSockets, - transport: RiceTransportType, - from: *const RiceAddress, - to: *const RiceAddress, - data: *mut u8, - len: usize, -) -> usize { - let sockets = Arc::from_raw(sockets); - let from = RiceAddress::from_c(mut_override(from)); - let to = RiceAddress::from_c(mut_override(to)); - let mut inner = sockets.inner.lock().unwrap(); - let ret = match transport { - RiceTransportType::Udp => { - if let Some(socket) = inner.udp_sockets.get_mut(&from.0) { - if socket.in_recv { - 0 - } else { - socket.in_recv = true; - let socket = socket.socket.clone(); - drop(inner); - let data = core::slice::from_raw_parts_mut(data, len); - let ret = socket.recv(data).unwrap_or(0); - inner = sockets.inner.lock().unwrap(); - if let Some(socket) = inner.udp_sockets.get_mut(&from.0) { - if socket.in_recv { - socket.in_recv = false; - ret - } else { - 0 - } - } else { - 0 - } - } - } else { - 0 - } - } - RiceTransportType::Tcp => 0, // FIXME - }; - - drop(inner); - core::mem::forget(sockets); - core::mem::forget(from); - core::mem::forget(to); - ret -} - fn mut_override(val: *const T) -> *mut T { val as *mut T } @@ -1607,9 +1470,9 @@ mod tests { fn rice_agent_gather() { unsafe { let addr: SocketAddr = "192.168.0.1:1000".parse().unwrap(); - let addr = RiceAddress(addr).to_c(); + let addr = RiceAddress::new(addr).to_c(); let stun_addr: SocketAddr = "102.168.0.200:2000".parse().unwrap(); - let stun_addr = RiceAddress(stun_addr).to_c(); + let stun_addr = RiceAddress::new(stun_addr).to_c(); let agent = rice_agent_new(true, false); let stream = rice_agent_add_stream(agent); let component = rice_stream_add_component(stream); @@ -1651,7 +1514,7 @@ mod tests { }; let tcp_from_addr = "192.168.200.4:3000".parse().unwrap(); - let tcp_from_addr = RiceAddress(tcp_from_addr).to_c(); + let tcp_from_addr = RiceAddress::new(tcp_from_addr).to_c(); let stun_agent = rice_stun_agent_new(TransportType::Tcp.into(), tcp_from_addr, need_agent.to); let _tcp_from_addr = RiceAddress::from_c(tcp_from_addr); diff --git a/librice-proto/src/conncheck.rs b/librice-proto/src/conncheck.rs index a731666..76d2647 100644 --- a/librice-proto/src/conncheck.rs +++ b/librice-proto/src/conncheck.rs @@ -1052,7 +1052,7 @@ impl ConnCheckList { for local in self.local_candidates.iter() { for remote in self.remote_candidates.iter() { - if local.candidate.can_pair_with(remote) { + if candidate_can_pair_with(&local.candidate, remote) { let pair = CandidatePair::new(local.candidate.clone(), remote.clone()); let component_id = self .component_ids @@ -2004,7 +2004,7 @@ impl ConnCheckListSet { ) .priority(priority); if local.transport_type == TransportType::Tcp { - builder = builder.tcp_type(Candidate::pair_tcp_type(local.tcp_type.unwrap())) + builder = builder.tcp_type(pair_tcp_type(local.tcp_type.unwrap())) } let cand = builder.build(); debug!("new reflexive remote {:?}", cand); @@ -2138,7 +2138,7 @@ impl ConnCheckListSet { ); conncheck.set_state(CandidatePairState::Succeeded); let pair = conncheck.pair.clone(); - let ok_pair = pair.construct_valid(addr); + let ok_pair = pair_construct_valid(&pair, addr); let mut ok_check = ConnCheck::clone_with_pair_nominate(conncheck, checklist_id, ok_pair.clone(), false); @@ -2739,6 +2739,43 @@ impl<'a> CheckListSetPollRet<'a> { } } +fn pair_tcp_type(local: TcpType) -> TcpType { + match local { + TcpType::Active => TcpType::Passive, + TcpType::Passive => TcpType::Active, + TcpType::So => TcpType::So, + } +} + +fn pair_construct_valid(pair: &CandidatePair, mapped_address: SocketAddr) -> CandidatePair { + let mut local = pair.local.clone(); + local.address = mapped_address; + CandidatePair { + local, + remote: pair.remote.clone(), + } +} + +// can the local candidate pair with 'remote' in any way +fn candidate_can_pair_with(local: &Candidate, remote: &Candidate) -> bool { + let address = match local.candidate_type { + CandidateType::Host => local.address, + _ => local.base_address, + }; + if local.transport_type == TransportType::Tcp + && remote.transport_type == TransportType::Tcp + && (local.tcp_type.is_none() + || remote.tcp_type.is_none() + || pair_tcp_type(local.tcp_type.unwrap()) != remote.tcp_type.unwrap()) + { + return false; + } + local.transport_type == remote.transport_type + && local.component_id == remote.component_id + && address.is_ipv4() == remote.address.is_ipv4() + && address.is_ipv6() == remote.address.is_ipv6() +} + fn validate_username(username: Username, local_credentials: &Credentials) -> bool { let username = username.username().as_bytes(); let local_user = local_credentials.ufrag.as_bytes(); diff --git a/librice/Cargo.toml b/librice/Cargo.toml index 6b47986..3c088e6 100644 --- a/librice/Cargo.toml +++ b/librice/Cargo.toml @@ -13,7 +13,7 @@ rust-version.workspace = true workspace = ".." [dependencies] -librice-proto = { version = "0.0.5", path = "../librice-proto" } +librice-proto.workspace = true async-std = "1" async-io = "2" byteorder.workspace = true From 2fce69e6ce9255d0b5c1aa979fb8d05b4100a5a6 Mon Sep 17 00:00:00 2001 From: Matthew Waters Date: Sun, 28 Jul 2024 22:54:22 +1000 Subject: [PATCH 2/4] proto/capi: expose rice_component_send --- librice-proto/src/capi.rs | 79 +++++++++++++++++++++++++++++++++------ 1 file changed, 68 insertions(+), 11 deletions(-) diff --git a/librice-proto/src/capi.rs b/librice-proto/src/capi.rs index d43e3d0..da8fc37 100644 --- a/librice-proto/src/capi.rs +++ b/librice-proto/src/capi.rs @@ -13,6 +13,8 @@ use std::ffi::{CStr, CString}; use std::os::raw::{c_char, c_int}; +use core::mem::MaybeUninit; + use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; use std::str::FromStr; use std::sync::{Arc, Mutex, Once, Weak}; @@ -293,12 +295,37 @@ impl<'a> From> for RiceTransmit { } } -impl RiceTransmit { - unsafe fn clear_c(self) { - let _from = RiceAddress::from_c(self.from); - let _to = RiceAddress::from_c(self.to); - let _data = Data::from(self.data); +#[no_mangle] +pub unsafe extern "C" fn rice_transmit_clear(transmit: *mut RiceTransmit) { + if !(*transmit).from.is_null() { + let _from = RiceAddress::from_c((*transmit).from); + (*transmit).from = core::ptr::null_mut(); } + if !(*transmit).to.is_null() { + let _to = RiceAddress::from_c((*transmit).to); + (*transmit).to = core::ptr::null_mut(); + } + let mut data = RiceData::Borrowed(RiceDataImpl { + ptr: core::ptr::null_mut(), + size: 0, + }); + core::mem::swap(&mut data, &mut (*transmit).data); + let _data = Data::from(data); +} + +#[no_mangle] +pub unsafe extern "C" fn rice_transmit_init(transmit: *mut MaybeUninit) { + (*transmit).write(RiceTransmit { + stream_id: 0, + component_id: 0, + transport: RiceTransportType::Udp, + from: core::ptr::null(), + to: core::ptr::null(), + data: RiceData::Borrowed(RiceDataImpl { + ptr: core::ptr::null_mut(), + size: 0, + }), + }); } fn transmit_from_rust_gather( @@ -420,8 +447,8 @@ pub unsafe extern "C" fn rice_agent_poll_free(poll: *mut RiceAgentPoll) { match *Box::from_raw(poll) { RiceAgentPoll::Closed => (), RiceAgentPoll::WaitUntilMicros(_instant) => (), - RiceAgentPoll::Transmit(transmit) => { - transmit.clear_c(); + RiceAgentPoll::Transmit(mut transmit) => { + rice_transmit_clear(&mut transmit); } RiceAgentPoll::TcpConnect(connect) => { let _connect = AgentPoll::TcpConnect(connect.into()); @@ -908,8 +935,8 @@ pub unsafe extern "C" fn rice_gather_poll_free(poll: *mut RiceGatherPoll) { RiceGatherPoll::Complete => (), RiceGatherPoll::WaitUntilMicros(_instant) => (), RiceGatherPoll::NeedAgent(need_agent) => need_agent.clear_c(), - RiceGatherPoll::SendData(transmit) => { - transmit.clear_c(); + RiceGatherPoll::SendData(mut transmit) => { + rice_transmit_clear(&mut transmit); } RiceGatherPoll::NewCandidate(candidate) => { rice_candidate_free(candidate); @@ -1207,6 +1234,36 @@ pub unsafe extern "C" fn rice_component_gather_candidates( core::mem::forget(component); } +#[no_mangle] +pub unsafe extern "C" fn rice_component_send( + component: *mut RiceComponent, + data: *mut u8, + len: usize, + transmit: *mut RiceTransmit, +) -> RiceError { + let component = Arc::from_raw(component); + + let proto_agent = component.proto_agent.lock().unwrap(); + let proto_stream = proto_agent.stream(component.stream_id).unwrap(); + let proto_component = proto_stream.component(component.component_id).unwrap(); + + let bytes = core::slice::from_raw_parts(data, len); + match proto_component.send(bytes) { + Ok(stun_transmit) => { + *transmit = RiceTransmit { + stream_id: component.stream_id, + component_id: component.component_id, + transport: stun_transmit.transport.into(), + from: Box::into_raw(Box::new(RiceAddress::new(stun_transmit.from))), + to: Box::into_raw(Box::new(RiceAddress::new(stun_transmit.to))), + data: stun_transmit.data.into(), + }; + RiceError::Success + } + Err(_e) => RiceError::Failed, + } +} + // TODO: // - id // - state @@ -1527,10 +1584,10 @@ mod tests { ); need_agent.clear_c(); let ret = rice_stream_poll_gather(stream, 0); - let RiceGatherPoll::SendData(send) = *Box::from_raw(ret) else { + let RiceGatherPoll::SendData(mut send) = *Box::from_raw(ret) else { unreachable!() }; - send.clear_c(); + rice_transmit_clear(&mut send); //rice_gather_poll_free(rice_stream_poll_gather(stream, now)); From 41a8281846b92d7379b4297b2d4ac0665bf47fa5 Mon Sep 17 00:00:00 2001 From: Matthew Waters Date: Mon, 29 Jul 2024 13:42:10 +1000 Subject: [PATCH 3/4] conncheck: remove internal ConnCheck::clone_with_pair_nominate() The equivalant functionality can be done by simply creating a new ConnCheck. Also take the check's controlling value directly from the checklist. Fixes the controlling value in dump_check_state(). --- librice-proto/src/conncheck.rs | 61 ++++++++++++++++------------------ 1 file changed, 29 insertions(+), 32 deletions(-) diff --git a/librice-proto/src/conncheck.rs b/librice-proto/src/conncheck.rs index 76d2647..0abbfe5 100644 --- a/librice-proto/src/conncheck.rs +++ b/librice-proto/src/conncheck.rs @@ -227,24 +227,6 @@ impl ConnCheck { } } - fn clone_with_pair_nominate( - conncheck: &ConnCheck, - checklist_id: usize, - pair: CandidatePair, - new_nominate: bool, - ) -> ConnCheck { - match &conncheck.variant { - ConnCheckVariant::Agent(agent) => ConnCheck::new( - checklist_id, - pair, - agent.clone(), - new_nominate, - conncheck.controlling, - ), - _ => unreachable!(), - } - } - fn agent_id(&self) -> Option { match &self.variant { ConnCheckVariant::Agent(agent) => Some(*agent), @@ -1487,11 +1469,13 @@ impl ConnCheckList { ); None } else { - let mut check = ConnCheck::clone_with_pair_nominate( - check, + let agent_id = check.agent_id().unwrap(); + let mut check = ConnCheck::new( self.checklist_id, check.pair.clone(), + agent_id, true, + self.controlling, ); check.set_state(CandidatePairState::Waiting); debug!("attempting nomination with check {:?}", check); @@ -2024,11 +2008,13 @@ impl ConnCheckListSet { if peer_nominating && !check.nominate() { debug!("existing pair succeeded -> nominate"); let pair = check.pair.clone(); - let mut new_check = ConnCheck::clone_with_pair_nominate( - &check, + let agent_id = check.agent_id().unwrap(); + let mut new_check = ConnCheck::new( checklist.checklist_id, pair.clone(), + agent_id, true, + self.controlling, ); checklist.add_check(check); new_check.set_state(CandidatePairState::Waiting); @@ -2056,11 +2042,13 @@ impl ConnCheckListSet { let pair = check.pair.clone(); // TODO: ignore response timeouts - let mut new_check = ConnCheck::clone_with_pair_nominate( - &check, + let agent_id = check.agent_id().unwrap(); + let mut new_check = ConnCheck::new( checklist.checklist_id, pair, + agent_id, peer_nominating, + self.controlling, ); checklist.add_check(check); new_check.set_state(CandidatePairState::Waiting); @@ -2079,11 +2067,12 @@ impl ConnCheckListSet { | CandidatePairState::Failed => { if peer_nominating && !check.nominate() { check.cancel(); - check = ConnCheck::clone_with_pair_nominate( - &check, + check = ConnCheck::new( checklist.checklist_id, check.pair.clone(), + agent_id, peer_nominating, + self.controlling, ); } check.set_state(CandidatePairState::Waiting); @@ -2139,8 +2128,14 @@ impl ConnCheckListSet { conncheck.set_state(CandidatePairState::Succeeded); let pair = conncheck.pair.clone(); let ok_pair = pair_construct_valid(&pair, addr); - let mut ok_check = - ConnCheck::clone_with_pair_nominate(conncheck, checklist_id, ok_pair.clone(), false); + let agent_id = conncheck.agent_id().unwrap(); + let mut ok_check = ConnCheck::new( + checklist_id, + ok_pair.clone(), + agent_id, + false, + self.controlling, + ); if checklist.state != CheckListState::Running { debug!("checklist is not running, ignoring check response"); @@ -2276,11 +2271,13 @@ impl ConnCheckListSet { let old_pair = conncheck.pair.clone(); self.controlling = new_role; conncheck.cancel(); - let mut conncheck = ConnCheck::clone_with_pair_nominate( - conncheck, + let agent_id = conncheck.agent_id().unwrap(); + let mut conncheck = ConnCheck::new( checklist_id, conncheck.pair.clone(), + agent_id, false, + self.controlling, ); conncheck.set_state(CandidatePairState::Waiting); checklist.add_triggered(&conncheck); @@ -2343,6 +2340,7 @@ impl ConnCheckListSet { ) .unwrap(); conncheck.stun_request = Some(stun_request.transaction_id()); + conncheck.controlling = self.controlling; let remote_addr = conncheck.pair.remote.address; let component_id = conncheck.pair.local.component_id; @@ -2654,7 +2652,6 @@ impl ConnCheckListSet { let checklist_id = check.checklist_id; let nominate = check.nominate; - let controlling = check.controlling; let conncheck_id = check.conncheck_id; let check_state = check.state; @@ -2672,7 +2669,7 @@ impl ConnCheckListSet { new_pair.clone(), agent_id, nominate, - controlling, + self.controlling, ); let is_triggered = checklist .triggered From 8ca4204e5f48a751c71bd66aa7921e2bf9eed0f8 Mon Sep 17 00:00:00 2001 From: Matthew Waters Date: Mon, 29 Jul 2024 13:47:26 +1000 Subject: [PATCH 4/4] proto/capi: handle selected pair setting automatically --- librice-proto/src/capi.rs | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/librice-proto/src/capi.rs b/librice-proto/src/capi.rs index da8fc37..96cd223 100644 --- a/librice-proto/src/capi.rs +++ b/librice-proto/src/capi.rs @@ -472,10 +472,15 @@ pub unsafe extern "C" fn rice_agent_poll( let agent = Arc::from_raw(agent); let mut proto_agent = agent.proto_agent.lock().unwrap(); let now = agent.base_instant + Duration::from_micros(now_micros); - let ret = Box::new(RiceAgentPoll::from_rust( - proto_agent.poll(now), - agent.base_instant, - )); + let ret = proto_agent.poll(now).into_owned(); + if let AgentPoll::SelectedPair(ref pair) = ret { + if let Some(mut stream) = proto_agent.mut_stream(pair.stream_id) { + if let Some(mut component) = stream.mut_component(pair.component_id) { + component.set_selected_pair_with_agent((*pair.selected).clone()); + } + } + } + let ret = Box::new(RiceAgentPoll::from_rust(ret, agent.base_instant)); drop(proto_agent); core::mem::forget(agent); @@ -1260,7 +1265,10 @@ pub unsafe extern "C" fn rice_component_send( }; RiceError::Success } - Err(_e) => RiceError::Failed, + Err(e) => { + warn!("Failed to send data: {e:?}"); + RiceError::Failed + } } }