Skip to content
This repository has been archived by the owner on May 26, 2022. It is now read-only.

Commit

Permalink
reuse listening connections for dialing
Browse files Browse the repository at this point in the history
  • Loading branch information
lnykww authored and marten-seemann committed Aug 5, 2019
1 parent fd198fe commit 79c61fb
Show file tree
Hide file tree
Showing 6 changed files with 303 additions and 52 deletions.
6 changes: 4 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ require (
github.com/lucas-clemente/quic-go v0.12.0
github.com/multiformats/go-multiaddr v0.0.4
github.com/multiformats/go-multiaddr-net v0.0.1
github.com/onsi/ginkgo v1.7.0
github.com/onsi/gomega v1.4.3
github.com/onsi/ginkgo v1.8.0
github.com/onsi/gomega v1.5.0
github.com/vishvananda/netlink v1.0.0
github.com/vishvananda/netns v0.0.0-20190625233234-7109fa855b0f // indirect
github.com/whyrusleeping/mafmt v1.2.8
)
24 changes: 24 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,16 @@ github.com/cheekybits/genny v1.0.0/go.mod h1:+tQajlRqAUrPI7DOSpB0XAqZYtQakVtB7wX
github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/davecgh/go-spew v0.0.0-20171005155431-ecdeabc65495 h1:6IyqGr3fnd0tM3YxipK27TUskaOVUjU2nG45yzwcQKY=
github.com/davecgh/go-spew v0.0.0-20171005155431-ecdeabc65495/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v0.0.0-20171005155431-ecdeabc65495/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
github.com/golang/mock v1.2.0 h1:28o5sBqPkBsMGnC6b4MvE2TzSr5/AT4c/1fLqVGIwlk=
github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.0 h1:kbxbvI4Un1LUWKxufD+BiE6AEExYYgkQLQmLFqA1LFk=
github.com/golang/protobuf v1.3.0/go.mod h1:Qd/q+1AKNOZr9uGQzbzCmRO6sUih6GTPZv6a1/R87v0=
Expand All @@ -37,8 +41,11 @@ github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvW
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4=
github.com/libp2p/go-flow-metrics v0.0.1/go.mod h1:Iv1GH0sG8DtYN3SVJ2eG221wMiNpZxBdp967ls1g+k8=
github.com/libp2p/go-flow-metrics v0.0.1/go.mod h1:Iv1GH0sG8DtYN3SVJ2eG221wMiNpZxBdp967ls1g+k8=
github.com/libp2p/go-libp2p-core v0.0.1 h1:HSTZtFIq/W5Ue43Zw+uWZyy2Vl5WtF0zDjKN8/DT/1I=
github.com/libp2p/go-libp2p-core v0.0.1 h1:HSTZtFIq/W5Ue43Zw+uWZyy2Vl5WtF0zDjKN8/DT/1I=
github.com/libp2p/go-libp2p-core v0.0.1/go.mod h1:g/VxnTZ/1ygHxH3dKok7Vno1VfpvGcGip57wjTU4fco=
github.com/libp2p/go-libp2p-core v0.0.1/go.mod h1:g/VxnTZ/1ygHxH3dKok7Vno1VfpvGcGip57wjTU4fco=
github.com/libp2p/go-libp2p-tls v0.1.1 h1:tjW7njTM8JX8FbEvqr8/VSKBdZYZ7CtGtv3i6NiFf10=
github.com/libp2p/go-libp2p-tls v0.1.1/go.mod h1:wZfuewxOndz5RTnCAxFliGjvYSDA40sKitV4c50uI1M=
github.com/lucas-clemente/quic-go v0.12.0 h1:dYHUyB50gEQlK3KqytmNySzuyzAcaQ3iuI2ZReAfVrE=
Expand All @@ -50,10 +57,16 @@ github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 h1:lYpkrQH5ajf0
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1/go.mod h1:pD8RvIylQ358TN4wwqatJ8rNavkEINozVn9DtGI3dfQ=
github.com/minio/sha256-simd v0.0.0-20190131020904-2d45a736cd16 h1:5W7KhL8HVF3XCFOweFD3BNESdnO8ewyYTFT2R+/b8FQ=
github.com/minio/sha256-simd v0.0.0-20190131020904-2d45a736cd16/go.mod h1:2FMWW+8GMoPweT6+pI63m9YE3Lmw4J71hV56Chs1E/U=
github.com/mr-tron/base58 v1.1.0 h1:Y51FGVJ91WBqCEabAi5OPUz38eAx8DakuAm5svLcsfQ=
github.com/mr-tron/base58 v1.1.0/go.mod h1:xcD2VGqlgYjBdcBLw+TuYLr8afG+Hj8g2eTVqeSzSU8=
github.com/mr-tron/base58 v1.1.1 h1:OJIdWOWYe2l5PQNgimGtuwHY8nDskvJ5vvs//YnzRLs=
github.com/mr-tron/base58 v1.1.1/go.mod h1:xcD2VGqlgYjBdcBLw+TuYLr8afG+Hj8g2eTVqeSzSU8=
github.com/mr-tron/base58 v1.1.1/go.mod h1:xcD2VGqlgYjBdcBLw+TuYLr8afG+Hj8g2eTVqeSzSU8=
github.com/multiformats/go-base32 v0.0.3/go.mod h1:pLiuGC8y0QR3Ue4Zug5UzK9LjgbkL8NSQj0zQ5Nz/AA=
github.com/multiformats/go-base32 v0.0.3/go.mod h1:pLiuGC8y0QR3Ue4Zug5UzK9LjgbkL8NSQj0zQ5Nz/AA=
github.com/multiformats/go-multiaddr v0.0.1 h1:/QUV3VBMDI6pi6xfiw7lr6xhDWWvQKn9udPn68kLSdY=
github.com/multiformats/go-multiaddr v0.0.1 h1:/QUV3VBMDI6pi6xfiw7lr6xhDWWvQKn9udPn68kLSdY=
github.com/multiformats/go-multiaddr v0.0.1/go.mod h1:xKVEak1K9cS1VdmPZW3LSIb6lgmoS58qz/pzqmAxV44=
github.com/multiformats/go-multiaddr v0.0.1/go.mod h1:xKVEak1K9cS1VdmPZW3LSIb6lgmoS58qz/pzqmAxV44=
github.com/multiformats/go-multiaddr v0.0.2/go.mod h1:xKVEak1K9cS1VdmPZW3LSIb6lgmoS58qz/pzqmAxV44=
github.com/multiformats/go-multiaddr v0.0.4 h1:WgMSI84/eRLdbptXMkMWDXPjPq7SPLIgGUVm2eroyU4=
Expand All @@ -68,19 +81,30 @@ github.com/multiformats/go-multihash v0.0.1/go.mod h1:w/5tugSrLEbWqlcgJabL3oHFKT
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0 h1:WSHQ+IS43OoUrWtD1/bbclrwK8TTH5hzp+umCiuxHgs=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.8.0 h1:VkHVNpR4iVnU8XQR6DBm8BqYjN7CRzw+xKUbVVbbW9w=
github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.4.3 h1:RE1xgDvH7imwFD45h+u2SgIfERHlS2yNG4DObb5BSKU=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.5.0 h1:izbySO9zDPmjJ8rDjLvkA2zJHIo+HkYXHnf7eN7SSyo=
github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/spacemonkeygo/openssl v0.0.0-20181017203307-c2dcc5cca94a h1:/eS3yfGjQKG+9kayBkj0ip1BGhq6zJ3eaVksphxAaek=
github.com/spacemonkeygo/openssl v0.0.0-20181017203307-c2dcc5cca94a/go.mod h1:7AyxJNCJ7SBZ1MfVQCWD6Uqo2oubI2Eq2y2eqf+A5r0=
github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572 h1:RC6RW7j+1+HkWaX/Yh71Ee5ZHaHYt7ZP4sQgUrm6cDU=
github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572/go.mod h1:w0SWMsp6j9O/dk4/ZpIhL+3CkG8ofA2vuv7k+ltqUMc=
github.com/vishvananda/netlink v1.0.0 h1:bqNY2lgheFIu1meHUFSH3d7vG93AFyqg3oGbJCOJgSM=
github.com/vishvananda/netlink v1.0.0/go.mod h1:+SR5DhBJrl6ZM7CoCKvpw5BKroDKQ+PJqOg65H/2ktk=
github.com/vishvananda/netns v0.0.0-20190625233234-7109fa855b0f h1:nBX3nTcmxEtHSERBJaIo1Qa26VwRaopnZmfDQUXsF4I=
github.com/vishvananda/netns v0.0.0-20190625233234-7109fa855b0f/go.mod h1:ZjcWmFBXmLKZu9Nxj3WKYEafiSqer2rnvPr0en9UNpI=
github.com/whyrusleeping/mafmt v1.2.8 h1:TCghSl5kkwEE0j+sU/gudyhVMRlpBin8fMBBHg59EbA=
github.com/whyrusleeping/mafmt v1.2.8/go.mod h1:faQJFPbLSxzD9xpA02ttW/tS9vZykNvXwGvqIpk20FA=
golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190225124518-7f87c0fbb88b/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190228161510-8dd112bcdc25 h1:jsG6UpNLt9iAsb0S2AGW28DveNzzgmbXR+ENoPjUeIU=
golang.org/x/crypto v0.0.0-20190228161510-8dd112bcdc25/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd h1:nTDtHvHSdCn1m6ITfMRqtOd/9+7a3s8RBNOZ3eYZzJA=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190228165749-92fc7df08ae7 h1:Qe/u+eY379X4He4GBMFZYu3pmh1ML5yT1aL1ndNM1zQ=
golang.org/x/net v0.0.0-20190228165749-92fc7df08ae7/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
Expand Down
27 changes: 8 additions & 19 deletions listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,21 @@ import (

quic "github.com/lucas-clemente/quic-go"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
)

