Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: use retry loop when loading deletion files #2899

Merged
merged 1 commit into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion rust/lance-io/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//! Optimized local I/Os

use std::fs::File;
use std::io::{ErrorKind, SeekFrom};
use std::io::{ErrorKind, Read, SeekFrom};
use std::ops::Range;
use std::sync::Arc;

Expand Down Expand Up @@ -167,6 +167,22 @@ impl Reader for LocalObjectReader {
source: err.into(),
})
}

/// Reads the entire file into memory and returns it as a single buffer.
///
/// The read happens on the blocking thread pool so it does not stall the
/// async runtime.
#[instrument(level = "debug", skip(self))]
async fn get_all(&self) -> object_store::Result<Bytes> {
    // Clone the handle so it can be moved into the blocking task.
    // NOTE(review): read_to_end reads from the handle's current cursor —
    // assumes positioned reads elsewhere leave the cursor at 0; confirm.
    let mut file = self.file.clone();
    tokio::task::spawn_blocking(move || {
        let mut contents = Vec::new();
        file.read_to_end(&mut contents)?;
        Ok(Bytes::from(contents))
    })
    .await?
    .map_err(|err: std::io::Error| object_store::Error::Generic {
        store: "LocalFileSystem",
        source: err.into(),
    })
}
}

#[cfg(windows)]
Expand Down
100 changes: 61 additions & 39 deletions rust/lance-io/src/object_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use bytes::Bytes;
use deepsize::DeepSizeOf;
use futures::future::BoxFuture;
use lance_core::Result;
use object_store::{path::Path, GetOptions, ObjectStore};
use object_store::{path::Path, GetOptions, GetResult, ObjectStore, Result as OSResult};
use tokio::sync::OnceCell;
use tracing::instrument;

Expand Down Expand Up @@ -61,8 +61,8 @@ impl CloudObjectReader {
// of the response body. Thus we add an outer retry loop here.
async fn do_with_retry<'a, O>(
&self,
f: impl Fn() -> BoxFuture<'a, std::result::Result<O, object_store::Error>>,
) -> object_store::Result<O> {
f: impl Fn() -> BoxFuture<'a, OSResult<O>>,
) -> OSResult<O> {
let mut retries = 3;
loop {
match f().await {
Expand All @@ -76,6 +76,40 @@ impl CloudObjectReader {
}
}
}

// Outer retry loop around a full GET + body download.
//
// object_store does not retry downloads that fail while streaming the
// response body, yet such failures (e.g. timeouts) are common in practice.
// We therefore re-issue the whole request ourselves, and log additional
// detail when a download ultimately fails.
async fn do_get_with_outer_retry<'a>(
    &self,
    f: impl Fn() -> BoxFuture<'a, OSResult<GetResult>> + Copy,
    desc: impl Fn() -> String,
) -> OSResult<Bytes> {
    let mut remaining = self.download_retry_count;
    loop {
        // The request itself already goes through the inner retry loop.
        let get_result = self.do_with_retry(f).await?;
        // Collecting the body is the part object_store will not retry.
        let err = match get_result.bytes().await {
            Ok(bytes) => return Ok(bytes),
            Err(err) => err,
        };
        if remaining == 0 {
            log::warn!("Failed to download {} from {} after {} attempts. This may indicate that cloud storage is overloaded or your timeout settings are too restrictive. Error details: {:?}", desc(), self.path, self.download_retry_count, err);
            return Err(err);
        }
        log::debug!(
            "Retrying {} from {} (remaining retries: {}). Error details: {:?}",
            desc(),
            self.path,
            remaining,
            err
        );
        remaining -= 1;
    }
}
}

#[async_trait]
Expand Down Expand Up @@ -106,41 +140,29 @@ impl Reader for CloudObjectReader {
}

#[instrument(level = "debug", skip(self))]
async fn get_range(&self, range: Range<usize>) -> object_store::Result<Bytes> {
// We have a separate retry loop here. This is because object_store does not
// attempt retries on downloads that fail during streaming of the response body.
//
// However, this failure is pretty common (e.g. timeout) and we want to retry in these
// situations. In addition, we provide additional logging information in these
// failures cases.
let mut retries = self.download_retry_count;
loop {
let get_result = self
.do_with_retry(|| {
let options = GetOptions {
range: Some(range.clone().into()),
..Default::default()
};
self.object_store.get_opts(&self.path, options)
})
.await?;
match get_result.bytes().await {
Ok(bytes) => return Ok(bytes),
Err(err) => {
if retries == 0 {
log::warn!("Failed to download range {:?} from {} after {} attempts. This may indicate that cloud storage is overloaded or your timeout settings are too restrictive. Error details: {:?}", range, self.path, self.download_retry_count, err);
return Err(err);
}
log::debug!(
"Retrying range {:?} from {} (remaining retries: {}). Error details: {:?}",
range,
self.path,
retries,
err
);
retries -= 1;
}
}
}
async fn get_range(&self, range: Range<usize>) -> OSResult<Bytes> {
    // Build a fresh ranged GET request on every (re)try.
    let make_request = || {
        let opts = GetOptions {
            range: Some(range.clone().into()),
            ..Default::default()
        };
        self.object_store.get_opts(&self.path, opts)
    };
    self.do_get_with_outer_retry(make_request, || format!("range {:?}", range))
        .await
}

#[instrument(level = "debug", skip_all)]
async fn get_all(&self) -> OSResult<Bytes> {
    // An unbounded GET (default options) downloads the whole object;
    // retries are handled by the shared helper.
    self.do_get_with_outer_retry(
        || self.object_store.get_opts(&self.path, GetOptions::default()),
        || String::from("read_all"),
    )
    .await
}
}
37 changes: 37 additions & 0 deletions rust/lance-io/src/object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
//! Extend [object_store::ObjectStore] functionalities

