Skip to content

Commit

Permalink
pageserver: update secondary state even on Err from layer downloads
Browse files Browse the repository at this point in the history
  • Loading branch information
jcsp committed Jun 28, 2024
1 parent 8de8fe3 commit 424f6f5
Showing 1 changed file with 38 additions and 19 deletions.
57 changes: 38 additions & 19 deletions pageserver/src/tenant/secondary/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -870,32 +870,28 @@ impl<'a> TenantDownloader<'a> {
.and_then(|x| x)
}

async fn download_timeline(
/// Download heatmap layers that are not present on local disk, or update their
/// access time if they are already present.
async fn download_timeline_layers(
&self,
tenant_shard_id: &TenantShardId,
timeline: HeatMapTimeline,
timeline_state: SecondaryDetailTimeline,
deadline: Instant,
ctx: &RequestContext,
) -> Result<(), UpdateError> {
debug_assert_current_span_has_tenant_and_timeline_id();
let tenant_shard_id = self.secondary_state.get_tenant_shard_id();

) -> (Result<(), UpdateError>, Vec<HeatMapLayer>) {
// Accumulate updates to the state
let mut touched = Vec::new();

tracing::debug!(timeline_id=%timeline.timeline_id, "Downloading layers, {} in heatmap", timeline.layers.len());

// Download heatmap layers that are not present on local disk, or update their
// access time if they are already present.
for layer in timeline.layers {
if self.secondary_state.cancel.is_cancelled() {
tracing::debug!("Cancelled -- dropping out of layer loop");
return Err(UpdateError::Cancelled);
return (Err(UpdateError::Cancelled), touched);
}

if Instant::now() > deadline {
// We've been running downloads for a while, restart to download latest heatmap.
return Err(UpdateError::Restart);
return (Err(UpdateError::Restart), touched);
}

// Existing on-disk layers: just update their access time.
Expand Down Expand Up @@ -965,20 +961,43 @@ impl<'a> TenantDownloader<'a> {

match self
.download_layer(tenant_shard_id, &timeline.timeline_id, layer, ctx)
.await?
.await
{
Some(layer) => touched.push(layer),
None => {
Ok(Some(layer)) => touched.push(layer),
Ok(None) => {
// Not an error but we didn't download it: remote layer is missing. Don't add it to the list of
// things to consider touched.
}
Err(e) => {
return (Err(e), touched);
}
}
}

// Write updates to state to record layers we just downloaded or touched.
(Ok(()), touched)
}

async fn download_timeline(
&self,
timeline: HeatMapTimeline,
timeline_state: SecondaryDetailTimeline,
deadline: Instant,
ctx: &RequestContext,
) -> Result<(), UpdateError> {
debug_assert_current_span_has_tenant_and_timeline_id();
let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
let timeline_id = timeline.timeline_id;

tracing::debug!(timeline_id=%timeline_id, "Downloading layers, {} in heatmap", timeline.layers.len());

let (result, touched) = self
.download_timeline_layers(tenant_shard_id, timeline, timeline_state, deadline, ctx)
.await;

// Write updates to state to record layers we just downloaded or touched, irrespective of whether the overall result was successful
{
let mut detail = self.secondary_state.detail.lock().unwrap();
let timeline_detail = detail.timelines.entry(timeline.timeline_id).or_default();
let timeline_detail = detail.timelines.entry(timeline_id).or_default();

tracing::info!("Wrote timeline_detail for {} touched layers", touched.len());

Expand All @@ -992,14 +1011,14 @@ impl<'a> TenantDownloader<'a> {
let local_path = local_layer_path(
self.conf,
tenant_shard_id,
&timeline.timeline_id,
&timeline_id,
&t.name,
&t.metadata.generation,
);
e.insert(OnDiskState::new(
self.conf,
tenant_shard_id,
&timeline.timeline_id,
&timeline_id,
t.name,
t.metadata.clone(),
t.access_time,
Expand All @@ -1010,7 +1029,7 @@ impl<'a> TenantDownloader<'a> {
}
}

Ok(())
result
}

/// Call this during timeline download if a layer will _not_ be downloaded, to update progress statistics
Expand Down

0 comments on commit 424f6f5

Please sign in to comment.