Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor RegistryData::load to handle management of the index cache #10482

Merged
merged 1 commit into from
Mar 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 92 additions & 85 deletions src/cargo/sources/registry/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@

use crate::core::dependency::Dependency;
use crate::core::{PackageId, SourceId, Summary};
use crate::sources::registry::{RegistryData, RegistryPackage, INDEX_V_MAX};
use crate::sources::registry::{LoadResponse, RegistryData, RegistryPackage, INDEX_V_MAX};
use crate::util::interning::InternedString;
use crate::util::{internal, CargoResult, Config, Filesystem, OptVersionReq, ToSemver};
use anyhow::bail;
Expand Down Expand Up @@ -247,6 +247,7 @@ pub struct IndexSummary {
#[derive(Default)]
struct SummariesCache<'a> {
versions: Vec<(Version, &'a [u8])>,
index_version: &'a str,
}

impl<'cfg> RegistryIndex<'cfg> {
Expand Down Expand Up @@ -358,7 +359,6 @@ impl<'cfg> RegistryIndex<'cfg> {

let root = load.assert_index_locked(&self.path);
let cache_root = root.join(".cache");
let index_version = load.current_version();

// See module comment in `registry/mod.rs` for why this is structured
// the way it is.
Expand All @@ -376,7 +376,6 @@ impl<'cfg> RegistryIndex<'cfg> {
// along the way produce helpful "did you mean?" suggestions.
for (i, path) in UncanonicalizedIter::new(&raw_path).take(1024).enumerate() {
let summaries = Summaries::parse(
index_version.as_deref(),
root,
&cache_root,
path.as_ref(),
Expand Down Expand Up @@ -559,7 +558,6 @@ impl Summaries {
/// * `load` - the actual index implementation which may be very slow to
/// call. We avoid this if we can.
pub fn parse(
index_version: Option<&str>,
root: &Path,
cache_root: &Path,
relative: &Path,
Expand All @@ -571,88 +569,101 @@ impl Summaries {
// of reasons, but consider all of them non-fatal and just log their
// occurrence in case anyone is debugging anything.
let cache_path = cache_root.join(relative);
let mut cache_contents = None;
if let Some(index_version) = index_version {
match fs::read(&cache_path) {
Ok(contents) => match Summaries::parse_cache(contents, index_version) {
Ok(s) => {
log::debug!("fast path for registry cache of {:?}", relative);
if cfg!(debug_assertions) {
cache_contents = Some(s.raw_data);
} else {
return Poll::Ready(Ok(Some(s)));
}
}
Err(e) => {
log::debug!("failed to parse {:?} cache: {}", relative, e);
}
},
Err(e) => log::debug!("cache missing for {:?} error: {}", relative, e),
}
let mut cached_summaries = None;
let mut index_version = None;
match fs::read(&cache_path) {
Ok(contents) => match Summaries::parse_cache(contents) {
Ok((s, v)) => {
cached_summaries = Some(s);
index_version = Some(v);
}
Err(e) => {
log::debug!("failed to parse {:?} cache: {}", relative, e);
}
},
Err(e) => log::debug!("cache missing for {:?} error: {}", relative, e),
}

// This is the fallback path where we actually talk to the registry backend to load
// information. Here we parse every single line in the index (as we need
// to find the versions)
log::debug!("slow path for {:?}", relative);
let mut response = load.load(root, relative, index_version.as_deref())?;
// In debug builds, perform a second load without the cache so that
// we can validate that the cache is correct.
if cfg!(debug_assertions) && matches!(response, Poll::Ready(LoadResponse::CacheValid)) {
response = load.load(root, relative, None)?;
Eh2406 marked this conversation as resolved.
Show resolved Hide resolved
}
let response = match response {
Poll::Pending => return Poll::Pending,
Poll::Ready(response) => response,
};

let mut bytes_to_cache = None;
let mut version_to_cache = None;
let mut ret = Summaries::default();
let mut hit_closure = false;
let mut cache_bytes = None;
let result = load.load(root, relative, &mut |contents| {
ret.raw_data = contents.to_vec();
let mut cache = SummariesCache::default();
hit_closure = true;
for line in split(contents, b'\n') {
// Attempt forwards-compatibility on the index by ignoring
// everything that we ourselves don't understand, that should
// allow future cargo implementations to break the
// interpretation of each line here and older cargo will simply
// ignore the new lines.
let summary = match IndexSummary::parse(config, line, source_id) {
Ok(summary) => summary,
Err(e) => {
// This should only happen when there is an index
// entry from a future version of cargo that this
// version doesn't understand. Hopefully, those future
// versions of cargo correctly set INDEX_V_MAX and
// CURRENT_CACHE_VERSION, otherwise this will skip
// entries in the cache preventing those newer
// versions from reading them (that is, until the
// cache is rebuilt).
log::info!("failed to parse {:?} registry package: {}", relative, e);
continue;
}
};
let version = summary.summary.package_id().version().clone();
cache.versions.push((version.clone(), line));
ret.versions.insert(version, summary.into());
match response {
LoadResponse::CacheValid => {
log::debug!("fast path for registry cache of {:?}", relative);
return Poll::Ready(Ok(cached_summaries));
}
if let Some(index_version) = index_version {
cache_bytes = Some(cache.serialize(index_version));
LoadResponse::NotFound => {
debug_assert!(cached_summaries.is_none());
return Poll::Ready(Ok(None));
}
LoadResponse::Data {
raw_data,
index_version,
} => {
// This is the fallback path where we actually talk to the registry backend to load
// information. Here we parse every single line in the index (as we need
// to find the versions)
log::debug!("slow path for {:?}", relative);
let mut cache = SummariesCache::default();
ret.raw_data = raw_data;
for line in split(&ret.raw_data, b'\n') {
// Attempt forwards-compatibility on the index by ignoring
// everything that we ourselves don't understand, that should
// allow future cargo implementations to break the
// interpretation of each line here and older cargo will simply
// ignore the new lines.
let summary = match IndexSummary::parse(config, line, source_id) {
Ok(summary) => summary,
Err(e) => {
// This should only happen when there is an index
// entry from a future version of cargo that this
// version doesn't understand. Hopefully, those future
// versions of cargo correctly set INDEX_V_MAX and
// CURRENT_CACHE_VERSION, otherwise this will skip
// entries in the cache preventing those newer
// versions from reading them (that is, until the
// cache is rebuilt).
log::info!("failed to parse {:?} registry package: {}", relative, e);
continue;
}
};
let version = summary.summary.package_id().version().clone();
cache.versions.push((version.clone(), line));
ret.versions.insert(version, summary.into());
}
if let Some(index_version) = index_version {
bytes_to_cache = Some(cache.serialize(index_version.as_str()));
version_to_cache = Some(index_version);
}
}
Ok(())
});

if result?.is_pending() {
assert!(!hit_closure);
return Poll::Pending;
}

if !hit_closure {
debug_assert!(cache_contents.is_none());
return Poll::Ready(Ok(None));
}

// If we've got debug assertions enabled and the cache was previously
// present and considered fresh this is where the debug assertions
// actually happens to verify that our cache is indeed fresh and
// computes exactly the same value as before.
if cfg!(debug_assertions) && cache_contents.is_some() && cache_bytes != cache_contents {
let cache_contents = cached_summaries.as_ref().map(|s| &s.raw_data);
if cfg!(debug_assertions)
&& index_version.as_deref() == version_to_cache.as_deref()
&& cached_summaries.is_some()
&& bytes_to_cache.as_ref() != cache_contents
{
panic!(
"original cache contents:\n{:?}\n\
does not equal new cache contents:\n{:?}\n",
cache_contents.as_ref().map(|s| String::from_utf8_lossy(s)),
cache_bytes.as_ref().map(|s| String::from_utf8_lossy(s)),
bytes_to_cache.as_ref().map(|s| String::from_utf8_lossy(s)),
);
}

Expand All @@ -662,7 +673,7 @@ impl Summaries {
//
// This is opportunistic so we ignore failure here but are sure to log
// something in case of error.
if let Some(cache_bytes) = cache_bytes {
if let Some(cache_bytes) = bytes_to_cache {
if paths::create_dir_all(cache_path.parent().unwrap()).is_ok() {
let path = Filesystem::new(cache_path.clone());
config.assert_package_cache_locked(&path);
Expand All @@ -677,16 +688,17 @@ impl Summaries {

/// Parses an open `File` which represents information previously cached by
/// Cargo.
pub fn parse_cache(contents: Vec<u8>, last_index_update: &str) -> CargoResult<Summaries> {
let cache = SummariesCache::parse(&contents, last_index_update)?;
pub fn parse_cache(contents: Vec<u8>) -> CargoResult<(Summaries, InternedString)> {
let cache = SummariesCache::parse(&contents)?;
let index_version = InternedString::new(cache.index_version);
let mut ret = Summaries::default();
for (version, summary) in cache.versions {
let (start, end) = subslice_bounds(&contents, summary);
ret.versions
.insert(version, MaybeIndexSummary::Unparsed { start, end });
}
ret.raw_data = contents;
return Ok(ret);
return Ok((ret, index_version));

// Returns the start/end offsets of `inner` with `outer`. Asserts that
// `inner` is a subslice of `outer`.
Expand Down Expand Up @@ -742,7 +754,7 @@ impl Summaries {
const CURRENT_CACHE_VERSION: u8 = 3;

impl<'a> SummariesCache<'a> {
fn parse(data: &'a [u8], last_index_update: &str) -> CargoResult<SummariesCache<'a>> {
fn parse(data: &'a [u8]) -> CargoResult<SummariesCache<'a>> {
// NB: keep this method in sync with `serialize` below
Eh2406 marked this conversation as resolved.
Show resolved Hide resolved
let (first_byte, rest) = data
.split_first()
Expand All @@ -764,18 +776,13 @@ impl<'a> SummariesCache<'a> {
let rest = &rest[4..];

let mut iter = split(rest, 0);
if let Some(update) = iter.next() {
if update != last_index_update.as_bytes() {
bail!(
"cache out of date: current index ({}) != cache ({})",
last_index_update,
str::from_utf8(update)?,
)
}
let last_index_update = if let Some(update) = iter.next() {
str::from_utf8(update)?
} else {
bail!("malformed file");
}
};
let mut ret = SummariesCache::default();
ret.index_version = last_index_update;
while let Some(version) = iter.next() {
let version = str::from_utf8(version)?;
let version = Version::parse(version)?;
Expand Down
16 changes: 7 additions & 9 deletions src/cargo/sources/registry/local.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::core::PackageId;
use crate::sources::registry::{MaybeLock, RegistryConfig, RegistryData};
use crate::sources::registry::{LoadResponse, MaybeLock, RegistryConfig, RegistryData};
use crate::util::errors::CargoResult;
use crate::util::interning::InternedString;
use crate::util::{Config, Filesystem};
use cargo_util::{paths, Sha256};
use std::fs::File;
Expand Down Expand Up @@ -48,18 +47,17 @@ impl<'cfg> RegistryData for LocalRegistry<'cfg> {
path.as_path_unlocked()
}

fn current_version(&self) -> Option<InternedString> {
None
}

fn load(
&self,
root: &Path,
path: &Path,
data: &mut dyn FnMut(&[u8]) -> CargoResult<()>,
) -> Poll<CargoResult<()>> {
_index_version: Option<&str>,
) -> Poll<CargoResult<LoadResponse>> {
if self.updated {
Poll::Ready(Ok(data(&paths::read_bytes(&root.join(path))?)?))
Poll::Ready(Ok(LoadResponse::Data {
raw_data: paths::read_bytes(&root.join(path))?,
index_version: None,
}))
} else {
Poll::Pending
}
Expand Down
31 changes: 17 additions & 14 deletions src/cargo/sources/registry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,20 @@ impl<'a> RegistryDependency<'a> {
}
}

pub enum LoadResponse {
/// The cache is valid. The cached data should be used.
CacheValid,

/// The cache is out of date. Returned data should be used.
Data {
raw_data: Vec<u8>,
index_version: Option<String>,
},

/// The requested crate was found.
NotFound,
}

/// An abstract interface to handle both a local (see `local::LocalRegistry`)
/// and remote (see `remote::RemoteRegistry`) registry.
///
Expand All @@ -432,15 +446,13 @@ pub trait RegistryData {
///
/// * `root` is the root path to the index.
/// * `path` is the relative path to the package to load (like `ca/rg/cargo`).
/// * `data` is a callback that will receive the raw bytes of the index JSON file.
///
/// If `load` returns a `Poll::Pending` then it must not have called data.
/// * `index_version` is the version of the requested crate data currently in cache.
fn load(
&self,
root: &Path,
path: &Path,
data: &mut dyn FnMut(&[u8]) -> CargoResult<()>,
) -> Poll<CargoResult<()>>;
index_version: Option<&str>,
) -> Poll<CargoResult<LoadResponse>>;

/// Loads the `config.json` file and returns it.
///
Expand Down Expand Up @@ -495,15 +507,6 @@ pub trait RegistryData {
/// Returns the [`Path`] to the [`Filesystem`].
fn assert_index_locked<'a>(&self, path: &'a Filesystem) -> &'a Path;

/// Returns the current "version" of the index.
///
/// For local registries, this returns `None` because there is no
/// versioning. For remote registries, this returns the SHA hash of the
/// git index on disk (or None if the index hasn't been downloaded yet).
///
/// This is used by index caching to check if the cache is out of date.
fn current_version(&self) -> Option<InternedString>;

/// Block until all outstanding Poll::Pending requests are Poll::Ready.
fn block_until_ready(&mut self) -> CargoResult<()>;
}
Expand Down
Loading