Skip to content

Commit

Permalink
Close conn when transport closes
Browse files Browse the repository at this point in the history
  • Loading branch information
MarcoPolo committed Jul 12, 2023
1 parent d1ad906 commit efe0d08
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 24 deletions.
26 changes: 19 additions & 7 deletions p2p/transport/quicreuse/reuse.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ func (c *singleOwnerTransport) LocalAddr() net.Addr {
return c.Transport.Conn.LocalAddr()
}

func (c *singleOwnerTransport) Close() error {
// TODO(when we drop support for go 1.19) use errors.Join
c.Transport.Close()
return c.packetConn.Close()
}

func (c *singleOwnerTransport) WriteTo(b []byte, addr net.Addr) (int, error) {
// Safe because we called quic.OptimizeConn ourselves.
return c.packetConn.WriteTo(b, addr)
Expand Down Expand Up @@ -71,6 +77,12 @@ func (c *refcountedTransport) IncreaseCount() {
c.mutex.Unlock()
}

func (c *refcountedTransport) Close() error {
// TODO(when we drop support for go 1.19) use errors.Join
c.Transport.Close()
return c.packetConn.Close()
}

func (c *refcountedTransport) WriteTo(b []byte, addr net.Addr) (int, error) {
// Safe because we called quic.OptimizeConn ourselves.
return c.packetConn.WriteTo(b, addr)
Expand Down Expand Up @@ -131,15 +143,15 @@ func newReuse(srk *quic.StatelessResetKey, mt *metricsTracer) *reuse {
func (r *reuse) gc() {
defer func() {
r.mutex.Lock()
for _, conn := range r.globalListeners {
conn.Close()
for _, tr := range r.globalListeners {
tr.Close()
}
for _, conn := range r.globalDialers {
conn.Close()
for _, tr := range r.globalDialers {
tr.Close()
}
for _, conns := range r.unicast {
for _, conn := range conns {
conn.Close()
for _, trs := range r.unicast {
for _, tr := range trs {
tr.Close()
}
}
r.mutex.Unlock()
Expand Down
33 changes: 16 additions & 17 deletions p2p/transport/quicreuse/reuse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,20 @@ func (c *refcountedTransport) GetCount() int {

func closeAllConns(reuse *reuse) {
reuse.mutex.Lock()
for _, conn := range reuse.globalListeners {
for conn.GetCount() > 0 {
conn.DecreaseCount()
for _, tr := range reuse.globalListeners {
for tr.GetCount() > 0 {
tr.DecreaseCount()
}
}
for _, conn := range reuse.globalDialers {
for conn.GetCount() > 0 {
conn.DecreaseCount()
for _, tr := range reuse.globalDialers {
for tr.GetCount() > 0 {
tr.DecreaseCount()
}
}
for _, conns := range reuse.unicast {
for _, conn := range conns {
for conn.GetCount() > 0 {
conn.DecreaseCount()
for _, trs := range reuse.unicast {
for _, tr := range trs {
for tr.GetCount() > 0 {
tr.DecreaseCount()
}
}
}
Expand Down Expand Up @@ -138,28 +138,27 @@ func TestReuseConnectionWhenDialBeforeListen(t *testing.T) {
// dial any address
raddr, err := net.ResolveUDPAddr("udp4", "1.1.1.1:1234")
require.NoError(t, err)
rconn, err := reuse.TransportForDial("udp4", raddr)
rTr, err := reuse.TransportForDial("udp4", raddr)
require.NoError(t, err)

// open a listener
laddr := &net.UDPAddr{IP: net.IPv4zero, Port: 1234}
tr, err := reuse.TransportForListen("udp4", laddr)
lTr, err := reuse.TransportForListen("udp4", laddr)
require.NoError(t, err)
defer tr.Close()

// new dials should go via the listener connection
raddr, err = net.ResolveUDPAddr("udp4", "1.1.1.1:1235")
require.NoError(t, err)
conn, err := reuse.TransportForDial("udp4", raddr)
tr, err := reuse.TransportForDial("udp4", raddr)
require.NoError(t, err)
require.Equal(t, conn, tr)
require.Equal(t, conn.GetCount(), 2)
require.Equal(t, tr, lTr)
require.Equal(t, tr.GetCount(), 2)

// a listener on an unspecified port should reuse the dialer
laddr2 := &net.UDPAddr{IP: net.IPv4zero, Port: 0}
lconn2, err := reuse.TransportForListen("udp4", laddr2)
require.NoError(t, err)
require.Equal(t, lconn2, rconn)
require.Equal(t, lconn2, rTr)
require.Equal(t, lconn2.GetCount(), 2)
}

Expand Down

0 comments on commit efe0d08

Please sign in to comment.