Skip to content

Commit

Permalink
Wire filewatching to daemon server (#5916)
Browse files Browse the repository at this point in the history
  • Loading branch information
Greg Soltis committed Sep 14, 2023
1 parent 8deaa68 commit eeb0b56
Show file tree
Hide file tree
Showing 6 changed files with 454 additions and 274 deletions.
31 changes: 28 additions & 3 deletions crates/turborepo-filewatch/src/globwatcher.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{
collections::{HashMap, HashSet},
future::IntoFuture,
str::FromStr,
};

use notify::Event;
Expand All @@ -23,6 +24,30 @@ pub struct GlobSet {
exclude: Any<'static>,
}

impl GlobSet {
pub fn from_raw(
raw_includes: Vec<String>,
raw_excludes: Vec<String>,
) -> Result<Self, wax::BuildError> {
let include = raw_includes
.into_iter()
.map(|raw_glob| {
let glob = Glob::from_str(&raw_glob)?.to_owned();
Ok((raw_glob, glob))
})
.collect::<Result<HashMap<_, _>, wax::BuildError>>()?;
let excludes = raw_excludes
.into_iter()
.map(|raw_glob| {
let glob = Glob::from_str(&raw_glob)?.to_owned();
Ok(glob)
})
.collect::<Result<Vec<_>, wax::BuildError>>()?;
let exclude = wax::any(excludes)?.to_owned();
Ok(Self { include, exclude })
}
}

#[derive(Debug, Error)]
pub enum Error {
#[error(transparent)]
Expand Down Expand Up @@ -369,7 +394,7 @@ mod test {
setup(&repo_root);
let cookie_dir = repo_root.join_component(".git");

let watcher = FileSystemWatcher::new(&repo_root).unwrap();
let watcher = FileSystemWatcher::new(&repo_root).await.unwrap();
let cookie_jar = CookieJar::new(&cookie_dir, Duration::from_secs(2), watcher.subscribe());

let glob_watcher = GlobWatcher::new(&repo_root, cookie_jar, watcher.subscribe());
Expand Down Expand Up @@ -447,7 +472,7 @@ mod test {
setup(&repo_root);
let cookie_dir = repo_root.join_component(".git");

let watcher = FileSystemWatcher::new(&repo_root).unwrap();
let watcher = FileSystemWatcher::new(&repo_root).await.unwrap();
let cookie_jar = CookieJar::new(&cookie_dir, Duration::from_secs(2), watcher.subscribe());

let glob_watcher = GlobWatcher::new(&repo_root, cookie_jar, watcher.subscribe());
Expand Down Expand Up @@ -535,7 +560,7 @@ mod test {
setup(&repo_root);
let cookie_dir = repo_root.join_component(".git");

let watcher = FileSystemWatcher::new(&repo_root).unwrap();
let watcher = FileSystemWatcher::new(&repo_root).await.unwrap();
let cookie_jar = CookieJar::new(&cookie_dir, Duration::from_secs(2), watcher.subscribe());

let glob_watcher = GlobWatcher::new(&repo_root, cookie_jar, watcher.subscribe());
Expand Down
32 changes: 16 additions & 16 deletions crates/turborepo-filewatch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ use {
walkdir::WalkDir,
};

mod cookie_jar;
pub mod cookie_jar;
#[cfg(target_os = "macos")]
mod fsevent;
mod globwatcher;
pub mod globwatcher;

#[cfg(not(target_os = "macos"))]
type Backend = RecommendedWatcher;
Expand Down Expand Up @@ -88,15 +88,15 @@ pub struct FileSystemWatcher {
}

impl FileSystemWatcher {
pub fn new(root: &AbsoluteSystemPath) -> Result<Self, WatchError> {
pub async fn new(root: &AbsoluteSystemPath) -> Result<Self, WatchError> {
let (sender, _) = broadcast::channel(1024);
let (send_file_events, mut recv_file_events) = mpsc::channel(1024);
let watch_root = root.to_owned();
let broadcast_sender = sender.clone();
let watcher = run_watcher(&watch_root, send_file_events).unwrap();
let watcher = run_watcher(&watch_root, send_file_events)?;
let (exit_ch, exit_signal) = tokio::sync::oneshot::channel();
// Ensure we are ready to receive new events, not events for existing state
futures::executor::block_on(wait_for_cookie(root, &mut recv_file_events))?;
wait_for_cookie(root, &mut recv_file_events).await?;
tokio::task::spawn(watch_events(
watcher,
watch_root,
Expand Down Expand Up @@ -436,7 +436,7 @@ mod test {
let sibling_path = parent_path.join_component("sibling");
sibling_path.create_dir_all().unwrap();

let watcher = FileSystemWatcher::new(&repo_root).unwrap();
let watcher = FileSystemWatcher::new(&repo_root).await.unwrap();
let mut recv = watcher.subscribe();

expect_watching(&mut recv, &[&repo_root, &parent_path, &child_path]).await;
Expand Down Expand Up @@ -494,7 +494,7 @@ mod test {
let child_path = parent_path.join_component("child");
child_path.create_dir_all().unwrap();

let watcher = FileSystemWatcher::new(&repo_root).unwrap();
let watcher = FileSystemWatcher::new(&repo_root).await.unwrap();
let mut recv = watcher.subscribe();

expect_watching(&mut recv, &[&repo_root, &parent_path, &child_path]).await;
Expand Down Expand Up @@ -536,7 +536,7 @@ mod test {
let child_path = parent_path.join_component("child");
child_path.create_dir_all().unwrap();

let watcher = FileSystemWatcher::new(&repo_root).unwrap();
let watcher = FileSystemWatcher::new(&repo_root).await.unwrap();
let mut recv = watcher.subscribe();
expect_watching(&mut recv, &[&repo_root, &parent_path, &child_path]).await;

Expand Down Expand Up @@ -565,7 +565,7 @@ mod test {
let child_path = parent_path.join_component("child");
child_path.create_dir_all().unwrap();

let watcher = FileSystemWatcher::new(&repo_root).unwrap();
let watcher = FileSystemWatcher::new(&repo_root).await.unwrap();
let mut recv = watcher.subscribe();
expect_watching(&mut recv, &[&repo_root, &parent_path, &child_path]).await;

Expand Down Expand Up @@ -597,7 +597,7 @@ mod test {
let child_path = parent_path.join_component("child");
child_path.create_dir_all().unwrap();

let watcher = FileSystemWatcher::new(&repo_root).unwrap();
let watcher = FileSystemWatcher::new(&repo_root).await.unwrap();
let mut recv = watcher.subscribe();
expect_watching(&mut recv, &[&repo_root, &parent_path, &child_path]).await;

Expand Down Expand Up @@ -628,7 +628,7 @@ mod test {
let child_path = parent_path.join_component("child");
child_path.create_dir_all().unwrap();

let watcher = FileSystemWatcher::new(&repo_root).unwrap();
let watcher = FileSystemWatcher::new(&repo_root).await.unwrap();
let mut recv = watcher.subscribe();
expect_watching(&mut recv, &[&repo_root, &parent_path, &child_path]).await;

Expand Down Expand Up @@ -669,7 +669,7 @@ mod test {
let symlink_path = repo_root.join_component("symlink");
symlink_path.symlink_to_dir(child_path.as_str()).unwrap();

let watcher = FileSystemWatcher::new(&repo_root).unwrap();
let watcher = FileSystemWatcher::new(&repo_root).await.unwrap();
let mut recv = watcher.subscribe();
expect_watching(&mut recv, &[&repo_root, &parent_path, &child_path]).await;

Expand Down Expand Up @@ -708,7 +708,7 @@ mod test {
let symlink_path = repo_root.join_component("symlink");
symlink_path.symlink_to_dir(child_path.as_str()).unwrap();

let watcher = FileSystemWatcher::new(&repo_root).unwrap();
let watcher = FileSystemWatcher::new(&repo_root).await.unwrap();
let mut recv = watcher.subscribe();
expect_watching(&mut recv, &[&repo_root, &parent_path, &child_path]).await;

Expand Down Expand Up @@ -752,7 +752,7 @@ mod test {
let child_path = parent_path.join_component("child");
child_path.create_dir_all().unwrap();

let watcher = FileSystemWatcher::new(&repo_root).unwrap();
let watcher = FileSystemWatcher::new(&repo_root).await.unwrap();
let mut recv = watcher.subscribe();
expect_watching(&mut recv, &[&repo_root, &parent_path, &child_path]).await;

Expand Down Expand Up @@ -790,7 +790,7 @@ mod test {
let child_path = parent_path.join_component("child");
child_path.create_dir_all().unwrap();

let watcher = FileSystemWatcher::new(&repo_root).unwrap();
let watcher = FileSystemWatcher::new(&repo_root).await.unwrap();
let mut recv = watcher.subscribe();
expect_watching(&mut recv, &[&repo_root, &parent_path, &child_path]).await;

Expand All @@ -811,7 +811,7 @@ mod test {
let mut recv = {
// create and immediately drop the watcher, which should trigger the exit
// channel
let watcher = FileSystemWatcher::new(&repo_root).unwrap();
let watcher = FileSystemWatcher::new(&repo_root).await.unwrap();
watcher.subscribe()
};

Expand Down
22 changes: 20 additions & 2 deletions crates/turborepo-lib/src/commands/daemon.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use std::time::Duration;

use camino::Utf8PathBuf;
use futures::FutureExt;
use pidlock::PidlockError::AlreadyOwned;
use time::{format_description, OffsetDateTime};
use tokio::signal::ctrl_c;
use tracing::{trace, warn};
use turbopath::AbsoluteSystemPathBuf;

Expand Down Expand Up @@ -162,8 +164,24 @@ pub async fn daemon_server(
.map_err(|_| DaemonError::InvalidTimeout(idle_time.to_owned()))
.map(|d| Duration::from_nanos(d as u64))?;

let server = crate::daemon::DaemonServer::new(base, timeout, log_file)?;
let reason = server.serve().await;
let daemon_root = base.daemon_file_root();
let exit_signal = ctrl_c().map(|result| {
if let Err(e) = result {
tracing::error!("Error with signal handling: {}", e);
}
CloseReason::Interrupt
});
// TODO: be more methodical about this choice:
let cookie_dir = base.repo_root.join_component(".git");
let reason = crate::daemon::serve(
&base.repo_root,
cookie_dir,
&daemon_root,
log_file,
timeout,
exit_signal,
)
.await;

match reason {
CloseReason::SocketOpenError(SocketOpenError::LockError(AlreadyOwned)) => {
Expand Down
8 changes: 4 additions & 4 deletions crates/turborepo-lib/src/daemon/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use futures::Stream;
use tokio::io::{AsyncRead, AsyncWrite};
use tonic::transport::server::Connected;
use tracing::{debug, trace};
use turbopath::AbsoluteSystemPathBuf;
use turbopath::AbsoluteSystemPath;

#[derive(thiserror::Error, Debug)]
pub enum SocketOpenError {
Expand All @@ -30,7 +30,7 @@ const WINDOWS_POLL_DURATION: Duration = Duration::from_millis(1);
/// code path to shut down the non-blocking polling
#[tracing::instrument]
pub async fn listen_socket(
path: AbsoluteSystemPathBuf,
path: &AbsoluteSystemPath,
#[allow(unused)] running: Arc<AtomicBool>,
) -> Result<
(
Expand Down Expand Up @@ -202,7 +202,7 @@ mod test {
pid_path.create_with_contents("100000").unwrap();

let running = Arc::new(AtomicBool::new(true));
let result = listen_socket(pid_path, running).await;
let result = listen_socket(&pid_path, running).await;

// Note: PidLock doesn't implement Debug, so we can't unwrap_err()

Expand Down Expand Up @@ -232,7 +232,7 @@ mod test {
.unwrap();

let running = Arc::new(AtomicBool::new(true));
let result = listen_socket(pid_path, running).await;
let result = listen_socket(&pid_path, running).await;

// Note: PidLock doesn't implement Debug, so we can't unwrap_err()

Expand Down
2 changes: 1 addition & 1 deletion crates/turborepo-lib/src/daemon/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ mod server;

pub use client::{DaemonClient, DaemonError};
pub use connector::DaemonConnector;
pub use server::{CloseReason, DaemonServer};
pub use server::{serve, CloseReason};

pub(crate) mod proto {
tonic::include_proto!("turbodprotocol");
Expand Down
Loading

0 comments on commit eeb0b56

Please sign in to comment.