Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add support for actor events. #1049

Merged
merged 31 commits into from
Nov 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
277aa3c
events: add basic eventing data model.
raulk Nov 5, 2022
912e65b
events: event eventing data model.
raulk Nov 6, 2022
8a5d538
events: implement the fvm-side handling of actor events.
raulk Nov 6, 2022
2c2d958
events: add events AMT root CID in Receipt.
raulk Nov 6, 2022
b498f03
events: implement support in SDK.
raulk Nov 6, 2022
19f88c3
events: implement basic integration tests.
raulk Nov 6, 2022
56b2679
fix typo.
raulk Nov 6, 2022
7d72eee
events: Receipt: make events_root is an Option<Cid>.
raulk Nov 6, 2022
cc60eea
fix clippy.
raulk Nov 6, 2022
d0648f2
events: charge gas for actor events (params TODO) + use bitflags.
raulk Nov 6, 2022
f18b9d9
fix comment.
raulk Nov 6, 2022
ab4880b
fix compile error.
raulk Nov 9, 2022
da15510
Merge branch 'master' into raulk/events
raulk Nov 9, 2022
40bf949
fix clippy.
raulk Nov 9, 2022
9c29e6e
Merge branch 'master' into raulk/events
raulk Nov 13, 2022
bd35a04
events: make sdk::event::emit_event borrow.
raulk Nov 13, 2022
041ea1a
fix clippy.
raulk Nov 14, 2022
1949ba1
Merge branch 'master' into raulk/events
raulk Nov 14, 2022
4b93932
fix events test.
raulk Nov 14, 2022
3d2b971
Merge branch 'master' into raulk/events
raulk Nov 14, 2022
f1ac14b
fix: serde ActorEvent as a tuple.
raulk Nov 14, 2022
44cfc40
events: serialize ActorEvent as transparent.
raulk Nov 14, 2022
10e4f51
events: fix Entry#value type.
raulk Nov 14, 2022
7d1a62f
address review comment.
raulk Nov 14, 2022
f380575
improve TODO.
raulk Nov 14, 2022
547d8c2
events: initialize event accumulator with no capacity.
raulk Nov 15, 2022
3a8d2e5
address review comment.
raulk Nov 15, 2022
a19b83c
discard events emitted by aborting actors, and its subcallees.
raulk Nov 15, 2022
240b7e5
simplify events accumulator layer accounting.
raulk Nov 15, 2022
30c9d71
fix clippy.
raulk Nov 15, 2022
02e7944
address nits.
raulk Nov 15, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions fvm/src/call_manager/default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use fvm_ipld_encoding::{to_vec, RawBytes, DAG_CBOR};
use fvm_shared::address::{Address, Payload};
use fvm_shared::econ::TokenAmount;
use fvm_shared::error::{ErrorNumber, ExitCode};
use fvm_shared::event::StampedEvent;
use fvm_shared::sys::BlockId;
use fvm_shared::{ActorID, MethodNum, METHOD_SEND};
use num_traits::Zero;
Expand Down Expand Up @@ -51,6 +52,8 @@ pub struct InnerDefaultCallManager<M: Machine> {
invocation_count: u64,
/// Limits on memory throughout the execution.
limits: M::Limiter,
/// Accumulator for events emitted in this call stack.
events: EventsAccumulator,
}

#[doc(hidden)]
Expand Down Expand Up @@ -100,6 +103,7 @@ where
exec_trace: vec![],
invocation_count: 0,
limits,
events: Default::default(),
})))
}

Expand Down Expand Up @@ -162,11 +166,20 @@ where
f: impl FnOnce(&mut Self) -> Result<InvocationResult>,
) -> Result<InvocationResult> {
self.state_tree_mut().begin_transaction();
self.events.create_layer();

let (revert, res) = match f(self) {
Ok(v) => (!v.exit_code().is_success(), Ok(v)),
Err(e) => (true, Err(e)),
};
self.state_tree_mut().end_transaction(revert)?;

if revert {
self.events.discard_last_layer()?;
} else {
self.events.merge_last_layer()?;
}

res
}

Expand All @@ -176,6 +189,7 @@ where
backtrace,
mut gas_tracker,
mut exec_trace,
events,
..
} = *self.0.take().expect("call manager is poisoned");

Expand All @@ -187,11 +201,14 @@ where
exec_trace.extend(gas_tracker.drain_trace().map(ExecutionEvent::GasCharge));
}

let events = events.finish();

(
FinishRet {
gas_used,
backtrace,
exec_trace,
events,
},
machine,
)
Expand Down Expand Up @@ -236,6 +253,10 @@ where
fn invocation_count(&self) -> u64 {
self.invocation_count
}

