Skip to content

Commit

Permalink
Implement compress, decompress, and extract progress logging
Browse files Browse the repository at this point in the history
  • Loading branch information
endbr64 committed Oct 18, 2024
1 parent a40eb7c commit 68d6c28
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 5 deletions.
51 changes: 49 additions & 2 deletions packages/server/src/runtime/builtin/compress.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
use super::Runtime;
use byte_unit::Byte;
use num::ToPrimitive;
use std::pin::Pin;
use std::time::Duration;
use tangram_client as tg;
use tokio::io::AsyncRead;

impl Runtime {
pub async fn compress(
&self,
build: &tg::Build,
_remote: Option<String>,
remote: Option<String>,
) -> tg::Result<tg::Value> {
let server = &self.server;

Expand Down Expand Up @@ -36,8 +39,42 @@ impl Runtime {
.parse::<tg::blob::compress::Format>()
.map_err(|source| tg::error!(!source, "invalid format"))?;

// Compress the blob.
let reader = blob.reader(server).await?;
// Spawn a task to log progress.
let compressed = reader.position();
let content_length = reader.size();
let log_task = tokio::spawn({
let server = server.clone();
let build = build.clone();
let remote = remote.clone();
async move {
loop {
let compressed = compressed.load(std::sync::atomic::Ordering::Relaxed);
let percent =
100.0 * compressed.to_f64().unwrap() / content_length.to_f64().unwrap();
let compressed = Byte::from_u64(compressed);
let content_length = Byte::from_u64(content_length);
let message = format!(
"compressing: {compressed:#} of {content_length:#} {percent:.2}%\n"
);
let arg = tg::build::log::post::Arg {
bytes: message.into(),
remote: remote.clone(),
};
let result = build.add_log(&server, arg).await;
if result.is_err() {
break;
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
});
let log_task_abort_handle = log_task.abort_handle();
scopeguard::defer! {
log_task_abort_handle.abort();
};

// Compress the blob.
let reader: Pin<Box<dyn AsyncRead + Send + 'static>> = match format {
tg::blob::compress::Format::Bz2 => {
Box::pin(async_compression::tokio::bufread::BzEncoder::new(reader))
Expand All @@ -54,6 +91,16 @@ impl Runtime {
};
let blob = tg::Blob::with_reader(server, reader).await?;

log_task.abort();

// Log that the compression finished.
let message = format!("finished compressing\n");
let arg = tg::build::log::post::Arg {
bytes: message.into(),
remote: remote.clone(),
};
build.add_log(server, arg).await.ok();

Ok(blob.into())
}
}
52 changes: 50 additions & 2 deletions packages/server/src/runtime/builtin/decompress.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
use super::Runtime;
use byte_unit::Byte;
use num::ToPrimitive;
use std::pin::Pin;
use std::time::Duration;
use tangram_client as tg;
use tokio::io::AsyncRead;

impl Runtime {
pub async fn decompress(
&self,
build: &tg::Build,
_remote: Option<String>,
remote: Option<String>,
) -> tg::Result<tg::Value> {
let server = &self.server;

Expand Down Expand Up @@ -36,8 +39,42 @@ impl Runtime {
.parse::<tg::blob::compress::Format>()
.map_err(|source| tg::error!(!source, "invalid format"))?;

// Decompress the blob.
let reader = blob.reader(server).await?;
// Spawn a task to log progress.
let decompressed = reader.position();
let content_length = reader.size();
let log_task = tokio::spawn({
let server = server.clone();
let build = build.clone();
let remote = remote.clone();
async move {
loop {
let decompressed = decompressed.load(std::sync::atomic::Ordering::Relaxed);
let percent =
100.0 * decompressed.to_f64().unwrap() / content_length.to_f64().unwrap();
let decompressed = Byte::from_u64(decompressed);
let content_length = Byte::from_u64(content_length);
let message = format!(
"decompressing: {decompressed:#} of {content_length:#} {percent:.2}%\n"
);
let arg = tg::build::log::post::Arg {
bytes: message.into(),
remote: remote.clone(),
};
let result = build.add_log(&server, arg).await;
if result.is_err() {
break;
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
});
let log_task_abort_handle = log_task.abort_handle();
scopeguard::defer! {
log_task_abort_handle.abort();
};

// Decompress the blob.
let reader: Pin<Box<dyn AsyncRead + Send + 'static>> = match format {
tg::blob::compress::Format::Bz2 => {
Box::pin(async_compression::tokio::bufread::BzDecoder::new(reader))
Expand All @@ -52,8 +89,19 @@ impl Runtime {
Box::pin(async_compression::tokio::bufread::ZstdDecoder::new(reader))
},
};

let blob = tg::Blob::with_reader(server, reader).await?;

log_task.abort();

// Log that the decompression finished.
let message = format!("finished decompressing\n");
let arg = tg::build::log::post::Arg {
bytes: message.into(),
remote: remote.clone(),
};
build.add_log(server, arg).await.ok();

Ok(blob.into())
}
}
46 changes: 45 additions & 1 deletion packages/server/src/runtime/builtin/extract.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
use super::Runtime;
use crate::tmp::Tmp;
use byte_unit::Byte;
use num::ToPrimitive;
use std::time::Duration;
use tangram_client as tg;
use tokio_util::io::SyncIoBridge;

impl Runtime {
pub async fn extract(
&self,
build: &tg::Build,
_remote: Option<String>,
remote: Option<String>,
) -> tg::Result<tg::Value> {
let server = &self.server;

Expand Down Expand Up @@ -41,6 +44,37 @@ impl Runtime {

// Create the reader.
let reader = blob.reader(server).await?;
let extracted = reader.position();
let content_length = reader.size();
let log_task = tokio::spawn({
let server = server.clone();
let build = build.clone();
let remote = remote.clone();
async move {
loop {
let extracted = extracted.load(std::sync::atomic::Ordering::Relaxed);
let percent =
100.0 * extracted.to_f64().unwrap() / content_length.to_f64().unwrap();
let extracted = Byte::from_u64(extracted);
let content_length = Byte::from_u64(content_length);
let message =
format!("extracting: {extracted:#} of {content_length:#} {percent:.2}%\n");
let arg = tg::build::log::post::Arg {
bytes: message.into(),
remote: remote.clone(),
};
let result = build.add_log(&server, arg).await;
if result.is_err() {
break;
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
});
let log_task_abort_handle = log_task.abort_handle();
scopeguard::defer! {
log_task_abort_handle.abort();
};

// Create a temporary path.
let tmp = Tmp::new(server);
Expand Down Expand Up @@ -77,6 +111,16 @@ impl Runtime {
.await
.unwrap()?;

log_task.abort();

// Log that the extraction finished.
let message = format!("finished extracting\n");
let arg = tg::build::log::post::Arg {
bytes: message.into(),
remote: remote.clone(),
};
build.add_log(server, arg).await.ok();

// Check in the extracted artifact.
let arg = tg::artifact::checkin::Arg {
destructive: true,
Expand Down

0 comments on commit 68d6c28

Please sign in to comment.