diff --git a/pageserver/src/tenant/secondary/downloader.rs b/pageserver/src/tenant/secondary/downloader.rs index 5fab6113bee5..f6f30641dbbb 100644 --- a/pageserver/src/tenant/secondary/downloader.rs +++ b/pageserver/src/tenant/secondary/downloader.rs @@ -870,32 +870,28 @@ impl<'a> TenantDownloader<'a> { .and_then(|x| x) } - async fn download_timeline( + /// Download heatmap layers that are not present on local disk, or update their + /// access time if they are already present. + async fn download_timeline_layers( &self, + tenant_shard_id: &TenantShardId, timeline: HeatMapTimeline, timeline_state: SecondaryDetailTimeline, deadline: Instant, ctx: &RequestContext, - ) -> Result<(), UpdateError> { - debug_assert_current_span_has_tenant_and_timeline_id(); - let tenant_shard_id = self.secondary_state.get_tenant_shard_id(); - + ) -> (Result<(), UpdateError>, Vec) { // Accumulate updates to the state let mut touched = Vec::new(); - tracing::debug!(timeline_id=%timeline.timeline_id, "Downloading layers, {} in heatmap", timeline.layers.len()); - - // Download heatmap layers that are not present on local disk, or update their - // access time if they are already present. for layer in timeline.layers { if self.secondary_state.cancel.is_cancelled() { tracing::debug!("Cancelled -- dropping out of layer loop"); - return Err(UpdateError::Cancelled); + return (Err(UpdateError::Cancelled), touched); } if Instant::now() > deadline { // We've been running downloads for a while, restart to download latest heatmap. - return Err(UpdateError::Restart); + return (Err(UpdateError::Restart), touched); } // Existing on-disk layers: just update their access time. @@ -965,20 +961,43 @@ impl<'a> TenantDownloader<'a> { match self .download_layer(tenant_shard_id, &timeline.timeline_id, layer, ctx) - .await? + .await { - Some(layer) => touched.push(layer), - None => { + Ok(Some(layer)) => touched.push(layer), + Ok(None) => { // Not an error but we didn't download it: remote layer is missing. Don't add it to the list of // things to consider touched. } + Err(e) => { + return (Err(e), touched); + } } } - // Write updates to state to record layers we just downloaded or touched. + (Ok(()), touched) + } + + async fn download_timeline( + &self, + timeline: HeatMapTimeline, + timeline_state: SecondaryDetailTimeline, + deadline: Instant, + ctx: &RequestContext, + ) -> Result<(), UpdateError> { + debug_assert_current_span_has_tenant_and_timeline_id(); + let tenant_shard_id = self.secondary_state.get_tenant_shard_id(); + let timeline_id = timeline.timeline_id; + + tracing::debug!(timeline_id=%timeline_id, "Downloading layers, {} in heatmap", timeline.layers.len()); + + let (result, touched) = self + .download_timeline_layers(tenant_shard_id, timeline, timeline_state, deadline, ctx) + .await; + + // Write updates to state to record layers we just downloaded or touched, irrespective of whether the overall result was successful { let mut detail = self.secondary_state.detail.lock().unwrap(); - let timeline_detail = detail.timelines.entry(timeline.timeline_id).or_default(); + let timeline_detail = detail.timelines.entry(timeline_id).or_default(); tracing::info!("Wrote timeline_detail for {} touched layers", touched.len()); @@ -992,14 +1011,14 @@ impl<'a> TenantDownloader<'a> { let local_path = local_layer_path( self.conf, tenant_shard_id, - &timeline.timeline_id, + &timeline_id, &t.name, &t.metadata.generation, ); e.insert(OnDiskState::new( self.conf, tenant_shard_id, - &timeline.timeline_id, + &timeline_id, t.name, t.metadata.clone(), t.access_time, @@ -1010,7 +1029,7 @@ impl<'a> TenantDownloader<'a> { } } - Ok(()) + result } /// Call this during timeline download if a layer will _not_ be downloaded, to update progress statistics