// A listener listens for QUIC connections.
type listener struct {
quicListener quic.Listener
transport tpt.Transport

quicListener quic.Listener
conn *reuseConn
transport *transport
privKey ic.PrivKey
localPeer peer.ID
localMultiaddr ma.Multiaddr
}

var _ tpt.Listener = &listener{}

func newListener(addr ma.Multiaddr, transport tpt.Transport, localPeer peer.ID, key ic.PrivKey, identity *p2ptls.Identity) (tpt.Listener, error) {
func newListener(rconn *reuseConn, t *transport, localPeer peer.ID, key ic.PrivKey, identity *p2ptls.Identity) (tpt.Listener, error) {
var tlsConf tls.Config
tlsConf.GetConfigForClient = func(_ *tls.ClientHelloInfo) (*tls.Config, error) {
// return a tls.Config that verifies the peer's certificate chain.
Expand All @@ -37,19 +36,7 @@ func newListener(addr ma.Multiaddr, transport tpt.Transport, localPeer peer.ID,
conf, _ := identity.ConfigForAny()
return conf, nil
}
lnet, host, err := manet.DialArgs(addr)
if err != nil {
return nil, err
}
laddr, err := net.ResolveUDPAddr(lnet, host)
if err != nil {
return nil, err
}
conn, err := net.ListenUDP(lnet, laddr)
if err != nil {
return nil, err
}
ln, err := quic.Listen(conn, &tlsConf, quicConfig)
ln, err := quic.Listen(rconn, &tlsConf, quicConfig)
if err != nil {
return nil, err
}
Expand All @@ -58,8 +45,9 @@ func newListener(addr ma.Multiaddr, transport tpt.Transport, localPeer peer.ID,
return nil, err
}
return &listener{
conn: rconn,
quicListener: ln,
transport: transport,
transport: t,
privKey: key,
localPeer: localPeer,
localMultiaddr: localMultiaddr,
Expand Down Expand Up @@ -113,6 +101,7 @@ func (l *listener) setupConn(sess quic.Session) (tpt.CapableConn, error) {

// Close closes the listener.
func (l *listener) Close() error {
l.conn.DecreaseCount()
return l.quicListener.Close()
}

Expand Down
137 changes: 137 additions & 0 deletions reuse.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package libp2pquic

import (
"net"
"sync"
"sync/atomic"

"github.com/vishvananda/netlink"
)

type reuseConn struct {
net.PacketConn
refCount int32 // to be used as an atomic
}

func newReuseConn(conn net.PacketConn) *reuseConn {
return &reuseConn{PacketConn: conn}
}

func (c *reuseConn) IncreaseCount() { atomic.AddInt32(&c.refCount, 1) }
func (c *reuseConn) DecreaseCount() { atomic.AddInt32(&c.refCount, -1) }
func (c *reuseConn) GetCount() int { return int(atomic.LoadInt32(&c.refCount)) }

type reuse struct {
mutex sync.Mutex

unicast map[string] /* IP.String() */ map[int] /* port */ *reuseConn
// global contains connections that are listening on 0.0.0.0 / ::
global map[int]*reuseConn
}

func newReuse() *reuse {
return &reuse{
unicast: make(map[string]map[int]*reuseConn),
global: make(map[int]*reuseConn),
}
}

func (r *reuse) getSourceIPs(network string, raddr *net.UDPAddr) ([]net.IP, error) {
// Determine the source address that the kernel would use for this IP address.
// Note: This only works on Linux.
// On other OSes, this will return a netlink.ErrNotImplemetned.
routes, err := (&netlink.Handle{}).RouteGet(raddr.IP)
if err != nil {
return nil, err
}

ips := make([]net.IP, 0, len(routes))
for _, route := range routes {
ips = append(ips, route.Src)
}
return ips, nil
}

func (r *reuse) Dial(network string, raddr *net.UDPAddr) (*reuseConn, error) {
ips, err := r.getSourceIPs(network, raddr)
if err != nil && err != netlink.ErrNotImplemented {
return nil, err
}

r.mutex.Lock()
defer r.mutex.Unlock()

conn, err := r.dialLocked(network, raddr, ips)
if err != nil {
return nil, err
}
conn.IncreaseCount()
return conn, nil
}

func (r *reuse) dialLocked(network string, raddr *net.UDPAddr, ips []net.IP) (*reuseConn, error) {
for _, ip := range ips {
// We already have at least one suitable connection...
if conns, ok := r.unicast[ip.String()]; ok {
// ... we don't care which port we're dialing from. Just use the first.
for _, c := range conns {
return c, nil
}
}
}

// Use a connection listening on 0.0.0.0 (or ::).
// Again, we don't care about the port number.
for _, conn := range r.global {
return conn, nil
}

// We don't have a connection that we can use for dialing.
// Dial a new connection from a random port.
var addr *net.UDPAddr
switch network {
case "udp4":
addr = &net.UDPAddr{IP: net.IPv4zero, Port: 0}
case "udp6":
addr = &net.UDPAddr{IP: net.IPv6zero, Port: 0}
}
conn, err := net.ListenUDP(network, addr)
if err != nil {
return nil, err
}
rconn := newReuseConn(conn)
r.global[conn.LocalAddr().(*net.UDPAddr).Port] = rconn
return rconn, nil
}

func (r *reuse) Listen(network string, laddr *net.UDPAddr) (*reuseConn, error) {
conn, err := net.ListenUDP(network, laddr)
if err != nil {
return nil, err
}
localAddr := conn.LocalAddr().(*net.UDPAddr)

rconn := newReuseConn(conn)
rconn.IncreaseCount()

r.mutex.Lock()
defer r.mutex.Unlock()

// Deal with listen on a global address
if laddr.IP.IsUnspecified() {
// The kernel already checked that the laddr is not already listen
// so we need not check here (when we create ListenUDP).
r.global[laddr.Port] = rconn
return rconn, err
}

// Deal with listen on a unicast address
if _, ok := r.unicast[localAddr.IP.String()]; !ok {
r.unicast[laddr.IP.String()] = make(map[int]*reuseConn)
}

// The kernel already checked that the laddr is not already listen
// so we need not check here (when we create ListenUDP).
r.unicast[laddr.IP.String()][localAddr.Port] = rconn
return rconn, err
}
79 changes: 79 additions & 0 deletions reuse_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package libp2pquic

import (
"net"
"runtime"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)

var _ = Describe("Reuse", func() {
var reuse *reuse

BeforeEach(func() {
reuse = newReuse()
})

It("creates a new global connection when listening on 0.0.0.0", func() {
addr, err := net.ResolveUDPAddr("udp4", "0.0.0.0:0")
Expect(err).ToNot(HaveOccurred())
conn, err := reuse.Listen("udp4", addr)
Expect(err).ToNot(HaveOccurred())
Expect(conn.GetCount()).To(Equal(1))
})

It("creates a new global connection when listening on [::]", func() {
addr, err := net.ResolveUDPAddr("udp6", "[::]:1234")
Expect(err).ToNot(HaveOccurred())
conn, err := reuse.Listen("udp6", addr)
Expect(err).ToNot(HaveOccurred())
Expect(conn.GetCount()).To(Equal(1))
})

It("creates a new global connection when dialing", func() {
addr, err := net.ResolveUDPAddr("udp4", "1.1.1.1:1234")
Expect(err).ToNot(HaveOccurred())
conn, err := reuse.Dial("udp4", addr)
Expect(err).ToNot(HaveOccurred())
Expect(conn.GetCount()).To(Equal(1))
laddr := conn.LocalAddr().(*net.UDPAddr)
Expect(laddr.IP.String()).To(Equal("0.0.0.0"))
Expect(laddr.Port).ToNot(BeZero())
})

It("reuses a connection it created for listening when dialing", func() {
// listen
addr, err := net.ResolveUDPAddr("udp4", "0.0.0.0:0")
Expect(err).ToNot(HaveOccurred())
lconn, err := reuse.Listen("udp4", addr)
Expect(err).ToNot(HaveOccurred())
Expect(lconn.GetCount()).To(Equal(1))
// dial
raddr, err := net.ResolveUDPAddr("udp4", "1.1.1.1:1234")
Expect(err).ToNot(HaveOccurred())
conn, err := reuse.Dial("udp4", raddr)
Expect(err).ToNot(HaveOccurred())
Expect(conn.GetCount()).To(Equal(2))
})

if runtime.GOOS == "linux" {
It("reuses a connection it created for listening on a specific interface", func() {
raddr, err := net.ResolveUDPAddr("udp4", "1.1.1.1:1234")
Expect(err).ToNot(HaveOccurred())
ips, err := reuse.getSourceIPs("udp4", raddr)
Expect(err).ToNot(HaveOccurred())
Expect(ips).ToNot(BeEmpty())
// listen
addr, err := net.ResolveUDPAddr("udp4", ips[0].String()+":0")
Expect(err).ToNot(HaveOccurred())
lconn, err := reuse.Listen("udp4", addr)
Expect(err).ToNot(HaveOccurred())
Expect(lconn.GetCount()).To(Equal(1))
// dial
conn, err := reuse.Dial("udp4", raddr)
Expect(err).ToNot(HaveOccurred())
Expect(conn.GetCount()).To(Equal(2))
})
}
})
Loading

0 comments on commit 79c61fb

Please sign in to comment.