Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add EventConsumer #2145

Closed
wants to merge 20 commits into from
Closed
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
304 changes: 170 additions & 134 deletions crates/bevy_ecs/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,66 +141,120 @@ impl<T> Default for Events<T> {
}
}

fn map_instance_event_with_id<T>(event_instance: &EventInstance<T>) -> (&T, EventId<T>) {
(&event_instance.event, event_instance.event_id)
}

fn map_instance_event<T>(event_instance: &EventInstance<T>) -> &T {
&event_instance.event
}
impl<T: Component> Events<T> {
/// "Sends" an `event` by writing it to the current event buffer. [EventReader]s can then read
/// the event.
pub fn send(&mut self, event: T) {
let event_id = EventId {
id: self.event_count,
_marker: PhantomData,
};
trace!("Events::send() -> {}", event_id);

/// Reads events of type `T` in order and tracks which events have already been read.
#[derive(SystemParam)]
pub struct EventReader<'a, T: Component> {
last_event_count: Local<'a, (usize, PhantomData<T>)>,
events: Res<'a, Events<T>>,
}
let event_instance = EventInstance { event_id, event };

/// Sends events of type `T`.
#[derive(SystemParam)]
pub struct EventWriter<'a, T: Component> {
events: ResMut<'a, Events<T>>,
}
match self.state {
State::A => self.events_a.push(event_instance),
State::B => self.events_b.push(event_instance),
}

impl<'a, T: Component> EventWriter<'a, T> {
pub fn send(&mut self, event: T) {
self.events.send(event);
self.event_count += 1;
}

pub fn send_batch(&mut self, events: impl Iterator<Item = T>) {
self.events.extend(events);
/// Gets a new [ManualEventReader]. This will include all events already in the event buffers.
pub fn get_reader(&self) -> ManualEventReader<T> {
ManualEventReader {
last_event_count: 0,
_marker: PhantomData,
}
}
}

pub struct ManualEventReader<T> {
last_event_count: usize,
_marker: PhantomData<T>,
}

impl<T> Default for ManualEventReader<T> {
fn default() -> Self {
/// Gets a new [ManualEventReader]. This will ignore all events already in the event buffers. It
/// will read all future events.
pub fn get_reader_current(&self) -> ManualEventReader<T> {
ManualEventReader {
last_event_count: 0,
_marker: Default::default(),
last_event_count: self.event_count,
_marker: PhantomData,
}
}
}

impl<T> ManualEventReader<T> {
/// See [`EventReader::iter`]
pub fn iter<'a>(&mut self, events: &'a Events<T>) -> impl DoubleEndedIterator<Item = &'a T> {
internal_event_reader(&mut self.last_event_count, events).map(|(e, _)| e)
/// Swaps the event buffers and clears the oldest event buffer. In general, this should be
/// called once per frame/update.
pub fn update(&mut self) {
match self.state {
State::A => {
self.events_b = Vec::new();
self.state = State::B;
self.b_start_event_count = self.event_count;
}
State::B => {
self.events_a = Vec::new();
self.state = State::A;
self.a_start_event_count = self.event_count;
}
}
}

/// See [`EventReader::iter_with_id`]
pub fn iter_with_id<'a>(
&mut self,
events: &'a Events<T>,
) -> impl DoubleEndedIterator<Item = (&'a T, EventId<T>)> {
internal_event_reader(&mut self.last_event_count, events)
/// A system that calls [Events::update] once per frame.
pub fn update_system(mut events: ResMut<Self>) {
events.update();
}

/// Removes all events.
pub fn clear(&mut self) {
self.events_a.clear();
self.events_b.clear();
}

/// Creates a draining iterator that removes all events.
pub fn drain(&mut self) -> impl DoubleEndedIterator<Item = T> + '_ {
self.drain_with_id().map(|(e, _)| e)
}

/// Creates a draining iterator that returns both events and their ids
pub fn drain_with_id(&mut self) -> impl DoubleEndedIterator<Item = (T, EventId<T>)> + '_ {
let event_instances = match self.state {
BufferState::A => self.events_b.drain(..).chain(self.events_a.drain(..)),
BufferState::B => self.events_a.drain(..).chain(self.events_b.drain(..)),
};

event_instances.map(|ei| {
alice-i-cecile marked this conversation as resolved.
Show resolved Hide resolved
trace!("Events::drain_with_id -> {}", ei.event_id);
Copy link
Contributor

@NathanSWard NathanSWard May 11, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the case we call drain() it may be a little confusing if our trace! message is "Events::drain_with_id", however I'm not sure if there is an obvious work around for this.... 🤔

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. I'm not super concerned about that; if you're using this trace functionality you're a relatively advanced user IMO.

(ei.event, ei.event_id)
})
}

