feat: add logging to compaction (lancedb#2791)
Adds basic logging to compaction so that users can see some indication
of progress on very long-running compaction jobs. Example output:

```
[2024-08-26T17:36:31Z INFO  lance::dataset::optimize] Compaction task 155cbe87-d8fe-44c7-baa2-2072f4353efa: Begin compacting 300 rows across 2 fragments
[2024-08-26T17:36:31Z INFO  lance::dataset::optimize] Compaction task 37e5da68-cd91-4e64-a3e4-3702714332b1: Begin compacting 300 rows across 2 fragments
[2024-08-26T17:36:31Z INFO  lance::dataset::optimize] Compaction task 155cbe87-d8fe-44c7-baa2-2072f4353efa: Read progress 112/300
[2024-08-26T17:36:31Z INFO  lance::dataset::optimize] Compaction task 155cbe87-d8fe-44c7-baa2-2072f4353efa: Read progress 225/300
[2024-08-26T17:36:31Z INFO  lance::dataset::optimize] Compaction task 37e5da68-cd91-4e64-a3e4-3702714332b1: Read progress 112/300
[2024-08-26T17:36:31Z INFO  lance::dataset::optimize] Compaction task 37e5da68-cd91-4e64-a3e4-3702714332b1: Read progress 225/300
[2024-08-26T17:36:31Z INFO  lance::dataset::optimize] Compaction task 155cbe87-d8fe-44c7-baa2-2072f4353efa: file written
[2024-08-26T17:36:31Z INFO  lance::dataset::optimize] Compaction task 155cbe87-d8fe-44c7-baa2-2072f4353efa: reserving fragment ids and transposing row ids
[2024-08-26T17:36:31Z INFO  lance::dataset::optimize] Compaction task 37e5da68-cd91-4e64-a3e4-3702714332b1: file written
[2024-08-26T17:36:31Z INFO  lance::dataset::optimize] Compaction task 37e5da68-cd91-4e64-a3e4-3702714332b1: reserving fragment ids and transposing row ids
[2024-08-26T17:36:31Z INFO  lance::dataset::optimize] Compaction task 155cbe87-d8fe-44c7-baa2-2072f4353efa: completed
[2024-08-26T17:36:31Z INFO  lance::dataset::optimize] Compaction task 37e5da68-cd91-4e64-a3e4-3702714332b1: completed
```
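
These messages go through the standard Rust `log` facade, so a consuming application only sees them if it has installed a `log` backend. A minimal sketch of wiring that up with `env_logger` (an assumption here — any `log`-compatible implementation works):

```rust
// Minimal sketch: install a backend for the `log` facade so that
// Lance's compaction messages are actually printed. `env_logger` is
// just one choice of backend.
fn main() {
    // Respects RUST_LOG, e.g. RUST_LOG=lance::dataset::optimize=info
    env_logger::init();

    // ... open the dataset and run compaction as usual ...
}
```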
westonpace authored and gagan-bhullar-tech committed Sep 13, 2024
1 parent aa9a461 commit ca80871
Showing 1 changed file with 35 additions and 0 deletions.
35 changes: 35 additions & 0 deletions rust/lance/src/dataset/optimize.rs
@@ -85,6 +85,7 @@ use std::collections::HashMap;
use std::ops::{AddAssign, Range};
use std::sync::{Arc, RwLock};

use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::{StreamExt, TryStreamExt};
use lance_core::utils::tokio::get_num_compute_intensive_cpus;
@@ -647,12 +648,24 @@ async fn rewrite_files(
// num deletions recorded. If that's the case, we need to grab and set that
// information.
let fragments = migrate_fragments(dataset.as_ref(), &task.fragments, recompute_stats).await?;
let num_rows = fragments
.iter()
.map(|f| f.physical_rows.unwrap() as u64)
.sum::<u64>();
// If we aren't using move-stable row ids, then we need to remap indices.
let needs_remapping = !dataset.manifest.uses_move_stable_row_ids();
let mut scanner = dataset.scan();
if let Some(batch_size) = options.batch_size {
scanner.batch_size(batch_size);
}
// Generate an ID for logging purposes
let task_id = uuid::Uuid::new_v4();
log::info!(
"Compaction task {}: Begin compacting {} rows across {} fragments",
task_id,
num_rows,
fragments.len()
);
scanner
.with_fragments(fragments.clone())
.scan_in_order(true);
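
The hunk above boils down to a small tagging pattern: give each concurrent task a fresh v4 UUID and prefix every log line with it, so interleaved output (as in the example at the top) stays attributable. A standalone sketch, where `begin_task` is a hypothetical helper rather than Lance API:

```rust
use uuid::Uuid; // assumes the uuid crate with the "v4" feature

// Hypothetical helper: tag a task with a fresh id and log its start.
fn begin_task(num_rows: u64, num_fragments: usize) -> Uuid {
    let task_id = Uuid::new_v4();
    log::info!(
        "Compaction task {}: Begin compacting {} rows across {} fragments",
        task_id,
        num_rows,
        num_fragments
    );
    task_id
}
```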
@@ -667,6 +680,19 @@
(None, data)
};

let mut rows_read = 0;
let schema = reader.schema().clone();
let reader = reader.inspect_ok(move |batch| {
rows_read += batch.num_rows();
log::info!(
"Compaction task {}: Read progress {}/{}",
task_id,
rows_read,
num_rows,
);
});
let reader = Box::pin(RecordBatchStreamAdapter::new(schema, reader));

let mut params = WriteParams {
max_rows_per_file: options.target_rows_per_fragment,
max_rows_per_group: options.max_rows_per_group,
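
The read-progress logging in the previous hunk uses the `inspect_ok` combinator from `futures`: it observes each `Ok` item of a fallible stream without consuming or altering it. Because the combinator erases the `SendableRecordBatchStream` type, the stream is then re-wrapped with DataFusion's `RecordBatchStreamAdapter` (hence the new import in the first hunk). A self-contained sketch of the same pattern, with record batches simplified to plain row counts:

```rust
use futures::{stream, TryStreamExt};

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    env_logger::init();

    let num_rows: u64 = 300;
    // Simulated fallible stream of batches; each item carries a row count.
    let batches = stream::iter([Ok::<u64, std::io::Error>(112), Ok(113), Ok(75)]);

    let mut rows_read: u64 = 0;
    // inspect_ok peeks at each Ok item as it flows by, leaving it intact.
    let logged = batches.inspect_ok(move |batch_rows| {
        rows_read += batch_rows;
        log::info!("Read progress {}/{}", rows_read, num_rows);
    });

    // Drive the stream; in Lance this is the fragment writer consuming it.
    let _all: Vec<u64> = logged.try_collect().await?;
    Ok(())
}
```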
@@ -686,16 +712,23 @@
)
.await?;

log::info!("Compaction task {}: file written", task_id);

let row_id_map = if let Some(row_ids) = row_ids {
let row_ids = Arc::try_unwrap(row_ids)
.expect("Row ids lock still owned")
.into_inner()
.expect("Row ids mutex still locked");

log::info!(
"Compaction task {}: reserving fragment ids and transposing row ids",
task_id
);
reserve_fragment_ids(&dataset, new_fragments.iter_mut()).await?;

remapping::transpose_row_ids(row_ids, &fragments, &new_fragments)
} else {
log::info!("Compaction task {}: rechunking stable row ids", task_id);
rechunk_stable_row_ids(dataset.as_ref(), &mut new_fragments, &fragments).await?;

HashMap::new()
@@ -713,6 +746,8 @@
.map(|f| f.files.len() + f.deletion_file.is_some() as usize)
.sum();

log::info!("Compaction task {}: completed", task_id);

Ok(RewriteResult {
metrics,
new_fragments,
