diff --git a/.gx/lastpubver b/.gx/lastpubver index 8290b17..d68bc3e 100644 --- a/.gx/lastpubver +++ b/.gx/lastpubver @@ -1 +1 @@ -2.2.14: QmPUHzTLPZFYqv8WqcBTuMFYTgeom4uHHEaxzk7bd5GYZB +3.0.0: QmYnjSGtvn7LhrxCvwrU9uDWxKyg28uBYeXvgzTDDDzVy4 diff --git a/README.md b/README.md index d1cc136..4b2e2e2 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,12 @@ A common interface for network transports. This is the 'base' layer for any transport that wants to be used by libp2p and ipfs. If you want to make 'ipfs work over X', the first thing you'll want to do is to implement the `Transport` interface for 'X'. +Transports are: + +* Encrypted: Connections must be end-to-end encrypted. +* Authenticated: The endpoints, RemotePeer and LocalPeer, must be authenticated. +* Multiplexed: It must be possible to multiplex multiple reliable streams over a single transport connection. + ## Install ```sh @@ -23,23 +29,9 @@ This is the 'base' layer for any transport that wants to be used by libp2p and i ## Usage -```go -var t Transport - -t = NewTCPTransport() +To actually *use* a transport, you'll likely want to register it with a `transport.Network` (e.g., [go-libp2p-swarm](https://github.com/libp2p/go-libp2p-swarm)). However, you're probably more interested in *implementing* transports. -list, err := t.Listen(listener_maddr) -if err != nil { - log.Fatal(err) -} - -con, err := list.Accept() -if err != nil { - log.Fatal(err) -} - -fmt.Fprintln(con, "Hello World!") -``` +Transports construct fully featured, encrypted, multiplexed connections. However, there's a fairly good chance your transport won't meet all of those requirements. To make life easier, we've created a helper library called [go-libp2p-transport-upgrader](https://github.com/libp2p/go-libp2p-transport-upgrader) for upgrading simple stream transports to fully-featured (encrypted, authenticated, multiplexed) transports. Check out that packages [README]([go-libp2p-transport-upgrader](https://github.com/libp2p/go-libp2p-transport-upgrader/blob/master/README.md) for an example. ## Contribute diff --git a/fallback.go b/fallback.go deleted file mode 100644 index 8fd9636..0000000 --- a/fallback.go +++ /dev/null @@ -1,53 +0,0 @@ -package transport - -import ( - "context" - "fmt" - - ma "github.com/multiformats/go-multiaddr" - manet "github.com/multiformats/go-multiaddr-net" - mafmt "github.com/whyrusleeping/mafmt" -) - -type FallbackDialer struct { - madialer manet.Dialer -} - -var _ Dialer = &FallbackDialer{} - -func (fbd *FallbackDialer) Matches(a ma.Multiaddr) bool { - return mafmt.TCP.Matches(a) -} - -func (fbd *FallbackDialer) Dial(a ma.Multiaddr) (Conn, error) { - return fbd.DialContext(context.Background(), a) -} - -func (fbd *FallbackDialer) DialContext(ctx context.Context, a ma.Multiaddr) (Conn, error) { - if mafmt.TCP.Matches(a) { - return fbd.tcpDial(ctx, a) - } - return nil, fmt.Errorf("cannot dial %s with fallback dialer", a) -} - -func (fbd *FallbackDialer) tcpDial(ctx context.Context, raddr ma.Multiaddr) (Conn, error) { - var c manet.Conn - var err error - c, err = fbd.madialer.DialContext(ctx, raddr) - - if err != nil { - return nil, err - } - - return &fallbackConn{ - Conn: c, - }, nil -} - -type fallbackConn struct { - manet.Conn -} - -func (c *fallbackConn) Transport() Transport { - return nil -} diff --git a/fallback_test.go b/fallback_test.go deleted file mode 100644 index b69e733..0000000 --- a/fallback_test.go +++ /dev/null @@ -1,96 +0,0 @@ -package transport - -import ( - "bytes" - "fmt" - "io" - "testing" - - ma "github.com/multiformats/go-multiaddr" - manet "github.com/multiformats/go-multiaddr-net" -) - -func assertWrite(w io.Writer, data []byte) error { - n, err := w.Write(data) - if err != nil { - return err - } - - if n != len(data) { - return fmt.Errorf("didnt write the correct amount of data (exp: %d, got: %d)", len(data), n) - } - - return nil -} - -func assertRead(r io.Reader, exp []byte) error { - buf := make([]byte, len(exp)) - _, err := io.ReadFull(r, buf) - if err != nil { - return err - } - - if !bytes.Equal(buf, exp) { - return fmt.Errorf("read wrong data %s vs %s", buf, exp) - } - return nil -} - -func TestFallbackDialTcp(t *testing.T) { - laddr, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/0") - if err != nil { - t.Fatal(err) - } - - list, err := manet.Listen(laddr) - if err != nil { - t.Fatal(err) - } - - done := make(chan bool) - message := []byte("this is only a test") - go func() { - defer close(done) - scon, err := list.Accept() - if err != nil { - t.Error(err) - } - - err = assertWrite(scon, message) - if err != nil { - t.Error(err) - } - }() - - fbd := new(FallbackDialer) - - if !fbd.Matches(list.Multiaddr()) { - t.Fatal("fallback dialer should match tcp multiaddr") - } - - con, err := fbd.Dial(list.Multiaddr()) - if err != nil { - t.Fatal(err) - } - - err = assertRead(con, message) - if err != nil { - t.Fatal(err) - } - - <-done -} - -func TestCantDialUDP(t *testing.T) { - fbd := new(FallbackDialer) - - udpa, err := ma.NewMultiaddr("/ip4/1.2.3.4/udp/9876") - if err != nil { - t.Fatal(err) - } - - _, err = fbd.Dial(udpa) - if err == nil { - t.Fatal("fallback dialer shouldnt be able to dial udp connections") - } -} diff --git a/package.json b/package.json index 5feb34b..3909107 100644 --- a/package.json +++ b/package.json @@ -7,18 +7,6 @@ "dvcsimport": "github.com/libp2p/go-libp2p-transport" }, "gxDependencies": [ - { - "author": "jbenet", - "hash": "QmRK2LxanhK2gZq6k6R7vk5ZoYZk8ULSSTB7FzDsMUX6CB", - "name": "go-multiaddr-net", - "version": "1.5.7" - }, - { - "author": "whyrusleeping", - "hash": "QmTy17Jm1foTnvUS9JXRhLbRQ3XuC64jPTjUfpB4mHz2QM", - "name": "mafmt", - "version": "1.2.5" - }, { "hash": "QmTG23dvpBCBjqQwyDxV8CQT6jmS4PSftNr1VqHhE3MLy7", "name": "go-log", @@ -29,6 +17,24 @@ "hash": "QmWWQ2Txc2c6tqjsBpzg5Ar652cHPGNsQQp2SejkNmkUMb", "name": "go-multiaddr", "version": "1.2.6" + }, + { + "author": "whyrusleeping", + "hash": "QmYj8wdn5sZEHX2XMDWGBvcXJNdzVbaVpHmXvhHBVZepen", + "name": "go-libp2p-net", + "version": "3.0.0" + }, + { + "author": "whyrusleeping", + "hash": "QmY9JXR3FupnYAYJWK9aMr9bCpqWKcToQ1tz8DVGTrHpHw", + "name": "go-stream-muxer", + "version": "3.0.0" + }, + { + "author": "whyrusleeping", + "hash": "QmcJukH2sAFjY3HdBKq35WDzWoL3UUu2gt9wdfqZTUyM74", + "name": "go-libp2p-peer", + "version": "2.3.2" } ], "gxVersion": "0.4.0", @@ -36,6 +42,6 @@ "license": "MIT", "name": "go-libp2p-transport", "releaseCmd": "git commit -a -m \"gx publish $VERSION\"", - "version": "2.2.14" + "version": "3.0.0" } diff --git a/test/utils.go b/test/utils.go index 111640f..70e05fb 100644 --- a/test/utils.go +++ b/test/utils.go @@ -1,120 +1,185 @@ package utils import ( - "fmt" + "context" "io" "testing" + "time" + peer "github.com/libp2p/go-libp2p-peer" tpt "github.com/libp2p/go-libp2p-transport" + smux "github.com/libp2p/go-stream-muxer" ma "github.com/multiformats/go-multiaddr" ) -func SubtestTransport(t *testing.T, ta, tb tpt.Transport, addr string) { +type streamAndConn struct { + stream smux.Stream + conn tpt.Conn +} + +var testData = []byte("this is some test data") + +var Subtests = map[string]func(t *testing.T, ta, tb tpt.Transport, maddr ma.Multiaddr, peerA peer.ID){ + "Protocols": SubtestProtocols, + "Basic": SubtestBasic, + "Cancel": SubtestCancel, +} + +func SubtestTransport(t *testing.T, ta, tb tpt.Transport, addr string, peerA peer.ID) { maddr, err := ma.NewMultiaddr(addr) if err != nil { t.Fatal(err) } - - list, err := ta.Listen(maddr) - if err != nil { - t.Fatal(err) + for n, f := range Subtests { + t.Run(n, func(t *testing.T) { + f(t, ta, tb, maddr, peerA) + }) } +} - dialer, err := tb.Dialer(list.Multiaddr()) - if err != nil { - t.Fatal(err) +func SubtestProtocols(t *testing.T, ta, tb tpt.Transport, maddr ma.Multiaddr, peerA peer.ID) { + rawIPAddr, _ := ma.NewMultiaddr("/ip4/1.2.3.4") + if ta.CanDial(rawIPAddr) || tb.CanDial(rawIPAddr) { + t.Error("nothing should be able to dial raw IP") } - accepted := make(chan tpt.Conn, 1) - errs := make(chan error, 1) - go func() { - b, err := list.Accept() - if err != nil { - errs <- err - return - } - - accepted <- b - }() - - a, err := dialer.Dial(list.Multiaddr()) - if err != nil { - t.Fatal(err) + tprotos := make(map[int]bool) + for _, p := range ta.Protocols() { + tprotos[p] = true } - var b tpt.Conn - select { - case b = <-accepted: - case err := <-errs: - t.Fatal(err) + if !ta.Proxy() { + protos := maddr.Protocols() + proto := protos[len(protos)-1] + if !tprotos[proto.Code] { + t.Errorf("transport should have reported that it supports protocol '%s' (%d)", proto.Name, proto.Code) + } + } else { + found := false + for _, proto := range maddr.Protocols() { + if tprotos[proto.Code] { + found = true + break + } + } + if !found { + t.Errorf("didn't find any matching proxy protocols in maddr: %s", maddr) + } } +} - defer a.Close() - defer b.Close() +func SubtestBasic(t *testing.T, ta, tb tpt.Transport, maddr ma.Multiaddr, peerA peer.ID) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() - err = checkDataTransfer(a, b) + list, err := ta.Listen(maddr) if err != nil { t.Fatal(err) } + defer list.Close() -} - -func checkDataTransfer(a, b io.ReadWriter) error { - errs := make(chan error, 2) - data := []byte("this is some test data") - + done := make(chan struct{}) go func() { - n, err := a.Write(data) + defer close(done) + c, err := list.Accept() if err != nil { - errs <- err + t.Fatal(err) return } - - if n != len(data) { - errs <- fmt.Errorf("failed to write enough data (a->b)") - return - } - - buf := make([]byte, len(data)) - _, err = io.ReadFull(a, buf) + s, err := c.AcceptStream() if err != nil { - errs <- err + c.Close() + t.Fatal(err) return } - errs <- nil - }() - - go func() { - buf := make([]byte, len(data)) - _, err := io.ReadFull(b, buf) + buf := make([]byte, len(testData)) + _, err = io.ReadFull(s, buf) if err != nil { - errs <- err + t.Fatal(err) return } - n, err := b.Write(data) + n, err := s.Write(testData) if err != nil { - errs <- err + t.Fatal(err) return } + s.Close() - if n != len(data) { - errs <- fmt.Errorf("failed to write enough data (b->a)") + if n != len(testData) { + t.Fatal(err) return } - - errs <- nil }() - err := <-errs + if !tb.CanDial(list.Multiaddr()) { + t.Error("CanDial should have returned true") + } + + c, err := tb.Dial(ctx, list.Multiaddr(), peerA) if err != nil { - return err + t.Fatal(err) } - err = <-errs + defer c.Close() + + s, err := c.OpenStream() if err != nil { - return err + t.Fatal(err) } - return nil + n, err := s.Write(testData) + if err != nil { + t.Fatal(err) + return + } + + if n != len(testData) { + t.Fatalf("failed to write enough data (a->b)") + return + } + + buf := make([]byte, len(testData)) + _, err = io.ReadFull(s, buf) + if err != nil { + t.Fatal(err) + return + } +} + +func SubtestCancel(t *testing.T, ta, tb tpt.Transport, maddr ma.Multiaddr, peerA peer.ID) { + list, err := ta.Listen(maddr) + if err != nil { + t.Fatal(err) + } + defer list.Close() + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + done := make(chan struct{}) + go func() { + defer close(done) + c, err := tb.Dial(ctx, list.Multiaddr(), peerA) + if err == nil { + c.Close() + t.Fatal("dial should have failed") + } + }() + + time.Sleep(time.Millisecond) + cancel() + <-done + + done = make(chan struct{}) + go func() { + defer close(done) + c, err := list.Accept() + if err == nil { + c.Close() + t.Fatal("accept should have failed") + } + }() + time.Sleep(time.Millisecond) + list.Close() + <-done } diff --git a/transport.go b/transport.go index 759d697..e81428e 100644 --- a/transport.go +++ b/transport.go @@ -3,19 +3,35 @@ package transport import ( "context" "net" + "time" logging "github.com/ipfs/go-log" + inet "github.com/libp2p/go-libp2p-net" + peer "github.com/libp2p/go-libp2p-peer" + smux "github.com/libp2p/go-stream-muxer" ma "github.com/multiformats/go-multiaddr" - manet "github.com/multiformats/go-multiaddr-net" ) +// DialTimeout is the maximum duration a Dial is allowed to take. +// This includes the time between dialing the raw network connection, +// protocol selection as well the handshake, if applicable. +var DialTimeout = 60 * time.Second + +// AcceptTimeout is the maximum duration an Accept is allowed to take. +// This includes the time between accepting the raw network connection, +// protocol selection as well as the handshake, if applicable. +var AcceptTimeout = 60 * time.Second + var log = logging.Logger("transport") // Conn is an extension of the net.Conn interface that provides multiaddr // information, and an accessor for the transport used to create the conn type Conn interface { - manet.Conn + smux.Conn + inet.ConnSecurity + inet.ConnMultiaddrs + // Transport returns the transport to which this connection belongs. Transport() Transport } @@ -24,19 +40,31 @@ type Conn interface { // but many more can be implemented, sctp, audio signals, sneakernet, UDT, a // network of drones carrying usb flash drives, and so on. type Transport interface { - Dialer(laddr ma.Multiaddr, opts ...DialOpt) (Dialer, error) + // Dial dials a remote peer. It should try to reuse local listener + // addresses if possible but it may choose not to. + Dial(ctx context.Context, raddr ma.Multiaddr, p peer.ID) (Conn, error) + + // CanDial returns true if this transport knows how to dial the given + // multiaddr. + // + // Returning true does not guarantee that dialing this multiaddr will + // succeed. This function should *only* be used to preemptively filter + // out addresses that we can't dial. + CanDial(addr ma.Multiaddr) bool + + // Listen listens on the passed multiaddr. Listen(laddr ma.Multiaddr) (Listener, error) - Matches(ma.Multiaddr) bool -} -// Dialer is an abstraction that is normally filled by an object containing -// information/options around how to perform the dial. An example would be -// setting TCP dial timeout for all dials made, or setting the local address -// that we dial out from. -type Dialer interface { - Dial(raddr ma.Multiaddr) (Conn, error) - DialContext(ctx context.Context, raddr ma.Multiaddr) (Conn, error) - Matches(ma.Multiaddr) bool + // Protocol returns the set of protocols handled by this transport. + // + // See the Network interface for an explanation of how this is used. + Protocols() []int + + // Proxy returns true if this is a proxy transport. + // + // See the Network interface for an explanation of how this is used. + // TODO: Make this a part of the go-multiaddr protocol instead? + Proxy() bool } // Listener is an interface closely resembling the net.Listener interface. The @@ -50,9 +78,20 @@ type Listener interface { Multiaddr() ma.Multiaddr } -// DialOpt is an option used for configuring dialer behaviour -type DialOpt interface{} +// Network is an inet.Network with methods for managing transports. +type Network interface { + inet.Network -type ReuseportOpt bool - -var ReusePorts ReuseportOpt = true + // AddTransport adds a transport to this Network. + // + // When dialing, this Network will iterate over the protocols in the + // remote multiaddr and pick the first protocol registered with a proxy + // transport, if any. Otherwise, it'll pick the transport registered to + // handle the last protocol in the multiaddr. + // + // When listening, this Network will iterate over the protocols in the + // local multiaddr and pick the *last* protocol registered with a proxy + // transport, if any. Otherwise, it'll pick the transport registered to + // handle the last protocol in the multiaddr. + AddTransport(t Transport) error +}