pub fn extend<I>(&mut self, events: I)
where
I: Iterator<Item = T>,
{
for event in events {
self.send(event);
}
alice-i-cecile marked this conversation as resolved.
Show resolved Hide resolved
}

/// Iterates over events that happened since the last "update" call.
/// WARNING: You probably don't want to use this call. In most cases you should use an
/// `EventReader`. You should only use this if you know you only need to consume events
/// between the last `update()` call and your call to `iter_current_update_events`.
/// If events happen outside that window, they will not be handled. For example, any events that
/// happen after this call and before the next `update()` call will be dropped.
pub fn iter_current_update_events(&self) -> impl DoubleEndedIterator<Item = &T> {
match self.state {
State::A => self.events_a.iter().map(map_instance_event),
State::B => self.events_b.iter().map(map_instance_event),
}
}
}

fn map_instance_event_with_id<T>(event_instance: &EventInstance<T>) -> (&T, EventId<T>) {
(&event_instance.event, event_instance.event_id)
}

fn map_instance_event<T>(event_instance: &EventInstance<T>) -> &T {
&event_instance.event
}

/// Like [`iter_with_id`](EventReader::iter_with_id) except not emitting any traces for read
/// messages.
fn internal_event_reader<'a, T>(
Expand Down Expand Up @@ -251,6 +305,38 @@ fn internal_event_reader<'a, T>(
),
}
}
/// Sends events of type `T`.
pub struct EventWriter<'a, T: Component> {
events: &'a mut Events<T>,
}

impl<'a, T: Component> SystemParam for EventWriter<'a, T> {
type Fetch = ResMutState<Events<T>>;
}

impl<'a, T: Component> EventWriter<'a, T> {
pub fn new(events: &'a mut Events<T>) -> Self {
EventWriter::<'a, T> { events }
}

pub fn send(&mut self, event: T) {
self.events.send(event);
}

pub fn send_batch(&mut self, events: impl Iterator<Item = T>) {
self.events.extend(events);
}
}

/// Reads events of type `T` in order and tracks which events have already been read.
pub struct EventReader<'a, T: Component> {
last_event_count: Local<'a, (usize, PhantomData<T>)>,
events: &'a Events<T>,
}

impl<'a, T: Component> SystemParam for EventReader<'a, T> {
type Fetch = ResState<Events<T>>;
}

impl<'a, T: Component> EventReader<'a, T> {
/// Iterates over the events this EventReader has not seen yet. This updates the EventReader's
Expand All @@ -269,108 +355,58 @@ impl<'a, T: Component> EventReader<'a, T> {
}
}

impl<T: Component> Events<T> {
/// "Sends" an `event` by writing it to the current event buffer. [EventReader]s can then read
/// the event.
pub fn send(&mut self, event: T) {
let event_id = EventId {
id: self.event_count,
_marker: PhantomData,
};
trace!("Events::send() -> {}", event_id);

let event_instance = EventInstance { event_id, event };

match self.state {
State::A => self.events_a.push(event_instance),
State::B => self.events_b.push(event_instance),
}
/// Reads and consumes all events of type T
///
/// Useful for manual event cleanup when [AppBuilder::add_event::<T>] is omitted,
/// allowing events to accumulate on your components or resources until consumed.
/// Note: due to the draining nature of this reader, you probably only want one
/// EventConsumer per event storage location + event type combination.
pub struct EventConsumer<'a, T: Component> {
events: &'a mut Events<T>,
}

self.event_count += 1;
}
impl<'a, T: Component> SystemParam for EventConsumer<'a, T> {
type Fetch = ResMutState<Events<T>>;
}

