From 79c61fbf9fe559db9634adaac17892ed6801b0e3 Mon Sep 17 00:00:00 2001 From: lnykww Date: Thu, 21 Mar 2019 20:13:28 +0800 Subject: [PATCH 1/2] reuse listening connections for dialing --- go.mod | 6 ++- go.sum | 24 +++++++++ listener.go | 27 +++------- reuse.go | 137 ++++++++++++++++++++++++++++++++++++++++++++++++++ reuse_test.go | 79 +++++++++++++++++++++++++++++ transport.go | 82 ++++++++++++++++++------------ 6 files changed, 303 insertions(+), 52 deletions(-) create mode 100644 reuse.go create mode 100644 reuse_test.go diff --git a/go.mod b/go.mod index 6a20599..2c00b07 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,9 @@ require ( github.com/lucas-clemente/quic-go v0.12.0 github.com/multiformats/go-multiaddr v0.0.4 github.com/multiformats/go-multiaddr-net v0.0.1 - github.com/onsi/ginkgo v1.7.0 - github.com/onsi/gomega v1.4.3 + github.com/onsi/ginkgo v1.8.0 + github.com/onsi/gomega v1.5.0 + github.com/vishvananda/netlink v1.0.0 + github.com/vishvananda/netns v0.0.0-20190625233234-7109fa855b0f // indirect github.com/whyrusleeping/mafmt v1.2.8 ) diff --git a/go.sum b/go.sum index 2cf5868..74e98ec 100644 --- a/go.sum +++ b/go.sum @@ -13,12 +13,16 @@ github.com/cheekybits/genny v1.0.0/go.mod h1:+tQajlRqAUrPI7DOSpB0XAqZYtQakVtB7wX github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/davecgh/go-spew v0.0.0-20171005155431-ecdeabc65495 h1:6IyqGr3fnd0tM3YxipK27TUskaOVUjU2nG45yzwcQKY= github.com/davecgh/go-spew v0.0.0-20171005155431-ecdeabc65495/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v0.0.0-20171005155431-ecdeabc65495/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= github.com/golang/mock v1.2.0 h1:28o5sBqPkBsMGnC6b4MvE2TzSr5/AT4c/1fLqVGIwlk= github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.0 h1:kbxbvI4Un1LUWKxufD+BiE6AEExYYgkQLQmLFqA1LFk= github.com/golang/protobuf v1.3.0/go.mod h1:Qd/q+1AKNOZr9uGQzbzCmRO6sUih6GTPZv6a1/R87v0= @@ -37,8 +41,11 @@ github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvW github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4= github.com/libp2p/go-flow-metrics v0.0.1/go.mod h1:Iv1GH0sG8DtYN3SVJ2eG221wMiNpZxBdp967ls1g+k8= +github.com/libp2p/go-flow-metrics v0.0.1/go.mod h1:Iv1GH0sG8DtYN3SVJ2eG221wMiNpZxBdp967ls1g+k8= +github.com/libp2p/go-libp2p-core v0.0.1 h1:HSTZtFIq/W5Ue43Zw+uWZyy2Vl5WtF0zDjKN8/DT/1I= github.com/libp2p/go-libp2p-core v0.0.1 h1:HSTZtFIq/W5Ue43Zw+uWZyy2Vl5WtF0zDjKN8/DT/1I= github.com/libp2p/go-libp2p-core v0.0.1/go.mod h1:g/VxnTZ/1ygHxH3dKok7Vno1VfpvGcGip57wjTU4fco= +github.com/libp2p/go-libp2p-core v0.0.1/go.mod h1:g/VxnTZ/1ygHxH3dKok7Vno1VfpvGcGip57wjTU4fco= github.com/libp2p/go-libp2p-tls v0.1.1 h1:tjW7njTM8JX8FbEvqr8/VSKBdZYZ7CtGtv3i6NiFf10= github.com/libp2p/go-libp2p-tls v0.1.1/go.mod h1:wZfuewxOndz5RTnCAxFliGjvYSDA40sKitV4c50uI1M= github.com/lucas-clemente/quic-go v0.12.0 h1:dYHUyB50gEQlK3KqytmNySzuyzAcaQ3iuI2ZReAfVrE= @@ -50,10 +57,16 @@ github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 h1:lYpkrQH5ajf0 github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1/go.mod h1:pD8RvIylQ358TN4wwqatJ8rNavkEINozVn9DtGI3dfQ= github.com/minio/sha256-simd v0.0.0-20190131020904-2d45a736cd16 h1:5W7KhL8HVF3XCFOweFD3BNESdnO8ewyYTFT2R+/b8FQ= github.com/minio/sha256-simd v0.0.0-20190131020904-2d45a736cd16/go.mod h1:2FMWW+8GMoPweT6+pI63m9YE3Lmw4J71hV56Chs1E/U= +github.com/mr-tron/base58 v1.1.0 h1:Y51FGVJ91WBqCEabAi5OPUz38eAx8DakuAm5svLcsfQ= github.com/mr-tron/base58 v1.1.0/go.mod h1:xcD2VGqlgYjBdcBLw+TuYLr8afG+Hj8g2eTVqeSzSU8= github.com/mr-tron/base58 v1.1.1 h1:OJIdWOWYe2l5PQNgimGtuwHY8nDskvJ5vvs//YnzRLs= github.com/mr-tron/base58 v1.1.1/go.mod h1:xcD2VGqlgYjBdcBLw+TuYLr8afG+Hj8g2eTVqeSzSU8= +github.com/mr-tron/base58 v1.1.1/go.mod h1:xcD2VGqlgYjBdcBLw+TuYLr8afG+Hj8g2eTVqeSzSU8= +github.com/multiformats/go-base32 v0.0.3/go.mod h1:pLiuGC8y0QR3Ue4Zug5UzK9LjgbkL8NSQj0zQ5Nz/AA= github.com/multiformats/go-base32 v0.0.3/go.mod h1:pLiuGC8y0QR3Ue4Zug5UzK9LjgbkL8NSQj0zQ5Nz/AA= +github.com/multiformats/go-multiaddr v0.0.1 h1:/QUV3VBMDI6pi6xfiw7lr6xhDWWvQKn9udPn68kLSdY= +github.com/multiformats/go-multiaddr v0.0.1 h1:/QUV3VBMDI6pi6xfiw7lr6xhDWWvQKn9udPn68kLSdY= +github.com/multiformats/go-multiaddr v0.0.1/go.mod h1:xKVEak1K9cS1VdmPZW3LSIb6lgmoS58qz/pzqmAxV44= github.com/multiformats/go-multiaddr v0.0.1/go.mod h1:xKVEak1K9cS1VdmPZW3LSIb6lgmoS58qz/pzqmAxV44= github.com/multiformats/go-multiaddr v0.0.2/go.mod h1:xKVEak1K9cS1VdmPZW3LSIb6lgmoS58qz/pzqmAxV44= github.com/multiformats/go-multiaddr v0.0.4 h1:WgMSI84/eRLdbptXMkMWDXPjPq7SPLIgGUVm2eroyU4= @@ -68,12 +81,20 @@ github.com/multiformats/go-multihash v0.0.1/go.mod h1:w/5tugSrLEbWqlcgJabL3oHFKT github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.7.0 h1:WSHQ+IS43OoUrWtD1/bbclrwK8TTH5hzp+umCiuxHgs= github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.8.0 h1:VkHVNpR4iVnU8XQR6DBm8BqYjN7CRzw+xKUbVVbbW9w= +github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/gomega v1.4.3 h1:RE1xgDvH7imwFD45h+u2SgIfERHlS2yNG4DObb5BSKU= github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= +github.com/onsi/gomega v1.5.0 h1:izbySO9zDPmjJ8rDjLvkA2zJHIo+HkYXHnf7eN7SSyo= +github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/spacemonkeygo/openssl v0.0.0-20181017203307-c2dcc5cca94a h1:/eS3yfGjQKG+9kayBkj0ip1BGhq6zJ3eaVksphxAaek= github.com/spacemonkeygo/openssl v0.0.0-20181017203307-c2dcc5cca94a/go.mod h1:7AyxJNCJ7SBZ1MfVQCWD6Uqo2oubI2Eq2y2eqf+A5r0= github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572 h1:RC6RW7j+1+HkWaX/Yh71Ee5ZHaHYt7ZP4sQgUrm6cDU= github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572/go.mod h1:w0SWMsp6j9O/dk4/ZpIhL+3CkG8ofA2vuv7k+ltqUMc= +github.com/vishvananda/netlink v1.0.0 h1:bqNY2lgheFIu1meHUFSH3d7vG93AFyqg3oGbJCOJgSM= +github.com/vishvananda/netlink v1.0.0/go.mod h1:+SR5DhBJrl6ZM7CoCKvpw5BKroDKQ+PJqOg65H/2ktk= +github.com/vishvananda/netns v0.0.0-20190625233234-7109fa855b0f h1:nBX3nTcmxEtHSERBJaIo1Qa26VwRaopnZmfDQUXsF4I= +github.com/vishvananda/netns v0.0.0-20190625233234-7109fa855b0f/go.mod h1:ZjcWmFBXmLKZu9Nxj3WKYEafiSqer2rnvPr0en9UNpI= github.com/whyrusleeping/mafmt v1.2.8 h1:TCghSl5kkwEE0j+sU/gudyhVMRlpBin8fMBBHg59EbA= github.com/whyrusleeping/mafmt v1.2.8/go.mod h1:faQJFPbLSxzD9xpA02ttW/tS9vZykNvXwGvqIpk20FA= golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= @@ -81,6 +102,9 @@ golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67/go.mod h1:6SG95UA2DQfeDnf golang.org/x/crypto v0.0.0-20190225124518-7f87c0fbb88b/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190228161510-8dd112bcdc25 h1:jsG6UpNLt9iAsb0S2AGW28DveNzzgmbXR+ENoPjUeIU= golang.org/x/crypto v0.0.0-20190228161510-8dd112bcdc25/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/net v0.0.0-20180906233101-161cd47e91fd h1:nTDtHvHSdCn1m6ITfMRqtOd/9+7a3s8RBNOZ3eYZzJA= +golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190228165749-92fc7df08ae7 h1:Qe/u+eY379X4He4GBMFZYu3pmh1ML5yT1aL1ndNM1zQ= golang.org/x/net v0.0.0-20190228165749-92fc7df08ae7/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= diff --git a/listener.go b/listener.go index 0d1b835..acc76f1 100644 --- a/listener.go +++ b/listener.go @@ -12,14 +12,13 @@ import ( quic "github.com/lucas-clemente/quic-go" ma "github.com/multiformats/go-multiaddr" - manet "github.com/multiformats/go-multiaddr-net" ) // A listener listens for QUIC connections. type listener struct { - quicListener quic.Listener - transport tpt.Transport - + quicListener quic.Listener + conn *reuseConn + transport *transport privKey ic.PrivKey localPeer peer.ID localMultiaddr ma.Multiaddr @@ -27,7 +26,7 @@ type listener struct { var _ tpt.Listener = &listener{} -func newListener(addr ma.Multiaddr, transport tpt.Transport, localPeer peer.ID, key ic.PrivKey, identity *p2ptls.Identity) (tpt.Listener, error) { +func newListener(rconn *reuseConn, t *transport, localPeer peer.ID, key ic.PrivKey, identity *p2ptls.Identity) (tpt.Listener, error) { var tlsConf tls.Config tlsConf.GetConfigForClient = func(_ *tls.ClientHelloInfo) (*tls.Config, error) { // return a tls.Config that verifies the peer's certificate chain. @@ -37,19 +36,7 @@ func newListener(addr ma.Multiaddr, transport tpt.Transport, localPeer peer.ID, conf, _ := identity.ConfigForAny() return conf, nil } - lnet, host, err := manet.DialArgs(addr) - if err != nil { - return nil, err - } - laddr, err := net.ResolveUDPAddr(lnet, host) - if err != nil { - return nil, err - } - conn, err := net.ListenUDP(lnet, laddr) - if err != nil { - return nil, err - } - ln, err := quic.Listen(conn, &tlsConf, quicConfig) + ln, err := quic.Listen(rconn, &tlsConf, quicConfig) if err != nil { return nil, err } @@ -58,8 +45,9 @@ func newListener(addr ma.Multiaddr, transport tpt.Transport, localPeer peer.ID, return nil, err } return &listener{ + conn: rconn, quicListener: ln, - transport: transport, + transport: t, privKey: key, localPeer: localPeer, localMultiaddr: localMultiaddr, @@ -113,6 +101,7 @@ func (l *listener) setupConn(sess quic.Session) (tpt.CapableConn, error) { // Close closes the listener. func (l *listener) Close() error { + l.conn.DecreaseCount() return l.quicListener.Close() } diff --git a/reuse.go b/reuse.go new file mode 100644 index 0000000..56aba1b --- /dev/null +++ b/reuse.go @@ -0,0 +1,137 @@ +package libp2pquic + +import ( + "net" + "sync" + "sync/atomic" + + "github.com/vishvananda/netlink" +) + +type reuseConn struct { + net.PacketConn + refCount int32 // to be used as an atomic +} + +func newReuseConn(conn net.PacketConn) *reuseConn { + return &reuseConn{PacketConn: conn} +} + +func (c *reuseConn) IncreaseCount() { atomic.AddInt32(&c.refCount, 1) } +func (c *reuseConn) DecreaseCount() { atomic.AddInt32(&c.refCount, -1) } +func (c *reuseConn) GetCount() int { return int(atomic.LoadInt32(&c.refCount)) } + +type reuse struct { + mutex sync.Mutex + + unicast map[string] /* IP.String() */ map[int] /* port */ *reuseConn + // global contains connections that are listening on 0.0.0.0 / :: + global map[int]*reuseConn +} + +func newReuse() *reuse { + return &reuse{ + unicast: make(map[string]map[int]*reuseConn), + global: make(map[int]*reuseConn), + } +} + +func (r *reuse) getSourceIPs(network string, raddr *net.UDPAddr) ([]net.IP, error) { + // Determine the source address that the kernel would use for this IP address. + // Note: This only works on Linux. + // On other OSes, this will return a netlink.ErrNotImplemetned. + routes, err := (&netlink.Handle{}).RouteGet(raddr.IP) + if err != nil { + return nil, err + } + + ips := make([]net.IP, 0, len(routes)) + for _, route := range routes { + ips = append(ips, route.Src) + } + return ips, nil +} + +func (r *reuse) Dial(network string, raddr *net.UDPAddr) (*reuseConn, error) { + ips, err := r.getSourceIPs(network, raddr) + if err != nil && err != netlink.ErrNotImplemented { + return nil, err + } + + r.mutex.Lock() + defer r.mutex.Unlock() + + conn, err := r.dialLocked(network, raddr, ips) + if err != nil { + return nil, err + } + conn.IncreaseCount() + return conn, nil +} + +func (r *reuse) dialLocked(network string, raddr *net.UDPAddr, ips []net.IP) (*reuseConn, error) { + for _, ip := range ips { + // We already have at least one suitable connection... + if conns, ok := r.unicast[ip.String()]; ok { + // ... we don't care which port we're dialing from. Just use the first. + for _, c := range conns { + return c, nil + } + } + } + + // Use a connection listening on 0.0.0.0 (or ::). + // Again, we don't care about the port number. + for _, conn := range r.global { + return conn, nil + } + + // We don't have a connection that we can use for dialing. + // Dial a new connection from a random port. + var addr *net.UDPAddr + switch network { + case "udp4": + addr = &net.UDPAddr{IP: net.IPv4zero, Port: 0} + case "udp6": + addr = &net.UDPAddr{IP: net.IPv6zero, Port: 0} + } + conn, err := net.ListenUDP(network, addr) + if err != nil { + return nil, err + } + rconn := newReuseConn(conn) + r.global[conn.LocalAddr().(*net.UDPAddr).Port] = rconn + return rconn, nil +} + +func (r *reuse) Listen(network string, laddr *net.UDPAddr) (*reuseConn, error) { + conn, err := net.ListenUDP(network, laddr) + if err != nil { + return nil, err + } + localAddr := conn.LocalAddr().(*net.UDPAddr) + + rconn := newReuseConn(conn) + rconn.IncreaseCount() + + r.mutex.Lock() + defer r.mutex.Unlock() + + // Deal with listen on a global address + if laddr.IP.IsUnspecified() { + // The kernel already checked that the laddr is not already listen + // so we need not check here (when we create ListenUDP). + r.global[laddr.Port] = rconn + return rconn, err + } + + // Deal with listen on a unicast address + if _, ok := r.unicast[localAddr.IP.String()]; !ok { + r.unicast[laddr.IP.String()] = make(map[int]*reuseConn) + } + + // The kernel already checked that the laddr is not already listen + // so we need not check here (when we create ListenUDP). + r.unicast[laddr.IP.String()][localAddr.Port] = rconn + return rconn, err +} diff --git a/reuse_test.go b/reuse_test.go new file mode 100644 index 0000000..2259a39 --- /dev/null +++ b/reuse_test.go @@ -0,0 +1,79 @@ +package libp2pquic + +import ( + "net" + "runtime" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +var _ = Describe("Reuse", func() { + var reuse *reuse + + BeforeEach(func() { + reuse = newReuse() + }) + + It("creates a new global connection when listening on 0.0.0.0", func() { + addr, err := net.ResolveUDPAddr("udp4", "0.0.0.0:0") + Expect(err).ToNot(HaveOccurred()) + conn, err := reuse.Listen("udp4", addr) + Expect(err).ToNot(HaveOccurred()) + Expect(conn.GetCount()).To(Equal(1)) + }) + + It("creates a new global connection when listening on [::]", func() { + addr, err := net.ResolveUDPAddr("udp6", "[::]:1234") + Expect(err).ToNot(HaveOccurred()) + conn, err := reuse.Listen("udp6", addr) + Expect(err).ToNot(HaveOccurred()) + Expect(conn.GetCount()).To(Equal(1)) + }) + + It("creates a new global connection when dialing", func() { + addr, err := net.ResolveUDPAddr("udp4", "1.1.1.1:1234") + Expect(err).ToNot(HaveOccurred()) + conn, err := reuse.Dial("udp4", addr) + Expect(err).ToNot(HaveOccurred()) + Expect(conn.GetCount()).To(Equal(1)) + laddr := conn.LocalAddr().(*net.UDPAddr) + Expect(laddr.IP.String()).To(Equal("0.0.0.0")) + Expect(laddr.Port).ToNot(BeZero()) + }) + + It("reuses a connection it created for listening when dialing", func() { + // listen + addr, err := net.ResolveUDPAddr("udp4", "0.0.0.0:0") + Expect(err).ToNot(HaveOccurred()) + lconn, err := reuse.Listen("udp4", addr) + Expect(err).ToNot(HaveOccurred()) + Expect(lconn.GetCount()).To(Equal(1)) + // dial + raddr, err := net.ResolveUDPAddr("udp4", "1.1.1.1:1234") + Expect(err).ToNot(HaveOccurred()) + conn, err := reuse.Dial("udp4", raddr) + Expect(err).ToNot(HaveOccurred()) + Expect(conn.GetCount()).To(Equal(2)) + }) + + if runtime.GOOS == "linux" { + It("reuses a connection it created for listening on a specific interface", func() { + raddr, err := net.ResolveUDPAddr("udp4", "1.1.1.1:1234") + Expect(err).ToNot(HaveOccurred()) + ips, err := reuse.getSourceIPs("udp4", raddr) + Expect(err).ToNot(HaveOccurred()) + Expect(ips).ToNot(BeEmpty()) + // listen + addr, err := net.ResolveUDPAddr("udp4", ips[0].String()+":0") + Expect(err).ToNot(HaveOccurred()) + lconn, err := reuse.Listen("udp4", addr) + Expect(err).ToNot(HaveOccurred()) + Expect(lconn.GetCount()).To(Equal(1)) + // dial + conn, err := reuse.Dial("udp4", raddr) + Expect(err).ToNot(HaveOccurred()) + Expect(conn.GetCount()).To(Equal(2)) + }) + } +}) diff --git a/transport.go b/transport.go index 5b21708..bd0adfc 100644 --- a/transport.go +++ b/transport.go @@ -3,9 +3,7 @@ package libp2pquic import ( "context" "errors" - "fmt" "net" - "sync" ic "github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/peer" @@ -31,42 +29,42 @@ var quicConfig = &quic.Config{ } type connManager struct { - mutex sync.Mutex - - connIPv4 net.PacketConn - connIPv6 net.PacketConn + reuseUDP4 *reuse + reuseUDP6 *reuse } -func (c *connManager) GetConnForAddr(network string) (net.PacketConn, error) { - c.mutex.Lock() - defer c.mutex.Unlock() +func newConnManager() *connManager { + return &connManager{ + reuseUDP4: newReuse(), + reuseUDP6: newReuse(), + } +} +func (c *connManager) getReuse(network string) (*reuse, error) { switch network { case "udp4": - if c.connIPv4 != nil { - return c.connIPv4, nil - } - var err error - c.connIPv4, err = c.createConn(network, "0.0.0.0:0") - return c.connIPv4, err + return c.reuseUDP4, nil case "udp6": - if c.connIPv6 != nil { - return c.connIPv6, nil - } - var err error - c.connIPv6, err = c.createConn(network, ":0") - return c.connIPv6, err + return c.reuseUDP6, nil default: - return nil, fmt.Errorf("unsupported network: %s", network) + return nil, errors.New("invalid network: must be either udp4 or udp6") + } +} + +func (c *connManager) Listen(network string, laddr *net.UDPAddr) (*reuseConn, error) { + reuse, err := c.getReuse(network) + if err != nil { + return nil, err } + return reuse.Listen(network, laddr) } -func (c *connManager) createConn(network, host string) (net.PacketConn, error) { - addr, err := net.ResolveUDPAddr(network, host) +func (c *connManager) Dial(network string, raddr *net.UDPAddr) (*reuseConn, error) { + reuse, err := c.getReuse(network) if err != nil { return nil, err } - return net.ListenUDP(network, addr) + return reuse.Dial(network, raddr) } // The Transport implements the tpt.Transport interface for QUIC connections. @@ -94,7 +92,7 @@ func NewTransport(key ic.PrivKey) (tpt.Transport, error) { privKey: key, localPeer: localPeer, identity: identity, - connManager: &connManager{}, + connManager: newConnManager(), }, nil } @@ -104,7 +102,7 @@ func (t *transport) Dial(ctx context.Context, raddr ma.Multiaddr, p peer.ID) (tp if err != nil { return nil, err } - pconn, err := t.connManager.GetConnForAddr(network) + udpAddr, err := net.ResolveUDPAddr(network, host) if err != nil { return nil, err } @@ -113,15 +111,15 @@ func (t *transport) Dial(ctx context.Context, raddr ma.Multiaddr, p peer.ID) (tp return nil, err } tlsConf, keyCh := t.identity.ConfigForPeer(p) - sess, err := quic.DialContext(ctx, pconn, addr, host, tlsConf, quicConfig) + pconn, err := t.connManager.Dial(network, udpAddr) if err != nil { return nil, err } - localMultiaddr, err := toQuicMultiaddr(sess.LocalAddr()) + sess, err := quic.DialContext(ctx, pconn, addr, host, tlsConf, quicConfig) if err != nil { + pconn.DecreaseCount() return nil, err } - // Should be ready by this point, don't block. var remotePubKey ic.PubKey select { @@ -129,9 +127,19 @@ func (t *transport) Dial(ctx context.Context, raddr ma.Multiaddr, p peer.ID) (tp default: } if remotePubKey == nil { + pconn.DecreaseCount() return nil, errors.New("go-libp2p-quic-transport BUG: expected remote pub key to be set") } + go func() { + <-sess.Context().Done() + pconn.DecreaseCount() + }() + localMultiaddr, err := toQuicMultiaddr(pconn.LocalAddr()) + if err != nil { + pconn.DecreaseCount() + return nil, err + } return &conn{ sess: sess, transport: t, @@ -151,7 +159,19 @@ func (t *transport) CanDial(addr ma.Multiaddr) bool { // Listen listens for new QUIC connections on the passed multiaddr. func (t *transport) Listen(addr ma.Multiaddr) (tpt.Listener, error) { - return newListener(addr, t, t.localPeer, t.privKey, t.identity) + lnet, host, err := manet.DialArgs(addr) + if err != nil { + return nil, err + } + laddr, err := net.ResolveUDPAddr(lnet, host) + if err != nil { + return nil, err + } + conn, err := t.connManager.Listen(lnet, laddr) + if err != nil { + return nil, err + } + return newListener(conn, t, t.localPeer, t.privKey, t.identity) } // Proxy returns true if this transport proxies. From 01a06cdc5461af1bb223a6620bf9c697016a95bd Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Mon, 5 Aug 2019 18:12:51 +0700 Subject: [PATCH 2/2] use a single handle for each reuse --- reuse.go | 28 +++++++++++++++++++++------- reuse_test.go | 4 +++- transport.go | 22 +++++++++++++++++----- 3 files changed, 41 insertions(+), 13 deletions(-) diff --git a/reuse.go b/reuse.go index 56aba1b..08fc14f 100644 --- a/reuse.go +++ b/reuse.go @@ -24,23 +24,37 @@ func (c *reuseConn) GetCount() int { return int(atomic.LoadInt32(&c.refCount)) type reuse struct { mutex sync.Mutex + handle *netlink.Handle // Only set on Linux. nil on other systems. + unicast map[string] /* IP.String() */ map[int] /* port */ *reuseConn // global contains connections that are listening on 0.0.0.0 / :: global map[int]*reuseConn } -func newReuse() *reuse { +func newReuse() (*reuse, error) { + // On non-Linux systems, this will return ErrNotImplemented. + handle, err := netlink.NewHandle() + if err == netlink.ErrNotImplemented { + handle = nil + } else if err != nil { + return nil, err + } return &reuse{ unicast: make(map[string]map[int]*reuseConn), global: make(map[int]*reuseConn), - } + handle: handle, + }, nil } +// Get the source IP that the kernel would use for dialing. +// This only works on Linux. +// On other systems, this returns an empty slice of IP addresses. func (r *reuse) getSourceIPs(network string, raddr *net.UDPAddr) ([]net.IP, error) { - // Determine the source address that the kernel would use for this IP address. - // Note: This only works on Linux. - // On other OSes, this will return a netlink.ErrNotImplemetned. - routes, err := (&netlink.Handle{}).RouteGet(raddr.IP) + if r.handle == nil { + return nil, nil + } + + routes, err := r.handle.RouteGet(raddr.IP) if err != nil { return nil, err } @@ -54,7 +68,7 @@ func (r *reuse) getSourceIPs(network string, raddr *net.UDPAddr) ([]net.IP, erro func (r *reuse) Dial(network string, raddr *net.UDPAddr) (*reuseConn, error) { ips, err := r.getSourceIPs(network, raddr) - if err != nil && err != netlink.ErrNotImplemented { + if err != nil { return nil, err } diff --git a/reuse_test.go b/reuse_test.go index 2259a39..91668b0 100644 --- a/reuse_test.go +++ b/reuse_test.go @@ -12,7 +12,9 @@ var _ = Describe("Reuse", func() { var reuse *reuse BeforeEach(func() { - reuse = newReuse() + var err error + reuse, err = newReuse() + Expect(err).ToNot(HaveOccurred()) }) It("creates a new global connection when listening on 0.0.0.0", func() { diff --git a/transport.go b/transport.go index bd0adfc..a9ddd41 100644 --- a/transport.go +++ b/transport.go @@ -33,11 +33,19 @@ type connManager struct { reuseUDP6 *reuse } -func newConnManager() *connManager { - return &connManager{ - reuseUDP4: newReuse(), - reuseUDP6: newReuse(), +func newConnManager() (*connManager, error) { + reuseUDP4, err := newReuse() + if err != nil { + return nil, err + } + reuseUDP6, err := newReuse() + if err != nil { + return nil, err } + return &connManager{ + reuseUDP4: reuseUDP4, + reuseUDP6: reuseUDP6, + }, nil } func (c *connManager) getReuse(network string) (*reuse, error) { @@ -87,12 +95,16 @@ func NewTransport(key ic.PrivKey) (tpt.Transport, error) { if err != nil { return nil, err } + connManager, err := newConnManager() + if err != nil { + return nil, err + } return &transport{ privKey: key, localPeer: localPeer, identity: identity, - connManager: newConnManager(), + connManager: connManager, }, nil }