diff --git a/packages/server/src/runtime/builtin/compress.rs b/packages/server/src/runtime/builtin/compress.rs index 05087b5c..56aa6034 100644 --- a/packages/server/src/runtime/builtin/compress.rs +++ b/packages/server/src/runtime/builtin/compress.rs @@ -1,5 +1,8 @@ use super::Runtime; +use byte_unit::Byte; +use num::ToPrimitive; use std::pin::Pin; +use std::time::Duration; use tangram_client as tg; use tokio::io::AsyncRead; @@ -7,7 +10,7 @@ impl Runtime { pub async fn compress( &self, build: &tg::Build, - _remote: Option, + remote: Option, ) -> tg::Result { let server = &self.server; @@ -36,8 +39,42 @@ impl Runtime { .parse::() .map_err(|source| tg::error!(!source, "invalid format"))?; - // Compress the blob. let reader = blob.reader(server).await?; + // Spawn a task to log progress. + let compressed = reader.position(); + let content_length = reader.size(); + let log_task = tokio::spawn({ + let server = server.clone(); + let build = build.clone(); + let remote = remote.clone(); + async move { + loop { + let compressed = compressed.load(std::sync::atomic::Ordering::Relaxed); + let percent = + 100.0 * compressed.to_f64().unwrap() / content_length.to_f64().unwrap(); + let compressed = Byte::from_u64(compressed); + let content_length = Byte::from_u64(content_length); + let message = format!( + "compressing: {compressed:#} of {content_length:#} {percent:.2}%\n" + ); + let arg = tg::build::log::post::Arg { + bytes: message.into(), + remote: remote.clone(), + }; + let result = build.add_log(&server, arg).await; + if result.is_err() { + break; + } + tokio::time::sleep(Duration::from_secs(1)).await; + } + } + }); + let log_task_abort_handle = log_task.abort_handle(); + scopeguard::defer! { + log_task_abort_handle.abort(); + }; + + // Compress the blob. let reader: Pin> = match format { tg::blob::compress::Format::Bz2 => { Box::pin(async_compression::tokio::bufread::BzEncoder::new(reader)) @@ -54,6 +91,16 @@ impl Runtime { }; let blob = tg::Blob::with_reader(server, reader).await?; + log_task.abort(); + + // Log that the compression finished. + let message = format!("finished compressing\n"); + let arg = tg::build::log::post::Arg { + bytes: message.into(), + remote: remote.clone(), + }; + build.add_log(server, arg).await.ok(); + Ok(blob.into()) } } diff --git a/packages/server/src/runtime/builtin/decompress.rs b/packages/server/src/runtime/builtin/decompress.rs index 39fe6386..f28cbda9 100644 --- a/packages/server/src/runtime/builtin/decompress.rs +++ b/packages/server/src/runtime/builtin/decompress.rs @@ -1,5 +1,8 @@ use super::Runtime; +use byte_unit::Byte; +use num::ToPrimitive; use std::pin::Pin; +use std::time::Duration; use tangram_client as tg; use tokio::io::AsyncRead; @@ -7,7 +10,7 @@ impl Runtime { pub async fn decompress( &self, build: &tg::Build, - _remote: Option, + remote: Option, ) -> tg::Result { let server = &self.server; @@ -36,8 +39,42 @@ impl Runtime { .parse::() .map_err(|source| tg::error!(!source, "invalid format"))?; - // Decompress the blob. let reader = blob.reader(server).await?; + // Spawn a task to log progress. + let decompressed = reader.position(); + let content_length = reader.size(); + let log_task = tokio::spawn({ + let server = server.clone(); + let build = build.clone(); + let remote = remote.clone(); + async move { + loop { + let decompressed = decompressed.load(std::sync::atomic::Ordering::Relaxed); + let percent = + 100.0 * decompressed.to_f64().unwrap() / content_length.to_f64().unwrap(); + let decompressed = Byte::from_u64(decompressed); + let content_length = Byte::from_u64(content_length); + let message = format!( + "decompressing: {decompressed:#} of {content_length:#} {percent:.2}%\n" + ); + let arg = tg::build::log::post::Arg { + bytes: message.into(), + remote: remote.clone(), + }; + let result = build.add_log(&server, arg).await; + if result.is_err() { + break; + } + tokio::time::sleep(Duration::from_secs(1)).await; + } + } + }); + let log_task_abort_handle = log_task.abort_handle(); + scopeguard::defer! { + log_task_abort_handle.abort(); + }; + + // Decompress the blob. let reader: Pin> = match format { tg::blob::compress::Format::Bz2 => { Box::pin(async_compression::tokio::bufread::BzDecoder::new(reader)) @@ -52,8 +89,19 @@ impl Runtime { Box::pin(async_compression::tokio::bufread::ZstdDecoder::new(reader)) }, }; + let blob = tg::Blob::with_reader(server, reader).await?; + log_task.abort(); + + // Log that the decompression finished. + let message = format!("finished decompressing\n"); + let arg = tg::build::log::post::Arg { + bytes: message.into(), + remote: remote.clone(), + }; + build.add_log(server, arg).await.ok(); + Ok(blob.into()) } } diff --git a/packages/server/src/runtime/builtin/extract.rs b/packages/server/src/runtime/builtin/extract.rs index 0aecf3fb..d5611ae9 100644 --- a/packages/server/src/runtime/builtin/extract.rs +++ b/packages/server/src/runtime/builtin/extract.rs @@ -1,5 +1,8 @@ use super::Runtime; use crate::tmp::Tmp; +use byte_unit::Byte; +use num::ToPrimitive; +use std::time::Duration; use tangram_client as tg; use tokio_util::io::SyncIoBridge; @@ -7,7 +10,7 @@ impl Runtime { pub async fn extract( &self, build: &tg::Build, - _remote: Option, + remote: Option, ) -> tg::Result { let server = &self.server; @@ -41,6 +44,37 @@ impl Runtime { // Create the reader. let reader = blob.reader(server).await?; + let extracted = reader.position(); + let content_length = reader.size(); + let log_task = tokio::spawn({ + let server = server.clone(); + let build = build.clone(); + let remote = remote.clone(); + async move { + loop { + let extracted = extracted.load(std::sync::atomic::Ordering::Relaxed); + let percent = + 100.0 * extracted.to_f64().unwrap() / content_length.to_f64().unwrap(); + let extracted = Byte::from_u64(extracted); + let content_length = Byte::from_u64(content_length); + let message = + format!("extracting: {extracted:#} of {content_length:#} {percent:.2}%\n"); + let arg = tg::build::log::post::Arg { + bytes: message.into(), + remote: remote.clone(), + }; + let result = build.add_log(&server, arg).await; + if result.is_err() { + break; + } + tokio::time::sleep(Duration::from_secs(1)).await; + } + } + }); + let log_task_abort_handle = log_task.abort_handle(); + scopeguard::defer! { + log_task_abort_handle.abort(); + }; // Create a temporary path. let tmp = Tmp::new(server); @@ -77,6 +111,16 @@ impl Runtime { .await .unwrap()?; + log_task.abort(); + + // Log that the extraction finished. + let message = format!("finished extracting\n"); + let arg = tg::build::log::post::Arg { + bytes: message.into(), + remote: remote.clone(), + }; + build.add_log(server, arg).await.ok(); + // Check in the extracted artifact. let arg = tg::artifact::checkin::Arg { destructive: true,