Skip to content

Commit

Permalink
Handle discarded regions when packing deltas
Browse files Browse the repository at this point in the history
  • Loading branch information
jthornber committed May 19, 2022
1 parent ebd1c6e commit f46641b
Show file tree
Hide file tree
Showing 6 changed files with 243 additions and 95 deletions.
36 changes: 22 additions & 14 deletions src/chunkers.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use anyhow::{Context, Result};
use anyhow::{anyhow, Context, Result};
use io::prelude::*;
use std::fs::File;
use std::fs::OpenOptions;
Expand Down Expand Up @@ -154,20 +154,18 @@ impl Iterator for ThinChunker {

pub struct DeltaChunker {
input: File,
additions: RunIter,
removals: RunIter,
deltas: DualIter,
data_block_size: u64,

max_read_size: usize,
current_run: Option<(bool, Range<u64>)>,
current_run: Option<(DualType, Range<u64>)>,
}

impl DeltaChunker {
pub fn new(input: File, additions: RunIter, removals: RunIter, data_block_size: u64) -> Self {
pub fn new(input: File, deltas: DualIter, data_block_size: u64) -> Self {
Self {
input,
additions,
removals,
deltas,
data_block_size,

max_read_size: 16 * 1024 * 1024,
Expand All @@ -176,10 +174,10 @@ impl DeltaChunker {
}

// FIXME: removals are being ignored
fn next_run_bytes(&mut self) -> Option<(bool, Range<u64>)> {
self.additions.next().map(|(b, Range { start, end })| {
fn next_run_bytes(&mut self) -> Option<(DualType, Range<u64>)> {
self.deltas.next().map(|(t, Range { start, end })| {
(
b,
t,
Range {
start: start as u64 * self.data_block_size,
end: end as u64 * self.data_block_size,
Expand All @@ -193,8 +191,8 @@ impl DeltaChunker {
std::mem::swap(&mut run, &mut self.current_run);

match run.or_else(|| self.next_run_bytes()) {
Some((false, run)) => Ok(Some(Chunk::Ref(run.end - run.start))),
Some((true, run)) => {
Some((DualType::Left, run)) => {
// Addition
let run_len = run.end - run.start;
if run_len <= self.max_read_size as u64 {
let mut buf = vec![0; run_len as usize];
Expand All @@ -203,11 +201,21 @@ impl DeltaChunker {
} else {
let mut buf = vec![0; self.max_read_size];
self.input.read_exact_at(&mut buf, run.start)?;
self.current_run = Some((true, (run.start + buf.len() as u64)..run.end));
self.current_run = Some((DualType::Left, (run.start + buf.len() as u64)..run.end));
Ok(Some(Chunk::Mapped(buf)))
}
}
None => Ok(None),
Some((DualType::Right, run)) => {
// Removal
Ok(Some(Chunk::Unmapped(run.end - run.start)))
}
Some((DualType::Both, ..)) => {
Err(anyhow!("internal error: region can't be both an addition and removal"))
}
Some((DualType::Neither, run)) => {
Ok(Some(Chunk::Ref(run.end - run.start)))
}
None => Ok(None)
}
}
}
Expand Down
8 changes: 0 additions & 8 deletions src/content_sensitive_splitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,10 +288,6 @@ mod splitter_tests {
Ok(())
}

fn handle_gap(&mut self, _len: u64) -> Result<()> {
Ok(())
}

fn complete(&mut self) -> Result<()> {
Ok(())
}
Expand Down Expand Up @@ -384,10 +380,6 @@ mod splitter_tests {
Ok(())
}

fn handle_gap(&mut self, _len: u64) -> Result<()> {
Ok(())
}

fn complete(&mut self) -> Result<()> {
Ok(())
}
Expand Down
9 changes: 2 additions & 7 deletions src/pack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -653,20 +653,15 @@ fn thin_delta_packer(
let old_config = config::read_stream_config(delta_id)?;
let mapped_size = old_config.mapped_size;

let additions = RunIter::new(
let run_iter = DualIter::new(
mappings.additions,
(input_size / (mappings.data_block_size as u64 * 512)) as u32,
);

let removals = RunIter::new(
mappings.removals,
(input_size / (mappings.data_block_size as u64 * 512)) as u32,
);

let input_iter = Box::new(DeltaChunker::new(
input,
additions,
removals,
run_iter,
mappings.data_block_size as u64 * 512,
));
let thin_id = Some(mappings.thin_id);
Expand Down
152 changes: 152 additions & 0 deletions src/run_iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,155 @@ mod run_iter_tests {
}

//-----------------------------------------

#[derive(Debug, PartialEq, Eq)]
pub enum DualType {
Left,
Right,
Both,
Neither,
}

pub struct DualIter {
len: u32,
current: u32,
left: RoaringBitmap,
right: RoaringBitmap,
}

impl DualIter {
pub fn new(left: RoaringBitmap, right: RoaringBitmap, len: u32) -> Self {
Self {
len,
current: 0,
left,
right,
}
}
}

impl Iterator for DualIter {
type Item = (DualType, Range<u32>);

fn next(&mut self) -> Option<Self::Item> {
use DualType::*;

if self.current == self.len {
None
} else {
let l = self.left.contains(self.current);
let r = self.right.contains(self.current);
let start = self.current;
self.current += 1;
match (l, r) {
(false, false) => {
while self.current < self.len
&& !self.left.contains(self.current)
&& !self.right.contains(self.current)
{
self.current += 1;
}
Some((Neither, start..self.current))
}
(false, true) => {
while self.current < self.len
&& !self.left.contains(self.current)
&& self.right.contains(self.current)
{
self.current += 1;
}
Some((Right, start..self.current))
}
(true, false) => {
while self.current < self.len
&& self.left.contains(self.current)
&& !self.right.contains(self.current)
{
self.current += 1;
}
Some((Left, start..self.current))
}
(true, true) => {
while self.current < self.len
&& self.left.contains(self.current)
&& self.right.contains(self.current)
{
self.current += 1;
}
Some((Both, start..self.current))
}
}
}
}
}

//-----------------------------------------

#[cfg(test)]
mod dual_iter_tests {
use super::*;

struct Test {
left: Vec<bool>,
right: Vec<bool>,
expected: Vec<(DualType, Range<u32>)>,
}

fn mk_bits(v: &[bool]) -> RoaringBitmap {
let mut bits = RoaringBitmap::new();
for (i, b) in v.iter().enumerate() {
if *b {
bits.insert(i as u32);
}
}
bits
}

#[test]
fn test_run_iter() {
use DualType::*;

let tests = vec![
Test {
left: vec![],
right: vec![],
expected: vec![],
},
Test {
left: vec![false, false, false],
right: vec![false, false, false],
expected: vec![(Neither, 0..3)],
},
Test {
left: vec![false, true, true, false, false, false, true],
right: vec![false, true, true, false, false, false, true],
expected: vec![(Neither, 0..1), (Both, 1..3), (Neither, 3..6), (Both, 6..7)],
},
Test {
left: vec![false, true, false, false, false, false, true, false, false],
right: vec![false, false, true, false, false, false, true, false, true],
expected: vec![
(Neither, 0..1),
(Left, 1..2),
(Right, 2..3),
(Neither, 3..6),
(Both, 6..7),
(Neither, 7..8),
(Right, 8..9),
],
},
];

for t in tests {
assert_eq!(t.left.len(), t.right.len());
let left = mk_bits(&t.left[..]);
let right = mk_bits(&t.right[..]);

let it = DualIter::new(left, right, t.left.len() as u32);
let actual: Vec<(DualType, Range<u32>)> = it.collect();
assert_eq!(actual, t.expected);
}
}
}

//-----------------------------------------
66 changes: 0 additions & 66 deletions src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1258,70 +1258,4 @@ impl Dumper {

//-----------------------------------------

#[cfg(test)]
mod stream_tests {
use super::*;

fn mk_run(slab: u32, b: u32, e: u32) -> MapEntry {
assert!((e - b) < u16::MAX as u32);
MapEntry::Data {
slab,
offset: b,
nr_entries: e - b,
}
}

#[test]
fn pack_unpack_cycle() {
use MapEntry::*;

let tests: Vec<Vec<MapEntry>> = vec![
vec![],
vec![Fill { byte: 0, len: 1 }],
/*
* Test doesn't work now we aggregate zeroes
vec![
Zero { len: 15 },
Zero { len: 16 },
Zero { len: 4095 },
Zero { len: 4096 },
Zero {
len: (4 * 1024 * 1024) - 1,
},
Zero {
len: 4 * 1024 * 1024,
},
Zero {
len: 16 * 1024 * 1024,
},
],
*/
vec![mk_run(0, 0, 4)],
vec![mk_run(1, 1, 4)],
vec![mk_run(1, 1, 1024)],
vec![mk_run(1, 1, 16000)],
];

for t in tests {
// pack
let mut buf: Vec<u8> = Vec::new();
let mut c = std::io::Cursor::new(&mut buf);

let mut builder = MappingBuilder::default();
for e in &t {
let len = 16; // FIXME: assume all entries are 16 bytes in length
builder
.next(&e, len, &mut c)
.expect("builder.next() failed");
}
builder.complete(&mut c).expect("builder.complete() failed");

// unpack
let (actual, _) = unpack(&buf[..]).expect("unpack failed");

assert_eq!(*t, actual);
}
}
}

//-----------------------------------------
Loading

0 comments on commit f46641b

Please sign in to comment.