remove retry delay handling
conradludgate committed Jun 25, 2024
1 parent be71a20 commit b36b3c5
Showing 7 changed files with 51 additions and 66 deletions.
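In short, this commit drops the per-error retry strategy (`Retry::Backoff` vs `Retry::Fixed(delay)`) and reduces `CouldRetry` to a plain yes/no answer; the wait is now always computed from the proxy's own backoff config via `retry_after`. A hedged sketch of the trait change, with the before/after shapes copied from the hunks below (the module wrappers exist only so the sketch compiles on its own):

```rust
#![allow(dead_code)]

use std::time::Duration;

// Before this commit: an error could also dictate *how* to wait.
mod before {
    use super::Duration;

    pub enum Retry {
        Backoff,
        Fixed(Duration),
    }

    pub trait CouldRetry {
        fn could_retry(&self) -> Option<Retry>;
    }
}

// After this commit: an error only says *whether* a retry makes sense;
// the caller always waits according to its exponential-backoff config.
mod after {
    pub trait CouldRetry {
        fn could_retry(&self) -> bool;
    }
}

fn main() {} // nothing to run; the point is the two signatures above
```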
7 changes: 3 additions & 4 deletions proxy/src/console/messages.rs
@@ -1,12 +1,11 @@
use measured::FixedCardinalityLabel;
use serde::{Deserialize, Serialize};
use std::fmt::{self, Display};
use std::time::Duration;

use crate::auth::IpPattern;

use crate::intern::{BranchIdInt, EndpointIdInt, ProjectIdInt};
use crate::proxy::retry::{CouldRetry, Retry};
use crate::proxy::retry::CouldRetry;

/// Generic error response with human-readable description.
/// Note that we can't always present it to user as is.
@@ -66,13 +65,13 @@ impl Display for ConsoleError {
}

impl CouldRetry for ConsoleError {
fn could_retry(&self) -> Option<Retry> {
fn could_retry(&self) -> bool {
// retry if the retry info is set.
// if no status or retry info, do not retry.
self.status
.as_ref()
.and_then(|status| status.details.retry_info.as_ref())
.map(|info| Retry::Fixed(Duration::from_millis(info.retry_delay_ms)))
.is_some()
}
}

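The practical effect in messages.rs: a `ConsoleError` still counts as retryable only when cplane attached `retry_info` to the status, but the `retry_delay_ms` it suggests is no longer honoured; the proxy falls back to its own backoff. A minimal sketch with hypothetical, pared-down stand-ins for the message types, keeping only the fields the hunk above touches:

```rust
#![allow(dead_code)]

// Hypothetical stand-ins for the console message types used above.
struct RetryInfo {
    retry_delay_ms: u64,
}
struct Details {
    retry_info: Option<RetryInfo>,
}
struct Status {
    details: Details,
}
struct ConsoleError {
    status: Option<Status>,
}

impl ConsoleError {
    // Mirrors the new impl: retryable iff retry_info is present;
    // the suggested delay itself is ignored after this commit.
    fn could_retry(&self) -> bool {
        self.status
            .as_ref()
            .and_then(|status| status.details.retry_info.as_ref())
            .is_some()
    }
}

fn main() {
    let no_hint = ConsoleError { status: None };
    assert!(!no_hint.could_retry());

    let with_hint = ConsoleError {
        status: Some(Status {
            details: Details {
                retry_info: Some(RetryInfo { retry_delay_ms: 42 }),
            },
        }),
    };
    // Retryable, but the 42 ms hint no longer decides the wait.
    assert!(with_hint.could_retry());
}
```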
12 changes: 6 additions & 6 deletions proxy/src/console/provider.rs
@@ -27,7 +27,7 @@ pub mod errors {
use crate::{
console::messages::{self, ConsoleError},
error::{io_error, ReportableError, UserFacingError},
proxy::retry::{CouldRetry, Retry},
proxy::retry::CouldRetry,
};
use thiserror::Error;

@@ -129,7 +129,7 @@ pub mod errors {
}

impl CouldRetry for ApiError {
fn could_retry(&self) -> Option<Retry> {
fn could_retry(&self) -> bool {
match self {
// retry some transport errors
Self::Transport(io) => io.could_retry(),
@@ -241,12 +241,12 @@ pub mod errors {
}

impl CouldRetry for WakeComputeError {
fn could_retry(&self) -> Option<Retry> {
fn could_retry(&self) -> bool {
match self {
WakeComputeError::BadComputeAddress(_) => None,
WakeComputeError::BadComputeAddress(_) => false,
WakeComputeError::ApiError(e) => e.could_retry(),
WakeComputeError::TooManyConnections => None,
WakeComputeError::TooManyConnectionAttempts(_) => None,
WakeComputeError::TooManyConnections => false,
WakeComputeError::TooManyConnectionAttempts(_) => false,
}
}
}
17 changes: 9 additions & 8 deletions proxy/src/proxy/connect_compute.rs
@@ -7,7 +7,7 @@ use crate::{
error::ReportableError,
metrics::{ConnectOutcome, ConnectionFailureKind, Metrics, RetriesMetricGroup, RetryType},
proxy::{
retry::{should_retry, CouldRetry},
retry::{retry_after, should_retry, CouldRetry},
wake_compute::wake_compute,
},
Host,
@@ -144,7 +144,7 @@
let node_info = if !node_info.cached() || !err.should_retry_database_address() {
// If we just received this from cplane and didn't get it from cache, we shouldn't retry.
// Do not need to retrieve a new node_info, just return the old one.
if should_retry(&err, num_retries, connect_to_compute_retry_config).is_none() {
if should_retry(&err, num_retries, connect_to_compute_retry_config) {
Metrics::get().proxy.retries_metric.observe(
RetriesMetricGroup {
outcome: ConnectOutcome::Failed,
@@ -173,7 +173,7 @@
info!("wake_compute success. attempting to connect");
num_retries = 1;
loop {
let (e, wait_duration) = match mechanism
match mechanism
.connect_once(ctx, &node_info, CONNECT_TIMEOUT)
.await
{
@@ -189,9 +189,8 @@
info!(?num_retries, "connected to compute node after");
return Ok(res);
}
Err(e) => match should_retry(&e, num_retries, connect_to_compute_retry_config) {
Some(wait_duration) => (e, wait_duration),
None => {
Err(e) => {
if !should_retry(&e, num_retries, connect_to_compute_retry_config) {
error!(error = ?e, num_retries, retriable = false, "couldn't connect to compute node");
Metrics::get().proxy.retries_metric.observe(
RetriesMetricGroup {
@@ -202,10 +201,12 @@
);
return Err(e.into());
}
},

warn!(error = ?e, num_retries, retriable = true, "couldn't connect to compute node");
}
};

warn!(error = ?e, num_retries, retriable = true, "couldn't connect to compute node");
let wait_duration = retry_after(num_retries, connect_to_compute_retry_config);
num_retries += 1;

let pause = ctx
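Both connect_compute.rs (above) and wake_compute.rs (further down) now share the same loop shape: ask `should_retry` for a yes/no, give up on no, otherwise wait for `retry_after` and try again. A compilable sketch of that shape, with the operation, the retry predicate, and the delay function left as parameters; metrics, tracing, and the async plumbing of the real code are omitted:

```rust
use std::time::Duration;

// Sketch only: the real code is async and records retry metrics; here the
// retry decision and the backoff are simply passed in as closures.
fn run_with_retries<T, E>(
    mut op: impl FnMut() -> Result<T, E>,
    is_retryable: impl Fn(&E) -> bool,
    wait_for: impl Fn(u32) -> Duration, // e.g. retry_after(num_retries, config)
    max_retries: u32,
) -> Result<T, E> {
    let mut num_retries = 1;
    loop {
        match op() {
            Ok(v) => return Ok(v),
            Err(e) if num_retries < max_retries && is_retryable(&e) => {
                // retriable: wait, bump the counter, try again
                std::thread::sleep(wait_for(num_retries));
                num_retries += 1;
            }
            // not retriable (or out of attempts): give up
            Err(e) => return Err(e),
        }
    }
}

fn main() {
    let mut attempts = 0;
    let result: Result<u32, &str> = run_with_retries(
        || {
            attempts += 1;
            if attempts < 3 { Err("transient") } else { Ok(attempts) }
        },
        |_err| true,
        |n| Duration::from_millis(10 * u64::from(n)),
        5,
    );
    assert_eq!(result, Ok(3));
}
```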
45 changes: 15 additions & 30 deletions proxy/src/proxy/retry.rs
@@ -1,55 +1,40 @@
use crate::{compute, config::RetryConfig};
use std::{error::Error, io, time::Duration};
use std::{error::Error, io};
use tokio::time;

pub enum Retry {
Backoff,
Fixed(Duration),
}

pub trait CouldRetry {
fn could_retry(&self) -> Option<Retry>;
fn could_retry(&self) -> bool;
}

pub trait CouldRetry2 {
fn should_retry_database_address(&self) -> bool;
}

pub fn should_retry(
err: &impl CouldRetry,
num_retries: u32,
config: RetryConfig,
) -> Option<Duration> {
match err {
_ if num_retries >= config.max_retries => None,
err => match err.could_retry()? {
Retry::Backoff => Some(retry_after(num_retries, config)),
Retry::Fixed(fixed) => Some(fixed),
},
}
pub fn should_retry(err: &impl CouldRetry, num_retries: u32, config: RetryConfig) -> bool {
num_retries < config.max_retries && err.could_retry()
}

impl CouldRetry for io::Error {
fn could_retry(&self) -> Option<Retry> {
fn could_retry(&self) -> bool {
use std::io::ErrorKind;
match self.kind() {
ErrorKind::ConnectionRefused | ErrorKind::AddrNotAvailable | ErrorKind::TimedOut => {
Some(Retry::Backoff)
true
}
_ => None,
_ => false,
}
}
}

impl CouldRetry for tokio_postgres::error::DbError {
fn could_retry(&self) -> Option<Retry> {
fn could_retry(&self) -> bool {
use tokio_postgres::error::SqlState;
match *self.code() {
SqlState::CONNECTION_FAILURE
| SqlState::CONNECTION_EXCEPTION
| SqlState::CONNECTION_DOES_NOT_EXIST
| SqlState::SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION => Some(Retry::Backoff),
_ => None,
| SqlState::SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION => true,
_ => false,
}
}
}
@@ -72,13 +57,13 @@ impl CouldRetry2 for tokio_postgres::error::DbError {
}

impl CouldRetry for tokio_postgres::Error {
fn could_retry(&self) -> Option<Retry> {
fn could_retry(&self) -> bool {
if let Some(io_err) = self.source().and_then(|x| x.downcast_ref()) {
io::Error::could_retry(io_err)
} else if let Some(db_err) = self.source().and_then(|x| x.downcast_ref()) {
tokio_postgres::error::DbError::could_retry(db_err)
} else {
None
false
}
}
}
@@ -93,12 +78,12 @@ impl CouldRetry2 for tokio_postgres::Error {
}

impl CouldRetry for compute::ConnectionError {
fn could_retry(&self) -> Option<Retry> {
fn could_retry(&self) -> bool {
match self {
compute::ConnectionError::Postgres(err) => err.could_retry(),
compute::ConnectionError::CouldNotConnect(err) => err.could_retry(),
compute::ConnectionError::WakeComputeError(err) => err.could_retry(),
_ => None,
_ => false,
}
}
}
@@ -113,7 +98,7 @@ impl CouldRetry2 for compute::ConnectionError {
}
}

fn retry_after(num_retries: u32, config: RetryConfig) -> time::Duration {
pub fn retry_after(num_retries: u32, config: RetryConfig) -> time::Duration {
config
.base_delay
.mul_f64(config.backoff_factor.powi((num_retries as i32) - 1))
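After this change the public surface of retry.rs is small: `could_retry`/`should_retry` answer whether to retry at all, and `retry_after` (now pub) computes the wait as base_delay * backoff_factor^(num_retries - 1). A self-contained sketch; RetryConfig here is a hypothetical stand-in for the real proxy config type, with only the three fields these functions use:

```rust
use std::time::Duration;

// Hypothetical stand-in for config::RetryConfig with just the fields used here.
#[derive(Clone, Copy)]
struct RetryConfig {
    base_delay: Duration,
    backoff_factor: f64,
    max_retries: u32,
}

trait CouldRetry {
    fn could_retry(&self) -> bool;
}

// Same logic as the new should_retry above: the attempt budget and the
// error's own opinion are the only inputs.
fn should_retry(err: &impl CouldRetry, num_retries: u32, config: RetryConfig) -> bool {
    num_retries < config.max_retries && err.could_retry()
}

// Exponential backoff: the first retry waits base_delay, each later one
// multiplies the previous wait by backoff_factor.
fn retry_after(num_retries: u32, config: RetryConfig) -> Duration {
    config
        .base_delay
        .mul_f64(config.backoff_factor.powi(num_retries as i32 - 1))
}

struct Transient; // toy error that is always worth retrying
impl CouldRetry for Transient {
    fn could_retry(&self) -> bool {
        true
    }
}

fn main() {
    let config = RetryConfig {
        base_delay: Duration::from_millis(100),
        backoff_factor: 2.0,
        max_retries: 5,
    };

    assert!(should_retry(&Transient, 1, config));
    assert!(!should_retry(&Transient, 5, config)); // budget exhausted

    // Waits grow 100ms, 200ms, 400ms, 800ms for retries 1..=4.
    for n in 1..config.max_retries {
        println!("retry {n}: wait {:?}", retry_after(n, config));
    }
}
```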
10 changes: 5 additions & 5 deletions proxy/src/proxy/tests.rs
@@ -19,7 +19,7 @@ use crate::error::ErrorKind;
use crate::{http, sasl, scram, BranchId, EndpointId, ProjectId};
use anyhow::{bail, Context};
use async_trait::async_trait;
use retry::{CouldRetry2, Retry};
use retry::CouldRetry2;
use rstest::rstest;
use rustls::pki_types;
use tokio_postgres::config::SslMode;
@@ -425,8 +425,8 @@ impl std::fmt::Display for TestConnectError {
impl std::error::Error for TestConnectError {}

impl CouldRetry for TestConnectError {
fn could_retry(&self) -> Option<Retry> {
self.retryable.then_some(Retry::Backoff)
fn could_retry(&self) -> bool {
self.retryable
}
}
impl CouldRetry2 for TestConnectError {
@@ -480,7 +480,7 @@ impl TestBackend for TestConnectMechanism {
error: "TEST".into(),
status: None,
});
assert!(err.could_retry().is_none());
assert!(!err.could_retry());
Err(console::errors::WakeComputeError::ApiError(err))
}
ConnectAction::WakeRetry => {
@@ -497,7 +497,7 @@ impl TestBackend for TestConnectMechanism {
},
}),
});
assert!(err.could_retry().is_some());
assert!(err.could_retry());
Err(console::errors::WakeComputeError::ApiError(err))
}
x => panic!("expecting action {:?}, wake_compute is called instead", x),
20 changes: 10 additions & 10 deletions proxy/src/proxy/wake_compute.rs
@@ -6,7 +6,7 @@ use crate::metrics::{
ConnectOutcome, ConnectionFailuresBreakdownGroup, Metrics, RetriesMetricGroup, RetryType,
WakeupFailureKind,
};
use crate::proxy::retry::should_retry;
use crate::proxy::retry::{retry_after, should_retry};
use hyper1::StatusCode;
use tracing::{error, info, warn};

@@ -20,12 +20,12 @@ pub async fn wake_compute<B: ComputeConnectBackend>(
) -> Result<CachedNodeInfo, WakeComputeError> {
let retry_type = RetryType::WakeCompute;
loop {
let (e, wait_duration) = match api.wake_compute(ctx).await {
Err(e) => match should_retry(&e, *num_retries, config) {
Some(wait_duration) => (e, wait_duration),
None => {
error!(error = ?e, num_retries, retriable = false, "couldn't wake compute node");
report_error(&e, false);
match api.wake_compute(ctx).await {
Err(e) => {
let retriable = should_retry(&e, *num_retries, config);
report_error(&e, retriable);
if !retriable {
error!(error = ?e, num_retries, retriable, "couldn't wake compute node");
Metrics::get().proxy.retries_metric.observe(
RetriesMetricGroup {
outcome: ConnectOutcome::Failed,
@@ -35,7 +35,8 @@
);
return Err(e);
}
},
warn!(error = ?e, num_retries, retriable, "couldn't wake compute node");
}
Ok(n) => {
Metrics::get().proxy.retries_metric.observe(
RetriesMetricGroup {
@@ -49,8 +50,7 @@
}
};

warn!(error = ?e, num_retries, retriable = true, "couldn't wake compute node");
report_error(&e, true);
let wait_duration = retry_after(*num_retries, config);
*num_retries += 1;

let pause = ctx
6 changes: 3 additions & 3 deletions proxy/src/serverless/backend.rs
@@ -18,7 +18,7 @@ use crate::{
intern::EndpointIdInt,
proxy::{
connect_compute::ConnectMechanism,
retry::{CouldRetry, CouldRetry2, Retry},
retry::{CouldRetry, CouldRetry2},
},
rate_limiter::EndpointRateLimiter,
Host,
@@ -183,14 +183,14 @@ impl UserFacingError for HttpConnError {
}

impl CouldRetry for HttpConnError {
fn could_retry(&self) -> Option<Retry> {
fn could_retry(&self) -> bool {
match self {
HttpConnError::ConnectionError(e) => e.could_retry(),
HttpConnError::ConnectionClosedAbruptly(_)
| HttpConnError::GetAuthInfo(_)
| HttpConnError::AuthError(_)
| HttpConnError::WakeCompute(_)
| HttpConnError::TooManyConnectionAttempts(_) => None,
| HttpConnError::TooManyConnectionAttempts(_) => false,
}
}
}
