diff --git a/Cargo.lock b/Cargo.lock index 77b30ce6d66..f308baf561e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1654,6 +1654,7 @@ dependencies = [ "gix-command", "gix-hash 0.11.3", "gix-object 0.31.0", + "gix-packetline-blocking", "gix-path 0.8.2", "gix-quote 0.4.5", "gix-trace 0.1.1", diff --git a/gix-filter/Cargo.toml b/gix-filter/Cargo.toml index 81a03f39d79..b9c65471f9c 100644 --- a/gix-filter/Cargo.toml +++ b/gix-filter/Cargo.toml @@ -18,6 +18,7 @@ gix-object = { version = "^0.31.0", path = "../gix-object" } gix-command = { version = "^0.2.6", path = "../gix-command" } gix-quote = { version = "^0.4.5", path = "../gix-quote" } gix-path = { version = "^0.8.2", path = "../gix-path" } +gix-packetline = { package = "gix-packetline-blocking", version = "^0.16.3", path = "../gix-packetline-blocking" } encoding_rs = "0.8.32" bstr = { version = "1.5.0", default-features = false, features = ["std"] } diff --git a/gix-filter/examples/ident.rs b/gix-filter/examples/ident.rs index 2e8981daa5d..bd359e59d95 100644 --- a/gix-filter/examples/ident.rs +++ b/gix-filter/examples/ident.rs @@ -1,5 +1,7 @@ use bstr::io::BufReadExt; -use std::io::{stdin, stdout, Write}; +use bstr::{ByteSlice, ByteVec}; +use gix_filter::driver::process; +use std::io::{stdin, stdout, Read, Write}; fn main() -> Result<(), Box> { let mut args = std::env::args(); @@ -11,6 +13,60 @@ fn main() -> Result<(), Box> { } match sub_command.as_str() { + "process" => { + let mut srv = gix_filter::driver::process::Server::handshake( + stdin(), + stdout(), + "git-filter", + |versions| versions.contains(&2).then_some(2), + &["clean", "smudge"], + )?; + + loop { + let mut request = srv.next_request()?; + let needs_failure = request + .meta + .iter() + .find_map(|(key, value)| (key == "pathname").then_some(value)) + .map_or(false, |path| path.ends_with(b"fail")); + if needs_failure { + panic!("process failure requested: {:?}", request.meta); + } + match request.command.as_str() { + "clean" => { + let mut buf = 
Vec::new(); + request.as_read().read_to_end(&mut buf)?; + request.write_status(process::Status::success())?; + + let mut lines = Vec::new(); + for mut line in buf.lines_with_terminator() { + if line.starts_with(b"\t") { + line = &line[1..]; + } + lines.push_str(line); + } + request.as_write().write_all(&lines)?; + request.write_status(process::Status::Previous)?; + } + "smudge" => { + let mut buf = Vec::new(); + request.as_read().read_to_end(&mut buf)?; + request.write_status(process::Status::success())?; + + let mut lines = Vec::new(); + for line in buf.lines_with_terminator() { + if !line.starts_with(b"\t") { + lines.push(b'\t'); + } + lines.push_str(line); + } + request.as_write().write_all(&lines)?; + request.write_status(process::Status::Previous)?; + } + unknown => panic!("Unknown capability requested: {unknown}"), + } + } + } "clean" => { let mut stdin = stdin().lock(); let mut stdout = stdout().lock(); diff --git a/gix-filter/src/driver.rs b/gix-filter/src/driver.rs deleted file mode 100644 index 10c4246f7c0..00000000000 --- a/gix-filter/src/driver.rs +++ /dev/null @@ -1,150 +0,0 @@ -use crate::Driver; -use bstr::{BStr, BString, ByteSlice, ByteVec}; -use std::collections::HashMap; -use std::ffi::OsString; -use std::process::Stdio; - -/// -pub mod apply { - use crate::driver::Operation; - use bstr::BString; - - /// The error returned by [State::apply()][super::State::apply()]. 
- #[derive(Debug, thiserror::Error)] - #[allow(missing_docs)] - pub enum Error { - #[error("Failed to spawn driver: {command:?}")] - SpawnCommand { - source: std::io::Error, - command: std::process::Command, - }, - #[error("A {operation:?} program is not configured for the driver named '{driver}'")] - MissingCommand { driver: BString, operation: Operation }, - #[error("Could not write entire object to driver")] - WriteSource(#[from] std::io::Error), - } -} - -/// The kind of operation to apply using a driver -#[derive(Debug, Copy, Clone)] -pub enum Operation { - /// Turn worktree content into content suitable for storage in `git`. - Clean, - /// Turn content stored in `git` to content suitable for the working tree. - Smudge, -} - -/// State required to handle `process` filters, which are running until all their work is done. -/// -/// These can be significantly faster on some platforms as they are launched only once, while supporting asynchronous processing. -#[derive(Default)] -pub struct State { - /// The list of currently running processes. These are preferred over simple clean-and-smudge programs. - /// - /// Note that these processes are expected to shut-down once their stdin/stdout are dropped, so nothing else - /// needs to be done to clean them up after drop. - _running: HashMap, -} - -impl State { - /// Apply `operation` of `driver` to the bytes read from `src` and return a reader to immediately consume the output - /// produced by the filter. `rela_path` is the repo-relative path of the entry to handle. - /// - /// Each call to this method will cause the corresponding filter to be invoked unless `driver` indicates a `process` filter, - /// which is only launched once. 
- pub fn apply( - &mut self, - driver: &Driver, - mut src: impl std::io::Read, - operation: Operation, - rela_path: &BStr, - ) -> Result { - match driver.process.as_ref() { - Some(_process) => todo!("find existing or launch process"), - None => { - let cmd = match operation { - Operation::Clean => driver - .clean - .as_ref() - .map(|cmd| substitute_f_parameter(cmd.as_ref(), rela_path)), - - Operation::Smudge => driver - .smudge - .as_ref() - .map(|cmd| substitute_f_parameter(cmd.as_ref(), rela_path)), - }; - - let cmd = cmd.ok_or_else(|| apply::Error::MissingCommand { - operation, - driver: driver.name.clone(), - })?; - - let mut cmd: std::process::Command = gix_command::prepare(gix_path::from_bstr(cmd).into_owned()) - .with_shell() - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .stderr(Stdio::inherit()) - .into(); - let mut child = match cmd.spawn() { - Ok(child) => child, - Err(err) => { - return Err(apply::Error::SpawnCommand { - source: err, - command: cmd, - }) - } - }; - - std::io::copy(&mut src, &mut child.stdin.take().expect("configured"))?; - Ok(StdoutErrCheck { - inner: child.stdout.take(), - child: driver.required.then_some((child, cmd)), - }) - } - } - } -} - -struct StdoutErrCheck { - inner: Option, - /// The child is present if we need its exit code to be positive. - child: Option<(std::process::Child, std::process::Command)>, -} - -impl std::io::Read for StdoutErrCheck { - fn read(&mut self, buf: &mut [u8]) -> std::io::Result { - match self.inner.as_mut() { - Some(inner) => { - let num_read = inner.read(buf)?; - if num_read == 0 { - self.inner.take(); - if let Some((mut child, cmd)) = self.child.take() { - let status = child.wait()?; - if !status.success() { - return Err(std::io::Error::new( - std::io::ErrorKind::Other, - format!("Driver process {cmd:?} failed"), - )); - } - } - } - Ok(num_read) - } - None => Ok(0), - } - } -} - -/// Substitute `path` as shell-save version into `cmd` which could be something like `cmd something %f`. 
-fn substitute_f_parameter(cmd: &BStr, path: &BStr) -> BString { - let mut buf: BString = Vec::with_capacity(cmd.len()).into(); - - let mut ofs = 0; - while let Some(pos) = cmd[ofs..].find(b"%f") { - buf.push_str(&cmd[..ofs + pos]); - buf.extend_from_slice(&gix_quote::single(path)); - ofs += pos + 2; - } - buf.push_str(&cmd[ofs..]); - buf -} diff --git a/gix-filter/src/driver/apply.rs b/gix-filter/src/driver/apply.rs new file mode 100644 index 00000000000..7ca0d3fe4aa --- /dev/null +++ b/gix-filter/src/driver/apply.rs @@ -0,0 +1,256 @@ +use crate::driver::{process, substitute_f_parameter, Operation, State}; +use crate::{driver, Driver}; +use bstr::{BStr, BString}; +use std::process::Stdio; + +/// What to do if delay is supported by a process filter. +#[derive(Debug, Copy, Clone)] +pub enum Delay { + /// Use delayed processing for this entry. + /// + /// Note that it's up to the filter to determine whether or not the processing should be delayed. + Allow, + /// Do not delay the processing, and force it to happen immediately. In this case, no delayed processing will occur + /// even if the filter supports it. + Forbid, +} + +/// The error returned by [State::apply()][super::State::apply()]. 
+#[derive(Debug, thiserror::Error)] +#[allow(missing_docs)] +pub enum Error { + #[error("Failed to spawn driver: {command:?}")] + SpawnCommand { + source: std::io::Error, + command: std::process::Command, + }, + #[error("A {operation:?} program is not configured for the driver named '{driver}'")] + MissingCommand { driver: BString, operation: Operation }, + #[error("Could not write entire object to driver")] + WriteSource(#[from] std::io::Error), + #[error("Process handshake with command {command:?} failed")] + ProcessHandshake { + source: process::client::handshake::Error, + command: std::process::Command, + }, + #[error("Failed to invoke '{command}' command")] + ProcessInvoke { + source: process::client::invoke::Error, + command: String, + }, + #[error("Asked for capability named '{wanted}', but process only had {} available.", available.join(", "))] + ProcessMissesCapability { wanted: String, available: Vec }, + #[error("The invoked command '{command}' in process indicated an error: '{error}'")] + ProcessStatus { error: String, command: String }, +} + +/// Additional information for use in the [`State::apply()`] method. +#[derive(Debug, Copy, Clone)] +pub struct Context<'a> { + /// The repo-relative path, using slashes as separator, of the entry currently being processed. + pub rela_path: &'a BStr, + /// The name of the reference that `HEAD` is pointing to. It's passed to `process` filters if present. + pub ref_name: Option<&'a BStr>, + /// The root-level tree that contains the current entry directly or indirectly, or the commit owning the tree (if available). + /// + /// This is passed to `process` filters if present. + pub treeish: Option, + /// The actual blob-hash of the data we are processing. It's passed to `process` filters if present. + /// + /// Note that this hash might be different from the `$Id$` of the respective `ident` filter, as the latter generates the hash itself. 
+ pub blob: Option, +} + +impl State { + /// Apply `operation` of `driver` to the bytes read from `src` and return a reader to immediately consume the output + /// produced by the filter. `ctx` provides the repo-relative path of the entry to handle, along with optional ref, treeish and blob information. + /// + /// Each call to this method will cause the corresponding filter to be invoked unless `driver` indicates a `process` filter, + /// which is only launched once and maintained using this state. + pub fn apply<'a>( + &'a mut self, + driver: &Driver, + src: impl std::io::Read, + operation: Operation, + ctx: Context<'_>, + ) -> Result, Error> { + match self.apply_delayed(driver, src, operation, Delay::Forbid, ctx)? { + MaybeDelayed::Delayed(_) => { + unreachable!("we forbid delaying the entry") + } + MaybeDelayed::Immediate(read) => Ok(read), + } + } + + /// Like [`apply()`][Self::apply()], but use `delay` to determine if the filter result may be delayed or not. + pub fn apply_delayed<'a>( + &'a mut self, + driver: &Driver, + mut src: impl std::io::Read, + operation: Operation, + delay: Delay, + ctx: Context<'_>, + ) -> Result, Error> { + match driver.process.as_ref() { + Some(process) => { + let client = match self.running.remove(process) { + Some(c) => c, + None => { + let (child, cmd) = spawn_driver(process.clone())?; + process::Client::handshake(child, "git-filter", &[2], &["clean", "smudge", "delay"]).map_err( + |err| Error::ProcessHandshake { + source: err, + command: cmd, + }, + )? + } + }; + + // This strangeness works around the borrow checker, which otherwise won't let us return a reader. Quite sad :/. + // One would want to `get_mut()` or insert essentially, but it won't work. 
+ self.running.insert(process.clone(), client); + let client = self.running.get_mut(process).expect("just inserted"); + + let command = match operation { + Operation::Clean => "clean", + Operation::Smudge => "smudge", + }; + + if !client.capabilities().contains(command) { + return Err(Error::ProcessMissesCapability { + wanted: command.into(), + available: client.capabilities().iter().cloned().collect(), + }); + } + + let status = client + .invoke( + command, + [ + ("pathname", Some(ctx.rela_path.to_owned())), + ("ref", ctx.ref_name.map(ToOwned::to_owned)), + ("treeish", ctx.treeish.map(|id| id.to_hex().to_string().into())), + ("blob", ctx.blob.map(|id| id.to_hex().to_string().into())), + ( + "can-delay", + match delay { + Delay::Allow if client.capabilities().contains("delay") => Some("1".into()), + Delay::Forbid | Delay::Allow => None, + }, + ), + ] + .into_iter() + .filter_map(|(key, value)| value.map(|v| (key, v))), + src, + ) + .map_err(|err| Error::ProcessInvoke { + command: command.into(), + source: err, + })?; + if matches!(delay, Delay::Allow) && status.is_delayed() { + Ok(MaybeDelayed::Delayed(driver::Key(process.clone()))) + } else if status.is_success() { + Ok(MaybeDelayed::Immediate(Box::new(client.as_read()))) + } else { + Err(Error::ProcessStatus { + command: command.into(), + error: match status { + process::Status::Named(error) => error, + process::Status::Previous => { + unreachable!("at this point a single status must be given") + } + }, + }) + } + } + None => { + let cmd = match operation { + Operation::Clean => driver + .clean + .as_ref() + .map(|cmd| substitute_f_parameter(cmd.as_ref(), ctx.rela_path)), + + Operation::Smudge => driver + .smudge + .as_ref() + .map(|cmd| substitute_f_parameter(cmd.as_ref(), ctx.rela_path)), + }; + + let cmd = cmd.ok_or_else(|| Error::MissingCommand { + operation, + driver: driver.name.clone(), + })?; + + let (mut child, cmd) = spawn_driver(cmd)?; + std::io::copy(&mut src, &mut 
child.stdin.take().expect("configured"))?; + Ok(MaybeDelayed::Immediate(Box::new(ReadFilterOutput { + inner: child.stdout.take(), + child: driver.required.then_some((child, cmd)), + }))) + } + } + } +} + +fn spawn_driver(cmd: BString) -> Result<(std::process::Child, std::process::Command), Error> { + let mut cmd: std::process::Command = gix_command::prepare(gix_path::from_bstr(cmd).into_owned()) + .with_shell() + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::inherit()) + .into(); + let child = match cmd.spawn() { + Ok(child) => child, + Err(err) => { + return Err(Error::SpawnCommand { + source: err, + command: cmd, + }) + } + }; + Ok((child, cmd)) +} + +/// A utility type to represent delayed or immediate apply-filter results. +pub enum MaybeDelayed<'a> { + /// Using the delayed protocol, this entry has been sent to a long-running process and needs to be + /// checked for again, later, using the [`driver::Key`] to refer to the filter who owes a response. + /// + /// Note that the path to the entry is also needed to obtain the filtered result later. + Delayed(driver::Key), + /// The filtered result can be read from the contained reader right away. + /// + /// Note that it must be consumed in full or till a read error occurs. + Immediate(Box), +} + +/// A utility type to facilitate streaming the output of a filter process. +struct ReadFilterOutput { + inner: Option, + /// The child is present if we need its exit code to be positive. 
+ child: Option<(std::process::Child, std::process::Command)>, +} + +impl std::io::Read for ReadFilterOutput { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + match self.inner.as_mut() { + Some(inner) => { + let num_read = inner.read(buf)?; + if num_read == 0 { + self.inner.take(); + if let Some((mut child, cmd)) = self.child.take() { + let status = child.wait()?; + if !status.success() { + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + format!("Driver process {cmd:?} failed"), + )); + } + } + } + Ok(num_read) + } + None => Ok(0), + } + } +} diff --git a/gix-filter/src/driver/mod.rs b/gix-filter/src/driver/mod.rs new file mode 100644 index 00000000000..5459ccab9e7 --- /dev/null +++ b/gix-filter/src/driver/mod.rs @@ -0,0 +1,47 @@ +use bstr::{BStr, BString, ByteSlice, ByteVec}; +use std::collections::HashMap; + +/// +pub mod apply; + +/// +pub mod process; + +/// The kind of operation to apply using a driver. +#[derive(Debug, Copy, Clone)] +pub enum Operation { + /// Turn worktree content into content suitable for storage in `git`. + Clean, + /// Turn content stored in `git` to content suitable for the working tree. + Smudge, +} + +/// State required to handle `process` filters, which are running until all their work is done. +/// +/// These can be significantly faster on some platforms as they are launched only once, while supporting asynchronous processing. +#[derive(Default)] +pub struct State { + /// The list of currently running processes. These are preferred over simple clean-and-smudge programs. + /// + /// Note that these processes are expected to shut down once their stdin/stdout are dropped, so nothing else + /// needs to be done to clean them up after drop. + running: HashMap, +} + +/// A way to reference a running filter for later acquisition of delayed output. +#[derive(Debug, Clone)] +pub struct Key(BString); + +/// Substitute `path` as shell-safe version into `cmd` which could be something like `cmd something %f`. 
+fn substitute_f_parameter(cmd: &BStr, path: &BStr) -> BString { + let mut buf: BString = Vec::with_capacity(cmd.len()).into(); + + let mut ofs = 0; // start of the part of `cmd` that was not yet copied into `buf` + while let Some(pos) = cmd[ofs..].find(b"%f") { + buf.push_str(&cmd[ofs..ofs + pos]); // copy only the text since the previous `%f`, or the prefix would be repeated for each additional `%f` + buf.extend_from_slice(&gix_quote::single(path)); + ofs += pos + 2; // skip past the two bytes of `%f` + } + buf.push_str(&cmd[ofs..]); + buf +} diff --git a/gix-filter/src/driver/process/client.rs b/gix-filter/src/driver/process/client.rs new file mode 100644 index 00000000000..3aaf26e289c --- /dev/null +++ b/gix-filter/src/driver/process/client.rs @@ -0,0 +1,236 @@ +use crate::driver::process; +use crate::driver::process::{Client, PacketlineReader}; +use bstr::{BString, ByteVec}; +use std::collections::HashSet; +use std::io::Write; +use std::str::FromStr; + +/// +pub mod handshake { + /// The error returned by [Client::handshake()][super::Client::handshake()]. + #[derive(Debug, thiserror::Error)] + #[allow(missing_docs)] + pub enum Error { + #[error("Failed to read or write to the process")] + Io(#[from] std::io::Error), + #[error("{msg} '{actual}'")] + Protocol { msg: String, actual: String }, + #[error("The server sent the '{name}' capability which isn't among the ones we desire can support")] + UnsupportedCapability { name: String }, + } +} + +/// +pub mod invoke { + /// The error returned by [Client::invoke()][super::Client::invoke()]. 
+ #[derive(Debug, thiserror::Error)] + #[allow(missing_docs)] + pub enum Error { + #[error("Failed to read or write to the process")] + Io(#[from] std::io::Error), + } +} + +/// Protocol implementation +impl Client { + /// Given a spawned `process` as created from `cmd`, use the 'long-running-process' protocol to send `welcome-prefix` and supported + /// `versions`, along with the `desired_capabilities`, and perform the handshake to negotiate a version to use along with + /// obtaining supported capabilities, which may be a sub-set of the desired capabilities. 
+ pub fn handshake( + mut process: std::process::Child, + welcome_prefix: &str, + versions: &[usize], + desired_capabilities: &[&str], + ) -> Result { + let mut out = + gix_packetline::Writer::new(process.stdin.take().expect("configured stdin when spawning")).text_mode(); + out.write_all(format!("{welcome_prefix}-client").as_bytes())?; + for version in versions { + out.write_all(format!("version={version}").as_bytes())?; + } + gix_packetline::encode::flush_to_write(out.inner_mut())?; + out.flush()?; + + let mut input = gix_packetline::StreamingPeekableIter::new( + process.stdout.take().expect("configured stdout when spawning"), + &[gix_packetline::PacketLineRef::Flush], + ); + let mut read = input.as_read(); + let mut buf = String::new(); + read.read_line_to_string(&mut buf)?; + if buf + .strip_prefix(welcome_prefix) + .map_or(true, |rest| rest.trim_end() != "-server") + { + return Err(handshake::Error::Protocol { + msg: format!("Wanted '{welcome_prefix}-server, got "), + actual: buf, + }); + } + + let chosen_version; + buf.clear(); + read.read_line_to_string(&mut buf)?; + match buf + .strip_prefix("version=") + .and_then(|version| usize::from_str(version.trim_end()).ok()) + { + Some(version) => { + chosen_version = version; + } + None => { + return Err(handshake::Error::Protocol { + msg: "Needed 'version=', got ".into(), + actual: buf, + }) + } + } + + if !versions.contains(&chosen_version) { + return Err(handshake::Error::Protocol { + msg: format!("Server offered {chosen_version}, we only support "), + actual: versions.iter().map(|v| v.to_string()).collect::>().join(", "), + }); + } + + if read.read_line_to_string(&mut buf)? 
!= 0 { + return Err(handshake::Error::Protocol { + msg: "expected flush packet, got".into(), + actual: buf, + }); + } + for capability in desired_capabilities { + out.write_all(format!("capability={capability}").as_bytes())?; + } + gix_packetline::encode::flush_to_write(out.inner_mut())?; + out.flush()?; + + read.reset_with(&[gix_packetline::PacketLineRef::Flush]); + let mut capabilities = HashSet::new(); + loop { + buf.clear(); + let num_read = read.read_line_to_string(&mut buf)?; + if num_read == 0 { + break; + } + match buf.strip_prefix("capability=") { + Some(cap) => { + let cap = cap.trim_end(); + if !desired_capabilities.contains(&cap) { + return Err(handshake::Error::UnsupportedCapability { name: cap.into() }); + } + capabilities.insert(cap.to_owned()); + } + None => continue, + } + } + + drop(read); + Ok(Client { + _child: process, + out: input, + input: out, + capabilities, + version: chosen_version, + }) + } + + /// Invoke `command` and send all `meta` data before sending all `content` in full. + pub fn invoke<'a>( + &mut self, + command: &str, + meta: impl IntoIterator, + mut content: impl std::io::Read, + ) -> Result { + self.input.enable_text_mode(); + self.input.write_all(format!("command={command}").as_bytes())?; + let mut buf = BString::default(); + for (key, value) in meta { + buf.clear(); + buf.push_str(key); + buf.push(b'='); + buf.push_str(&value); + self.input.write_all(&buf)?; + } + gix_packetline::encode::flush_to_write(self.input.inner_mut())?; + self.input.enable_binary_mode(); + std::io::copy(&mut content, &mut self.input)?; + gix_packetline::encode::flush_to_write(self.input.inner_mut())?; + self.input.flush()?; + Ok(self.read_status()?) + } + + /// Return a `Read` implementation that reads the server process output until the next flush package, and validates + /// the status. If the status indicates failure, the last read will also fail. 
+ pub fn as_read(&mut self) -> impl std::io::Read + '_ { + self.out.reset_with(&[gix_packetline::PacketLineRef::Flush]); + ReadProcessOutputAndStatus { + inner: self.out.as_read(), + } + } + + /// Read a `status=` line from the process output until it is exhausted. + /// Note that the last sent status line wins and no status line means that the `Previous` still counts. + pub fn read_status(&mut self) -> std::io::Result { + read_status(&mut self.out.as_read()) + } +} + +fn read_status(read: &mut PacketlineReader<'_>) -> std::io::Result { + let mut status = process::Status::Previous; + let mut buf = String::new(); + loop { + buf.clear(); + let num_read = read.read_line_to_string(&mut buf)?; + if num_read == 0 { + break; + } + if let Some(name) = buf.strip_prefix("status=") { + status = process::Status::Named(name.trim_end().into()); + } + } + read.reset_with(&[gix_packetline::PacketLineRef::Flush]); + Ok(status) +} + +struct ReadProcessOutputAndStatus<'a> { + inner: PacketlineReader<'a>, +} + +impl<'a> std::io::Read for ReadProcessOutputAndStatus<'a> { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + let num_read = self.inner.read(buf)?; + if num_read == 0 { + self.inner.reset_with(&[gix_packetline::PacketLineRef::Flush]); + let status = read_status(&mut self.inner)?; + if status.is_success() { + Ok(0) + } else { + Err(std::io::Error::new( + std::io::ErrorKind::Other, + format!( + "Process indicated error after reading: {}", + status.message().unwrap_or_default() + ), + )) + } + } else { + Ok(num_read) + } + } +} + +/// Access +impl Client { + /// Return the list of capabilities reported by the serving process. + pub fn capabilities(&self) -> &HashSet { + &self.capabilities + } + + /// Return the negotiated version of the protocol. + /// + /// Note that it is the highest one that both the client and the server support. 
+ pub fn version(&self) -> usize { + self.version + } +} diff --git a/gix-filter/src/driver/process/mod.rs b/gix-filter/src/driver/process/mod.rs new file mode 100644 index 00000000000..9dde7cc1366 --- /dev/null +++ b/gix-filter/src/driver/process/mod.rs @@ -0,0 +1,86 @@ +use std::collections::HashSet; + +/// A handle to a client that allows communicating to a long-running process. +pub struct Client { + /// The child process we are communicating with. + _child: std::process::Child, + /// The names of the obtained capabilities after the handshake. + capabilities: HashSet, + /// The negotiated version of the protocol. + version: usize, + /// A way to send packet-line encoded information to the process. + input: gix_packetline::Writer, + /// A way to read information sent to us by the process. + out: gix_packetline::StreamingPeekableIter, +} + +/// A handle to facilitate typical server interactions that include the handshake and command-invocations. +pub struct Server { + /// The names of the capabilities we can expect the client to use. + capabilities: HashSet, + /// The negotiated version of the protocol, it's the highest supported one. + version: usize, + /// A way to receive information from the client. + input: gix_packetline::StreamingPeekableIter>, + /// A way to send information to the client. + out: gix_packetline::Writer>, +} + +/// The return status of an [invoked command][Client::invoke()]. +#[derive(Debug, Clone)] +pub enum Status { + /// No new status was set, and instead we are to assume the previous status is still in effect. + Previous, + /// Assume the given named status. + Named(String), +} + +/// Initialization +impl Status { + /// Create a new instance that represents a successful operation. + pub fn success() -> Self { + Status::Named("success".into()) + } + + /// Create a new instance that represents an error with the given `message`. 
+ pub fn error(message: impl Into) -> Self { + Status::Named(message.into()) + } +} + +/// Access +impl Status { + /// Return true if the status is `success`. This is also assumed if no new status is set, hence we assume that upon error, the caller will not continue + /// interacting with the process. + pub fn is_success(&self) -> bool { + match self { + Status::Previous => true, + Status::Named(n) => n == "success", + } + } + + /// Return true if the status is explicitly set to indicate delayed output processing. + pub fn is_delayed(&self) -> bool { + match self { + Status::Previous => false, + Status::Named(n) => n == "delayed", + } + } + + /// Return the status message if present. + pub fn message(&self) -> Option<&str> { + match self { + Status::Previous => None, + Status::Named(msg) => msg.as_str().into(), + } + } +} + +/// +pub mod client; + +/// +pub mod server; + +type PacketlineReader<'a, T = std::process::ChildStdout> = + gix_packetline::read::WithSidebands<'a, T, fn(bool, &[u8]) -> gix_packetline::read::ProgressAction>; diff --git a/gix-filter/src/driver/process/server.rs b/gix-filter/src/driver/process/server.rs new file mode 100644 index 00000000000..96b830798f1 --- /dev/null +++ b/gix-filter/src/driver/process/server.rs @@ -0,0 +1,263 @@ +use crate::driver::process::Server; +use bstr::{BString, ByteSlice}; +use std::collections::HashSet; +use std::io::Write; +use std::str::FromStr; + +/// A request to be handled by the server, typically done in a loop. +pub struct Request<'a> { + parent: &'a mut Server, + /// The command to execute with this request. + pub command: String, + /// A list of key-value pairs of meta-data related to `command`. + pub meta: Vec<(String, BString)>, +} + +/// +pub mod next_request { + use bstr::BString; + + /// The error returned by [Server::next_request()][super::Server::next_request()]. 
+ #[derive(Debug, thiserror::Error)] + #[allow(missing_docs)] + pub enum Error { + #[error("Failed to read from the client")] + Io(#[from] std::io::Error), + #[error("{msg} '{actual}'")] + Protocol { msg: String, actual: BString }, + #[error(transparent)] + PacketlineDecode(#[from] gix_packetline::decode::Error), + } +} + +/// +pub mod handshake { + /// The error returned by [Server::handshake()][super::Server::handshake()]. + #[derive(Debug, thiserror::Error)] + #[allow(missing_docs)] + pub enum Error { + #[error("Failed to read or write to the client")] + Io(#[from] std::io::Error), + #[error("{msg} '{actual}'")] + Protocol { msg: String, actual: String }, + #[error("Could not select supported version from the one sent by the client: {}", actual.iter().map(ToString::to_string).collect::>().join(", "))] + VersionMismatch { actual: Vec }, + } +} + +impl Server { + /// Perform a handshake with the client sending information to our `stdin` and receiving information through our `stdout` + /// in packetline format. + /// `pick_version` is called with all versions supported by the client to pick one from, or `None` to indicate the handshake + /// should stop. + /// Use `available_capabilities` to match our capabilities with the ones from the client, so we communicate at most a subset of these. + /// + /// ### Note + /// + /// The server claims exclusive access over stdout and stdin, so all kinds of other output has to be steered towards stderr or there + /// will be a deadlock. 
+ pub fn handshake( + stdin: std::io::Stdin, + stdout: std::io::Stdout, + welcome_prefix: &str, + pick_version: impl FnOnce(&[usize]) -> Option, + available_capabilities: &[&str], + ) -> Result { + let mut input = + gix_packetline::StreamingPeekableIter::new(stdin.lock(), &[gix_packetline::PacketLineRef::Flush]); + let mut read = input.as_read(); + let mut buf = String::new(); + read.read_line_to_string(&mut buf)?; + if buf + .strip_prefix(welcome_prefix) + .map_or(true, |rest| rest.trim_end() != "-client") + { + return Err(handshake::Error::Protocol { + msg: format!("Expected '{welcome_prefix}-client, got"), + actual: buf, + }); + } + + let mut versions = Vec::new(); + loop { + buf.clear(); + let num_read = read.read_line_to_string(&mut buf)?; + if num_read == 0 { + break; + } + versions.push( + match buf + .strip_prefix("version=") + .and_then(|version| usize::from_str(version.trim_end()).ok()) + { + Some(version) => version, + None => { + return Err(handshake::Error::Protocol { + msg: "Expected 'version=', got".into(), + actual: buf, + }) + } + }, + ); + } + let version = pick_version(&versions).ok_or(handshake::Error::VersionMismatch { actual: versions })?; + read.reset_with(&[gix_packetline::PacketLineRef::Flush]); + let mut out = gix_packetline::Writer::new(stdout.lock()).text_mode(); + out.write_all(format!("{welcome_prefix}-server").as_bytes())?; + out.write_all(format!("version={version}").as_bytes())?; + gix_packetline::encode::flush_to_write(out.inner_mut())?; + out.flush()?; + + let mut capabilities = HashSet::new(); + loop { + buf.clear(); + let num_read = read.read_line_to_string(&mut buf)?; + if num_read == 0 { + break; + } + match buf.strip_prefix("capability=") { + Some(cap) => { + let cap = cap.trim_end(); + if available_capabilities.contains(&cap) { + capabilities.insert(cap.to_owned()); + } + } + None => continue, + }; + } + + for cap in &capabilities { + out.write_all(format!("capability={cap}").as_bytes())?; + } + 
gix_packetline::encode::flush_to_write(out.inner_mut())?; + out.flush()?; + + drop(read); + Ok(Server { + capabilities, + version, + out, + input, + }) + } + + /// Read the next request and return it, provided its [`command`][Request::command] is supported by us. + pub fn next_request(&mut self) -> Result, next_request::Error> { + let mut buf = String::new(); + let mut read = self.input.as_read(); + + read.read_line_to_string(&mut buf)?; + let command = match buf + .strip_prefix("command=") + .map(|cmd| cmd.trim_end()) + .filter(|cmd| self.capabilities.contains(*cmd)) + .map(ToOwned::to_owned) + { + Some(cmd) => cmd, + None => { + return Err(next_request::Error::Protocol { + msg: "Wanted known command that matches our capability, got ".into(), + actual: buf.into(), + }) + } + }; + + let mut meta = Vec::with_capacity(1); + while let Some(res) = read.read_data_line() { + let line = res??; + let line = line + .as_bstr() + .ok_or_else(|| next_request::Error::Protocol { + msg: "expected data line, got ".into(), + actual: format!("{line:?}").into(), + })? + .trim(); + let mut tokens = line.splitn(2, |b| *b == b'='); + let (key, value) = tokens + .next() + .zip(tokens.next()) + .ok_or_else(|| next_request::Error::Protocol { + msg: "Expected 'key=value' metadata, got".into(), + actual: line.into(), + })?; + assert!(tokens.next().is_none(), "configured to yield at most two tokens"); + meta.push((key.as_bstr().to_string(), value.into())) + } + + drop(read); + self.input.reset_with(&[gix_packetline::PacketLineRef::Flush]); + + Ok(Request { + parent: self, + command, + meta, + }) + } +} + +mod request { + use crate::driver::process; + use crate::driver::process::server::Request; + use crate::driver::process::PacketlineReader; + use std::io::Write; + + impl Request<'_> { + /// Turn ourselves into a reader that can read until the next flush packet. 
+ pub fn as_read(&mut self) -> PacketlineReader<'_, std::io::StdinLock<'static>> { + self.parent.input.as_read() + } + + /// Provide the write-end of the underlying process. + pub fn as_write(&mut self) -> impl std::io::Write + '_ { + self.parent.out.enable_binary_mode(); + WriteAndFlushOnDrop { + inner: &mut self.parent.out, + } + } + + /// Write the `status` message followed by a flush packet. + pub fn write_status(&mut self, status: process::Status) -> std::io::Result<()> { + self.parent.out.enable_text_mode(); + let out = &mut self.parent.out; + if let Some(message) = status.message() { + out.write_all(message.as_bytes())?; + } + gix_packetline::encode::flush_to_write(out.inner_mut())?; + out.flush() + } + } + + struct WriteAndFlushOnDrop<'a> { + inner: &'a mut gix_packetline::Writer>, + } + + impl std::io::Write for WriteAndFlushOnDrop<'_> { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + self.inner.write(buf) + } + + fn flush(&mut self) -> std::io::Result<()> { + self.inner.flush() + } + } + + impl Drop for WriteAndFlushOnDrop<'_> { + fn drop(&mut self) { + gix_packetline::encode::flush_to_write(self.inner.inner_mut()).ok(); + self.inner.flush().ok(); + } + } +} + +/// Access +impl Server { + /// Return the list of capabilities we are allowed to use. + pub fn capabilities(&self) -> &HashSet { + &self.capabilities + } + + /// Return the negotiated version of the protocol. 
+ pub fn version(&self) -> usize { + self.version + } +} diff --git a/gix-filter/tests/driver/mod.rs b/gix-filter/tests/driver/mod.rs index 1c15b7f2670..a9023da267e 100644 --- a/gix-filter/tests/driver/mod.rs +++ b/gix-filter/tests/driver/mod.rs @@ -21,71 +21,77 @@ static DRIVER: Lazy = Lazy::new(|| { }); mod apply { - mod no_process { - fn driver_no_process() -> Driver { - let mut exe = DRIVER.to_string_lossy().into_owned(); - if cfg!(windows) { - exe = exe.replace('\\', "/"); - } - Driver { - name: "ident".into(), - clean: Some((exe.clone() + " clean %f").into()), - smudge: Some((exe + " smudge %f").into()), - process: None, - required: true, - } + fn driver_no_process() -> Driver { + let mut driver = driver_with_process(); + driver.process = None; + driver + } + + fn driver_with_process() -> Driver { + let mut exe = DRIVER.to_string_lossy().into_owned(); + if cfg!(windows) { + exe = exe.replace('\\', "/"); + } + Driver { + name: "ident".into(), + clean: Some((exe.clone() + " clean %f").into()), + smudge: Some((exe.clone() + " smudge %f").into()), + process: Some((exe + " process").into()), + required: true, } + } - use crate::driver::DRIVER; - use bstr::ByteSlice; - use gix_filter::{driver, Driver}; - use std::io::Read; + use crate::driver::DRIVER; + use bstr::ByteSlice; + use gix_filter::driver::apply; + use gix_filter::{driver, Driver}; + use std::io::Read; - #[test] - fn smudge_and_clean_failure_is_translated_to_observable_error_for_required_drivers() -> crate::Result { - let mut state = gix_filter::driver::State::default(); - let driver = driver_no_process(); - assert!(driver.required); + #[test] + fn smudge_and_clean_failure_is_translated_to_observable_error_for_required_drivers() -> crate::Result { + let mut state = gix_filter::driver::State::default(); + let driver = driver_no_process(); + assert!(driver.required); - let mut filtered = state.apply( - &driver, - &b"hello\nthere\n"[..], - driver::Operation::Smudge, - "do/fail".into(), - )?; - let mut buf = 
Vec::new(); - let err = filtered.read_to_end(&mut buf).unwrap_err(); - assert!(err.to_string().ends_with(" failed")); + let mut filtered = state.apply( + &driver, + &b"hello\nthere\n"[..], + driver::Operation::Smudge, + context_from_path("do/fail"), + )?; + let mut buf = Vec::new(); + let err = filtered.read_to_end(&mut buf).unwrap_err(); + assert!(err.to_string().ends_with(" failed")); - Ok(()) - } + Ok(()) + } - #[test] - fn smudge_and_clean_failure_means_nothing_if_required_is_false() -> crate::Result { - let mut state = gix_filter::driver::State::default(); - let mut driver = driver_no_process(); - driver.required = false; + #[test] + fn smudge_and_clean_failure_means_nothing_if_required_is_false() -> crate::Result { + let mut state = gix_filter::driver::State::default(); + let mut driver = driver_no_process(); + driver.required = false; - let mut filtered = state.apply( - &driver, - &b"hello\nthere\n"[..], - driver::Operation::Clean, - "do/fail".into(), - )?; - let mut buf = Vec::new(); - let num_read = filtered.read_to_end(&mut buf)?; - assert_eq!( - num_read, 0, - "the example fails right away so no output is produced to stdout" - ); + let mut filtered = state.apply( + &driver, + &b"hello\nthere\n"[..], + driver::Operation::Clean, + context_from_path("do/fail"), + )?; + let mut buf = Vec::new(); + let num_read = filtered.read_to_end(&mut buf)?; + assert_eq!( + num_read, 0, + "the example fails right away so no output is produced to stdout" + ); - Ok(()) - } + Ok(()) + } - #[test] - fn smudge_and_clean() -> crate::Result { - let mut state = gix_filter::driver::State::default(); - let driver = driver_no_process(); + #[test] + fn smudge_and_clean_series() -> crate::Result { + let mut state = gix_filter::driver::State::default(); + for driver in [driver_no_process(), driver_with_process()] { assert!( driver.required, "we want errors to definitely show, and don't expect them" @@ -96,10 +102,11 @@ mod apply { &driver, input.as_bytes(), driver::Operation::Smudge, - 
"some/path.txt".into(), + context_from_path("some/path.txt"), )?; let mut buf = Vec::new(); filtered.read_to_end(&mut buf)?; + drop(filtered); assert_eq!( buf.as_bstr(), "\thello\n\tthere\n", @@ -111,7 +118,7 @@ mod apply { &driver, smudge_result.as_bytes(), driver::Operation::Clean, - "some/path.txt".into(), + context_from_path("some/path.txt"), )?; buf.clear(); filtered.read_to_end(&mut buf)?; @@ -120,8 +127,16 @@ mod apply { input, "the clean filter reverses the smudge filter (and we call the right one)" ); + } + Ok(()) + } - Ok(()) + fn context_from_path(path: &str) -> apply::Context<'_> { + apply::Context { + rela_path: path.into(), + ref_name: None, + treeish: None, + blob: None, } } }