Skip to content

Commit

Permalink
Separate versions from mere header lines.
Browse files Browse the repository at this point in the history
Every multistream-select version maps to a specific header line,
but there may be different variants of the same multistream-select
version using the same header line, i.e. the same wire protocol.
  • Loading branch information
Roman S. Borschel committed Nov 24, 2020
1 parent 3fabe47 commit 70f041b
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 80 deletions.
19 changes: 11 additions & 8 deletions misc/multistream-select/src/dialer_select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

//! Protocol negotiation strategies for the peer acting as the dialer.

use crate::{Negotiated, NegotiationError};
use crate::protocol::{Protocol, ProtocolError, MessageIO, Message, Version};
use crate::{Negotiated, NegotiationError, Version};
use crate::protocol::{Protocol, ProtocolError, MessageIO, Message, HeaderLine};

use futures::{future::Either, prelude::*};
use std::{convert::TryFrom as _, iter, mem, pin::Pin, task::{Context, Poll}};
Expand All @@ -41,7 +41,7 @@ use std::{convert::TryFrom as _, iter, mem, pin::Pin, task::{Context, Poll}};
/// thus an inaccurate size estimate may result in a suboptimal choice.
///
/// Within the scope of this library, a dialer always commits to a specific
/// multistream-select protocol [`Version`], whereas a listener always supports
/// multistream-select [`Version`], whereas a listener always supports
/// all versions supported by this library. Frictionless multistream-select
/// protocol upgrades may thus proceed by deployments with updated listeners,
/// eventually followed by deployments of dialers choosing the newer protocol.
Expand Down Expand Up @@ -181,7 +181,8 @@ where
},
}

if let Err(err) = Pin::new(&mut io).start_send(Message::Header(*this.version)) {
let h = HeaderLine::from(*this.version);
if let Err(err) = Pin::new(&mut io).start_send(Message::Header(h)) {
return Poll::Ready(Err(From::from(err)));
}

Expand Down Expand Up @@ -218,7 +219,8 @@ where
// the dialer expects a regular `V1` response.
Version::V1Lazy => {
log::debug!("Dialer: Expecting proposed protocol: {}", p);
let io = Negotiated::expecting(io.into_reader(), p, Some(Version::V1));
let hl = HeaderLine::V1;
let io = Negotiated::expecting(io.into_reader(), p, Some(hl));
return Poll::Ready(Ok((protocol, io)))
}
}
Expand Down Expand Up @@ -249,7 +251,7 @@ where
};

