From a770337e27fb2bef9600adc74b473fefc2267d15 Mon Sep 17 00:00:00 2001 From: Greg Soltis Date: Wed, 30 Aug 2023 13:21:41 -0700 Subject: [PATCH 1/9] Implement glob watching --- Cargo.lock | 23 +- crates/turborepo-filewatch/Cargo.toml | 2 +- crates/turborepo-filewatch/src/fsevent.rs | 4 +- crates/turborepo-filewatch/src/globwatcher.rs | 582 ++++++++++++++++++ crates/turborepo-filewatch/src/lib.rs | 1 + .../src/absolute_system_path.rs | 10 +- .../src/absolute_system_path_buf.rs | 19 +- .../turborepo-paths/src/relative_unix_path.rs | 6 + 8 files changed, 606 insertions(+), 41 deletions(-) create mode 100644 crates/turborepo-filewatch/src/globwatcher.rs diff --git a/Cargo.lock b/Cargo.lock index e3cb085d5189f..32ccb594f1e93 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2494,15 +2494,6 @@ dependencies = [ "log", ] -[[package]] -name = "file-id" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e13be71e6ca82e91bc0cb862bebaac0b2d1924a5a1d970c822b2f98b63fda8c3" -dependencies = [ - "winapi-util", -] - [[package]] name = "filetime" version = "0.2.22" @@ -4698,18 +4689,6 @@ dependencies = [ "windows-sys 0.48.0", ] -[[package]] -name = "notify-debouncer-full" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "416969970ec751a5d702a88c6cd19ac1332abe997fce43f96db0418550426241" -dependencies = [ - "file-id", - "notify 6.1.1", - "parking_lot", - "walkdir", -] - [[package]] name = "notify-debouncer-mini" version = "0.2.1" @@ -9977,7 +9956,6 @@ dependencies = [ "itertools", "libc", "notify 6.1.1", - "notify-debouncer-full", "tempfile", "thiserror", "tokio", @@ -9985,6 +9963,7 @@ dependencies = [ "tracing", "turbopath", "walkdir", + "wax", ] [[package]] diff --git a/crates/turborepo-filewatch/Cargo.toml b/crates/turborepo-filewatch/Cargo.toml index ef88e39f701f1..a18307e61684c 100644 --- a/crates/turborepo-filewatch/Cargo.toml +++ b/crates/turborepo-filewatch/Cargo.toml @@ -12,12 +12,12 @@ dashmap = { workspace = true } futures = { version = "0.3.26" } itertools = { workspace = true } notify = "6.0.1" -notify-debouncer-full = { version = "0.2.0", default-features = false } thiserror = "1.0.38" tokio = { workspace = true, features = ["full", "time"] } tracing = "0.1.37" turbopath = { workspace = true } walkdir = "2.3.3" +wax = { workspace = true } [target."cfg(target_os=\"macos\")".dependencies.fsevent-sys] optional = true diff --git a/crates/turborepo-filewatch/src/fsevent.rs b/crates/turborepo-filewatch/src/fsevent.rs index 4bca8d6263f55..6353ab8123ac9 100644 --- a/crates/turborepo-filewatch/src/fsevent.rs +++ b/crates/turborepo-filewatch/src/fsevent.rs @@ -480,7 +480,9 @@ impl FsEventWatcher { cur_runloop, cf::kCFRunLoopDefaultMode, ); - fs::FSEventStreamStart(stream); + if fs::FSEventStreamStart(stream) == 0x0 { + panic!("FSEventStream failed to start"); + } // the calling to CFRunLoopRun will be terminated by CFRunLoopStop call in // drop() diff --git a/crates/turborepo-filewatch/src/globwatcher.rs b/crates/turborepo-filewatch/src/globwatcher.rs new file mode 100644 index 0000000000000..f75b92457ec28 --- /dev/null +++ b/crates/turborepo-filewatch/src/globwatcher.rs @@ -0,0 +1,582 @@ +use std::{ + collections::{hash_map::Entry, HashMap, HashSet}, + future::IntoFuture, +}; + +use notify::Event; +use thiserror::Error; +use tokio::sync::{broadcast, mpsc, oneshot}; +use tracing::warn; +use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf, RelativeUnixPath}; +use wax::{Any, Glob, Pattern}; + +use crate::{ + cookie_jar::{CookieError, CookieJar}, + NotifyError, +}; + +type Hash = String; + +#[derive(Debug)] +pub struct GlobSet { + include: HashMap>, + exclude: Any<'static>, +} + +#[derive(Debug, Error)] +pub enum Error { + #[error(transparent)] + CookieError(#[from] CookieError), + #[error("globwatcher has closed")] + Closed, +} + +impl From> for Error { + fn from(_: mpsc::error::SendError) -> Self { + Error::Closed + } +} + +impl From for Error { + fn from(_: oneshot::error::RecvError) -> Self { + Error::Closed + } +} + +pub struct GlobWatcher { + cookie_jar: CookieJar, + // _exit_ch exists to trigger a close on the receiver when an instance + // of this struct is dropped. The task that is receiving events will exit, + // dropping the other sender for the broadcast channel, causing all receivers + // to be notified of a close. + _exit_ch: oneshot::Sender<()>, + query_ch: mpsc::Sender, +} + +#[derive(Debug)] +pub enum Query { + WatchGlobs { + hash: Hash, + glob_set: GlobSet, + resp: oneshot::Sender>, + }, + GetChangedGlobs { + hash: Hash, + candidates: HashSet, + resp: oneshot::Sender, Error>>, + }, +} + +struct GlobTracker { + root: AbsoluteSystemPathBuf, + + /// maintains the list of to watch for a given hash + hash_globs: HashMap, + + /// maps a string glob to the compiled glob and the hashes for which this + /// glob hasn't changed + glob_statuses: HashMap, HashSet)>, + + exit_signal: oneshot::Receiver<()>, + + recv: broadcast::Receiver>, + + query_recv: mpsc::Receiver, +} + +impl GlobWatcher { + pub fn new( + root: &AbsoluteSystemPath, + cookie_jar: CookieJar, + recv: broadcast::Receiver>, + ) -> Self { + let (exit_ch, exit_signal) = tokio::sync::oneshot::channel(); + let (query_ch, query_recv) = mpsc::channel(256); + tokio::task::spawn( + GlobTracker::new(root.to_owned(), exit_signal, recv, query_recv).watch(), + ); + Self { + cookie_jar, + _exit_ch: exit_ch, + query_ch, + } + } + + pub async fn watch_globs(&self, hash: Hash, globs: GlobSet) -> Result<(), Error> { + self.cookie_jar.wait_for_cookie().await?; + let (tx, rx) = oneshot::channel(); + self.query_ch + .send(Query::WatchGlobs { + hash: hash, + glob_set: globs, + resp: tx, + }) + .await?; + rx.await? + } + + pub async fn get_changed_globs( + &self, + hash: Hash, + candidates: HashSet, + ) -> Result, Error> { + self.cookie_jar.wait_for_cookie().await?; + let (tx, rx) = oneshot::channel(); + self.query_ch + .send(Query::GetChangedGlobs { + hash, + candidates, + resp: tx, + }) + .await?; + rx.await? + } +} + +#[derive(Debug, Error)] +enum WatchError { + #[error(transparent)] + Recv(#[from] broadcast::error::RecvError), + #[error(transparent)] + Notify(#[from] NotifyError), +} + +impl GlobTracker { + fn new( + root: AbsoluteSystemPathBuf, + exit_signal: oneshot::Receiver<()>, + recv: broadcast::Receiver>, + query_recv: mpsc::Receiver, + ) -> Self { + Self { + root, + hash_globs: HashMap::new(), + glob_statuses: HashMap::new(), + exit_signal, + recv, + query_recv, + } + } + + async fn watch(mut self) { + loop { + tokio::select! { + _ = &mut self.exit_signal => return, + Some(query) = self.query_recv.recv().into_future() => { + match query { + Query::WatchGlobs { + hash, + glob_set, + resp + } => { + // Assume cookie handling has happened external to this component. + // Other tasks _could_ write to the + // same output directories, however we are relying on task + // execution dependencies to prevent that. + for (glob_str, glob) in glob_set.include.iter() { + let glob_str = glob_str.to_owned(); + match self.glob_statuses.entry(glob_str) { + Entry::Occupied(mut existing) => { + existing.get_mut().1.insert(hash.clone()); + }, + Entry::Vacant(vacancy) => { + let mut hashes = HashSet::new(); + hashes.insert(hash.clone()); + vacancy.insert((glob.clone(), hashes)); + } + } + } + self.hash_globs.insert(hash.clone(), glob_set); + let _ = resp.send(Ok(())); + }, + Query::GetChangedGlobs { + hash, + mut candidates, + resp + } => { + // Assume cookie handling has happened external to this component. + // Build a set of candidate globs that *may* have changed. + // An empty set translates to all globs have not changed. + if let Some(unchanged_globs) = self.hash_globs.get(&hash) { + candidates.retain(|glob_str| { + // We are keeping the globs from candidates that + // we don't have a record of as unchanged. + // If we do have a record, drop it from candidates. + !unchanged_globs.include.contains_key(glob_str) + }); + } + // If the client has gone away, we don't care about the error + let _ = resp.send(Ok(candidates)); + } + } + }, + file_event = self.recv.recv().into_future() => { + match file_event { + Err(broadcast::error::RecvError::Closed) => return, + Err(e @ broadcast::error::RecvError::Lagged(_)) => self.on_error(e.into()), + Ok(Err(error)) => self.on_error(error.into()), + Ok(Ok(file_event)) => { + for path in file_event.paths { + let path = AbsoluteSystemPathBuf::try_from(path).expect("filewatching should produce absolute paths"); + let Ok(to_match) = self.root.anchor(path) else { + // irrelevant filesystem update + continue; + }; + self.handle_path_change(&to_match.to_unix()); + } + } + } + } + } + } + } + + /// on_error takes the conservative approach of considering everything + /// changed in the event of any error related to filewatching + fn on_error(&mut self, err: WatchError) { + warn!( + "encountered filewatching error, flushing all globs: {}", + err + ); + self.hash_globs.clear(); + self.glob_statuses.clear(); + } + + fn handle_path_change(&mut self, path: &RelativeUnixPath) { + self.glob_statuses.retain(|glob_str, (glob, hash_globs)| { + // If this is not a match, we aren't modifying this glob, bail early and mark + // for retention. + if !glob.is_match(path) { + return true; + } + // We have a match. Check which hashes need invalidation. + hash_globs.retain(|hash| { + let Some(glob_set) = self.hash_globs.get_mut(hash) else { + return true; + }; + // If we match an exclusion, don't invalidate this hash + if glob_set.exclude.is_match(path) { + return true; + } + // We didn't match any exclusions. Safe to invalidate this hash + let remove_hash = if let Some(globs_for_hash) = self.hash_globs.get_mut(hash) { + globs_for_hash.include.remove(glob_str); + globs_for_hash.include.is_empty() + } else { + false + }; + // If we removed the last glob for this hash, drop the hash entirely + if remove_hash { + self.hash_globs.remove(hash); + } + false + }); + !hash_globs.is_empty() + }); + } +} + +#[cfg(test)] +mod test { + use std::{ + collections::{HashMap, HashSet}, + str::FromStr, + time::Duration, + }; + + use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf}; + use wax::{any, Glob}; + + use crate::{ + cookie_jar::CookieJar, + globwatcher::{GlobSet, GlobWatcher}, + FileSystemWatcher, + }; + + fn temp_dir() -> (AbsoluteSystemPathBuf, tempfile::TempDir) { + let tmp = tempfile::tempdir().unwrap(); + let path = AbsoluteSystemPathBuf::try_from(tmp.path()) + .unwrap() + .to_realpath() + .unwrap(); + (path, tmp) + } + + fn setup(repo_root: &AbsoluteSystemPath) { + // Directory layout: + // / + // my-pkg/ + // irrelevant + // dist/ + // dist-file + // distChild/ + // child-file + // .next/ + // next-file + // cache/ + let pkg_path = repo_root.join_component("my-pkg"); + pkg_path.create_dir_all().unwrap(); + pkg_path + .join_component("irrelevant") + .create_with_contents("") + .unwrap(); + let dist_path = pkg_path.join_component("dist"); + dist_path.create_dir_all().unwrap(); + let dist_child_path = dist_path.join_component("distChild"); + dist_child_path.create_dir_all().unwrap(); + dist_child_path + .join_component("child-file") + .create_with_contents("") + .unwrap(); + dist_path + .join_component("dist-file") + .create_with_contents("") + .unwrap(); + let next_path = pkg_path.join_component(".next"); + next_path.create_dir_all().unwrap(); + next_path + .join_component("next-file") + .create_with_contents("") + .unwrap(); + next_path.join_component("cache").create_dir_all().unwrap(); + } + + fn make_includes(raw: &[&str]) -> HashMap> { + raw.iter() + .map(|raw_glob| { + ( + raw_glob.to_string(), + Glob::from_str(raw_glob).unwrap().to_owned(), + ) + }) + .collect() + } + + #[tokio::test] + async fn test_track_outputs() { + let (repo_root, _tmp_dir) = temp_dir(); + setup(&repo_root); + let (cookie_dir, _cookie_dir) = temp_dir(); + + let watcher = FileSystemWatcher::new(&repo_root).unwrap(); + let cookie_watcher = FileSystemWatcher::new(&cookie_dir).unwrap(); + let cookie_jar = CookieJar::new( + &cookie_dir, + Duration::from_secs(2), + cookie_watcher.subscribe(), + ); + + let glob_watcher = GlobWatcher::new(&repo_root, cookie_jar, watcher.subscribe()); + + let raw_includes = &["my-pkg/dist/**", "my-pkg/.next/**"]; + let raw_excludes = ["my-pkg/.next/cache/**"]; + let exclude = wax::any(raw_excludes).unwrap().to_owned(); + let globs = GlobSet { + include: make_includes(raw_includes), + exclude, + }; + + let hash = "the-hash".to_string(); + + glob_watcher.watch_globs(hash.clone(), globs).await.unwrap(); + + let candidates = HashSet::from_iter(raw_includes.iter().map(|s| s.to_string())); + let results = glob_watcher + .get_changed_globs(hash.clone(), candidates.clone()) + .await + .unwrap(); + assert!(results.is_empty()); + + // Make an irrelevant change + repo_root + .join_components(&["my-pkg", "irrelevant"]) + .create_with_contents("some bytes") + .unwrap(); + let results = glob_watcher + .get_changed_globs(hash.clone(), candidates.clone()) + .await + .unwrap(); + assert!(results.is_empty()); + + // Make an excluded change + repo_root + .join_components(&["my-pkg", ".next", "cache", "foo"]) + .create_with_contents("some bytes") + .unwrap(); + let results = glob_watcher + .get_changed_globs(hash.clone(), candidates.clone()) + .await + .unwrap(); + assert!(results.is_empty()); + + // Make a relevant change + repo_root + .join_components(&["my-pkg", "dist", "foo"]) + .create_with_contents("some bytes") + .unwrap(); + let results = glob_watcher + .get_changed_globs(hash.clone(), candidates.clone()) + .await + .unwrap(); + let expected = HashSet::from_iter(["my-pkg/dist/**".to_string()]); + assert_eq!(results, expected); + + // Change a file matching the other glob + repo_root + .join_components(&["my-pkg", ".next", "foo"]) + .create_with_contents("some bytes") + .unwrap(); + let results = glob_watcher + .get_changed_globs(hash.clone(), candidates.clone()) + .await + .unwrap(); + let expected = + HashSet::from_iter(["my-pkg/dist/**".to_string(), "my-pkg/.next/**".to_string()]); + assert_eq!(results, expected); + } + + #[tokio::test] + async fn test_track_multiple_hashes() { + let (repo_root, _tmp_dir) = temp_dir(); + setup(&repo_root); + let (cookie_dir, _cookie_dir) = temp_dir(); + + let watcher = FileSystemWatcher::new(&repo_root).unwrap(); + let cookie_watcher = FileSystemWatcher::new(&cookie_dir).unwrap(); + let cookie_jar = CookieJar::new( + &cookie_dir, + Duration::from_secs(2), + cookie_watcher.subscribe(), + ); + + let glob_watcher = GlobWatcher::new(&repo_root, cookie_jar, watcher.subscribe()); + + let raw_includes = &["my-pkg/dist/**", "my-pkg/.next/**"]; + let raw_excludes: [&str; 0] = []; + let globs = GlobSet { + include: make_includes(raw_includes), + exclude: any(raw_excludes).unwrap(), + }; + + let hash = "the-hash".to_string(); + + glob_watcher.watch_globs(hash.clone(), globs).await.unwrap(); + + let candidates = HashSet::from_iter(raw_includes.iter().map(|s| s.to_string())); + let results = glob_watcher + .get_changed_globs(hash.clone(), candidates.clone()) + .await + .unwrap(); + assert!(results.is_empty()); + + let second_raw_includes = &["my-pkg/.next/**"]; + let second_raw_excludes = ["my-pkg/.next/cache/**"]; + let second_globs = GlobSet { + include: make_includes(second_raw_includes), + exclude: any(second_raw_excludes).unwrap(), + }; + let second_hash = "the-second-hash".to_string(); + glob_watcher + .watch_globs(second_hash.clone(), second_globs) + .await + .unwrap(); + + let second_candidates = + HashSet::from_iter(second_raw_includes.iter().map(|s| s.to_string())); + let results = glob_watcher + .get_changed_globs(hash.clone(), candidates.clone()) + .await + .unwrap(); + assert!(results.is_empty()); + + let results = glob_watcher + .get_changed_globs(second_hash.clone(), second_candidates.clone()) + .await + .unwrap(); + assert!(results.is_empty()); + + // Make a change that is excluded in one of the hashes but not in the other + repo_root + .join_components(&["my-pkg", ".next", "cache", "foo"]) + .create_with_contents("hello") + .unwrap(); + // expect one changed glob for the first hash + let results = glob_watcher + .get_changed_globs(hash.clone(), candidates.clone()) + .await + .unwrap(); + let expected = HashSet::from_iter(["my-pkg/.next/**".to_string()]); + assert_eq!(results, expected); + + // The second hash which excludes the change should still not have any changed + // globs + let results = glob_watcher + .get_changed_globs(second_hash.clone(), second_candidates.clone()) + .await + .unwrap(); + assert!(results.is_empty()); + + // Make a change for second_hash + repo_root + .join_components(&["my-pkg", ".next", "bar"]) + .create_with_contents("hello") + .unwrap(); + let results = glob_watcher + .get_changed_globs(second_hash.clone(), second_candidates.clone()) + .await + .unwrap(); + assert_eq!(results, second_candidates); + } + + #[tokio::test] + async fn test_watch_single_file() { + let (repo_root, _tmp_dir) = temp_dir(); + setup(&repo_root); + let (cookie_dir, _cookie_dir) = temp_dir(); + + let watcher = FileSystemWatcher::new(&repo_root).unwrap(); + let cookie_watcher = FileSystemWatcher::new(&cookie_dir).unwrap(); + let cookie_jar = CookieJar::new( + &cookie_dir, + Duration::from_secs(2), + cookie_watcher.subscribe(), + ); + + let glob_watcher = GlobWatcher::new(&repo_root, cookie_jar, watcher.subscribe()); + + let raw_includes = &["my-pkg/.next/next-file"]; + let raw_excludes: [&str; 0] = []; + let globs = GlobSet { + include: make_includes(raw_includes), + exclude: any(raw_excludes).unwrap(), + }; + + let hash = "the-hash".to_string(); + + glob_watcher.watch_globs(hash.clone(), globs).await.unwrap(); + + // A change to an irrelevant file + repo_root + .join_components(&["my-pkg", ".next", "foo"]) + .create_with_contents("hello") + .unwrap(); + + let candidates = HashSet::from_iter(raw_includes.iter().map(|s| s.to_string())); + let results = glob_watcher + .get_changed_globs(hash.clone(), candidates.clone()) + .await + .unwrap(); + assert!(results.is_empty()); + + // Change the watched file + repo_root + .join_components(&["my-pkg", ".next", "next-file"]) + .create_with_contents("hello") + .unwrap(); + let results = glob_watcher + .get_changed_globs(hash.clone(), candidates.clone()) + .await + .unwrap(); + assert_eq!(results, candidates); + } +} diff --git a/crates/turborepo-filewatch/src/lib.rs b/crates/turborepo-filewatch/src/lib.rs index c46557da5bc4f..f55197cc368a7 100644 --- a/crates/turborepo-filewatch/src/lib.rs +++ b/crates/turborepo-filewatch/src/lib.rs @@ -39,6 +39,7 @@ use { mod cookie_jar; #[cfg(target_os = "macos")] mod fsevent; +mod globwatcher; #[cfg(not(target_os = "macos"))] type Backend = RecommendedWatcher; diff --git a/crates/turborepo-paths/src/absolute_system_path.rs b/crates/turborepo-paths/src/absolute_system_path.rs index 08b6b12b21927..bde3ed9e8e479 100644 --- a/crates/turborepo-paths/src/absolute_system_path.rs +++ b/crates/turborepo-paths/src/absolute_system_path.rs @@ -7,7 +7,7 @@ use std::os::windows::fs::{symlink_dir, symlink_file}; use std::{ fmt, fs::{File, Metadata, OpenOptions, Permissions}, - io, + io::{self, Write}, path::{Path, PathBuf}, }; @@ -162,6 +162,14 @@ impl AbsoluteSystemPath { Ok(()) } + /// create_with_contents will create or truncate a file, then write the + /// given contents to it + pub fn create_with_contents>(&self, contents: B) -> Result<(), io::Error> { + let mut f = fs::File::create(&self.0)?; + f.write_all(contents.as_ref())?; + Ok(()) + } + pub fn remove_dir_all(&self) -> Result<(), io::Error> { fs::remove_dir_all(&self.0) } diff --git a/crates/turborepo-paths/src/absolute_system_path_buf.rs b/crates/turborepo-paths/src/absolute_system_path_buf.rs index 962d56896ae09..cceb700a9257e 100644 --- a/crates/turborepo-paths/src/absolute_system_path_buf.rs +++ b/crates/turborepo-paths/src/absolute_system_path_buf.rs @@ -185,12 +185,6 @@ impl AbsoluteSystemPathBuf { Ok(self.0.symlink_metadata()?.permissions().readonly()) } - pub fn create_with_contents>(&self, contents: B) -> Result<(), io::Error> { - let mut f = fs::File::create(self.0.as_path())?; - f.write_all(contents.as_ref())?; - Ok(()) - } - pub fn as_str(&self) -> &str { self.0.as_str() } @@ -217,11 +211,7 @@ impl TryFrom for AbsoluteSystemPathBuf { type Error = PathError; fn try_from(path: PathBuf) -> Result { - let path_str = path - .to_str() - .ok_or_else(|| PathError::InvalidUnicode(path.to_string_lossy().to_string()))?; - - Self::new(Utf8PathBuf::from(path_str)) + Self::new(Utf8PathBuf::try_from(path)?) } } @@ -229,11 +219,8 @@ impl TryFrom<&Path> for AbsoluteSystemPathBuf { type Error = PathError; fn try_from(path: &Path) -> Result { - let path_str = path - .to_str() - .ok_or_else(|| PathError::InvalidUnicode(path.to_string_lossy().to_string()))?; - - Self::new(Utf8PathBuf::from(path_str)) + let utf8_path: &Utf8Path = path.try_into()?; + Self::new(utf8_path.to_owned()) } } diff --git a/crates/turborepo-paths/src/relative_unix_path.rs b/crates/turborepo-paths/src/relative_unix_path.rs index aece72a134a09..05c408b4ca573 100644 --- a/crates/turborepo-paths/src/relative_unix_path.rs +++ b/crates/turborepo-paths/src/relative_unix_path.rs @@ -84,6 +84,12 @@ impl AsRef for RelativeUnixPath { } } +impl<'a> From<&'a RelativeUnixPath> for wax::CandidatePath<'a> { + fn from(path: &'a RelativeUnixPath) -> Self { + path.0.into() + } +} + #[cfg(test)] mod test { use super::*; From b6723f2be47a7fef1091fcc29e71b114d80475e3 Mon Sep 17 00:00:00 2001 From: Greg Soltis Date: Tue, 5 Sep 2023 15:23:06 -0700 Subject: [PATCH 2/9] use .git as a cookie_dir --- crates/turborepo-filewatch/src/globwatcher.rs | 20 ++++++------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/crates/turborepo-filewatch/src/globwatcher.rs b/crates/turborepo-filewatch/src/globwatcher.rs index f75b92457ec28..fc664c14eb3e9 100644 --- a/crates/turborepo-filewatch/src/globwatcher.rs +++ b/crates/turborepo-filewatch/src/globwatcher.rs @@ -305,6 +305,7 @@ mod test { fn setup(repo_root: &AbsoluteSystemPath) { // Directory layout: // / + // .git/ // my-pkg/ // irrelevant // dist/ @@ -314,6 +315,7 @@ mod test { // .next/ // next-file // cache/ + repo_root.join_component(".git").create_dir_all().unwrap(); let pkg_path = repo_root.join_component("my-pkg"); pkg_path.create_dir_all().unwrap(); pkg_path @@ -356,15 +358,10 @@ mod test { async fn test_track_outputs() { let (repo_root, _tmp_dir) = temp_dir(); setup(&repo_root); - let (cookie_dir, _cookie_dir) = temp_dir(); + let cookie_dir = repo_root.join_component(".git"); let watcher = FileSystemWatcher::new(&repo_root).unwrap(); - let cookie_watcher = FileSystemWatcher::new(&cookie_dir).unwrap(); - let cookie_jar = CookieJar::new( - &cookie_dir, - Duration::from_secs(2), - cookie_watcher.subscribe(), - ); + let cookie_jar = CookieJar::new(&cookie_dir, Duration::from_secs(2), watcher.subscribe()); let glob_watcher = GlobWatcher::new(&repo_root, cookie_jar, watcher.subscribe()); @@ -439,15 +436,10 @@ mod test { async fn test_track_multiple_hashes() { let (repo_root, _tmp_dir) = temp_dir(); setup(&repo_root); - let (cookie_dir, _cookie_dir) = temp_dir(); + let cookie_dir = repo_root.join_component(".git"); let watcher = FileSystemWatcher::new(&repo_root).unwrap(); - let cookie_watcher = FileSystemWatcher::new(&cookie_dir).unwrap(); - let cookie_jar = CookieJar::new( - &cookie_dir, - Duration::from_secs(2), - cookie_watcher.subscribe(), - ); + let cookie_jar = CookieJar::new(&cookie_dir, Duration::from_secs(2), watcher.subscribe()); let glob_watcher = GlobWatcher::new(&repo_root, cookie_jar, watcher.subscribe()); From 64ab09559f181c8600cf700622d21b241fa1b155 Mon Sep 17 00:00:00 2001 From: Greg Soltis Date: Wed, 6 Sep 2023 11:59:24 -0700 Subject: [PATCH 3/9] Review feedback --- crates/turborepo-filewatch/src/globwatcher.rs | 133 +++++++++--------- 1 file changed, 68 insertions(+), 65 deletions(-) diff --git a/crates/turborepo-filewatch/src/globwatcher.rs b/crates/turborepo-filewatch/src/globwatcher.rs index fc664c14eb3e9..d26aa44cbcfc1 100644 --- a/crates/turborepo-filewatch/src/globwatcher.rs +++ b/crates/turborepo-filewatch/src/globwatcher.rs @@ -158,75 +158,78 @@ impl GlobTracker { } } + fn handle_query(&mut self, query: Query) { + match query { + Query::WatchGlobs { + hash, + glob_set, + resp, + } => { + // Assume cookie handling has happened external to this component. + // Other tasks _could_ write to the + // same output directories, however we are relying on task + // execution dependencies to prevent that. + for (glob_str, glob) in glob_set.include.iter() { + let glob_str = glob_str.to_owned(); + let (_, hashes) = self + .glob_statuses + .entry(glob_str) + .or_insert_with(|| (glob.clone(), HashSet::new())); + hashes.insert(hash.clone()); + } + self.hash_globs.insert(hash.clone(), glob_set); + let _ = resp.send(Ok(())); + } + Query::GetChangedGlobs { + hash, + mut candidates, + resp, + } => { + // Assume cookie handling has happened external to this component. + // Build a set of candidate globs that *may* have changed. + // An empty set translates to all globs have not changed. + if let Some(unchanged_globs) = self.hash_globs.get(&hash) { + candidates.retain(|glob_str| { + // We are keeping the globs from candidates that + // we don't have a record of as unchanged. + // If we do have a record, drop it from candidates. + !unchanged_globs.include.contains_key(glob_str) + }); + } + // If the client has gone away, we don't care about the error + let _ = resp.send(Ok(candidates)); + } + } + } + + fn handle_file_event( + &mut self, + file_event: Result, broadcast::error::RecvError>, + ) { + match file_event { + Err(broadcast::error::RecvError::Closed) => return, + Err(e @ broadcast::error::RecvError::Lagged(_)) => self.on_error(e.into()), + Ok(Err(error)) => self.on_error(error.into()), + Ok(Ok(file_event)) => { + for path in file_event.paths { + let path = AbsoluteSystemPathBuf::try_from(path) + .expect("filewatching should produce absolute paths"); + let Ok(to_match) = self.root.anchor(path) else { + // irrelevant filesystem update + return; + }; + self.handle_path_change(&to_match.to_unix()); + } + } + } + } + async fn watch(mut self) { loop { tokio::select! { _ = &mut self.exit_signal => return, - Some(query) = self.query_recv.recv().into_future() => { - match query { - Query::WatchGlobs { - hash, - glob_set, - resp - } => { - // Assume cookie handling has happened external to this component. - // Other tasks _could_ write to the - // same output directories, however we are relying on task - // execution dependencies to prevent that. - for (glob_str, glob) in glob_set.include.iter() { - let glob_str = glob_str.to_owned(); - match self.glob_statuses.entry(glob_str) { - Entry::Occupied(mut existing) => { - existing.get_mut().1.insert(hash.clone()); - }, - Entry::Vacant(vacancy) => { - let mut hashes = HashSet::new(); - hashes.insert(hash.clone()); - vacancy.insert((glob.clone(), hashes)); - } - } - } - self.hash_globs.insert(hash.clone(), glob_set); - let _ = resp.send(Ok(())); - }, - Query::GetChangedGlobs { - hash, - mut candidates, - resp - } => { - // Assume cookie handling has happened external to this component. - // Build a set of candidate globs that *may* have changed. - // An empty set translates to all globs have not changed. - if let Some(unchanged_globs) = self.hash_globs.get(&hash) { - candidates.retain(|glob_str| { - // We are keeping the globs from candidates that - // we don't have a record of as unchanged. - // If we do have a record, drop it from candidates. - !unchanged_globs.include.contains_key(glob_str) - }); - } - // If the client has gone away, we don't care about the error - let _ = resp.send(Ok(candidates)); - } - } - }, - file_event = self.recv.recv().into_future() => { - match file_event { - Err(broadcast::error::RecvError::Closed) => return, - Err(e @ broadcast::error::RecvError::Lagged(_)) => self.on_error(e.into()), - Ok(Err(error)) => self.on_error(error.into()), - Ok(Ok(file_event)) => { - for path in file_event.paths { - let path = AbsoluteSystemPathBuf::try_from(path).expect("filewatching should produce absolute paths"); - let Ok(to_match) = self.root.anchor(path) else { - // irrelevant filesystem update - continue; - }; - self.handle_path_change(&to_match.to_unix()); - } - } - } - } + Some(query) = self.query_recv.recv().into_future() => self.handle_query(query), + file_event = self.recv.recv().into_future() => self.handle_file_event(file_event) } } } From 7dc9c5f7daabf7763af1622e096f63c9afef77c1 Mon Sep 17 00:00:00 2001 From: Greg Soltis Date: Wed, 6 Sep 2023 16:04:05 -0700 Subject: [PATCH 4/9] Use correct cookie dir for globwatching test --- crates/turborepo-filewatch/src/globwatcher.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/crates/turborepo-filewatch/src/globwatcher.rs b/crates/turborepo-filewatch/src/globwatcher.rs index d26aa44cbcfc1..d11c0af5b99e7 100644 --- a/crates/turborepo-filewatch/src/globwatcher.rs +++ b/crates/turborepo-filewatch/src/globwatcher.rs @@ -527,14 +527,13 @@ mod test { async fn test_watch_single_file() { let (repo_root, _tmp_dir) = temp_dir(); setup(&repo_root); - let (cookie_dir, _cookie_dir) = temp_dir(); + let cookie_dir = repo_root.join_component(".git"); let watcher = FileSystemWatcher::new(&repo_root).unwrap(); - let cookie_watcher = FileSystemWatcher::new(&cookie_dir).unwrap(); let cookie_jar = CookieJar::new( &cookie_dir, Duration::from_secs(2), - cookie_watcher.subscribe(), + watcher.subscribe(), ); let glob_watcher = GlobWatcher::new(&repo_root, cookie_jar, watcher.subscribe()); From 86c59eed3ddc1e657ff84393b9d3c82538633967 Mon Sep 17 00:00:00 2001 From: Greg Soltis Date: Thu, 7 Sep 2023 13:23:53 -0700 Subject: [PATCH 5/9] Extract constant for Boolean false --- crates/turborepo-filewatch/src/fsevent.rs | 6 +++++- crates/turborepo-filewatch/src/globwatcher.rs | 6 +----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/crates/turborepo-filewatch/src/fsevent.rs b/crates/turborepo-filewatch/src/fsevent.rs index 6353ab8123ac9..477fb44c7e44c 100644 --- a/crates/turborepo-filewatch/src/fsevent.rs +++ b/crates/turborepo-filewatch/src/fsevent.rs @@ -30,6 +30,7 @@ use std::{ thread, }; +use fs::core_foundation::Boolean; use fsevent_sys as fs; use fsevent_sys::core_foundation as cf; use notify::{ @@ -281,6 +282,9 @@ extern "C" { fn CFRunLoopIsWaiting(runloop: cf::CFRunLoopRef) -> cf::Boolean; } +// CoreFoundation false value +const FALSE: Boolean = 0x0; + impl FsEventWatcher { fn from_event_handler(event_handler: Arc>) -> Result { Ok(FsEventWatcher { @@ -480,7 +484,7 @@ impl FsEventWatcher { cur_runloop, cf::kCFRunLoopDefaultMode, ); - if fs::FSEventStreamStart(stream) == 0x0 { + if fs::FSEventStreamStart(stream) == FALSE { panic!("FSEventStream failed to start"); } diff --git a/crates/turborepo-filewatch/src/globwatcher.rs b/crates/turborepo-filewatch/src/globwatcher.rs index d11c0af5b99e7..4fd6661f9c68c 100644 --- a/crates/turborepo-filewatch/src/globwatcher.rs +++ b/crates/turborepo-filewatch/src/globwatcher.rs @@ -530,11 +530,7 @@ mod test { let cookie_dir = repo_root.join_component(".git"); let watcher = FileSystemWatcher::new(&repo_root).unwrap(); - let cookie_jar = CookieJar::new( - &cookie_dir, - Duration::from_secs(2), - watcher.subscribe(), - ); + let cookie_jar = CookieJar::new(&cookie_dir, Duration::from_secs(2), watcher.subscribe()); let glob_watcher = GlobWatcher::new(&repo_root, cookie_jar, watcher.subscribe()); From 0479a030632f2da9ae1f7d779886f061b551c187 Mon Sep 17 00:00:00 2001 From: Greg Soltis Date: Thu, 7 Sep 2023 14:55:49 -0700 Subject: [PATCH 6/9] Switch to matching to support windows --- crates/turborepo-filewatch/src/cookie_jar.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/turborepo-filewatch/src/cookie_jar.rs b/crates/turborepo-filewatch/src/cookie_jar.rs index fe46a7b5dd4fc..03e69673bc492 100644 --- a/crates/turborepo-filewatch/src/cookie_jar.rs +++ b/crates/turborepo-filewatch/src/cookie_jar.rs @@ -121,7 +121,7 @@ async fn watch_cookies( event = file_events.recv() => { match flatten_event(event) { Ok(event) => { - if event.kind == EventKind::Create(notify::event::CreateKind::File) { + if matches!(event.kind, EventKind::Create(_)) { let mut watches = watches.lock().expect("mutex poisoned"); for path in event.paths { let abs_path: &AbsoluteSystemPath = path From 9c8e044476176a5abcc256529d349e94356b5499 Mon Sep 17 00:00:00 2001 From: Greg Soltis Date: Thu, 7 Sep 2023 15:38:42 -0700 Subject: [PATCH 7/9] Clippy cleanup --- crates/turborepo-filewatch/src/globwatcher.rs | 6 +++--- crates/turborepo-paths/src/absolute_system_path_buf.rs | 3 +-- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/crates/turborepo-filewatch/src/globwatcher.rs b/crates/turborepo-filewatch/src/globwatcher.rs index 4fd6661f9c68c..15e33dbfbd642 100644 --- a/crates/turborepo-filewatch/src/globwatcher.rs +++ b/crates/turborepo-filewatch/src/globwatcher.rs @@ -1,5 +1,5 @@ use std::{ - collections::{hash_map::Entry, HashMap, HashSet}, + collections::{HashMap, HashSet}, future::IntoFuture, }; @@ -107,7 +107,7 @@ impl GlobWatcher { let (tx, rx) = oneshot::channel(); self.query_ch .send(Query::WatchGlobs { - hash: hash, + hash, glob_set: globs, resp: tx, }) @@ -207,7 +207,7 @@ impl GlobTracker { file_event: Result, broadcast::error::RecvError>, ) { match file_event { - Err(broadcast::error::RecvError::Closed) => return, + Err(broadcast::error::RecvError::Closed) => (), Err(e @ broadcast::error::RecvError::Lagged(_)) => self.on_error(e.into()), Ok(Err(error)) => self.on_error(error.into()), Ok(Ok(file_event)) => { diff --git a/crates/turborepo-paths/src/absolute_system_path_buf.rs b/crates/turborepo-paths/src/absolute_system_path_buf.rs index cceb700a9257e..b481b73b8628e 100644 --- a/crates/turborepo-paths/src/absolute_system_path_buf.rs +++ b/crates/turborepo-paths/src/absolute_system_path_buf.rs @@ -1,7 +1,6 @@ use std::{ borrow::Borrow, - fmt, - io::{self, Write}, + fmt, io, ops::Deref, path::{Path, PathBuf}, }; From 71c43e5435682831f6acfcf3f72a94a3889ba200 Mon Sep 17 00:00:00 2001 From: Greg Soltis Date: Fri, 8 Sep 2023 13:19:41 -0700 Subject: [PATCH 8/9] Review feedback --- crates/turborepo-filewatch/src/globwatcher.rs | 53 ++++++++++--------- 1 file changed, 27 insertions(+), 26 deletions(-) diff --git a/crates/turborepo-filewatch/src/globwatcher.rs b/crates/turborepo-filewatch/src/globwatcher.rs index 15e33dbfbd642..21cc918da87cf 100644 --- a/crates/turborepo-filewatch/src/globwatcher.rs +++ b/crates/turborepo-filewatch/src/globwatcher.rs @@ -246,36 +246,37 @@ impl GlobTracker { } fn handle_path_change(&mut self, path: &RelativeUnixPath) { - self.glob_statuses.retain(|glob_str, (glob, hash_globs)| { - // If this is not a match, we aren't modifying this glob, bail early and mark - // for retention. - if !glob.is_match(path) { - return true; - } - // We have a match. Check which hashes need invalidation. - hash_globs.retain(|hash| { - let Some(glob_set) = self.hash_globs.get_mut(hash) else { - return true; - }; - // If we match an exclusion, don't invalidate this hash - if glob_set.exclude.is_match(path) { + self.glob_statuses + .retain(|glob_str, (glob, hashes_for_glob)| { + // If this is not a match, we aren't modifying this glob, bail early and mark + // for retention. + if !glob.is_match(path) { return true; } - // We didn't match any exclusions. Safe to invalidate this hash - let remove_hash = if let Some(globs_for_hash) = self.hash_globs.get_mut(hash) { - globs_for_hash.include.remove(glob_str); - globs_for_hash.include.is_empty() - } else { + // We have a match. Check which hashes need invalidation. + hashes_for_glob.retain(|hash| { + let Some(glob_set) = self.hash_globs.get_mut(hash) else { + // This shouldn't ever happen, but if we aren't tracking this hash at + // all, we don't need to keep it in the set of hashes that are relevant + // for this glob. + return false; + }; + // If we match an exclusion, don't invalidate this hash + if glob_set.exclude.is_match(path) { + return true; + } + // We didn't match an exclusion, we can remove this glob + glob_set.include.remove(glob_str); + + // We removed the last include, we can stop tracking this hash + if glob_set.include.is_empty() { + self.hash_globs.remove(hash); + } + false - }; - // If we removed the last glob for this hash, drop the hash entirely - if remove_hash { - self.hash_globs.remove(hash); - } - false + }); + !hashes_for_glob.is_empty() }); - !hash_globs.is_empty() - }); } } From 30193e685a5a05715d6debabf3e172891e7aca20 Mon Sep 17 00:00:00 2001 From: Greg Soltis Date: Mon, 11 Sep 2023 10:03:06 -0700 Subject: [PATCH 9/9] Add debug_assert --- crates/turborepo-filewatch/src/globwatcher.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/crates/turborepo-filewatch/src/globwatcher.rs b/crates/turborepo-filewatch/src/globwatcher.rs index 21cc918da87cf..4b953142c4b44 100644 --- a/crates/turborepo-filewatch/src/globwatcher.rs +++ b/crates/turborepo-filewatch/src/globwatcher.rs @@ -259,6 +259,11 @@ impl GlobTracker { // This shouldn't ever happen, but if we aren't tracking this hash at // all, we don't need to keep it in the set of hashes that are relevant // for this glob. + debug_assert!( + false, + "A glob is referencing a hash that we are not tracking. This is most \ + likely an internal bookkeeping error in globwatcher.rs" + ); return false; }; // If we match an exclusion, don't invalidate this hash