feat: add an environment variable LANCE_INITIAL_UPLOAD_SIZE (lancedb#2806)

I feel like I am creating a lot of environment variables lately
:cold_sweat: but this one would be pretty tricky to push down.

I am testing with GCS, creating a 2TiB file, and it fails pretty much
100% of the time when following our gradually growing part size
strategy. I would like to know why, and plan to continue investigating.
However, in the meantime, it would help if we had some control over the
part size.

This PR lets the user pick the initial part buffer size. It does not change the
growth strategy, though: at each step it picks the larger of the user-supplied
value and the value the default strategy would have used. So even if the user
picks a very large value (e.g. 256MB), the growth strategy will still start
growing the buffer size at some point.
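
To make the rule concrete, here is a minimal sketch of the effective part size
for a 256MB user value (hypothetical names; only the arithmetic mirrors the rule
described above):

const STEP: usize = 5 * 1024 * 1024; // default initial part size (5MiB)

fn effective_part_size(user_initial: usize, part_idx: u16) -> usize {
    // Default strategy: grow by another 5MiB every 100 parts.
    let default_size = ((part_idx / 100) as usize + 1) * STEP;
    // The rule in this PR: take whichever is larger.
    user_initial.max(default_size)
}

fn main() {
    let user = 256 * 1024 * 1024; // e.g. LANCE_INITIAL_UPLOAD_SIZE=268435456
    assert_eq!(effective_part_size(user, 0), user); // starts at the user value
    assert_eq!(effective_part_size(user, 5000), user); // still the user value
    assert!(effective_part_size(user, 5100) > user); // default growth takes over
}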

I also add a debug log statement when uploading a part in a multipart
upload.
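
As a side note (not part of this commit): the new line goes through the `log`
facade, so it only appears if the consuming application installs a logging
backend. A hedged sketch, assuming `env_logger` is used and that the relevant
module path is `lance_io` (both are assumptions):

fn main() {
    // Hypothetical harness: install a `log` backend so the debug line is visible.
    env_logger::Builder::from_default_env()
        .filter_module("lance_io", log::LevelFilter::Debug) // module name is an assumption
        .init();
    // ... run the multipart upload workload here ...
}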
westonpace authored and gagan-bhullar-tech committed Sep 13, 2024
1 parent 1a30241 commit a3b0c86
Showing 1 changed file with 27 additions and 4 deletions.
rust/lance-io/src/object_writer.rs (27 additions & 4 deletions)
@@ -23,7 +23,7 @@ use crate::traits::Writer;
use snafu::{location, Location};

/// Start at 5MB.
-const INITIAL_UPLOAD_SIZE: usize = 1024 * 1024 * 5;
+const INITIAL_UPLOAD_STEP: usize = 1024 * 1024 * 5;

fn max_upload_parallelism() -> usize {
    static MAX_UPLOAD_PARALLELISM: OnceLock<usize> = OnceLock::new();
@@ -45,6 +45,25 @@ fn max_conn_reset_retries() -> u16 {
})
}

+fn initial_upload_size() -> usize {
+    static LANCE_INITIAL_UPLOAD_SIZE: OnceLock<usize> = OnceLock::new();
+    *LANCE_INITIAL_UPLOAD_SIZE.get_or_init(|| {
+        std::env::var("LANCE_INITIAL_UPLOAD_SIZE")
+            .ok()
+            .and_then(|s| s.parse::<usize>().ok())
+            .inspect(|size| {
+                if *size < INITIAL_UPLOAD_STEP {
+                    // Minimum part size in GCS and S3
+                    panic!("LANCE_INITIAL_UPLOAD_SIZE must be at least 5MB");
+                } else if *size > 1024 * 1024 * 1024 * 5 {
+                    // Maximum part size in GCS and S3
+                    panic!("LANCE_INITIAL_UPLOAD_SIZE must be at most 5GB");
+                }
+            })
+            .unwrap_or(INITIAL_UPLOAD_STEP)
+    })
+}
+
/// Writer to an object in an object store.
///
/// If the object is small enough, the writer will upload the object in a single
@@ -128,7 +147,7 @@ impl ObjectWriter {
            cursor: 0,
            path: Arc::new(path.clone()),
            connection_resets: 0,
-            buffer: Vec::with_capacity(INITIAL_UPLOAD_SIZE),
+            buffer: Vec::with_capacity(initial_upload_size()),
            use_constant_size_upload_parts: object_store.use_constant_size_upload_parts,
        })
    }
@@ -138,10 +157,10 @@ impl ObjectWriter {
    fn next_part_buffer(buffer: &mut Vec<u8>, part_idx: u16, constant_upload_size: bool) -> Bytes {
        let new_capacity = if constant_upload_size {
            // The store does not support variable part sizes, so use the initial size.
-            INITIAL_UPLOAD_SIZE
+            initial_upload_size()
        } else {
            // Increase the upload size every 100 parts. This gives a maximum file size of 2.5TB.
-            ((part_idx / 100) as usize + 1) * INITIAL_UPLOAD_SIZE
+            initial_upload_size().max(((part_idx / 100) as usize + 1) * INITIAL_UPLOAD_STEP)
        };
        let new_buffer = Vec::with_capacity(new_capacity);
        let part = std::mem::replace(buffer, new_buffer);
@@ -154,6 +173,10 @@ impl ObjectWriter {
        part_idx: u16,
        sleep: Option<std::time::Duration>,
    ) -> BoxFuture<'static, std::result::Result<(), UploadPutError>> {
+        log::debug!(
+            "MultipartUpload submitting part with {} bytes",
+            buffer.len()
+        );
        let fut = upload.put_part(buffer.clone().into());
        Box::pin(async move {
            if let Some(sleep) = sleep {
