
Commit

pageserver: revert concurrent secondary downloads, make DownloadStream always yield Err after cancel (#7866)

## Problem

Ongoing hunt for secondary location shutdown hang issues.

## Summary of changes

- Revert the functional changes from #7675 
- Tweak a log in secondary downloads to make it more apparent when we
drop out on cancellation
- Modify DownloadStream's behavior to always return an Err after it has
been cancelled. This _should_ not impact anything, but it makes the
behavior simpler to reason about: even if the poll function somehow got
called again, it could never end up in an un-cancellable state (a minimal
sketch of this latching behaviour follows below).
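
To make the last point concrete, here is a minimal, hypothetical sketch of that "once cancelled, always Err" latching behaviour. It is not the actual `remote_storage` DownloadStream implementation: the wrapper name, the `Cancelled` error type, and the use of a plain `futures::Stream` are assumptions for illustration only.

```rust
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::Stream;
use tokio_util::sync::CancellationToken;

/// Hypothetical error type standing in for the real download error.
#[derive(Debug)]
struct Cancelled;

/// Hypothetical wrapper: once cancellation has been observed, every later
/// poll yields Err(Cancelled) and never touches the inner stream again.
struct LatchingDownloadStream<S> {
    inner: Pin<Box<S>>,
    cancel: CancellationToken,
    // Latch: set the first time cancellation is seen, never cleared.
    cancelled: bool,
}

impl<S, T> Stream for LatchingDownloadStream<S>
where
    S: Stream<Item = Result<T, Cancelled>>,
{
    type Item = Result<T, Cancelled>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // All fields are Unpin, so taking a plain mutable reference is fine.
        let this = self.get_mut();

        if this.cancelled || this.cancel.is_cancelled() {
            // Even if poll_next is somehow called again after cancellation,
            // the stream can never fall back into an un-cancellable state.
            this.cancelled = true;
            return Poll::Ready(Some(Err(Cancelled)));
        }

        this.inner.as_mut().poll_next(cx)
    }
}
```

The key property is that the `cancelled` flag only ever transitions one way, so the error result is sticky across polls.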

Related #neondatabase/cloud#13576
jcsp committed May 24, 2024
1 parent 3860bc9 commit 1455f5a
Showing 6 changed files with 18 additions and 169 deletions.
5 changes: 0 additions & 5 deletions libs/remote_storage/src/azure_blob.rs
@@ -29,7 +29,6 @@ use http_types::{StatusCode, Url};
use tokio_util::sync::CancellationToken;
use tracing::debug;

use crate::RemoteStorageActivity;
use crate::{
error::Cancelled, s3_bucket::RequestKind, AzureConfig, ConcurrencyLimiter, Download,
DownloadError, Listing, ListingMode, RemotePath, RemoteStorage, StorageMetadata,
@@ -526,10 +525,6 @@ impl RemoteStorage for AzureBlobStorage {
// https://learn.microsoft.com/en-us/azure/storage/blobs/point-in-time-restore-overview
Err(TimeTravelError::Unimplemented)
}

fn activity(&self) -> RemoteStorageActivity {
self.concurrency_limiter.activity()
}
}

pin_project_lite::pin_project! {
34 changes: 0 additions & 34 deletions libs/remote_storage/src/lib.rs
@@ -263,17 +263,6 @@ pub trait RemoteStorage: Send + Sync + 'static {
done_if_after: SystemTime,
cancel: &CancellationToken,
) -> Result<(), TimeTravelError>;

/// Query how busy we currently are: may be used by callers which wish to politely
/// back off if there are already a lot of operations underway.
fn activity(&self) -> RemoteStorageActivity;
}

pub struct RemoteStorageActivity {
pub read_available: usize,
pub read_total: usize,
pub write_available: usize,
pub write_total: usize,
}

/// DownloadStream is sensitive to the timeout and cancellation used with the original
@@ -455,15 +444,6 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
}
}
}

pub fn activity(&self) -> RemoteStorageActivity {
match self {
Self::LocalFs(s) => s.activity(),
Self::AwsS3(s) => s.activity(),
Self::AzureBlob(s) => s.activity(),
Self::Unreliable(s) => s.activity(),
}
}
}

impl GenericRemoteStorage {
@@ -794,9 +774,6 @@ struct ConcurrencyLimiter {
// This helps to ensure we don't exceed the thresholds.
write: Arc<Semaphore>,
read: Arc<Semaphore>,

write_total: usize,
read_total: usize,
}

impl ConcurrencyLimiter {
@@ -825,21 +802,10 @@ impl ConcurrencyLimiter {
Arc::clone(self.for_kind(kind)).acquire_owned().await
}

fn activity(&self) -> RemoteStorageActivity {
RemoteStorageActivity {
read_available: self.read.available_permits(),
read_total: self.read_total,
write_available: self.write.available_permits(),
write_total: self.write_total,
}
}

fn new(limit: usize) -> ConcurrencyLimiter {
Self {
read: Arc::new(Semaphore::new(limit)),
write: Arc::new(Semaphore::new(limit)),
read_total: limit,
write_total: limit,
}
}
}
14 changes: 2 additions & 12 deletions libs/remote_storage/src/local_fs.rs
@@ -23,8 +23,8 @@ use tokio_util::{io::ReaderStream, sync::CancellationToken};
use utils::crashsafe::path_with_suffix_extension;

use crate::{
Download, DownloadError, Listing, ListingMode, RemotePath, RemoteStorageActivity,
TimeTravelError, TimeoutOrCancel, REMOTE_STORAGE_PREFIX_SEPARATOR,
Download, DownloadError, Listing, ListingMode, RemotePath, TimeTravelError, TimeoutOrCancel,
REMOTE_STORAGE_PREFIX_SEPARATOR,
};

use super::{RemoteStorage, StorageMetadata};
@@ -605,16 +605,6 @@ impl RemoteStorage for LocalFs {
) -> Result<(), TimeTravelError> {
Err(TimeTravelError::Unimplemented)
}

fn activity(&self) -> RemoteStorageActivity {
// LocalFS has no concurrency limiting: give callers the impression that plenty of units are available
RemoteStorageActivity {
read_available: 16,
read_total: 16,
write_available: 16,
write_total: 16,
}
}
}

fn storage_metadata_path(original_path: &Utf8Path) -> Utf8PathBuf {
8 changes: 2 additions & 6 deletions libs/remote_storage/src/s3_bucket.rs
@@ -47,8 +47,8 @@ use utils::backoff;
use super::StorageMetadata;
use crate::{
error::Cancelled, support::PermitCarrying, ConcurrencyLimiter, Download, DownloadError,
Listing, ListingMode, RemotePath, RemoteStorage, RemoteStorageActivity, S3Config,
TimeTravelError, TimeoutOrCancel, MAX_KEYS_PER_DELETE, REMOTE_STORAGE_PREFIX_SEPARATOR,
Listing, ListingMode, RemotePath, RemoteStorage, S3Config, TimeTravelError, TimeoutOrCancel,
MAX_KEYS_PER_DELETE, REMOTE_STORAGE_PREFIX_SEPARATOR,
};

pub(super) mod metrics;
@@ -975,10 +975,6 @@ impl RemoteStorage for S3Bucket {
}
Ok(())
}

fn activity(&self) -> RemoteStorageActivity {
self.concurrency_limiter.activity()
}
}

/// On drop (cancellation) count towards [`metrics::BucketMetrics::cancelled_waits`].
6 changes: 1 addition & 5 deletions libs/remote_storage/src/simulate_failures.rs
@@ -12,7 +12,7 @@ use tokio_util::sync::CancellationToken;

use crate::{
Download, DownloadError, GenericRemoteStorage, Listing, ListingMode, RemotePath, RemoteStorage,
RemoteStorageActivity, StorageMetadata, TimeTravelError,
StorageMetadata, TimeTravelError,
};

pub struct UnreliableWrapper {
@@ -213,8 +213,4 @@ impl RemoteStorage for UnreliableWrapper {
.time_travel_recover(prefix, timestamp, done_if_after, cancel)
.await
}

fn activity(&self) -> RemoteStorageActivity {
self.inner.activity()
}
}
120 changes: 13 additions & 107 deletions pageserver/src/tenant/secondary/downloader.rs
@@ -45,10 +45,10 @@ use crate::tenant::{

use camino::Utf8PathBuf;
use chrono::format::{DelayedFormat, StrftimeItems};
use futures::{Future, StreamExt};
use futures::Future;
use pageserver_api::models::SecondaryProgress;
use pageserver_api::shard::TenantShardId;
use remote_storage::{DownloadError, Etag, GenericRemoteStorage, RemoteStorageActivity};
use remote_storage::{DownloadError, Etag, GenericRemoteStorage};

use tokio_util::sync::CancellationToken;
use tracing::{info_span, instrument, warn, Instrument};
@@ -67,12 +67,6 @@ use super::{
/// download, if the uploader populated it.
const DEFAULT_DOWNLOAD_INTERVAL: Duration = Duration::from_millis(60000);

/// Range of concurrency we may use when downloading layers within a timeline. This is independent
/// for each tenant we're downloading: the concurrency of _tenants_ is defined separately in
/// `PageServerConf::secondary_download_concurrency`
const MAX_LAYER_CONCURRENCY: usize = 16;
const MIN_LAYER_CONCURRENCY: usize = 1;

pub(super) async fn downloader_task(
tenant_manager: Arc<TenantManager>,
remote_storage: GenericRemoteStorage,
@@ -81,15 +75,14 @@ pub(super) async fn downloader_task(
cancel: CancellationToken,
root_ctx: RequestContext,
) {
// How many tenants' secondary download operations we will run concurrently
let tenant_concurrency = tenant_manager.get_conf().secondary_download_concurrency;
let concurrency = tenant_manager.get_conf().secondary_download_concurrency;

let generator = SecondaryDownloader {
tenant_manager,
remote_storage,
root_ctx,
};
let mut scheduler = Scheduler::new(generator, tenant_concurrency);
let mut scheduler = Scheduler::new(generator, concurrency);

scheduler
.run(command_queue, background_jobs_can_start, cancel)
@@ -414,7 +407,7 @@ impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCo
tracing::warn!("Insufficient space while downloading. Will retry later.");
}
Err(UpdateError::Cancelled) => {
tracing::debug!("Shut down while downloading");
tracing::info!("Shut down while downloading");
},
Err(UpdateError::Deserialize(e)) => {
tracing::error!("Corrupt content while downloading tenant: {e}");
@@ -848,8 +841,6 @@ impl<'a> TenantDownloader<'a> {

tracing::debug!(timeline_id=%timeline.timeline_id, "Downloading layers, {} in heatmap", timeline.layers.len());

let mut download_futs = Vec::new();

// Download heatmap layers that are not present on local disk, or update their
// access time if they are already present.
for layer in timeline.layers {
@@ -922,31 +913,14 @@ impl<'a> TenantDownloader<'a> {
}
}

download_futs.push(self.download_layer(
tenant_shard_id,
&timeline.timeline_id,
layer,
ctx,
));
}

// Break up layer downloads into chunks, so that for each chunk we can re-check how much
// concurrency to use based on activity level of remote storage.
while !download_futs.is_empty() {
let chunk =
download_futs.split_off(download_futs.len().saturating_sub(MAX_LAYER_CONCURRENCY));

let concurrency = Self::layer_concurrency(self.remote_storage.activity());

let mut result_stream = futures::stream::iter(chunk).buffered(concurrency);
let mut result_stream = std::pin::pin!(result_stream);
while let Some(result) = result_stream.next().await {
match result {
Err(e) => return Err(e),
Ok(None) => {
// No error, but we didn't download the layer. Don't mark it touched
}
Ok(Some(layer)) => touched.push(layer),
match self
.download_layer(tenant_shard_id, &timeline.timeline_id, layer, ctx)
.await?
{
Some(layer) => touched.push(layer),
None => {
// Not an error but we didn't download it: remote layer is missing. Don't add it to the list of
// things to consider touched.
}
}
}
@@ -1081,19 +1055,6 @@ impl<'a> TenantDownloader<'a> {

Ok(Some(layer))
}

/// Calculate the currently allowed parallelism of layer download tasks, based on activity level of the remote storage
fn layer_concurrency(activity: RemoteStorageActivity) -> usize {
// When less than 75% of units are available, use minimum concurrency. Else, do a linear mapping
// of our concurrency range to the units available within the remaining 25%.
let clamp_at = (activity.read_total * 3) / 4;
if activity.read_available > clamp_at {
(MAX_LAYER_CONCURRENCY * (activity.read_available - clamp_at))
/ (activity.read_total - clamp_at)
} else {
MIN_LAYER_CONCURRENCY
}
}
}

/// Scan local storage and build up Layer objects based on the metadata in a HeatMapTimeline
Expand Down Expand Up @@ -1217,58 +1178,3 @@ async fn init_timeline_state(

detail
}

#[cfg(test)]
mod test {
use super::*;

#[test]
fn layer_concurrency() {
// Totally idle
assert_eq!(
TenantDownloader::layer_concurrency(RemoteStorageActivity {
read_available: 16,
read_total: 16,
write_available: 16,
write_total: 16
}),
MAX_LAYER_CONCURRENCY
);

// Totally busy
assert_eq!(
TenantDownloader::layer_concurrency(RemoteStorageActivity {
read_available: 0,
read_total: 16,

write_available: 16,
write_total: 16
}),
MIN_LAYER_CONCURRENCY
);

// Edge of the range at which we interpolate
assert_eq!(
TenantDownloader::layer_concurrency(RemoteStorageActivity {
read_available: 12,
read_total: 16,

write_available: 16,
write_total: 16
}),
MIN_LAYER_CONCURRENCY
);

// Midpoint of the range in which we interpolate
assert_eq!(
TenantDownloader::layer_concurrency(RemoteStorageActivity {
read_available: 14,
read_total: 16,

write_available: 16,
write_total: 16
}),
MAX_LAYER_CONCURRENCY / 2
);
}
}

1 comment on commit 1455f5a


@github-actions github-actions bot commented on 1455f5a May 24, 2024


3250 tests run: 3110 passed, 0 failed, 140 skipped (full report)


Flaky tests (5)

Postgres 16

  • test_vm_bit_clear_on_heap_lock: debug

Postgres 15

  • test_sharding_split_smoke: debug

Postgres 14

  • test_sharding_autosplit[github-actions-selfhosted]: release
  • test_storage_controller_many_tenants[github-actions-selfhosted]: release
  • test_scrubber_tenant_snapshot[4]: debug

Code coverage* (full report)

  • functions: 31.4% (6448 of 20536 functions)
  • lines: 48.3% (49917 of 103299 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
1455f5a at 2024-05-24T18:30:03.116Z :recycle:
