Skip to content

Commit

Permalink
Cleanup macros
Browse files Browse the repository at this point in the history
  • Loading branch information
al8n committed Apr 13, 2024
1 parent ffdcba2 commit ffaaba2
Showing 1 changed file with 78 additions and 76 deletions.
154 changes: 78 additions & 76 deletions core/src/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,37 @@ enum SnapshotRecord<'a, I: Clone, A: Clone> {

const MAX_INLINED_BYTES: usize = 64;

macro_rules! encode {
($w:ident.$node: ident::$status: ident) => {{
let node = $node.as_ref();
let encoded_node_len = T::node_encoded_len(node);
let encoded_len = 4 + 1 + encoded_node_len;
if encoded_len <= MAX_INLINED_BYTES {
let mut buf = [0u8; MAX_INLINED_BYTES];
buf[0] = Self::$status;
buf[1..5].copy_from_slice(&(encoded_node_len as u32).to_le_bytes());
T::encode_node(node, &mut buf[5..]).map_err(invalid_data_io_error)?;
$w.write_all(&buf[..encoded_len]).map(|_| encoded_len)
} else {
let mut buf = BytesMut::with_capacity(encoded_len);
buf.put_u8(Self::$status);
buf.put_u32_le(encoded_node_len as u32);
T::encode_node(node, &mut buf).map_err(invalid_data_io_error)?;
$w.write_all(&buf).map(|_| encoded_len)
}
}};
($w:ident.$t: ident($status: ident)) => {{
const N: usize = mem::size_of::<u8>() + mem::size_of::<u64>();
let mut data = [0u8; N];
data[0] = Self::$status;
data[1..N].copy_from_slice(&$t.to_le_bytes());
$w.write_all(&data).map(|_| N)
}};
($w:ident.$ident: ident) => {{
$w.write_all(&[Self::$ident]).map(|_| 1)
}};
}

impl<'a, I, A> SnapshotRecord<'a, I, A>
where
I: Id,
Expand All @@ -160,46 +191,15 @@ where
&self,
w: &mut W,
) -> std::io::Result<usize> {
macro_rules! encode {
($node: ident::$status: ident) => {{
let node = $node.as_ref();
let encoded_node_len = T::node_encoded_len(node);
let encoded_len = 4 + 1 + encoded_node_len;
if encoded_len <= MAX_INLINED_BYTES {
let mut buf = [0u8; MAX_INLINED_BYTES];
buf[0] = Self::$status;
buf[1..5].copy_from_slice(&(encoded_node_len as u32).to_le_bytes());
T::encode_node(node, &mut buf[5..]).map_err(invalid_data_io_error)?;
w.write_all(&buf[..encoded_len]).map(|_| encoded_len)
} else {
let mut buf = BytesMut::with_capacity(encoded_len);
buf.put_u8(Self::$status);
buf.put_u32_le(encoded_node_len as u32);
T::encode_node(node, &mut buf).map_err(invalid_data_io_error)?;
w.write_all(&buf).map(|_| encoded_len)
}
}};
($t: ident($status: ident)) => {{
const N: usize = mem::size_of::<u8>() + mem::size_of::<u64>();
let mut data = [0u8; N];
data[0] = Self::$status;
data[1..N].copy_from_slice(&$t.to_le_bytes());
w.write_all(&data).map(|_| N)
}};
($ident: ident) => {{
w.write_all(&[Self::$ident]).map(|_| 1)
}};
}

match self {
Self::Alive(id) => encode!(id::ALIVE),
Self::NotAlive(id) => encode!(id::NOT_ALIVE),
Self::Clock(t) => encode!(t(CLOCK)),
Self::EventClock(t) => encode!(t(EVENT_CLOCK)),
Self::QueryClock(t) => encode!(t(QUERY_CLOCK)),
Self::Coordinate => encode!(COORDINATE),
Self::Leave => encode!(LEAVE),
Self::Comment => encode!(COMMENT),
Self::Alive(id) => encode!(w.id::ALIVE),
Self::NotAlive(id) => encode!(w.id::NOT_ALIVE),
Self::Clock(t) => encode!(w.t(CLOCK)),
Self::EventClock(t) => encode!(w.t(EVENT_CLOCK)),
Self::QueryClock(t) => encode!(w.t(QUERY_CLOCK)),
Self::Coordinate => encode!(w.COORDINATE),
Self::Leave => encode!(w.LEAVE),
Self::Comment => encode!(w.COMMENT),
}
}
}
Expand Down Expand Up @@ -387,6 +387,39 @@ where
metric_labels: std::sync::Arc<memberlist_core::types::MetricLabels>,
}

