From b928654312591abbbb1be9c6d304d76e8fa5775f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bart=C5=82omiej=20Klocek?= Date: Fri, 4 Oct 2024 22:03:17 +0200 Subject: [PATCH] [tunnelbroker] Implement lazy AMQP channel wrapper Summary: This cancels minor quality regression introduced earlier in D13604. - For websocket, we store the channel in the `WebsocketSession` struct and reuse it unless it's broken - However for gRPC, after opted out of channel pool, a new short-lived channel is created for every single call, which is against what RabbitMQ [recommends](https://www.rabbitmq.com/docs/channels#high-channel-churn). Created a simple wrapper that lazily-initializes the channel upon first use, and creates a new one only if the previous was closed. Depends on D13611 Test Plan: Added some additional logging and ran Commtest Reviewers: kamil Reviewed By: kamil Subscribers: ashoat, tomek Differential Revision: https://phab.comm.dev/D13621 --- services/tunnelbroker/src/amqp.rs | 36 ++++++++++++++++++++++++++- services/tunnelbroker/src/grpc/mod.rs | 14 +++++------ 2 files changed, 42 insertions(+), 8 deletions(-) diff --git a/services/tunnelbroker/src/amqp.rs b/services/tunnelbroker/src/amqp.rs index 1b947a3f07..38cca467bf 100644 --- a/services/tunnelbroker/src/amqp.rs +++ b/services/tunnelbroker/src/amqp.rs @@ -3,7 +3,7 @@ use lapin::{uri::AMQPUri, ConnectionProperties}; use once_cell::sync::Lazy; use std::sync::Arc; use std::time::Duration; -use tokio::sync::RwLock; +use tokio::sync::{Mutex, RwLock}; use tracing::{debug, error, info, warn}; use crate::constants::error_types; @@ -127,6 +127,40 @@ impl AmqpConnection { } } +/// Wrapper over [`lapin::Channel`] that automatically recreates AMQP channel +/// in case of errors. The channel is initialized on first use. +/// +/// TODO: Add support for restoring channel topology (queues and consumers) +/// (`lapin` has this built-in, but it's internal crate feature) +pub struct AmqpChannel { + conn: AmqpConnection, + channel: Arc>>, +} + +impl AmqpChannel { + pub fn new(amqp_connection: &AmqpConnection) -> Self { + let channel = Arc::new(Mutex::new(None)); + Self { + conn: amqp_connection.clone(), + channel, + } + } + + pub async fn get(&self) -> Result { + let mut channel = self.channel.lock().await; + match channel.as_ref() { + Some(ch) if ch.status().connected() => Ok(ch.clone()), + _ => { + let new_channel = self.conn.new_channel().await?; + let channel_id = new_channel.id(); + debug!(channel_id, "Instantiated lazy AMQP channel."); + *channel = Some(new_channel.clone()); + Ok(new_channel) + } + } + } +} + fn should_ignore_error(err: &lapin::Error) -> bool { use lapin::Error as E; use std::io::ErrorKind; diff --git a/services/tunnelbroker/src/grpc/mod.rs b/services/tunnelbroker/src/grpc/mod.rs index d32a5b51bb..147a00253c 100644 --- a/services/tunnelbroker/src/grpc/mod.rs +++ b/services/tunnelbroker/src/grpc/mod.rs @@ -11,14 +11,14 @@ use tonic::transport::Server; use tracing::debug; use tunnelbroker_messages::MessageToDevice; -use crate::amqp::AmqpConnection; +use crate::amqp::{AmqpChannel, AmqpConnection}; use crate::constants::{CLIENT_RMQ_MSG_PRIORITY, WS_SESSION_CLOSE_AMQP_MSG}; use crate::database::{handle_ddb_error, DatabaseClient}; use crate::{constants, CONFIG}; struct TunnelbrokerGRPC { client: DatabaseClient, - amqp: AmqpConnection, + amqp_channel: AmqpChannel, } pub fn handle_amqp_error(error: lapin::Error) -> tonic::Status { @@ -58,8 +58,8 @@ impl TunnelbrokerService for TunnelbrokerGRPC { .map_err(|_| tonic::Status::invalid_argument("Invalid argument"))?; self - .amqp - .new_channel() + .amqp_channel + .get() .await .map_err(handle_amqp_error)? .basic_publish( @@ -85,8 +85,8 @@ impl TunnelbrokerService for TunnelbrokerGRPC { debug!("Connection close request for device {}", &message.device_id); self - .amqp - .new_channel() + .amqp_channel + .get() .await .map_err(handle_amqp_error)? .basic_publish( @@ -141,7 +141,7 @@ pub async fn run_server( .http2_keepalive_timeout(Some(constants::GRPC_KEEP_ALIVE_PING_TIMEOUT)) .add_service(TunnelbrokerServiceServer::new(TunnelbrokerGRPC { client, - amqp: amqp_connection.clone(), + amqp_channel: AmqpChannel::new(amqp_connection), })) .serve(addr) .await