Skip to content

Commit

Permalink
feat: implement index compact feature
Browse files Browse the repository at this point in the history
Refs: #188
Signed-off-by: Phoeniix Zhao <Phoenix500526@163.com>
  • Loading branch information
Phoenix500526 committed Jun 28, 2023
1 parent 2b6a719 commit 2bc8908
Show file tree
Hide file tree
Showing 2 changed files with 183 additions and 19 deletions.
2 changes: 1 addition & 1 deletion utils/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ pub mod level_format {
use crate::parse_log_level;

/// deserializes a cluster duration
#[allow(single_use_lifetimes)] // TODO: Think is it necessary to allow this clippy??
#[allow(single_use_lifetimes)]
pub(crate) fn deserialize<'de, D>(deserializer: D) -> Result<LevelConfig, D::Error>
where
D: Deserializer<'de>,
Expand Down
200 changes: 182 additions & 18 deletions xline/src/storage/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@ impl Index {
revision: i64,
sub_revision: i64,
) -> Option<(Revision, Revision)> {
let last_rev = Self::get_revision(revs, 0)?;
let last_available_rev = Self::get_revision(revs, 0)?;
let del_rev = KeyRevision::new_deletion(revision, sub_revision);
revs.push(del_rev);
Some((last_rev, del_rev.as_revision()))
Some((last_available_rev, del_rev.as_revision()))
}

/// Mark the `KeyRevision` as available
Expand Down Expand Up @@ -129,7 +129,10 @@ pub(super) trait IndexOperate {
version: i64,
);

// TODO: fn compact(rev:i64)
/// Compact a `KeyRevision` by removing the versions with smaller or equal
/// revision than the given atRev except the largest one (If the largest one is
/// a tombstone, it will not be kept).
fn compact(&self, at_rev: i64) -> Vec<KeyRevision>;
}

impl IndexOperate for Index {
Expand Down Expand Up @@ -287,6 +290,58 @@ impl IndexOperate for Index {
.or_insert_with(Vec::new)
.push(new_rev);
}

fn compact(&self, at_rev: i64) -> Vec<KeyRevision> {
let mut revs = Vec::new();
let mut del_keys = Vec::new();

let mut inner = self.inner.lock();
inner.index.iter_mut().for_each(|(key, revisions)| {
if let Some(revision) = revisions.first() {
if revision.mod_revision < at_rev {
match revisions.binary_search_by(|rev| rev.mod_revision.cmp(&at_rev)) {
Ok(idx) => {
let key_rev = revisions.get(idx).unwrap_or_else(|| {
unreachable!(
"{idx} is out of range, len of revisions is {}",
revisions.len()
)
});
let compact_revs = if key_rev.is_deleted() {
revisions.drain(..=idx)
} else {
revisions.drain(..idx)
};
revs.extend(compact_revs.into_iter());
}
Err(idx) => {
let compacted_last_idx = idx.overflow_sub(1);
let key_rev = revisions.get(compacted_last_idx).unwrap_or_else(|| {
unreachable!(
"{idx} is out of range, len of revisions is {}",
revisions.len()
)
});
let compact_revs = if key_rev.is_deleted() {
revisions.drain(..=compacted_last_idx)
} else {
revisions.drain(..compacted_last_idx)
};
revs.extend(compact_revs.into_iter());
}
}
if revisions.is_empty() {
del_keys.push(key.clone());
}
}
}
});

for key in del_keys {
let _ignore = inner.index.remove(&key);
}
revs
}
}

