diff --git a/core/commands/commands_test.go b/core/commands/commands_test.go index a365f3e1bed..a3bef025fa3 100644 --- a/core/commands/commands_test.go +++ b/core/commands/commands_test.go @@ -161,13 +161,12 @@ func TestCommands(t *testing.T) { "/object/put", "/object/stat", "/p2p", - "/p2p/listener", - "/p2p/listener/close", - "/p2p/listener/ls", - "/p2p/listener/open", + "/p2p/close", + "/p2p/forward", + "/p2p/listen", + "/p2p/ls", "/p2p/stream", "/p2p/stream/close", - "/p2p/stream/dial", "/p2p/stream/ls", "/pin", "/pin/add", diff --git a/core/commands/p2p.go b/core/commands/p2p.go index ca852fafcd8..dda18de4c16 100644 --- a/core/commands/p2p.go +++ b/core/commands/p2p.go @@ -2,33 +2,41 @@ package commands import ( "bytes" + "context" "errors" "fmt" "io" "strconv" + "strings" "text/tabwriter" cmds "github.com/ipfs/go-ipfs/commands" core "github.com/ipfs/go-ipfs/core" + p2p "github.com/ipfs/go-ipfs/p2p" "gx/ipfs/QmSP88ryZkHSRn1fnngAaV2Vcn63WUJzAavnRM9CVdU1Ky/go-ipfs-cmdkit" ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr" + "gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol" + pstore "gx/ipfs/Qmda4cPRvSRyox3SqgJN6DfSZGU5TtHufPTp9uXjFj71X6/go-libp2p-peerstore" + "gx/ipfs/QmePSRaGafvmURQwQkHPDBJsaGwKXC1WpBBHVCQxdr8FPn/go-ipfs-addr" ) +// P2PProtoPrefix is the default required prefix for protocol names +const P2PProtoPrefix = "/x/" + // P2PListenerInfoOutput is output type of ls command type P2PListenerInfoOutput struct { - Protocol string - Address string + Protocol string + ListenAddress string + TargetAddress string } // P2PStreamInfoOutput is output type of streams command type P2PStreamInfoOutput struct { HandlerID string Protocol string - LocalPeer string - LocalAddress string - RemotePeer string - RemoteAddress string + OriginAddress string + TargetAddress string } // P2PLsOutput is output type of ls command @@ -53,124 +61,198 @@ are refined`, }, Subcommands: map[string]*cmds.Command{ - "listener": p2pListenerCmd, - "stream": p2pStreamCmd, + "stream": p2pStreamCmd, + + "forward": p2pForwardCmd, + "listen": p2pListenCmd, + "close": p2pCloseCmd, + "ls": p2pLsCmd, }, } -// p2pListenerCmd is the 'ipfs p2p listener' command -var p2pListenerCmd = &cmds.Command{ +var p2pForwardCmd = &cmds.Command{ Helptext: cmdkit.HelpText{ - Tagline: "P2P listener management.", - ShortDescription: "Create and manage listener p2p endpoints", - }, + Tagline: "Forward connections to libp2p service", + ShortDescription: ` +Forward connections made to to . - Subcommands: map[string]*cmds.Command{ - "ls": p2pListenerLsCmd, - "open": p2pListenerListenCmd, - "close": p2pListenerCloseCmd, - }, -} + specifies the libp2p protocol name to use for libp2p +connections and/or handlers. It must be prefixed with '` + P2PProtoPrefix + `'. -// p2pStreamCmd is the 'ipfs p2p stream' command -var p2pStreamCmd = &cmds.Command{ - Helptext: cmdkit.HelpText{ - Tagline: "P2P stream management.", - ShortDescription: "Create and manage p2p streams", +Example: + ipfs p2p forward ` + P2PProtoPrefix + `myproto /ip4/127.0.0.1/tcp/4567 /ipfs/QmPeer + - Forward connections to 127.0.0.1:4567 to '` + P2PProtoPrefix + `myproto' service on /ipfs/QmPeer + +`, }, + Arguments: []cmdkit.Argument{ + cmdkit.StringArg("protocol", true, false, "Protocol name."), + cmdkit.StringArg("listen-address", true, false, "Listening endpoint."), + cmdkit.StringArg("target-address", true, false, "Target endpoint."), + }, + Options: []cmdkit.Option{ + cmdkit.BoolOption("allow-custom-protocol", "Don't require /x/ prefix"), + }, + Run: func(req cmds.Request, res cmds.Response) { + n, err := p2pGetNode(req) + if err != nil { + res.SetError(err, cmdkit.ErrNormal) + return + } - Subcommands: map[string]*cmds.Command{ - "ls": p2pStreamLsCmd, - "dial": p2pStreamDialCmd, - "close": p2pStreamCloseCmd, + protoOpt := req.Arguments()[0] + listenOpt := req.Arguments()[1] + targetOpt := req.Arguments()[2] + + proto := protocol.ID(protoOpt) + + listen, err := ma.NewMultiaddr(listenOpt) + if err != nil { + res.SetError(err, cmdkit.ErrNormal) + return + } + + target, err := ipfsaddr.ParseString(targetOpt) + if err != nil { + res.SetError(err, cmdkit.ErrNormal) + return + } + + allowCustom, _, err := req.Option("allow-custom-protocol").Bool() + if err != nil { + res.SetError(err, cmdkit.ErrNormal) + return + } + + if !allowCustom && !strings.HasPrefix(string(proto), P2PProtoPrefix) { + res.SetError(errors.New("protocol name must be within '"+P2PProtoPrefix+"' namespace"), cmdkit.ErrNormal) + return + } + + if err := forwardLocal(n.Context(), n.P2P, n.Peerstore, proto, listen, target); err != nil { + res.SetError(err, cmdkit.ErrNormal) + return + } + res.SetOutput(nil) }, } -var p2pListenerLsCmd = &cmds.Command{ +var p2pListenCmd = &cmds.Command{ Helptext: cmdkit.HelpText{ - Tagline: "List active p2p listeners.", + Tagline: "Create libp2p service", + ShortDescription: ` +Create libp2p service and forward connections made to . + + specifies the libp2p handler name. It must be prefixed with '` + P2PProtoPrefix + `'. + +Example: + ipfs p2p listen ` + P2PProtoPrefix + `myproto /ip4/127.0.0.1/tcp/1234 + - Forward connections to 'myproto' libp2p service to 127.0.0.1:1234 + +`, + }, + Arguments: []cmdkit.Argument{ + cmdkit.StringArg("protocol", true, false, "Protocol name."), + cmdkit.StringArg("target-address", true, false, "Target endpoint."), }, Options: []cmdkit.Option{ - cmdkit.BoolOption("headers", "v", "Print table headers (HandlerID, Protocol, Local, Remote)."), + cmdkit.BoolOption("allow-custom-protocol", "Don't require /x/ prefix"), }, Run: func(req cmds.Request, res cmds.Response) { + n, err := p2pGetNode(req) + if err != nil { + res.SetError(err, cmdkit.ErrNormal) + return + } + + protoOpt := req.Arguments()[0] + targetOpt := req.Arguments()[1] - n, err := getNode(req) + proto := protocol.ID(protoOpt) + + target, err := ma.NewMultiaddr(targetOpt) if err != nil { res.SetError(err, cmdkit.ErrNormal) return } - output := &P2PLsOutput{} + allowCustom, _, err := req.Option("allow-custom-protocol").Bool() + if err != nil { + res.SetError(err, cmdkit.ErrNormal) + return + } - for _, listener := range n.P2P.Listeners.Listeners { - output.Listeners = append(output.Listeners, P2PListenerInfoOutput{ - Protocol: listener.Protocol, - Address: listener.Address.String(), - }) + if !allowCustom && !strings.HasPrefix(string(proto), P2PProtoPrefix) { + res.SetError(errors.New("protocol name must be within '"+P2PProtoPrefix+"' namespace"), cmdkit.ErrNormal) + return } - res.SetOutput(output) + if err := forwardRemote(n.Context(), n.P2P, proto, target); err != nil { + res.SetError(err, cmdkit.ErrNormal) + return + } + + res.SetOutput(nil) }, - Type: P2PLsOutput{}, - Marshalers: cmds.MarshalerMap{ - cmds.Text: func(res cmds.Response) (io.Reader, error) { - v, err := unwrapOutput(res.Output()) - if err != nil { - return nil, err - } +} - headers, _, _ := res.Request().Option("headers").Bool() - list := v.(*P2PLsOutput) - buf := new(bytes.Buffer) - w := tabwriter.NewWriter(buf, 1, 2, 1, ' ', 0) - for _, listener := range list.Listeners { - if headers { - fmt.Fprintln(w, "Address\tProtocol") - } +// forwardRemote forwards libp2p service connections to a manet address +func forwardRemote(ctx context.Context, p *p2p.P2P, proto protocol.ID, target ma.Multiaddr) error { + // TODO: return some info + _, err := p.ForwardRemote(ctx, proto, target) + return err +} - fmt.Fprintf(w, "%s\t%s\n", listener.Address, listener.Protocol) - } - w.Flush() +// forwardLocal forwards local connections to a libp2p service +func forwardLocal(ctx context.Context, p *p2p.P2P, ps pstore.Peerstore, proto protocol.ID, bindAddr ma.Multiaddr, addr ipfsaddr.IPFSAddr) error { + if addr != nil { + ps.AddAddr(addr.ID(), addr.Multiaddr(), pstore.TempAddrTTL) + } - return buf, nil - }, - }, + // TODO: return some info + _, err := p.ForwardLocal(ctx, addr.ID(), proto, bindAddr) + return err } -var p2pStreamLsCmd = &cmds.Command{ +var p2pLsCmd = &cmds.Command{ Helptext: cmdkit.HelpText{ - Tagline: "List active p2p streams.", + Tagline: "List active p2p listeners.", }, Options: []cmdkit.Option{ - cmdkit.BoolOption("headers", "v", "Print table headers (HagndlerID, Protocol, Local, Remote)."), + cmdkit.BoolOption("headers", "v", "Print table headers (Protocol, Listen, Target)."), }, Run: func(req cmds.Request, res cmds.Response) { - n, err := getNode(req) + n, err := p2pGetNode(req) if err != nil { res.SetError(err, cmdkit.ErrNormal) return } - output := &P2PStreamsOutput{} - - for _, s := range n.P2P.Streams.Streams { - output.Streams = append(output.Streams, P2PStreamInfoOutput{ - HandlerID: strconv.FormatUint(s.HandlerID, 10), - - Protocol: s.Protocol, + output := &P2PLsOutput{} - LocalPeer: s.LocalPeer.Pretty(), - LocalAddress: s.LocalAddr.String(), + n.P2P.ListenersLocal.Lock() + for _, listener := range n.P2P.ListenersLocal.Listeners { + output.Listeners = append(output.Listeners, P2PListenerInfoOutput{ + Protocol: string(listener.Protocol()), + ListenAddress: listener.ListenAddress().String(), + TargetAddress: listener.TargetAddress().String(), + }) + } + n.P2P.ListenersLocal.Unlock() - RemotePeer: s.RemotePeer.Pretty(), - RemoteAddress: s.RemoteAddr.String(), + n.P2P.ListenersP2P.Lock() + for _, listener := range n.P2P.ListenersP2P.Listeners { + output.Listeners = append(output.Listeners, P2PListenerInfoOutput{ + Protocol: string(listener.Protocol()), + ListenAddress: listener.ListenAddress().String(), + TargetAddress: listener.TargetAddress().String(), }) } + n.P2P.ListenersP2P.Unlock() res.SetOutput(output) }, - Type: P2PStreamsOutput{}, + Type: P2PLsOutput{}, Marshalers: cmds.MarshalerMap{ cmds.Text: func(res cmds.Response) (io.Reader, error) { v, err := unwrapOutput(res.Output()) @@ -179,15 +261,15 @@ var p2pStreamLsCmd = &cmds.Command{ } headers, _, _ := res.Request().Option("headers").Bool() - list := v.(*P2PStreamsOutput) + list := v.(*P2PLsOutput) buf := new(bytes.Buffer) w := tabwriter.NewWriter(buf, 1, 2, 1, ' ', 0) - for _, stream := range list.Streams { + for _, listener := range list.Listeners { if headers { - fmt.Fprintln(w, "HandlerID\tProtocol\tLocal\tRemote") + fmt.Fprintln(w, "Protocol\tListen Address\tTarget Address") } - fmt.Fprintf(w, "%s\t%s\t%s\t%s\n", stream.HandlerID, stream.Protocol, stream.LocalAddress, stream.RemotePeer) + fmt.Fprintf(w, "%s\t%s\t%s\n", listener.Protocol, listener.ListenAddress, listener.TargetAddress) } w.Flush() @@ -196,149 +278,161 @@ var p2pStreamLsCmd = &cmds.Command{ }, } -var p2pListenerListenCmd = &cmds.Command{ +var p2pCloseCmd = &cmds.Command{ Helptext: cmdkit.HelpText{ - Tagline: "Forward p2p connections to a network multiaddr.", - ShortDescription: ` -Register a p2p connection handler and forward the connections to a specified -address. - -Note that the connections originate from the ipfs daemon process. - `, + Tagline: "Stop listening for new connections to forward.", }, - Arguments: []cmdkit.Argument{ - cmdkit.StringArg("Protocol", true, false, "Protocol identifier."), - cmdkit.StringArg("Address", true, false, "Request handling application address."), + Options: []cmdkit.Option{ + cmdkit.BoolOption("all", "a", "Close all listeners."), + cmdkit.StringOption("protocol", "p", "Match protocol name"), + cmdkit.StringOption("listen-address", "l", "Match listen address"), + cmdkit.StringOption("target-address", "t", "Match target address"), }, Run: func(req cmds.Request, res cmds.Response) { - n, err := getNode(req) + n, err := p2pGetNode(req) if err != nil { res.SetError(err, cmdkit.ErrNormal) return } - proto := "/p2p/" + req.Arguments()[0] - if n.P2P.CheckProtoExists(proto) { - res.SetError(errors.New("protocol handler already registered"), cmdkit.ErrNormal) - return - } + closeAll, _, _ := req.Option("all").Bool() + protoOpt, p, _ := req.Option("protocol").String() + listenOpt, l, _ := req.Option("listen-address").String() + targetOpt, t, _ := req.Option("target-address").String() + + proto := protocol.ID(protoOpt) - addr, err := ma.NewMultiaddr(req.Arguments()[1]) + listen, err := ma.NewMultiaddr(listenOpt) if err != nil { res.SetError(err, cmdkit.ErrNormal) return } - _, err = n.P2P.NewListener(n.Context(), proto, addr) + target, err := ma.NewMultiaddr(targetOpt) if err != nil { res.SetError(err, cmdkit.ErrNormal) return } - // Successful response. - res.SetOutput(&P2PListenerInfoOutput{ - Protocol: proto, - Address: addr.String(), - }) - }, -} - -var p2pStreamDialCmd = &cmds.Command{ - Helptext: cmdkit.HelpText{ - Tagline: "Dial to a p2p listener.", - - ShortDescription: ` -Establish a new connection to a peer service. - -When a connection is made to a peer service the ipfs daemon will setup one -time TCP listener and return it's bind port, this way a dialing application -can transparently connect to a p2p service. - `, - }, - Arguments: []cmdkit.Argument{ - cmdkit.StringArg("Peer", true, false, "Remote peer to connect to"), - cmdkit.StringArg("Protocol", true, false, "Protocol identifier."), - cmdkit.StringArg("BindAddress", false, false, "Address to listen for connection/s (default: /ip4/127.0.0.1/tcp/0)."), - }, - Run: func(req cmds.Request, res cmds.Response) { - n, err := getNode(req) - if err != nil { - res.SetError(err, cmdkit.ErrNormal) + if !(closeAll || p || l || t) { + res.SetError(errors.New("no matching options given"), cmdkit.ErrNormal) return } - addr, peer, err := ParsePeerParam(req.Arguments()[0]) - if err != nil { - res.SetError(err, cmdkit.ErrNormal) + if closeAll && (p || l || t) { + res.SetError(errors.New("can't combine --all with other matching options"), cmdkit.ErrNormal) return } - proto := "/p2p/" + req.Arguments()[1] - - bindAddr, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/0") - if len(req.Arguments()) == 3 { - bindAddr, err = ma.NewMultiaddr(req.Arguments()[2]) - if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return + match := func(listener p2p.Listener) bool { + if closeAll { + return true } + if p && proto != listener.Protocol() { + return false + } + if l && !listen.Equal(listener.ListenAddress()) { + return false + } + if t && !target.Equal(listener.TargetAddress()) { + return false + } + return true } - listenerInfo, err := n.P2P.Dial(n.Context(), addr, peer, proto, bindAddr) - if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return - } + done := n.P2P.ListenersLocal.Close(match) + done += n.P2P.ListenersP2P.Close(match) - output := P2PListenerInfoOutput{ - Protocol: listenerInfo.Protocol, - Address: listenerInfo.Address.String(), - } + res.SetOutput(done) + }, + Type: int(0), + Marshalers: cmds.MarshalerMap{ + cmds.Text: func(res cmds.Response) (io.Reader, error) { + v, err := unwrapOutput(res.Output()) + if err != nil { + return nil, err + } + + closed := v.(int) + buf := new(bytes.Buffer) + fmt.Fprintf(buf, "Closed %d stream(s)\n", closed) - res.SetOutput(&output) + return buf, nil + }, }, } -var p2pListenerCloseCmd = &cmds.Command{ +/////// +// Stream +// + +// p2pStreamCmd is the 'ipfs p2p stream' command +var p2pStreamCmd = &cmds.Command{ Helptext: cmdkit.HelpText{ - Tagline: "Close active p2p listener.", + Tagline: "P2P stream management.", + ShortDescription: "Create and manage p2p streams", }, - Arguments: []cmdkit.Argument{ - cmdkit.StringArg("Protocol", false, false, "P2P listener protocol"), + + Subcommands: map[string]*cmds.Command{ + "ls": p2pStreamLsCmd, + "close": p2pStreamCloseCmd, + }, +} + +var p2pStreamLsCmd = &cmds.Command{ + Helptext: cmdkit.HelpText{ + Tagline: "List active p2p streams.", }, Options: []cmdkit.Option{ - cmdkit.BoolOption("all", "a", "Close all listeners."), + cmdkit.BoolOption("headers", "v", "Print table headers (ID, Protocol, Local, Remote)."), }, Run: func(req cmds.Request, res cmds.Response) { - res.SetOutput(nil) - - n, err := getNode(req) + n, err := p2pGetNode(req) if err != nil { res.SetError(err, cmdkit.ErrNormal) return } - closeAll, _, _ := req.Option("all").Bool() - var proto string + output := &P2PStreamsOutput{} - if !closeAll { - if len(req.Arguments()) == 0 { - res.SetError(errors.New("no protocol name specified"), cmdkit.ErrNormal) - return - } + n.P2P.Streams.Lock() + for id, s := range n.P2P.Streams.Streams { + output.Streams = append(output.Streams, P2PStreamInfoOutput{ + HandlerID: strconv.FormatUint(id, 10), + + Protocol: string(s.Protocol), - proto = "/p2p/" + req.Arguments()[0] + OriginAddress: s.OriginAddr.String(), + TargetAddress: s.TargetAddr.String(), + }) } + n.P2P.Streams.Unlock() - for _, listener := range n.P2P.Listeners.Listeners { - if !closeAll && listener.Protocol != proto { - continue + res.SetOutput(output) + }, + Type: P2PStreamsOutput{}, + Marshalers: cmds.MarshalerMap{ + cmds.Text: func(res cmds.Response) (io.Reader, error) { + v, err := unwrapOutput(res.Output()) + if err != nil { + return nil, err } - listener.Close() - if !closeAll { - break + + headers, _, _ := res.Request().Option("headers").Bool() + list := v.(*P2PStreamsOutput) + buf := new(bytes.Buffer) + w := tabwriter.NewWriter(buf, 1, 2, 1, ' ', 0) + for _, stream := range list.Streams { + if headers { + fmt.Fprintln(w, "ID\tProtocol\tOrigin\tTarget") + } + + fmt.Fprintf(w, "%s\t%s\t%s\t%s\n", stream.HandlerID, stream.Protocol, stream.OriginAddress, stream.TargetAddress) } - } + w.Flush() + + return buf, nil + }, }, } @@ -347,7 +441,7 @@ var p2pStreamCloseCmd = &cmds.Command{ Tagline: "Close active p2p stream.", }, Arguments: []cmdkit.Argument{ - cmdkit.StringArg("HandlerID", false, false, "Stream HandlerID"), + cmdkit.StringArg("id", false, false, "Stream identifier"), }, Options: []cmdkit.Option{ cmdkit.BoolOption("all", "a", "Close all streams."), @@ -355,7 +449,7 @@ var p2pStreamCloseCmd = &cmds.Command{ Run: func(req cmds.Request, res cmds.Response) { res.SetOutput(nil) - n, err := getNode(req) + n, err := p2pGetNode(req) if err != nil { res.SetError(err, cmdkit.ErrNormal) return @@ -366,7 +460,7 @@ var p2pStreamCloseCmd = &cmds.Command{ if !closeAll { if len(req.Arguments()) == 0 { - res.SetError(errors.New("no HandlerID specified"), cmdkit.ErrNormal) + res.SetError(errors.New("no id specified"), cmdkit.ErrNormal) return } @@ -377,19 +471,26 @@ var p2pStreamCloseCmd = &cmds.Command{ } } - for _, stream := range n.P2P.Streams.Streams { - if !closeAll && handlerID != stream.HandlerID { + toClose := make([]*p2p.Stream, 0, 1) + n.P2P.Streams.Lock() + for id, stream := range n.P2P.Streams.Streams { + if !closeAll && handlerID != id { continue } - stream.Close() + toClose = append(toClose, stream) if !closeAll { break } } + n.P2P.Streams.Unlock() + + for _, s := range toClose { + n.P2P.Streams.Reset(s) + } }, } -func getNode(req cmds.Request) (*core.IpfsNode, error) { +func p2pGetNode(req cmds.Request) (*core.IpfsNode, error) { n, err := req.InvocContext().GetNode() if err != nil { return nil, err diff --git a/docs/experimental-features.md b/docs/experimental-features.md index ad52b2f4e28..596b8e7f3b0 100644 --- a/docs/experimental-features.md +++ b/docs/experimental-features.md @@ -250,36 +250,114 @@ configured, the daemon will fail to start. --- ## ipfs p2p -Allows to tunnel TCP connections through Libp2p streams + +Allows tunneling of TCP connections through Libp2p streams. If you've ever used +port forwarding with SSH (the `-L` option in openssh), this feature is quite +similar. ### State + Experimental ### In Version + master, 0.4.10 ### How to enable -P2P command needs to be enabled in config -`ipfs config --json Experimental.Libp2pStreamMounting true` +The `p2p` command needs to be enabled in config: + +```sh +> ipfs config --json Experimental.Libp2pStreamMounting true +``` ### How to use -Basic usage: +**Netcat example:** + +First, pick a protocol name for your application. Think of the protocol name as +a port number, just significantly more user-friendly. In this example, we're +going to use `/x/kickass/1.0`. + +***Setup:*** + +1. A "server" node with peer ID `$SERVER_ID` +2. A "client" node. + +***On the "server" node:*** + +First, start your application and have it listen for TCP connections on +port `$APP_PORT`. + +Then, configure the p2p listener by running: + +```sh +> ipfs p2p listen /x/kickass/1.0 /ip4/127.0.0.1/tcp/$APP_PORT +``` + +This will configure IPFS to forward all incoming `/x/kickass/1.0` streams to +`127.0.0.1:$APP_PORT` (opening a new connection to `127.0.0.1:$APP_PORT` per +incoming stream. + +***On the "client" node:*** + +First, configure the client p2p dialer, so that it forwards all inbound +connections on `127.0.0.1:SOME_PORT` to the server node listening +on `/x/kickass/1.0`. + +```sh +> ipfs p2p forward /x/kickass/1.0 /ip4/127.0.0.1/tcp/$SOME_PORT /ipfs/$SERVER_ID +``` + +Next, have your application open a connection to `127.0.0.1:$SOME_PORT`. This +connection will be forwarded to the service running on `127.0.0.1:$APP_PORT` on +the remote machine. You can test it with netcat: + +***On "server" node:*** +```sh +> nc -v -l -p $APP_PORT +``` + +***On "client" node:*** +```sh +> nc -v 127.0.0.1 $SOME_PORT +``` + +You should now see that a connection has been established and be able to +exchange messages between netcat instances. + +(note that depending on your netcat version you may need to drop the `-v` flag) + +**SSH example** + +**Setup:** + +1. A "server" node with peer ID `$SERVER_ID` and running ssh server on the + default port. +2. A "client" node. + +_you can get `$SERVER_ID` by running `ipfs id -f "\n"`_ + +***First, on the "server" node:*** + +```sh +ipfs p2p listen /x/ssh /ip4/127.0.0.1/tcp/22 +``` + +***Then, on "client" node:*** + +```sh +ipfs p2p forward /x/ssh /ip4/127.0.0.1/tcp/2222 /ipfs/$SERVER_ID +``` + +You should now be able to connect to your ssh server through a libp2p connection +with `ssh [user]@127.0.0.1 -p 2222`. -- Open a listener on one node (node A) -`ipfs p2p listener open p2p-test /ip4/127.0.0.1/tcp/10101` -- Where `/ip4/127.0.0.1/tcp/10101` put address of application you want to pass - p2p connections to -- On the other node, connect to the listener on node A -`ipfs p2p stream dial $NODE_A_PEERID p2p-test /ip4/127.0.0.1/tcp/10102` -- Node B is now listening for a connection on TCP at 127.0.0.1:10102, connect - your application there to complete the connection ### Road to being a real feature - [ ] Needs more people to use and report on how well it works / fits use cases - [ ] More documentation -- [ ] Support other protocols +- [ ] Support other protocols (e.g, unix domain sockets, websockets, etc.) --- diff --git a/p2p/listener.go b/p2p/listener.go new file mode 100644 index 00000000000..e62ab236b3f --- /dev/null +++ b/p2p/listener.go @@ -0,0 +1,96 @@ +package p2p + +import ( + "errors" + "sync" + + ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr" + net "gx/ipfs/QmZNJyx9GGCX4GeuHnLB8fxaxMLs4MjTjHokxfQcCd6Nve/go-libp2p-net" + "gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol" + p2phost "gx/ipfs/QmeMYW7Nj8jnnEfs9qhm7SxKkoDPUWXu3MsxX6BFwz34tf/go-libp2p-host" +) + +// Listener listens for connections and proxies them to a target +type Listener interface { + Protocol() protocol.ID + ListenAddress() ma.Multiaddr + TargetAddress() ma.Multiaddr + + key() string + + // close closes the listener. Does not affect child streams + close() +} + +// Listeners manages a group of Listener implementations, +// checking for conflicts and optionally dispatching connections +type Listeners struct { + sync.RWMutex + + Listeners map[string]Listener +} + +func newListenersLocal() *Listeners { + return &Listeners{ + Listeners: map[string]Listener{}, + } +} + +func newListenersP2P(host p2phost.Host) *Listeners { + reg := &Listeners{ + Listeners: map[string]Listener{}, + } + + host.SetStreamHandlerMatch("/x/", func(p string) bool { + reg.RLock() + defer reg.RUnlock() + + _, ok := reg.Listeners[p] + return ok + }, func(stream net.Stream) { + reg.RLock() + defer reg.RUnlock() + + l := reg.Listeners[string(stream.Protocol())] + if l != nil { + go l.(*remoteListener).handleStream(stream) + } + }) + + return reg +} + +// Register registers listenerInfo into this registry and starts it +func (r *Listeners) Register(l Listener) error { + r.Lock() + defer r.Unlock() + + if _, ok := r.Listeners[l.key()]; ok { + return errors.New("listener already registered") + } + + r.Listeners[l.key()] = l + return nil +} + +func (r *Listeners) Close(matchFunc func(listener Listener) bool) int { + todo := make([]Listener, 0) + r.Lock() + for _, l := range r.Listeners { + if !matchFunc(l) { + continue + } + + if _, ok := r.Listeners[l.key()]; ok { + delete(r.Listeners, l.key()) + todo = append(todo, l) + } + } + r.Unlock() + + for _, l := range todo { + l.close() + } + + return len(todo) +} diff --git a/p2p/local.go b/p2p/local.go new file mode 100644 index 00000000000..861ce5e8899 --- /dev/null +++ b/p2p/local.go @@ -0,0 +1,123 @@ +package p2p + +import ( + "context" + "time" + + "gx/ipfs/QmQsErDt8Qgw1XrsXf2BpEzDgGWtB1YLsTAARBup5b6B9W/go-libp2p-peer" + "gx/ipfs/QmV6FjemM1K8oXjrvuq3wuVWWoU2TLDPmNnKrxHzY3v6Ai/go-multiaddr-net" + tec "gx/ipfs/QmWHgLqrghM9zw77nF6gdvT9ExQ2RB9pLxkd8sDHZf1rWb/go-temp-err-catcher" + ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr" + "gx/ipfs/QmZNJyx9GGCX4GeuHnLB8fxaxMLs4MjTjHokxfQcCd6Nve/go-libp2p-net" + "gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol" +) + +// localListener manet streams and proxies them to libp2p services +type localListener struct { + ctx context.Context + + p2p *P2P + + proto protocol.ID + laddr ma.Multiaddr + peer peer.ID + + listener manet.Listener +} + +// ForwardLocal creates new P2P stream to a remote listener +func (p2p *P2P) ForwardLocal(ctx context.Context, peer peer.ID, proto protocol.ID, bindAddr ma.Multiaddr) (Listener, error) { + listener := &localListener{ + ctx: ctx, + + p2p: p2p, + + proto: proto, + laddr: bindAddr, + peer: peer, + } + + maListener, err := manet.Listen(listener.laddr) + if err != nil { + return nil, err + } + + listener.listener = maListener + + if err := p2p.ListenersLocal.Register(listener); err != nil { + return nil, err + } + + go listener.acceptConns() + + return listener, nil +} + +func (l *localListener) dial(ctx context.Context) (net.Stream, error) { + cctx, cancel := context.WithTimeout(ctx, time.Second*30) //TODO: configurable? + defer cancel() + + return l.p2p.peerHost.NewStream(cctx, l.peer, l.proto) +} + +func (l *localListener) acceptConns() { + for { + local, err := l.listener.Accept() + if err != nil { + if tec.ErrIsTemporary(err) { + continue + } + return + } + + go l.setupStream(local) + } +} + +func (l *localListener) setupStream(local manet.Conn) { + remote, err := l.dial(l.ctx) + if err != nil { + local.Close() + log.Warningf("failed to dial to remote %s/%s", l.peer.Pretty(), l.proto) + return + } + + stream := &Stream{ + Protocol: l.proto, + + OriginAddr: local.RemoteMultiaddr(), + TargetAddr: l.TargetAddress(), + peer: l.peer, + + Local: local, + Remote: remote, + + Registry: l.p2p.Streams, + } + + l.p2p.Streams.Register(stream) +} + +func (l *localListener) close() { + l.listener.Close() +} + +func (l *localListener) Protocol() protocol.ID { + return l.proto +} + +func (l *localListener) ListenAddress() ma.Multiaddr { + return l.laddr +} + +func (l *localListener) TargetAddress() ma.Multiaddr { + addr, err := ma.NewMultiaddr(maPrefix + l.peer.Pretty()) + if err != nil { + panic(err) + } + return addr +} + +func (l *localListener) key() string { + return l.ListenAddress().String() +} diff --git a/p2p/p2p.go b/p2p/p2p.go index d2af892c911..293b30ee326 100644 --- a/p2p/p2p.go +++ b/p2p/p2p.go @@ -1,23 +1,19 @@ package p2p import ( - "context" - "errors" - "time" - peer "gx/ipfs/QmQsErDt8Qgw1XrsXf2BpEzDgGWtB1YLsTAARBup5b6B9W/go-libp2p-peer" - manet "gx/ipfs/QmV6FjemM1K8oXjrvuq3wuVWWoU2TLDPmNnKrxHzY3v6Ai/go-multiaddr-net" - ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr" - net "gx/ipfs/QmZNJyx9GGCX4GeuHnLB8fxaxMLs4MjTjHokxfQcCd6Nve/go-libp2p-net" - pro "gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol" + logging "gx/ipfs/QmRREK2CAZ5Re2Bd9zZFG6FeYDppUWt5cMgsoUEp3ktgSr/go-log" pstore "gx/ipfs/Qmda4cPRvSRyox3SqgJN6DfSZGU5TtHufPTp9uXjFj71X6/go-libp2p-peerstore" p2phost "gx/ipfs/QmeMYW7Nj8jnnEfs9qhm7SxKkoDPUWXu3MsxX6BFwz34tf/go-libp2p-host" ) -// P2P structure holds information on currently running streams/listeners +var log = logging.Logger("p2p-mount") + +// P2P structure holds information on currently running streams/Listeners type P2P struct { - Listeners ListenerRegistry - Streams StreamRegistry + ListenersLocal *Listeners + ListenersP2P *Listeners + Streams *StreamRegistry identity peer.ID peerHost p2phost.Host @@ -30,200 +26,19 @@ func NewP2P(identity peer.ID, peerHost p2phost.Host, peerstore pstore.Peerstore) identity: identity, peerHost: peerHost, peerstore: peerstore, - } -} - -func (p2p *P2P) newStreamTo(ctx2 context.Context, p peer.ID, protocol string) (net.Stream, error) { - ctx, cancel := context.WithTimeout(ctx2, time.Second*30) //TODO: configurable? - defer cancel() - err := p2p.peerHost.Connect(ctx, pstore.PeerInfo{ID: p}) - if err != nil { - return nil, err - } - return p2p.peerHost.NewStream(ctx2, p, pro.ID(protocol)) -} - -// Dial creates new P2P stream to a remote listener -func (p2p *P2P) Dial(ctx context.Context, addr ma.Multiaddr, peer peer.ID, proto string, bindAddr ma.Multiaddr) (*ListenerInfo, error) { - lnet, _, err := manet.DialArgs(bindAddr) - if err != nil { - return nil, err - } - - listenerInfo := ListenerInfo{ - Identity: p2p.identity, - Protocol: proto, - } - - remote, err := p2p.newStreamTo(ctx, peer, proto) - if err != nil { - return nil, err - } - - switch lnet { - case "tcp", "tcp4", "tcp6": - listener, err := manet.Listen(bindAddr) - if err != nil { - if err2 := remote.Reset(); err2 != nil { - return nil, err2 - } - return nil, err - } - - listenerInfo.Address = listener.Multiaddr() - listenerInfo.Closer = listener - listenerInfo.Running = true - - go p2p.doAccept(&listenerInfo, remote, listener) - - default: - return nil, errors.New("unsupported protocol: " + lnet) - } - - return &listenerInfo, nil -} - -func (p2p *P2P) doAccept(listenerInfo *ListenerInfo, remote net.Stream, listener manet.Listener) { - defer listener.Close() - - local, err := listener.Accept() - if err != nil { - return - } - - stream := StreamInfo{ - Protocol: listenerInfo.Protocol, - - LocalPeer: listenerInfo.Identity, - LocalAddr: listenerInfo.Address, - - RemotePeer: remote.Conn().RemotePeer(), - RemoteAddr: remote.Conn().RemoteMultiaddr(), - - Local: local, - Remote: remote, - - Registry: &p2p.Streams, - } - - p2p.Streams.Register(&stream) - stream.startStreaming() -} - -// Listener wraps stream handler into a listener -type Listener interface { - Accept() (net.Stream, error) - Close() error -} - -// P2PListener holds information on a listener -type P2PListener struct { - peerHost p2phost.Host - conCh chan net.Stream - proto pro.ID - ctx context.Context - cancel func() -} - -// Accept waits for a connection from the listener -func (il *P2PListener) Accept() (net.Stream, error) { - select { - case c := <-il.conCh: - return c, nil - case <-il.ctx.Done(): - return nil, il.ctx.Err() - } -} - -// Close closes the listener and removes stream handler -func (il *P2PListener) Close() error { - il.cancel() - il.peerHost.RemoveStreamHandler(il.proto) - return nil -} - -// Listen creates new P2PListener -func (p2p *P2P) registerStreamHandler(ctx2 context.Context, protocol string) (*P2PListener, error) { - ctx, cancel := context.WithCancel(ctx2) - - list := &P2PListener{ - peerHost: p2p.peerHost, - proto: pro.ID(protocol), - conCh: make(chan net.Stream), - ctx: ctx, - cancel: cancel, - } - - p2p.peerHost.SetStreamHandler(list.proto, func(s net.Stream) { - select { - case list.conCh <- s: - case <-ctx.Done(): - s.Reset() - } - }) - - return list, nil -} - -// NewListener creates new p2p listener -func (p2p *P2P) NewListener(ctx context.Context, proto string, addr ma.Multiaddr) (*ListenerInfo, error) { - listener, err := p2p.registerStreamHandler(ctx, proto) - if err != nil { - return nil, err - } - listenerInfo := ListenerInfo{ - Identity: p2p.identity, - Protocol: proto, - Address: addr, - Closer: listener, - Running: true, - Registry: &p2p.Listeners, - } - - go p2p.acceptStreams(&listenerInfo, listener) - - p2p.Listeners.Register(&listenerInfo) - - return &listenerInfo, nil -} - -func (p2p *P2P) acceptStreams(listenerInfo *ListenerInfo, listener Listener) { - for listenerInfo.Running { - remote, err := listener.Accept() - if err != nil { - listener.Close() - break - } - - local, err := manet.Dial(listenerInfo.Address) - if err != nil { - remote.Reset() - continue - } - - stream := StreamInfo{ - Protocol: listenerInfo.Protocol, - - LocalPeer: listenerInfo.Identity, - LocalAddr: listenerInfo.Address, - - RemotePeer: remote.Conn().RemotePeer(), - RemoteAddr: remote.Conn().RemoteMultiaddr(), - - Local: local, - Remote: remote, - - Registry: &p2p.Streams, - } + ListenersLocal: newListenersLocal(), + ListenersP2P: newListenersP2P(peerHost), - p2p.Streams.Register(&stream) - stream.startStreaming() + Streams: &StreamRegistry{ + Streams: map[uint64]*Stream{}, + ConnManager: peerHost.ConnManager(), + conns: map[peer.ID]int{}, + }, } - p2p.Listeners.Deregister(listenerInfo.Protocol) } -// CheckProtoExists checks whether a protocol handler is registered to +// CheckProtoExists checks whether a proto handler is registered to // mux handler func (p2p *P2P) CheckProtoExists(proto string) bool { protos := p2p.peerHost.Mux().Protocols() diff --git a/p2p/registry.go b/p2p/registry.go deleted file mode 100644 index 7ea528c5c5a..00000000000 --- a/p2p/registry.go +++ /dev/null @@ -1,150 +0,0 @@ -package p2p - -import ( - "fmt" - "io" - - peer "gx/ipfs/QmQsErDt8Qgw1XrsXf2BpEzDgGWtB1YLsTAARBup5b6B9W/go-libp2p-peer" - manet "gx/ipfs/QmV6FjemM1K8oXjrvuq3wuVWWoU2TLDPmNnKrxHzY3v6Ai/go-multiaddr-net" - ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr" - net "gx/ipfs/QmZNJyx9GGCX4GeuHnLB8fxaxMLs4MjTjHokxfQcCd6Nve/go-libp2p-net" -) - -// ListenerInfo holds information on a p2p listener. -type ListenerInfo struct { - // Application protocol identifier. - Protocol string - - // Node identity - Identity peer.ID - - // Local protocol stream address. - Address ma.Multiaddr - - // Local protocol stream listener. - Closer io.Closer - - // Flag indicating whether we're still accepting incoming connections, or - // whether this application listener has been shutdown. - Running bool - - Registry *ListenerRegistry -} - -// Close closes the listener. Does not affect child streams -func (c *ListenerInfo) Close() error { - c.Closer.Close() - err := c.Registry.Deregister(c.Protocol) - return err -} - -// ListenerRegistry is a collection of local application protocol listeners. -type ListenerRegistry struct { - Listeners []*ListenerInfo -} - -// Register registers listenerInfo2 in this registry -func (c *ListenerRegistry) Register(listenerInfo *ListenerInfo) { - c.Listeners = append(c.Listeners, listenerInfo) -} - -// Deregister removes p2p listener from this registry -func (c *ListenerRegistry) Deregister(proto string) error { - foundAt := -1 - for i, a := range c.Listeners { - if a.Protocol == proto { - foundAt = i - break - } - } - - if foundAt != -1 { - c.Listeners = append(c.Listeners[:foundAt], c.Listeners[foundAt+1:]...) - return nil - } - - return fmt.Errorf("failed to deregister proto %s", proto) -} - -// StreamInfo holds information on active incoming and outgoing p2p streams. -type StreamInfo struct { - HandlerID uint64 - - Protocol string - - LocalPeer peer.ID - LocalAddr ma.Multiaddr - - RemotePeer peer.ID - RemoteAddr ma.Multiaddr - - Local manet.Conn - Remote net.Stream - - Registry *StreamRegistry -} - -// Close closes stream endpoints and deregisters it -func (s *StreamInfo) Close() error { - s.Local.Close() - s.Remote.Close() - s.Registry.Deregister(s.HandlerID) - return nil -} - -// Reset closes stream endpoints and deregisters it -func (s *StreamInfo) Reset() error { - s.Local.Close() - s.Remote.Reset() - s.Registry.Deregister(s.HandlerID) - return nil -} - -func (s *StreamInfo) startStreaming() { - go func() { - _, err := io.Copy(s.Local, s.Remote) - if err != nil { - s.Reset() - } else { - s.Close() - } - }() - - go func() { - _, err := io.Copy(s.Remote, s.Local) - if err != nil { - s.Reset() - } else { - s.Close() - } - }() -} - -// StreamRegistry is a collection of active incoming and outgoing protocol app streams. -type StreamRegistry struct { - Streams []*StreamInfo - - nextID uint64 -} - -// Register registers a stream to the registry -func (c *StreamRegistry) Register(streamInfo *StreamInfo) { - streamInfo.HandlerID = c.nextID - c.Streams = append(c.Streams, streamInfo) - c.nextID++ -} - -// Deregister deregisters stream from the registry -func (c *StreamRegistry) Deregister(handlerID uint64) { - foundAt := -1 - for i, s := range c.Streams { - if s.HandlerID == handlerID { - foundAt = i - break - } - } - - if foundAt != -1 { - c.Streams = append(c.Streams[:foundAt], c.Streams[foundAt+1:]...) - } -} diff --git a/p2p/remote.go b/p2p/remote.go new file mode 100644 index 00000000000..924cab4c5ec --- /dev/null +++ b/p2p/remote.go @@ -0,0 +1,92 @@ +package p2p + +import ( + "context" + + manet "gx/ipfs/QmV6FjemM1K8oXjrvuq3wuVWWoU2TLDPmNnKrxHzY3v6Ai/go-multiaddr-net" + ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr" + net "gx/ipfs/QmZNJyx9GGCX4GeuHnLB8fxaxMLs4MjTjHokxfQcCd6Nve/go-libp2p-net" + protocol "gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol" +) + +var maPrefix = "/" + ma.ProtocolWithCode(ma.P_IPFS).Name + "/" + +// remoteListener accepts libp2p streams and proxies them to a manet host +type remoteListener struct { + p2p *P2P + + // Application proto identifier. + proto protocol.ID + + // Address to proxy the incoming connections to + addr ma.Multiaddr +} + +// ForwardRemote creates new p2p listener +func (p2p *P2P) ForwardRemote(ctx context.Context, proto protocol.ID, addr ma.Multiaddr) (Listener, error) { + listener := &remoteListener{ + p2p: p2p, + + proto: proto, + addr: addr, + } + + if err := p2p.ListenersP2P.Register(listener); err != nil { + return nil, err + } + + return listener, nil +} + +func (l *remoteListener) handleStream(remote net.Stream) { + local, err := manet.Dial(l.addr) + if err != nil { + remote.Reset() + return + } + + peer := remote.Conn().RemotePeer() + + peerMa, err := ma.NewMultiaddr(maPrefix + peer.Pretty()) + if err != nil { + remote.Reset() + return + } + + stream := &Stream{ + Protocol: l.proto, + + OriginAddr: peerMa, + TargetAddr: l.addr, + peer: peer, + + Local: local, + Remote: remote, + + Registry: l.p2p.Streams, + } + + l.p2p.Streams.Register(stream) +} + +func (l *remoteListener) Protocol() protocol.ID { + return l.proto +} + +func (l *remoteListener) ListenAddress() ma.Multiaddr { + addr, err := ma.NewMultiaddr(maPrefix + l.p2p.identity.Pretty()) + if err != nil { + panic(err) + } + return addr +} + +func (l *remoteListener) TargetAddress() ma.Multiaddr { + return l.addr +} + +func (l *remoteListener) close() {} + +func (l *remoteListener) key() string { + return string(l.proto) +} diff --git a/p2p/stream.go b/p2p/stream.go new file mode 100644 index 00000000000..0748982fe92 --- /dev/null +++ b/p2p/stream.go @@ -0,0 +1,124 @@ +package p2p + +import ( + "io" + "sync" + + peer "gx/ipfs/QmQsErDt8Qgw1XrsXf2BpEzDgGWtB1YLsTAARBup5b6B9W/go-libp2p-peer" + manet "gx/ipfs/QmV6FjemM1K8oXjrvuq3wuVWWoU2TLDPmNnKrxHzY3v6Ai/go-multiaddr-net" + ifconnmgr "gx/ipfs/QmWGGN1nysi1qgqto31bENwESkmZBY4YGK4sZC3qhnqhSv/go-libp2p-interface-connmgr" + ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr" + net "gx/ipfs/QmZNJyx9GGCX4GeuHnLB8fxaxMLs4MjTjHokxfQcCd6Nve/go-libp2p-net" + protocol "gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol" +) + +const cmgrTag = "stream-fwd" + +// Stream holds information on active incoming and outgoing p2p streams. +type Stream struct { + id uint64 + + Protocol protocol.ID + + OriginAddr ma.Multiaddr + TargetAddr ma.Multiaddr + peer peer.ID + + Local manet.Conn + Remote net.Stream + + Registry *StreamRegistry +} + +// close stream endpoints and deregister it +func (s *Stream) close() error { + s.Registry.Close(s) + return nil +} + +// reset closes stream endpoints and deregisters it +func (s *Stream) reset() error { + s.Registry.Reset(s) + return nil +} + +func (s *Stream) startStreaming() { + go func() { + _, err := io.Copy(s.Local, s.Remote) + if err != nil { + s.reset() + } else { + s.close() + } + }() + + go func() { + _, err := io.Copy(s.Remote, s.Local) + if err != nil { + s.reset() + } else { + s.close() + } + }() +} + +// StreamRegistry is a collection of active incoming and outgoing proto app streams. +type StreamRegistry struct { + sync.Mutex + + Streams map[uint64]*Stream + conns map[peer.ID]int + nextID uint64 + + ifconnmgr.ConnManager +} + +// Register registers a stream to the registry +func (r *StreamRegistry) Register(streamInfo *Stream) { + r.Lock() + defer r.Unlock() + + r.ConnManager.TagPeer(streamInfo.peer, cmgrTag, 20) + r.conns[streamInfo.peer]++ + + streamInfo.id = r.nextID + r.Streams[r.nextID] = streamInfo + r.nextID++ + + streamInfo.startStreaming() +} + +// Deregister deregisters stream from the registry +func (r *StreamRegistry) Deregister(streamID uint64) { + r.Lock() + defer r.Unlock() + + s, ok := r.Streams[streamID] + if !ok { + return + } + p := s.peer + r.conns[p]-- + if r.conns[p] < 1 { + delete(r.conns, p) + r.ConnManager.UntagPeer(p, cmgrTag) + } + + delete(r.Streams, streamID) +} + +// Close stream endpoints and deregister it +func (r *StreamRegistry) Close(s *Stream) error { + s.Local.Close() + s.Remote.Close() + s.Registry.Deregister(s.id) + return nil +} + +// Reset closes stream endpoints and deregisters it +func (r *StreamRegistry) Reset(s *Stream) error { + s.Local.Close() + s.Remote.Reset() + s.Registry.Deregister(s.id) + return nil +} diff --git a/package.json b/package.json index 40404683361..9922db2e672 100644 --- a/package.json +++ b/package.json @@ -486,6 +486,12 @@ "name": "go-ipns", "version": "0.1.8" }, + { + "author": "whyrusleeping", + "hash": "QmWHgLqrghM9zw77nF6gdvT9ExQ2RB9pLxkd8sDHZf1rWb", + "name": "go-temp-err-catcher", + "version": "0.0.0" + }, { "author": "why", "hash": "QmUyaGN3WPr3CTLai7DBvMikagK45V4fUi8p8cNRaJQoU1", diff --git a/test/sharness/t0180-p2p.sh b/test/sharness/t0180-p2p.sh index a11ab35f1d1..544cd1f6f6a 100755 --- a/test/sharness/t0180-p2p.sh +++ b/test/sharness/t0180-p2p.sh @@ -20,11 +20,14 @@ test_expect_success 'peer ids' ' PEERID_0=$(iptb get id 0) && PEERID_1=$(iptb get id 1) ' - -test_expect_success "test ports are closed" ' - (! (netstat -ln | grep "LISTEN" | grep ":10101 ")) && - (! (netstat -ln | grep "LISTEN" | grep ":10102 ")) -' +check_test_ports() { + test_expect_success "test ports are closed" ' + (! (netstat -lnp | grep "LISTEN" | grep ":10101 ")) && + (! (netstat -lnp | grep "LISTEN" | grep ":10102 "))&& + (! (netstat -lnp | grep "LISTEN" | grep ":10103 ")) + ' +} +check_test_ports test_expect_success 'fail without config option being enabled' ' test_must_fail ipfsi 0 p2p stream ls @@ -36,51 +39,109 @@ test_expect_success "enable filestore config setting" ' ' test_expect_success 'start p2p listener' ' - ipfsi 0 p2p listener open p2p-test /ip4/127.0.0.1/tcp/10101 2>&1 > listener-stdouterr.log + ipfsi 0 p2p listen /x/p2p-test /ip4/127.0.0.1/tcp/10101 2>&1 > listener-stdouterr.log ' -test_expect_success 'Test server to client communications' ' - ma-pipe-unidir --listen --pidFile=listener.pid send /ip4/127.0.0.1/tcp/10101 < test0.bin & +test_expect_success 'cannot re-register p2p listener' ' + test_must_fail ipfsi 0 p2p listen /x/p2p-test /ip4/127.0.0.1/tcp/10103 2>&1 > listener-stdouterr.log +' - test_wait_for_file 30 100ms listener.pid && - kill -0 $(cat listener.pid) && +# Server to client communications - ipfsi 1 p2p stream dial $PEERID_0 p2p-test /ip4/127.0.0.1/tcp/10102 2>&1 > dialer-stdouterr.log && - ma-pipe-unidir recv /ip4/127.0.0.1/tcp/10102 > client.out && - test ! -f listener.pid +spawn_sending_server() { + test_expect_success 'S->C Spawn sending server' ' + ma-pipe-unidir --listen --pidFile=listener.pid send /ip4/127.0.0.1/tcp/10101 < test0.bin & + + test_wait_for_file 30 100ms listener.pid && + kill -0 $(cat listener.pid) + ' +} + +test_server_to_client() { + test_expect_success 'S->C Connect and receive data' ' + ma-pipe-unidir recv /ip4/127.0.0.1/tcp/10102 > client.out + ' + + test_expect_success 'S->C Ensure server finished' ' + test ! -f listener.pid + ' + + test_expect_success 'S->C Output looks good' ' + test_cmp client.out test0.bin + ' +} + +spawn_sending_server + +test_expect_success 'S->C Setup client side' ' + ipfsi 1 p2p forward /x/p2p-test /ip4/127.0.0.1/tcp/10102 /ipfs/${PEERID_0} 2>&1 > dialer-stdouterr.log +' + +test_server_to_client + +test_expect_success 'S->C Connect with dead server' ' + ma-pipe-unidir recv /ip4/127.0.0.1/tcp/10102 > client.out +' + +test_expect_success 'S->C Output is empty' ' + test_must_be_empty client.out ' -test_expect_success 'Test client to server communications' ' +spawn_sending_server + +test_server_to_client + +test_expect_success 'S->C Close local listener' ' + ipfsi 1 p2p close -p /x/p2p-test +' + +check_test_ports + +# Client to server communications + +test_expect_success 'C->S Spawn receiving server' ' ma-pipe-unidir --listen --pidFile=listener.pid recv /ip4/127.0.0.1/tcp/10101 > server.out & test_wait_for_file 30 100ms listener.pid && - kill -0 $(cat listener.pid) && + kill -0 $(cat listener.pid) +' + +test_expect_success 'C->S Setup client side' ' + ipfsi 1 p2p forward /x/p2p-test /ip4/127.0.0.1/tcp/10102 /ipfs/${PEERID_0} 2>&1 > dialer-stdouterr.log +' - ipfsi 1 p2p stream dial $PEERID_0 p2p-test /ip4/127.0.0.1/tcp/10102 2>&1 > dialer-stdouterr.log && - ma-pipe-unidir send /ip4/127.0.0.1/tcp/10102 < test1.bin && +test_expect_success 'C->S Connect and receive data' ' + ma-pipe-unidir send /ip4/127.0.0.1/tcp/10102 < test1.bin +' + +test_expect_success 'C->S Ensure server finished' ' go-sleep 250ms && test ! -f listener.pid ' -test_expect_success 'server to client output looks good' ' - test_cmp client.out test0.bin +test_expect_success 'C->S Output looks good' ' + test_cmp server.out test1.bin ' -test_expect_success 'client to server output looks good' ' - test_cmp server.out test1.bin +test_expect_success 'C->S Close local listener' ' + ipfsi 1 p2p close -p /x/p2p-test ' -test_expect_success "'ipfs listener p2p ls' succeeds" ' - echo "/ip4/127.0.0.1/tcp/10101 /p2p/p2p-test" > expected && - ipfsi 0 p2p listener ls > actual +check_test_ports + +# Listing streams + +test_expect_success "'ipfs p2p ls' succeeds" ' + echo "/x/p2p-test /ipfs/$PEERID_0 /ip4/127.0.0.1/tcp/10101" > expected && + ipfsi 0 p2p ls > actual ' -test_expect_success "'ipfs p2p listener ls' output looks good" ' +test_expect_success "'ipfs p2p ls' output looks good" ' test_cmp expected actual ' test_expect_success "Cannot re-register app handler" ' - (! ipfsi 0 p2p listener open p2p-test /ip4/127.0.0.1/tcp/10101) + test_must_fail ipfsi 0 p2p listen /x/p2p-test /ip4/127.0.0.1/tcp/10101 ' test_expect_success "'ipfs p2p stream ls' output is empty" ' @@ -88,10 +149,12 @@ test_expect_success "'ipfs p2p stream ls' output is empty" ' test_must_be_empty actual ' +check_test_ports + test_expect_success "Setup: Idle stream" ' ma-pipe-unidir --listen --pidFile=listener.pid recv /ip4/127.0.0.1/tcp/10101 & - ipfsi 1 p2p stream dial $PEERID_0 p2p-test /ip4/127.0.0.1/tcp/10102 2>&1 > dialer-stdouterr.log && + ipfsi 1 p2p forward /x/p2p-test /ip4/127.0.0.1/tcp/10102 /ipfs/$PEERID_0 && ma-pipe-unidir --pidFile=client.pid recv /ip4/127.0.0.1/tcp/10102 & test_wait_for_file 30 100ms listener.pid && @@ -100,7 +163,7 @@ test_expect_success "Setup: Idle stream" ' ' test_expect_success "'ipfs p2p stream ls' succeeds" ' - echo "2 /p2p/p2p-test /ip4/127.0.0.1/tcp/10101 $PEERID_1" > expected + echo "3 /x/p2p-test /ipfs/$PEERID_1 /ip4/127.0.0.1/tcp/10101" > expected ipfsi 0 p2p stream ls > actual ' @@ -109,23 +172,31 @@ test_expect_success "'ipfs p2p stream ls' output looks good" ' ' test_expect_success "'ipfs p2p stream close' closes stream" ' - ipfsi 0 p2p stream close 2 && + ipfsi 0 p2p stream close 3 && ipfsi 0 p2p stream ls > actual && [ ! -f listener.pid ] && [ ! -f client.pid ] && test_must_be_empty actual ' -test_expect_success "'ipfs p2p listener close' closes app handler" ' - ipfsi 0 p2p listener close p2p-test && - ipfsi 0 p2p listener ls > actual && +test_expect_success "'ipfs p2p close' closes remote handler" ' + ipfsi 0 p2p close -p /x/p2p-test && + ipfsi 0 p2p ls > actual && + test_must_be_empty actual +' + +test_expect_success "'ipfs p2p close' closes local handler" ' + ipfsi 1 p2p close -p /x/p2p-test && + ipfsi 1 p2p ls > actual && test_must_be_empty actual ' +check_test_ports + test_expect_success "Setup: Idle stream(2)" ' ma-pipe-unidir --listen --pidFile=listener.pid recv /ip4/127.0.0.1/tcp/10101 & - ipfsi 0 p2p listener open p2p-test2 /ip4/127.0.0.1/tcp/10101 2>&1 > listener-stdouterr.log && - ipfsi 1 p2p stream dial $PEERID_0 p2p-test2 /ip4/127.0.0.1/tcp/10102 2>&1 > dialer-stdouterr.log && + ipfsi 0 p2p listen /x/p2p-test2 /ip4/127.0.0.1/tcp/10101 2>&1 > listener-stdouterr.log && + ipfsi 1 p2p forward /x/p2p-test2 /ip4/127.0.0.1/tcp/10102 /ipfs/$PEERID_0 2>&1 > dialer-stdouterr.log && ma-pipe-unidir --pidFile=client.pid recv /ip4/127.0.0.1/tcp/10102 & test_wait_for_file 30 100ms listener.pid && @@ -134,14 +205,20 @@ test_expect_success "Setup: Idle stream(2)" ' ' test_expect_success "'ipfs p2p stream ls' succeeds(2)" ' - echo "3 /p2p/p2p-test2 /ip4/127.0.0.1/tcp/10101 $PEERID_1" > expected + echo "4 /x/p2p-test2 /ipfs/$PEERID_1 /ip4/127.0.0.1/tcp/10101" > expected ipfsi 0 p2p stream ls > actual test_cmp expected actual ' -test_expect_success "'ipfs p2p listener close -a' closes app handlers" ' - ipfsi 0 p2p listener close -a && - ipfsi 0 p2p listener ls > actual && +test_expect_success "'ipfs p2p close -a' closes remote app handlers" ' + ipfsi 0 p2p close -a && + ipfsi 0 p2p ls > actual && + test_must_be_empty actual +' + +test_expect_success "'ipfs p2p close -a' closes local app handlers" ' + ipfsi 1 p2p close -a && + ipfsi 1 p2p ls > actual && test_must_be_empty actual ' @@ -152,16 +229,74 @@ test_expect_success "'ipfs p2p stream close -a' closes streams" ' test_must_be_empty actual ' -test_expect_success "'ipfs p2p listener close' closes app numeric handlers" ' - ipfsi 0 p2p listener open 1234 /ip4/127.0.0.1/tcp/10101 && - ipfsi 0 p2p listener close 1234 && - ipfsi 0 p2p listener ls > actual && +check_test_ports + +test_expect_success "'ipfs p2p close' closes app numeric handlers" ' + ipfsi 0 p2p listen /x/1234 /ip4/127.0.0.1/tcp/10101 && + ipfsi 0 p2p close -p /x/1234 && + ipfsi 0 p2p ls > actual && test_must_be_empty actual ' +test_expect_success "'ipfs p2p close' closes by target addr" ' + ipfsi 0 p2p listen /x/p2p-test /ip4/127.0.0.1/tcp/10101 && + ipfsi 0 p2p close -t /ip4/127.0.0.1/tcp/10101 && + ipfsi 0 p2p ls > actual && + test_must_be_empty actual +' + +test_expect_success "'ipfs p2p close' closes right listeners" ' + ipfsi 0 p2p listen /x/p2p-test /ip4/127.0.0.1/tcp/10101 && + ipfsi 0 p2p forward /x/p2p-test /ip4/127.0.0.1/tcp/10101 /ipfs/$PEERID_1 && + echo "/x/p2p-test /ipfs/$PEERID_0 /ip4/127.0.0.1/tcp/10101" > expected && + + ipfsi 0 p2p close -l /ip4/127.0.0.1/tcp/10101 && + ipfsi 0 p2p ls > actual && + test_cmp expected actual +' + +check_test_ports + +test_expect_success "'ipfs p2p close' closes by listen addr" ' + ipfsi 0 p2p close -l /ipfs/$PEERID_0 && + ipfsi 0 p2p ls > actual && + test_must_be_empty actual +' + +test_expect_success "non /x/ scoped protocols are not allowed" ' + test_must_fail ipfsi 0 p2p listen /its/not/a/x/path /ip4/127.0.0.1/tcp/10101 2> actual && + echo "Error: protocol name must be within '"'"'/x/'"'"' namespace" > expected + test_cmp expected actual +' + +check_test_ports + +test_expect_success 'start p2p listener on custom proto' ' + ipfsi 0 p2p listen --allow-custom-protocol /p2p-test /ip4/127.0.0.1/tcp/10101 2>&1 > listener-stdouterr.log && + test_must_be_empty listener-stdouterr.log +' + +spawn_sending_server + +test_expect_success 'S->C Setup client side (custom proto)' ' + ipfsi 1 p2p forward --allow-custom-protocol /p2p-test /ip4/127.0.0.1/tcp/10102 /ipfs/${PEERID_0} 2>&1 > dialer-stdouterr.log +' + +test_server_to_client + +test_expect_success 'C->S Close local listener' ' + ipfsi 1 p2p close -p /p2p-test + ipfsi 1 p2p ls > actual && + test_must_be_empty actual +' + +check_test_ports + test_expect_success 'stop iptb' ' iptb stop ' +check_test_ports + test_done