Skip to content

Commit

Permalink
feat: API support for receiving delayed entries
Browse files Browse the repository at this point in the history
  • Loading branch information
Byron committed Jun 30, 2023
1 parent d00e6c5 commit 29b744b
Show file tree
Hide file tree
Showing 11 changed files with 499 additions and 52 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions gix-filter/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,7 @@ encoding_rs = "0.8.32"
bstr = { version = "1.5.0", default-features = false, features = ["std"] }
thiserror = "1.0.38"


[dev-dependencies]
once_cell = "1.18.0"
gix-testtools = { path = "../tests/tools" }
134 changes: 109 additions & 25 deletions gix-filter/examples/ident.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use bstr::io::BufReadExt;
use bstr::{ByteSlice, ByteVec};
use gix_filter::driver::process;
use std::io::{stdin, stdout, Read, Write};
Expand All @@ -9,48 +8,86 @@ static PREFIX: &str = "➡";
fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut args = std::env::args();
let sub_command = args.nth(1).ok_or("Need sub-command")?;
let filename = args.next(); // possibly %f
let needs_failure = filename.as_deref().map_or(false, |file| file.ends_with("fail"));
let next_arg = args.next(); // possibly %f
let needs_failure = next_arg.as_deref().map_or(false, |file| file.ends_with("fail"));
if needs_failure {
panic!("failure requested for {sub_command}");
}

