Skip to content

Commit

Permalink
pageserver: drop out of secondary download if iteration time has passed
Browse files Browse the repository at this point in the history
  • Loading branch information
jcsp committed Jun 28, 2024
1 parent 32b75e7 commit 8de8fe3
Showing 1 changed file with 61 additions and 12 deletions.
73 changes: 61 additions & 12 deletions pageserver/src/tenant/secondary/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ impl scheduler::RunningJob for RunningDownload {
struct CompleteDownload {
secondary_state: Arc<SecondaryTenant>,
completed_at: Instant,
result: Result<(), UpdateError>,
}

impl scheduler::Completion for CompleteDownload {
Expand All @@ -286,21 +287,33 @@ impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCo
let CompleteDownload {
secondary_state,
completed_at: _completed_at,
result,
} = completion;

tracing::debug!("Secondary tenant download completed");

let mut detail = secondary_state.detail.lock().unwrap();

let period = detail
.last_download
.as_ref()
.map(|d| d.upload_period)
.unwrap_or(DEFAULT_DOWNLOAD_INTERVAL);

// We advance next_download irrespective of errors: we don't want error cases to result in
// expensive busy-polling.
detail.next_download = Some(Instant::now() + period_jitter(period, 5));
match result {
Err(UpdateError::Restart) => {
// Start downloading again as soon as we can. This will involve waiting for the scheduler's
// scheduling interval. This slightly reduces the peak download speed of tenants that hit their
// deadline and keep restarting, but that also helps give other tenants a chance to execute rather
// that letting one big tenant dominate for a long time.
detail.next_download = Some(Instant::now());
}
_ => {
let period = detail
.last_download
.as_ref()
.map(|d| d.upload_period)
.unwrap_or(DEFAULT_DOWNLOAD_INTERVAL);

// We advance next_download irrespective of errors: we don't want error cases to result in
// expensive busy-polling.
detail.next_download = Some(Instant::now() + period_jitter(period, 5));
}
}
}

async fn schedule(&mut self) -> SchedulingResult<PendingDownload> {
Expand Down Expand Up @@ -396,9 +409,10 @@ impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCo
(RunningDownload { barrier }, Box::pin(async move {
let _completion = completion;

match TenantDownloader::new(conf, &remote_storage, &secondary_state)
let result = TenantDownloader::new(conf, &remote_storage, &secondary_state)
.download(&download_ctx)
.await
.await;
match &result
{
Err(UpdateError::NoData) => {
tracing::info!("No heatmap found for tenant. This is fine if it is new.");
Expand All @@ -415,6 +429,9 @@ impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCo
Err(e @ (UpdateError::DownloadError(_) | UpdateError::Other(_))) => {
tracing::error!("Error while downloading tenant: {e}");
},
Err(UpdateError::Restart) => {
tracing::info!("Download reached deadline & will restart to update heatmap")
}
Ok(()) => {}
};

Expand All @@ -436,6 +453,7 @@ impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCo
CompleteDownload {
secondary_state,
completed_at: Instant::now(),
result
}
}.instrument(info_span!(parent: None, "secondary_download", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))))
}
Expand All @@ -452,6 +470,11 @@ struct TenantDownloader<'a> {
/// Errors that may be encountered while updating a tenant
#[derive(thiserror::Error, Debug)]
enum UpdateError {
/// This is not a true failure, but it's how a download indicates that it would like to be restarted by
/// the scheduler, to pick up the latest heatmap
#[error("Reached deadline, restarting downloads")]
Restart,

#[error("No remote data found")]
NoData,
#[error("Insufficient local storage space")]
Expand Down Expand Up @@ -603,6 +626,26 @@ impl<'a> TenantDownloader<'a> {
self.prepare_timelines(&heatmap, heatmap_mtime).await?;
}

// Calculate a deadline for downloads: if downloading takes longer than this, it is useful to drop out and start again,
// so that we are always using reasonably a fresh heatmap. Otherwise, if we had really huge content to download, we might
// spend 10s of minutes downloading layers we don't need.
// (see https://github.com/neondatabase/neon/issues/8182)
let deadline = {
let period = self
.secondary_state
.detail
.lock()
.unwrap()
.last_download
.as_ref()
.map(|d| d.upload_period)
.unwrap_or(DEFAULT_DOWNLOAD_INTERVAL);

// Use double the period: we are not promising to complete within the period, this is just a heuristic
// to keep using a "reasonably fresh" heatmap.
Instant::now() + period * 2
};

// Download the layers in the heatmap
for timeline in heatmap.timelines {
let timeline_state = timeline_states
Expand All @@ -618,7 +661,7 @@ impl<'a> TenantDownloader<'a> {
}

let timeline_id = timeline.timeline_id;
self.download_timeline(timeline, timeline_state, ctx)
self.download_timeline(timeline, timeline_state, deadline, ctx)
.instrument(tracing::info_span!(
"secondary_download_timeline",
tenant_id=%tenant_shard_id.tenant_id,
Expand Down Expand Up @@ -831,6 +874,7 @@ impl<'a> TenantDownloader<'a> {
&self,
timeline: HeatMapTimeline,
timeline_state: SecondaryDetailTimeline,
deadline: Instant,
ctx: &RequestContext,
) -> Result<(), UpdateError> {
debug_assert_current_span_has_tenant_and_timeline_id();
Expand All @@ -849,6 +893,11 @@ impl<'a> TenantDownloader<'a> {
return Err(UpdateError::Cancelled);
}

if Instant::now() > deadline {
// We've been running downloads for a while, restart to download latest heatmap.
return Err(UpdateError::Restart);
}

// Existing on-disk layers: just update their access time.
if let Some(on_disk) = timeline_state.on_disk_layers.get(&layer.name) {
tracing::debug!("Layer {} is already on disk", layer.name);
Expand Down

0 comments on commit 8de8fe3

Please sign in to comment.