Skip to content

Commit

Permalink
tar: Hold open input stream as long as possible
Browse files Browse the repository at this point in the history
I'm hoping this will help us debug coreos/rpm-ostree#4567
```
[2023-08-30T15:00:16.554Z] Aug 30 15:00:15 qemu0 kola-runext-container-image[1957]: error: Importing: Parsing layer blob sha256:00623c39da63781bdd3fb00fedb36f8b9ec95e42cdb4d389f692457f24c67144: Failed to invoke skopeo proxy method FinishPipe: remote error: write |1: broken pipe
```

I haven't been able to reproduce it outside of CI yet, but we had
a prior ugly hack for this in
a27dac8

As the comments say - the goal is to hold open the input stream
as long as feasibly possible.
  • Loading branch information
cgwalters committed Aug 31, 2023
1 parent e39b727 commit 13455cc
Showing 1 changed file with 16 additions and 4 deletions.
20 changes: 16 additions & 4 deletions lib/src/tar/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,16 +259,28 @@ async fn filter_tar_async(
mut dest: impl AsyncWrite + Send + Unpin,
) -> Result<BTreeMap<String, u32>> {
let (tx_buf, mut rx_buf) = tokio::io::duplex(8192);
// The source must be moved to the heap so we know it is stable for passing to the worker thread
let src = Box::pin(src);
let tar_transformer = tokio::task::spawn_blocking(move || -> Result<_> {
let src = tokio_util::io::SyncIoBridge::new(src);
let tar_transformer = tokio::task::spawn_blocking(move || {
let mut src = tokio_util::io::SyncIoBridge::new(src);
let dest = tokio_util::io::SyncIoBridge::new(tx_buf);
filter_tar(src, dest)
let r = filter_tar(&mut src, dest);
// Pass ownership of the input stream back to the caller - see below.
(r, src)
});
let copier = tokio::io::copy(&mut rx_buf, &mut dest);
let (r, v) = tokio::join!(tar_transformer, copier);
let _v: u64 = v?;
r?
let (r, src) = r?;
// Note that the worker thread took temporary ownership of the input stream; we only close
// it at this point, after we're sure we've done all processing of the input. The reason
// for this is that both the skopeo process *or* us could encounter an error (see join_fetch).
// By ensuring we hold the stream open as long as possible, it ensures that we're going to
// see a remote error first, instead of the remote skopeo process seeing us close the pipe
// because we found an error.
drop(src);
// And pass back the result
r
}

/// Write the contents of a tarball as an ostree commit.
Expand Down

0 comments on commit 13455cc

Please sign in to comment.