Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pageserver: drop out of secondary download if iteration time has passed #8198

Merged
merged 2 commits into from
Jun 28, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 98 additions & 30 deletions pageserver/src/tenant/secondary/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ impl scheduler::RunningJob for RunningDownload {
/// Completion record produced when a secondary download job finishes; it is
/// destructured by the job generator's completion handler.
struct CompleteDownload {
/// State of the secondary tenant the job ran for. The completion handler locks
/// its `detail` to set `next_download`.
secondary_state: Arc<SecondaryTenant>,
/// Set to `Instant::now()` when the job's future builds this record.
completed_at: Instant,
/// Outcome of the download attempt. `Err(UpdateError::Restart)` makes the
/// completion handler set `next_download` to now; any other outcome sets it
/// to now plus a jittered period.
result: Result<(), UpdateError>,
}

impl scheduler::Completion for CompleteDownload {
Expand All @@ -286,21 +287,33 @@ impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCo
let CompleteDownload {
secondary_state,
completed_at: _completed_at,
result,
} = completion;

tracing::debug!("Secondary tenant download completed");

let mut detail = secondary_state.detail.lock().unwrap();

let period = detail
.last_download
.as_ref()
.map(|d| d.upload_period)
.unwrap_or(DEFAULT_DOWNLOAD_INTERVAL);

// We advance next_download irrespective of errors: we don't want error cases to result in
// expensive busy-polling.
detail.next_download = Some(Instant::now() + period_jitter(period, 5));
match result {
Err(UpdateError::Restart) => {
// Start downloading again as soon as we can. This will involve waiting for the scheduler's
// scheduling interval. This slightly reduces the peak download speed of tenants that hit their
// deadline and keep restarting, but that also helps give other tenants a chance to execute rather
// than letting one big tenant dominate for a long time.
detail.next_download = Some(Instant::now());
}
_ => {
let period = detail
.last_download
.as_ref()
.map(|d| d.upload_period)
.unwrap_or(DEFAULT_DOWNLOAD_INTERVAL);

// We advance next_download irrespective of errors: we don't want error cases to result in
// expensive busy-polling.
detail.next_download = Some(Instant::now() + period_jitter(period, 5));
}
}
}

async fn schedule(&mut self) -> SchedulingResult<PendingDownload> {
Expand Down Expand Up @@ -396,9 +409,10 @@ impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCo
(RunningDownload { barrier }, Box::pin(async move {
let _completion = completion;

match TenantDownloader::new(conf, &remote_storage, &secondary_state)
let result = TenantDownloader::new(conf, &remote_storage, &secondary_state)
.download(&download_ctx)
.await
.await;
match &result
{
Err(UpdateError::NoData) => {
tracing::info!("No heatmap found for tenant. This is fine if it is new.");
Expand All @@ -415,6 +429,9 @@ impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCo
Err(e @ (UpdateError::DownloadError(_) | UpdateError::Other(_))) => {
tracing::error!("Error while downloading tenant: {e}");
},
Err(UpdateError::Restart) => {
tracing::info!("Download reached deadline & will restart to update heatmap")
}
Ok(()) => {}
};

Expand All @@ -436,6 +453,7 @@ impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCo
CompleteDownload {
secondary_state,
completed_at: Instant::now(),
result
}
}.instrument(info_span!(parent: None, "secondary_download", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))))
}
Expand All @@ -452,6 +470,11 @@ struct TenantDownloader<'a> {
/// Errors that may be encountered while updating a tenant
#[derive(thiserror::Error, Debug)]
enum UpdateError {
/// This is not a true failure, but it's how a download indicates that it would like to be restarted by
/// the scheduler, to pick up the latest heatmap
#[error("Reached deadline, restarting downloads")]
Restart,

#[error("No remote data found")]
NoData,
#[error("Insufficient local storage space")]
Expand Down Expand Up @@ -603,6 +626,26 @@ impl<'a> TenantDownloader<'a> {
self.prepare_timelines(&heatmap, heatmap_mtime).await?;
}

// Calculate a deadline for downloads: if downloading takes longer than this, it is useful to drop out and start again,
// so that we are always using a reasonably fresh heatmap. Otherwise, if we had really huge content to download, we might
// spend 10s of minutes downloading layers we don't need.
// (see https://github.com/neondatabase/neon/issues/8182)
let deadline = {
let period = self
.secondary_state
.detail
.lock()
.unwrap()
.last_download
.as_ref()
.map(|d| d.upload_period)
.unwrap_or(DEFAULT_DOWNLOAD_INTERVAL);

// Use double the period: we are not promising to complete within the period, this is just a heuristic
// to keep using a "reasonably fresh" heatmap.
Instant::now() + period * 2
};

// Download the layers in the heatmap
for timeline in heatmap.timelines {
let timeline_state = timeline_states
Expand All @@ -618,7 +661,7 @@ impl<'a> TenantDownloader<'a> {
}

let timeline_id = timeline.timeline_id;
self.download_timeline(timeline, timeline_state, ctx)
self.download_timeline(timeline, timeline_state, deadline, ctx)
.instrument(tracing::info_span!(
"secondary_download_timeline",
tenant_id=%tenant_shard_id.tenant_id,
Expand Down Expand Up @@ -827,26 +870,28 @@ impl<'a> TenantDownloader<'a> {
.and_then(|x| x)
}

async fn download_timeline(
/// Download heatmap layers that are not present on local disk, or update their
/// access time if they are already present.
async fn download_timeline_layers(
&self,
tenant_shard_id: &TenantShardId,
timeline: HeatMapTimeline,
timeline_state: SecondaryDetailTimeline,
deadline: Instant,
ctx: &RequestContext,
) -> Result<(), UpdateError> {
debug_assert_current_span_has_tenant_and_timeline_id();
let tenant_shard_id = self.secondary_state.get_tenant_shard_id();

) -> (Result<(), UpdateError>, Vec<HeatMapLayer>) {
// Accumulate updates to the state
let mut touched = Vec::new();

tracing::debug!(timeline_id=%timeline.timeline_id, "Downloading layers, {} in heatmap", timeline.layers.len());

// Download heatmap layers that are not present on local disk, or update their
// access time if they are already present.
for layer in timeline.layers {
if self.secondary_state.cancel.is_cancelled() {
tracing::debug!("Cancelled -- dropping out of layer loop");
return Err(UpdateError::Cancelled);
return (Err(UpdateError::Cancelled), touched);
}

if Instant::now() > deadline {
// We've been running downloads for a while, restart to download latest heatmap.
return (Err(UpdateError::Restart), touched);
}

// Existing on-disk layers: just update their access time.
Expand Down Expand Up @@ -916,20 +961,43 @@ impl<'a> TenantDownloader<'a> {

match self
.download_layer(tenant_shard_id, &timeline.timeline_id, layer, ctx)
.await?
.await
{
Some(layer) => touched.push(layer),
None => {
Ok(Some(layer)) => touched.push(layer),
Ok(None) => {
// Not an error but we didn't download it: remote layer is missing. Don't add it to the list of
// things to consider touched.
}
Err(e) => {
return (Err(e), touched);
}
}
}

// Write updates to state to record layers we just downloaded or touched.
(Ok(()), touched)
}

async fn download_timeline(
&self,
timeline: HeatMapTimeline,
timeline_state: SecondaryDetailTimeline,
deadline: Instant,
ctx: &RequestContext,
) -> Result<(), UpdateError> {
debug_assert_current_span_has_tenant_and_timeline_id();
let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
let timeline_id = timeline.timeline_id;

tracing::debug!(timeline_id=%timeline_id, "Downloading layers, {} in heatmap", timeline.layers.len());

let (result, touched) = self
.download_timeline_layers(tenant_shard_id, timeline, timeline_state, deadline, ctx)
.await;

// Write updates to state to record layers we just downloaded or touched, irrespective of whether the overall result was successful
{
let mut detail = self.secondary_state.detail.lock().unwrap();
let timeline_detail = detail.timelines.entry(timeline.timeline_id).or_default();
let timeline_detail = detail.timelines.entry(timeline_id).or_default();

tracing::info!("Wrote timeline_detail for {} touched layers", touched.len());

Expand All @@ -943,14 +1011,14 @@ impl<'a> TenantDownloader<'a> {
let local_path = local_layer_path(
self.conf,
tenant_shard_id,
&timeline.timeline_id,
&timeline_id,
&t.name,
&t.metadata.generation,
);
e.insert(OnDiskState::new(
self.conf,
tenant_shard_id,
&timeline.timeline_id,
&timeline_id,
t.name,
t.metadata.clone(),
t.access_time,
Expand All @@ -961,7 +1029,7 @@ impl<'a> TenantDownloader<'a> {
}
}

Ok(())
result
}

/// Call this during timeline download if a layer will _not_ be downloaded, to update progress statistics
Expand Down
Loading