Skip to content

Commit

Permalink
Add documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
al8n committed Apr 15, 2024
1 parent 7feaf2e commit ba42afd
Show file tree
Hide file tree
Showing 16 changed files with 551 additions and 38 deletions.
1 change: 1 addition & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ serde = [
test = ["memberlist-core/test", "paste", "tracing-subscriber", "tempfile"]

[dependencies]
auto_impl = "1"
atomic_refcell = "0.1"
arc-swap = "1"
async-lock = "3"
Expand Down
123 changes: 120 additions & 3 deletions core/src/coordinate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,16 @@ const DEFAULT_ADJUSTMENT_WINDOW_SIZE: usize = 20;

const DEFAULT_LATENCY_FILTER_SAMPLES_SIZE: usize = 8;

/// Error type for the [`Coordinate`].
#[derive(Debug, thiserror::Error, PartialEq, Eq)]
pub enum CoordinateError {
/// Returned when the dimensions of the coordinates are not compatible.
#[error("dimensions aren't compatible")]
DimensionalityMismatch,
/// Returned when the coordinate is invalid.
#[error("invalid coordinate")]
InvalidCoordinate,
/// Returned when the round trip time is not in a valid range.
#[error("round trip time not in valid range, duration {0:?} is not a value less than 10s")]
InvalidRTT(Duration),
}
Expand All @@ -48,47 +52,121 @@ pub enum CoordinateError {
/// [3] Lee, Sanghwan, et al. "On suitability of Euclidean embedding for
/// host-based network coordinate systems." Networking, IEEE/ACM Transactions
/// on 18.1 (2010): 27-40.
#[viewit::viewit(getters(style = "ref"), setters(prefix = "with"))]
#[viewit::viewit(
vis_all = "pub(crate)",
getters(style = "ref", vis_all = "pub"),
setters(prefix = "with", vis_all = "pub")
)]
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct CoordinateOptions {
/// The dimensionality of the coordinate system. As discussed in [2], more
/// dimensions improves the accuracy of the estimates up to a point. Per [2]
/// we chose 8 dimensions plus a non-Euclidean height.
#[viewit(
getter(
const,
style = "move",
attrs(doc = "Returns the dimensionality of the coordinate system.")
),
setter(attrs(doc = "Sets the dimensionality of the coordinate system."))
)]
dimensionality: usize,

/// The default error value when a node hasn't yet made
/// any observations. It also serves as an upper limit on the error value in
/// case observations cause the error value to increase without bound.
#[viewit(
getter(
const,
style = "move",
attrs(doc = "Returns the default error value when a node hasn't yet made any observations.")
),
setter(attrs(
doc = "Sets the default error value when a node hasn't yet made any observations."
))
)]
vivaldi_error_max: f64,

/// A tuning factor that controls the maximum impact an
/// observation can have on a node's confidence. See [1] for more details.
#[viewit(
getter(
const,
style = "move",
attrs(doc = "Returns the maximum impact an observation can have on a node's confidence.")
),
setter(attrs(
doc = "Sets the maximum impact an observation can have on a node's confidence."
))
)]
vivaldi_ce: f64,

/// A tuning factor that controls the maximum impact an
/// observation can have on a node's coordinate. See [1] for more details.
#[viewit(
getter(
const,
style = "move",
attrs(doc = "Returns the maximum impact an observation can have on a node's coordinate.")
),
setter(attrs(
doc = "Sets the maximum impact an observation can have on a node's coordinate."
))
)]
vivaldi_cc: f64,

/// A tuning factor that determines how many samples
/// we retain to calculate the adjustment factor as discussed in [3]. Setting
/// this to zero disables this feature.
#[viewit(
getter(
const,
style = "move",
attrs(doc = "Returns how many samples we retain to calculate the adjustment factor.")
),
setter(attrs(doc = "Sets how many samples we retain to calculate the adjustment factor."))
)]
adjustment_window_size: usize,

/// The minimum value of the height parameter. Since this
/// always must be positive, it will introduce a small amount error, so
/// the chosen value should be relatively small compared to "normal"
/// coordinates.
#[viewit(
getter(
const,
style = "move",
attrs(doc = "Returns the minimum value of the height parameter.")
),
setter(attrs(doc = "Sets the minimum value of the height parameter."))
)]
height_min: f64,

