Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add an optional process-wide limit for IOPS #2928

Merged
merged 4 commits into from
Sep 24, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 120 additions & 4 deletions rust/lance-io/src/scheduler.rs
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not a semaphore?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea. Switched to a sempahore which simplified things a bit.

Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::ops::Range;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Instant;
use tokio::sync::Notify;
use tokio::sync::{Notify, Semaphore, SemaphorePermit};

use lance_core::{Error, Result};

Expand All @@ -25,6 +25,103 @@ const BACKPRESSURE_MIN: u64 = 5;
// Don't log backpressure warnings more than once / minute
const BACKPRESSURE_DEBOUNCE: u64 = 60;

// There are two structures that control the I/O scheduler concurrency. First,
// we have a hard limit on the number of IOPS that can be issued concurrently.
// This limit is process-wide.
//
// Second, we try and limit how many I/O requests can be buffered in memory without
// being consumed by a decoder of some kind. This limit is per-scheduler. We cannot
// make this limit process wide without introducing deadlock (because the decoder for
// file 0 might be waiting on IOPS blocked by a queue filled with requests for file 1)
// and vice-versa.
//
// There is also a per-scan limit on the number of IOPS that can be issued concurrently.
//
// The process-wide limit exists when users need a hard limit on the number of parallel
// IOPS, e.g. due to port availability limits or to prevent multiple scans from saturating
// the network. (Note: a process-wide limit of X will not neccesarily limit the number of
// open TCP connections to exactly X. The underlying object store may open more connections
// anyways)
//
// However, it can be too tough in some cases, e.g. when some scans are reading from
// cloud storage and other scans are reading from local disk. In these cases users don't
// need to set a process-limit and can rely on the per-scan limits.

// The IopsQuota enforces the first of the above limits, it is the per-process hard cap
// on the number of IOPS that can be issued concurrently.
//
// The per-scan limits are enforced by IoQueue
struct IopsQuota {
// An Option is used here to avoid mutex overhead if no limit is set
iops_avail: Option<Semaphore>,
}

/// A reservation on the global IOPS quota
///
/// When the reservation is dropped, the IOPS quota is released unless
/// [`Self::forget`] is called.
struct IopsReservation<'a> {
value: Option<SemaphorePermit<'a>>,
}

impl<'a> IopsReservation<'a> {
// Forget the reservation, so it won't be released on drop
fn forget(&mut self) {
if let Some(value) = self.value.take() {
value.forget();
}
}
}

impl IopsQuota {
// By default, there is no process-wide limit on IOPS
//
// However, the user can request one by setting the environment variable
// LANCE_PROCESS_IO_THREADS_LIMIT to a positive integer.
fn new() -> Self {
let initial_capacity = std::env::var("LANCE_PROCESS_IO_THREADS_LIMIT")
.map(|s| {
let limit = s
.parse::<i32>()
.expect("LANCE_PROCESS_IO_THREADS_LIMIT must be a positive integer");
if limit <= 0 {
panic!("LANCE_PROCESS_IO_THREADS_LIMIT must be a positive integer. To disable the limit, unset the environment variable");
}
limit
})
// The default (-1) does not apply any limit
.unwrap_or(-1);
let iops_avail = if initial_capacity < 0 {
None
} else {
Some(Semaphore::new(initial_capacity as usize))
};
Self { iops_avail }
}

// Return a reservation on the global IOPS quota
fn release(&self) {
if let Some(iops_avail) = self.iops_avail.as_ref() {
iops_avail.add_permits(1);
}
}

// Acquire a reservation on the global IOPS quota
async fn acquire(&self) -> IopsReservation {
if let Some(iops_avail) = self.iops_avail.as_ref() {
IopsReservation {
value: Some(iops_avail.acquire().await.unwrap()),
}
} else {
IopsReservation { value: None }
}
}
}

lazy_static::lazy_static! {
static ref IOPS_QUOTA: IopsQuota = IopsQuota::new();
}

// We want to allow requests that have a lower priority than any
// currently in-flight request. This helps avoid potential deadlocks
// related to backpressure. Unfortunately, it is quite expensive to
Expand Down Expand Up @@ -77,7 +174,8 @@ struct IoQueueState {
// Priorities of in-flight requests
priorities_in_flight: PrioritiesInFlight,
// Set when the scheduler is finished to notify the I/O loop to shut down
closed: bool,
// once all outstanding requests have been completed.
done_scheduling: bool,
// Time when the scheduler started
start: Instant,
// Last time we warned about backpressure
Expand All @@ -91,12 +189,16 @@ impl IoQueueState {
bytes_avail: io_buffer_size as i64,
pending_requests: BinaryHeap::new(),
priorities_in_flight: PrioritiesInFlight::new(io_capacity),
closed: false,
done_scheduling: false,
start: Instant::now(),
last_warn: AtomicU64::from(0),
}
}

fn finished(&self) -> bool {
self.done_scheduling && self.pending_requests.is_empty()
}

fn warn_if_needed(&self) {
let seconds_elapsed = self.start.elapsed().as_secs();
let last_warn = self.last_warn.load(Ordering::Acquire);
Expand Down Expand Up @@ -181,10 +283,23 @@ impl IoQueue {
async fn pop(&self) -> Option<IoTask> {
loop {
{
// First, grab a reservation on the global IOPS quota
// If we then get a task to run, transfer the reservation
// to the task. Otherwise, the reservation will be released
// when iop_res is dropped.
let mut iop_res = IOPS_QUOTA.acquire().await;
// Next, try and grab a reservation from the queue
let mut state = self.state.lock().unwrap();
if let Some(task) = state.next_task() {
// Reservation sucessfully acquired, we will release the global
// global reservation after task has run.
iop_res.forget();
return Some(task);
}

if state.finished() {
return None;
}
}

self.notify.notified().await;
Expand Down Expand Up @@ -212,7 +327,7 @@ impl IoQueue {

fn close(&self) {
let mut state = self.state.lock().unwrap();
state.closed = true;
state.done_scheduling = true;
drop(state);

self.notify.notify_one();
Expand Down Expand Up @@ -338,6 +453,7 @@ impl IoTask {
.reader
.get_range(self.to_read.start as usize..self.to_read.end as usize);
let bytes = bytes_fut.await.map_err(Error::from);
IOPS_QUOTA.release();
(self.when_done)(bytes);
}
}
Expand Down
Loading