#[cfg(test)]
Expand All @@ -309,12 +364,20 @@ mod test {
index.insert_or_update_revision(b"key", 3, 1);
index.insert_or_update_revision(b"foo", 4, 5);
index.insert_or_update_revision(b"bar", 5, 4);
index.insert_or_update_revision(b"foo", 6, 6);
index.insert_or_update_revision(b"bar", 7, 7);
index.insert_or_update_revision(b"foo", 8, 8);
index.insert_or_update_revision(b"bar", 9, 9);

index.mark_available(1);
index.mark_available(2);
index.mark_available(3);
index.mark_available(4);
index.mark_available(5);
index.mark_available(6);
index.mark_available(7);
index.mark_available(8);
index.mark_available(9);

assert_eq!(
index.inner.lock().index,
Expand All @@ -329,11 +392,19 @@ mod test {
),
(
b"foo".to_vec(),
vec![KeyRevision::new(4, 1, 4, 5).mark_available()]
vec![
KeyRevision::new(4, 1, 4, 5).mark_available(),
KeyRevision::new(4, 2, 6, 6).mark_available(),
KeyRevision::new(4, 3, 8, 8).mark_available()
]
),
(
b"bar".to_vec(),
vec![KeyRevision::new(5, 1, 5, 4).mark_available()]
vec![
KeyRevision::new(5, 1, 5, 4).mark_available(),
KeyRevision::new(5, 2, 7, 7).mark_available(),
KeyRevision::new(5, 3, 9, 9).mark_available(),
]
)
])
);
Expand All @@ -352,14 +423,25 @@ mod test {
);
assert_eq!(
index.get_from_rev(b"a", b"g", 3),
vec![Revision::new(4, 5), Revision::new(5, 4)]
vec![
Revision::new(4, 5),
Revision::new(5, 4),
Revision::new(6, 6),
Revision::new(7, 7),
Revision::new(8, 8),
Revision::new(9, 9)
]
);
assert_eq!(
index.get_from_rev(b"\0", b"\0", 3),
vec![
Revision::new(3, 1),
Revision::new(4, 5),
Revision::new(5, 4)
Revision::new(5, 4),
Revision::new(6, 6),
Revision::new(7, 7),
Revision::new(8, 8),
Revision::new(9, 9)
]
);
}
Expand All @@ -369,27 +451,27 @@ mod test {
let index = init_and_test_insert();

assert_eq!(
index.delete(b"key", b"", 6, 0),
index.delete(b"key", b"", 10, 0),
(
vec![(Revision::new(3, 1), Revision::new(6, 0))],
vec![(Revision::new(3, 1), Revision::new(10, 0))],
vec![b"key".to_vec()]
)
);
index.mark_available(6);
index.mark_available(10);

assert_eq!(
index.delete(b"a", b"g", 7, 0),
index.delete(b"a", b"g", 11, 0),
(
vec![
(Revision::new(5, 4), Revision::new(7, 0)),
(Revision::new(4, 5), Revision::new(7, 1)),
(Revision::new(9, 9), Revision::new(11, 0)),
(Revision::new(8, 8), Revision::new(11, 1)),
],
vec![b"bar".to_vec(), b"foo".to_vec()]
)
);
index.mark_available(7);
index.mark_available(11);

assert_eq!(index.delete(b"\0", b"\0", 8, 0), (vec![], vec![]));
assert_eq!(index.delete(b"\0", b"\0", 12, 0), (vec![], vec![]));

assert_eq!(
index.inner.lock().index,
Expand All @@ -400,21 +482,25 @@ mod test {
KeyRevision::new(1, 1, 1, 3).mark_available(),
KeyRevision::new(1, 2, 2, 2).mark_available(),
KeyRevision::new(1, 3, 3, 1).mark_available(),
KeyRevision::new_deletion(6, 0).mark_available()
KeyRevision::new_deletion(10, 0).mark_available()
]
),
(
b"foo".to_vec(),
vec![
KeyRevision::new(4, 1, 4, 5).mark_available(),
KeyRevision::new_deletion(7, 1).mark_available()
KeyRevision::new(4, 2, 6, 6).mark_available(),
KeyRevision::new(4, 3, 8, 8).mark_available(),
KeyRevision::new_deletion(11, 1).mark_available()
]
),
(
b"bar".to_vec(),
vec![
KeyRevision::new(5, 1, 5, 4).mark_available(),
KeyRevision::new_deletion(7, 0).mark_available()
KeyRevision::new(5, 2, 7, 7).mark_available(),
KeyRevision::new(5, 3, 9, 9).mark_available(),
KeyRevision::new_deletion(11, 0).mark_available()
]
)
])
Expand Down Expand Up @@ -444,4 +530,82 @@ mod test {
])
);
}

#[test]
fn test_compact() {
let index = init_and_test_insert();
let res = index.compact(7);
assert_eq!(
index.inner.lock().index,
BTreeMap::from_iter(vec![
(
b"key".to_vec(),
vec![KeyRevision::new(1, 3, 3, 1).mark_available(),]
),
(
b"foo".to_vec(),
vec![
KeyRevision::new(4, 2, 6, 6).mark_available(),
KeyRevision::new(4, 3, 8, 8).mark_available()
]
),
(
b"bar".to_vec(),
vec![
KeyRevision::new(5, 2, 7, 7).mark_available(),
KeyRevision::new(5, 3, 9, 9).mark_available(),
]
)
])
);
assert_eq!(
res,
vec![
KeyRevision::new(5, 1, 5, 4).mark_available(),
KeyRevision::new(4, 1, 4, 5).mark_available(),
KeyRevision::new(1, 1, 1, 3).mark_available(),
KeyRevision::new(1, 2, 2, 2).mark_available(),
]
);
}

#[test]
fn test_compact_with_deletion() {
let index = init_and_test_insert();
index.delete(b"a", b"g", 10, 0);
index.mark_available(10);

index.insert_or_update_revision(b"bar", 11, 0);
index.mark_available(11);

let res = index.compact(10);
assert_eq!(
index.inner.lock().index,
BTreeMap::from_iter(vec![
(
b"key".to_vec(),
vec![KeyRevision::new(1, 3, 3, 1).mark_available(),]
),
(
b"bar".to_vec(),
vec![KeyRevision::new(11, 1, 11, 0).mark_available(),]
)
])
);
assert_eq!(
res,
vec![
KeyRevision::new(5, 1, 5, 4).mark_available(),
KeyRevision::new(5, 2, 7, 7).mark_available(),
KeyRevision::new(5, 3, 9, 9).mark_available(),
KeyRevision::new(0, 0, 10, 0).mark_available(),
KeyRevision::new(4, 1, 4, 5).mark_available(),
KeyRevision::new(4, 2, 6, 6).mark_available(),
KeyRevision::new(4, 3, 8, 8).mark_available(),
KeyRevision::new(0, 0, 10, 1).mark_available(),
KeyRevision::new(1, 1, 1, 3).mark_available(),
KeyRevision::new(1, 2, 2, 2).mark_available(),
]
);
}
}

0 comments on commit 2bc8908

Please sign in to comment.