Speedy #133

Merged: 24 commits, Mar 9, 2023

Commits (24)
- 18919bc: speedups (laudiacay, Mar 1, 2023)
- a577ae8: it's broken but i refactored it :) (laudiacay, Mar 4, 2023)
- 3dfffb4: GroupConfigs still not functional but code now compiles (organizedgrime, Mar 6, 2023)
- be671d4: got it compiling, configs work (laudiacay, Mar 6, 2023)
- c5a5e2e: fclones pls (laudiacay, Mar 6, 2023)
- 3735f59: Working for non-deduplication cases using fclones (organizedgrime, Mar 6, 2023)
- 4f28f79: All tests working. yay. (organizedgrime, Mar 6, 2023)
- b966e49: Fixed pack.rs partitioning (organizedgrime, Mar 6, 2023)
- 5c18e14: Added TODO and simplified for in pack.rs (organizedgrime, Mar 7, 2023)
- 4c0fc70: tiny cleanup in pack.rs (organizedgrime, Mar 7, 2023)
- 13277f2: Added obake and versioning to ManifestData (organizedgrime, Mar 8, 2023)
- 1142f45: resolved documentation warnings; added version compatibility checker … (organizedgrime, Mar 8, 2023)
- a674016: Strictly enforcing unpack congruence w Cargo version (organizedgrime, Mar 9, 2023)
- f0504a2: Added real documentation to unpack_pipeline and cleaned up function args (organizedgrime, Mar 9, 2023)
- 7b06efc: Reduced type complexity in pack/unpack pipelines opting for borrowed … (organizedgrime, Mar 9, 2023)
- 65f5605: Removed unused plan_copy and hasher functionality; now handled by fcl… (organizedgrime, Mar 9, 2023)
- 4181ce4: Deep clean on pack_pipeline (organizedgrime, Mar 9, 2023)
- 59c1d72: light clean of do_pack_pipeline (organizedgrime, Mar 9, 2023)
- cff1549: Making more modules public to include them in docs (organizedgrime, Mar 9, 2023)
- 15c8181: Added remaining documentation; expanded CompressionScheme functionali… (organizedgrime, Mar 9, 2023)
- 77718f9: Allowed backwards compatibility with minor versions (organizedgrime, Mar 9, 2023)
- f8c3983: Finished adding documentation (organizedgrime, Mar 9, 2023)
- 4fc7bab: Merge branch 'speedy' into docs_cleanup (organizedgrime, Mar 9, 2023)
- eb3ed06: Merge pull request #134 from banyancomputer/docs_cleanup (laudiacay, Mar 9, 2023)
943 changes: 881 additions & 62 deletions Cargo.lock

Large diffs are not rendered by default.

7 changes: 2 additions & 5 deletions Cargo.toml
@@ -18,12 +18,8 @@ clap = { version = "4.0.32", features = ["derive"] }
anyhow = { version = "1", features = ["backtrace"] }
tokio = { version = "1.24", features = ["full", "io-util", "fs"]}
tokio-stream = { version = "0.1.11", features = ["fs"]}
aead = { version = "0.5.1", features = ["stream", "alloc"] }
aes-gcm = {version = "0.10.0", features = ["aes"]}
jwalk = "0.8.1"
rand = "0.8.4"
blake2 = "0.10.6"
flate2 = "1.0.25"
uuid = { version = "1.2.2", features = ["v4"]}
serde ={version= "1.0.152", features = ["derive"]}
serde_json = { version = "1.0.72", features = ["std"]}
@@ -36,11 +32,12 @@ fake-file = "0.1.0"
lazy_static = "1.4"
criterion = {version = "0.4.0", features = ["async_tokio"]}
dir-assert = { git = "https://github.com/banyancomputer/dir-assert.git", branch = "non-utf8" }
zstd = "0.12.3"
fclones = "0.29.3"

[patch]

[profile.dev]
split-debuginfo = "unpacked"

[profile.release]

6 changes: 3 additions & 3 deletions dataprep-lib/Cargo.toml
@@ -13,8 +13,6 @@ anyhow.workspace = true
tokio.workspace = true
tokio-stream.workspace = true
jwalk.workspace = true
blake2.workspace = true
flate2.workspace = true
uuid.workspace = true
serde.workspace = true
serde_json.workspace = true
@@ -26,8 +24,10 @@ dir-assert.workspace = true
fs_extra.workspace = true
criterion.workspace = true
lazy_static.workspace = true
zstd.workspace = true
fclones.workspace = true

# Benchmark for packing -> unpacking an input directory
[[bench]]
name = "pipeline"
harness = false
harness = false
19 changes: 7 additions & 12 deletions dataprep-lib/benches/pipeline.rs
@@ -1,6 +1,8 @@
use criterion::{black_box, criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion};
use dataprep_lib::{
pipeline::{pack_pipeline::pack_pipeline, unpack_pipeline::unpack_pipeline},
do_pipeline_and_write_metadata::{
pack_pipeline::pack_pipeline, unpack_pipeline::unpack_pipeline,
},
utils::fs::{ensure_path_exists_and_is_dir, ensure_path_exists_and_is_empty_dir},
};
use dir_assert::assert_paths;
@@ -318,9 +320,9 @@ fn pack_benchmark(
// The routine to benchmark
|_| async {
pack_pipeline(
black_box(input_path.clone()),
black_box(packed_path.clone()),
black_box(manifest_path.clone()),
black_box(input_path),
black_box(packed_path),
black_box(manifest_path),
// TODO (amiller68) - make this configurable
black_box(1073741824),
black_box(false),
Expand Down Expand Up @@ -366,14 +368,7 @@ fn unpack_benchmark(
// Operation needed to make sure unpack doesn't fail
|| prep_unpack(unpacked_path, manifest_path),
// The routine to benchmark
|_| async {
unpack_pipeline(
black_box(packed_path.clone()),
black_box(unpacked_path.clone()),
black_box(manifest_path.clone()),
)
.await
},
|_| async { unpack_pipeline(black_box(unpacked_path), black_box(manifest_path)).await },
// We need to make sure this data is cleared between iterations
// We only want to use one iteration
BatchSize::PerIteration,
4 changes: 4 additions & 0 deletions dataprep-lib/src/do_pipeline_and_write_metadata/mod.rs
@@ -0,0 +1,4 @@
/// This module contains the pack_pipeline function, which is the main entry point for packing new data.
pub mod pack_pipeline;
/// This module contains the unpack_pipeline function, which is the main entry point for extracting previously packed data.
pub mod unpack_pipeline;
230 changes: 230 additions & 0 deletions dataprep-lib/src/do_pipeline_and_write_metadata/pack_pipeline.rs
@@ -0,0 +1,230 @@
use anyhow::Result;
use fclones::{config::GroupConfig, group_files};
use std::{
collections::HashSet,
fs,
path::{Path, PathBuf},
sync::Arc,
};

use crate::{
spider,
types::{
pack_plan::{PackPipelinePlan, PackPlan},
shared::{CompressionScheme, EncryptionScheme, PartitionScheme},
spider::SpiderMetadata,
unpack_plan::{ManifestData, UnpackPipelinePlan},
},
utils::fs as fsutil,
vacuum,
};

/// Given the input directory, the output directory, the manifest file, and other metadata,
/// pack the input directory into the output directory and store a record of how this
/// operation was performed in the manifest file.
///
/// # Arguments
///
/// * `input_dir` - &Path representing the relative path of the input directory to pack.
/// * `output_dir` - &Path representing the relative path of where to store the packed data.
/// * `manifest_file` - &Path representing the relative path of where to store the manifest file.
/// * `chunk_size` - The maximum size of a packed file / chunk in bytes.
/// * `follow_links` - Whether or not to follow symlinks when packing.
///
/// # Return Type
/// Returns `Ok(())` on success, otherwise returns an error.
pub async fn pack_pipeline(
input_dir: &Path,
output_dir: &Path,
manifest_file: &Path,
chunk_size: u64,
follow_links: bool,
) -> Result<()> {
// Construct the group config
let group_config = create_group_config(input_dir, follow_links);

// Create the output directory
fsutil::ensure_path_exists_and_is_empty_dir(output_dir, false)
.expect("output directory must exist and be empty");

// This pack plan is used to construct FileGroup type PackPipelinePlans,
// but is not unique to any individual file / FileGroup.
let pack_plan = PackPlan {
compression: CompressionScheme::new_zstd(),
partition: PartitionScheme { chunk_size },
encryption: EncryptionScheme::new_age(),
writeout: output_dir.to_path_buf(),
};

/* Perform deduplication and plan how to copy the files */

// Initialize a struct to figure out which files are friends with which
let mut fclones_logger = fclones::log::StdLog::new();
fclones_logger.no_progress = true;

// TODO fix setting base_dir / do it right
let file_groups = group_files(&group_config, &fclones_logger)?;
// HashSet to track files that have already been seen
let mut seen_files: HashSet<PathBuf> = HashSet::new();
// Vector holding all the PackPipelinePlans for packing
let mut packing_plan = vec![];
// Go over the files, group by group
for group in file_groups {
// Create a vector to hold the SpiderMetadata for each file in this group
let mut metadatas = Vec::new();
// For each file in this group
for file in group.files {
// Construct a PathBuf version of the path of this file
let file_path_buf = file.path.to_path_buf();
// Construct a canonicalized version of the path
let canonicalized_path = file_path_buf.canonicalize().unwrap();
// Insert that path into the list of seen paths
seen_files.insert(canonicalized_path.clone());

// Construct the original root and relative path
let original_root = &group_config.base_dir;
// Construct the original location relative to the root
let original_location = file.path.strip_prefix(original_root).unwrap().to_path_buf();

// Construct the metadata
let spider_metadata = Arc::new(SpiderMetadata {
// This is the root of the backup
original_root: original_root.to_path_buf(),
// This is the path relative to the root of the backup
original_location,
// This is the canonicalized path of the original file
canonicalized_path,
// This is the metadata of the original file
original_metadata: fs::metadata(file_path_buf).unwrap(),
});

// Append the metadata
metadatas.push(spider_metadata);
}
// Push a PackPipelinePlan with this file group
packing_plan.push(PackPipelinePlan::FileGroup(metadatas, pack_plan.clone()));
}

/* Spider all the files so we can figure out what's there */
// TODO fix setting follow_links / do it right
let spidered: Vec<SpiderMetadata> =
spider::spider(input_dir, group_config.follow_links).await?;

// and now get all the directories and symlinks
for spidered in spidered.into_iter() {
// If this is a duplicate
if seen_files.contains(&spidered.canonicalized_path.to_path_buf()) {
// Just skip it
continue;
}
// Now that we've checked for duplicates, add this to the seen files
seen_files.insert(spidered.canonicalized_path.clone());

// Construct an atomically reference-counted pointer (Arc) to the spidered metadata
let origin_data = Arc::new(spidered.clone());
// If this is a directory
if spidered.original_metadata.is_dir() {
// Push a PackPipelinePlan with this origin data
packing_plan.push(PackPipelinePlan::Directory(origin_data));
}
// If this is a symlink
else if spidered.original_metadata.is_symlink() {
// Determine where this symlink points to, an operation that should never fail
let symlink_target = fs::read_link(&spidered.canonicalized_path).unwrap();
// Push a PackPipelinePlan with this origin data and symlink
packing_plan.push(PackPipelinePlan::Symlink(origin_data, symlink_target));
}
// If this is a file that was not in a group
else {
// Push a PackPipelinePlan using fake file group of singular spidered metadata
packing_plan.push(PackPipelinePlan::FileGroup(
vec![origin_data],
pack_plan.clone(),
));
}
}

// TODO (laudiacay): For now we are doing compression in place, per-file. Make this better.
let unpack_plans = futures::future::join_all(
packing_plan
.iter()
.map(|copy_plan| vacuum::pack::do_pack_pipeline(copy_plan.clone())),
)
.await
.into_iter()
.flat_map(|x| x.unwrap())
.collect::<Vec<UnpackPipelinePlan>>();

// For now just write out the content of compressed_and_encrypted to a file.
// make sure the manifest file doesn't exist
let manifest_writer = std::fs::OpenOptions::new()
.write(true)
.create_new(true)
.open(manifest_file)
.unwrap();

// Construct the latest version of the ManifestData struct
let manifest_data = ManifestData {
version: env!("CARGO_PKG_VERSION").to_string(),
unpack_plans,
};

// Use serde to convert the ManifestData to JSON and write it to the path specified
// Return the result of this operation
serde_json::to_writer_pretty(manifest_writer, &manifest_data).map_err(|e| anyhow::anyhow!(e))
}

/// Private function used to construct a GroupConfig struct from the relevant command line options.
/// This is used to make the main function more readable, as well as to ensure that
/// the GroupConfig options are always set correctly.
fn create_group_config(input_dir: &Path, follow_links: bool) -> GroupConfig {
let base_dir = input_dir.canonicalize().unwrap();

// we checked over these options manually and sorted them
GroupConfig {
// will definitely never need to change
output: None,
format: Default::default(),
stdin: false,
isolate: false, // TODO laudiacay um bug?
in_place: false,
no_copy: false,
rf_over: None,
rf_under: None,
unique: false,

// will probably never need to change
depth: None,
match_links: false,
symbolic_links: false, // TODO laudiacay here be bugs
transform: None,
min_size: (0_usize).into(),
max_size: None,
ignore_case: false,
regex: false,

// may want to change for feature adds in the future
hidden: true,
no_ignore: false, // TODO laudiacay HELPPPP THIS MIGHT BE BUGS
// TODO laudiacay ????
name_patterns: vec![],
path_patterns: vec![],
exclude_patterns: vec![],
hash_fn: Default::default(),
cache: false,

// We are using this option; it is load-bearing
threads: vec![(
"default".to_string().parse().unwrap(),
fclones::config::Parallelism {
random: 1,
sequential: 1,
},
)],
follow_links,
base_dir: base_dir.into(),
paths: vec![".".into()],
}
// TODO think about fclones caching for repeated runs :3 this will be useful for a backup-utility kind of thing
// TODO groupconfig.threads and think about splitting sequential and random I/O into separate thread pools
}
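
To make the pack_pipeline signature above concrete, here is a hypothetical caller sketch. The paths, the 1 GiB chunk size (the same value the benchmark passes), and the tokio::main entry point are illustrative assumptions, not part of this PR; the import path follows the one used in benches/pipeline.rs.

// Hypothetical caller of pack_pipeline; paths are made up for illustration.
use std::path::Path;

use anyhow::Result;
use dataprep_lib::do_pipeline_and_write_metadata::pack_pipeline::pack_pipeline;

#[tokio::main]
async fn main() -> Result<()> {
    pack_pipeline(
        Path::new("data"),          // input directory to pack
        Path::new("packed"),        // output directory (must be empty)
        Path::new("manifest.json"), // where the ManifestData JSON is written
        1_073_741_824,              // chunk size in bytes (1 GiB)
        false,                      // do not follow symlinks
    )
    .await
}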
45 changes: 45 additions & 0 deletions dataprep-lib/src/do_pipeline_and_write_metadata/unpack_pipeline.rs
@@ -0,0 +1,45 @@
use crate::{
types::unpack_plan::{ManifestData, UnpackPipelinePlan},
vacuum::unpack::do_unpack_pipeline,
};
use anyhow::Result;
use std::path::Path;
use tokio_stream::StreamExt;

/// Given the manifest file and a destination for our unpacked data, run the unpacking pipeline
/// on the data referenced in the manifest.
///
/// # Arguments
///
/// * `output_dir` - &Path representing the relative path of the output directory in which to unpack the data
/// * `manifest_file` - &Path representing the relative path of the manifest file
///
/// # Return Type
/// Returns `Ok(())` on success, otherwise returns an error.
pub async fn unpack_pipeline(output_dir: &Path, manifest_file: &Path) -> Result<()> {
// parse manifest file into Vec<CodablePipeline>
let reader = std::fs::File::open(manifest_file)?;

// Deserialize the data read as the latest version of manifestdata
let manifest_data: ManifestData = serde_json::from_reader(reader)?;

// If the major version of the manifest is not the same as the major version of the program
if manifest_data.version.split('.').next().unwrap()
!= env!("CARGO_PKG_VERSION").split('.').next().unwrap()
{
// Panic if it's not
panic!("Unsupported manifest version.");
}

// Extract the unpacking plans
let unpack_plans: Vec<UnpackPipelinePlan> = manifest_data.unpack_plans;

// Iterate over each pipeline
tokio_stream::iter(unpack_plans)
.then(|pipeline_to_disk| do_unpack_pipeline(pipeline_to_disk, output_dir))
.collect::<Result<Vec<_>>>()
.await?;

// If the async block returns, we're Ok.
Ok(())
}
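
As a counterpart to the packing sketch earlier, a hypothetical call to unpack_pipeline against the same manifest might look like the following; the paths are again illustrative. Note that, per the version check above, unpack_pipeline panics rather than returning an error when the manifest's major version does not match the crate's.

// Hypothetical caller of unpack_pipeline; paths are made up for illustration.
use std::path::Path;

use anyhow::Result;
use dataprep_lib::do_pipeline_and_write_metadata::unpack_pipeline::unpack_pipeline;

#[tokio::main]
async fn main() -> Result<()> {
    // Restore the data described by manifest.json into ./unpacked.
    unpack_pipeline(Path::new("unpacked"), Path::new("manifest.json")).await
}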
6 changes: 0 additions & 6 deletions dataprep-lib/src/fs_carfiler.rs

This file was deleted.
