Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework libp2p-identify #116

Merged
merged 19 commits into from
Mar 7, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion example/examples/echo-dialer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ fn main() {
// We now use the controller to dial to the address.
let (finished_tx, finished_rx) = oneshot::channel();
swarm_controller
.dial_custom_handler(target_addr.parse().expect("invalid multiaddr"), proto, |echo| {
.dial_custom_handler(target_addr.parse().expect("invalid multiaddr"), proto, |echo, _| {
// `echo` is what the closure used when initializing `proto` returns.
// Consequently, please note that the `send` method is available only because the type
// `length_delimited::Framed` has a `send` method.
Expand Down
2 changes: 1 addition & 1 deletion example/examples/ping-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ fn main() {
let (tx, rx) = oneshot::channel();
swarm_controller
.dial_custom_handler(target_addr.parse().expect("invalid multiaddr"), ping::Ping,
|(mut pinger, future)| {
|(mut pinger, future), _| {
let ping = pinger.ping().map_err(|_| unreachable!()).inspect(|_| {
println!("Received pong from the remote");
let _ = tx.send(());
Expand Down
4 changes: 2 additions & 2 deletions libp2p-dns/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ where
type RawConn = T::RawConn;
type Listener = T::Listener;
type ListenerUpgrade = T::ListenerUpgrade;
type Dial = Box<Future<Item = Self::RawConn, Error = IoError>>;
type Dial = Box<Future<Item = (Self::RawConn, Multiaddr), Error = IoError>>;

#[inline]
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
Expand Down Expand Up @@ -238,7 +238,7 @@ mod tests {
type RawConn = <TcpConfig as Transport>::RawConn;
type Listener = <TcpConfig as Transport>::Listener;
type ListenerUpgrade = <TcpConfig as Transport>::ListenerUpgrade;
type Dial = Box<Future<Item = Self::RawConn, Error = IoError>>;
type Dial = Box<Future<Item = (Self::RawConn, Multiaddr), Error = IoError>>;

#[inline]
fn listen_on(
Expand Down
232 changes: 46 additions & 186 deletions libp2p-identify/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,200 +23,60 @@
//!
//! When two nodes connect to each other, the listening half sends a message to the dialing half,
//! indicating the information, and then the protocol stops.
//!
//! # Usage
//!
//! Both low-level and high-level usages are available.
//!
//! ## High-level usage through the `IdentifyTransport` struct
//!
//! This crate provides the `IdentifyTransport` struct, which wraps around a `Transport` and an
//! implementation of `Peerstore`. `IdentifyTransport` is itself a transport that accepts
//! multiaddresses of the form `/p2p/...` or `/ipfs/...`.
//!
//! > **Note**: All the documentation refers to `/p2p/...`, however `/ipfs/...` is also supported.
//!
//! If you dial a multiaddr of the form `/p2p/...`, then the `IdentifyTransport` will look into
//! the `Peerstore` for any known multiaddress for this peer and try to dial them using the
//! underlying transport. If you dial any other multiaddr, then it will dial this multiaddr using
//! the underlying transport, then negotiate the *identify* protocol with the remote in order to
//! obtain its ID, then add it to the peerstore, and finally dial the same multiaddr again and
//! return the connection.
//!
//! Listening doesn't support multiaddresses of the form `/p2p/...` (because that wouldn't make
//! sense). Any address passed to `listen_on` will be passed directly to the underlying transport.
//!
//! Whenever a remote connects to us, either through listening or through `next_incoming`, the
//! `IdentifyTransport` dials back the remote, upgrades the connection to the *identify* protocol
//! in order to obtain the ID of the remote, stores the information in the peerstore, and finally
//! only returns the connection. From the exterior, the multiaddress of the remote is of the form
//! `/p2p/...`. If the remote doesn't support the *identify* protocol, then the socket is closed.
//!
//! Because of the behaviour of `IdentifyProtocol`, it is recommended to build it on top of a
//! `ConnectionReuse`.
//!
//! ## Low-level usage through the `IdentifyProtocolConfig` struct
//!
//! The `IdentifyProtocolConfig` struct implements the `ConnectionUpgrade` trait. Using it will
//! negotiate the *identify* protocol.
//!
//! The output of the upgrade is a `IdentifyOutput`. If we are the dialer, then `IdentifyOutput`
//! will contain the information sent by the remote. If we are the listener, then it will contain
//! a `IdentifySender` struct that can be used to transmit back to the remote the information about
//! it.

extern crate bytes;
extern crate futures;
extern crate multiaddr;
extern crate libp2p_peerstore;
extern crate libp2p_swarm;
extern crate multiaddr;
extern crate protobuf;
extern crate tokio_io;
extern crate varint;

use bytes::{Bytes, BytesMut};
use futures::{Future, Stream, Sink};
use libp2p_swarm::{ConnectionUpgrade, Endpoint};
use multiaddr::Multiaddr;
use protobuf::Message as ProtobufMessage;
use protobuf::core::parse_from_bytes as protobuf_parse_from_bytes;
use protobuf::repeated::RepeatedField;
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::iter;
use tokio_io::{AsyncRead, AsyncWrite};
use varint::VarintCodec;
pub use self::protocol::{IdentifyInfo, IdentifyOutput, IdentifyProtocolConfig, IdentifySender};
pub use self::transport::IdentifyTransport;

mod protocol;
mod structs_proto;

/// Prototype for an upgrade to the identity protocol.
#[derive(Debug, Clone)]
pub struct IdentifyProtocol {
/// Our public key to report to the remote.
pub public_key: Vec<u8>,
/// Version of the "global" protocol, eg. `ipfs/1.0.0` or `polkadot/1.0.0`.
pub protocol_version: String,
/// Name and version of the client. Can be thought as similar to the `User-Agent` header
/// of HTTP.
pub agent_version: String,
/// Addresses that we are listening on.
pub listen_addrs: Vec<Multiaddr>,
/// Protocols supported by us.
pub protocols: Vec<String>,
}

/// Information sent from the listener to the dialer.
#[derive(Debug, Clone)]
pub struct IdentifyInfo {
/// Public key of the node.
pub public_key: Vec<u8>,
/// Version of the "global" protocol, eg. `ipfs/1.0.0` or `polkadot/1.0.0`.
pub protocol_version: String,
/// Name and version of the client. Can be thought as similar to the `User-Agent` header
/// of HTTP.
pub agent_version: String,
/// Addresses that the remote is listening on.
pub listen_addrs: Vec<Multiaddr>,
/// Our own address as reported by the remote.
pub observed_addr: Multiaddr,
/// Protocols supported by the remote.
pub protocols: Vec<String>,
}

impl<C> ConnectionUpgrade<C> for IdentifyProtocol
where C: AsyncRead + AsyncWrite + 'static
{
type NamesIter = iter::Once<(Bytes, Self::UpgradeIdentifier)>;
type UpgradeIdentifier = ();
type Output = Option<IdentifyInfo>;
type Future = Box<Future<Item = Self::Output, Error = IoError>>;

#[inline]
fn protocol_names(&self) -> Self::NamesIter {
iter::once((Bytes::from("/ipfs/id/1.0.0"), ()))
}

fn upgrade(self, socket: C, _: (), ty: Endpoint, remote_addr: &Multiaddr) -> Self::Future {
let socket = socket.framed(VarintCodec::default());

match ty {
Endpoint::Dialer => {
let future = socket.into_future()
.map(|(msg, _)| msg)
.map_err(|(err, _)| err)
.and_then(|msg| if let Some(msg) = msg {
Ok(Some(parse_proto_msg(msg)?))
} else {
Ok(None)
});

Box::new(future) as Box<_>
}

Endpoint::Listener => {
let listen_addrs = self.listen_addrs
.into_iter()
.map(|addr| addr.to_string().into_bytes())
.collect();

let mut message = structs_proto::Identify::new();
message.set_agentVersion(self.agent_version);
message.set_protocolVersion(self.protocol_version);
message.set_publicKey(self.public_key);
message.set_listenAddrs(listen_addrs);
message.set_observedAddr(remote_addr.to_string().into_bytes());
message.set_protocols(RepeatedField::from_vec(self.protocols));

let bytes = message.write_to_bytes()
.expect("writing protobuf failed ; should never happen");

// On the server side, after sending the information to the client we make the
// future produce a `None`. If we were on the client side, this would contain the
// information received by the server.
let future = socket.send(bytes).map(|_| None);
Box::new(future) as Box<_>
}
}
}
}

// Turns a protobuf message into an `IdentifyInfo`. If something bad happens, turn it into
// an `IoError`.
fn parse_proto_msg(msg: BytesMut) -> Result<IdentifyInfo, IoError> {
match protobuf_parse_from_bytes::<structs_proto::Identify>(&msg) {
Ok(mut msg) => {
let listen_addrs = {
let mut addrs = Vec::new();
for addr in msg.take_listenAddrs().into_iter() {
addrs.push(bytes_to_multiaddr(addr)?);
}
addrs
};

let observed_addr = bytes_to_multiaddr(msg.take_observedAddr())?;

Ok(IdentifyInfo {
public_key: msg.take_publicKey(),
protocol_version: msg.take_protocolVersion(),
agent_version: msg.take_agentVersion(),
listen_addrs: listen_addrs,
observed_addr: observed_addr,
protocols: msg.take_protocols().into_vec(),
})
}

Err(err) => {
Err(IoError::new(IoErrorKind::InvalidData, err))
}
}
}

// Turn a `Vec<u8>` into a `Multiaddr`. If something bad happens, turn it into an `IoError`.
fn bytes_to_multiaddr(bytes: Vec<u8>) -> Result<Multiaddr, IoError> {
String::from_utf8(bytes)
.map_err(|err| {
IoError::new(IoErrorKind::InvalidData, err)
})
.and_then(|s| {
s.parse()
.map_err(|err| IoError::new(IoErrorKind::InvalidData, err))
})
}

#[cfg(test)]
mod tests {
extern crate libp2p_tcp_transport;
extern crate tokio_core;

use self::libp2p_tcp_transport::TcpConfig;
use self::tokio_core::reactor::Core;
use IdentifyProtocol;
use futures::{IntoFuture, Future, Stream};
use libp2p_swarm::Transport;

#[test]
fn basic() {
let mut core = Core::new().unwrap();
let tcp = TcpConfig::new(core.handle());
let with_proto = tcp.with_upgrade(IdentifyProtocol {
public_key: vec![1, 2, 3, 4],
protocol_version: "ipfs/1.0.0".to_owned(),
agent_version: "agent/version".to_owned(),
listen_addrs: vec!["/ip4/5.6.7.8/tcp/12345".parse().unwrap()],
protocols: vec!["ping".to_owned(), "kad".to_owned()],
});

let (server, addr) = with_proto.clone()
.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap())
.unwrap();
let server = server.into_future()
.map_err(|(err, _)| err)
.and_then(|(n, _)| n.unwrap().0);
let dialer = with_proto.dial(addr)
.unwrap()
.into_future();

let (recv, should_be_empty) = core.run(dialer.join(server)).unwrap();
assert!(should_be_empty.is_none());
let recv = recv.unwrap();
assert_eq!(recv.public_key, &[1, 2, 3, 4]);
}
}
mod transport;
Loading