perform invalidation directly on writes #5786

Merged (2 commits) on Aug 25, 2023
39 changes: 39 additions & 0 deletions crates/turbo-tasks-fs/src/invalidation.rs
@@ -89,3 +89,42 @@ impl InvalidationReasonKind for WatchStartKind {
)
}
}

/// Invalidation was caused by a write operation on the filesystem
#[derive(PartialEq, Eq, Hash)]
pub struct Write {
    pub path: String,
}

impl InvalidationReason for Write {
    fn kind(&self) -> Option<StaticOrArc<dyn InvalidationReasonKind>> {
        Some(StaticOrArc::Static(&WRITE_KIND))
    }
}

impl Display for Write {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "{} written", self.path)
    }
}

/// Invalidation kind for [Write]
#[derive(PartialEq, Eq, Hash)]
struct WriteKind;

static WRITE_KIND: WriteKind = WriteKind;

impl InvalidationReasonKind for WriteKind {
    fn fmt(
        &self,
        reasons: &IndexSet<StaticOrArc<dyn InvalidationReason>>,
        f: &mut Formatter<'_>,
    ) -> std::fmt::Result {
        write!(
            f,
            "{} files written ({}, ...)",
            reasons.len(),
            reasons[0].as_any().downcast_ref::<Write>().unwrap().path
        )
    }
}
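
The aggregated message built by WriteKind can be illustrated with a small standalone sketch. The function name summarize_writes and the plain slice of paths are assumptions made for illustration; the diff itself works on an IndexSet of reasons and indexes element 0 directly, whereas this sketch handles an empty batch explicitly.

```rust
/// Hypothetical stand-in for `WriteKind::fmt`: summarizes a batch of written
/// paths as "<count> files written (<first path>, ...)".
fn summarize_writes(paths: &[&str]) -> Option<String> {
    // The diff indexes `reasons[0]`, which assumes a non-empty set; this
    // sketch returns `None` for an empty batch instead of panicking.
    let first = paths.first()?;
    Some(format!("{} files written ({}, ...)", paths.len(), first))
}
```

Whether the real formatter can ever be called with an empty set of reasons is not visible in this diff, so the empty-batch handling here is a design choice of the sketch only.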
60 changes: 54 additions & 6 deletions crates/turbo-tasks-fs/src/lib.rs
@@ -68,7 +68,11 @@ use turbo_tasks_hash::hash_xxh3_hash64;
use util::{extract_disk_access, join_path, normalize_path, sys_to_unix, unix_to_sys};
pub use virtual_fs::VirtualFileSystem;

-use self::{invalidation::WatchStart, json::UnparseableJson, mutex_map::MutexMap};
+use self::{
+    invalidation::{WatchStart, Write},
+    json::UnparseableJson,
+    mutex_map::MutexMap,
+};
use crate::{
attach::AttachedFileSystem,
invalidation::WatchChange,
@@ -200,6 +204,20 @@ impl DiskFileSystem {
Ok(())
}

/// Registers the current task as the sole invalidator for the path,
/// has to be called within a turbo-tasks function. It removes and returns
/// the invalidators that were previously registered for the path.
fn register_sole_invalidator(&self, path: &Path) -> Result<HashSet<Invalidator>> {
Contributor

Could this be merged with register_invalidator? The only difference seems to be whether it returns the old value, which the callers of the old register_invalidator could simply discard.

Member Author

register_invalidator doesn't remove the old invalidators; it only adds an additional one.

register_sole_invalidator removes the old invalidators, replacing the list with only that one invalidator.

Contributor

I don't see how we get two different behaviors out of these two pieces of code:

# inserts a new invalidator
self.invalidator_map.insert(path_to_key(path), invalidator);

# replaces old invalidators
let mut invalidator_map = self.invalidator_map.lock().unwrap();
let old_invalidators = invalidator_map.insert(path_to_key(path), [invalidator].into());

They're both calling map.insert(key), so how does one append while the other replaces?

Member Author

These are different insert methods. InvalidatorMap::insert adds a single invalidator to the list.

HashMap::insert replaces the current list with a different list.

Contributor

Oh, I didn't realize that lock wasn't itself a mutex; it's a method returning the mutex-guarded HashMap.

    let invalidator = turbo_tasks::get_invalidator();
    let mut invalidator_map = self.invalidator_map.lock().unwrap();
    let old_invalidators = invalidator_map.insert(path_to_key(path), [invalidator].into());
    #[cfg(not(any(target_os = "macos", target_os = "windows")))]
    if let Some(dir) = path.parent() {
        self.watcher.ensure_watching(dir, self.root_path())?;
    }
    Ok(old_invalidators.unwrap_or_default())
}
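
The append-versus-replace distinction from the review thread above can be sketched with standard library types only. Everything here is an assumption for illustration: the InvalidatorMap below is a simplified stand-in rather than the crate's actual type, u32 task ids replace Invalidator, and register_sole is a hypothetical free function shaped like register_sole_invalidator.

```rust
use std::collections::{HashMap, HashSet};
use std::sync::{Mutex, MutexGuard};

/// Hypothetical, simplified stand-in for the crate's invalidator map.
/// `u32` task ids replace the real `Invalidator` type.
#[derive(Default)]
struct InvalidatorMap {
    inner: Mutex<HashMap<String, HashSet<u32>>>,
}

impl InvalidatorMap {
    /// Exposes the guarded `HashMap`, so callers can use `HashMap::insert`.
    fn lock(&self) -> MutexGuard<'_, HashMap<String, HashSet<u32>>> {
        self.inner.lock().unwrap()
    }

    /// Appends one invalidator to the set stored for `key`.
    fn insert(&self, key: String, invalidator: u32) {
        self.lock().entry(key).or_default().insert(invalidator);
    }
}

/// Mirrors the shape of `register_sole_invalidator`: replace the whole set
/// with a single invalidator and hand the previous set back to the caller.
fn register_sole(map: &InvalidatorMap, key: &str, invalidator: u32) -> HashSet<u32> {
    let old = map.lock().insert(key.to_string(), HashSet::from([invalidator]));
    old.unwrap_or_default()
}
```

Calling insert on the wrapper adds to the existing set, while calling insert on the locked HashMap swaps the whole set and returns the previous one, which is why the two snippets quoted in the thread behave differently.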

/// registers the path as an invalidator for the current task,
/// has to be called within a turbo-tasks function
fn register_dir_invalidator(&self, path: &Path) -> Result<()> {
@@ -487,14 +505,33 @@ impl DiskFileSystem {
path.join(&*unix_to_sys(&fs_path.path))
})
}

fn invalidate_from_write(&self, full_path: &Path, invalidators: HashSet<Invalidator>) {
if !invalidators.is_empty() {
if let Some(path) = format_absolute_fs_path(full_path, &self.name, self.root_path()) {
if invalidators.len() == 1 {
let invalidator = invalidators.into_iter().next().unwrap();
invalidator.invalidate_with_reason(Write { path });
jridgewell marked this conversation as resolved.
} else {
invalidators.into_iter().for_each(|invalidator| {
invalidator.invalidate_with_reason(Write { path: path.clone() });
});
}
} else {
invalidators.into_iter().for_each(|invalidator| {
invalidator.invalidate();
});
}
}
}
}

struct PathLockGuard<'a>(
RwLockReadGuard<'a, ()>,
mutex_map::MutexMapGuard<'a, PathBuf>,
);

-fn format_absolute_fs_path(path: &Path, name: &str, root_path: &PathBuf) -> Option<String> {
+fn format_absolute_fs_path(path: &Path, name: &str, root_path: &Path) -> Option<String> {
let path = if let Ok(rel_path) = path.strip_prefix(root_path) {
let path = if MAIN_SEPARATOR != '/' {
let rel_path = rel_path.to_string_lossy().replace(MAIN_SEPARATOR, "/");
@@ -710,22 +747,29 @@ impl FileSystem for DiskFileSystem {
let full_path = self.to_sys_path(fs_path).await?;
let content = content.await?;

-// Track the file, so that we will rewrite it if it ever changes.
-fs_path.track().await?;

let _lock = self.lock_path(&full_path).await;

// Track the file, so that we will rewrite it if it ever changes.
let old_invalidators = self.register_sole_invalidator(&full_path)?;

// We perform an untracked comparison here, so that this write is not dependent
// on a read's Vc<FileContent> (and the memory it holds). Our untracked read can
// be freed immediately. Given this is an output file, it's unlikely any Turbo
// code will need to read the file from disk into a Vc<FileContent>, so we're
// not wasting cycles.
let compare = content.streaming_compare(full_path.clone()).await?;
if compare == FileComparison::Equal {
if !old_invalidators.is_empty() {
Contributor

Nit: add a comment explaining why we're preserving the old invalidators, especially because it looks like we're trying to emit the same contents from multiple tasks. Shouldn't that be an error, too?

Member Author

old_invalidators can also contain invalidators from read methods. It's not write-only.

We don't want to invalidate read calls when writing identical content to a file.

Contributor

"We don't want to invalidate read calls when writing identical content to a file."

But that means there are two owners of the file. Shouldn't that be an error? Why aren't they reusing the same write cell?

Member Author

Maybe, but old_invalidators.is_empty() is not related to that case, since the invalidator map stores both read and write invalidators. Having a non-empty old_invalidators is valid, since its entries might come from read methods.

let key = path_to_key(&full_path);
for i in old_invalidators {
self.invalidator_map.insert(key.clone(), i);
}
}
return Ok(Completion::unchanged());
}

let create_directory = compare == FileComparison::Create;

match &*content {
FileContent::Content(file) => {
if create_directory {
@@ -769,6 +813,8 @@ impl FileSystem for DiskFileSystem {
}
}

self.invalidate_from_write(&full_path, old_invalidators);

Ok(Completion::new())
}
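
The write flow in this hunk (take the old invalidators, compare, restore them when nothing changed, otherwise write and notify them) can be modeled in memory. This is a sketch under stated assumptions, not the crate's implementation: ModelFs, WriteOutcome, the u32 task ids, and the invalidated log are all hypothetical, and the streaming comparison is replaced by a plain string equality check.

```rust
use std::collections::{HashMap, HashSet};

/// Outcome of a modeled write, mirroring `Completion::unchanged()` versus
/// `Completion::new()` in the diff.
#[derive(Debug, PartialEq)]
enum WriteOutcome {
    Unchanged,
    Written,
}

/// Hypothetical in-memory model of the file system; `u32` task ids stand in
/// for `Invalidator`, and `invalidated` records which tasks were notified.
#[derive(Default)]
struct ModelFs {
    files: HashMap<String, String>,
    invalidators: HashMap<String, HashSet<u32>>,
    invalidated: Vec<u32>,
}

impl ModelFs {
    /// Models a read: the reading task registers itself for the path.
    fn read(&mut self, path: &str, task: u32) -> Option<String> {
        self.invalidators.entry(path.to_string()).or_default().insert(task);
        self.files.get(path).cloned()
    }

    fn write(&mut self, path: &str, content: &str, task: u32) -> WriteOutcome {
        // Make the writer the sole invalidator and take the previous set.
        let old = self
            .invalidators
            .insert(path.to_string(), HashSet::from([task]))
            .unwrap_or_default();

        if self.files.get(path).map(String::as_str) == Some(content) {
            // Identical content: nothing observable changed, so put the old
            // invalidators (for example from reads) back without firing them.
            self.invalidators.entry(path.to_string()).or_default().extend(old);
            return WriteOutcome::Unchanged;
        }

        self.files.insert(path.to_string(), content.to_string());
        // Content changed: notify every task that was registered before.
        let mut notified: Vec<u32> = old.into_iter().collect();
        notified.sort_unstable();
        self.invalidated.extend(notified);
        WriteOutcome::Written
    }
}
```

In this model, a reader that registered before an identical rewrite stays registered and is not notified, which matches the point made in the review thread that old_invalidators may hold invalidators from read methods.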

@@ -779,6 +825,8 @@ impl FileSystem for DiskFileSystem {
target: Vc<LinkContent>,
) -> Result<Vc<Completion>> {
let full_path = self.to_sys_path(fs_path).await?;
// TODO(sokra) perform an untracked read here, register an invalidator and get
// all existing invalidators
let old_content = fs_path
.read_link()
.await
@@ -834,7 +882,7 @@ impl FileSystem for DiskFileSystem {
return Err(anyhow!("invalid symlink target: {}", full_path.display()));
}
LinkContent::NotFound => {
-retry_future(|| fs::remove_file(full_path.clone()))
+retry_future(|| fs::remove_file(&full_path))
.await
.or_else(|err| {
if err.kind() == ErrorKind::NotFound {
4 changes: 2 additions & 2 deletions crates/turbo-tasks-fs/src/retry.rs
@@ -5,10 +5,10 @@ use tokio::task::spawn_blocking;

const MAX_RETRY_ATTEMPTS: usize = 10;

-pub(crate) async fn retry_future<R, F, Fut>(func: F) -> io::Result<R>
+pub(crate) async fn retry_future<'a, R, F, Fut>(func: F) -> io::Result<R>
where
F: FnMut() -> Fut + Unpin,
-Fut: Future<Output = io::Result<R>>,
+Fut: Future<Output = io::Result<R>> + 'a,
{
match FutureRetry::new(
func,