diff --git a/core/commands/ptp.go b/core/commands/ptp.go new file mode 100644 index 00000000000..daeecae71de --- /dev/null +++ b/core/commands/ptp.go @@ -0,0 +1,395 @@ +package commands + +import ( + "bytes" + "errors" + "fmt" + "io" + "strconv" + "text/tabwriter" + + cmds "github.com/ipfs/go-ipfs/commands" + core "github.com/ipfs/go-ipfs/core" + + ma "gx/ipfs/QmcyqRMCAXVtYPS4DiBrA7sezL9rRGfW8Ctx7cywL4TXJj/go-multiaddr" +) + +// PTPListenerInfoOutput is output type of ls command +type PTPListenerInfoOutput struct { + Protocol string + Address string +} + +// PTPStreamInfoOutput is output type of streams command +type PTPStreamInfoOutput struct { + HandlerID string + Protocol string + LocalPeer string + LocalAddress string + RemotePeer string + RemoteAddress string +} + +// PTPLsOutput is output type of ls command +type PTPLsOutput struct { + Listeners []PTPListenerInfoOutput +} + +// PTPStreamsOutput is output type of streams command +type PTPStreamsOutput struct { + Streams []PTPStreamInfoOutput +} + +// PTPCmd is the 'ipfs ptp' command +var PTPCmd = &cmds.Command{ + Helptext: cmds.HelpText{ + Tagline: "Libp2p stream mounting.", + ShortDescription: ` +Create and use tunnels to remote peers over libp2p + +Note: this command is experimental and subject to change as usecases and APIs are refined`, + }, + + Subcommands: map[string]*cmds.Command{ + "listener": ptpListenerCmd, + "stream": ptpStreamCmd, + }, +} + +// ptpListenerCmd is the 'ipfs ptp listener' command +var ptpListenerCmd = &cmds.Command{ + Helptext: cmds.HelpText{ + Tagline: "P2P listener management.", + ShortDescription: "Create and manage listener p2p endpoints", + }, + + Subcommands: map[string]*cmds.Command{ + "ls": ptpListenerLsCmd, + "open": ptpListenerListenCmd, + "close": ptpListenerCloseCmd, + }, +} + +// ptpStreamCmd is the 'ipfs ptp stream' command +var ptpStreamCmd = &cmds.Command{ + Helptext: cmds.HelpText{ + Tagline: "P2P stream management.", + ShortDescription: "Create and manage p2p streams", + }, + + Subcommands: map[string]*cmds.Command{ + "ls": ptpStreamLsCmd, + "dial": ptpStreamDialCmd, + "close": ptpStreamCloseCmd, + }, +} + +var ptpListenerLsCmd = &cmds.Command{ + Helptext: cmds.HelpText{ + Tagline: "List active p2p listeners.", + }, + Options: []cmds.Option{ + cmds.BoolOption("headers", "v", "Print table headers (HandlerID, Protocol, Local, Remote).").Default(false), + }, + Run: func(req cmds.Request, res cmds.Response) { + + n, err := getNode(req) + if err != nil { + res.SetError(err, cmds.ErrNormal) + return + } + + output := &PTPLsOutput{} + + for _, listener := range n.PTP.Listeners.Listeners { + output.Listeners = append(output.Listeners, PTPListenerInfoOutput{ + Protocol: listener.Protocol, + Address: listener.Address.String(), + }) + } + + res.SetOutput(output) + }, + Type: PTPLsOutput{}, + Marshalers: cmds.MarshalerMap{ + cmds.Text: func(res cmds.Response) (io.Reader, error) { + headers, _, _ := res.Request().Option("headers").Bool() + list, _ := res.Output().(*PTPLsOutput) + buf := new(bytes.Buffer) + w := tabwriter.NewWriter(buf, 1, 2, 1, ' ', 0) + for _, listener := range list.Listeners { + if headers { + fmt.Fprintln(w, "Address\tProtocol") + } + + fmt.Fprintf(w, "%s\t%s\n", listener.Address, listener.Protocol) + } + w.Flush() + + return buf, nil + }, + }, +} + +var ptpStreamLsCmd = &cmds.Command{ + Helptext: cmds.HelpText{ + Tagline: "List active p2p streams.", + }, + Options: []cmds.Option{ + cmds.BoolOption("headers", "v", "Print table headers (HagndlerID, Protocol, Local, Remote).").Default(false), + }, + Run: func(req cmds.Request, res cmds.Response) { + n, err := getNode(req) + if err != nil { + res.SetError(err, cmds.ErrNormal) + return + } + + output := &PTPStreamsOutput{} + + for _, s := range n.PTP.Streams.Streams { + output.Streams = append(output.Streams, PTPStreamInfoOutput{ + HandlerID: strconv.FormatUint(s.HandlerID, 10), + + Protocol: s.Protocol, + + LocalPeer: s.LocalPeer.Pretty(), + LocalAddress: s.LocalAddr.String(), + + RemotePeer: s.RemotePeer.Pretty(), + RemoteAddress: s.RemoteAddr.String(), + }) + } + + res.SetOutput(output) + }, + Type: PTPStreamsOutput{}, + Marshalers: cmds.MarshalerMap{ + cmds.Text: func(res cmds.Response) (io.Reader, error) { + headers, _, _ := res.Request().Option("headers").Bool() + list, _ := res.Output().(*PTPStreamsOutput) + buf := new(bytes.Buffer) + w := tabwriter.NewWriter(buf, 1, 2, 1, ' ', 0) + for _, stream := range list.Streams { + if headers { + fmt.Fprintln(w, "HandlerID\tProtocol\tLocal\tRemote") + } + + fmt.Fprintf(w, "%s\t%s\t%s\t%s\n", stream.HandlerID, stream.Protocol, stream.LocalAddress, stream.RemotePeer) + } + w.Flush() + + return buf, nil + }, + }, +} + +var ptpListenerListenCmd = &cmds.Command{ + Helptext: cmds.HelpText{ + Tagline: "Forward p2p connections to a network multiaddr.", + ShortDescription: ` +Register a p2p connection handler and forward the connections to a specified address. + +Note that the connections originate from the ipfs daemon process. + `, + }, + Arguments: []cmds.Argument{ + cmds.StringArg("Protocol", true, false, "Protocol identifier."), + cmds.StringArg("Address", true, false, "Request handling application address."), + }, + Run: func(req cmds.Request, res cmds.Response) { + n, err := getNode(req) + if err != nil { + res.SetError(err, cmds.ErrNormal) + return + } + + proto := "/ptp/" + req.Arguments()[0] + if n.PTP.CheckProtoExists(proto) { + res.SetError(errors.New("protocol handler already registered"), cmds.ErrNormal) + return + } + + addr, err := ma.NewMultiaddr(req.Arguments()[1]) + if err != nil { + res.SetError(err, cmds.ErrNormal) + return + } + + _, err = n.PTP.NewListener(n.Context(), proto, addr) + if err != nil { + res.SetError(err, cmds.ErrNormal) + return + } + + // Successful response. + res.SetOutput(&PTPListenerInfoOutput{ + Protocol: proto, + Address: addr.String(), + }) + }, +} + +var ptpStreamDialCmd = &cmds.Command{ + Helptext: cmds.HelpText{ + Tagline: "Dial to a p2p listener.", + + ShortDescription: ` +Establish a new connection to a peer service. + +When a connection is made to a peer service the ipfs daemon will setup one time +TCP listener and return it's bind port, this way a dialing application can +transparently connect to a p2p service. + `, + }, + Arguments: []cmds.Argument{ + cmds.StringArg("Peer", true, false, "Remote peer to connect to"), + cmds.StringArg("Protocol", true, false, "Protocol identifier."), + cmds.StringArg("BindAddress", false, false, "Address to listen for connection/s (default: /ip4/127.0.0.1/tcp/0)."), + }, + Run: func(req cmds.Request, res cmds.Response) { + n, err := getNode(req) + if err != nil { + res.SetError(err, cmds.ErrNormal) + return + } + + addr, peer, err := ParsePeerParam(req.Arguments()[0]) + if err != nil { + res.SetError(err, cmds.ErrNormal) + return + } + + proto := "/ptp/" + req.Arguments()[1] + + bindAddr, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/0") + if len(req.Arguments()) == 3 { + bindAddr, err = ma.NewMultiaddr(req.Arguments()[2]) + if err != nil { + res.SetError(err, cmds.ErrNormal) + return + } + } + + listenerInfo, err := n.PTP.Dial(n.Context(), addr, peer, proto, bindAddr) + if err != nil { + res.SetError(err, cmds.ErrNormal) + return + } + + output := PTPListenerInfoOutput{ + Protocol: listenerInfo.Protocol, + Address: listenerInfo.Address.String(), + } + + res.SetOutput(&output) + }, +} + +var ptpListenerCloseCmd = &cmds.Command{ + Helptext: cmds.HelpText{ + Tagline: "Close active p2p listener.", + }, + Arguments: []cmds.Argument{ + cmds.StringArg("Protocol", false, false, "P2P listener protocol"), + }, + Options: []cmds.Option{ + cmds.BoolOption("all", "a", "Close all listeners.").Default(false), + }, + Run: func(req cmds.Request, res cmds.Response) { + n, err := getNode(req) + if err != nil { + res.SetError(err, cmds.ErrNormal) + return + } + + closeAll, _, _ := req.Option("all").Bool() + var proto string + + if !closeAll { + if len(req.Arguments()) == 0 { + res.SetError(errors.New("no protocol name specified"), cmds.ErrNormal) + return + } + + proto = "/ptp/" + req.Arguments()[0] + } + + for _, listener := range n.PTP.Listeners.Listeners { + if !closeAll && listener.Protocol != proto { + continue + } + listener.Close() + if !closeAll { + break + } + } + }, +} + +var ptpStreamCloseCmd = &cmds.Command{ + Helptext: cmds.HelpText{ + Tagline: "Close active p2p stream.", + }, + Arguments: []cmds.Argument{ + cmds.StringArg("HandlerID", false, false, "Stream HandlerID"), + }, + Options: []cmds.Option{ + cmds.BoolOption("all", "a", "Close all streams.").Default(false), + }, + Run: func(req cmds.Request, res cmds.Response) { + n, err := getNode(req) + if err != nil { + res.SetError(err, cmds.ErrNormal) + return + } + + closeAll, _, _ := req.Option("all").Bool() + var handlerID uint64 + + if !closeAll { + if len(req.Arguments()) == 0 { + res.SetError(errors.New("no HandlerID specified"), cmds.ErrNormal) + return + } + + handlerID, err = strconv.ParseUint(req.Arguments()[0], 10, 64) + if err != nil { + res.SetError(err, cmds.ErrNormal) + return + } + } + + for _, stream := range n.PTP.Streams.Streams { + if !closeAll && handlerID != stream.HandlerID { + continue + } + stream.Close() + if !closeAll { + break + } + } + }, +} + +func getNode(req cmds.Request) (*core.IpfsNode, error) { + n, err := req.InvocContext().GetNode() + if err != nil { + return nil, err + } + + config, err := n.Repo.Config() + if err != nil { + return nil, err + } + + if !config.Experimental.Libp2pStreamMounting { + return nil, errors.New("libp2p stream mounting not enabled") + } + + if !n.OnlineMode() { + return nil, errNotOnline + } + + return n, nil +} diff --git a/core/commands/root.go b/core/commands/root.go index e4a2b7a1b56..fa9eda05184 100644 --- a/core/commands/root.go +++ b/core/commands/root.go @@ -47,6 +47,7 @@ ADVANCED COMMANDS pin Pin objects to local storage repo Manipulate the IPFS repository stats Various operational stats + ptp Libp2p stream mounting filestore Manage the filestore (experimental) NETWORK COMMANDS @@ -113,6 +114,7 @@ var rootSubcommands = map[string]*cmds.Command{ "object": ocmd.ObjectCmd, "pin": PinCmd, "ping": PingCmd, + "ptp": PTPCmd, "pubsub": PubsubCmd, "refs": RefsCmd, "repo": RepoCmd, diff --git a/core/core.go b/core/core.go index 3b0b9ded176..5da24e14f5d 100644 --- a/core/core.go +++ b/core/core.go @@ -35,6 +35,7 @@ import ( ipnsrp "github.com/ipfs/go-ipfs/namesys/republisher" path "github.com/ipfs/go-ipfs/path" pin "github.com/ipfs/go-ipfs/pin" + ptp "github.com/ipfs/go-ipfs/ptp" repo "github.com/ipfs/go-ipfs/repo" config "github.com/ipfs/go-ipfs/repo/config" nilrouting "github.com/ipfs/go-ipfs/routing/none" @@ -131,6 +132,7 @@ type IpfsNode struct { IpnsRepub *ipnsrp.Republisher Floodsub *floodsub.PubSub + PTP *ptp.PTP proc goprocess.Process ctx context.Context @@ -246,6 +248,8 @@ func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption Routin n.Floodsub = floodsub.NewFloodSub(ctx, peerhost) } + n.PTP = ptp.NewPTP(n.Identity, n.PeerHost, n.Peerstore) + // setup local discovery if do != nil { service, err := do(ctx, n.PeerHost) diff --git a/core/corenet/net.go b/core/corenet/net.go deleted file mode 100644 index 800e97618c3..00000000000 --- a/core/corenet/net.go +++ /dev/null @@ -1,65 +0,0 @@ -package corenet - -import ( - "time" - - context "context" - core "github.com/ipfs/go-ipfs/core" - net "gx/ipfs/QmRscs8KxrSmSv4iuevHv8JfuUzHBMoqiaHzxfDRiksd6e/go-libp2p-net" - pstore "gx/ipfs/QmXZSd1qR5BxZkPyuwfT5jpqQFScZccoZvDneXsKzCNHWX/go-libp2p-peerstore" - pro "gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol" - peer "gx/ipfs/QmdS9KpbDyPrieswibZhkod1oXqRwZJrUPzxCofAMWpFGq/go-libp2p-peer" -) - -type ipfsListener struct { - conCh chan net.Stream - proto pro.ID - ctx context.Context - cancel func() -} - -func (il *ipfsListener) Accept() (net.Stream, error) { - select { - case c := <-il.conCh: - return c, nil - case <-il.ctx.Done(): - return nil, il.ctx.Err() - } -} - -func (il *ipfsListener) Close() error { - il.cancel() - // TODO: unregister handler from peerhost - return nil -} - -func Listen(nd *core.IpfsNode, protocol string) (*ipfsListener, error) { - ctx, cancel := context.WithCancel(nd.Context()) - - list := &ipfsListener{ - proto: pro.ID(protocol), - conCh: make(chan net.Stream), - ctx: ctx, - cancel: cancel, - } - - nd.PeerHost.SetStreamHandler(list.proto, func(s net.Stream) { - select { - case list.conCh <- s: - case <-ctx.Done(): - s.Close() - } - }) - - return list, nil -} - -func Dial(nd *core.IpfsNode, p peer.ID, protocol string) (net.Stream, error) { - ctx, cancel := context.WithTimeout(nd.Context(), time.Second*30) - defer cancel() - err := nd.PeerHost.Connect(ctx, pstore.PeerInfo{ID: p}) - if err != nil { - return nil, err - } - return nd.PeerHost.NewStream(nd.Context(), p, pro.ID(protocol)) -} diff --git a/ptp/ptp.go b/ptp/ptp.go new file mode 100644 index 00000000000..e7842838bce --- /dev/null +++ b/ptp/ptp.go @@ -0,0 +1,238 @@ +package ptp + +import ( + "context" + "errors" + "time" + + net "gx/ipfs/QmRscs8KxrSmSv4iuevHv8JfuUzHBMoqiaHzxfDRiksd6e/go-libp2p-net" + p2phost "gx/ipfs/QmUywuGNZoUKV8B9iyvup9bPkLiMrhTsyVMkeSXW5VxAfC/go-libp2p-host" + pstore "gx/ipfs/QmXZSd1qR5BxZkPyuwfT5jpqQFScZccoZvDneXsKzCNHWX/go-libp2p-peerstore" + pro "gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol" + ma "gx/ipfs/QmcyqRMCAXVtYPS4DiBrA7sezL9rRGfW8Ctx7cywL4TXJj/go-multiaddr" + peer "gx/ipfs/QmdS9KpbDyPrieswibZhkod1oXqRwZJrUPzxCofAMWpFGq/go-libp2p-peer" + manet "gx/ipfs/Qmf1Gq7N45Rpuw7ev47uWgH6dLPtdnvcMRNPkVBwqjLJg2/go-multiaddr-net" +) + +// PTP structure holds information on currently running streams/listeners +type PTP struct { + Listeners ListenerRegistry + Streams StreamRegistry + + identity peer.ID + peerHost p2phost.Host + peerstore pstore.Peerstore +} + +// NewPTP creates new PTP struct +func NewPTP(identity peer.ID, peerHost p2phost.Host, peerstore pstore.Peerstore) *PTP { + return &PTP{ + identity: identity, + peerHost: peerHost, + peerstore: peerstore, + } +} + +func (ptp *PTP) newStreamTo(ctx2 context.Context, p peer.ID, protocol string) (net.Stream, error) { + ctx, cancel := context.WithTimeout(ctx2, time.Second*30) //TODO: configurable? + defer cancel() + err := ptp.peerHost.Connect(ctx, pstore.PeerInfo{ID: p}) + if err != nil { + return nil, err + } + return ptp.peerHost.NewStream(ctx2, p, pro.ID(protocol)) +} + +// Dial creates new P2P stream to a remote listener +func (ptp *PTP) Dial(ctx context.Context, addr ma.Multiaddr, peer peer.ID, proto string, bindAddr ma.Multiaddr) (*ListenerInfo, error) { + lnet, _, err := manet.DialArgs(bindAddr) + if err != nil { + return nil, err + } + + listenerInfo := ListenerInfo{ + Identity: ptp.identity, + Protocol: proto, + } + + remote, err := ptp.newStreamTo(ctx, peer, proto) + if err != nil { + return nil, err + } + + switch lnet { + case "tcp", "tcp4", "tcp6": + listener, err := manet.Listen(bindAddr) + if err != nil { + if err2 := remote.Close(); err2 != nil { + return nil, err2 + } + return nil, err + } + + listenerInfo.Address = listener.Multiaddr() + listenerInfo.Closer = listener + listenerInfo.Running = true + + go ptp.doAccept(&listenerInfo, remote, listener) + + default: + return nil, errors.New("unsupported protocol: " + lnet) + } + + return &listenerInfo, nil +} + +func (ptp *PTP) doAccept(listenerInfo *ListenerInfo, remote net.Stream, listener manet.Listener) { + defer listener.Close() + + local, err := listener.Accept() + if err != nil { + return + } + + stream := StreamInfo{ + Protocol: listenerInfo.Protocol, + + LocalPeer: listenerInfo.Identity, + LocalAddr: listenerInfo.Address, + + RemotePeer: remote.Conn().RemotePeer(), + RemoteAddr: remote.Conn().RemoteMultiaddr(), + + Local: local, + Remote: remote, + + Registry: &ptp.Streams, + } + + ptp.Streams.Register(&stream) + stream.startStreaming() +} + +// Listener wraps stream handler into a listener +type Listener interface { + Accept() (net.Stream, error) + Close() error +} + +// P2PListener holds information on a listener +type P2PListener struct { + peerHost p2phost.Host + conCh chan net.Stream + proto pro.ID + ctx context.Context + cancel func() +} + +// Accept waits for a connection from the listener +func (il *P2PListener) Accept() (net.Stream, error) { + select { + case c := <-il.conCh: + return c, nil + case <-il.ctx.Done(): + return nil, il.ctx.Err() + } +} + +// Close closes the listener and removes stream handler +func (il *P2PListener) Close() error { + il.cancel() + il.peerHost.RemoveStreamHandler(il.proto) + return nil +} + +// Listen creates new P2PListener +func (ptp *PTP) registerStreamHandler(ctx2 context.Context, protocol string) (*P2PListener, error) { + ctx, cancel := context.WithCancel(ctx2) + + list := &P2PListener{ + peerHost: ptp.peerHost, + proto: pro.ID(protocol), + conCh: make(chan net.Stream), + ctx: ctx, + cancel: cancel, + } + + ptp.peerHost.SetStreamHandler(list.proto, func(s net.Stream) { + select { + case list.conCh <- s: + case <-ctx.Done(): + s.Close() + } + }) + + return list, nil +} + +// NewListener creates new ptp listener +func (ptp *PTP) NewListener(ctx context.Context, proto string, addr ma.Multiaddr) (*ListenerInfo, error) { + listener, err := ptp.registerStreamHandler(ctx, proto) + if err != nil { + return nil, err + } + + listenerInfo := ListenerInfo{ + Identity: ptp.identity, + Protocol: proto, + Address: addr, + Closer: listener, + Running: true, + Registry: &ptp.Listeners, + } + + go ptp.acceptStreams(&listenerInfo, listener) + + ptp.Listeners.Register(&listenerInfo) + + return &listenerInfo, nil +} + +func (ptp *PTP) acceptStreams(listenerInfo *ListenerInfo, listener Listener) { + for listenerInfo.Running { + remote, err := listener.Accept() + if err != nil { + listener.Close() + break + } + + local, err := manet.Dial(listenerInfo.Address) + if err != nil { + remote.Close() + continue + } + + stream := StreamInfo{ + Protocol: listenerInfo.Protocol, + + LocalPeer: listenerInfo.Identity, + LocalAddr: listenerInfo.Address, + + RemotePeer: remote.Conn().RemotePeer(), + RemoteAddr: remote.Conn().RemoteMultiaddr(), + + Local: local, + Remote: remote, + + Registry: &ptp.Streams, + } + + ptp.Streams.Register(&stream) + stream.startStreaming() + } + ptp.Listeners.Deregister(listenerInfo.Protocol) +} + +// CheckProtoExists checks whether a protocol handler is registered to +// mux handler +func (ptp *PTP) CheckProtoExists(proto string) bool { + protos := ptp.peerHost.Mux().Protocols() + + for _, p := range protos { + if p != proto { + continue + } + return true + } + return false +} diff --git a/ptp/registry.go b/ptp/registry.go new file mode 100644 index 00000000000..586719eba24 --- /dev/null +++ b/ptp/registry.go @@ -0,0 +1,132 @@ +package ptp + +import ( + "fmt" + "io" + + ma "gx/ipfs/QmcyqRMCAXVtYPS4DiBrA7sezL9rRGfW8Ctx7cywL4TXJj/go-multiaddr" + peer "gx/ipfs/QmdS9KpbDyPrieswibZhkod1oXqRwZJrUPzxCofAMWpFGq/go-libp2p-peer" +) + +// ListenerInfo holds information on a p2p listener. +type ListenerInfo struct { + // Application protocol identifier. + Protocol string + + // Node identity + Identity peer.ID + + // Local protocol stream address. + Address ma.Multiaddr + + // Local protocol stream listener. + Closer io.Closer + + // Flag indicating whether we're still accepting incoming connections, or + // whether this application listener has been shutdown. + Running bool + + Registry *ListenerRegistry +} + +// Close closes the listener. Does not affect child streams +func (c *ListenerInfo) Close() error { + c.Closer.Close() + err := c.Registry.Deregister(c.Protocol) + return err +} + +// ListenerRegistry is a collection of local application protocol listeners. +type ListenerRegistry struct { + Listeners []*ListenerInfo +} + +// Register registers listenerInfo2 in this registry +func (c *ListenerRegistry) Register(listenerInfo *ListenerInfo) { + c.Listeners = append(c.Listeners, listenerInfo) +} + +// Deregister removes p2p listener from this registry +func (c *ListenerRegistry) Deregister(proto string) error { + foundAt := -1 + for i, a := range c.Listeners { + if a.Protocol == proto { + foundAt = i + break + } + } + + if foundAt != -1 { + c.Listeners = append(c.Listeners[:foundAt], c.Listeners[foundAt+1:]...) + return nil + } + + return fmt.Errorf("failed to deregister proto %s", proto) +} + +// StreamInfo holds information on active incoming and outgoing p2p streams. +type StreamInfo struct { + HandlerID uint64 + + Protocol string + + LocalPeer peer.ID + LocalAddr ma.Multiaddr + + RemotePeer peer.ID + RemoteAddr ma.Multiaddr + + Local io.ReadWriteCloser + Remote io.ReadWriteCloser + + Registry *StreamRegistry +} + +// Close closes stream endpoints and deregisters it +func (s *StreamInfo) Close() error { + s.Local.Close() + s.Remote.Close() + s.Registry.Deregister(s.HandlerID) + return nil +} + +func (s *StreamInfo) startStreaming() { + go func() { + io.Copy(s.Local, s.Remote) + s.Close() + }() + + go func() { + io.Copy(s.Remote, s.Local) + s.Close() + }() +} + +// StreamRegistry is a collection of active incoming and outgoing protocol app streams. +type StreamRegistry struct { + Streams []*StreamInfo + + nextID uint64 +} + +// Register registers a stream to the registry +func (c *StreamRegistry) Register(streamInfo *StreamInfo) { + streamInfo.HandlerID = c.nextID + c.Streams = append(c.Streams, streamInfo) + c.nextID++ +} + +// Deregister deregisters stream from the registry +func (c *StreamRegistry) Deregister(handlerID uint64) { + foundAt := -1 + for i, s := range c.Streams { + if s.HandlerID == handlerID { + foundAt = i + break + } + } + + if foundAt != -1 { + c.Streams = append(c.Streams[:foundAt], c.Streams[foundAt+1:]...) + } +} diff --git a/repo/config/experiments.go b/repo/config/experiments.go index 8eb942b3989..f76572ee2af 100644 --- a/repo/config/experiments.go +++ b/repo/config/experiments.go @@ -1,6 +1,7 @@ package config type Experiments struct { - FilestoreEnabled bool - ShardingEnabled bool + FilestoreEnabled bool + ShardingEnabled bool + Libp2pStreamMounting bool } diff --git a/test/bin/Rules.mk b/test/bin/Rules.mk index 08f1a295cf6..279b59f21a8 100644 --- a/test/bin/Rules.mk +++ b/test/bin/Rules.mk @@ -22,6 +22,10 @@ $(d)/go-timeout: test/dependencies/go-timeout $(go-build) TGTS_$(d) += $(d)/go-timeout +$(d)/ma-pipe-unidir: test/dependencies/ma-pipe-unidir + $(go-build) +TGTS_$(d) += $(d)/ma-pipe-unidir + TGTS_GX_$(d) := hang-fds iptb TGTS_GX_$(d) := $(addprefix $(d)/,$(TGTS_GX_$(d))) diff --git a/test/dependencies/ma-pipe-unidir/LICENSE b/test/dependencies/ma-pipe-unidir/LICENSE new file mode 100644 index 00000000000..e1159e5d4f3 --- /dev/null +++ b/test/dependencies/ma-pipe-unidir/LICENSE @@ -0,0 +1,22 @@ +The MIT License (MIT) + +Copyright (c) 2017 Ɓukasz Magiera + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. + diff --git a/test/dependencies/ma-pipe-unidir/main.go b/test/dependencies/ma-pipe-unidir/main.go new file mode 100644 index 00000000000..24412d00079 --- /dev/null +++ b/test/dependencies/ma-pipe-unidir/main.go @@ -0,0 +1,95 @@ +package main + +import ( + "flag" + "fmt" + "io" + "io/ioutil" + "os" + "strconv" + + ma "gx/ipfs/QmcyqRMCAXVtYPS4DiBrA7sezL9rRGfW8Ctx7cywL4TXJj/go-multiaddr" + manet "gx/ipfs/Qmf1Gq7N45Rpuw7ev47uWgH6dLPtdnvcMRNPkVBwqjLJg2/go-multiaddr-net" +) + +const USAGE = "ma-pipe-unidir [-l|--listen] [--pidFile=path] [-h|--help] \n" + +type Opts struct { + Listen bool + PidFile string +} + +func app() int { + opts := Opts{} + flag.BoolVar(&opts.Listen, "l", false, "") + flag.BoolVar(&opts.Listen, "listen", false, "") + flag.StringVar(&opts.PidFile, "pidFile", "", "") + flag.Usage = func() { + fmt.Print(USAGE) + } + flag.Parse() + args := flag.Args() + + if len(args) < 2 { // + fmt.Print(USAGE) + return 1 + } + + mode := args[0] + addr := args[1] + + if mode != "send" && mode != "recv" { + fmt.Print(USAGE) + return 1 + } + + if len(opts.PidFile) > 0 { + data := []byte(strconv.Itoa(os.Getpid())) + err := ioutil.WriteFile(opts.PidFile, data, 0644) + if err != nil { + return 1 + } + + defer os.Remove(opts.PidFile) + } + + maddr, err := ma.NewMultiaddr(addr) + if err != nil { + return 1 + } + + var conn manet.Conn + + if opts.Listen { + listener, err := manet.Listen(maddr) + if err != nil { + return 1 + } + + conn, err = listener.Accept() + if err != nil { + return 1 + } + } else { + var err error + conn, err = manet.Dial(maddr) + if err != nil { + return 1 + } + } + + defer conn.Close() + switch mode { + case "recv": + io.Copy(os.Stdout, conn) + case "send": + io.Copy(conn, os.Stdin) + default: + return 1 + } + return 0 +} + +func main() { + os.Exit(app()) +} diff --git a/test/sharness/Rules.mk b/test/sharness/Rules.mk index 5832fb9b8a3..916f376c145 100644 --- a/test/sharness/Rules.mk +++ b/test/sharness/Rules.mk @@ -7,7 +7,7 @@ T_$(d) = $(sort $(wildcard $(d)/t[0-9][0-9][0-9][0-9]-*.sh)) DEPS_$(d) := test/bin/random test/bin/multihash test/bin/pollEndpoint \ test/bin/iptb test/bin/go-sleep test/bin/random-files \ - test/bin/go-timeout test/bin/hang-fds + test/bin/go-timeout test/bin/hang-fds test/bin/ma-pipe-unidir DEPS_$(d) += cmd/ipfs/ipfs DEPS_$(d) += $(d)/clean-test-results DEPS_$(d) += $(SHARNESS_$(d)) diff --git a/test/sharness/t0180-ptp.sh b/test/sharness/t0180-ptp.sh new file mode 100755 index 00000000000..b321bad8e68 --- /dev/null +++ b/test/sharness/t0180-ptp.sh @@ -0,0 +1,160 @@ +#!/bin/sh + +test_description="Test experimental ptp commands" + +. lib/test-lib.sh + +# start iptb + wait for peering +test_expect_success 'init iptb' ' + iptb init -n 2 --bootstrap=none --port=0 +' + +test_expect_success 'generate test data' ' + echo "ABCDEF" > test0.bin && + echo "012345" > test1.bin +' + +startup_cluster 2 + +test_expect_success 'peer ids' ' + PEERID_0=$(iptb get id 0) && + PEERID_1=$(iptb get id 1) +' + +test_expect_success "test ports are closed" ' + (! (netstat -ln | grep "LISTEN" | grep ":10101 ")) && + (! (netstat -ln | grep "LISTEN" | grep ":10102 ")) +' + +test_must_fail 'fail without config option being enabled' ' + ipfsi 0 ptp stream ls +' + +test_expect_success "enable filestore config setting" ' + ipfsi 0 config --json Experimental.Libp2pStreamMounting true + ipfsi 1 config --json Experimental.Libp2pStreamMounting true +' + +test_expect_success 'start ptp listener' ' + ipfsi 0 ptp listener open ptp-test /ip4/127.0.0.1/tcp/10101 2>&1 > listener-stdouterr.log +' + +test_expect_success 'Test server to client communications' ' + ma-pipe-unidir --listen send /ip4/127.0.0.1/tcp/10101 < test0.bin & + SERVER_PID=$! + + ipfsi 1 ptp stream dial $PEERID_0 ptp-test /ip4/127.0.0.1/tcp/10102 2>&1 > dialer-stdouterr.log && + ma-pipe-unidir recv /ip4/127.0.0.1/tcp/10102 > client.out && + wait $SERVER_PID +' + +test_expect_success 'Test client to server communications' ' + ma-pipe-unidir --listen recv /ip4/127.0.0.1/tcp/10101 > server.out & + SERVER_PID=$! + + ipfsi 1 ptp stream dial $PEERID_0 ptp-test /ip4/127.0.0.1/tcp/10102 2>&1 > dialer-stdouterr.log && + ma-pipe-unidir send /ip4/127.0.0.1/tcp/10102 < test1.bin + wait $SERVER_PID +' + +test_expect_success 'server to client output looks good' ' + test_cmp client.out test0.bin +' + +test_expect_success 'client to server output looks good' ' + test_cmp server.out test1.bin +' + +test_expect_success "'ipfs listener ptp ls' succeeds" ' + echo "/ip4/127.0.0.1/tcp/10101 /ptp/ptp-test" > expected && + ipfsi 0 ptp listener ls > actual +' + +test_expect_success "'ipfs ptp listener ls' output looks good" ' + test_cmp expected actual +' + +test_expect_success "Cannot re-register app handler" ' + (! ipfsi 0 ptp listener open ptp-test /ip4/127.0.0.1/tcp/10101) +' + +test_expect_success "'ipfs ptp stream ls' output is empty" ' + ipfsi 0 ptp stream ls > actual && + test_must_be_empty actual +' + +test_expect_success "Setup: Idle stream" ' + ma-pipe-unidir --listen --pidFile=listener.pid recv /ip4/127.0.0.1/tcp/10101 & + + ipfsi 1 ptp stream dial $PEERID_0 ptp-test /ip4/127.0.0.1/tcp/10102 2>&1 > dialer-stdouterr.log && + ma-pipe-unidir --pidFile=client.pid recv /ip4/127.0.0.1/tcp/10102 & + + go-sleep 500ms && + kill -0 $(cat listener.pid) && kill -0 $(cat client.pid) +' + +test_expect_success "'ipfs ptp stream ls' succeeds" ' + echo "2 /ptp/ptp-test /ip4/127.0.0.1/tcp/10101 $PEERID_1" > expected + ipfsi 0 ptp stream ls > actual +' + +test_expect_success "'ipfs ptp stream ls' output looks good" ' + test_cmp expected actual +' + +test_expect_success "'ipfs ptp stream close' closes stream" ' + ipfsi 0 ptp stream close 2 && + ipfsi 0 ptp stream ls > actual && + [ ! -f listener.pid ] && [ ! -f client.pid ] && + test_must_be_empty actual +' + +test_expect_success "'ipfs ptp listener close' closes app handler" ' + ipfsi 0 ptp listener close ptp-test && + ipfsi 0 ptp listener ls > actual && + test_must_be_empty actual +' + +test_expect_success "Setup: Idle stream(2)" ' + ma-pipe-unidir --listen --pidFile=listener.pid recv /ip4/127.0.0.1/tcp/10101 & + + ipfsi 0 ptp listener open ptp-test2 /ip4/127.0.0.1/tcp/10101 2>&1 > listener-stdouterr.log && + ipfsi 1 ptp stream dial $PEERID_0 ptp-test2 /ip4/127.0.0.1/tcp/10102 2>&1 > dialer-stdouterr.log && + ma-pipe-unidir --pidFile=client.pid recv /ip4/127.0.0.1/tcp/10102 & + + go-sleep 500ms && + kill -0 $(cat listener.pid) && kill -0 $(cat client.pid) +' + +test_expect_success "'ipfs ptp stream ls' succeeds(2)" ' + echo "3 /ptp/ptp-test2 /ip4/127.0.0.1/tcp/10101 $PEERID_1" > expected + ipfsi 0 ptp stream ls > actual + test_cmp expected actual +' + +test_expect_success "'ipfs ptp listener close -a' closes app handlers" ' + ipfsi 0 ptp listener close -a && + ipfsi 0 ptp listener ls > actual && + test_must_be_empty actual +' + +test_expect_success "'ipfs ptp stream close -a' closes streams" ' + ipfsi 0 ptp stream close -a && + ipfsi 0 ptp stream ls > actual && + [ ! -f listener.pid ] && [ ! -f client.pid ] && + test_must_be_empty actual +' + +test_expect_success "'ipfs ptp listener close' closes app numeric handlers" ' + ipfsi 0 ptp listener open 1234 /ip4/127.0.0.1/tcp/10101 && + ipfsi 0 ptp listener close 1234 && + ipfsi 0 ptp listener ls > actual && + test_must_be_empty actual +' + +test_expect_success 'stop iptb' ' + iptb stop +' + +test_done +