/// The maximum number of samples that are retained
/// per node, in order to compute a median. The intent is to ride out blips
/// but still keep the delay low, since our time to probe any given node is
/// pretty infrequent. See [2] for more details.
#[viewit(
getter(
const,
style = "move",
attrs(doc = "Returns the maximum number of samples that are retained per node.")
),
setter(attrs(doc = "Sets the maximum number of samples that are retained per node."))
)]
latency_filter_size: usize,

/// A tuning factor that sets how much gravity has an effect
/// to try to re-center coordinates. See [2] for more details.
#[viewit(
getter(
const,
style = "move",
attrs(doc = "Returns how much gravity has an effect to try to re-center coordinates.")
),
setter(attrs(doc = "Sets how much gravity has an effect to try to re-center coordinates."))
)]
gravity_rho: f64,

#[cfg(feature = "metrics")]
Expand Down Expand Up @@ -136,12 +214,24 @@ impl CoordinateOptions {
}

/// Used to record events that occur when updating coordinates.
#[viewit::viewit(getters(style = "ref"), setters(prefix = "with"))]
#[viewit::viewit(setters(prefix = "with"))]
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct CoordinateClientStats {
/// Incremented any time we reset our local coordinate because
/// our calculations have resulted in an invalid state.
#[viewit(
getter(
const,
style = "move",
attrs(
doc = "Returns the number of times we reset our local coordinate because our calculations have resulted in an invalid state."
)
),
setter(attrs(
doc = "Sets the number of times we reset our local coordinate because our calculations have resulted in an invalid state."
))
)]
resets: usize,
}

Expand Down Expand Up @@ -419,19 +509,41 @@ pub struct Coordinate {
/// The Euclidean portion of the coordinate. This is used along
/// with the other fields to provide an overall distance estimate. The
/// units here are seconds.
#[viewit(getter(const, style = "ref"))]
#[viewit(
getter(
const,
style = "ref",
attrs(doc = "Returns the Euclidean portion of the coordinate.")
),
setter(attrs(doc = "Sets the Euclidean portion of the coordinate."))
)]
portion: SmallVec<[f64; DEFAULT_DIMENSIONALITY]>,
/// Reflects the confidence in the given coordinate and is updated
/// dynamically by the Vivaldi Client. This is dimensionless.
#[viewit(
getter(const, attrs(doc = "Returns the confidence in the given coordinate.")),
setter(attrs(doc = "Sets the confidence in the given coordinate."))
)]
error: f64,
/// A distance offset computed based on a calculation over
/// observations from all other nodes over a fixed window and is updated
/// dynamically by the Vivaldi Client. The units here are seconds.
#[viewit(
getter(const, attrs(doc = "Returns the distance offset.")),
setter(attrs(doc = "Sets the distance offset."))
)]
adjustment: f64,
/// A distance offset that accounts for non-Euclidean effects
/// which model the access links from nodes to the core Internet. The access
/// links are usually set by bandwidth and congestion, and the core links
/// usually follow distance based on geography.
#[viewit(
getter(
const,
attrs(doc = "Returns the distance offset that accounts for non-Euclidean effects.")
),
setter(attrs(doc = "Sets the distance offset that accounts for non-Euclidean effects."))
)]
height: f64,
}

Expand Down Expand Up @@ -464,6 +576,7 @@ impl Coordinate {
}
}

