Skip to content

Commit

Permalink
[tunnelbroker] Implement lazy AMQP channel wrapper
Browse files Browse the repository at this point in the history
Summary:
This cancels minor quality regression introduced earlier in D13604.
- For websocket, we store the channel in the `WebsocketSession` struct and reuse it unless it's broken
- However for gRPC, after opted out of channel pool, a new short-lived channel is created for every single call, which is against what RabbitMQ [recommends](https://www.rabbitmq.com/docs/channels#high-channel-churn).

Created a simple wrapper that lazily-initializes the channel upon first use, and creates a new one only if the previous was closed.

Depends on D13611

Test Plan: Added some additional logging and ran Commtest

Reviewers: kamil

Reviewed By: kamil

Subscribers: ashoat, tomek

Differential Revision: https://phab.comm.dev/D13621
  • Loading branch information
barthap committed Oct 7, 2024
1 parent 51a32d2 commit b928654
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 8 deletions.
36 changes: 35 additions & 1 deletion services/tunnelbroker/src/amqp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use lapin::{uri::AMQPUri, ConnectionProperties};
use once_cell::sync::Lazy;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use tokio::sync::{Mutex, RwLock};
use tracing::{debug, error, info, warn};

use crate::constants::error_types;
Expand Down Expand Up @@ -127,6 +127,40 @@ impl AmqpConnection {
}
}

/// Wrapper over [`lapin::Channel`] that automatically recreates AMQP channel
/// in case of errors. The channel is initialized on first use.
///
/// TODO: Add support for restoring channel topology (queues and consumers)
/// (`lapin` has this built-in, but it's internal crate feature)
pub struct AmqpChannel {
conn: AmqpConnection,
channel: Arc<Mutex<Option<lapin::Channel>>>,
}

impl AmqpChannel {
pub fn new(amqp_connection: &AmqpConnection) -> Self {
let channel = Arc::new(Mutex::new(None));
Self {
conn: amqp_connection.clone(),
channel,
}
}

pub async fn get(&self) -> Result<lapin::Channel, lapin::Error> {
let mut channel = self.channel.lock().await;
match channel.as_ref() {
Some(ch) if ch.status().connected() => Ok(ch.clone()),
_ => {
let new_channel = self.conn.new_channel().await?;
let channel_id = new_channel.id();
debug!(channel_id, "Instantiated lazy AMQP channel.");
*channel = Some(new_channel.clone());
Ok(new_channel)
}
}
}
}

fn should_ignore_error(err: &lapin::Error) -> bool {
use lapin::Error as E;
use std::io::ErrorKind;
Expand Down
14 changes: 7 additions & 7 deletions services/tunnelbroker/src/grpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ use tonic::transport::Server;
use tracing::debug;
use tunnelbroker_messages::MessageToDevice;

use crate::amqp::AmqpConnection;
use crate::amqp::{AmqpChannel, AmqpConnection};
use crate::constants::{CLIENT_RMQ_MSG_PRIORITY, WS_SESSION_CLOSE_AMQP_MSG};
use crate::database::{handle_ddb_error, DatabaseClient};
use crate::{constants, CONFIG};

struct TunnelbrokerGRPC {
client: DatabaseClient,
amqp: AmqpConnection,
amqp_channel: AmqpChannel,
}

pub fn handle_amqp_error(error: lapin::Error) -> tonic::Status {
Expand Down Expand Up @@ -58,8 +58,8 @@ impl TunnelbrokerService for TunnelbrokerGRPC {
.map_err(|_| tonic::Status::invalid_argument("Invalid argument"))?;

self
.amqp
.new_channel()
.amqp_channel
.get()
.await
.map_err(handle_amqp_error)?
.basic_publish(
Expand All @@ -85,8 +85,8 @@ impl TunnelbrokerService for TunnelbrokerGRPC {
debug!("Connection close request for device {}", &message.device_id);

self
.amqp
.new_channel()
.amqp_channel
.get()
.await
.map_err(handle_amqp_error)?
.basic_publish(
Expand Down Expand Up @@ -141,7 +141,7 @@ pub async fn run_server(
.http2_keepalive_timeout(Some(constants::GRPC_KEEP_ALIVE_PING_TIMEOUT))
.add_service(TunnelbrokerServiceServer::new(TunnelbrokerGRPC {
client,
amqp: amqp_connection.clone(),
amqp_channel: AmqpChannel::new(amqp_connection),
}))
.serve(addr)
.await
Expand Down

0 comments on commit b928654

Please sign in to comment.