diff --git a/libs/remote_storage/src/azure_blob.rs b/libs/remote_storage/src/azure_blob.rs index dbd64fb5a631..8e590b17c486 100644 --- a/libs/remote_storage/src/azure_blob.rs +++ b/libs/remote_storage/src/azure_blob.rs @@ -34,7 +34,7 @@ use utils::backoff; use crate::metrics::{start_measuring_requests, AttemptOutcome, RequestKind}; use crate::{ - error::Cancelled, AzureConfig, ConcurrencyLimiter, Download, DownloadError, Listing, + config::AzureConfig, error::Cancelled, ConcurrencyLimiter, Download, DownloadError, Listing, ListingMode, RemotePath, RemoteStorage, StorageMetadata, TimeTravelError, TimeoutOrCancel, }; diff --git a/libs/remote_storage/src/config.rs b/libs/remote_storage/src/config.rs new file mode 100644 index 000000000000..8a8f6212e99b --- /dev/null +++ b/libs/remote_storage/src/config.rs @@ -0,0 +1,277 @@ +use std::{fmt::Debug, num::NonZeroUsize, str::FromStr, time::Duration}; + +use anyhow::bail; +use aws_sdk_s3::types::StorageClass; +use camino::Utf8PathBuf; + +use serde::{Deserialize, Serialize}; + +use crate::{ + DEFAULT_MAX_KEYS_PER_LIST_RESPONSE, DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT, + DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT, +}; + +/// External backup storage configuration, enough for creating a client for that storage. +#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)] +pub struct RemoteStorageConfig { + /// The storage connection configuration. + #[serde(flatten)] + pub storage: RemoteStorageKind, + /// A common timeout enforced for all requests after concurrency limiter permit has been + /// acquired. + #[serde( + with = "humantime_serde", + default = "default_timeout", + skip_serializing_if = "is_default_timeout" + )] + pub timeout: Duration, +} + +fn default_timeout() -> Duration { + RemoteStorageConfig::DEFAULT_TIMEOUT +} + +fn is_default_timeout(d: &Duration) -> bool { + *d == RemoteStorageConfig::DEFAULT_TIMEOUT +} + +/// A kind of a remote storage to connect to, with its connection configuration. +#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)] +#[serde(untagged)] +pub enum RemoteStorageKind { + /// Storage based on local file system. + /// Specify a root folder to place all stored files into. + LocalFs { local_path: Utf8PathBuf }, + /// AWS S3 based storage, storing all files in the S3 bucket + /// specified by the config + AwsS3(S3Config), + /// Azure Blob based storage, storing all files in the container + /// specified by the config + AzureContainer(AzureConfig), +} + +/// AWS S3 bucket coordinates and access credentials to manage the bucket contents (read and write). +#[derive(Clone, PartialEq, Eq, Deserialize, Serialize)] +pub struct S3Config { + /// Name of the bucket to connect to. + pub bucket_name: String, + /// The region where the bucket is located at. + pub bucket_region: String, + /// A "subfolder" in the bucket, to use the same bucket separately by multiple remote storage users at once. + pub prefix_in_bucket: Option, + /// A base URL to send S3 requests to. + /// By default, the endpoint is derived from a region name, assuming it's + /// an AWS S3 region name, erroring on wrong region name. + /// Endpoint provides a way to support other S3 flavors and their regions. + /// + /// Example: `http://127.0.0.1:5000` + pub endpoint: Option, + /// AWS S3 has various limits on its API calls, we need not to exceed those. + /// See [`DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT`] for more details. 
+ #[serde(default = "default_remote_storage_s3_concurrency_limit")] + pub concurrency_limit: NonZeroUsize, + #[serde(default = "default_max_keys_per_list_response")] + pub max_keys_per_list_response: Option, + #[serde( + deserialize_with = "deserialize_storage_class", + serialize_with = "serialize_storage_class", + default + )] + pub upload_storage_class: Option, +} + +fn default_remote_storage_s3_concurrency_limit() -> NonZeroUsize { + DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT + .try_into() + .unwrap() +} + +fn default_max_keys_per_list_response() -> Option { + DEFAULT_MAX_KEYS_PER_LIST_RESPONSE +} + +impl Debug for S3Config { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("S3Config") + .field("bucket_name", &self.bucket_name) + .field("bucket_region", &self.bucket_region) + .field("prefix_in_bucket", &self.prefix_in_bucket) + .field("concurrency_limit", &self.concurrency_limit) + .field( + "max_keys_per_list_response", + &self.max_keys_per_list_response, + ) + .finish() + } +} + +/// Azure bucket coordinates and access credentials to manage the bucket contents (read and write). +#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct AzureConfig { + /// Name of the container to connect to. + pub container_name: String, + /// Name of the storage account the container is inside of + pub storage_account: Option, + /// The region where the bucket is located at. + pub container_region: String, + /// A "subfolder" in the container, to use the same container separately by multiple remote storage users at once. + pub prefix_in_container: Option, + /// Azure has various limits on its API calls, we need not to exceed those. + /// See [`DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT`] for more details. + #[serde(default = "default_remote_storage_azure_concurrency_limit")] + pub concurrency_limit: NonZeroUsize, + #[serde(default = "default_max_keys_per_list_response")] + pub max_keys_per_list_response: Option, +} + +fn default_remote_storage_azure_concurrency_limit() -> NonZeroUsize { + NonZeroUsize::new(DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT).unwrap() +} + +impl Debug for AzureConfig { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("AzureConfig") + .field("bucket_name", &self.container_name) + .field("storage_account", &self.storage_account) + .field("bucket_region", &self.container_region) + .field("prefix_in_container", &self.prefix_in_container) + .field("concurrency_limit", &self.concurrency_limit) + .field( + "max_keys_per_list_response", + &self.max_keys_per_list_response, + ) + .finish() + } +} + +fn deserialize_storage_class<'de, D: serde::Deserializer<'de>>( + deserializer: D, +) -> Result, D::Error> { + Option::::deserialize(deserializer).and_then(|s| { + if let Some(s) = s { + use serde::de::Error; + let storage_class = StorageClass::from_str(&s).expect("infallible"); + #[allow(deprecated)] + if matches!(storage_class, StorageClass::Unknown(_)) { + return Err(D::Error::custom(format!( + "Specified storage class unknown to SDK: '{s}'. 
Allowed values: {:?}", + StorageClass::values() + ))); + } + Ok(Some(storage_class)) + } else { + Ok(None) + } + }) +} + +fn serialize_storage_class( + val: &Option, + serializer: S, +) -> Result { + let val = val.as_ref().map(StorageClass::as_str); + Option::<&str>::serialize(&val, serializer) +} + +impl RemoteStorageConfig { + pub const DEFAULT_TIMEOUT: Duration = std::time::Duration::from_secs(120); + + pub fn from_toml(toml: &toml_edit::Item) -> anyhow::Result> { + let document: toml_edit::Document = match toml { + toml_edit::Item::Table(toml) => toml.clone().into(), + toml_edit::Item::Value(toml_edit::Value::InlineTable(toml)) => { + toml.clone().into_table().into() + } + _ => bail!("toml not a table or inline table"), + }; + + if document.is_empty() { + return Ok(None); + } + + Ok(Some(toml_edit::de::from_document(document)?)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn parse(input: &str) -> anyhow::Result> { + let toml = input.parse::().unwrap(); + RemoteStorageConfig::from_toml(toml.as_item()) + } + + #[test] + fn parse_localfs_config_with_timeout() { + let input = "local_path = '.' +timeout = '5s'"; + + let config = parse(input).unwrap().expect("it exists"); + + assert_eq!( + config, + RemoteStorageConfig { + storage: RemoteStorageKind::LocalFs { + local_path: Utf8PathBuf::from(".") + }, + timeout: Duration::from_secs(5) + } + ); + } + + #[test] + fn test_s3_parsing() { + let toml = "\ + bucket_name = 'foo-bar' + bucket_region = 'eu-central-1' + upload_storage_class = 'INTELLIGENT_TIERING' + timeout = '7s' + "; + + let config = parse(toml).unwrap().expect("it exists"); + + assert_eq!( + config, + RemoteStorageConfig { + storage: RemoteStorageKind::AwsS3(S3Config { + bucket_name: "foo-bar".into(), + bucket_region: "eu-central-1".into(), + prefix_in_bucket: None, + endpoint: None, + concurrency_limit: default_remote_storage_s3_concurrency_limit(), + max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE, + upload_storage_class: Some(StorageClass::IntelligentTiering), + }), + timeout: Duration::from_secs(7) + } + ); + } + + #[test] + fn test_azure_parsing() { + let toml = "\ + container_name = 'foo-bar' + container_region = 'westeurope' + upload_storage_class = 'INTELLIGENT_TIERING' + timeout = '7s' + "; + + let config = parse(toml).unwrap().expect("it exists"); + + assert_eq!( + config, + RemoteStorageConfig { + storage: RemoteStorageKind::AzureContainer(AzureConfig { + container_name: "foo-bar".into(), + storage_account: None, + container_region: "westeurope".into(), + prefix_in_container: None, + concurrency_limit: default_remote_storage_azure_concurrency_limit(), + max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE, + }), + timeout: Duration::from_secs(7) + } + ); + } +} diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs index e39ac581c758..d440c03a0e65 100644 --- a/libs/remote_storage/src/lib.rs +++ b/libs/remote_storage/src/lib.rs @@ -10,6 +10,7 @@ #![deny(clippy::undocumented_unsafe_blocks)] mod azure_blob; +mod config; mod error; mod local_fs; mod metrics; @@ -18,17 +19,10 @@ mod simulate_failures; mod support; use std::{ - collections::HashMap, - fmt::Debug, - num::{NonZeroU32, NonZeroUsize}, - pin::Pin, - str::FromStr, - sync::Arc, - time::{Duration, SystemTime}, + collections::HashMap, fmt::Debug, num::NonZeroU32, pin::Pin, sync::Arc, time::SystemTime, }; -use anyhow::{bail, Context}; -use aws_sdk_s3::types::StorageClass; +use anyhow::Context; use camino::{Utf8Path, Utf8PathBuf}; use bytes::Bytes; @@ -44,6 +38,8 
@@ pub use self::{ }; use s3_bucket::RequestKind; +pub use crate::config::{AzureConfig, RemoteStorageConfig, RemoteStorageKind, S3Config}; + /// Azure SDK's ETag type is a simple String wrapper: we use this internally instead of repeating it here. pub use azure_core::Etag; @@ -525,168 +521,6 @@ impl From<[(&str, &str); N]> for StorageMetadata { } } -/// External backup storage configuration, enough for creating a client for that storage. -#[derive(Debug, Clone, PartialEq, Eq, Deserialize)] -pub struct RemoteStorageConfig { - /// The storage connection configuration. - #[serde(flatten)] - pub storage: RemoteStorageKind, - /// A common timeout enforced for all requests after concurrency limiter permit has been - /// acquired. - #[serde(with = "humantime_serde", default = "default_timeout")] - pub timeout: Duration, -} - -fn default_timeout() -> Duration { - RemoteStorageConfig::DEFAULT_TIMEOUT -} - -/// A kind of a remote storage to connect to, with its connection configuration. -#[derive(Debug, Clone, PartialEq, Eq, Deserialize)] -#[serde(untagged)] -pub enum RemoteStorageKind { - /// Storage based on local file system. - /// Specify a root folder to place all stored files into. - LocalFs { local_path: Utf8PathBuf }, - /// AWS S3 based storage, storing all files in the S3 bucket - /// specified by the config - AwsS3(S3Config), - /// Azure Blob based storage, storing all files in the container - /// specified by the config - AzureContainer(AzureConfig), -} - -/// AWS S3 bucket coordinates and access credentials to manage the bucket contents (read and write). -#[derive(Clone, PartialEq, Eq, serde::Deserialize)] -pub struct S3Config { - /// Name of the bucket to connect to. - pub bucket_name: String, - /// The region where the bucket is located at. - pub bucket_region: String, - /// A "subfolder" in the bucket, to use the same bucket separately by multiple remote storage users at once. - pub prefix_in_bucket: Option, - /// A base URL to send S3 requests to. - /// By default, the endpoint is derived from a region name, assuming it's - /// an AWS S3 region name, erroring on wrong region name. - /// Endpoint provides a way to support other S3 flavors and their regions. - /// - /// Example: `http://127.0.0.1:5000` - pub endpoint: Option, - /// AWS S3 has various limits on its API calls, we need not to exceed those. - /// See [`DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT`] for more details. - #[serde(default = "default_remote_storage_s3_concurrency_limit")] - pub concurrency_limit: NonZeroUsize, - #[serde(default = "default_max_keys_per_list_response")] - pub max_keys_per_list_response: Option, - #[serde(deserialize_with = "deserialize_storage_class", default)] - pub upload_storage_class: Option, -} - -fn default_remote_storage_s3_concurrency_limit() -> NonZeroUsize { - DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT - .try_into() - .unwrap() -} - -fn default_max_keys_per_list_response() -> Option { - DEFAULT_MAX_KEYS_PER_LIST_RESPONSE -} - -impl Debug for S3Config { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("S3Config") - .field("bucket_name", &self.bucket_name) - .field("bucket_region", &self.bucket_region) - .field("prefix_in_bucket", &self.prefix_in_bucket) - .field("concurrency_limit", &self.concurrency_limit) - .field( - "max_keys_per_list_response", - &self.max_keys_per_list_response, - ) - .finish() - } -} - -/// Azure bucket coordinates and access credentials to manage the bucket contents (read and write). 
-#[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] -pub struct AzureConfig { - /// Name of the container to connect to. - pub container_name: String, - /// Name of the storage account the container is inside of - pub storage_account: Option, - /// The region where the bucket is located at. - pub container_region: String, - /// A "subfolder" in the container, to use the same container separately by multiple remote storage users at once. - pub prefix_in_container: Option, - /// Azure has various limits on its API calls, we need not to exceed those. - /// See [`DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT`] for more details. - #[serde(default = "default_remote_storage_azure_concurrency_limit")] - pub concurrency_limit: NonZeroUsize, - #[serde(default = "default_max_keys_per_list_response")] - pub max_keys_per_list_response: Option, -} - -fn default_remote_storage_azure_concurrency_limit() -> NonZeroUsize { - NonZeroUsize::new(DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT).unwrap() -} - -impl Debug for AzureConfig { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("AzureConfig") - .field("bucket_name", &self.container_name) - .field("storage_account", &self.storage_account) - .field("bucket_region", &self.container_region) - .field("prefix_in_container", &self.prefix_in_container) - .field("concurrency_limit", &self.concurrency_limit) - .field( - "max_keys_per_list_response", - &self.max_keys_per_list_response, - ) - .finish() - } -} - -fn deserialize_storage_class<'de, D: serde::Deserializer<'de>>( - deserializer: D, -) -> Result, D::Error> { - Option::::deserialize(deserializer).and_then(|s| { - if let Some(s) = s { - use serde::de::Error; - let storage_class = StorageClass::from_str(&s).expect("infallible"); - #[allow(deprecated)] - if matches!(storage_class, StorageClass::Unknown(_)) { - return Err(D::Error::custom(format!( - "Specified storage class unknown to SDK: '{s}'. Allowed values: {:?}", - StorageClass::values() - ))); - } - Ok(Some(storage_class)) - } else { - Ok(None) - } - }) -} - -impl RemoteStorageConfig { - pub const DEFAULT_TIMEOUT: Duration = std::time::Duration::from_secs(120); - - pub fn from_toml(toml: &toml_edit::Item) -> anyhow::Result> { - let document: toml_edit::Document = match toml { - toml_edit::Item::Table(toml) => toml.clone().into(), - toml_edit::Item::Value(toml_edit::Value::InlineTable(toml)) => { - toml.clone().into_table().into() - } - _ => bail!("toml not a table or inline table"), - }; - - if document.is_empty() { - return Ok(None); - } - - Ok(Some(toml_edit::de::from_document(document)?)) - } -} - struct ConcurrencyLimiter { // Every request to S3 can be throttled or cancelled, if a certain number of requests per second is exceeded. // Same goes to IAM, which is queried before every S3 request, if enabled. IAM has even lower RPS threshold. @@ -733,11 +567,6 @@ impl ConcurrencyLimiter { mod tests { use super::*; - fn parse(input: &str) -> anyhow::Result> { - let toml = input.parse::().unwrap(); - RemoteStorageConfig::from_toml(toml.as_item()) - } - #[test] fn test_object_name() { let k = RemotePath::new(Utf8Path::new("a/b/c")).unwrap(); @@ -759,77 +588,4 @@ mod tests { let err = RemotePath::new(Utf8Path::new("/")).expect_err("Should fail on absolute paths"); assert_eq!(err.to_string(), "Path \"/\" is not relative"); } - - #[test] - fn parse_localfs_config_with_timeout() { - let input = "local_path = '.' 
-timeout = '5s'"; - - let config = parse(input).unwrap().expect("it exists"); - - assert_eq!( - config, - RemoteStorageConfig { - storage: RemoteStorageKind::LocalFs { - local_path: Utf8PathBuf::from(".") - }, - timeout: Duration::from_secs(5) - } - ); - } - - #[test] - fn test_s3_parsing() { - let toml = "\ - bucket_name = 'foo-bar' - bucket_region = 'eu-central-1' - upload_storage_class = 'INTELLIGENT_TIERING' - timeout = '7s' - "; - - let config = parse(toml).unwrap().expect("it exists"); - - assert_eq!( - config, - RemoteStorageConfig { - storage: RemoteStorageKind::AwsS3(S3Config { - bucket_name: "foo-bar".into(), - bucket_region: "eu-central-1".into(), - prefix_in_bucket: None, - endpoint: None, - concurrency_limit: default_remote_storage_s3_concurrency_limit(), - max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE, - upload_storage_class: Some(StorageClass::IntelligentTiering), - }), - timeout: Duration::from_secs(7) - } - ); - } - - #[test] - fn test_azure_parsing() { - let toml = "\ - container_name = 'foo-bar' - container_region = 'westeurope' - upload_storage_class = 'INTELLIGENT_TIERING' - timeout = '7s' - "; - - let config = parse(toml).unwrap().expect("it exists"); - - assert_eq!( - config, - RemoteStorageConfig { - storage: RemoteStorageKind::AzureContainer(AzureConfig { - container_name: "foo-bar".into(), - storage_account: None, - container_region: "westeurope".into(), - prefix_in_container: None, - concurrency_limit: default_remote_storage_azure_concurrency_limit(), - max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE, - }), - timeout: Duration::from_secs(7) - } - ); - } } diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs index 76cf3eac80ef..ef1bd2c04730 100644 --- a/libs/remote_storage/src/s3_bucket.rs +++ b/libs/remote_storage/src/s3_bucket.rs @@ -46,12 +46,12 @@ use utils::backoff; use super::StorageMetadata; use crate::{ + config::S3Config, error::Cancelled, metrics::{start_counting_cancelled_wait, start_measuring_requests}, support::PermitCarrying, ConcurrencyLimiter, Download, DownloadError, Listing, ListingMode, RemotePath, RemoteStorage, - S3Config, TimeTravelError, TimeoutOrCancel, MAX_KEYS_PER_DELETE, - REMOTE_STORAGE_PREFIX_SEPARATOR, + TimeTravelError, TimeoutOrCancel, MAX_KEYS_PER_DELETE, REMOTE_STORAGE_PREFIX_SEPARATOR, }; use crate::metrics::AttemptOutcome;