Add rate limiter for partial uploads (#8203)
Too many concurrent partial uploads can hurt disk performance, so this
commit adds a limiter.

Context:
https://neondb.slack.com/archives/C04KGFVUWUQ/p1719489018814669?thread_ts=1719440183.134739&cid=C04KGFVUWUQ
petuhovskiy committed Jun 28, 2024
1 parent babbe12 commit e1a06b4
Showing 7 changed files with 89 additions and 14 deletions.
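
The limiter is a thin wrapper around a tokio::sync::Semaphore (see the new RateLimiter type in the wal_backup_partial.rs diff below): each partial upload acquires an owned permit before it starts, and dropping the permit at the end of the upload frees the slot. The following is a minimal, self-contained sketch of that pattern, not the commit's exact code; the toy upload loop, the sleep, and the tokio setup are illustrative only, and the real type additionally records permit-wait time in the MISC_OPERATION_SECONDS metric.

// Sketch only: a cloneable handle around an Arc'd tokio Semaphore;
// owned permits bound how many uploads run at once.
use std::sync::Arc;

#[derive(Clone)]
pub struct RateLimiter {
    semaphore: Arc<tokio::sync::Semaphore>,
}

impl RateLimiter {
    pub fn new(permits: usize) -> Self {
        Self {
            semaphore: Arc::new(tokio::sync::Semaphore::new(permits)),
        }
    }

    /// Waits until one of the `permits` slots is free. The returned permit
    /// releases its slot when dropped.
    pub async fn acquire_owned(&self) -> tokio::sync::OwnedSemaphorePermit {
        self.semaphore
            .clone()
            .acquire_owned()
            .await
            .expect("semaphore is closed")
    }
}

// Illustrative usage: with 5 permits, at most 5 of the 20 "uploads" below
// run concurrently.
#[tokio::main]
async fn main() {
    let limiter = RateLimiter::new(5);
    let mut tasks = Vec::new();
    for i in 0..20 {
        let limiter = limiter.clone();
        tasks.push(tokio::spawn(async move {
            let _permit = limiter.acquire_owned().await;
            // stand-in for uploading one partial segment
            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
            println!("upload {i} finished");
        }));
    }
    for t in tasks {
        t.await.unwrap();
    }
}
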
7 changes: 6 additions & 1 deletion safekeeper/src/bin/safekeeper.rs
@@ -29,7 +29,8 @@ use utils::pid_file;
use metrics::set_build_info_metric;
use safekeeper::defaults::{
DEFAULT_CONTROL_FILE_SAVE_INTERVAL, DEFAULT_HEARTBEAT_TIMEOUT, DEFAULT_HTTP_LISTEN_ADDR,
DEFAULT_MAX_OFFLOADER_LAG_BYTES, DEFAULT_PARTIAL_BACKUP_TIMEOUT, DEFAULT_PG_LISTEN_ADDR,
DEFAULT_MAX_OFFLOADER_LAG_BYTES, DEFAULT_PARTIAL_BACKUP_CONCURRENCY,
DEFAULT_PARTIAL_BACKUP_TIMEOUT, DEFAULT_PG_LISTEN_ADDR,
};
use safekeeper::http;
use safekeeper::wal_service;
@@ -191,6 +192,9 @@ struct Args {
/// Pending updates to control file will be automatically saved after this interval.
#[arg(long, value_parser = humantime::parse_duration, default_value = DEFAULT_CONTROL_FILE_SAVE_INTERVAL)]
control_file_save_interval: Duration,
/// Number of allowed concurrent uploads of partial segments to remote storage.
#[arg(long, default_value = DEFAULT_PARTIAL_BACKUP_CONCURRENCY)]
partial_backup_concurrency: usize,
}

// Like PathBufValueParser, but allows empty string.
@@ -344,6 +348,7 @@ async fn main() -> anyhow::Result<()> {
enable_offload: args.enable_offload,
delete_offloaded_wal: args.delete_offloaded_wal,
control_file_save_interval: args.control_file_save_interval,
partial_backup_concurrency: args.partial_backup_concurrency,
};

// initialize sentry if SENTRY_DSN is provided
3 changes: 3 additions & 0 deletions safekeeper/src/lib.rs
@@ -52,6 +52,7 @@ pub mod defaults {
pub const DEFAULT_MAX_OFFLOADER_LAG_BYTES: u64 = 128 * (1 << 20);
pub const DEFAULT_PARTIAL_BACKUP_TIMEOUT: &str = "15m";
pub const DEFAULT_CONTROL_FILE_SAVE_INTERVAL: &str = "300s";
pub const DEFAULT_PARTIAL_BACKUP_CONCURRENCY: &str = "5";
}

#[derive(Debug, Clone)]
@@ -91,6 +92,7 @@ pub struct SafeKeeperConf {
pub enable_offload: bool,
pub delete_offloaded_wal: bool,
pub control_file_save_interval: Duration,
pub partial_backup_concurrency: usize,
}

impl SafeKeeperConf {
@@ -133,6 +135,7 @@ impl SafeKeeperConf {
enable_offload: false,
delete_offloaded_wal: false,
control_file_save_interval: Duration::from_secs(1),
partial_backup_concurrency: 1,
}
}
}
7 changes: 5 additions & 2 deletions safekeeper/src/timeline.rs
@@ -36,7 +36,7 @@ use crate::timeline_guard::ResidenceGuard;
use crate::timeline_manager::{AtomicStatus, ManagerCtl};
use crate::timelines_set::TimelinesSet;
use crate::wal_backup::{self};
use crate::wal_backup_partial::PartialRemoteSegment;
use crate::wal_backup_partial::{PartialRemoteSegment, RateLimiter};
use crate::{control_file, safekeeper::UNKNOWN_SERVER_VERSION};

use crate::metrics::{FullTimelineInfo, WalStorageMetrics, MISC_OPERATION_SECONDS};
@@ -587,6 +587,7 @@ impl Timeline {
shared_state: &mut WriteGuardSharedState<'_>,
conf: &SafeKeeperConf,
broker_active_set: Arc<TimelinesSet>,
partial_backup_rate_limiter: RateLimiter,
) -> Result<()> {
match fs::metadata(&self.timeline_dir).await {
Ok(_) => {
@@ -617,7 +618,7 @@

return Err(e);
}
self.bootstrap(conf, broker_active_set);
self.bootstrap(conf, broker_active_set, partial_backup_rate_limiter);
Ok(())
}

@@ -626,6 +627,7 @@
self: &Arc<Timeline>,
conf: &SafeKeeperConf,
broker_active_set: Arc<TimelinesSet>,
partial_backup_rate_limiter: RateLimiter,
) {
let (tx, rx) = self.manager_ctl.bootstrap_manager();

@@ -637,6 +639,7 @@
broker_active_set,
tx,
rx,
partial_backup_rate_limiter,
));
}

16 changes: 14 additions & 2 deletions safekeeper/src/timeline_manager.rs
@@ -32,7 +32,7 @@ use crate::{
timeline_guard::{AccessService, GuardId, ResidenceGuard},
timelines_set::{TimelineSetGuard, TimelinesSet},
wal_backup::{self, WalBackupTaskHandle},
wal_backup_partial::{self, PartialRemoteSegment},
wal_backup_partial::{self, PartialRemoteSegment, RateLimiter},
SafeKeeperConf,
};

@@ -185,6 +185,7 @@ pub(crate) struct Manager {

// misc
pub(crate) access_service: AccessService,
pub(crate) partial_backup_rate_limiter: RateLimiter,
}

/// This task gets spawned alongside each timeline and is responsible for managing the timeline's
@@ -197,6 +198,7 @@ pub async fn main_task(
broker_active_set: Arc<TimelinesSet>,
manager_tx: tokio::sync::mpsc::UnboundedSender<ManagerCtlMessage>,
mut manager_rx: tokio::sync::mpsc::UnboundedReceiver<ManagerCtlMessage>,
partial_backup_rate_limiter: RateLimiter,
) {
tli.set_status(Status::Started);

@@ -209,7 +211,14 @@
}
};

let mut mgr = Manager::new(tli, conf, broker_active_set, manager_tx).await;
let mut mgr = Manager::new(
tli,
conf,
broker_active_set,
manager_tx,
partial_backup_rate_limiter,
)
.await;

// Start recovery task which always runs on the timeline.
if !mgr.is_offloaded && mgr.conf.peer_recovery_enabled {
@@ -321,6 +330,7 @@ impl Manager {
conf: SafeKeeperConf,
broker_active_set: Arc<TimelinesSet>,
manager_tx: tokio::sync::mpsc::UnboundedSender<ManagerCtlMessage>,
partial_backup_rate_limiter: RateLimiter,
) -> Manager {
let (is_offloaded, partial_backup_uploaded) = tli.bootstrap_mgr().await;
Manager {
@@ -339,6 +349,7 @@
partial_backup_uploaded,
access_service: AccessService::new(manager_tx),
tli,
partial_backup_rate_limiter,
}
}

@@ -525,6 +536,7 @@ impl Manager {
self.partial_backup_task = Some(tokio::spawn(wal_backup_partial::main_task(
self.wal_resident_timeline(),
self.conf.clone(),
self.partial_backup_rate_limiter.clone(),
)));
}

34 changes: 26 additions & 8 deletions safekeeper/src/timelines_global_map.rs
@@ -5,6 +5,7 @@
use crate::safekeeper::ServerInfo;
use crate::timeline::{get_tenant_dir, get_timeline_dir, Timeline, TimelineError};
use crate::timelines_set::TimelinesSet;
use crate::wal_backup_partial::RateLimiter;
use crate::SafeKeeperConf;
use anyhow::{bail, Context, Result};
use camino::Utf8PathBuf;
@@ -23,6 +24,7 @@ struct GlobalTimelinesState {
conf: Option<SafeKeeperConf>,
broker_active_set: Arc<TimelinesSet>,
load_lock: Arc<tokio::sync::Mutex<TimelineLoadLock>>,
partial_backup_rate_limiter: RateLimiter,
}

// Used to prevent concurrent timeline loading.
@@ -37,8 +39,12 @@ impl GlobalTimelinesState {
}

/// Get dependencies for a timeline constructor.
fn get_dependencies(&self) -> (SafeKeeperConf, Arc<TimelinesSet>) {
(self.get_conf().clone(), self.broker_active_set.clone())
fn get_dependencies(&self) -> (SafeKeeperConf, Arc<TimelinesSet>, RateLimiter) {
(
self.get_conf().clone(),
self.broker_active_set.clone(),
self.partial_backup_rate_limiter.clone(),
)
}

/// Insert timeline into the map. Returns error if timeline with the same id already exists.
@@ -66,6 +72,7 @@ static TIMELINES_STATE: Lazy<Mutex<GlobalTimelinesState>> = Lazy::new(|| {
conf: None,
broker_active_set: Arc::new(TimelinesSet::default()),
load_lock: Arc::new(tokio::sync::Mutex::new(TimelineLoadLock)),
partial_backup_rate_limiter: RateLimiter::new(1),
})
});

@@ -79,6 +86,7 @@ impl GlobalTimelines {
// lock, so use explicit block
let tenants_dir = {
let mut state = TIMELINES_STATE.lock().unwrap();
state.partial_backup_rate_limiter = RateLimiter::new(conf.partial_backup_concurrency);
state.conf = Some(conf);

// Iterate through all directories and load tenants for all directories
@@ -122,7 +130,7 @@
/// this function is called during init when nothing else is running, so
/// this is fine.
async fn load_tenant_timelines(tenant_id: TenantId) -> Result<()> {
let (conf, broker_active_set) = {
let (conf, broker_active_set, partial_backup_rate_limiter) = {
let state = TIMELINES_STATE.lock().unwrap();
state.get_dependencies()
};
@@ -145,7 +153,11 @@
.unwrap()
.timelines
.insert(ttid, tli.clone());
tli.bootstrap(&conf, broker_active_set.clone());
tli.bootstrap(
&conf,
broker_active_set.clone(),
partial_backup_rate_limiter.clone(),
);
}
// If we can't load a timeline, it's most likely because of a corrupted
// directory. We will log an error and won't allow to delete/recreate
@@ -178,7 +190,8 @@
_guard: &tokio::sync::MutexGuard<'a, TimelineLoadLock>,
ttid: TenantTimelineId,
) -> Result<Arc<Timeline>> {
let (conf, broker_active_set) = TIMELINES_STATE.lock().unwrap().get_dependencies();
let (conf, broker_active_set, partial_backup_rate_limiter) =
TIMELINES_STATE.lock().unwrap().get_dependencies();

match Timeline::load_timeline(&conf, ttid) {
Ok(timeline) => {
Expand All @@ -191,7 +204,7 @@ impl GlobalTimelines {
.timelines
.insert(ttid, tli.clone());

tli.bootstrap(&conf, broker_active_set);
tli.bootstrap(&conf, broker_active_set, partial_backup_rate_limiter);

Ok(tli)
}
@@ -222,7 +235,7 @@
commit_lsn: Lsn,
local_start_lsn: Lsn,
) -> Result<Arc<Timeline>> {
let (conf, broker_active_set) = {
let (conf, broker_active_set, partial_backup_rate_limiter) = {
let state = TIMELINES_STATE.lock().unwrap();
if let Ok(timeline) = state.get(&ttid) {
// Timeline already exists, return it.
@@ -257,7 +270,12 @@
// Bootstrap is transactional, so if it fails, the timeline will be deleted,
// and the state on disk should remain unchanged.
if let Err(e) = timeline
.init_new(&mut shared_state, &conf, broker_active_set)
.init_new(
&mut shared_state,
&conf,
broker_active_set,
partial_backup_rate_limiter,
)
.await
{
// Note: the most likely reason for init failure is that the timeline
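
Worth noting from this file: the limiter is constructed once, when the global timelines state is initialized from conf.partial_backup_concurrency, and get_dependencies hands a clone of it to every timeline's bootstrap. Since cloning the limiter only clones the inner Arc, all timelines draw from one shared permit pool, so the cap applies per safekeeper process rather than per timeline. A small standalone sketch of that sharing semantics follows (illustrative, not the commit's code):

use std::sync::Arc;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    // One pool of 2 permits, analogous to RateLimiter::new(partial_backup_concurrency).
    let global = Arc::new(Semaphore::new(2));

    // Cloning the Arc is what RateLimiter::clone amounts to: each timeline
    // gets a handle to the same underlying semaphore.
    let timeline_a = global.clone();
    let timeline_b = global.clone();

    let _permit_a = timeline_a.acquire_owned().await.unwrap();
    let _permit_b = timeline_b.acquire_owned().await.unwrap();

    // Both permits came out of the shared pool, so none remain.
    assert_eq!(global.available_permits(), 0);
}
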
35 changes: 34 additions & 1 deletion safekeeper/src/wal_backup_partial.rs
@@ -18,6 +18,8 @@
//! This way control file stores information about all potentially existing
//! remote partial segments and can clean them up after uploading a newer version.

use std::sync::Arc;

use camino::Utf8PathBuf;
use postgres_ffi::{XLogFileName, XLogSegNo, PG_TLI};
use remote_storage::RemotePath;
@@ -27,14 +29,38 @@ use tracing::{debug, error, info, instrument, warn};
use utils::lsn::Lsn;

use crate::{
metrics::{PARTIAL_BACKUP_UPLOADED_BYTES, PARTIAL_BACKUP_UPLOADS},
metrics::{MISC_OPERATION_SECONDS, PARTIAL_BACKUP_UPLOADED_BYTES, PARTIAL_BACKUP_UPLOADS},
safekeeper::Term,
timeline::WalResidentTimeline,
timeline_manager::StateSnapshot,
wal_backup::{self, remote_timeline_path},
SafeKeeperConf,
};

#[derive(Clone)]
pub struct RateLimiter {
semaphore: Arc<tokio::sync::Semaphore>,
}

impl RateLimiter {
pub fn new(permits: usize) -> Self {
Self {
semaphore: Arc::new(tokio::sync::Semaphore::new(permits)),
}
}

async fn acquire_owned(&self) -> tokio::sync::OwnedSemaphorePermit {
let _timer = MISC_OPERATION_SECONDS
.with_label_values(&["partial_permit_acquire"])
.start_timer();
self.semaphore
.clone()
.acquire_owned()
.await
.expect("semaphore is closed")
}
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum UploadStatus {
/// Upload is in progress. This status should be used only for garbage collection,
@@ -208,6 +234,9 @@ impl PartialBackup {
/// Upload the latest version of the partial segment and garbage collect older versions.
#[instrument(name = "upload", skip_all, fields(name = %prepared.name))]
async fn do_upload(&mut self, prepared: &PartialRemoteSegment) -> anyhow::Result<()> {
let _timer = MISC_OPERATION_SECONDS
.with_label_values(&["partial_do_upload"])
.start_timer();
info!("starting upload {:?}", prepared);

let state_0 = self.state.clone();
@@ -307,6 +336,7 @@ pub(crate) fn needs_uploading(
pub async fn main_task(
tli: WalResidentTimeline,
conf: SafeKeeperConf,
limiter: RateLimiter,
) -> Option<PartialRemoteSegment> {
debug!("started");
let await_duration = conf.partial_backup_timeout;
@@ -411,6 +441,9 @@ pub async fn main_task(
continue 'outer;
}

// limit concurrent uploads
let _upload_permit = limiter.acquire_owned().await;

let prepared = backup.prepare_upload().await;
if let Some(seg) = &uploaded_segment {
if seg.eq_without_status(&prepared) {
1 change: 1 addition & 0 deletions safekeeper/tests/walproposer_sim/safekeeper.rs
@@ -187,6 +187,7 @@ pub fn run_server(os: NodeOs, disk: Arc<SafekeeperDisk>) -> Result<()> {
enable_offload: false,
delete_offloaded_wal: false,
control_file_save_interval: Duration::from_secs(1),
partial_backup_concurrency: 1,
};

let mut global = GlobalMap::new(disk, conf.clone())?;

1 comment on commit e1a06b4

@github-actions

3028 tests run: 2900 passed, 2 failed, 126 skipped (full report)


Failures on Postgres 14

  • test_pg_regress[None]: debug
  • test_basebackup_with_high_slru_count[github-actions-selfhosted-sequential-10-13-30]: release
# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_pg_regress[debug-pg14-None] or test_basebackup_with_high_slru_count[release-pg14-github-actions-selfhosted-sequential-10-13-30]"
Flaky tests (2)

Postgres 14

  • test_timeline_size_quota_on_startup: release
  • test_peer_recovery: debug

Test coverage report is not available

The comment gets automatically updated with the latest test results
e1a06b4 at 2024-06-28T18:47:08.015Z ♻️
