refactor: fixup fvm IPLD flush logic #1810

Merged · 1 commit · Jul 10, 2023
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

1 change: 0 additions & 1 deletion fvm/Cargo.toml
@@ -31,7 +31,6 @@ filecoin-proofs-api = { version = "14", default-features = false }
 rayon = "1"
 num_cpus = "1.15.0"
 log = "0.4.19"
-byteorder = "1.4.3"
 fvm-wasm-instrument = "0.4.0"
 yastl = "0.1.2"
 arbitrary = { version = "1.3.0", optional = true, features = ["derive"] }
201 changes: 84 additions & 117 deletions fvm/src/blockstore/buffered.rs
@@ -4,13 +4,12 @@
 use std::cell::RefCell;
 use std::collections::HashMap;
-use std::io::{Cursor, Read, Seek};
+use std::io::Read;

 use anyhow::{anyhow, Result};
-use byteorder::{BigEndian, ByteOrder, ReadBytesExt};
 use cid::Cid;
 use fvm_ipld_blockstore::{Blockstore, Buffered};
-use fvm_ipld_encoding::{CBOR, DAG_CBOR};
+use fvm_ipld_encoding::{CBOR, DAG_CBOR, IPLD_RAW};
 use fvm_shared::commcid::{FIL_COMMITMENT_SEALED, FIL_COMMITMENT_UNSEALED};

 /// Wrapper around `Blockstore` to limit and have control over when values are written.
@@ -43,15 +42,10 @@ where
 {
     /// Flushes the buffered cache based on the root node.
     /// This will recursively traverse the cache and write all data connected by links to this
-    /// root Cid. Calling flush will not reset the write buffer.
+    /// root Cid, moving the reachable blocks from the write buffer to the backing store.
     fn flush(&self, root: &Cid) -> Result<()> {
-        let mut buffer = Vec::new();
-        let s = self.write.borrow();
-        copy_rec(&s, *root, &mut buffer)?;
-
-        self.base.put_many_keyed(buffer)?;
-
-        Ok(())
+        self.base
+            .put_many_keyed(take_reachable(&mut self.write.borrow_mut(), root)?)
Member Author:

I changed this to "take" blocks so we wouldn't end up writing them multiple times. The alternative would have been to keep some form of "written" set.

For some history, we used to clear the entire buffer on flush. However, that ended up causing some issues if we, e.g., wanted to flush some intermediate state. Now we take a middle path of deleting the written blocks but leaving everything else.

Contributor:

Will this then end up saving disk space?

Member Author:

Lotus checks if we have a block before writing it, so it shouldn't matter from a disk-space perspective. However, it should save some time:

  1. If we have to flush multiple times. Although I think we only do this in some tools/tests.
  2. If we have identical subtrees and/or common blocks. I could have solved that by keeping a temporary "seen" set while flushing instead of actually removing the flushed blocks from the write buffer, but this way was faster/simpler.
     }
 }
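To make the "take" semantics concrete, here is a toy model of the bookkeeping (a minimal sketch, not the PR's code: keys are plain u64s and links are explicit child lists rather than CBOR-encoded CIDs). Reachable blocks are moved out of the buffer, unreachable ones stay behind, and a second flush from the same root is a no-op:

```rust
use std::collections::HashMap;

// Hypothetical stand-ins for Cid and raw block data.
type Key = u64;
struct Block {
    children: Vec<Key>,
    data: &'static str,
}

/// Move every block reachable from `root` out of `buffer`, leaving
/// unreachable blocks in place (the "middle path" described above).
fn take_reachable(buffer: &mut HashMap<Key, Block>, root: Key) -> Vec<(Key, Block)> {
    let mut stack = vec![root];
    let mut result = Vec::new();
    while let Some(k) = stack.pop() {
        // A missing block is assumed to already live in the backing store.
        if let Some(block) = buffer.remove(&k) {
            stack.extend(&block.children);
            result.push((k, block));
        }
    }
    result
}

fn main() {
    let mut buffer = HashMap::new();
    buffer.insert(1, Block { children: vec![2], data: "root" });
    buffer.insert(2, Block { children: vec![], data: "child" });
    buffer.insert(3, Block { children: vec![], data: "unreachable" });

    let written = take_reachable(&mut buffer, 1);
    for (key, block) in &written {
        println!("flushed block {key}: {}", block.data);
    }
    assert_eq!(written.len(), 2); // root and child were flushed...
    assert_eq!(buffer.len(), 1); // ...and removed from the write buffer.

    // Flushing again from the same root writes nothing extra.
    assert!(take_reachable(&mut buffer, 1).is_empty());
}
```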

@@ -62,86 +56,63 @@ where
 /// This was implemented because the CBOR library we use does not expose low-level
 /// methods like this, requiring us to deserialize the whole CBOR payload, which
 /// is unnecessary and quite inefficient for our usecase here.
-fn cbor_read_header_buf<B: Read>(br: &mut B, scratch: &mut [u8]) -> anyhow::Result<(u8, usize)> {
-    let first = br.read_u8()?;
+fn cbor_read_header_buf<B: Read>(br: &mut B) -> anyhow::Result<(u8, u64)> {
+    #[inline(always)]
+    pub fn read_fixed<const N: usize>(r: &mut impl Read) -> std::io::Result<[u8; N]> {
+        let mut buf = [0; N];
+        r.read_exact(&mut buf).map(|_| buf)
+    }
+
+    let first = read_fixed::<1>(br)?[0];
     let maj = (first & 0xe0) >> 5;
     let low = first & 0x1f;

-    if low < 24 {
-        Ok((maj, low as usize))
-    } else if low == 24 {
-        let val = br.read_u8()?;
-        if val < 24 {
-            return Err(anyhow!(
-                "cbor input was not canonical (lval 24 with value < 24)"
-            ));
-        }
-        Ok((maj, val as usize))
-    } else if low == 25 {
-        br.read_exact(&mut scratch[..2])?;
-        let val = BigEndian::read_u16(&scratch[..2]);
-        if val <= u8::MAX as u16 {
-            return Err(anyhow!(
-                "cbor input was not canonical (lval 25 with value <= MaxUint8)"
-            ));
-        }
-        Ok((maj, val as usize))
-    } else if low == 26 {
-        br.read_exact(&mut scratch[..4])?;
-        let val = BigEndian::read_u32(&scratch[..4]);
-        if val <= u16::MAX as u32 {
-            return Err(anyhow!(
-                "cbor input was not canonical (lval 26 with value <= MaxUint16)"
-            ));
-        }
-        Ok((maj, val as usize))
-    } else if low == 27 {
-        br.read_exact(&mut scratch[..8])?;
-        let val = BigEndian::read_u64(&scratch[..8]);
-        if val <= u32::MAX as u64 {
-            return Err(anyhow!(
-                "cbor input was not canonical (lval 27 with value <= MaxUint32)"
-            ));
-        }
-        Ok((maj, val as usize))
-    } else {
-        Err(anyhow!("invalid header cbor_read_header_buf"))
-    }
+    let val = match low {
+        ..=23 => low.into(),
+        24 => read_fixed::<1>(br)?[0].into(),
+        25 => u16::from_be_bytes(read_fixed(br)?).into(),
+        26 => u32::from_be_bytes(read_fixed(br)?).into(),
+        27 => u64::from_be_bytes(read_fixed(br)?),
+        _ => return Err(anyhow!("invalid header cbor_read_header_buf")),
+    };
Comment on lines +70 to +77

Contributor:

Great refactor for improving readability!

Regarding performance, IIUC this is now doing 2 extra allocations for each call instead of using a scratch buffer as before?

Member Author (@Stebalien, Jul 10, 2023):

Stack allocations, so they should be free (except zeroing, but that's pretty cheap and Rust can likely optimize it away).
+    Ok((maj, val))
 }
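For reference, a CBOR header packs a 3-bit major type and a 5-bit "additional info" field into the first byte; low values 24 through 27 signal that the real value follows in 1, 2, 4, or 8 big-endian bytes. A standalone sketch of that rule (`decode_header` is a hypothetical re-derivation for illustration, not the module's private function):

```rust
use std::io::Read;

// Decode a CBOR header: returns (major type, value). Only the 0/1/2-byte
// follow-up cases are shown; 26 and 27 follow the same pattern with 4/8 bytes.
fn decode_header(mut br: impl Read) -> std::io::Result<(u8, u64)> {
    let mut first = [0u8; 1];
    br.read_exact(&mut first)?;
    let (maj, low) = (first[0] >> 5, first[0] & 0x1f);
    let val = match low {
        ..=23 => low as u64,
        24 => {
            let mut b = [0u8; 1];
            br.read_exact(&mut b)?;
            b[0] as u64
        }
        25 => {
            let mut b = [0u8; 2];
            br.read_exact(&mut b)?;
            u16::from_be_bytes(b) as u64
        }
        _ => unimplemented!("26/27: same pattern with 4/8 big-endian bytes"),
    };
    Ok((maj, val))
}

fn main() -> std::io::Result<()> {
    // 0x18 = major 0 (unsigned int), low 24: one value byte follows.
    assert_eq!(decode_header(&[0x18u8, 0x2a][..])?, (0, 42));
    // 0xd8 0x2a = major 6 (tag), tag 42: the CID marker scan_for_links keys on.
    assert_eq!(decode_header(&[0xd8u8, 0x2a][..])?, (6, 42));
    // 0x82 = major 4 (array) with low 2: two elements, no extra bytes.
    assert_eq!(decode_header(&[0x82u8][..])?, (4, 2));
    Ok(())
}
```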

 /// Given a CBOR serialized IPLD buffer, read through all of it and return all the Links.
 /// This function is useful because it is quite a bit faster than doing this recursively on a
 /// deserialized IPLD object.
-fn scan_for_links<B: Read + Seek, F>(buf: &mut B, mut callback: F) -> Result<()>
-where
-    F: FnMut(Cid) -> anyhow::Result<()>,
-{
-    let mut scratch: [u8; 100] = [0; 100];
+fn scan_for_links(mut buf: &[u8], out: &mut Vec<Cid>) -> Result<()> {
     let mut remaining = 1;
     while remaining > 0 {
-        let (maj, extra) = cbor_read_header_buf(buf, &mut scratch)?;
+        let (maj, extra) = cbor_read_header_buf(&mut buf)?;
         match maj {
             // MajUnsignedInt, MajNegativeInt, MajOther
             0 | 1 | 7 => {}
             // MajByteString, MajTextString
             2 | 3 => {
-                buf.seek(std::io::SeekFrom::Current(extra as i64))?;
+                if extra > buf.len() as u64 {
+                    return Err(anyhow!("unexpected end of cbor stream"));
+                }
+                buf = &buf[extra as usize..];
             }
             // MajTag
             6 => {
                 // Check if the tag refers to a CID
                 if extra == 42 {
-                    let (maj, extra) = cbor_read_header_buf(buf, &mut scratch)?;
+                    let (maj, extra) = cbor_read_header_buf(&mut buf)?;
                     // The actual CID is expected to be a byte string
                     if maj != 2 {
                         return Err(anyhow!("expected cbor type byte string in input"));
                     }
-                    if extra > 100 {
Member Author:

This was unnecessary. If the CID is invalid and/or too big, Cid::try_from will catch that.

return Err(anyhow!("string in cbor input too long"));
if extra > buf.len() as u64 {
return Err(anyhow!("unexpected end of cbor stream"));
}
buf.read_exact(&mut scratch[..extra])?;
let c = Cid::try_from(&scratch[1..extra])?;
callback(c)?;
if buf.first() != Some(&0u8) {
return Err(anyhow!("DagCBOR CID does not start with a 0x byte"));
}
let cid_buf;
(cid_buf, buf) = buf.split_at(extra as usize);
out.push(Cid::try_from(&cid_buf[1..])?);
                 } else {
                     remaining += 1;
                 }
@@ -154,22 +125,18 @@
             5 => {
                 remaining += extra * 2;
             }
-            _ => {
-                return Err(anyhow!("unhandled cbor type: {}", maj));
+            8.. => {
+                // This case is statically impossible unless `cbor_read_header_buf` makes a mistake.
+                return Err(anyhow!("invalid cbor tag exceeds 3 bits: {}", maj));
             }
         }
         remaining -= 1;
     }
     Ok(())
 }
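The layout scan_for_links matches on is CBOR tag 42 wrapping a byte string whose first byte is the 0x00 multibase identity prefix, followed by the binary CID. A hand-rolled round-trip of that layout, assuming the cid, multihash, and anyhow crates (an illustration, not the module's code):

```rust
use cid::Cid;
use multihash::Multihash;

fn main() -> anyhow::Result<()> {
    // Any valid CID will do; here: raw codec (0x55) + identity multihash (0x0).
    let cid = Cid::new_v1(0x55, Multihash::wrap(0x0, b"demo")?);
    let cid_bytes = cid.to_bytes();

    // Encode the link by hand: d8 2a = tag 42, then a major-type-2 byte string
    // (short-form length) holding a 0x00 prefix plus the CID bytes.
    let mut cbor = vec![0xd8, 0x2a];
    cbor.push(0x40 | (cid_bytes.len() as u8 + 1));
    cbor.push(0x00);
    cbor.extend_from_slice(&cid_bytes);

    // What the scanner recovers: skip the two tag bytes, the length byte,
    // and the 0x00 prefix, then parse the rest as a CID.
    let parsed = Cid::try_from(&cbor[4..])?;
    assert_eq!(parsed, cid);
    Ok(())
}
```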

-/// Copies the IPLD DAG under `root` from the cache to the base store.
-fn copy_rec<'a>(
-    cache: &'a HashMap<Cid, Vec<u8>>,
-    root: Cid,
-    buffer: &mut Vec<(Cid, &'a [u8])>,
-) -> Result<()> {
-    const DAG_RAW: u64 = 0x55;
+/// Moves the IPLD DAG under `root` from the cache to the base store.
+fn take_reachable(cache: &mut HashMap<Cid, Vec<u8>>, root: &Cid) -> Result<Vec<(Cid, Vec<u8>)>> {
     const BLAKE2B_256: u64 = 0xb220;
     const BLAKE2B_LEN: u8 = 32;
     const IDENTITY: u64 = 0x0;
@@ -180,53 +147,53 @@ fn copy_rec<'a>(
     // 2. We always write-back new blocks, even if the client already has them. We haven't noticed a
     //    perf impact.

-    // TODO(M2): Make this not cbor specific.
+    // TODO(M2): Allow CBOR (not just DAG_CBOR).
-    match (root.codec(), root.hash().code(), root.hash().size()) {
-        // Allow non-truncated blake2b-256 raw/cbor (code/state)
-        (DAG_RAW | DAG_CBOR | CBOR, BLAKE2B_256, BLAKE2B_LEN) => (),
-        // Ignore raw identity cids (fake code cids)
-        (DAG_RAW, IDENTITY, _) => return Ok(()),
-        // Copy links from cbor identity cids.
-        // We shouldn't be creating these at the moment, but lotus' vm.Copy supports them.
-        (DAG_CBOR, IDENTITY, _) => {
-            return scan_for_links(&mut Cursor::new(root.hash().digest()), |link| {
-                copy_rec(cache, link, buffer)
-            })
+    let mut stack = vec![*root];
+    let mut result = Vec::new();
+
+    while let Some(k) = stack.pop() {
+        // Check the codec.
+        match k.codec() {
+            // We ignore piece commitment CIDs.
+            FIL_COMMITMENT_UNSEALED | FIL_COMMITMENT_SEALED => continue,
+            // We allow raw, cbor, and dag cbor.
+            IPLD_RAW | DAG_CBOR | CBOR => (),
+            // Everything else is rejected.
+            codec => return Err(anyhow!("cid {k} has unexpected codec ({codec})")),
         }
-        // Ignore commitments (not even going to check the hash function.)
-        (FIL_COMMITMENT_UNSEALED | FIL_COMMITMENT_SEALED, _, _) => return Ok(()),
-        // Fail on anything else. We usually want to continue on error, but there's really no going
-        // back from here.
-        (codec, hash, length) => {
-            return Err(anyhow!(
-                "cid {root} has unexpected codec ({codec}), hash ({hash}), or length ({length})"
-            ))
+        // Check the hash construction.
+        match (k.hash().code(), k.hash().size()) {
+            // Allow non-truncated blake2b-256 and identity hashes.
+            (BLAKE2B_256, BLAKE2B_LEN) | (IDENTITY, _) => (),
+            // Reject everything else.
+            (hash, length) => {
+                return Err(anyhow!(
+                    "cid {k} has unexpected multihash (code={hash}, len={length})"
+                ))
+            }
+        }
Comment on lines +155 to 173

Member Author:

The previous logic was shorter but hard to reason about.

Contributor:

Yes, this is more readable.
-        }
-    }

-    // If we don't have the block, we assume it's already in the datastore.
-    //
-    // The alternative would be to check if it's in the datastore, but that's likely even more
-    // expensive. And there wouldn't be much we could do at that point but abort the block.
-    let block = match cache.get(&root) {
-        Some(blk) => blk,
-        None => return Ok(()),
-    };
+        if k.hash().code() == IDENTITY {
+            if k.codec() == DAG_CBOR {
+                scan_for_links(k.hash().digest(), &mut stack)?;
+            }
+        } else {
+            // If we don't have the block, we assume it and its children are already in the
+            // datastore.
+            //
+            // The alternative would be to check if it's in the datastore, but that's likely even more
+            // expensive. And there wouldn't be much we could do at that point but abort the block.
+            let Some(block) = cache.remove(&k) else { continue };
+
+            // At the moment, only DAG_CBOR can link to other blocks.
+            if k.codec() == DAG_CBOR {
+                scan_for_links(&block, &mut stack)?;
+            }
-    // At the moment, we only expect dag-cbor and raw.
-    // In M2, we'll need to copy explicitly.
-    if root.codec() == DAG_CBOR {
-        // TODO(M2): Make this non-recursive.
-        scan_for_links(&mut Cursor::new(block), |link| {
-            copy_rec(cache, link, buffer)
-        })?;
-    }
+            // Record the block so we can write it back.
+            result.push((k, block));
+        };
     }

-    // Finally, push the block. We do this _last_ so that we always write children before parents.
-    buffer.push((root, block));
-
-    Ok(())
+    Ok(result)
 }
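The identity-hash branch exists because an identity "digest" embeds the data itself in the CID: there is no separate block to write, but inlined DAG-CBOR can still contain links that must be followed. A small sketch of that property, again assuming the cid, multihash, and anyhow crates:

```rust
use cid::Cid;
use multihash::Multihash;

// Codec and multihash codes matching the constants used above.
const DAG_CBOR: u64 = 0x71;
const IDENTITY: u64 = 0x0;

fn main() -> anyhow::Result<()> {
    // "Inline" a payload directly into a CID via the identity hash function.
    let payload = b"imagine this is dag-cbor with links";
    let inline_cid = Cid::new_v1(DAG_CBOR, Multihash::wrap(IDENTITY, payload)?);

    // The digest *is* the data: nothing to fetch from (or flush to) a store.
    // That is why take_reachable scans the digest for links and moves on
    // rather than recording a block to write back.
    assert_eq!(inline_cid.hash().code(), IDENTITY);
    assert_eq!(inline_cid.hash().digest(), &payload[..]);
    Ok(())
}
```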

impl<BS> Blockstore for BufferedBlockstore<BS>