/// Returns true if the coordinate is valid.
#[inline]
pub fn is_valid(&self) -> bool {
self.portion.iter().all(|&f| f.is_finite())
Expand All @@ -472,6 +585,7 @@ impl Coordinate {
&& self.height.is_finite()
}

/// Returns true if the dimensions of the coordinates are compatible.
#[inline]
pub fn is_compatible_with(&self, other: &Self) -> bool {
self.portion.len() == other.portion.len()
Expand Down Expand Up @@ -535,10 +649,13 @@ impl Coordinate {
}
}

/// The error when encoding or decoding a coordinate.
#[derive(Debug, thiserror::Error)]
pub enum CoordinateTransformError {
/// Returned when the buffer is too small to encode the coordinate.
#[error("encode buffer too small")]
BufferTooSmall,
/// Returned when there are not enough bytes to decode the coordinate.
#[error("not enough bytes to decode")]
NotEnoughBytes,
}
Expand Down
5 changes: 5 additions & 0 deletions core/src/delegate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,16 @@ pub use transform::*;
mod composite;
pub use composite::*;

/// [`Delegate`] is the trait that clients must implement if they want to hook
/// into the gossip layer of [`Serf`](crate::Serf). All the methods must be thread-safe,
/// as they can and generally will be called concurrently.
pub trait Delegate:
MergeDelegate<Id = <Self as Delegate>::Id, Address = <Self as Delegate>::Address>
+ TransformDelegate<Id = <Self as Delegate>::Id, Address = <Self as Delegate>::Address>
+ ReconnectDelegate<Id = <Self as Delegate>::Id, Address = <Self as Delegate>::Address>
{
/// The id type of the delegate
type Id: Id;
/// The address type of the delegate
type Address: CheapClone + Send + Sync + 'static;
}
7 changes: 7 additions & 0 deletions core/src/delegate/composite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ use super::{
ReconnectDelegate, TransformDelegate,
};

/// `CompositeDelegate` is a helpful struct to split the [`Delegate`] into multiple small delegates,
/// so that users do not need to implement full [`Delegate`] when they only want to custom some methods
/// in the [`Delegate`].
pub struct CompositeDelegate<
I,
A,
Expand All @@ -35,6 +38,7 @@ impl<I, A> Default for CompositeDelegate<I, A> {
}

impl<I, A> CompositeDelegate<I, A> {
/// Returns a new `CompositeDelegate`.
pub fn new() -> Self {
Self {
merge: Default::default(),
Expand All @@ -49,6 +53,7 @@ impl<I, A, M, R, T> CompositeDelegate<I, A, M, R, T>
where
M: MergeDelegate<Id = I, Address = A>,
{
/// Set the [`MergeDelegate`] for the `CompositeDelegate`.
pub fn with_merge_delegate<NM>(self, merge: NM) -> CompositeDelegate<I, A, NM, R, T> {
CompositeDelegate {
merge,
Expand All @@ -60,6 +65,7 @@ where
}

impl<I, A, M, R, T> CompositeDelegate<I, A, M, R, T> {
/// Set the [`ReconnectDelegate`] for the `CompositeDelegate`.
pub fn with_reconnect_delegate<NR>(self, reconnect: NR) -> CompositeDelegate<I, A, M, NR, T> {
CompositeDelegate {
reconnect,
Expand All @@ -71,6 +77,7 @@ impl<I, A, M, R, T> CompositeDelegate<I, A, M, R, T> {
}

impl<I, A, M, R, T> CompositeDelegate<I, A, M, R, T> {
/// Set the [`TransformDelegate`] for the `CompositeDelegate`.
pub fn with_transform_delegate<NT>(self, transform: NT) -> CompositeDelegate<I, A, M, R, NT> {
CompositeDelegate {
transform,
Expand Down
14 changes: 14 additions & 0 deletions core/src/delegate/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,31 @@ use std::future::Future;

use crate::types::Member;

/// Used to involve a client in
/// a potential cluster merge operation. Namely, when
/// a node does a promised push/pull (as part of a join),
/// the delegate is involved and allowed to cancel the join
/// based on custom logic. The merge delegate is NOT invoked
/// as part of the push-pull anti-entropy.
#[auto_impl::auto_impl(Box, Arc)]
pub trait MergeDelegate: Send + Sync + 'static {
/// The error type of the delegate
type Error: std::error::Error + Send + Sync + 'static;
/// The id type of the delegate
type Id: Id;
/// The address type of the delegate
type Address: CheapClone + Send + Sync + 'static;

/// Invoked when a merge could take place.
/// Provides a list of the nodes known by the peer. If
/// the return value is `Err`, the merge is canceled.
fn notify_merge(
&self,
members: TinyVec<Member<Self::Id, Self::Address>>,
) -> impl Future<Output = Result<(), Self::Error>> + Send;
}

/// A default implementation of the `MergeDelegate` trait.
#[derive(Debug, Clone, Copy)]
pub struct DefaultMergeDelegate<I, A>(std::marker::PhantomData<(I, A)>);

Expand Down
4 changes: 4 additions & 0 deletions core/src/delegate/reconnect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@ use memberlist_core::{transport::Id, CheapClone};
use crate::types::Member;

/// Implemented to allow overriding the reconnect timeout for individual members.
#[auto_impl::auto_impl(Box, Arc)]
pub trait ReconnectDelegate: Send + Sync + 'static {
/// The id type of the delegate
type Id: Id;
/// The address type of the delegate
type Address: CheapClone + Send + Sync + 'static;

/// Returns the reconnect timeout for the given member.
fn reconnect_timeout(
&self,
member: &Member<Self::Id, Self::Address>,
Expand Down
Loading

0 comments on commit ba42afd

Please sign in to comment.