// flushEvent is used to handle writing out an event
macro_rules! stream_flush_event {
($this:ident <- $event:ident) => {{
// Stop recording events after a leave is issued
if $this.leaving {
break;
}

match &$event {
CrateEvent::Member(e) => $this.process_member_event(e),
CrateEvent::User(e) => $this.process_user_event(e),
CrateEvent::Query(e) => $this.process_query_event(e.ltime),
CrateEvent::InternalQuery { query, .. } => $this.process_query_event(query.ltime),
}
}};
}

macro_rules! tee_stream_flush_event {
($stream_tx:ident <- $event:ident -> $out_tx:ident) => {{
// Forward to the internal stream, do not block
futures::select! {
_ = $stream_tx.send($event.clone()).fuse() => {}
default => {}
}

// Forward the event immediately, do not block
futures::select! {
_ = $out_tx.send($event).fuse() => {}
default => {}
}
}};
}

impl<D, T> Snapshot<T, D>
where
D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
Expand Down Expand Up @@ -486,27 +519,11 @@ where
out_tx: Sender<CrateEvent<T, D>>,
shutdown_rx: Receiver<()>,
) {
macro_rules! flush_event {
($event:ident) => {{
// Forward to the internal stream, do not block
futures::select! {
_ = stream_tx.send($event.clone()).fuse() => {}
default => {}
}

// Forward the event immediately, do not block
futures::select! {
_ = out_tx.send($event).fuse() => {}
default => {}
}
}};
}

loop {
futures::select! {
ev = in_rx.recv().fuse() => {
if let Ok(ev) = ev {
flush_event!(ev)
tee_stream_flush_event!(stream_tx <- ev -> out_tx)
} else {
break;
}
Expand All @@ -522,7 +539,7 @@ where
futures::select! {
ev = in_rx.recv().fuse() => {
if let Ok(ev) = ev {
flush_event!(ev)
tee_stream_flush_event!(stream_tx <- ev -> out_tx)
} else {
break;
}
Expand All @@ -540,23 +557,6 @@ where
) {
let mut clock_ticker = <T::Runtime as RuntimeLite>::interval(CLOCK_UPDATE_INTERVAL);

// flushEvent is used to handle writing out an event
macro_rules! flush_event {
($this:ident <- $event:ident) => {{
// Stop recording events after a leave is issued
if $this.leaving {
break;
}

match &$event {
CrateEvent::Member(e) => $this.process_member_event(e),
CrateEvent::User(e) => $this.process_user_event(e),
CrateEvent::Query(e) => $this.process_query_event(e.ltime),
CrateEvent::InternalQuery { query, .. } => $this.process_query_event(query.ltime),
}
}};
}

loop {
futures::select! {
_ = self.leave_rx.recv().fuse() => {
Expand All @@ -580,7 +580,7 @@ where
}
ev = self.stream_rx.recv().fuse() => {
if let Ok(ev) = ev {
flush_event!(self <- ev)
stream_flush_event!(self <- ev)
} else {
break;
}
Expand All @@ -606,7 +606,9 @@ where
futures::select! {
ev = self.stream_rx.recv().fuse() => {
if let Ok(ev) = ev {
flush_event!(self <- ev)
stream_flush_event!(self <- ev)
} else {
break;
}
}
_ = (&mut flush_timeout).fuse() => {
Expand Down

0 comments on commit ffaaba2

Please sign in to comment.