match msg {
Message::Header(Version::V1) => {
Message::Header(v) if v == HeaderLine::from(*this.version) => {
*this.state = SeqState::AwaitProtocol { io, protocol };
}
Message::Protocol(ref p) if p.as_ref() == protocol.as_ref() => {
Expand Down Expand Up @@ -325,7 +327,8 @@ where
},
}

if let Err(err) = Pin::new(&mut io).start_send(Message::Header(*this.version)) {
let msg = Message::Header(HeaderLine::from(*this.version));
if let Err(err) = Pin::new(&mut io).start_send(msg) {
return Poll::Ready(Err(From::from(err)));
}

Expand Down Expand Up @@ -373,7 +376,7 @@ where
};

match &msg {
Message::Header(Version::V1) => {
Message::Header(h) if h == &HeaderLine::from(*this.version) => {
*this.state = ParState::RecvProtocols { io }
}
Message::Protocols(supported) => {
Expand Down
49 changes: 48 additions & 1 deletion misc/multistream-select/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,54 @@ mod protocol;
mod tests;

pub use self::negotiated::{Negotiated, NegotiatedComplete, NegotiationError};
pub use self::protocol::{ProtocolError, Version};
pub use self::protocol::ProtocolError;
pub use self::dialer_select::{dialer_select_proto, DialerSelectFuture};
pub use self::listener_select::{listener_select_proto, ListenerSelectFuture};

/// Supported multistream-select versions.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Version {
/// Version 1 of the multistream-select protocol. See [1] and [2].
///
/// [1]: https://github.com/libp2p/specs/blob/master/connections/README.md#protocol-negotiation
/// [2]: https://github.com/multiformats/multistream-select
V1,
/// A "lazy" variant of version 1 that is identical on the wire but whereby
/// the dialer delays flushing protocol negotiation data in order to combine
/// it with initial application data, thus performing 0-RTT negotiation.
///
/// This strategy is only applicable for the node with the role of "dialer"
/// in the negotiation and only if the dialer supports just a single
/// application protocol. In that case the dialer immedidately "settles"
/// on that protocol, buffering the negotiation messages to be sent
/// with the first round of application protocol data (or an attempt
/// is made to read from the `Negotiated` I/O stream).
///
/// A listener will behave identically to `V1`. This ensures interoperability with `V1`.
/// Notably, it will immediately send the multistream header as well as the protocol
/// confirmation, resulting in multiple frames being sent on the underlying transport.
/// Nevertheless, if the listener supports the protocol that the dialer optimistically
/// settled on, it can be a 0-RTT negotiation.
///
/// > **Note**: `V1Lazy` is specific to `rust-libp2p`. The wire protocol is identical to `V1`
/// > and generally interoperable with peers only supporting `V1`. Nevertheless, there is a
/// > pitfall that is rarely encountered: When nesting multiple protocol negotiations, the
/// > listener should either be known to support all of the dialer's optimistically chosen
/// > protocols or there is must be no intermediate protocol without a payload and none of
/// > the protocol payloads must have the potential for being mistaken for a multistream-select
/// > protocol message. This avoids rare edge-cases whereby the listener may not recognize
/// > upgrade boundaries and erroneously process a request despite not supporting one of
/// > the intermediate protocols that the dialer committed to. See [1] and [2].
///
/// [1]: https://github.com/multiformats/go-multistream/issues/20
/// [2]: https://github.com/libp2p/rust-libp2p/pull/1212
V1Lazy,
// Draft: https://github.com/libp2p/specs/pull/95
// V2,
}

impl Default for Version {
fn default() -> Self {
Version::V1
}
}
11 changes: 7 additions & 4 deletions misc/multistream-select/src/listener_select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
//! in a multistream-select protocol negotiation.

use crate::{Negotiated, NegotiationError};
use crate::protocol::{Protocol, ProtocolError, MessageIO, Message, Version};
use crate::protocol::{Protocol, ProtocolError, MessageIO, Message, HeaderLine};

use futures::prelude::*;
use smallvec::SmallVec;
Expand Down Expand Up @@ -111,8 +111,10 @@ where
match mem::replace(this.state, State::Done) {
State::RecvHeader { mut io } => {
match io.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(Message::Header(Version::V1)))) => {
*this.state = State::SendHeader { io }
Poll::Ready(Some(Ok(Message::Header(h)))) => {
match h {
HeaderLine::V1 => *this.state = State::SendHeader { io }
}
}
Poll::Ready(Some(Ok(_))) => {
return Poll::Ready(Err(ProtocolError::InvalidMessage.into()))
Expand All @@ -139,7 +141,8 @@ where
Poll::Ready(Err(err)) => return Poll::Ready(Err(From::from(err))),
}

if let Err(err) = Pin::new(&mut io).start_send(Message::Header(Version::V1)) {
let msg = Message::Header(HeaderLine::V1);
if let Err(err) = Pin::new(&mut io).start_send(msg) {
return Poll::Ready(Err(From::from(err)));
}

Expand Down
25 changes: 12 additions & 13 deletions misc/multistream-select/src/negotiated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::protocol::{Protocol, MessageReader, Message, Version, ProtocolError};
use crate::protocol::{Protocol, MessageReader, Message, ProtocolError, HeaderLine};

use futures::{prelude::*, io::{IoSlice, IoSliceMut}, ready};
use pin_project::pin_project;
Expand Down Expand Up @@ -83,9 +83,9 @@ impl<TInner> Negotiated<TInner> {
pub(crate) fn expecting(
io: MessageReader<TInner>,
protocol: Protocol,
version: Option<Version>
header: Option<HeaderLine>
) -> Self {
Negotiated { state: State::Expecting { io, protocol, version } }
Negotiated { state: State::Expecting { io, protocol, header } }
}

/// Polls the `Negotiated` for completion.
Expand Down Expand Up @@ -116,11 +116,11 @@ impl<TInner> Negotiated<TInner> {
// Read outstanding protocol negotiation messages.
loop {
match mem::replace(&mut *this.state, State::Invalid) {
State::Expecting { mut io, protocol, version } => {
State::Expecting { mut io, header, protocol } => {
let msg = match Pin::new(&mut io).poll_next(cx)? {
Poll::Ready(Some(msg)) => msg,
Poll::Pending => {
*this.state = State::Expecting { io, protocol, version };
*this.state = State::Expecting { io, header, protocol };
return Poll::Pending
},
Poll::Ready(None) => {
Expand All @@ -129,12 +129,11 @@ impl<TInner> Negotiated<TInner> {
}
};

match (&msg, version) {
(Message::Header(Version::V1), Some(Version::V1)) => {
*this.state = State::Expecting { io, protocol, version: None };
if let Message::Header(h) = &msg {
if Some(h) == header.as_ref() {
*this.state = State::Expecting { io, protocol, header: None };
continue
}
_ => {}
}

if let Message::Protocol(p) = &msg {
Expand Down Expand Up @@ -171,11 +170,11 @@ enum State<R> {
/// The underlying I/O stream.
#[pin]
io: MessageReader<R>,
/// The expected protocol (i.e. name and version).
protocol: Protocol,
/// The expected protocol negotiation (i.e. multistream-select) header,
/// The expected negotiation header/preamble (i.e. multistream-select version),
/// if one is still expected to be received.
version: Option<Version>,
header: Option<HeaderLine>,
/// The expected application protocol (i.e. name and version).
protocol: Protocol,
},

/// In this state, a protocol has been agreed upon and I/O
Expand Down
70 changes: 16 additions & 54 deletions misc/multistream-select/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
//! `Stream` and `Sink` implementations of `MessageIO` and
//! `MessageReader`.

use crate::Version;
use crate::length_delimited::{LengthDelimited, LengthDelimitedReader};

use bytes::{Bytes, BytesMut, BufMut};
Expand All @@ -42,51 +43,20 @@ const MSG_PROTOCOL_NA: &[u8] = b"na\n";
/// The encoded form of a multistream-select 'ls' message.
const MSG_LS: &[u8] = b"ls\n";

/// Supported multistream-select protocol versions.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Version {
/// Version 1 of the multistream-select protocol. See [1] and [2].
///
/// [1]: https://github.com/libp2p/specs/blob/master/connections/README.md#protocol-negotiation
/// [2]: https://github.com/multiformats/multistream-select
/// The multistream-select header lines preceeding negotiation.
///
/// Every [`Version`] has a corresponding header line.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum HeaderLine {
/// The `/multistream/1.0.0` header line.
V1,
/// A "lazy" variant of version 1 that is identical on the wire but whereby
/// the dialer delays flushing protocol negotiation data in order to combine
/// it with initial application data, thus performing 0-RTT negotiation.
///
/// This strategy is only applicable for the node with the role of "dialer"
/// in the negotiation and only if the dialer supports just a single
/// application protocol. In that case the dialer immedidately "settles"
/// on that protocol, buffering the negotiation messages to be sent
/// with the first round of application protocol data (or an attempt
/// is made to read from the `Negotiated` I/O stream).
///
/// A listener will behave identically to `V1`. This ensures interoperability with `V1`.
/// Notably, it will immediately send the multistream header as well as the protocol
/// confirmation, resulting in multiple frames being sent on the underlying transport.
/// Nevertheless, if the listener supports the protocol that the dialer optimistically
/// settled on, it can be a 0-RTT negotiation.
///
/// > **Note**: `V1Lazy` is specific to `rust-libp2p`. The wire protocol is identical to `V1`
/// > and generally interoperable with peers only supporting `V1`. Nevertheless, there is a
/// > pitfall that is rarely encountered: When nesting multiple protocol negotiations, the
/// > listener should either be known to support all of the dialer's optimistically chosen
/// > protocols or there is must be no intermediate protocol without a payload and none of
/// > the protocol payloads must have the potential for being mistaken for a multistream-select
/// > protocol message. This avoids rare edge-cases whereby the listener may not recognize
/// > upgrade boundaries and erroneously process a request despite not supporting one of
/// > the intermediate protocols that the dialer committed to. See [1] and [2].
///
/// [1]: https://github.com/multiformats/go-multistream/issues/20
/// [2]: https://github.com/libp2p/rust-libp2p/pull/1212
V1Lazy,
// Draft: https://github.com/libp2p/specs/pull/95
// V2,
}

impl Default for Version {
fn default() -> Self {
Version::V1
impl From<Version> for HeaderLine {
fn from(v: Version) -> HeaderLine {
match v {
Version::V1 | Version::V1Lazy => HeaderLine::V1,
}
}
}

Expand Down Expand Up @@ -133,7 +103,7 @@ impl fmt::Display for Protocol {
pub enum Message {
/// A header message identifies the multistream-select protocol
/// that the sender wishes to speak.
Header(Version),
Header(HeaderLine),
/// A protocol message identifies a protocol request or acknowledgement.
Protocol(Protocol),
/// A message through which a peer requests the complete list of
Expand All @@ -149,15 +119,7 @@ impl Message {
/// Encodes a `Message` into its byte representation.
pub fn encode(&self, dest: &mut BytesMut) -> Result<(), ProtocolError> {
match self {
Message::Header(Version::V1) => {
dest.reserve(MSG_MULTISTREAM_1_0.len());
dest.put(MSG_MULTISTREAM_1_0);
Ok(())
}
Message::Header(Version::V1Lazy) => {
// Note: Encoded identically to `V1`. `V1Lazy` only
// has a local effect for the dialer, determining when
// it flushes negotiation data.
Message::Header(HeaderLine::V1) => {
dest.reserve(MSG_MULTISTREAM_1_0.len());
dest.put(MSG_MULTISTREAM_1_0);
Ok(())
Expand Down Expand Up @@ -198,7 +160,7 @@ impl Message {
/// Decodes a `Message` from its byte representation.
pub fn decode(mut msg: Bytes) -> Result<Message, ProtocolError> {
if msg == MSG_MULTISTREAM_1_0 {
return Ok(Message::Header(Version::V1))
return Ok(Message::Header(HeaderLine::V1))
}

if msg == MSG_PROTOCOL_NA {
Expand Down Expand Up @@ -490,7 +452,7 @@ mod tests {
impl Arbitrary for Message {
fn arbitrary<G: Gen>(g: &mut G) -> Message {
match g.gen_range(0, 5) {
0 => Message::Header(Version::V1),
0 => Message::Header(HeaderLine::V1),
1 => Message::NotAvailable,
2 => Message::ListProtocols,
3 => Message::Protocol(Protocol::arbitrary(g)),
Expand Down

0 comments on commit 70f041b

Please sign in to comment.