match sub_command.as_str() {
"process" => {
let disallow_delay = next_arg.as_deref().map_or(false, |arg| arg == "disallow-delay");
let mut srv = gix_filter::driver::process::Server::handshake(
stdin(),
stdout(),
"git-filter",
|versions| versions.contains(&2).then_some(2),
&["clean", "smudge", "wait-1-s"],
if disallow_delay {
&["clean", "smudge"]
} else {
&["clean", "smudge", "delay"]
},
)?;

let mut next_smudge_aborts = false;
let mut next_smudge_fails_permanently = false; // a test validates that we don't actually hang
let mut delayed = Vec::new();
while let Some(mut request) = srv.next_request()? {
let needs_failure = request
.meta
.iter()
.find_map(|(key, value)| (key == "pathname").then_some(value))
.map_or(false, |path| path.ends_with(b"fail"));
let pathname = request
.meta
.iter()
.find_map(|(key, value)| (key == "pathname").then(|| value.clone()));
if needs_failure {
panic!("process failure requested: {:?}", request.meta);
}
let can_delay = request
.meta
.iter()
.any(|(key, value)| key == "can-delay" && value == "1");
match request.command.as_str() {
"clean" => {
let mut buf = Vec::new();
request.as_read().read_to_end(&mut buf)?;
request.write_status(process::Status::success())?;
request.write_status(if can_delay {
process::Status::delayed()
} else {
process::Status::success()
})?;

let mut lines = Vec::new();
for mut line in buf.lines_with_terminator() {
if line.starts_with(PREFIX.as_bytes()) {
line = &line[PREFIX.len()..];
let lines = if let Some(delayed_lines) = buf
.is_empty()
.then(|| {
delayed
.iter()
.position(|(cmd, path, _)| {
*cmd == request.command.as_str() && Some(path) == pathname.as_ref()
})
.map(|pos| delayed.remove(pos).2)
})
.flatten()
{
delayed_lines
} else {
let mut lines = Vec::new();
for mut line in buf.lines_with_terminator() {
if line.starts_with(PREFIX.as_bytes()) {
line = &line[PREFIX.len()..];
}
lines.push_str(line);
}
lines.push_str(line);
lines
};
if can_delay {
delayed.push(("clean", pathname.expect("needed for delayed operation"), lines));
} else {
request.as_write().write_all(&lines)?;
request.write_status(process::Status::Previous)?;
}
request.as_write().write_all(&lines)?;
request.write_status(process::Status::Previous)?;
}
"smudge" => {
let mut buf = Vec::new();
Expand All @@ -60,20 +97,60 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
process::Status::abort()
} else if next_smudge_fails_permanently {
process::Status::exit()
} else if can_delay {
process::Status::delayed()
} else {
process::Status::success()
};
request.write_status(status)?;

let mut lines = Vec::new();
for line in buf.lines_with_terminator() {
if !line.starts_with(PREFIX.as_bytes()) {
lines.push_str(PREFIX.as_bytes());
let lines = if let Some(delayed_lines) = buf
.is_empty()
.then(|| {
delayed
.iter()
.position(|(cmd, path, _)| {
*cmd == request.command.as_str() && Some(path) == pathname.as_ref()
})
.map(|pos| delayed.remove(pos).2)
})
.flatten()
{
delayed_lines
} else {
let mut lines = Vec::new();
for line in buf.lines_with_terminator() {
if !line.starts_with(PREFIX.as_bytes()) {
lines.push_str(PREFIX.as_bytes());
}
lines.push_str(line);
}
lines
};

if can_delay {
delayed.push(("smudge", pathname.expect("needed for delayed operation"), lines));
} else {
request.as_write().write_all(&lines)?;
request.write_status(process::Status::Previous)?;
}
}
"list_available_blobs" => {
{
let mut out = request.as_write();
let mut last_cmd = None;
let mut buf = Vec::<u8>::new();
for (cmd, path, _) in &delayed {
if last_cmd.get_or_insert(*cmd) != cmd {
panic!("the API doesn't support mixing cmds as paths might not be unique anymore")
}
buf.clear();
buf.push_str("pathname=");
buf.extend_from_slice(path);
out.write_all(&buf)?
}
lines.push_str(line);
}
request.as_write().write_all(&lines)?;
request.write_status(process::Status::Previous)?;
request.write_status(process::Status::success())?;
}
"wait-1-s" => {
std::io::copy(&mut request.as_read(), &mut std::io::sink())?;
Expand All @@ -94,25 +171,32 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
}
}
}
// simple filters actually don't support streaming - they have to first read all input, then produce all output,
// but can't mix reading stdin and writing to stdout at the same time as `git` (or `gitoxide`) doesn't read the output while
// writing the input.
"clean" => {
let mut stdin = stdin().lock();
let mut stdout = stdout().lock();
stdin.for_byte_line_with_terminator(|mut line| {
let mut buf = Vec::new();
std::io::copy(&mut stdin, &mut buf)?;
for mut line in buf.lines_with_terminator() {
if line.starts_with(PREFIX.as_bytes()) {
line = &line[PREFIX.len()..];
}
stdout.write_all(line).map(|_| true)
})?;
stdout.write_all(line).map(|_| true)?;
}
}
"smudge" => {
let mut stdin = stdin().lock();
let mut stdout = stdout().lock();
stdin.for_byte_line_with_terminator(|line| {
let mut buf = Vec::new();
std::io::copy(&mut stdin, &mut buf)?;
for line in buf.lines_with_terminator() {
if !line.starts_with(PREFIX.as_bytes()) {
stdout.write_all(PREFIX.as_bytes())?;
}
stdout.write_all(line).map(|_| true)
})?;
stdout.write_all(line).map(|_| true)?;
}
}
unknown => panic!("Unknown sub-command: {unknown}"),
}
Expand Down
36 changes: 22 additions & 14 deletions gix-filter/src/driver/apply.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::driver::process::client::invoke;
use crate::driver::{process, Operation, Process, State};
use crate::{driver, Driver};
use bstr::BStr;
use bstr::{BStr, BString};
use std::collections::HashMap;

/// What to do if delay is supported by a process filter.
#[derive(Debug, Copy, Clone)]
Expand All @@ -23,6 +24,8 @@ pub enum Error {
Init(#[from] driver::init::Error),
#[error("Could not write entire object to driver")]
WriteSource(#[from] std::io::Error),
#[error("Filter process delayed an entry even though that was not requested")]
DelayNotAllowed,
#[error("Failed to invoke '{command}' command")]
ProcessInvoke {
source: process::client::invoke::Error,
Expand Down Expand Up @@ -85,6 +88,9 @@ impl State {
}

/// Like [`apply()`][Self::apply()], but use `delay` to determine if the filter result may be delayed or not.
///
/// Poll [`list_delayed_paths()`][Self::list_delayed_paths()] until it is empty and query the available paths again.
/// Note that even though it's possible, the API assumes that commands aren't mixed when delays are allowed.
pub fn apply_delayed<'a>(
&'a mut self,
driver: &Driver,
Expand All @@ -102,11 +108,7 @@ impl State {
}))))
}
Some(Process::MultiFile { client, key }) => {
let command = match operation {
Operation::Clean => "clean",
Operation::Smudge => "smudge",
};

let command = operation.as_str();
if !client.capabilities().contains(command) {
return Ok(None);
}
Expand Down Expand Up @@ -134,21 +136,18 @@ impl State {
Ok(status) => status,
Err(err) => {
let invoke::Error::Io(io_err) = &err;
if matches!(
io_err.kind(),
std::io::ErrorKind::BrokenPipe | std::io::ErrorKind::UnexpectedEof
) {
self.running.remove(&key.0).expect("present or we wouldn't be here");
}

handle_io_err(io_err, &mut self.running, key.0.as_ref());
return Err(Error::ProcessInvoke {
command: command.into(),
source: err,
});
}
};

if matches!(delay, Delay::Allow) && status.is_delayed() {
if status.is_delayed() {
if matches!(delay, Delay::Forbid) {
return Err(Error::DelayNotAllowed);
}
Ok(Some(MaybeDelayed::Delayed(key)))
} else if status.is_success() {
// TODO: find a way to not have to do the 'borrow-dance'.
Expand Down Expand Up @@ -200,6 +199,15 @@ struct ReadFilterOutput {
child: Option<(std::process::Child, std::process::Command)>,
}

/// Forget the client registered under `process` in `running` if `err` is a
/// `BrokenPipe` or `UnexpectedEof` I/O error; any other error kind leaves
/// `running` untouched.
///
/// NOTE(review): presumably these two kinds mean the filter process has gone
/// away and its client must not be reused - confirm against the callers.
///
/// # Panics
///
/// If one of the two error kinds is seen but `running` has no entry for `process`.
pub(crate) fn handle_io_err(err: &std::io::Error, running: &mut HashMap<BString, process::Client>, process: &BStr) {
    use std::io::ErrorKind;

    match err.kind() {
        ErrorKind::BrokenPipe | ErrorKind::UnexpectedEof => {
            // The caller obtained the client from `running`, so the entry must still exist.
            running.remove(process).expect("present or we wouldn't be here");
        }
        _ => {}
    }
}

impl std::io::Read for ReadFilterOutput {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
match self.inner.as_mut() {
Expand Down
Loading

0 comments on commit 29b744b

Please sign in to comment.