use std::collections::HashMap;
use std::ops::Range;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
Expand All @@ -12,6 +13,7 @@
use async_trait::async_trait;
use aws_config::default_provider::credentials::DefaultCredentialsChain;
use aws_credential_types::provider::ProvideCredentials;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use deepsize::DeepSizeOf;
use futures::{future, stream::BoxStream, StreamExt, TryStreamExt};
Expand Down Expand Up @@ -629,7 +631,7 @@
pub fn remove_stream<'a>(
&'a self,
locations: BoxStream<'a, Result<Path>>,
) -> BoxStream<Result<Path>> {

Check warning on line 634 in rust/lance-io/src/object_store.rs

View workflow job for this annotation

GitHub Actions / linux-build (nightly)

elided lifetime has a name
self.inner
.delete_stream(locations.err_into::<ObjectStoreError>().boxed())
.err_into::<Error>()
Expand All @@ -649,6 +651,21 @@
pub async fn size(&self, path: &Path) -> Result<usize> {
Ok(self.inner.head(path).await?.size)
}

/// Convenience function to open a reader and read all the bytes
pub async fn read_one_all(&self, path: &Path) -> Result<Bytes> {
    let reader = self.open(path).await?;
    let bytes = reader.get_all().await?;
    Ok(bytes)
}

/// Convenience function to open a reader and make a single ranged read request
///
/// If you will be making multiple requests to the path it is more efficient to call [`Self::open`]
/// and then call [`Reader::get_range`] multiple times.
pub async fn read_one_range(&self, path: &Path, range: Range<usize>) -> Result<Bytes> {
    let reader = self.open(path).await?;
    Ok(reader.get_range(range).await?)
}
}

/// Options that can be set for multiple object stores
Expand Down Expand Up @@ -1301,6 +1318,26 @@
assert_eq!(buf.as_bytes(), b"LOCAL");
}

#[tokio::test]
async fn test_read_one() {
    let temp_dir = tempfile::tempdir().unwrap();
    let file_path = temp_dir.path().join("test_file");

    // Write a small known payload through the object-store writer.
    let mut writer = ObjectStore::create_local_writer(file_path.as_path())
        .await
        .unwrap();
    writer.write_all(b"LOCAL").await.unwrap();
    writer.shutdown().await.unwrap();

    let os_path = object_store::path::Path::parse(file_path.to_str().unwrap()).unwrap();
    let store = ObjectStore::local();

    // Whole-object convenience read.
    let buf = store.read_one_all(&os_path).await.unwrap();
    assert_eq!(buf.as_bytes(), b"LOCAL");

    // Ranged convenience read covering the full payload.
    let buf = store.read_one_range(&os_path, 0..5).await.unwrap();
    assert_eq!(buf.as_bytes(), b"LOCAL");
}

#[tokio::test]
#[cfg(windows)]
async fn test_windows_paths() {
Expand Down
6 changes: 6 additions & 0 deletions rust/lance-io/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,10 @@ pub trait Reader: std::fmt::Debug + Send + Sync + DeepSizeOf {
///
/// TODO: change to read_at()?
async fn get_range(&self, range: Range<usize>) -> object_store::Result<Bytes>;

/// Read all bytes from the object.
///
/// By default this reads the size in a separate IOP but some implementations
/// may not need the size beforehand.
async fn get_all(&self) -> object_store::Result<Bytes>;
}
10 changes: 6 additions & 4 deletions rust/lance-table/src/io/deletion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use object_store::path::Path;
use rand::Rng;
use roaring::bitmap::RoaringBitmap;
use snafu::{location, Location, ResultExt};
use tracing::instrument;

use crate::format::{DeletionFile, DeletionFileType, Fragment};

Expand Down Expand Up @@ -87,7 +88,7 @@ pub async fn write_deletion_file(
// Drop writer so out is no longer borrowed.
}

object_store.inner.put(&path, out.into()).await?;
object_store.put(&path, &out).await?;

Ok(Some(deletion_file))
}
Expand All @@ -104,7 +105,7 @@ pub async fn write_deletion_file(
let mut out: Vec<u8> = Vec::new();
bitmap.serialize_into(&mut out)?;

object_store.inner.put(&path, out.into()).await?;
object_store.put(&path, &out).await?;

Ok(Some(deletion_file))
}
Expand All @@ -116,6 +117,7 @@ pub async fn write_deletion_file(
/// Returns the deletion vector if one was present. Otherwise returns `Ok(None)`.
///
/// Will return an error if the file is present but invalid.
#[instrument(level = "debug", skip_all)]
pub async fn read_deletion_file(
base: &Path,
fragment: &Fragment,
Expand All @@ -129,7 +131,7 @@ pub async fn read_deletion_file(
DeletionFileType::Array => {
let path = deletion_file_path(base, fragment.id, deletion_file);

let data = object_store.inner.get(&path).await?.bytes().await?;
let data = object_store.read_one_all(&path).await?;
let data = std::io::Cursor::new(data);
let mut batches: Vec<RecordBatch> = ArrowFileReader::try_new(data, None)?
.collect::<std::result::Result<_, ArrowError>>()
Expand Down Expand Up @@ -183,7 +185,7 @@ pub async fn read_deletion_file(
DeletionFileType::Bitmap => {
let path = deletion_file_path(base, fragment.id, deletion_file);

let data = object_store.inner.get(&path).await?.bytes().await?;
let data = object_store.read_one_all(&path).await?;
let reader = data.reader();
let bitmap = RoaringBitmap::deserialize_from(reader)
.map_err(box_error)
Expand Down
Loading