From 067c0aeb56eba2ea2545ffca44c1b608b0bbbbd0 Mon Sep 17 00:00:00 2001
From: Weston Pace
Date: Wed, 18 Sep 2024 12:53:45 -0700
Subject: [PATCH] fix: use retry loop when loading deletion files (#2899)

When loading deletion files we were directly accessing
`ObjectStore::inner`, which bypasses the retry loop. This made loading
deletion files somewhat brittle and was causing issues in scans (which
can load a lot of deletion files at the same time that they are loading
lots of other ranges).

This PR changes the deletion file load to use the `ObjectStore` and adds
convenience methods to make this easier in the future.

In the future we should also move this load to use a `FileScheduler`
instead of `ObjectStore` so that these deletion file loads do not violate
our max-iops-per-scan limit, but that can be done later.
---
 rust/lance-io/src/local.rs          |  18 ++++-
 rust/lance-io/src/object_reader.rs  | 100 +++++++++++++++++-----------
 rust/lance-io/src/object_store.rs   |  37 ++++++++++
 rust/lance-io/src/traits.rs         |   6 ++
 rust/lance-table/src/io/deletion.rs |  10 +--
 5 files changed, 127 insertions(+), 44 deletions(-)
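At the call sites in `lance-table`, the fix boils down to swapping the raw `inner` access for the new retrying helper. Roughly, the change reads as follows (paraphrased from the deletion.rs hunks below, not an exact excerpt):

    // Before: goes straight to the wrapped store and skips Lance's retry handling.
    let data = object_store.inner.get(&path).await?.bytes().await?;

    // After: the ObjectStore wrapper opens a reader whose get_all() retries both
    // the initial request and the body download.
    let data = object_store.read_one_all(&path).await?;
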
diff --git a/rust/lance-io/src/local.rs b/rust/lance-io/src/local.rs
index 81368c6de0..a21e57362e 100644
--- a/rust/lance-io/src/local.rs
+++ b/rust/lance-io/src/local.rs
@@ -4,7 +4,7 @@
 //! Optimized local I/Os

 use std::fs::File;
-use std::io::{ErrorKind, SeekFrom};
+use std::io::{ErrorKind, Read, SeekFrom};
 use std::ops::Range;
 use std::sync::Arc;

@@ -167,6 +167,22 @@ impl Reader for LocalObjectReader {
             source: err.into(),
         })
     }
+
+    /// Reads the entire file.
+    #[instrument(level = "debug", skip(self))]
+    async fn get_all(&self) -> object_store::Result<Bytes> {
+        let mut file = self.file.clone();
+        tokio::task::spawn_blocking(move || {
+            let mut buf = Vec::new();
+            file.read_to_end(buf.as_mut())?;
+            Ok(Bytes::from(buf))
+        })
+        .await?
+        .map_err(|err: std::io::Error| object_store::Error::Generic {
+            store: "LocalFileSystem",
+            source: err.into(),
+        })
+    }
 }

 #[cfg(windows)]
diff --git a/rust/lance-io/src/object_reader.rs b/rust/lance-io/src/object_reader.rs
index 58a6385096..fd8fa55873 100644
--- a/rust/lance-io/src/object_reader.rs
+++ b/rust/lance-io/src/object_reader.rs
@@ -9,7 +9,7 @@
 use bytes::Bytes;
 use deepsize::DeepSizeOf;
 use futures::future::BoxFuture;
 use lance_core::Result;
-use object_store::{path::Path, GetOptions, ObjectStore};
+use object_store::{path::Path, GetOptions, GetResult, ObjectStore, Result as OSResult};
 use tokio::sync::OnceCell;
 use tracing::instrument;
@@ -61,8 +61,8 @@ impl CloudObjectReader {
     // of the response body. Thus we add an outer retry loop here.
     async fn do_with_retry<'a, O>(
         &self,
-        f: impl Fn() -> BoxFuture<'a, std::result::Result<O, object_store::Error>>,
-    ) -> object_store::Result<O> {
+        f: impl Fn() -> BoxFuture<'a, OSResult<O>>,
+    ) -> OSResult<O> {
         let mut retries = 3;
         loop {
             match f().await {
@@ -76,6 +76,40 @@ impl CloudObjectReader {
             }
         }
     }
+
+    // We have a separate retry loop here. This is because object_store does not
+    // attempt retries on downloads that fail during streaming of the response body.
+    //
+    // However, this failure is pretty common (e.g. timeout) and we want to retry in these
+    // situations. In addition, we provide additional logging information in these
+    // failure cases.
+    async fn do_get_with_outer_retry<'a>(
+        &self,
+        f: impl Fn() -> BoxFuture<'a, OSResult<GetResult>> + Copy,
+        desc: impl Fn() -> String,
+    ) -> OSResult<Bytes> {
+        let mut retries = self.download_retry_count;
+        loop {
+            let get_result = self.do_with_retry(f).await?;
+            match get_result.bytes().await {
+                Ok(bytes) => return Ok(bytes),
+                Err(err) => {
+                    if retries == 0 {
+                        log::warn!("Failed to download {} from {} after {} attempts. This may indicate that cloud storage is overloaded or your timeout settings are too restrictive. Error details: {:?}", desc(), self.path, self.download_retry_count, err);
+                        return Err(err);
+                    }
+                    log::debug!(
+                        "Retrying {} from {} (remaining retries: {}). Error details: {:?}",
+                        desc(),
+                        self.path,
+                        retries,
+                        err
+                    );
+                    retries -= 1;
+                }
+            }
+        }
+    }
 }

 #[async_trait]
@@ -106,41 +140,29 @@ impl Reader for CloudObjectReader {
     }

     #[instrument(level = "debug", skip(self))]
-    async fn get_range(&self, range: Range<usize>) -> object_store::Result<Bytes> {
-        // We have a separate retry loop here. This is because object_store does not
-        // attempt retries on downloads that fail during streaming of the response body.
-        //
-        // However, this failure is pretty common (e.g. timeout) and we want to retry in these
-        // situations. In addition, we provide additional logging information in these
-        // failures cases.
-        let mut retries = self.download_retry_count;
-        loop {
-            let get_result = self
-                .do_with_retry(|| {
-                    let options = GetOptions {
-                        range: Some(range.clone().into()),
-                        ..Default::default()
-                    };
-                    self.object_store.get_opts(&self.path, options)
-                })
-                .await?;
-            match get_result.bytes().await {
-                Ok(bytes) => return Ok(bytes),
-                Err(err) => {
-                    if retries == 0 {
-                        log::warn!("Failed to download range {:?} from {} after {} attempts. This may indicate that cloud storage is overloaded or your timeout settings are too restrictive. Error details: {:?}", range, self.path, self.download_retry_count, err);
-                        return Err(err);
-                    }
-                    log::debug!(
-                        "Retrying range {:?} from {} (remaining retries: {}). Error details: {:?}",
-                        range,
-                        self.path,
-                        retries,
-                        err
-                    );
-                    retries -= 1;
-                }
-            }
-        }
+    async fn get_range(&self, range: Range<usize>) -> OSResult<Bytes> {
+        self.do_get_with_outer_retry(
+            || {
+                let options = GetOptions {
+                    range: Some(range.clone().into()),
+                    ..Default::default()
+                };
+                self.object_store.get_opts(&self.path, options)
+            },
+            || format!("range {:?}", range),
+        )
+        .await
+    }
+
+    #[instrument(level = "debug", skip_all)]
+    async fn get_all(&self) -> OSResult<Bytes> {
+        self.do_get_with_outer_retry(
+            || {
+                self.object_store
+                    .get_opts(&self.path, GetOptions::default())
+            },
+            || "read_all".to_string(),
+        )
+        .await
     }
 }
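The reader now has two retry layers: `do_with_retry` re-issues the request itself when `get_opts` fails outright, and the new `do_get_with_outer_retry` additionally restarts the whole download when streaming the response body fails partway through. A standalone sketch of that shape, with plain closures standing in for `get_opts` and `GetResult::bytes()` (illustrative only, not code from this patch):

    fn get_with_retries<R>(
        request: impl Fn() -> Result<R, String>,
        read_body: impl Fn(R) -> Result<Vec<u8>, String>,
        download_retry_count: usize,
    ) -> Result<Vec<u8>, String> {
        let mut body_retries = download_retry_count;
        loop {
            // Inner layer (mirrors do_with_retry): retry the request itself.
            let mut request_retries = 3;
            let handle = loop {
                match request() {
                    Ok(handle) => break handle,
                    Err(_) if request_retries > 0 => request_retries -= 1,
                    Err(err) => return Err(err),
                }
            };
            // Outer layer (mirrors do_get_with_outer_retry): if reading the body
            // fails, issue a brand-new request instead of giving up.
            match read_body(handle) {
                Ok(bytes) => return Ok(bytes),
                Err(_) if body_retries > 0 => body_retries -= 1,
                Err(err) => return Err(err),
            }
        }
    }
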
diff --git a/rust/lance-io/src/object_store.rs b/rust/lance-io/src/object_store.rs
index dc50e2dc12..32f9a8645b 100644
--- a/rust/lance-io/src/object_store.rs
+++ b/rust/lance-io/src/object_store.rs
@@ -4,6 +4,7 @@
 //! Extend [object_store::ObjectStore] functionalities

 use std::collections::HashMap;
+use std::ops::Range;
 use std::path::PathBuf;
 use std::str::FromStr;
 use std::sync::Arc;
@@ -12,6 +13,7 @@ use std::time::{Duration, SystemTime};
 use async_trait::async_trait;
 use aws_config::default_provider::credentials::DefaultCredentialsChain;
 use aws_credential_types::provider::ProvideCredentials;
+use bytes::Bytes;
 use chrono::{DateTime, Utc};
 use deepsize::DeepSizeOf;
 use futures::{future, stream::BoxStream, StreamExt, TryStreamExt};
@@ -649,6 +651,21 @@ impl ObjectStore {
     pub async fn size(&self, path: &Path) -> Result<usize> {
         Ok(self.inner.head(path).await?.size)
     }
+
+    /// Convenience function to open a reader and read all the bytes
+    pub async fn read_one_all(&self, path: &Path) -> Result<Bytes> {
+        let reader = self.open(path).await?;
+        Ok(reader.get_all().await?)
+    }
+
+    /// Convenience function to open a reader and make a single request
+    ///
+    /// If you will be making multiple requests to the path it is more efficient to call [`Self::open`]
+    /// and then call [`Reader::get_range`] multiple times.
+    pub async fn read_one_range(&self, path: &Path, range: Range<usize>) -> Result<Bytes> {
+        let reader = self.open(path).await?;
+        Ok(reader.get_range(range).await?)
+    }
 }

 /// Options that can be set for multiple object stores
@@ -1301,6 +1318,26 @@ mod tests {
         assert_eq!(buf.as_bytes(), b"LOCAL");
     }

+    #[tokio::test]
+    async fn test_read_one() {
+        let temp_dir = tempfile::tempdir().unwrap();
+
+        let file_path = temp_dir.path().join("test_file");
+        let mut writer = ObjectStore::create_local_writer(file_path.as_path())
+            .await
+            .unwrap();
+        writer.write_all(b"LOCAL").await.unwrap();
+        writer.shutdown().await.unwrap();
+
+        let file_path_os = object_store::path::Path::parse(file_path.to_str().unwrap()).unwrap();
+        let obj_store = ObjectStore::local();
+        let buf = obj_store.read_one_all(&file_path_os).await.unwrap();
+        assert_eq!(buf.as_bytes(), b"LOCAL");
+
+        let buf = obj_store.read_one_range(&file_path_os, 0..5).await.unwrap();
+        assert_eq!(buf.as_bytes(), b"LOCAL");
+    }
+
     #[tokio::test]
     #[cfg(windows)]
     async fn test_windows_paths() {
diff --git a/rust/lance-io/src/traits.rs b/rust/lance-io/src/traits.rs
index 0863891935..046e4e4a55 100644
--- a/rust/lance-io/src/traits.rs
+++ b/rust/lance-io/src/traits.rs
@@ -96,4 +96,10 @@ pub trait Reader: std::fmt::Debug + Send + Sync + DeepSizeOf {
     ///
     /// TODO: change to read_at()?
     async fn get_range(&self, range: Range<usize>) -> object_store::Result<Bytes>;
+
+    /// Read all bytes from the object.
+    ///
+    /// By default this reads the size in a separate IOP but some implementations
+    /// may not need the size beforehand.
+    async fn get_all(&self) -> object_store::Result<Bytes>;
 }
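The doc comment on `read_one_range` hints at the intended split: the `read_one_*` helpers cover one-shot reads, while repeated reads of the same path should reuse a single opened reader. A hypothetical caller might look like this (the function name, paths, and ranges are made up for illustration):

    use bytes::Bytes;
    use lance_core::Result;
    use lance_io::object_store::ObjectStore;
    use object_store::path::Path;

    async fn load_parts(store: &ObjectStore, path: &Path) -> Result<(Bytes, Bytes)> {
        // One-shot read: open the path and issue a single ranged request.
        let header = store.read_one_range(path, 0..64).await?;

        // Several reads against the same path: open once and reuse the reader,
        // as the read_one_range docs recommend.
        let reader = store.open(path).await?;
        let first = reader.get_range(0..1024).await?;
        let _second = reader.get_range(1024..2048).await?;
        Ok((header, first))
    }
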
diff --git a/rust/lance-table/src/io/deletion.rs b/rust/lance-table/src/io/deletion.rs
index adba33d770..da11327646 100644
--- a/rust/lance-table/src/io/deletion.rs
+++ b/rust/lance-table/src/io/deletion.rs
@@ -17,6 +17,7 @@
 use object_store::path::Path;
 use rand::Rng;
 use roaring::bitmap::RoaringBitmap;
 use snafu::{location, Location, ResultExt};
+use tracing::instrument;

 use crate::format::{DeletionFile, DeletionFileType, Fragment};
@@ -87,7 +88,7 @@ pub async fn write_deletion_file(
                 // Drop writer so out is no longer borrowed.
             }

-            object_store.inner.put(&path, out.into()).await?;
+            object_store.put(&path, &out).await?;

             Ok(Some(deletion_file))
         }
@@ -104,7 +105,7 @@ pub async fn write_deletion_file(
             let mut out: Vec<u8> = Vec::new();
             bitmap.serialize_into(&mut out)?;

-            object_store.inner.put(&path, out.into()).await?;
+            object_store.put(&path, &out).await?;

             Ok(Some(deletion_file))
         }
@@ -116,6 +117,7 @@ pub async fn write_deletion_file(
 /// Returns the deletion vector if one was present. Otherwise returns `Ok(None)`.
 ///
 /// Will return an error if the file is present but invalid.
+#[instrument(level = "debug", skip_all)]
 pub async fn read_deletion_file(
     base: &Path,
     fragment: &Fragment,
@@ -129,7 +131,7 @@ pub async fn read_deletion_file(
         DeletionFileType::Array => {
             let path = deletion_file_path(base, fragment.id, deletion_file);

-            let data = object_store.inner.get(&path).await?.bytes().await?;
+            let data = object_store.read_one_all(&path).await?;
             let data = std::io::Cursor::new(data);
             let mut batches: Vec<RecordBatch> = ArrowFileReader::try_new(data, None)?
                 .collect::<std::result::Result<_, _>>()
@@ -183,7 +185,7 @@ pub async fn read_deletion_file(
         DeletionFileType::Bitmap => {
             let path = deletion_file_path(base, fragment.id, deletion_file);

-            let data = object_store.inner.get(&path).await?.bytes().await?;
+            let data = object_store.read_one_all(&path).await?;
             let reader = data.reader();
             let bitmap = RoaringBitmap::deserialize_from(reader)
                 .map_err(box_error)