/// Gets a new [ManualEventReader]. This will include all events already in the event buffers.
pub fn get_reader(&self) -> ManualEventReader<T> {
ManualEventReader {
last_event_count: 0,
_marker: PhantomData,
}
impl<'a, T: Component> EventConsumer<'a, T> {
alice-i-cecile marked this conversation as resolved.
Show resolved Hide resolved
/// Drains all available events this EventConsumer has access to into an iterator
pub fn drain(self) -> impl DoubleEndedIterator<Item = T> + 'a {
self.events.drain()
}

/// Gets a new [ManualEventReader]. This will ignore all events already in the event buffers. It
/// will read all future events.
pub fn get_reader_current(&self) -> ManualEventReader<T> {
ManualEventReader {
last_event_count: self.event_count,
_marker: PhantomData,
}
}

/// Swaps the event buffers and clears the oldest event buffer. In general, this should be
/// called once per frame/update.
pub fn update(&mut self) {
match self.state {
State::A => {
self.events_b = Vec::new();
self.state = State::B;
self.b_start_event_count = self.event_count;
}
State::B => {
self.events_a = Vec::new();
self.state = State::A;
self.a_start_event_count = self.event_count;
}
}
}

/// A system that calls [Events::update] once per frame.
pub fn update_system(mut events: ResMut<Self>) {
events.update();
/// Drains all available events this EventConsumer has access to into an iterator and returns the id
pub fn drain_with_id(self) -> impl DoubleEndedIterator<Item = (T, EventId<T>)> + 'a {
self.events.drain_with_id()
}
}

/// Removes all events.
pub fn clear(&mut self) {
self.events_a.clear();
self.events_b.clear();
}
pub struct ManualEventReader<T> {
last_event_count: usize,
_marker: PhantomData<T>,
}

/// Creates a draining iterator that removes all events.
pub fn drain(&mut self) -> impl Iterator<Item = T> + '_ {
let map = |i: EventInstance<T>| i.event;
match self.state {
State::A => self
.events_b
.drain(..)
.map(map)
.chain(self.events_a.drain(..).map(map)),
State::B => self
.events_a
.drain(..)
.map(map)
.chain(self.events_b.drain(..).map(map)),
impl<T> Default for ManualEventReader<T> {
fn default() -> Self {
ManualEventReader {
last_event_count: 0,
_marker: Default::default(),
}
}
}

pub fn extend<I>(&mut self, events: I)
where
I: Iterator<Item = T>,
{
for event in events {
self.send(event);
}
impl<T> ManualEventReader<T> {
/// See [`EventReader::iter`]
pub fn iter<'a>(&mut self, events: &'a Events<T>) -> impl DoubleEndedIterator<Item = &'a T> {
internal_event_reader(&mut self.last_event_count, events).map(|(e, _)| e)
}

/// Iterates over events that happened since the last "update" call.
/// WARNING: You probably don't want to use this call. In most cases you should use an
/// `EventReader`. You should only use this if you know you only need to consume events
/// between the last `update()` call and your call to `iter_current_update_events`.
/// If events happen outside that window, they will not be handled. For example, any events that
/// happen after this call and before the next `update()` call will be dropped.
pub fn iter_current_update_events(&self) -> impl DoubleEndedIterator<Item = &T> {
match self.state {
State::A => self.events_a.iter().map(map_instance_event),
State::B => self.events_b.iter().map(map_instance_event),
}
/// See [`EventReader::iter_with_id`]
pub fn iter_with_id<'a>(
&mut self,
events: &'a Events<T>,
) -> impl DoubleEndedIterator<Item = (&'a T, EventId<T>)> {
internal_event_reader(&mut self.last_event_count, events)
}
}

Expand Down