fn append_event(&mut self, evt: StampedEvent) {
self.events.append_event(evt)
}
}

impl<M> DefaultCallManager<M>
Expand Down Expand Up @@ -568,3 +589,49 @@ where
res
}
}

/// Stores events in layers as they are emitted by actors. As the call stack progresses, when an
/// actor exits normally, its events should be merged onto the previous layer (merge_last_layer).
/// If an actor aborts, the last layer should be discarded (discard_last_layer). This will also
/// throw away any events collected from subcalls (and previously merged, as those subcalls returned
/// normally).
#[derive(Default)]
pub struct EventsAccumulator {
events: Vec<StampedEvent>,
idxs: Vec<usize>,
}

impl EventsAccumulator {
fn append_event(&mut self, evt: StampedEvent) {
self.events.push(evt)
}

fn create_layer(&mut self) {
self.idxs.push(self.events.len());
}

fn merge_last_layer(&mut self) -> Result<()> {
self.idxs.pop().map(|_| {}).ok_or_else(|| {
ExecutionError::Fatal(anyhow!(
"no index in the event accumulator when calling merge_last_layer"
))
})
}

fn discard_last_layer(&mut self) -> Result<()> {
let idx = self.idxs.pop().ok_or_else(|| {
ExecutionError::Fatal(anyhow!(
"no index in the event accumulator when calling discard_last_layer"
))
})?;
self.events.truncate(idx);
Ok(())
}

fn finish(self) -> Vec<StampedEvent> {
// Ideally would assert here, but there's risk of poisoning the Machine.
// Cannot return a Result because the call site expects infallibility.
// assert!(self.idxs.is_empty());
self.events
}
}
5 changes: 5 additions & 0 deletions fvm/src/call_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub use backtrace::Backtrace;
mod default;

pub use default::DefaultCallManager;
use fvm_shared::event::StampedEvent;

use crate::trace::ExecutionTrace;

Expand Down Expand Up @@ -129,6 +130,9 @@ pub trait CallManager: 'static {

/// Limit memory usage throughout a message execution.
fn limiter_mut(&mut self) -> &mut <Self::Machine as Machine>::Limiter;

/// Appends an event to the event accumulator.
fn append_event(&mut self, evt: StampedEvent);
}

/// The result of a method invocation.
Expand Down Expand Up @@ -162,4 +166,5 @@ pub struct FinishRet {
pub gas_used: i64,
pub backtrace: Backtrace,
pub exec_trace: ExecutionTrace,
pub events: Vec<StampedEvent>,
}
62 changes: 51 additions & 11 deletions fvm/src/executor/default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,25 @@ use fvm_shared::address::Address;
use fvm_shared::address::Payload;
use fvm_shared::econ::TokenAmount;
use fvm_shared::error::{ErrorNumber, ExitCode};
use fvm_shared::event::StampedEvent;
use fvm_shared::message::Message;
use fvm_shared::receipt::Receipt;
use fvm_shared::ActorID;
use num_traits::Zero;

use super::{ApplyFailure, ApplyKind, ApplyRet, Executor};
use crate::call_manager::{backtrace, CallManager, InvocationResult};
use crate::call_manager::{backtrace, Backtrace, CallManager, InvocationResult};
use crate::gas::{Gas, GasCharge, GasOutputs};
use crate::kernel::{Block, ClassifyResult, Context as _, ExecutionError, Kernel};
use crate::machine::{Machine, BURNT_FUNDS_ACTOR_ADDR, REWARD_ACTOR_ADDR};
use crate::trace::ExecutionTrace;

/// The default [`Executor`].
///
/// # Warning
///
/// Message execution might run out of stack and crash (the entire process) if it doesn't have at
/// least 64MiB of stacks space. If you can't guarantee 64MiB of stack space, wrap this executor in
/// least 64MiB of stack space. If you can't guarantee 64MiB of stack space, wrap this executor in
/// a [`ThreadedExecutor`][super::ThreadedExecutor].
// If the inner value is `None` it means the machine got poisoned and is unusable.
#[repr(transparent)]
Expand Down Expand Up @@ -65,8 +67,17 @@ where
Err(apply_ret) => return Ok(apply_ret),
};

struct MachineExecRet {
result: crate::kernel::error::Result<InvocationResult>,
gas_used: i64,
backtrace: Backtrace,
exec_trace: ExecutionTrace,
events_root: Option<Cid>,
events: Vec<StampedEvent>, // TODO consider removing if nothing in the client ends up using it.
}

