Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

proxy fix wake compute console retry #8141

Merged
merged 9 commits into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 73 additions & 37 deletions proxy/src/console/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::fmt::{self, Display};
use crate::auth::IpPattern;

use crate::intern::{BranchIdInt, EndpointIdInt, ProjectIdInt};
use crate::proxy::retry::ShouldRetry;
use crate::proxy::retry::CouldRetry;

/// Generic error response with human-readable description.
/// Note that we can't always present it to user as is.
Expand Down Expand Up @@ -64,45 +64,52 @@ impl Display for ConsoleError {
}
}

impl ShouldRetry for ConsoleError {
impl CouldRetry for ConsoleError {
fn could_retry(&self) -> bool {
if self.status.is_none() || self.status.as_ref().unwrap().details.retry_info.is_none() {
// retry some temporary failures because the compute was in a bad state
// (bad request can be returned when the endpoint was in transition)
return match &self {
ConsoleError {
http_status_code: http::StatusCode::BAD_REQUEST,
..
} => true,
// don't retry when quotas are exceeded
ConsoleError {
http_status_code: http::StatusCode::UNPROCESSABLE_ENTITY,
ref error,
..
} => !error.contains("compute time quota of non-primary branches is exceeded"),
// locked can be returned when the endpoint was in transition
// or when quotas are exceeded. don't retry when quotas are exceeded
ConsoleError {
http_status_code: http::StatusCode::LOCKED,
ref error,
..
} => {
!error.contains("quota exceeded")
&& !error.contains("the limit for current plan reached")
}
_ => false,
};
}

// retry if the response has a retry delay
if let Some(retry_info) = self
if self
.status
.as_ref()
.and_then(|s| s.details.retry_info.as_ref())
.and_then(|status| status.details.retry_info.as_ref())
.is_some()
{
retry_info.retry_delay_ms > 0
} else {
false
return true;
}

if let Some(status) = &self.status {
// retry if the retry info is set.
if let Some(_) = status.details.retry_info {
return true;
}

// if no retry info set, attempt to use the error code to guess the retry state.
let reason = status
.details
.error_info
.map_or(Reason::Unknown, |e| e.reason);
match reason {
// not a transitive error
Reason::RoleProtected => false,
// on retry, it will still not be found
Reason::ResourceNotFound
| Reason::ProjectNotFound
| Reason::EndpointNotFound
| Reason::BranchNotFound => false,
// we were asked to go away
Reason::RateLimitExceeded |
Reason::NonDefaultBranchComputeTimeExceeded |
Reason::ActiveTimeQuotaExceeded |
Reason::ComputeTimeQuotaExceeded |
Reason::WrittenDataQuotaExceeded |
Reason::DataTransferQuotaExceeded |
Reason::LogicalSizeQuotaExceeded => false,
// transitive error. control plane is currently busy
// but might be ready soon
Reason::RunningOperations => true,
Reason::ConcurrencyLimitReached => true,
Reason::LockAlreadyTaken => true,
// unknown error. better not retry it.
Reason::Unknown => false,
}
}
}
}
Expand All @@ -129,30 +136,59 @@ pub struct ErrorInfo {

#[derive(Clone, Copy, Debug, Deserialize, Default)]
pub enum Reason {
/// RoleProtected indicates that the role is protected and the attempted operation is not permitted on protected roles.
#[serde(rename = "ROLE_PROTECTED")]
RoleProtected,
/// ResourceNotFound indicates that a resource (project, endpoint, branch, etc.) wasn't found,
/// usually due to the provided ID not being correct or because the subject doesn't have enough permissions to
/// access the requested resource.
/// Prefer a more specific reason if possible, e.g., ProjectNotFound, EndpointNotFound, etc.
#[serde(rename = "RESOURCE_NOT_FOUND")]
ResourceNotFound,
/// ProjectNotFound indicates that the project wasn't found, usually due to the provided ID not being correct,
/// or that the subject doesn't have enough permissions to access the requested project.
#[serde(rename = "PROJECT_NOT_FOUND")]
ProjectNotFound,
/// EndpointNotFound indicates that the endpoint wasn't found, usually due to the provided ID not being correct,
/// or that the subject doesn't have enough permissions to access the requested endpoint.
#[serde(rename = "ENDPOINT_NOT_FOUND")]
EndpointNotFound,
/// BranchNotFound indicates that the branch wasn't found, usually due to the provided ID not being correct,
/// or that the subject doesn't have enough permissions to access the requested branch.
#[serde(rename = "BRANCH_NOT_FOUND")]
BranchNotFound,
/// RateLimitExceeded indicates that the rate limit for the operation has been exceeded.
#[serde(rename = "RATE_LIMIT_EXCEEDED")]
RateLimitExceeded,
/// NonDefaultBranchComputeTimeExceeded indicates that the compute time quota of non-default branches has been
/// exceeded.
#[serde(rename = "NON_PRIMARY_BRANCH_COMPUTE_TIME_EXCEEDED")]
NonPrimaryBranchComputeTimeExceeded,
NonDefaultBranchComputeTimeExceeded,
/// ActiveTimeQuotaExceeded indicates that the active time quota was exceeded.
#[serde(rename = "ACTIVE_TIME_QUOTA_EXCEEDED")]
ActiveTimeQuotaExceeded,
/// ComputeTimeQuotaExceeded indicates that the compute time quota was exceeded.
#[serde(rename = "COMPUTE_TIME_QUOTA_EXCEEDED")]
ComputeTimeQuotaExceeded,
/// WrittenDataQuotaExceeded indicates that the written data quota was exceeded.
#[serde(rename = "WRITTEN_DATA_QUOTA_EXCEEDED")]
WrittenDataQuotaExceeded,
/// DataTransferQuotaExceeded indicates that the data transfer quota was exceeded.
#[serde(rename = "DATA_TRANSFER_QUOTA_EXCEEDED")]
DataTransferQuotaExceeded,
/// LogicalSizeQuotaExceeded indicates that the logical size quota was exceeded.
#[serde(rename = "LOGICAL_SIZE_QUOTA_EXCEEDED")]
LogicalSizeQuotaExceeded,
/// RunningOperations indicates that the project already has some running operations
/// and scheduling of new ones is prohibited.
#[serde(rename = "RUNNING_OPERATIONS")]
RunningOperations,
/// ConcurrencyLimitReached indicates that the concurrency limit for an action was reached.
#[serde(rename = "CONCURRENCY_LIMIT_REACHED")]
ConcurrencyLimitReached,
/// LockAlreadyTaken indicates that the we attempted to take a lock that was already taken.
#[serde(rename = "LOCK_ALREADY_TAKEN")]
LockAlreadyTaken,
#[default]
#[serde(other)]
Unknown,
Expand Down
15 changes: 13 additions & 2 deletions proxy/src/console/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub mod errors {
use crate::{
console::messages::{self, ConsoleError},
error::{io_error, ReportableError, UserFacingError},
proxy::retry::ShouldRetry,
proxy::retry::CouldRetry,
};
use thiserror::Error;

Expand Down Expand Up @@ -128,7 +128,7 @@ pub mod errors {
}
}

impl ShouldRetry for ApiError {
impl CouldRetry for ApiError {
fn could_retry(&self) -> bool {
match self {
// retry some transport errors
Expand Down Expand Up @@ -239,6 +239,17 @@ pub mod errors {
}
}
}

impl CouldRetry for WakeComputeError {
fn could_retry(&self) -> bool {
match self {
WakeComputeError::BadComputeAddress(_) => false,
WakeComputeError::ApiError(e) => e.could_retry(),
WakeComputeError::TooManyConnections => false,
WakeComputeError::TooManyConnectionAttempts(_) => false,
}
}
}
}

/// Auth secret which is managed by the cloud.
Expand Down
20 changes: 11 additions & 9 deletions proxy/src/proxy/connect_compute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
error::ReportableError,
metrics::{ConnectOutcome, ConnectionFailureKind, Metrics, RetriesMetricGroup, RetryType},
proxy::{
retry::{retry_after, ShouldRetry},
retry::{retry_after, should_retry, CouldRetry},
wake_compute::wake_compute,
},
Host,
Expand All @@ -17,6 +17,8 @@ use pq_proto::StartupMessageParams;
use tokio::time;
use tracing::{error, info, warn};

use super::retry::ShouldRetryWakeCompute;

const CONNECT_TIMEOUT: time::Duration = time::Duration::from_secs(2);

/// If we couldn't connect, a cached connection info might be to blame
Expand Down Expand Up @@ -104,7 +106,7 @@ pub async fn connect_to_compute<M: ConnectMechanism, B: ComputeConnectBackend>(
connect_to_compute_retry_config: RetryConfig,
) -> Result<M::Connection, M::Error>
where
M::ConnectError: ShouldRetry + std::fmt::Debug,
M::ConnectError: CouldRetry + ShouldRetryWakeCompute + std::fmt::Debug,
M::Error: From<WakeComputeError>,
{
let mut num_retries = 0;
Expand Down Expand Up @@ -139,10 +141,10 @@ where

error!(error = ?err, "could not connect to compute node");

let node_info = if !node_info.cached() || !err.should_retry_database_address() {
let node_info = if !node_info.cached() || !err.should_retry_wake_compute() {
// If we just recieved this from cplane and dodn't get it from cache, we shouldn't retry.
// Do not need to retrieve a new node_info, just return the old one.
if !err.should_retry(num_retries, connect_to_compute_retry_config) {
if should_retry(&err, num_retries, connect_to_compute_retry_config) {
Metrics::get().proxy.retries_metric.observe(
RetriesMetricGroup {
outcome: ConnectOutcome::Failed,
Expand Down Expand Up @@ -188,9 +190,8 @@ where
return Ok(res);
}
Err(e) => {
let retriable = e.should_retry(num_retries, connect_to_compute_retry_config);
if !retriable {
error!(error = ?e, num_retries, retriable, "couldn't connect to compute node");
if !should_retry(&e, num_retries, connect_to_compute_retry_config) {
error!(error = ?e, num_retries, retriable = false, "couldn't connect to compute node");
Metrics::get().proxy.retries_metric.observe(
RetriesMetricGroup {
outcome: ConnectOutcome::Failed,
Expand All @@ -200,9 +201,10 @@ where
);
return Err(e.into());
}
warn!(error = ?e, num_retries, retriable, "couldn't connect to compute node");

warn!(error = ?e, num_retries, retriable = true, "couldn't connect to compute node");
}
}
};

let wait_duration = retry_after(num_retries, connect_to_compute_retry_config);
num_retries += 1;
Expand Down
54 changes: 31 additions & 23 deletions proxy/src/proxy/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,22 @@ use crate::{compute, config::RetryConfig};
use std::{error::Error, io};
use tokio::time;

pub trait ShouldRetry {
pub trait CouldRetry {
/// Returns true if the error could be retried
fn could_retry(&self) -> bool;
fn should_retry(&self, num_retries: u32, config: RetryConfig) -> bool {
match self {
_ if num_retries >= config.max_retries => false,
err => err.could_retry(),
}
}
fn should_retry_database_address(&self) -> bool {
true
}
}

impl ShouldRetry for io::Error {
pub trait ShouldRetryWakeCompute {
/// Returns true if we need to invalidate the cache for this node.
/// If false, we can continue retrying with the current node cache.
fn should_retry_wake_compute(&self) -> bool;
}

pub fn should_retry(err: &impl CouldRetry, num_retries: u32, config: RetryConfig) -> bool {
num_retries < config.max_retries && err.could_retry()
}

impl CouldRetry for io::Error {
fn could_retry(&self) -> bool {
use std::io::ErrorKind;
matches!(
Expand All @@ -25,7 +27,7 @@ impl ShouldRetry for io::Error {
}
}

impl ShouldRetry for tokio_postgres::error::DbError {
impl CouldRetry for tokio_postgres::error::DbError {
fn could_retry(&self) -> bool {
use tokio_postgres::error::SqlState;
matches!(
Expand All @@ -36,7 +38,9 @@ impl ShouldRetry for tokio_postgres::error::DbError {
| &SqlState::SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION,
)
}
fn should_retry_database_address(&self) -> bool {
}
impl ShouldRetryWakeCompute for tokio_postgres::error::DbError {
fn should_retry_wake_compute(&self) -> bool {
use tokio_postgres::error::SqlState;
// Here are errors that happens after the user successfully authenticated to the database.
// TODO: there are pgbouncer errors that should be retried, but they are not listed here.
Expand All @@ -53,7 +57,7 @@ impl ShouldRetry for tokio_postgres::error::DbError {
}
}

impl ShouldRetry for tokio_postgres::Error {
impl CouldRetry for tokio_postgres::Error {
fn could_retry(&self) -> bool {
if let Some(io_err) = self.source().and_then(|x| x.downcast_ref()) {
io::Error::could_retry(io_err)
Expand All @@ -63,29 +67,33 @@ impl ShouldRetry for tokio_postgres::Error {
false
}
}
fn should_retry_database_address(&self) -> bool {
if let Some(io_err) = self.source().and_then(|x| x.downcast_ref()) {
io::Error::should_retry_database_address(io_err)
} else if let Some(db_err) = self.source().and_then(|x| x.downcast_ref()) {
tokio_postgres::error::DbError::should_retry_database_address(db_err)
}
impl ShouldRetryWakeCompute for tokio_postgres::Error {
fn should_retry_wake_compute(&self) -> bool {
if let Some(db_err) = self.source().and_then(|x| x.downcast_ref()) {
tokio_postgres::error::DbError::should_retry_wake_compute(db_err)
} else {
// likely an IO error. Possible the compute has shutdown and the
// cache is stale.
true
}
}
}

impl ShouldRetry for compute::ConnectionError {
impl CouldRetry for compute::ConnectionError {
fn could_retry(&self) -> bool {
match self {
compute::ConnectionError::Postgres(err) => err.could_retry(),
compute::ConnectionError::CouldNotConnect(err) => err.could_retry(),
compute::ConnectionError::WakeComputeError(err) => err.could_retry(),
_ => false,
}
}
fn should_retry_database_address(&self) -> bool {
}
impl ShouldRetryWakeCompute for compute::ConnectionError {
fn should_retry_wake_compute(&self) -> bool {
match self {
compute::ConnectionError::Postgres(err) => err.should_retry_database_address(),
compute::ConnectionError::CouldNotConnect(err) => err.should_retry_database_address(),
compute::ConnectionError::Postgres(err) => err.should_retry_wake_compute(),
// the cache entry was not checked for validity
compute::ConnectionError::TooManyConnectionAttempts(_) => false,
_ => true,
Expand Down
Loading
Loading