Skip to content
This repository has been archived by the owner on Dec 29, 2022. It is now read-only.

Commit

Permalink
Separate ChildProcess transport to a submodule
Browse files Browse the repository at this point in the history
  • Loading branch information
Xanewok committed Feb 24, 2019
1 parent 39fe80c commit 735212f
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 70 deletions.
55 changes: 55 additions & 0 deletions tests/support/client/child_process.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use std::io::{Read, Write};
use std::process::{Command, Stdio};
use std::rc::Rc;

use futures::Poll;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_process::{Child, CommandExt};

pub struct ChildProcess {
stdin: tokio_process::ChildStdin,
stdout: tokio_process::ChildStdout,
child: Rc<tokio_process::Child>,
}

impl Read for ChildProcess {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
Read::read(&mut self.stdout, buf)
}
}

impl Write for ChildProcess {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
Write::write(&mut self.stdin, buf)
}
fn flush(&mut self) -> std::io::Result<()> {
Write::flush(&mut self.stdin)
}
}

impl AsyncRead for ChildProcess {}
impl AsyncWrite for ChildProcess {
fn shutdown(&mut self) -> Poll<(), std::io::Error> {
AsyncWrite::shutdown(&mut self.stdin)
}
}

impl ChildProcess {
pub fn spawn_from_command(mut cmd: Command) -> Result<ChildProcess, std::io::Error> {
cmd.stdin(Stdio::piped());
cmd.stdout(Stdio::piped());
let mut child = cmd.spawn_async()?;

Ok(ChildProcess {
stdout: child.stdout().take().unwrap(),
stdin: child.stdin().take().unwrap(),
child: Rc::new(child),
})
}

/// Returns a handle to the underlying `Child` process.
/// Useful when waiting until child process exits.
pub fn child(&self) -> Rc<Child> {
Rc::clone(&self.child)
}
}
90 changes: 20 additions & 70 deletions tests/support/client.rs → tests/support/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,9 @@
//! receiver (thus, implementing the Future<Item = Value> model).

use std::cell::{Ref, RefCell};
use std::io::{Read, Write};
use std::process::{Command, Stdio};
use std::rc::Rc;

use futures::Poll;
use futures::sink::Sink;
use futures::stream::{SplitSink, Stream};
use futures::unsync::oneshot;
Expand All @@ -28,11 +26,14 @@ use serde_json::{json, Value};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::runtime::current_thread::Runtime;
use tokio::util::{FutureExt, StreamExt};
use tokio_process::{Child, CommandExt};

use super::project_builder::Project;
use super::{rls_exe, rls_timeout};

use child_process::ChildProcess;

mod child_process;

// `Rc` because we share those in message reader stream and the RlsHandle.
// `RefCell` because borrows don't overlap. This is safe, because `process_msg`
// is only called (synchronously) when we execute some work on the runtime,
Expand All @@ -41,61 +42,13 @@ use super::{rls_exe, rls_timeout};
type Messages = Rc<RefCell<Vec<Value>>>;
type Channels = Rc<RefCell<Vec<(Box<Fn(&Value) -> bool>, oneshot::Sender<Value>)>>>;

pub struct ChildProcess {
stdin: tokio_process::ChildStdin,
stdout: tokio_process::ChildStdout,
child: Rc<tokio_process::Child>,
}

impl Read for ChildProcess {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
Read::read(&mut self.stdout, buf)
}
}

impl Write for ChildProcess {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
Write::write(&mut self.stdin, buf)
}
fn flush(&mut self) -> std::io::Result<()> {
Write::flush(&mut self.stdin)
}
}

impl AsyncRead for ChildProcess {}
impl AsyncWrite for ChildProcess {
fn shutdown(&mut self) -> Poll<(), std::io::Error> {
AsyncWrite::shutdown(&mut self.stdin)
}
}

impl ChildProcess {
pub fn spawn_from_command(mut cmd: Command) -> Result<ChildProcess, std::io::Error> {
cmd.stdin(Stdio::piped());
cmd.stdout(Stdio::piped());
let mut child = cmd.spawn_async()?;

Ok(ChildProcess {
stdout: child.stdout().take().unwrap(),
stdin: child.stdin().take().unwrap(),
child: Rc::new(child),
})
}

/// Returns a handle to the underlying `Child` process.
/// Useful when waiting until child process exits.
pub fn child(&self) -> Rc<Child> {
Rc::clone(&self.child)
}
}

type LspFramed<T> = tokio::codec::Framed<T, LspCodec>;

trait LspFramedExt<T: AsyncRead + AsyncWrite> {
fn from_transport(transport: T) -> Self;
}

impl<T> LspFramedExt<T> for LspFramed<T> where T: AsyncRead + AsyncWrite {
impl<T: AsyncRead + AsyncWrite> LspFramedExt<T> for LspFramed<T> {
fn from_transport(transport: T) -> Self {
tokio::codec::Framed::new(transport, LspCodec::default())
}
Expand All @@ -106,19 +59,28 @@ impl Project {
let mut cmd = Command::new(rls_exe());
cmd.current_dir(self.root());
cmd.stderr(Stdio::inherit());

cmd
}

pub fn spawn_rls_with_runtime(&self, mut rt: Runtime) -> RlsHandle<ChildProcess> {
let cmd = self.rls_cmd();
pub fn spawn_rls_async(&self) -> RlsHandle<ChildProcess> {
let rt = Runtime::new().unwrap();

let cmd = self.rls_cmd();
let process = ChildProcess::spawn_from_command(cmd).unwrap();
let (sink, stream) = LspFramed::from_transport(process).split();

self.spawn_rls_with_params(rt, process)
}

fn spawn_rls_with_params<T>(&self, mut rt: Runtime, transport: T) -> RlsHandle<T>
where
T: AsyncRead + AsyncWrite + 'static,
{
let (finished_reading, reader_closed) = oneshot::channel();
let msgs = Messages::default();
let chans = Channels::default();

let (finished_reading, reader_closed) = oneshot::channel();
let (sink, stream) = LspFramed::from_transport(transport).split();

let reader = stream
.timeout(rls_timeout())
Expand All @@ -133,19 +95,7 @@ impl Project {

let sink = Some(sink);

RlsHandle {
writer: sink,
runtime: rt,
reader_closed,
messages: msgs,
channels: chans
}
}

pub fn spawn_rls_async(&self) -> RlsHandle<ChildProcess> {
let rt = Runtime::new().unwrap();

self.spawn_rls_with_runtime(rt)
RlsHandle { writer: sink, runtime: rt, reader_closed, messages: msgs, channels: chans }
}
}

Expand Down Expand Up @@ -326,4 +276,4 @@ impl<T: AsyncRead + AsyncWrite> Drop for RlsHandle<T> {

self.runtime.block_on(reader_closed).unwrap();
}
}
}

0 comments on commit 735212f

Please sign in to comment.