// Apply the message.
let (res, gas_used, mut backtrace, exec_trace) = self.map_machine(|machine| {
let ret = self.map_machine(|machine| {
// We're processing a chain message, so the sender is the origin of the call stack.
let mut cm = K::CallManager::new(
machine,
Expand Down Expand Up @@ -101,12 +112,35 @@ where
Ok(ret)
});
let (res, machine) = cm.finish();

// Flush all events to the store.
let events_root = match machine.commit_events(res.events.as_slice()) {
Ok(cid) => cid,
Err(e) => return (Err(e), machine),
};

(
Ok((result, res.gas_used, res.backtrace, res.exec_trace)),
Ok(MachineExecRet {
result,
gas_used: res.gas_used,
backtrace: res.backtrace,
exec_trace: res.exec_trace,
events_root,
events: res.events,
}),
machine,
)
})?;

let MachineExecRet {
result: res,
gas_used,
mut backtrace,
exec_trace,
events_root,
events,
} = ret;

// Extract the exit code and build the result of the message application.
let receipt = match res {
Ok(InvocationResult::Return(return_value)) => {
Expand All @@ -121,6 +155,7 @@ where
exit_code: ExitCode::OK,
return_data,
gas_used,
events_root,
}
}
Ok(InvocationResult::Failure(exit_code)) => {
Expand All @@ -131,12 +166,14 @@ where
exit_code,
return_data: Default::default(),
gas_used,
events_root,
}
}
Err(ExecutionError::OutOfGas) => Receipt {
exit_code: ExitCode::SYS_OUT_OF_GAS,
return_data: Default::default(),
gas_used,
events_root,
},
Err(ExecutionError::Syscall(err)) => {
// Errors indicate the message couldn't be dispatched at all
Expand All @@ -153,6 +190,7 @@ where
exit_code,
return_data: Default::default(),
gas_used,
events_root,
}
}
Err(ExecutionError::Fatal(err)) => {
Expand All @@ -177,6 +215,7 @@ where
exit_code: ExitCode::SYS_ASSERTION_FAILED,
return_data: Default::default(),
gas_used: msg.gas_limit,
events_root,
}
}
};
Expand All @@ -188,12 +227,9 @@ where
};

match apply_kind {
ApplyKind::Explicit => self
.finish_message(msg, receipt, failure_info, gas_cost)
.map(|mut apply_ret| {
apply_ret.exec_trace = exec_trace;
apply_ret
}),
ApplyKind::Explicit => {
self.finish_message(msg, receipt, failure_info, gas_cost, exec_trace, events)
}
ApplyKind::Implicit => Ok(ApplyRet {
msg_receipt: receipt,
penalty: TokenAmount::zero(),
Expand All @@ -205,6 +241,7 @@ where
gas_burned: 0,
failure_info,
exec_trace,
events,
}),
}
}
Expand Down Expand Up @@ -367,6 +404,8 @@ where
receipt: Receipt,
failure_info: Option<ApplyFailure>,
gas_cost: TokenAmount,
exec_trace: ExecutionTrace,
events: Vec<StampedEvent>,
) -> anyhow::Result<ApplyRet> {
// NOTE: we don't support old network versions in the FVM, so we always burn.
let GasOutputs {
Expand Down Expand Up @@ -425,7 +464,8 @@ where
gas_refund,
gas_burned,
failure_info,
exec_trace: vec![],
exec_trace,
events,
})
}

Expand Down
5 changes: 5 additions & 0 deletions fvm/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub use default::DefaultExecutor;
use fvm_ipld_encoding::RawBytes;
use fvm_shared::econ::TokenAmount;
use fvm_shared::error::ExitCode;
use fvm_shared::event::StampedEvent;
use fvm_shared::message::Message;
use fvm_shared::receipt::Receipt;
use num_traits::Zero;
Expand Down Expand Up @@ -88,6 +89,8 @@ pub struct ApplyRet {
pub failure_info: Option<ApplyFailure>,
/// Execution trace information, for debugging.
pub exec_trace: ExecutionTrace,
/// Events generated while applying the message.
pub events: Vec<StampedEvent>,
}

impl ApplyRet {
Expand All @@ -102,6 +105,7 @@ impl ApplyRet {
exit_code: code,
return_data: RawBytes::default(),
gas_used: 0,
events_root: None,
},
penalty: miner_penalty,
miner_tip: TokenAmount::zero(),
Expand All @@ -112,6 +116,7 @@ impl ApplyRet {
gas_burned: 0,
failure_info: Some(ApplyFailure::PreValidation(message.into())),
exec_trace: vec![],
events: vec![],
}
}
}
Expand Down
Loading