diff --git a/safekeeper/src/bin/safekeeper.rs b/safekeeper/src/bin/safekeeper.rs
index 20650490b1ae..c81373c77c7d 100644
--- a/safekeeper/src/bin/safekeeper.rs
+++ b/safekeeper/src/bin/safekeeper.rs
@@ -29,7 +29,8 @@ use utils::pid_file;
 use metrics::set_build_info_metric;
 use safekeeper::defaults::{
     DEFAULT_CONTROL_FILE_SAVE_INTERVAL, DEFAULT_HEARTBEAT_TIMEOUT, DEFAULT_HTTP_LISTEN_ADDR,
-    DEFAULT_MAX_OFFLOADER_LAG_BYTES, DEFAULT_PARTIAL_BACKUP_TIMEOUT, DEFAULT_PG_LISTEN_ADDR,
+    DEFAULT_MAX_OFFLOADER_LAG_BYTES, DEFAULT_PARTIAL_BACKUP_CONCURRENCY,
+    DEFAULT_PARTIAL_BACKUP_TIMEOUT, DEFAULT_PG_LISTEN_ADDR,
 };
 use safekeeper::http;
 use safekeeper::wal_service;
@@ -191,6 +192,9 @@ struct Args {
     /// Pending updates to control file will be automatically saved after this interval.
     #[arg(long, value_parser = humantime::parse_duration, default_value = DEFAULT_CONTROL_FILE_SAVE_INTERVAL)]
     control_file_save_interval: Duration,
+    /// Number of allowed concurrent uploads of partial segments to remote storage.
+    #[arg(long, default_value = DEFAULT_PARTIAL_BACKUP_CONCURRENCY)]
+    partial_backup_concurrency: usize,
 }
 
 // Like PathBufValueParser, but allows empty string.
@@ -344,6 +348,7 @@ async fn main() -> anyhow::Result<()> {
         enable_offload: args.enable_offload,
         delete_offloaded_wal: args.delete_offloaded_wal,
         control_file_save_interval: args.control_file_save_interval,
+        partial_backup_concurrency: args.partial_backup_concurrency,
     };
 
     // initialize sentry if SENTRY_DSN is provided
diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs
index 067e425570e7..5cd676d8570c 100644
--- a/safekeeper/src/lib.rs
+++ b/safekeeper/src/lib.rs
@@ -52,6 +52,7 @@ pub mod defaults {
     pub const DEFAULT_MAX_OFFLOADER_LAG_BYTES: u64 = 128 * (1 << 20);
     pub const DEFAULT_PARTIAL_BACKUP_TIMEOUT: &str = "15m";
     pub const DEFAULT_CONTROL_FILE_SAVE_INTERVAL: &str = "300s";
+    pub const DEFAULT_PARTIAL_BACKUP_CONCURRENCY: &str = "5";
 }
 
 #[derive(Debug, Clone)]
@@ -91,6 +92,7 @@ pub struct SafeKeeperConf {
     pub enable_offload: bool,
     pub delete_offloaded_wal: bool,
     pub control_file_save_interval: Duration,
+    pub partial_backup_concurrency: usize,
 }
 
 impl SafeKeeperConf {
@@ -133,6 +135,7 @@ impl SafeKeeperConf {
             enable_offload: false,
             delete_offloaded_wal: false,
             control_file_save_interval: Duration::from_secs(1),
+            partial_backup_concurrency: 1,
         }
     }
 }
diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs
index 6b83270c181b..132e5ec32f4f 100644
--- a/safekeeper/src/timeline.rs
+++ b/safekeeper/src/timeline.rs
@@ -36,7 +36,7 @@ use crate::timeline_guard::ResidenceGuard;
 use crate::timeline_manager::{AtomicStatus, ManagerCtl};
 use crate::timelines_set::TimelinesSet;
 use crate::wal_backup::{self};
-use crate::wal_backup_partial::PartialRemoteSegment;
+use crate::wal_backup_partial::{PartialRemoteSegment, RateLimiter};
 use crate::{control_file, safekeeper::UNKNOWN_SERVER_VERSION};
 
 use crate::metrics::{FullTimelineInfo, WalStorageMetrics, MISC_OPERATION_SECONDS};
@@ -587,6 +587,7 @@ impl Timeline {
         shared_state: &mut WriteGuardSharedState<'_>,
         conf: &SafeKeeperConf,
         broker_active_set: Arc<TimelinesSet>,
+        partial_backup_rate_limiter: RateLimiter,
     ) -> Result<()> {
         match fs::metadata(&self.timeline_dir).await {
             Ok(_) => {
@@ -617,7 +618,7 @@ impl Timeline {
             return Err(e);
         }
 
-        self.bootstrap(conf, broker_active_set);
+        self.bootstrap(conf, broker_active_set, partial_backup_rate_limiter);
 
         Ok(())
     }
@@ -626,6 +627,7 @@ impl Timeline {
         self: &Arc<Self>,
         conf: &SafeKeeperConf,
         broker_active_set: Arc<TimelinesSet>,
+        partial_backup_rate_limiter: RateLimiter,
     ) {
         let (tx, rx) = self.manager_ctl.bootstrap_manager();
 
@@ -637,6 +639,7 @@ impl Timeline {
             broker_active_set,
             tx,
             rx,
+            partial_backup_rate_limiter,
         ));
     }
 
diff --git a/safekeeper/src/timeline_manager.rs b/safekeeper/src/timeline_manager.rs
index 66c62ce19785..62142162de8c 100644
--- a/safekeeper/src/timeline_manager.rs
+++ b/safekeeper/src/timeline_manager.rs
@@ -32,7 +32,7 @@ use crate::{
     timeline_guard::{AccessService, GuardId, ResidenceGuard},
     timelines_set::{TimelineSetGuard, TimelinesSet},
     wal_backup::{self, WalBackupTaskHandle},
-    wal_backup_partial::{self, PartialRemoteSegment},
+    wal_backup_partial::{self, PartialRemoteSegment, RateLimiter},
     SafeKeeperConf,
 };
 
@@ -185,6 +185,7 @@ pub(crate) struct Manager {
 
     // misc
     pub(crate) access_service: AccessService,
+    pub(crate) partial_backup_rate_limiter: RateLimiter,
 }
 
 /// This task gets spawned alongside each timeline and is responsible for managing the timeline's
@@ -197,6 +198,7 @@ pub async fn main_task(
     broker_active_set: Arc<TimelinesSet>,
     manager_tx: tokio::sync::mpsc::UnboundedSender<ManagerCtlMessage>,
     mut manager_rx: tokio::sync::mpsc::UnboundedReceiver<ManagerCtlMessage>,
+    partial_backup_rate_limiter: RateLimiter,
 ) {
     tli.set_status(Status::Started);
 
@@ -209,7 +211,14 @@ pub async fn main_task(
         }
     };
 
-    let mut mgr = Manager::new(tli, conf, broker_active_set, manager_tx).await;
+    let mut mgr = Manager::new(
+        tli,
+        conf,
+        broker_active_set,
+        manager_tx,
+        partial_backup_rate_limiter,
+    )
+    .await;
 
     // Start recovery task which always runs on the timeline.
     if !mgr.is_offloaded && mgr.conf.peer_recovery_enabled {
@@ -321,6 +330,7 @@ impl Manager {
         conf: SafeKeeperConf,
         broker_active_set: Arc<TimelinesSet>,
         manager_tx: tokio::sync::mpsc::UnboundedSender<ManagerCtlMessage>,
+        partial_backup_rate_limiter: RateLimiter,
     ) -> Manager {
         let (is_offloaded, partial_backup_uploaded) = tli.bootstrap_mgr().await;
         Manager {
@@ -339,6 +349,7 @@ impl Manager {
             partial_backup_uploaded,
             access_service: AccessService::new(manager_tx),
             tli,
+            partial_backup_rate_limiter,
         }
     }
 
@@ -525,6 +536,7 @@ impl Manager {
         self.partial_backup_task = Some(tokio::spawn(wal_backup_partial::main_task(
             self.wal_resident_timeline(),
             self.conf.clone(),
+            self.partial_backup_rate_limiter.clone(),
         )));
     }
 
diff --git a/safekeeper/src/timelines_global_map.rs b/safekeeper/src/timelines_global_map.rs
index 45e08ede3c0a..9ce1112cec43 100644
--- a/safekeeper/src/timelines_global_map.rs
+++ b/safekeeper/src/timelines_global_map.rs
@@ -5,6 +5,7 @@
 use crate::safekeeper::ServerInfo;
 use crate::timeline::{get_tenant_dir, get_timeline_dir, Timeline, TimelineError};
 use crate::timelines_set::TimelinesSet;
+use crate::wal_backup_partial::RateLimiter;
 use crate::SafeKeeperConf;
 use anyhow::{bail, Context, Result};
 use camino::Utf8PathBuf;
@@ -23,6 +24,7 @@ struct GlobalTimelinesState {
     conf: Option<SafeKeeperConf>,
     broker_active_set: Arc<TimelinesSet>,
     load_lock: Arc<tokio::sync::Mutex<TimelineLoadLock>>,
+    partial_backup_rate_limiter: RateLimiter,
 }
 
 // Used to prevent concurrent timeline loading.
@@ -37,8 +39,12 @@ impl GlobalTimelinesState {
     }
 
     /// Get dependencies for a timeline constructor.
-    fn get_dependencies(&self) -> (SafeKeeperConf, Arc<TimelinesSet>) {
-        (self.get_conf().clone(), self.broker_active_set.clone())
+    fn get_dependencies(&self) -> (SafeKeeperConf, Arc<TimelinesSet>, RateLimiter) {
+        (
+            self.get_conf().clone(),
+            self.broker_active_set.clone(),
+            self.partial_backup_rate_limiter.clone(),
+        )
     }
 
     /// Insert timeline into the map. Returns error if timeline with the same id already exists.
@@ -66,6 +72,7 @@ static TIMELINES_STATE: Lazy<Mutex<GlobalTimelinesState>> = Lazy::new(|| {
         conf: None,
         broker_active_set: Arc::new(TimelinesSet::default()),
         load_lock: Arc::new(tokio::sync::Mutex::new(TimelineLoadLock)),
+        partial_backup_rate_limiter: RateLimiter::new(1),
     })
 });
 
@@ -79,6 +86,7 @@ impl GlobalTimelines {
         // lock, so use explicit block
         let tenants_dir = {
             let mut state = TIMELINES_STATE.lock().unwrap();
+            state.partial_backup_rate_limiter = RateLimiter::new(conf.partial_backup_concurrency);
             state.conf = Some(conf);
 
             // Iterate through all directories and load tenants for all directories
@@ -122,7 +130,7 @@ impl GlobalTimelines {
     /// this function is called during init when nothing else is running, so
     /// this is fine.
    async fn load_tenant_timelines(tenant_id: TenantId) -> Result<()> {
-        let (conf, broker_active_set) = {
+        let (conf, broker_active_set, partial_backup_rate_limiter) = {
            let state = TIMELINES_STATE.lock().unwrap();
            state.get_dependencies()
        };
@@ -145,7 +153,11 @@ impl GlobalTimelines {
                    .unwrap()
                    .timelines
                    .insert(ttid, tli.clone());
-                tli.bootstrap(&conf, broker_active_set.clone());
+                tli.bootstrap(
+                    &conf,
+                    broker_active_set.clone(),
+                    partial_backup_rate_limiter.clone(),
+                );
            }
            // If we can't load a timeline, it's most likely because of a corrupted
            // directory. We will log an error and won't allow to delete/recreate
@@ -178,7 +190,8 @@ impl GlobalTimelines {
        _guard: &tokio::sync::MutexGuard<'a, TimelineLoadLock>,
        ttid: TenantTimelineId,
    ) -> Result<Arc<Timeline>> {
-        let (conf, broker_active_set) = TIMELINES_STATE.lock().unwrap().get_dependencies();
+        let (conf, broker_active_set, partial_backup_rate_limiter) =
+            TIMELINES_STATE.lock().unwrap().get_dependencies();
 
        match Timeline::load_timeline(&conf, ttid) {
            Ok(timeline) => {
@@ -191,7 +204,7 @@ impl GlobalTimelines {
                .timelines
                .insert(ttid, tli.clone());
 
-            tli.bootstrap(&conf, broker_active_set);
+            tli.bootstrap(&conf, broker_active_set, partial_backup_rate_limiter);
 
            Ok(tli)
        }
@@ -222,7 +235,7 @@ impl GlobalTimelines {
        commit_lsn: Lsn,
        local_start_lsn: Lsn,
    ) -> Result<Arc<Timeline>> {
-        let (conf, broker_active_set) = {
+        let (conf, broker_active_set, partial_backup_rate_limiter) = {
            let state = TIMELINES_STATE.lock().unwrap();
            if let Ok(timeline) = state.get(&ttid) {
                // Timeline already exists, return it.
@@ -257,7 +270,12 @@ impl GlobalTimelines {
        // Bootstrap is transactional, so if it fails, the timeline will be deleted,
        // and the state on disk should remain unchanged.
        if let Err(e) = timeline
-            .init_new(&mut shared_state, &conf, broker_active_set)
+            .init_new(
+                &mut shared_state,
+                &conf,
+                broker_active_set,
+                partial_backup_rate_limiter,
+            )
            .await
        {
            // Note: the most likely reason for init failure is that the timeline
diff --git a/safekeeper/src/wal_backup_partial.rs b/safekeeper/src/wal_backup_partial.rs
index 9c7cd0888d83..825851c97c9a 100644
--- a/safekeeper/src/wal_backup_partial.rs
+++ b/safekeeper/src/wal_backup_partial.rs
@@ -18,6 +18,8 @@
 //! This way control file stores information about all potentially existing
 //! remote partial segments and can clean them up after uploading a newer version.
 
+use std::sync::Arc;
+
 use camino::Utf8PathBuf;
 use postgres_ffi::{XLogFileName, XLogSegNo, PG_TLI};
 use remote_storage::RemotePath;
@@ -27,7 +29,7 @@ use tracing::{debug, error, info, instrument, warn};
 use utils::lsn::Lsn;
 
 use crate::{
-    metrics::{PARTIAL_BACKUP_UPLOADED_BYTES, PARTIAL_BACKUP_UPLOADS},
+    metrics::{MISC_OPERATION_SECONDS, PARTIAL_BACKUP_UPLOADED_BYTES, PARTIAL_BACKUP_UPLOADS},
     safekeeper::Term,
     timeline::WalResidentTimeline,
     timeline_manager::StateSnapshot,
@@ -35,6 +37,30 @@ use crate::{
     SafeKeeperConf,
 };
 
+#[derive(Clone)]
+pub struct RateLimiter {
+    semaphore: Arc<tokio::sync::Semaphore>,
+}
+
+impl RateLimiter {
+    pub fn new(permits: usize) -> Self {
+        Self {
+            semaphore: Arc::new(tokio::sync::Semaphore::new(permits)),
+        }
+    }
+
+    async fn acquire_owned(&self) -> tokio::sync::OwnedSemaphorePermit {
+        let _timer = MISC_OPERATION_SECONDS
+            .with_label_values(&["partial_permit_acquire"])
+            .start_timer();
+        self.semaphore
+            .clone()
+            .acquire_owned()
+            .await
+            .expect("semaphore is closed")
+    }
+}
+
 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
 pub enum UploadStatus {
     /// Upload is in progress. This status should be used only for garbage collection,
@@ -208,6 +234,9 @@ impl PartialBackup {
     /// Upload the latest version of the partial segment and garbage collect older versions.
     #[instrument(name = "upload", skip_all, fields(name = %prepared.name))]
     async fn do_upload(&mut self, prepared: &PartialRemoteSegment) -> anyhow::Result<()> {
+        let _timer = MISC_OPERATION_SECONDS
+            .with_label_values(&["partial_do_upload"])
+            .start_timer();
         info!("starting upload {:?}", prepared);
 
         let state_0 = self.state.clone();
@@ -307,6 +336,7 @@ pub(crate) fn needs_uploading(
 pub async fn main_task(
     tli: WalResidentTimeline,
     conf: SafeKeeperConf,
+    limiter: RateLimiter,
 ) -> Option<PartialRemoteSegment> {
     debug!("started");
     let await_duration = conf.partial_backup_timeout;
@@ -411,6 +441,9 @@ pub async fn main_task(
             continue 'outer;
         }
 
+        // limit concurrent uploads
+        let _upload_permit = limiter.acquire_owned().await;
+
         let prepared = backup.prepare_upload().await;
         if let Some(seg) = &uploaded_segment {
             if seg.eq_without_status(&prepared) {
diff --git a/safekeeper/tests/walproposer_sim/safekeeper.rs b/safekeeper/tests/walproposer_sim/safekeeper.rs
index 43835c7f4411..6bbf96d71df4 100644
--- a/safekeeper/tests/walproposer_sim/safekeeper.rs
+++ b/safekeeper/tests/walproposer_sim/safekeeper.rs
@@ -187,6 +187,7 @@ pub fn run_server(os: NodeOs, disk: Arc<SafekeeperDisk>) -> Result<()> {
         enable_offload: false,
         delete_offloaded_wal: false,
         control_file_save_interval: Duration::from_secs(1),
+        partial_backup_concurrency: 1,
     };
 
     let mut global = GlobalMap::new(disk, conf.clone())?;
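
Usage sketch: the standalone program below mirrors the semaphore-based RateLimiter that this patch adds to wal_backup_partial.rs, to illustrate how one limiter cloned into every partial-backup task caps concurrent uploads at the configured partial_backup_concurrency. It is a sketch only: fake_upload, the sleep, and the task/permit counts are hypothetical stand-ins for the real upload path, and the metrics timer is omitted.

// Assumes a tokio dependency with the "full" feature set.
use std::sync::Arc;
use std::time::Duration;

#[derive(Clone)]
struct RateLimiter {
    semaphore: Arc<tokio::sync::Semaphore>,
}

impl RateLimiter {
    fn new(permits: usize) -> Self {
        Self {
            semaphore: Arc::new(tokio::sync::Semaphore::new(permits)),
        }
    }

    // Waits until one of the `permits` slots is free; the slot is released
    // when the returned guard is dropped.
    async fn acquire_owned(&self) -> tokio::sync::OwnedSemaphorePermit {
        self.semaphore
            .clone()
            .acquire_owned()
            .await
            .expect("semaphore is closed")
    }
}

// Hypothetical stand-in for one timeline's partial-segment upload.
async fn fake_upload(limiter: RateLimiter, timeline: usize) {
    let _permit = limiter.acquire_owned().await;
    println!("timeline {timeline}: uploading partial segment");
    tokio::time::sleep(Duration::from_millis(100)).await;
    // `_permit` is dropped here, letting the next queued upload start.
}

#[tokio::main]
async fn main() {
    // Plays the role of --partial-backup-concurrency (default "5" in the patch):
    // at most 5 of the 20 spawned uploads run at any moment.
    let limiter = RateLimiter::new(5);
    let tasks: Vec<_> = (0..20)
        .map(|i| tokio::spawn(fake_upload(limiter.clone(), i)))
        .collect();
    for task in tasks {
        task.await.expect("upload task panicked");
    }
}

In the patch itself the wait for a permit is additionally timed under the partial_permit_acquire label of MISC_OPERATION_SECONDS, which makes it visible when the concurrency cap, rather than remote storage, becomes the bottleneck.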