diff --git a/core/Cargo.toml b/core/Cargo.toml index b85f297..5b92343 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -29,6 +29,7 @@ serde = [ test = ["memberlist-core/test", "paste", "tracing-subscriber", "tempfile"] [dependencies] +auto_impl = "1" atomic_refcell = "0.1" arc-swap = "1" async-lock = "3" diff --git a/core/src/coordinate.rs b/core/src/coordinate.rs index 1cc6cf1..944d14d 100644 --- a/core/src/coordinate.rs +++ b/core/src/coordinate.rs @@ -25,12 +25,16 @@ const DEFAULT_ADJUSTMENT_WINDOW_SIZE: usize = 20; const DEFAULT_LATENCY_FILTER_SAMPLES_SIZE: usize = 8; +/// Error type for the [`Coordinate`]. #[derive(Debug, thiserror::Error, PartialEq, Eq)] pub enum CoordinateError { + /// Returned when the dimensions of the coordinates are not compatible. #[error("dimensions aren't compatible")] DimensionalityMismatch, + /// Returned when the coordinate is invalid. #[error("invalid coordinate")] InvalidCoordinate, + /// Returned when the round trip time is not in a valid range. #[error("round trip time not in valid range, duration {0:?} is not a value less than 10s")] InvalidRTT(Duration), } @@ -48,47 +52,121 @@ pub enum CoordinateError { /// [3] Lee, Sanghwan, et al. "On suitability of Euclidean embedding for /// host-based network coordinate systems." Networking, IEEE/ACM Transactions /// on 18.1 (2010): 27-40. -#[viewit::viewit(getters(style = "ref"), setters(prefix = "with"))] +#[viewit::viewit( + vis_all = "pub(crate)", + getters(style = "ref", vis_all = "pub"), + setters(prefix = "with", vis_all = "pub") +)] #[derive(Debug, Clone, PartialEq)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] pub struct CoordinateOptions { /// The dimensionality of the coordinate system. As discussed in [2], more /// dimensions improves the accuracy of the estimates up to a point. Per [2] /// we chose 8 dimensions plus a non-Euclidean height. + #[viewit( + getter( + const, + style = "move", + attrs(doc = "Returns the dimensionality of the coordinate system.") + ), + setter(attrs(doc = "Sets the dimensionality of the coordinate system.")) + )] dimensionality: usize, /// The default error value when a node hasn't yet made /// any observations. It also serves as an upper limit on the error value in /// case observations cause the error value to increase without bound. + #[viewit( + getter( + const, + style = "move", + attrs(doc = "Returns the default error value when a node hasn't yet made any observations.") + ), + setter(attrs( + doc = "Sets the default error value when a node hasn't yet made any observations." + )) + )] vivaldi_error_max: f64, /// A tuning factor that controls the maximum impact an /// observation can have on a node's confidence. See [1] for more details. + #[viewit( + getter( + const, + style = "move", + attrs(doc = "Returns the maximum impact an observation can have on a node's confidence.") + ), + setter(attrs( + doc = "Sets the maximum impact an observation can have on a node's confidence." + )) + )] vivaldi_ce: f64, /// A tuning factor that controls the maximum impact an /// observation can have on a node's coordinate. See [1] for more details. + #[viewit( + getter( + const, + style = "move", + attrs(doc = "Returns the maximum impact an observation can have on a node's coordinate.") + ), + setter(attrs( + doc = "Sets the maximum impact an observation can have on a node's coordinate." + )) + )] vivaldi_cc: f64, /// A tuning factor that determines how many samples /// we retain to calculate the adjustment factor as discussed in [3]. Setting /// this to zero disables this feature. + #[viewit( + getter( + const, + style = "move", + attrs(doc = "Returns how many samples we retain to calculate the adjustment factor.") + ), + setter(attrs(doc = "Sets how many samples we retain to calculate the adjustment factor.")) + )] adjustment_window_size: usize, /// The minimum value of the height parameter. Since this /// always must be positive, it will introduce a small amount error, so /// the chosen value should be relatively small compared to "normal" /// coordinates. + #[viewit( + getter( + const, + style = "move", + attrs(doc = "Returns the minimum value of the height parameter.") + ), + setter(attrs(doc = "Sets the minimum value of the height parameter.")) + )] height_min: f64, /// The maximum number of samples that are retained /// per node, in order to compute a median. The intent is to ride out blips /// but still keep the delay low, since our time to probe any given node is /// pretty infrequent. See [2] for more details. + #[viewit( + getter( + const, + style = "move", + attrs(doc = "Returns the maximum number of samples that are retained per node.") + ), + setter(attrs(doc = "Sets the maximum number of samples that are retained per node.")) + )] latency_filter_size: usize, /// A tuning factor that sets how much gravity has an effect /// to try to re-center coordinates. See [2] for more details. + #[viewit( + getter( + const, + style = "move", + attrs(doc = "Returns how much gravity has an effect to try to re-center coordinates.") + ), + setter(attrs(doc = "Sets how much gravity has an effect to try to re-center coordinates.")) + )] gravity_rho: f64, #[cfg(feature = "metrics")] @@ -136,12 +214,24 @@ impl CoordinateOptions { } /// Used to record events that occur when updating coordinates. -#[viewit::viewit(getters(style = "ref"), setters(prefix = "with"))] +#[viewit::viewit(setters(prefix = "with"))] #[derive(Debug, Copy, Clone, Eq, PartialEq)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] pub struct CoordinateClientStats { /// Incremented any time we reset our local coordinate because /// our calculations have resulted in an invalid state. + #[viewit( + getter( + const, + style = "move", + attrs( + doc = "Returns the number of times we reset our local coordinate because our calculations have resulted in an invalid state." + ) + ), + setter(attrs( + doc = "Sets the number of times we reset our local coordinate because our calculations have resulted in an invalid state." + )) + )] resets: usize, } @@ -419,19 +509,41 @@ pub struct Coordinate { /// The Euclidean portion of the coordinate. This is used along /// with the other fields to provide an overall distance estimate. The /// units here are seconds. - #[viewit(getter(const, style = "ref"))] + #[viewit( + getter( + const, + style = "ref", + attrs(doc = "Returns the Euclidean portion of the coordinate.") + ), + setter(attrs(doc = "Sets the Euclidean portion of the coordinate.")) + )] portion: SmallVec<[f64; DEFAULT_DIMENSIONALITY]>, /// Reflects the confidence in the given coordinate and is updated /// dynamically by the Vivaldi Client. This is dimensionless. + #[viewit( + getter(const, attrs(doc = "Returns the confidence in the given coordinate.")), + setter(attrs(doc = "Sets the confidence in the given coordinate.")) + )] error: f64, /// A distance offset computed based on a calculation over /// observations from all other nodes over a fixed window and is updated /// dynamically by the Vivaldi Client. The units here are seconds. + #[viewit( + getter(const, attrs(doc = "Returns the distance offset.")), + setter(attrs(doc = "Sets the distance offset.")) + )] adjustment: f64, /// A distance offset that accounts for non-Euclidean effects /// which model the access links from nodes to the core Internet. The access /// links are usually set by bandwidth and congestion, and the core links /// usually follow distance based on geography. + #[viewit( + getter( + const, + attrs(doc = "Returns the distance offset that accounts for non-Euclidean effects.") + ), + setter(attrs(doc = "Sets the distance offset that accounts for non-Euclidean effects.")) + )] height: f64, } @@ -464,6 +576,7 @@ impl Coordinate { } } + /// Returns true if the coordinate is valid. #[inline] pub fn is_valid(&self) -> bool { self.portion.iter().all(|&f| f.is_finite()) @@ -472,6 +585,7 @@ impl Coordinate { && self.height.is_finite() } + /// Returns true if the dimensions of the coordinates are compatible. #[inline] pub fn is_compatible_with(&self, other: &Self) -> bool { self.portion.len() == other.portion.len() @@ -535,10 +649,13 @@ impl Coordinate { } } +/// The error when encoding or decoding a coordinate. #[derive(Debug, thiserror::Error)] pub enum CoordinateTransformError { + /// Returned when the buffer is too small to encode the coordinate. #[error("encode buffer too small")] BufferTooSmall, + /// Returned when there are not enough bytes to decode the coordinate. #[error("not enough bytes to decode")] NotEnoughBytes, } diff --git a/core/src/delegate.rs b/core/src/delegate.rs index 5a5b15d..175d928 100644 --- a/core/src/delegate.rs +++ b/core/src/delegate.rs @@ -12,11 +12,16 @@ pub use transform::*; mod composite; pub use composite::*; +/// [`Delegate`] is the trait that clients must implement if they want to hook +/// into the gossip layer of [`Serf`](crate::Serf). All the methods must be thread-safe, +/// as they can and generally will be called concurrently. pub trait Delegate: MergeDelegate::Id, Address = ::Address> + TransformDelegate::Id, Address = ::Address> + ReconnectDelegate::Id, Address = ::Address> { + /// The id type of the delegate type Id: Id; + /// The address type of the delegate type Address: CheapClone + Send + Sync + 'static; } diff --git a/core/src/delegate/composite.rs b/core/src/delegate/composite.rs index 7431edf..62014e1 100644 --- a/core/src/delegate/composite.rs +++ b/core/src/delegate/composite.rs @@ -15,6 +15,9 @@ use super::{ ReconnectDelegate, TransformDelegate, }; +/// `CompositeDelegate` is a helpful struct to split the [`Delegate`] into multiple small delegates, +/// so that users do not need to implement full [`Delegate`] when they only want to custom some methods +/// in the [`Delegate`]. pub struct CompositeDelegate< I, A, @@ -35,6 +38,7 @@ impl Default for CompositeDelegate { } impl CompositeDelegate { + /// Returns a new `CompositeDelegate`. pub fn new() -> Self { Self { merge: Default::default(), @@ -49,6 +53,7 @@ impl CompositeDelegate where M: MergeDelegate, { + /// Set the [`MergeDelegate`] for the `CompositeDelegate`. pub fn with_merge_delegate(self, merge: NM) -> CompositeDelegate { CompositeDelegate { merge, @@ -60,6 +65,7 @@ where } impl CompositeDelegate { + /// Set the [`ReconnectDelegate`] for the `CompositeDelegate`. pub fn with_reconnect_delegate(self, reconnect: NR) -> CompositeDelegate { CompositeDelegate { reconnect, @@ -71,6 +77,7 @@ impl CompositeDelegate { } impl CompositeDelegate { + /// Set the [`TransformDelegate`] for the `CompositeDelegate`. pub fn with_transform_delegate(self, transform: NT) -> CompositeDelegate { CompositeDelegate { transform, diff --git a/core/src/delegate/merge.rs b/core/src/delegate/merge.rs index 8fe251b..e00f1d0 100644 --- a/core/src/delegate/merge.rs +++ b/core/src/delegate/merge.rs @@ -3,17 +3,31 @@ use std::future::Future; use crate::types::Member; +/// Used to involve a client in +/// a potential cluster merge operation. Namely, when +/// a node does a promised push/pull (as part of a join), +/// the delegate is involved and allowed to cancel the join +/// based on custom logic. The merge delegate is NOT invoked +/// as part of the push-pull anti-entropy. +#[auto_impl::auto_impl(Box, Arc)] pub trait MergeDelegate: Send + Sync + 'static { + /// The error type of the delegate type Error: std::error::Error + Send + Sync + 'static; + /// The id type of the delegate type Id: Id; + /// The address type of the delegate type Address: CheapClone + Send + Sync + 'static; + /// Invoked when a merge could take place. + /// Provides a list of the nodes known by the peer. If + /// the return value is `Err`, the merge is canceled. fn notify_merge( &self, members: TinyVec>, ) -> impl Future> + Send; } +/// A default implementation of the `MergeDelegate` trait. #[derive(Debug, Clone, Copy)] pub struct DefaultMergeDelegate(std::marker::PhantomData<(I, A)>); diff --git a/core/src/delegate/reconnect.rs b/core/src/delegate/reconnect.rs index e5f7909..7dc6600 100644 --- a/core/src/delegate/reconnect.rs +++ b/core/src/delegate/reconnect.rs @@ -5,10 +5,14 @@ use memberlist_core::{transport::Id, CheapClone}; use crate::types::Member; /// Implemented to allow overriding the reconnect timeout for individual members. +#[auto_impl::auto_impl(Box, Arc)] pub trait ReconnectDelegate: Send + Sync + 'static { + /// The id type of the delegate type Id: Id; + /// The address type of the delegate type Address: CheapClone + Send + Sync + 'static; + /// Returns the reconnect timeout for the given member. fn reconnect_timeout( &self, member: &Member, diff --git a/core/src/delegate/transform.rs b/core/src/delegate/transform.rs index c92c107..66be43d 100644 --- a/core/src/delegate/transform.rs +++ b/core/src/delegate/transform.rs @@ -23,12 +23,16 @@ pub trait TransformDelegate: Send + Sync + 'static { /// The Address type. type Address: CheapClone + Send + Sync + 'static; + /// Encodes the filter into bytes. fn encode_filter(filter: &Filter) -> Result; + /// Decodes the filter from the given bytes, returning the number of bytes consumed and the filter. fn decode_filter(bytes: &[u8]) -> Result<(usize, Filter), Self::Error>; + /// Returns the encoded length of the node. fn node_encoded_len(node: &Node) -> usize; + /// Encodes the node into the given buffer, returning the number of bytes written. fn encode_node( node: &Node, dst: &mut [u8], @@ -39,30 +43,43 @@ pub trait TransformDelegate: Send + Sync + 'static { bytes: impl AsRef<[u8]>, ) -> Result<(usize, Node), Self::Error>; + /// Returns the encoded length of the id. fn id_encoded_len(id: &Self::Id) -> usize; + /// Encodes the id into the given buffer, returning the number of bytes written. fn encode_id(id: &Self::Id, dst: &mut [u8]) -> Result; + /// Decodes the id from the given bytes, returning the number of bytes consumed and the id. fn decode_id(bytes: &[u8]) -> Result<(usize, Self::Id), Self::Error>; + /// Returns the encoded length of the address. fn address_encoded_len(address: &Self::Address) -> usize; + /// Encodes the address into the given buffer, returning the number of bytes written. fn encode_address(address: &Self::Address, dst: &mut [u8]) -> Result; + /// Decodes the address from the given bytes, returning the number of bytes consumed and the address. fn decode_address(bytes: &[u8]) -> Result<(usize, Self::Address), Self::Error>; + /// Encoded length of the coordinate. fn coordinate_encoded_len(coordinate: &Coordinate) -> usize; + /// Encodes the coordinate into the given buffer, returning the number of bytes written. fn encode_coordinate(coordinate: &Coordinate, dst: &mut [u8]) -> Result; + /// Decodes the coordinate from the given bytes, returning the number of bytes consumed and the coordinate. fn decode_coordinate(bytes: &[u8]) -> Result<(usize, Coordinate), Self::Error>; + /// Encoded length of the tags. fn tags_encoded_len(tags: &Tags) -> usize; + /// Encodes the tags into the given buffer, returning the number of bytes written. fn encode_tags(tags: &Tags, dst: &mut [u8]) -> Result; + /// Decodes the tags from the given bytes, returning the number of bytes consumed and the tags. fn decode_tags(bytes: &[u8]) -> Result<(usize, Tags), Self::Error>; + /// Encoded length of the message. fn message_encoded_len(msg: impl AsMessageRef) -> usize; /// Encodes the message into the given buffer, returning the number of bytes written. @@ -78,6 +95,7 @@ pub trait TransformDelegate: Send + Sync + 'static { dst: impl AsMut<[u8]>, ) -> Result; + /// Decodes the message from the given bytes, returning the number of bytes consumed and the message. fn decode_message( ty: MessageType, bytes: impl AsRef<[u8]>, @@ -130,6 +148,7 @@ where } } +/// A length-prefixed encoding [`TransformDelegate`] implementation pub struct LpeTransfromDelegate(std::marker::PhantomData<(I, A)>); impl Default for LpeTransfromDelegate { diff --git a/core/src/error.rs b/core/src/error.rs index df83d51..69530e0 100644 --- a/core/src/error.rs +++ b/core/src/error.rs @@ -15,17 +15,6 @@ use crate::{ pub use crate::snapshot::SnapshotError; -#[derive(Debug)] -pub struct VoidError; - -impl std::fmt::Display for VoidError { - fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - Ok(()) - } -} - -impl std::error::Error for VoidError {} - /// Error trait for [`Delegate`] #[derive(thiserror::Error)] pub enum SerfDelegateError { @@ -83,20 +72,26 @@ where } } +/// Error type for the ruserf crate. #[derive(thiserror::Error)] pub enum Error where D: Delegate::ResolvedAddress>, T: Transport, { + /// Returned when the underlyhing memberlist error #[error(transparent)] Memberlist(#[from] MemberlistError::ResolvedAddress>), + /// Returned when the serf error #[error(transparent)] Serf(#[from] SerfError), + /// Returned when the transport error #[error(transparent)] Transport(T::Error), + /// Returned when the delegate error #[error(transparent)] Delegate(#[from] SerfDelegateError), + /// Returned when the relay error #[error(transparent)] Relay(#[from] RelayError), } @@ -293,44 +288,68 @@ where } } +/// [`Serf`](crate::Serf) error. #[derive(Debug, thiserror::Error)] pub enum SerfError { + /// Returned when the user event exceeds the configured limit. #[error("ruserf: user event exceeds configured limit of {0} bytes before encoding")] UserEventLimitTooLarge(usize), + /// Returned when the user event exceeds the sane limit. #[error("ruserf: user event exceeds sane limit of {0} bytes before encoding")] UserEventTooLarge(usize), + /// Returned when the join status is bad. #[error("ruserf: join called on {0} statues")] BadJoinStatus(SerfState), + /// Returned when the leave status is bad. #[error("ruserf: leave called on {0} statues")] BadLeaveStatus(SerfState), + /// Returned when the encoded user event exceeds the sane limit after encoding. #[error("ruserf: user event exceeds sane limit of {0} bytes after encoding")] RawUserEventTooLarge(usize), + /// Returned when the query size exceeds the configured limit. #[error("ruserf: query exceeds limit of {0} bytes")] QueryTooLarge(usize), + /// Returned when the query is timeout. #[error("ruserf: query response is past the deadline")] QueryTimeout, + /// Returned when the query response is too large. #[error("ruserf: query response ({got} bytes) exceeds limit of {limit} bytes")] - QueryResponseTooLarge { limit: usize, got: usize }, + QueryResponseTooLarge { + /// The query response size limit. + limit: usize, + /// The query response size. + got: usize, + }, + /// Returned when the query has already been responded. #[error("ruserf: query response already sent")] QueryAlreadyResponsed, + /// Returned when failed to truncate response so that it fits into message. #[error("ruserf: failed to truncate response so that it fits into message")] FailTruncateResponse, + /// Returned when the tags too large. #[error("ruserf: encoded length of tags exceeds limit of {0} bytes")] TagsTooLarge(usize), + /// Returned when the relayed response is too large. #[error("ruserf: relayed response exceeds limit of {0} bytes")] RelayedResponseTooLarge(usize), + /// Returned when failed to deliver query response, dropping. #[error("ruserf: failed to deliver query response, dropping")] QueryResponseDeliveryFailed, + /// Returned when the coordinates are disabled. #[error("ruserf: coordinates are disabled")] CoordinatesDisabled, + /// Returned when snapshot error. #[error("ruserf: {0}")] Snapshot(#[from] SnapshotError), + /// Returned when timed out broadcasting node removal. #[error("ruserf: timed out broadcasting node removal")] RemovalBroadcastTimeout, + /// Returned when the timed out broadcasting channel closed. #[error("ruserf: timed out broadcasting channel closed")] BroadcastChannelClosed, } +/// Error type for [`Memberlist`](memberlist_core::Memberlist). #[derive(Debug, thiserror::Error)] pub enum MemberlistError { /// Returns when the node is not running. @@ -370,6 +389,7 @@ pub enum MemberlistError { Other(Cow<'static, str>), } +/// Relay error from remote nodes. pub struct RelayError( #[allow(clippy::type_complexity)] TinyVec<( @@ -455,20 +475,28 @@ where D: Delegate::ResolvedAddress>, T: Transport, { + /// Returns the broadcast error that occurred during the join. + #[inline] pub const fn broadcast_error(&self) -> Option<&Error> { self.broadcast_error.as_ref() } + /// Returns the errors that occurred during the join. + #[inline] pub const fn errors(&self) -> &HashMap>, Error> { &self.errors } + /// Returns the nodes have successfully joined. + #[inline] pub const fn joined( &self, ) -> &SmallVec::ResolvedAddress>> { &self.joined } + /// Returns how many nodes have successfully joined. + #[inline] pub fn num_joined(&self) -> usize { self.joined.len() } diff --git a/core/src/event.rs b/core/src/event.rs index 74781b7..0e53050 100644 --- a/core/src/event.rs +++ b/core/src/event.rs @@ -275,23 +275,31 @@ where } } +/// The event type for member event #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] #[cfg_attr(feature = "serde", serde(rename_all = "kebab-case", untagged))] pub enum MemberEventType { + /// Join event #[cfg_attr(feature = "serde", serde(rename = "member-join"))] Join, + /// Leave event #[cfg_attr(feature = "serde", serde(rename = "member-leave"))] Leave, + /// Failed event #[cfg_attr(feature = "serde", serde(rename = "member-failed"))] Failed, + /// Update event #[cfg_attr(feature = "serde", serde(rename = "member-update"))] Update, + /// Reap event #[cfg_attr(feature = "serde", serde(rename = "member-reap"))] Reap, } impl MemberEventType { + /// Returns the string representation of the event type. + #[inline] pub const fn as_str(&self) -> &'static str { match self { Self::Join => "member-join", @@ -356,17 +364,12 @@ impl core::fmt::Display for MemberEvent { } impl MemberEvent { - pub fn new(ty: MemberEventType, members: TinyVec>) -> Self { - Self { - ty, - members: Arc::new(members), - } - } - + /// Returns the event type of this member event pub fn ty(&self) -> MemberEventType { self.ty } + /// Returns the members of this event pub fn members(&self) -> &[Member] { &self.members } diff --git a/core/src/key_manager.rs b/core/src/key_manager.rs index c5982c8..0a66fa2 100644 --- a/core/src/key_manager.rs +++ b/core/src/key_manager.rs @@ -28,26 +28,48 @@ use super::{ /// KeyResponse is used to relay a query for a list of all keys in use. #[viewit::viewit( vis_all = "pub(crate)", - getters(style = "ref", vis_all = "pub"), + getters(style = "move", vis_all = "pub"), setters(skip) )] #[derive(Default, Debug)] pub struct KeyResponse { /// Map of node id to response message + #[viewit(getter( + const, + style = "ref", + attrs(doc = "Returns a map of node id to response message.") + ))] messages: HashMap, /// Total nodes memberlist knows of + #[viewit(getter(const, attrs(doc = "Returns the total nodes memberlist knows of.")))] num_nodes: usize, /// Total responses received + #[viewit(getter(const, attrs(doc = "Returns the total responses received.")))] num_resp: usize, /// Total errors from request + #[viewit(getter(const, attrs(doc = "Returns the total errors from request.")))] num_err: usize, /// A mapping of the value of the key bytes to the /// number of nodes that have the key installed. + #[viewit(getter( + const, + style = "ref", + attrs( + doc = "Returns a mapping of the value of the key bytes to the number of nodes that have the key installed.." + ) + ))] keys: HashMap, /// A mapping of the value of the primary /// key bytes to the number of nodes that have the key installed. + #[viewit(getter( + const, + style = "ref", + attrs( + doc = "Returns a mapping of the value of the primary key bytes to the number of nodes that have the key installed." + ) + ))] primary_keys: HashMap, } diff --git a/core/src/lib.rs b/core/src/lib.rs index 244a11e..f147b2a 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -1,7 +1,7 @@ #![doc = include_str!("../../README.md")] #![doc(html_logo_url = "https://github.com/raw/al8n/ruserf/main/art/logo_72x72.png")] #![forbid(unsafe_code)] -#![deny(warnings)] +#![deny(warnings, missing_docs)] #![allow(clippy::type_complexity)] #![cfg_attr(docsrs, feature(doc_cfg))] #![cfg_attr(docsrs, allow(unused_attributes))] @@ -13,6 +13,7 @@ mod coalesce; /// Coordinate. pub mod coordinate; +/// Events for [`Serf`] pub mod event; /// Errors for `ruserf`. diff --git a/core/src/options.rs b/core/src/options.rs index 66d1d98..f85249d 100644 --- a/core/src/options.rs +++ b/core/src/options.rs @@ -53,6 +53,17 @@ pub struct Options { /// things like leave messages and force remove messages. If this is not /// set, a timeout of 5 seconds will be set. #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))] + #[viewit( + getter( + const, + attrs( + doc = "Returns the amount of time to wait for a broadcast message to be sent to the cluster." + ) + ), + setter(attrs( + doc = "Sets the amount of time to wait for a broadcast message to be sent to the cluster." + )) + )] broadcast_timeout: Duration, /// For our leave (node dead) message to propagate @@ -61,6 +72,10 @@ pub struct Options { /// leaving and stop probing. Otherwise, we risk getting node failures as /// we leave. #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))] + #[viewit( + getter(const, attrs(doc = "Returns the leave propagate delay.")), + setter(attrs(doc = "Sets the leave propagate delay.")) + )] leave_propagate_delay: Duration, /// The settings below relate to Serf's event coalescence feature. Serf @@ -78,6 +93,10 @@ pub struct Options { /// within 5 seconds that can be coalesced will be. /// #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))] + #[viewit( + getter(const, attrs(doc = "Returns the coalesce period.")), + setter(attrs(doc = "Sets the coalesce period.")) + )] coalesce_period: Duration, /// specifies the duration of time where if no events @@ -87,39 +106,92 @@ pub struct Options { /// new events are received within 2 seconds of the last event. Otherwise, /// every event will always be delayed by at least 10 seconds. #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))] + #[viewit( + getter( + const, + attrs( + doc = "Returns the specifies the duration of time where if no events are received, coalescence immediately happens." + ) + ), + setter(attrs( + doc = "Sets specifies the duration of time where if no events are received, coalescence immediately happens." + )) + )] quiescent_period: Duration, /// The settings below relate to Serf's user event coalescing feature. /// The settings operate like above but only affect user messages and /// not the Member* messages that Serf generates. #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))] + #[viewit( + getter(const, attrs(doc = "Returns the user event coalesce period.")), + setter(attrs(doc = "Sets the user event coalesce period.")) + )] user_coalesce_period: Duration, + /// The settings below relate to Serf's user event coalescing feature. /// The settings operate like above but only affect user messages and /// not the Member* messages that Serf generates. #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))] + #[viewit( + getter(const, attrs(doc = "Returns the user quiescent period.")), + setter(attrs(doc = "Sets the user quiescent period.")) + )] user_quiescent_period: Duration, /// The interval when the reaper runs. If this is not /// set (it is zero), it will be set to a reasonable default. #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))] + #[viewit( + getter(const, attrs(doc = "Returns the interval when the reaper runs.")), + setter(attrs(doc = "Sets the interval when the reaper runs.")) + )] reap_interval: Duration, /// The interval when we attempt to reconnect /// to failed nodes. If this is not set (it is zero), it will be set /// to a reasonable default. #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))] + #[viewit( + getter( + const, + attrs(doc = "Returns the interval when we attempt to reconnect to failed nodes.") + ), + setter(attrs(doc = "Sets the interval when we attempt to reconnect to failed nodes.")) + )] reconnect_interval: Duration, /// The amount of time to attempt to reconnect to /// a failed node before giving up and considering it completely gone. #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))] + #[viewit( + getter( + const, + attrs( + doc = "Returns the amount of time to attempt to reconnect to a failed node before giving up and considering it completely gone." + ) + ), + setter(attrs( + doc = "Sets the amount of time to attempt to reconnect to a failed node before giving up and considering it completely gone." + )) + )] reconnect_timeout: Duration, /// The amount of time to keep around nodes /// that gracefully left as tombstones for syncing state with other /// Serf nodes. #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))] + #[viewit( + getter( + const, + attrs( + doc = "Returns the amount of time to keep around nodes that gracefully left as tombstones for syncing state with other Serf nodes." + ) + ), + setter(attrs( + doc = "Sets the amount of time to keep around nodes that gracefully left as tombstones for syncing state with other Serf nodes." + )) + )] tombstone_timeout: Duration, /// The amount of time less than which we consider a node @@ -128,28 +200,69 @@ pub struct Options { /// to see actual events, given our expected detection times for a failed /// node. #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))] + #[viewit( + getter( + const, + attrs( + doc = "Returns the amount of time less than which we consider a node being failed and rejoining looks like a flap for telemetry purposes." + ) + ), + setter(attrs( + doc = "Sets the amount of time less than which we consider a node being failed and rejoining looks like a flap for telemetry purposes." + )) + )] flap_timeout: Duration, /// The interval at which we check the message /// queue to apply the warning and max depth. #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))] + #[viewit( + getter( + const, + attrs( + doc = "Returns the interval at which we check the message queue to apply the warning and max depth." + ) + ), + setter(attrs( + doc = "Sets the interval at which we check the message queue to apply the warning and max depth." + )) + )] queue_check_interval: Duration, /// Used to generate warning message if the /// number of queued messages to broadcast exceeds this number. This /// is to provide the user feedback if events are being triggered /// faster than they can be disseminated + #[viewit( + getter(const, attrs(doc = "Returns the queue depth warning.")), + setter(attrs(doc = "Sets the queue depth warning.")) + )] queue_depth_warning: usize, /// Used to start dropping messages if the number /// of queued messages to broadcast exceeds this number. This is to /// prevent an unbounded growth of memory utilization + #[viewit( + getter(const, attrs(doc = "Returns the max queue depth.")), + setter(attrs(doc = "Sets the max queue depth.")) + )] max_queue_depth: usize, /// if >0 will enforce a lower limit for dropping messages /// and then the max will be max(MinQueueDepth, 2*SizeOfCluster). This /// defaults to 0 which disables this dynamic sizing feature. If this is /// >0 then `max_queue_depth` will be ignored. + #[viewit( + getter( + const, + attrs( + doc = "Returns if `>0` will enforce a lower limit for dropping messages and then the max will be `max(min_queue_depth, 2 * size_of_cluster)`. This defaults to 0 which disables this dynamic sizing feature. If this is `>0` then `max_queue_depth` will be ignored." + ) + ), + setter(attrs( + doc = "Sets if `>0` will enforce a lower limit for dropping messages and then the max will be `max(min_queue_depth, 2 * size_of_cluster)`. This defaults to 0 which disables this dynamic sizing feature. If this is `>0` then `max_queue_depth` will be ignored." + )) + )] min_queue_depth: usize, /// Used to determine how long we store recent @@ -158,6 +271,13 @@ pub struct Options { /// It is important that this not be too short to avoid continuous /// rebroadcasting of dead events. #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))] + #[viewit( + getter( + const, + attrs(doc = "Returns how long we store recent join and leave intents.") + ), + setter(attrs(doc = "Sets how long we store recent join and leave intents.")) + )] recent_intent_timeout: Duration, /// Used to control how many events are buffered. @@ -166,6 +286,10 @@ pub struct Options { /// not deliver messages that are older than the oldest entry in the buffer. /// Thus if a client is generating too many events, it's possible that the /// buffer gets overrun and messages are not delivered. + #[viewit( + getter(const, attrs(doc = "Returns how many events are buffered.")), + setter(attrs(doc = "Sets how many events are buffered.")) + )] event_buffer_size: usize, /// used to control how many queries are buffered. @@ -174,6 +298,10 @@ pub struct Options { /// deliver queries older than the oldest entry in the buffer. /// Thus if a client is generating too many queries, it's possible that the /// buffer gets overrun and messages are not delivered. + #[viewit( + getter(const, attrs(doc = "Returns how many queries are buffered.")), + setter(attrs(doc = "Sets how many queries are buffered.")) + )] query_buffer_size: usize, /// Configures the default timeout multipler for a query to run if no @@ -187,40 +315,87 @@ pub struct Options { /// ```text /// timeout = gossip_interval * query_timeout_mult * log(N+1) /// ``` + #[viewit( + getter( + const, + attrs( + doc = "Returns the default timeout multipler for a query to run if no specific value is provided." + ) + ), + setter(attrs( + doc = "Sets the default timeout multipler for a query to run if no specific value is provided." + )) + )] query_timeout_mult: usize, /// Limit the outbound payload sizes for queries, respectively. These must fit /// in a UDP packet with some additional overhead, so tuning these /// past the default values of 1024 will depend on your network /// configuration. + #[viewit( + getter( + const, + attrs(doc = "Returns the limit of the outbound payload sizes for queries.") + ), + setter(attrs(doc = "Sets the limit of the outbound payload sizes for queries.")) + )] query_response_size_limit: usize, /// Limit the inbound payload sizes for queries, respectively. These must fit /// in a UDP packet with some additional overhead, so tuning these /// past the default values of 1024 will depend on your network /// configuration. + #[viewit( + getter( + const, + attrs(doc = "Returns the limit of the inbound payload sizes for queries.") + ), + setter(attrs(doc = "Sets the limit of the inbound payload sizes for queries.")) + )] query_size_limit: usize, /// The memberlist configuration that Serf will /// use to do the underlying membership management and gossip. - #[viewit(getter(const, style = "ref"))] + #[viewit( + getter( + const, + style = "ref", + attrs( + doc = "Returns the memberlist configuration that Serf will use to do the underlying membership management and gossip." + ) + ), + setter(attrs( + doc = "Sets the memberlist configuration that Serf will use to do the underlying membership management and gossip." + )) + )] memberlist_options: MemberlistOptions, /// If provided is used to snapshot live nodes as well /// as lamport clock values. When Serf is started with a snapshot, /// it will attempt to join all the previously known nodes until one /// succeeds and will also avoid replaying old user events. - #[viewit(getter( - const, - style = "ref", - result(converter(fn = "Option::as_ref"), type = "Option<&PathBuf>") - ))] + #[viewit( + getter( + const, + style = "ref", + result(converter(fn = "Option::as_ref"), type = "Option<&PathBuf>"), + attrs(doc = "Returns the path to the snapshot file.") + ), + setter(attrs(doc = "Sets the path to the snapshot file.")) + )] snapshot_path: Option, /// Controls our interaction with the snapshot file. /// When set to false (default), a leave causes a Serf to not rejoin /// the cluster until an explicit join is received. If this is set to /// true, we ignore the leave, and rejoin the cluster on start. + #[viewit( + getter( + const, + attrs(doc = "Returns if Serf will rejoin the cluster after a leave.") + ), + setter(attrs(doc = "Sets if Serf will rejoin the cluster after a leave.")) + )] rejoin_after_leave: bool, /// Controls if Serf will actively attempt @@ -231,25 +406,65 @@ pub struct Options { /// conflict and issues a special query which asks the cluster for the /// Name -> IP:Port mapping. If there is a simple majority of votes, that /// node stays while the other node will leave the cluster and exit. + #[viewit( + getter( + const, + attrs(doc = "Returns if Serf will attempt to resolve a name conflict.") + ), + setter(attrs(doc = "Sets if Serf will attempt to resolve a name conflict.")) + )] enable_id_conflict_resolution: bool, /// Controls if Serf will maintain an estimate of this /// node's network coordinate internally. A network coordinate is useful /// for estimating the network distance (i.e. round trip time) between /// two nodes. Enabling this option adds some overhead to ping messages. + #[viewit( + getter( + const, + attrs( + doc = "Returns if Serf will maintain an estimate of this node's network coordinate internally." + ) + ), + setter(attrs( + doc = "Sets if Serf will maintain an estimate of this node's network coordinate internally." + )) + )] disable_coordinates: bool, /// Provides the location of a writable file where Serf can /// persist changes to the encryption keyring. - #[viewit(getter( - const, - style = "ref", - result(converter(fn = "Option::as_ref"), type = "Option<&PathBuf>") - ))] + #[cfg(feature = "encryption")] + #[viewit( + getter( + const, + style = "ref", + result(converter(fn = "Option::as_ref"), type = "Option<&PathBuf>"), + attrs( + doc = "Returns the location of a writable file where Serf can persist changes to the encryption keyring.", + cfg(feature = "encryption") + ) + ), + setter(attrs( + doc = "Sets the location of a writable file where Serf can persist changes to the encryption keyring.", + cfg(feature = "encryption") + )) + )] keyring_file: Option, /// Maximum byte size limit of user event `name` + `payload` in bytes. /// It's optimal to be relatively small, since it's going to be gossiped through the cluster. + #[viewit( + getter( + const, + attrs( + doc = "Returns the maximum byte size limit of user event `name` + `payload` in bytes." + ) + ), + setter(attrs( + doc = "Sets the maximum byte size limit of user event `name` + `payload` in bytes." + )) + )] max_user_event_size: usize, } @@ -312,6 +527,7 @@ impl Options { } } + /// Sets the tags for this node. #[inline] pub fn with_tags, V: Into>( self, diff --git a/core/src/serf.rs b/core/src/serf.rs index 543dc7f..1265114 100644 --- a/core/src/serf.rs +++ b/core/src/serf.rs @@ -76,13 +76,18 @@ pub(crate) struct EventCore { /// The state of the Serf instance. #[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)] pub enum SerfState { + /// Alive state Alive, + /// Leaving state Leaving, + /// Left state Left, + /// Shutdown state Shutdown, } impl SerfState { + /// Returns the string representation of the state. pub const fn as_str(&self) -> &'static str { match self { Self::Alive => "alive", diff --git a/core/src/serf/query.rs b/core/src/serf/query.rs index b6ca12c..60726c2 100644 --- a/core/src/serf/query.rs +++ b/core/src/serf/query.rs @@ -27,25 +27,67 @@ use super::Serf; /// Provided to [`Serf::query`] to configure the parameters of the /// query. If not provided, sane defaults will be used. -#[viewit::viewit] +#[viewit::viewit( + vis_all = "pub(crate)", + getters(vis_all = "pub", style = "ref"), + setters(vis_all = "pub", prefix = "with") +)] #[derive(Debug, Clone)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] pub struct QueryParam { /// The filters to apply to the query. + #[viewit( + getter(const, attrs(doc = "Returns the filters of the query")), + setter(attrs(doc = "Sets the filters of the query")) + )] filters: OneOrMore>, /// If true, we are requesting an delivery acknowledgement from /// every node that meets the filter requirement. This means nodes /// the receive the message but do not pass the filters, will not /// send an ack. + #[viewit( + getter( + const, + style = "move", + attrs( + doc = "Returns if we are requesting an delivery acknowledgement from every node that meets the filter requirement. This means nodes the receive the message but do not pass the filters, will not send an ack." + ) + ), + setter(attrs( + doc = "Sets if we are requesting an delivery acknowledgement from every node that meets the filter requirement. This means nodes the receive the message but do not pass the filters, will not send an ack." + )) + )] request_ack: bool, - /// RelayFactor controls the number of duplicate responses to relay + /// Controls the number of duplicate responses to relay /// back to the sender through other nodes for redundancy. + #[viewit( + getter( + const, + style = "move", + attrs( + doc = "Returns the number of duplicate responses to relay back to the sender through other nodes for redundancy." + ) + ), + setter(attrs( + doc = "Sets the number of duplicate responses to relay back to the sender through other nodes for redundancy." + )) + )] relay_factor: u8, /// The timeout limits how long the query is left open. If not provided, /// then a default timeout is used based on the configuration of Serf + #[viewit( + getter( + const, + style = "move", + attrs( + doc = "Returns timeout limits how long the query is left open. If not provided, then a default timeout is used based on the configuration of [`Serf`]" + ) + ), + setter(attrs(doc = "Sets timeout limits how long the query is left open.")) + )] #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))] timeout: Duration, } @@ -345,11 +387,17 @@ impl QueryResponse { } /// Used to represent a single response from a node -#[viewit::viewit] +#[viewit::viewit( + vis_all = "pub(crate)", + setters(skip), + getters(vis_all = "pub", style = "ref") +)] #[derive(Debug, Clone, PartialEq, Eq, Hash)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] pub struct NodeResponse { + #[viewit(getter(attrs(doc = "Returns the node that sent the response")))] from: Node, + #[viewit(getter(attrs(doc = "Returns the payload of the response")))] payload: Bytes, } diff --git a/core/src/snapshot.rs b/core/src/snapshot.rs index d46ba7b..9ccac98 100644 --- a/core/src/snapshot.rs +++ b/core/src/snapshot.rs @@ -59,34 +59,49 @@ const SNAPSHOT_BYTES_PER_NODE: usize = 128; /// the snapshot size estimate (nodes * bytes per node) before compacting. const SNAPSHOT_COMPACTION_THRESHOLD: usize = 2; +/// Errors that can occur while interacting with snapshots #[derive(Debug, thiserror::Error)] pub enum SnapshotError { + /// Returned when opening a snapshot fails #[error("failed to open snapshot: {0}")] Open(std::io::Error), + /// Returned when opening a new snapshot fails #[error("failed to open new snapshot: {0}")] OpenNew(std::io::Error), + /// Returned when flush new snapshot fails #[error("failed to flush new snapshot: {0}")] FlushNew(std::io::Error), + /// Returned when flush snapshot fails #[error("failed to flush snapshot: {0}")] Flush(std::io::Error), + /// Returned when fsync snapshot fails #[error("failed to fsync snapshot: {0}")] Sync(std::io::Error), + /// Returned when stat snapshot fails #[error("failed to stat snapshot: {0}")] Stat(std::io::Error), + /// Returned when remove old snapshot fails #[error("failed to remove old snapshot: {0}")] Remove(std::io::Error), + /// Returned when installing a new snapshot fails #[error("failed to install new snapshot: {0}")] Install(std::io::Error), + /// Returned when writing to a new snapshot fails #[error("failed to write to new snapshot: {0}")] WriteNew(std::io::Error), + /// Returned when writing to a snapshot fails #[error("failed to write to snapshot: {0}")] Write(std::io::Error), + /// Returned when seek to start of a snapshot fails #[error("failed to seek to beginning of snapshot: {0}")] SeekStart(std::io::Error), + /// Returned when seek to end of a snapshot fails #[error("failed to seek to end of snapshot: {0}")] SeekEnd(std::io::Error), + /// Returned when replaying a snapshot fails #[error("failed to replay snapshot: {0}")] Replay(std::io::Error), + /// Returned when fail to decode snapshot record type. #[error(transparent)] UnknownRecordType(#[from] UnknownRecordType), } diff --git a/ruserf/src/lib.rs b/ruserf/src/lib.rs index 78133d8..88372b1 100644 --- a/ruserf/src/lib.rs +++ b/ruserf/src/lib.rs @@ -1,3 +1,11 @@ +#![doc = include_str!("../../README.md")] +#![doc(html_logo_url = "https://github.com/raw/al8n/ruserf/main/art/logo_72x72.png")] +#![forbid(unsafe_code)] +#![deny(warnings, missing_docs)] +#![allow(clippy::type_complexity)] +#![cfg_attr(docsrs, feature(doc_cfg))] +#![cfg_attr(docsrs, allow(unused_attributes))] + pub use ruserf_core::*; pub use memberlist::{agnostic, transport};