Skip to content

Commit

Permalink
proxy fix wake compute console retry (#8141)
Browse files Browse the repository at this point in the history
## Problem

1. Proxy is retrying errors from cplane that shouldn't be retried
2. ~~Proxy is not using the retry_after_ms value~~

## Summary of changes

1. Correct the could_retry impl for ConsoleError.
2. ~~Update could_retry interface to support returning a fixed wait
duration.~~
  • Loading branch information
conradludgate committed Jun 25, 2024
1 parent cd9a550 commit 6c5d3b5
Show file tree
Hide file tree
Showing 7 changed files with 193 additions and 167 deletions.
111 changes: 71 additions & 40 deletions proxy/src/console/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::fmt::{self, Display};
use crate::auth::IpPattern;

use crate::intern::{BranchIdInt, EndpointIdInt, ProjectIdInt};
use crate::proxy::retry::ShouldRetry;
use crate::proxy::retry::CouldRetry;

/// Generic error response with human-readable description.
/// Note that we can't always present it to user as is.
Expand Down Expand Up @@ -64,45 +64,47 @@ impl Display for ConsoleError {
}
}

impl ShouldRetry for ConsoleError {
impl CouldRetry for ConsoleError {
fn could_retry(&self) -> bool {
if self.status.is_none() || self.status.as_ref().unwrap().details.retry_info.is_none() {
// retry some temporary failures because the compute was in a bad state
// (bad request can be returned when the endpoint was in transition)
return match &self {
ConsoleError {
http_status_code: http::StatusCode::BAD_REQUEST,
..
} => true,
// don't retry when quotas are exceeded
ConsoleError {
http_status_code: http::StatusCode::UNPROCESSABLE_ENTITY,
ref error,
..
} => !error.contains("compute time quota of non-primary branches is exceeded"),
// locked can be returned when the endpoint was in transition
// or when quotas are exceeded. don't retry when quotas are exceeded
ConsoleError {
http_status_code: http::StatusCode::LOCKED,
ref error,
..
} => {
!error.contains("quota exceeded")
&& !error.contains("the limit for current plan reached")
}
_ => false,
};
// If the error message does not have a status,
// the error is unknown and probably should not retry automatically
let Some(status) = &self.status else {
return false;
};

// retry if the retry info is set.
if status.details.retry_info.is_some() {
return true;
}

// retry if the response has a retry delay
if let Some(retry_info) = self
.status
.as_ref()
.and_then(|s| s.details.retry_info.as_ref())
{
retry_info.retry_delay_ms > 0
} else {
false
// if no retry info set, attempt to use the error code to guess the retry state.
let reason = status
.details
.error_info
.map_or(Reason::Unknown, |e| e.reason);
match reason {
// not a transitive error
Reason::RoleProtected => false,
// on retry, it will still not be found
Reason::ResourceNotFound
| Reason::ProjectNotFound
| Reason::EndpointNotFound
| Reason::BranchNotFound => false,
// we were asked to go away
Reason::RateLimitExceeded
| Reason::NonDefaultBranchComputeTimeExceeded
| Reason::ActiveTimeQuotaExceeded
| Reason::ComputeTimeQuotaExceeded
| Reason::WrittenDataQuotaExceeded
| Reason::DataTransferQuotaExceeded
| Reason::LogicalSizeQuotaExceeded => false,
// transitive error. control plane is currently busy
// but might be ready soon
Reason::RunningOperations => true,
Reason::ConcurrencyLimitReached => true,
Reason::LockAlreadyTaken => true,
// unknown error. better not retry it.
Reason::Unknown => false,
}
}
}
Expand All @@ -121,38 +123,67 @@ pub struct Details {
pub user_facing_message: Option<UserFacingMessage>,
}

#[derive(Debug, Deserialize)]
#[derive(Copy, Clone, Debug, Deserialize)]
pub struct ErrorInfo {
pub reason: Reason,
// Schema could also have `metadata` field, but it's not structured. Skip it for now.
}

#[derive(Clone, Copy, Debug, Deserialize, Default)]
pub enum Reason {
/// RoleProtected indicates that the role is protected and the attempted operation is not permitted on protected roles.
#[serde(rename = "ROLE_PROTECTED")]
RoleProtected,
/// ResourceNotFound indicates that a resource (project, endpoint, branch, etc.) wasn't found,
/// usually due to the provided ID not being correct or because the subject doesn't have enough permissions to
/// access the requested resource.
/// Prefer a more specific reason if possible, e.g., ProjectNotFound, EndpointNotFound, etc.
#[serde(rename = "RESOURCE_NOT_FOUND")]
ResourceNotFound,
/// ProjectNotFound indicates that the project wasn't found, usually due to the provided ID not being correct,
/// or that the subject doesn't have enough permissions to access the requested project.
#[serde(rename = "PROJECT_NOT_FOUND")]
ProjectNotFound,
/// EndpointNotFound indicates that the endpoint wasn't found, usually due to the provided ID not being correct,
/// or that the subject doesn't have enough permissions to access the requested endpoint.
#[serde(rename = "ENDPOINT_NOT_FOUND")]
EndpointNotFound,
/// BranchNotFound indicates that the branch wasn't found, usually due to the provided ID not being correct,
/// or that the subject doesn't have enough permissions to access the requested branch.
#[serde(rename = "BRANCH_NOT_FOUND")]
BranchNotFound,
/// RateLimitExceeded indicates that the rate limit for the operation has been exceeded.
#[serde(rename = "RATE_LIMIT_EXCEEDED")]
RateLimitExceeded,
/// NonDefaultBranchComputeTimeExceeded indicates that the compute time quota of non-default branches has been
/// exceeded.
#[serde(rename = "NON_PRIMARY_BRANCH_COMPUTE_TIME_EXCEEDED")]
NonPrimaryBranchComputeTimeExceeded,
NonDefaultBranchComputeTimeExceeded,
/// ActiveTimeQuotaExceeded indicates that the active time quota was exceeded.
#[serde(rename = "ACTIVE_TIME_QUOTA_EXCEEDED")]
ActiveTimeQuotaExceeded,
/// ComputeTimeQuotaExceeded indicates that the compute time quota was exceeded.
#[serde(rename = "COMPUTE_TIME_QUOTA_EXCEEDED")]
ComputeTimeQuotaExceeded,
/// WrittenDataQuotaExceeded indicates that the written data quota was exceeded.
#[serde(rename = "WRITTEN_DATA_QUOTA_EXCEEDED")]
WrittenDataQuotaExceeded,
/// DataTransferQuotaExceeded indicates that the data transfer quota was exceeded.
#[serde(rename = "DATA_TRANSFER_QUOTA_EXCEEDED")]
DataTransferQuotaExceeded,
/// LogicalSizeQuotaExceeded indicates that the logical size quota was exceeded.
#[serde(rename = "LOGICAL_SIZE_QUOTA_EXCEEDED")]
LogicalSizeQuotaExceeded,
/// RunningOperations indicates that the project already has some running operations
/// and scheduling of new ones is prohibited.
#[serde(rename = "RUNNING_OPERATIONS")]
RunningOperations,
/// ConcurrencyLimitReached indicates that the concurrency limit for an action was reached.
#[serde(rename = "CONCURRENCY_LIMIT_REACHED")]
ConcurrencyLimitReached,
/// LockAlreadyTaken indicates that the we attempted to take a lock that was already taken.
#[serde(rename = "LOCK_ALREADY_TAKEN")]
LockAlreadyTaken,
#[default]
#[serde(other)]
Unknown,
Expand All @@ -170,7 +201,7 @@ impl Reason {
}
}

#[derive(Debug, Deserialize)]
#[derive(Copy, Clone, Debug, Deserialize)]
pub struct RetryInfo {
pub retry_delay_ms: u64,
}
Expand Down
48 changes: 30 additions & 18 deletions proxy/src/console/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ use tracing::info;

pub mod errors {
use crate::{
console::messages::{self, ConsoleError},
console::messages::{self, ConsoleError, Reason},
error::{io_error, ReportableError, UserFacingError},
proxy::retry::ShouldRetry,
proxy::retry::CouldRetry,
};
use thiserror::Error;

Expand Down Expand Up @@ -76,21 +76,22 @@ pub mod errors {
ApiError::Console(e) => {
use crate::error::ErrorKind::*;
match e.get_reason() {
crate::console::messages::Reason::RoleProtected => User,
crate::console::messages::Reason::ResourceNotFound => User,
crate::console::messages::Reason::ProjectNotFound => User,
crate::console::messages::Reason::EndpointNotFound => User,
crate::console::messages::Reason::BranchNotFound => User,
crate::console::messages::Reason::RateLimitExceeded => ServiceRateLimit,
crate::console::messages::Reason::NonPrimaryBranchComputeTimeExceeded => {
User
}
crate::console::messages::Reason::ActiveTimeQuotaExceeded => User,
crate::console::messages::Reason::ComputeTimeQuotaExceeded => User,
crate::console::messages::Reason::WrittenDataQuotaExceeded => User,
crate::console::messages::Reason::DataTransferQuotaExceeded => User,
crate::console::messages::Reason::LogicalSizeQuotaExceeded => User,
crate::console::messages::Reason::Unknown => match &e {
Reason::RoleProtected => User,
Reason::ResourceNotFound => User,
Reason::ProjectNotFound => User,
Reason::EndpointNotFound => User,
Reason::BranchNotFound => User,
Reason::RateLimitExceeded => ServiceRateLimit,
Reason::NonDefaultBranchComputeTimeExceeded => User,
Reason::ActiveTimeQuotaExceeded => User,
Reason::ComputeTimeQuotaExceeded => User,
Reason::WrittenDataQuotaExceeded => User,
Reason::DataTransferQuotaExceeded => User,
Reason::LogicalSizeQuotaExceeded => User,
Reason::ConcurrencyLimitReached => ControlPlane,
Reason::LockAlreadyTaken => ControlPlane,
Reason::RunningOperations => ControlPlane,
Reason::Unknown => match &e {
ConsoleError {
http_status_code:
http::StatusCode::NOT_FOUND | http::StatusCode::NOT_ACCEPTABLE,
Expand Down Expand Up @@ -128,7 +129,7 @@ pub mod errors {
}
}

impl ShouldRetry for ApiError {
impl CouldRetry for ApiError {
fn could_retry(&self) -> bool {
match self {
// retry some transport errors
Expand Down Expand Up @@ -239,6 +240,17 @@ pub mod errors {
}
}
}

impl CouldRetry for WakeComputeError {
fn could_retry(&self) -> bool {
match self {
WakeComputeError::BadComputeAddress(_) => false,
WakeComputeError::ApiError(e) => e.could_retry(),
WakeComputeError::TooManyConnections => false,
WakeComputeError::TooManyConnectionAttempts(_) => false,
}
}
}
}

/// Auth secret which is managed by the cloud.
Expand Down
20 changes: 11 additions & 9 deletions proxy/src/proxy/connect_compute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
error::ReportableError,
metrics::{ConnectOutcome, ConnectionFailureKind, Metrics, RetriesMetricGroup, RetryType},
proxy::{
retry::{retry_after, ShouldRetry},
retry::{retry_after, should_retry, CouldRetry},
wake_compute::wake_compute,
},
Host,
Expand All @@ -17,6 +17,8 @@ use pq_proto::StartupMessageParams;
use tokio::time;
use tracing::{error, info, warn};

use super::retry::ShouldRetryWakeCompute;

const CONNECT_TIMEOUT: time::Duration = time::Duration::from_secs(2);

/// If we couldn't connect, a cached connection info might be to blame
Expand Down Expand Up @@ -104,7 +106,7 @@ pub async fn connect_to_compute<M: ConnectMechanism, B: ComputeConnectBackend>(
connect_to_compute_retry_config: RetryConfig,
) -> Result<M::Connection, M::Error>
where
M::ConnectError: ShouldRetry + std::fmt::Debug,
M::ConnectError: CouldRetry + ShouldRetryWakeCompute + std::fmt::Debug,
M::Error: From<WakeComputeError>,
{
let mut num_retries = 0;
Expand Down Expand Up @@ -139,10 +141,10 @@ where

error!(error = ?err, "could not connect to compute node");

let node_info = if !node_info.cached() || !err.should_retry_database_address() {
let node_info = if !node_info.cached() || !err.should_retry_wake_compute() {
// If we just recieved this from cplane and dodn't get it from cache, we shouldn't retry.
// Do not need to retrieve a new node_info, just return the old one.
if !err.should_retry(num_retries, connect_to_compute_retry_config) {
if should_retry(&err, num_retries, connect_to_compute_retry_config) {
Metrics::get().proxy.retries_metric.observe(
RetriesMetricGroup {
outcome: ConnectOutcome::Failed,
Expand Down Expand Up @@ -188,9 +190,8 @@ where
return Ok(res);
}
Err(e) => {
let retriable = e.should_retry(num_retries, connect_to_compute_retry_config);
if !retriable {
error!(error = ?e, num_retries, retriable, "couldn't connect to compute node");
if !should_retry(&e, num_retries, connect_to_compute_retry_config) {
error!(error = ?e, num_retries, retriable = false, "couldn't connect to compute node");
Metrics::get().proxy.retries_metric.observe(
RetriesMetricGroup {
outcome: ConnectOutcome::Failed,
Expand All @@ -200,9 +201,10 @@ where
);
return Err(e.into());
}
warn!(error = ?e, num_retries, retriable, "couldn't connect to compute node");

warn!(error = ?e, num_retries, retriable = true, "couldn't connect to compute node");
}
}
};

let wait_duration = retry_after(num_retries, connect_to_compute_retry_config);
num_retries += 1;
Expand Down
Loading

1 comment on commit 6c5d3b5

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No tests were run or test report is not available

Test coverage report is not available

The comment gets automatically updated with the latest test results
6c5d3b5 at 2024-06-25T18:09:41.127Z :recycle:

Please sign in to comment.