Skip to content

Commit

Permalink
made rate limiting async
Browse files Browse the repository at this point in the history
  • Loading branch information
lijunwangs committed Sep 19, 2024
1 parent 7c54e92 commit 9d6b3ed
Showing 1 changed file with 104 additions and 62 deletions.
166 changes: 104 additions & 62 deletions streamer/src/nonblocking/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ use {
futures::{stream::FuturesUnordered, Future, StreamExt as _},
indexmap::map::{Entry, IndexMap},
percentage::Percentage,
quinn::{Accept, Connecting, Connection, Endpoint, EndpointConfig, TokioRuntime, VarInt},
quinn::{
Accept, Connecting, Connection, Endpoint, EndpointConfig, Incoming, TokioRuntime, VarInt,
},
quinn_proto::VarIntBoundsExceeded,
rand::{thread_rng, Rng},
smallvec::SmallVec,
Expand Down Expand Up @@ -236,6 +238,85 @@ pub fn spawn_server_multi(
})
}

#[allow(clippy::too_many_arguments)]
async fn handle_incoming_connection(
incoming: Incoming,
overall_connection_rate_limiter: Arc<TotalConnectionRateLimiter>,
rate_limiter: Arc<ConnectionRateLimiter>,
unstaked_connection_table: Arc<Mutex<ConnectionTable>>,
staked_connection_table: Arc<Mutex<ConnectionTable>>,
packet_sender: AsyncSender<PacketAccumulator>,
max_connections_per_peer: usize,
staked_nodes: Arc<RwLock<StakedNodes>>,
max_staked_connections: usize,
max_unstaked_connections: usize,
max_streams_per_ms: u64,
stats: Arc<StreamerStats>,
wait_for_chunk_timeout: Duration,
stream_load_ema: Arc<StakedStreamLoadEMA>,
) {
let remote_address = incoming.remote_address();

// first check overall connection rate limit:
if !overall_connection_rate_limiter.is_allowed() {
debug!(
"Reject connection from {:?} -- total rate limiting exceeded",
remote_address.ip()
);
stats
.connection_rate_limited_across_all
.fetch_add(1, Ordering::Relaxed);
incoming.ignore();
return;
}

if rate_limiter.len() > CONNECTION_RATE_LIMITER_CLEANUP_SIZE_THRESHOLD {
rate_limiter.retain_recent();
}
stats
.connection_rate_limiter_length
.store(rate_limiter.len(), Ordering::Relaxed);
debug!("Got a connection {remote_address:?}");
if !rate_limiter.is_allowed(&remote_address.ip()) {
debug!(
"Reject connection from {:?} -- rate limiting exceeded",
remote_address
);
stats
.connection_rate_limited_per_ipaddr
.fetch_add(1, Ordering::Relaxed);
incoming.ignore();
return;
}

stats
.outstanding_incoming_connection_attempts
.fetch_add(1, Ordering::Relaxed);
let connecting = incoming.accept();
match connecting {
Ok(connecting) => {
setup_connection(
connecting,
unstaked_connection_table,
staked_connection_table,
packet_sender,
max_connections_per_peer,
staked_nodes,
max_staked_connections,
max_unstaked_connections,
max_streams_per_ms,
stats,
wait_for_chunk_timeout,
stream_load_ema,
)
.await;
}
Err(err) => {
debug!("Incoming::accept(): error {:?}", err);
}
}
}

#[allow(clippy::too_many_arguments)]
async fn run_server(
name: &'static str,
Expand All @@ -252,9 +333,12 @@ async fn run_server(
wait_for_chunk_timeout: Duration,
coalesce: Duration,
) {
let rate_limiter = ConnectionRateLimiter::new(max_connections_per_ipaddr_per_min);
let overall_connection_rate_limiter =
TotalConnectionRateLimiter::new(TOTAL_CONNECTIONS_PER_SECOND);
let rate_limiter = Arc::new(ConnectionRateLimiter::new(
max_connections_per_ipaddr_per_min,
));
let overall_connection_rate_limiter = Arc::new(TotalConnectionRateLimiter::new(
TOTAL_CONNECTIONS_PER_SECOND,
));

const WAIT_FOR_CONNECTION_TIMEOUT: Duration = Duration::from_secs(1);
debug!("spawn quic server");
Expand Down Expand Up @@ -320,65 +404,23 @@ async fn run_server(
stats
.total_incoming_connection_attempts
.fetch_add(1, Ordering::Relaxed);
let remote_address = incoming.remote_address();

// first check overall connection rate limit:
if !overall_connection_rate_limiter.is_allowed() {
debug!(
"Reject connection from {:?} -- total rate limiting exceeded",
remote_address.ip()
);
stats
.connection_rate_limited_across_all
.fetch_add(1, Ordering::Relaxed);
incoming.ignore();
continue;
}

if rate_limiter.len() > CONNECTION_RATE_LIMITER_CLEANUP_SIZE_THRESHOLD {
rate_limiter.retain_recent();
}
stats
.connection_rate_limiter_length
.store(rate_limiter.len(), Ordering::Relaxed);
debug!("Got a connection {remote_address:?}");
if !rate_limiter.is_allowed(&remote_address.ip()) {
debug!(
"Reject connection from {:?} -- rate limiting exceeded",
remote_address
);
stats
.connection_rate_limited_per_ipaddr
.fetch_add(1, Ordering::Relaxed);
incoming.ignore();
continue;
}

stats
.outstanding_incoming_connection_attempts
.fetch_add(1, Ordering::Relaxed);
let connecting = incoming.accept();
match connecting {
Ok(connecting) => {
tokio::spawn(setup_connection(
connecting,
unstaked_connection_table.clone(),
staked_connection_table.clone(),
sender.clone(),
max_connections_per_peer,
staked_nodes.clone(),
max_staked_connections,
max_unstaked_connections,
max_streams_per_ms,
stats.clone(),
wait_for_chunk_timeout,
stream_load_ema.clone(),
));
}
Err(err) => {
debug!("Incoming::accept(): error {:?}", err);
}
}
tokio::spawn(handle_incoming_connection(
incoming,
overall_connection_rate_limiter.clone(),
rate_limiter.clone(),
unstaked_connection_table.clone(),
staked_connection_table.clone(),
sender.clone(),
max_connections_per_peer,
staked_nodes.clone(),
max_staked_connections,
max_unstaked_connections,
max_streams_per_ms,
stats.clone(),
wait_for_chunk_timeout,
stream_load_ema.clone(),
));
} else {
debug!("accept(): Timed out waiting for connection");
}
Expand Down

0 comments on commit 9d6b3ed

Please sign in to comment.