Skip to content

Commit

Permalink
deps: update to stun-proto 0.2.0
Browse files Browse the repository at this point in the history
  • Loading branch information
ystreet committed Jul 14, 2024
1 parent e0a0400 commit 3e91a9f
Show file tree
Hide file tree
Showing 8 changed files with 478 additions and 271 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ edition = "2021"
rust-version = "1.68.2"

[workspace.dependencies]
stun-proto = { version = "0.1.0" }
stun-proto = "0.2.0"
arbitrary = { version = "1", features = ["derive"] }
byteorder = "1"
get_if_addrs = "0.5"
Expand Down
89 changes: 69 additions & 20 deletions librice-proto/src/conncheck.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::time::{Duration, Instant};

use crate::candidate::{Candidate, CandidatePair, CandidateType, TcpType, TransportType};
use crate::component::ComponentConnectionState;
use byteorder::{BigEndian, ByteOrder};
use stun_proto::agent::{
HandleStunReply, StunAgent, StunAgentPollRet, StunError, TcpBuffer, Transmit,
};
Expand Down Expand Up @@ -1337,7 +1338,6 @@ impl ConnCheckList {
self.triggered
.retain(|check_id| !triggered_to_remove.contains(check_id));
let nominated_ids = self.nominated.clone();
error!("nominated ids {nominated_ids:?}");
self.pairs.retain(|check| {
if nominated_ids.contains(&check.conncheck_id) {
true
Expand Down Expand Up @@ -1582,6 +1582,7 @@ impl ConnCheckListSetBuilder {
checklist_i: 0,
last_send_time: Instant::now() - ConnCheckListSet::MINIMUM_SET_TICK,
pending_transmits: Default::default(),
pending_messages: Default::default(),
}
}
}
Expand All @@ -1596,6 +1597,13 @@ pub struct ConnCheckListSet {
checklist_i: usize,
last_send_time: Instant,
pending_transmits: VecDeque<(usize, usize, Transmit<'static>)>,
pending_messages: VecDeque<(
usize,
usize,
Arc<Mutex<StunAgent>>,
MessageBuilder<'static>,
SocketAddr,
)>,
}

impl ConnCheckListSet {
Expand Down Expand Up @@ -1662,12 +1670,12 @@ impl ConnCheckListSet {
&request,
transmit.from,
)? {
debug!("Sending response {response:?} to {:?}", transmit.from);
let mut agent_inner = agent.lock().unwrap();
self.pending_transmits.push_front((
self.pending_messages.push_back((
checklist_id,
local_cand.component_id,
agent_inner.send(response, transmit.from)?.into_owned(),
agent.clone(),
response.into_owned(),
transmit.from,
));
return Ok(Some(HandleRecvReply::Handled));
}
Expand Down Expand Up @@ -2311,12 +2319,12 @@ impl ConnCheckListSet {
let mut agent = agent.lock().unwrap();

let transmit = agent
.send(stun_request, conncheck.pair.remote.address)
.send(stun_request, conncheck.pair.remote.address, now)
.unwrap();
Ok(CheckListSetPollRet::Transmit(
checklist_id,
conncheck.pair.local.component_id,
transmit.into_owned(),
transmit_send(transmit),
))
}

Expand Down Expand Up @@ -2410,6 +2418,20 @@ impl ConnCheckListSet {
if let Some((checklist_id, cid, transmit)) = self.pending_transmits.pop_back() {
return CheckListSetPollRet::Transmit(checklist_id, cid, transmit);
}
if let Some((checklist_id, cid, agent, msg, to)) = self.pending_messages.pop_back() {
debug!("Sending response {msg:?} to {:?}", to);
let mut agent_inner = agent.lock().unwrap();
match agent_inner.send(msg, to, now) {
Ok(transmit) => {
return CheckListSetPollRet::Transmit(
checklist_id,
cid,
transmit_send(transmit),
)
}
Err(e) => warn!("error sending: {e}"),
}
}

for checklist in self.checklists.iter_mut() {
if let Some(event) = checklist.poll_event() {
Expand Down Expand Up @@ -2469,7 +2491,7 @@ impl ConnCheckListSet {
return CheckListSetPollRet::Transmit(
checklist.checklist_id,
check.pair.local.component_id,
transmit.into_owned(),
transmit_send(transmit),
);
}
}
Expand Down Expand Up @@ -2589,15 +2611,14 @@ impl ConnCheckListSet {
));
let transaction_id = stun_request.transaction_id();

self.pending_transmits.push_front((
let agent = Arc::new(Mutex::new(agent));
self.pending_messages.push_front((
checklist_id,
check.pair.local.component_id,
agent
.send(stun_request, check.pair.remote.address)
.unwrap()
.into_owned(),
agent.clone(),
stun_request.into_owned(),
check.pair.remote.address,
));
let agent = Arc::new(Mutex::new(agent));
checklist.agents.push(agent.clone());

let mut new_check = ConnCheck::new(
Expand Down Expand Up @@ -2689,6 +2710,24 @@ fn validate_username(username: Username, local_credentials: &Credentials) -> boo
}
}

pub(crate) fn transmit_send(transmit: Transmit) -> Transmit<'static> {
match transmit.transport {
TransportType::Udp => transmit.into_owned(),
TransportType::Tcp => {
let mut data = Vec::with_capacity(transmit.data.len());
data.resize(2, 0);
BigEndian::write_u16(&mut data, transmit.data.len() as u16);
data.extend_from_slice(&transmit.data);
Transmit::new_owned(
data.into_boxed_slice(),
transmit.transport,
transmit.from,
transmit.to,
)
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -2973,6 +3012,7 @@ mod tests {
transmit: Transmit<'_>,
error_response: Option<u16>,
response_address: Option<SocketAddr>,
now: Instant,
) -> Option<Transmit<'b>> {
// XXX: assumes that tcp framing is not in use
let offset = match transmit.transport {
Expand All @@ -2996,9 +3036,10 @@ mod tests {
)
.unwrap(),
transmit.from,
now,
)
.unwrap();
return Some(transmit.into_owned());
return Some(transmit_send(transmit));
}
}
}
Expand Down Expand Up @@ -3327,6 +3368,7 @@ mod tests {
transmit,
self.error_response,
self.response_address,
now,
)
.unwrap();
info!("reply: {reply:?}");
Expand Down Expand Up @@ -3656,6 +3698,7 @@ mod tests {
transmit,
None,
None,
now,
) else {
unreachable!();
};
Expand Down Expand Up @@ -3690,6 +3733,7 @@ mod tests {
transmit.into_owned(),
None,
None,
now,
) else {
unreachable!();
};
Expand Down Expand Up @@ -3733,6 +3777,7 @@ mod tests {
.priority(10);
let mut state = state.build();
state.local_list().generate_checks();
let now = Instant::now();
let remote_addr = SocketAddr::new(state.remote.candidate.base_address.ip(), 2000);
let mut remote_cand = state.remote.candidate.clone();
remote_cand.address = remote_addr;
Expand Down Expand Up @@ -3766,9 +3811,11 @@ mod tests {
.checklist_set
.incoming_data(
state.local.checklist_id,
&remote_agent
.send(request, state.local.peer.candidate.base_address)
.unwrap(),
&transmit_send(
remote_agent
.send(request, state.local.peer.candidate.base_address, now)
.unwrap()
),
)
.unwrap()[0],
HandleRecvReply::Handled
Expand Down Expand Up @@ -3810,6 +3857,7 @@ mod tests {
transmit.into_owned(),
None,
None,
now,
) else {
unreachable!();
};
Expand Down Expand Up @@ -3845,6 +3893,7 @@ mod tests {
transmit.into_owned(),
None,
None,
now,
) else {
unreachable!();
};
Expand Down Expand Up @@ -3932,7 +3981,7 @@ mod tests {
request.add_fingerprint().unwrap();

let local_addr = state.local.peer.stun_agent().local_addr();
let transmit = remote_agent.send(request, local_addr).unwrap();
let transmit = remote_agent.send(request, local_addr, now).unwrap();

info!("sending prflx request");
let reply = state
Expand Down Expand Up @@ -4314,7 +4363,7 @@ mod tests {
request.add_fingerprint().unwrap();

let local_addr = state.local.peer.stun_agent().local_addr();
let transmit = remote_agent.send(request, local_addr).unwrap();
let transmit = remote_agent.send(request, local_addr, now).unwrap();

info!("sending request");
let reply = state
Expand Down
Loading

0 comments on commit 3e91a9f

Please sign in to comment.