-
Notifications
You must be signed in to change notification settings - Fork 137
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor: fixup fvm IPLD flush logic #1810
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,13 +4,12 @@ | |
|
||
use std::cell::RefCell; | ||
use std::collections::HashMap; | ||
use std::io::{Cursor, Read, Seek}; | ||
use std::io::Read; | ||
|
||
use anyhow::{anyhow, Result}; | ||
use byteorder::{BigEndian, ByteOrder, ReadBytesExt}; | ||
use cid::Cid; | ||
use fvm_ipld_blockstore::{Blockstore, Buffered}; | ||
use fvm_ipld_encoding::{CBOR, DAG_CBOR}; | ||
use fvm_ipld_encoding::{CBOR, DAG_CBOR, IPLD_RAW}; | ||
use fvm_shared::commcid::{FIL_COMMITMENT_SEALED, FIL_COMMITMENT_UNSEALED}; | ||
|
||
/// Wrapper around `Blockstore` to limit and have control over when values are written. | ||
|
@@ -43,15 +42,10 @@ where | |
{ | ||
/// Flushes the buffered cache based on the root node. | ||
/// This will recursively traverse the cache and write all data connected by links to this | ||
/// root Cid. Calling flush will not reset the write buffer. | ||
/// root Cid, moving the reachable blocks from the write buffer to the backing store. | ||
fn flush(&self, root: &Cid) -> Result<()> { | ||
let mut buffer = Vec::new(); | ||
let s = self.write.borrow(); | ||
copy_rec(&s, *root, &mut buffer)?; | ||
|
||
self.base.put_many_keyed(buffer)?; | ||
|
||
Ok(()) | ||
self.base | ||
.put_many_keyed(take_reachable(&mut self.write.borrow_mut(), root)?) | ||
} | ||
} | ||
|
||
|
@@ -62,86 +56,63 @@ where | |
/// This was implemented because the CBOR library we use does not expose low-level | ||
/// methods like this, requiring us to deserialize the whole CBOR payload, which | ||
/// is unnecessary and quite inefficient for our use case here. | ||
fn cbor_read_header_buf<B: Read>(br: &mut B, scratch: &mut [u8]) -> anyhow::Result<(u8, usize)> { | ||
let first = br.read_u8()?; | ||
fn cbor_read_header_buf<B: Read>(br: &mut B) -> anyhow::Result<(u8, u64)> { | ||
#[inline(always)] | ||
pub fn read_fixed<const N: usize>(r: &mut impl Read) -> std::io::Result<[u8; N]> { | ||
let mut buf = [0; N]; | ||
r.read_exact(&mut buf).map(|_| buf) | ||
} | ||
|
||
let first = read_fixed::<1>(br)?[0]; | ||
let maj = (first & 0xe0) >> 5; | ||
let low = first & 0x1f; | ||
|
||
if low < 24 { | ||
Ok((maj, low as usize)) | ||
} else if low == 24 { | ||
let val = br.read_u8()?; | ||
if val < 24 { | ||
return Err(anyhow!( | ||
"cbor input was not canonical (lval 24 with value < 24)" | ||
)); | ||
} | ||
Ok((maj, val as usize)) | ||
} else if low == 25 { | ||
br.read_exact(&mut scratch[..2])?; | ||
let val = BigEndian::read_u16(&scratch[..2]); | ||
if val <= u8::MAX as u16 { | ||
return Err(anyhow!( | ||
"cbor input was not canonical (lval 25 with value <= MaxUint8)" | ||
)); | ||
} | ||
Ok((maj, val as usize)) | ||
} else if low == 26 { | ||
br.read_exact(&mut scratch[..4])?; | ||
let val = BigEndian::read_u32(&scratch[..4]); | ||
if val <= u16::MAX as u32 { | ||
return Err(anyhow!( | ||
"cbor input was not canonical (lval 26 with value <= MaxUint16)" | ||
)); | ||
} | ||
Ok((maj, val as usize)) | ||
} else if low == 27 { | ||
br.read_exact(&mut scratch[..8])?; | ||
let val = BigEndian::read_u64(&scratch[..8]); | ||
if val <= u32::MAX as u64 { | ||
return Err(anyhow!( | ||
"cbor input was not canonical (lval 27 with value <= MaxUint32)" | ||
)); | ||
} | ||
Ok((maj, val as usize)) | ||
} else { | ||
Err(anyhow!("invalid header cbor_read_header_buf")) | ||
} | ||
let val = match low { | ||
..=23 => low.into(), | ||
24 => read_fixed::<1>(br)?[0].into(), | ||
25 => u16::from_be_bytes(read_fixed(br)?).into(), | ||
26 => u32::from_be_bytes(read_fixed(br)?).into(), | ||
27 => u64::from_be_bytes(read_fixed(br)?), | ||
_ => return Err(anyhow!("invalid header cbor_read_header_buf")), | ||
}; | ||
Comment on lines
+70
to
+77
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Great refactor for improving readability! Regarding performance, IIUC this is now doing 2 extra allocations for each call instead of using a scratch buffer as before? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Stack allocations so they should be free (except zeroing, but that's pretty cheap and Rust can likely optimize it away). |
||
Ok((maj, val)) | ||
} | ||
|
||
/// Given a CBOR serialized IPLD buffer, read through all of it and return all the Links. | ||
/// This function is useful because it is quite a bit faster than doing this recursively on a | ||
/// deserialized IPLD object. | ||
fn scan_for_links<B: Read + Seek, F>(buf: &mut B, mut callback: F) -> Result<()> | ||
where | ||
F: FnMut(Cid) -> anyhow::Result<()>, | ||
{ | ||
let mut scratch: [u8; 100] = [0; 100]; | ||
fn scan_for_links(mut buf: &[u8], out: &mut Vec<Cid>) -> Result<()> { | ||
let mut remaining = 1; | ||
while remaining > 0 { | ||
let (maj, extra) = cbor_read_header_buf(buf, &mut scratch)?; | ||
let (maj, extra) = cbor_read_header_buf(&mut buf)?; | ||
match maj { | ||
// MajUnsignedInt, MajNegativeInt, MajOther | ||
0 | 1 | 7 => {} | ||
// MajByteString, MajTextString | ||
2 | 3 => { | ||
buf.seek(std::io::SeekFrom::Current(extra as i64))?; | ||
if extra > buf.len() as u64 { | ||
return Err(anyhow!("unexpected end of cbor stream")); | ||
} | ||
buf = &buf[extra as usize..]; | ||
} | ||
// MajTag | ||
6 => { | ||
// Check if the tag refers to a CID | ||
if extra == 42 { | ||
let (maj, extra) = cbor_read_header_buf(buf, &mut scratch)?; | ||
let (maj, extra) = cbor_read_header_buf(&mut buf)?; | ||
// The actual CID is expected to be a byte string | ||
if maj != 2 { | ||
return Err(anyhow!("expected cbor type byte string in input")); | ||
} | ||
if extra > 100 { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This was unnecessary. If the CID is invalid and/or too big, |
||
return Err(anyhow!("string in cbor input too long")); | ||
if extra > buf.len() as u64 { | ||
return Err(anyhow!("unexpected end of cbor stream")); | ||
} | ||
buf.read_exact(&mut scratch[..extra])?; | ||
let c = Cid::try_from(&scratch[1..extra])?; | ||
callback(c)?; | ||
if buf.first() != Some(&0u8) { | ||
return Err(anyhow!("DagCBOR CID does not start with a 0x byte")); | ||
} | ||
let cid_buf; | ||
(cid_buf, buf) = buf.split_at(extra as usize); | ||
out.push(Cid::try_from(&cid_buf[1..])?); | ||
} else { | ||
remaining += 1; | ||
} | ||
|
@@ -154,22 +125,18 @@ where | |
5 => { | ||
remaining += extra * 2; | ||
} | ||
_ => { | ||
return Err(anyhow!("unhandled cbor type: {}", maj)); | ||
8.. => { | ||
// This case is statically impossible unless `cbor_read_header_buf` makes a mistake. | ||
return Err(anyhow!("invalid cbor tag exceeds 3 bits: {}", maj)); | ||
} | ||
} | ||
remaining -= 1; | ||
} | ||
Ok(()) | ||
} | ||
|
||
/// Copies the IPLD DAG under `root` from the cache to the base store. | ||
fn copy_rec<'a>( | ||
cache: &'a HashMap<Cid, Vec<u8>>, | ||
root: Cid, | ||
buffer: &mut Vec<(Cid, &'a [u8])>, | ||
) -> Result<()> { | ||
const DAG_RAW: u64 = 0x55; | ||
/// Moves the IPLD DAG under `root` from the cache to the base store. | ||
fn take_reachable(cache: &mut HashMap<Cid, Vec<u8>>, root: &Cid) -> Result<Vec<(Cid, Vec<u8>)>> { | ||
const BLAKE2B_256: u64 = 0xb220; | ||
const BLAKE2B_LEN: u8 = 32; | ||
const IDENTITY: u64 = 0x0; | ||
|
@@ -180,53 +147,53 @@ fn copy_rec<'a>( | |
// 2. We always write-back new blocks, even if the client already has them. We haven't noticed a | ||
// perf impact. | ||
|
||
// TODO(M2): Make this not cbor specific. | ||
// TODO(M2): Allow CBOR (not just DAG_CBOR). | ||
match (root.codec(), root.hash().code(), root.hash().size()) { | ||
// Allow non-truncated blake2b-256 raw/cbor (code/state) | ||
(DAG_RAW | DAG_CBOR | CBOR, BLAKE2B_256, BLAKE2B_LEN) => (), | ||
// Ignore raw identity cids (fake code cids) | ||
(DAG_RAW, IDENTITY, _) => return Ok(()), | ||
// Copy links from cbor identity cids. | ||
// We shouldn't be creating these at the moment, but lotus' vm.Copy supports them. | ||
(DAG_CBOR, IDENTITY, _) => { | ||
return scan_for_links(&mut Cursor::new(root.hash().digest()), |link| { | ||
copy_rec(cache, link, buffer) | ||
}) | ||
let mut stack = vec![*root]; | ||
let mut result = Vec::new(); | ||
|
||
while let Some(k) = stack.pop() { | ||
// Check the codec. | ||
match k.codec() { | ||
// We ignore piece commitment CIDs. | ||
FIL_COMMITMENT_UNSEALED | FIL_COMMITMENT_SEALED => continue, | ||
// We allow raw, cbor, and dag cbor. | ||
IPLD_RAW | DAG_CBOR | CBOR => (), | ||
// Everything else is rejected. | ||
codec => return Err(anyhow!("cid {k} has unexpected codec ({codec})")), | ||
} | ||
// Ignore commitments (not even going to check the hash function). | ||
(FIL_COMMITMENT_UNSEALED | FIL_COMMITMENT_SEALED, _, _) => return Ok(()), | ||
// Fail on anything else. We usually want to continue on error, but there's really no going | ||
// back from here. | ||
(codec, hash, length) => { | ||
return Err(anyhow!( | ||
"cid {root} has unexpected codec ({codec}), hash ({hash}), or length ({length})" | ||
)) | ||
// Check the hash construction. | ||
match (k.hash().code(), k.hash().size()) { | ||
// Allow non-truncated blake2b-256 and identity hashes. | ||
(BLAKE2B_256, BLAKE2B_LEN) | (IDENTITY, _) => (), | ||
// Reject everything else. | ||
(hash, length) => { | ||
return Err(anyhow!( | ||
"cid {k} has unexpected multihash (code={hash}, len={length})" | ||
)) | ||
} | ||
} | ||
Comment on lines
+155
to
173
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The previous logic was shorter but hard to reason about. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, this is more readable |
||
} | ||
|
||
// If we don't have the block, we assume it's already in the datastore. | ||
// | ||
// The alternative would be to check if it's in the datastore, but that's likely even more | ||
// expensive. And there wouldn't be much we could do at that point but abort the block. | ||
let block = match cache.get(&root) { | ||
Some(blk) => blk, | ||
None => return Ok(()), | ||
}; | ||
if k.hash().code() == IDENTITY { | ||
if k.codec() == DAG_CBOR { | ||
scan_for_links(k.hash().digest(), &mut stack)?; | ||
} | ||
} else { | ||
// If we don't have the block, we assume it and its children are already in the | ||
// datastore. | ||
// | ||
// The alternative would be to check if it's in the datastore, but that's likely even more | ||
// expensive. And there wouldn't be much we could do at that point but abort the block. | ||
let Some(block) = cache.remove(&k) else { continue }; | ||
|
||
// At the moment, only DAG_CBOR can link to other blocks. | ||
if k.codec() == DAG_CBOR { | ||
scan_for_links(&block, &mut stack)?; | ||
} | ||
|
||
// At the moment, we only expect dag-cbor and raw. | ||
// In M2, we'll need to copy explicitly. | ||
if root.codec() == DAG_CBOR { | ||
// TODO(M2): Make this non-recursive. | ||
scan_for_links(&mut Cursor::new(block), |link| { | ||
copy_rec(cache, link, buffer) | ||
})?; | ||
// Record the block so we can write it back. | ||
result.push((k, block)); | ||
}; | ||
} | ||
|
||
// Finally, push the block. We do this _last_ so that we always write children before parents. | ||
buffer.push((root, block)); | ||
|
||
Ok(()) | ||
Ok(result) | ||
} | ||
|
||
impl<BS> Blockstore for BufferedBlockstore<BS> | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed this to "take" blocks so we wouldn't end up writing them multiple times. The alternative would have been to keep some form of "written" set.
For some history, we used to clear the entire buffer on flush. However, that ended up causing some issues if we, e.g., wanted to flush some intermediate state. Now we take a middle path of deleting the written blocks but leaving everything else.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will this then end up saving disk space?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lotus checks if we have a block before writing it, so it shouldn't matter from a disk-space perspective. However, it should save some time: