Skip to content

Commit

Permalink
Finish renaming conn -> transport where appropriate
Browse files Browse the repository at this point in the history
  • Loading branch information
MarcoPolo committed Jul 13, 2023
1 parent a451cfa commit 7773888
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 61 deletions.
36 changes: 18 additions & 18 deletions p2p/transport/quicreuse/connmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,23 @@ type ConnManager struct {
serverConfig *quic.Config
clientConfig *quic.Config

connsMu sync.Mutex
conns map[string]connListenerEntry
quicListenersMu sync.Mutex
quicListeners map[string]quicListenerEntry

srk quic.StatelessResetKey
mt *metricsTracer
}

type connListenerEntry struct {
type quicListenerEntry struct {
refCount int
ln *connListener
ln *quicListener
}

func NewConnManager(statelessResetKey quic.StatelessResetKey, opts ...Option) (*ConnManager, error) {
cm := &ConnManager{
enableReuseport: true,
enableDraft29: true,
conns: make(map[string]connListenerEntry),
quicListeners: make(map[string]quicListenerEntry),
srk: statelessResetKey,
}
for _, o := range opts {
Expand Down Expand Up @@ -104,22 +104,22 @@ func (c *ConnManager) ListenQUIC(addr ma.Multiaddr, tlsConf *tls.Config, allowWi
return nil, err
}

c.connsMu.Lock()
defer c.connsMu.Unlock()
c.quicListenersMu.Lock()
defer c.quicListenersMu.Unlock()

key := laddr.String()
entry, ok := c.conns[key]
entry, ok := c.quicListeners[key]
if !ok {
conn, err := c.transportForListen(netw, laddr)
tr, err := c.transportForListen(netw, laddr)
if err != nil {
return nil, err
}
ln, err := newConnListener(conn, c.serverConfig, c.enableDraft29)
ln, err := newQuicListener(tr, c.serverConfig, c.enableDraft29)
if err != nil {
return nil, err
}
key = conn.LocalAddr().String()
entry = connListenerEntry{ln: ln}
key = tr.LocalAddr().String()
entry = quicListenerEntry{ln: ln}
}
l, err := entry.ln.Add(tlsConf, allowWindowIncrease, func() { c.onListenerClosed(key) })
if err != nil {
Expand All @@ -129,21 +129,21 @@ func (c *ConnManager) ListenQUIC(addr ma.Multiaddr, tlsConf *tls.Config, allowWi
return nil, err
}
entry.refCount++
c.conns[key] = entry
c.quicListeners[key] = entry
return l, nil
}

func (c *ConnManager) onListenerClosed(key string) {
c.connsMu.Lock()
defer c.connsMu.Unlock()
c.quicListenersMu.Lock()
defer c.quicListenersMu.Unlock()

entry := c.conns[key]
entry := c.quicListeners[key]
entry.refCount = entry.refCount - 1
if entry.refCount <= 0 {
delete(c.conns, key)
delete(c.quicListeners, key)
entry.ln.Close()
} else {
c.conns[key] = entry
c.quicListeners[key] = entry
}
}

Expand Down
14 changes: 7 additions & 7 deletions p2p/transport/quicreuse/connmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ func checkClosed(t *testing.T, cm *ConnManager) {
continue
}
r.mutex.Lock()
for _, conn := range r.globalListeners {
require.Zero(t, conn.GetCount())
for _, tr := range r.globalListeners {
require.Zero(t, tr.GetCount())
}
for _, conns := range r.unicast {
for _, conn := range conns {
require.Zero(t, conn.GetCount())
for _, trs := range r.unicast {
for _, tr := range trs {
require.Zero(t, tr.GetCount())
}
}
r.mutex.Unlock()
Expand Down Expand Up @@ -93,7 +93,7 @@ func testListenOnSameProto(t *testing.T, enableReuseport bool) {
// type-asserted to a UDPConn. That way, it can use all kinds of optimizations.
func TestConnectionPassedToQUICForListening(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip("skipping on windows. Not sure why this fails")
t.Skip("skipping on windows. Windows doesn't support these optimizations")
}
cm, err := NewConnManager([32]byte{}, DisableReuseport())
require.NoError(t, err)
Expand Down Expand Up @@ -153,7 +153,7 @@ func TestAcceptErrorGetCleanedUp(t *testing.T) {
// in order to enable features like batch processing and ECN.
func TestConnectionPassedToQUICForDialing(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip("skipping on windows. Not sure why this fails")
t.Skip("skipping on windows. Windows doesn't support these optimizations")
}
cm, err := NewConnManager([32]byte{}, DisableReuseport())
require.NoError(t, err)
Expand Down
14 changes: 7 additions & 7 deletions p2p/transport/quicreuse/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type protoConf struct {
allowWindowIncrease func(conn quic.Connection, delta uint64) bool
}

type connListener struct {
type quicListener struct {
l *quic.Listener
transport refCountedQuicTransport
running chan struct{}
Expand All @@ -38,7 +38,7 @@ type connListener struct {
protocols map[string]protoConf
}

func newConnListener(c refCountedQuicTransport, quicConfig *quic.Config, enableDraft29 bool) (*connListener, error) {
func newQuicListener(c refCountedQuicTransport, quicConfig *quic.Config, enableDraft29 bool) (*quicListener, error) {
localMultiaddrs := make([]ma.Multiaddr, 0, 2)
a, err := ToQuicMultiaddr(c.LocalAddr(), quic.Version1)
if err != nil {
Expand All @@ -52,7 +52,7 @@ func newConnListener(c refCountedQuicTransport, quicConfig *quic.Config, enableD
}
localMultiaddrs = append(localMultiaddrs, a)
}
cl := &connListener{
cl := &quicListener{
protocols: map[string]protoConf{},
running: make(chan struct{}),
transport: c,
Expand Down Expand Up @@ -85,7 +85,7 @@ func newConnListener(c refCountedQuicTransport, quicConfig *quic.Config, enableD
return cl, nil
}

func (l *connListener) allowWindowIncrease(conn quic.Connection, delta uint64) bool {
func (l *quicListener) allowWindowIncrease(conn quic.Connection, delta uint64) bool {
l.protocolsMu.Lock()
defer l.protocolsMu.Unlock()

Expand All @@ -96,7 +96,7 @@ func (l *connListener) allowWindowIncrease(conn quic.Connection, delta uint64) b
return conf.allowWindowIncrease(conn, delta)
}

func (l *connListener) Add(tlsConf *tls.Config, allowWindowIncrease func(conn quic.Connection, delta uint64) bool, onRemove func()) (Listener, error) {
func (l *quicListener) Add(tlsConf *tls.Config, allowWindowIncrease func(conn quic.Connection, delta uint64) bool, onRemove func()) (Listener, error) {
l.protocolsMu.Lock()
defer l.protocolsMu.Unlock()

Expand Down Expand Up @@ -128,7 +128,7 @@ func (l *connListener) Add(tlsConf *tls.Config, allowWindowIncrease func(conn qu
return ln, nil
}

func (l *connListener) Run() error {
func (l *quicListener) Run() error {
defer close(l.running)
defer l.transport.DecreaseCount()
for {
Expand All @@ -152,7 +152,7 @@ func (l *connListener) Run() error {
}
}

func (l *connListener) Close() error {
func (l *quicListener) Close() error {
err := l.l.Close()
<-l.running // wait for Run to return
return err
Expand Down
58 changes: 29 additions & 29 deletions p2p/transport/quicreuse/reuse.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type refCountedQuicTransport interface {

Close() error

// count conn reference
// count transport reference
DecreaseCount()
IncreaseCount()

Expand Down Expand Up @@ -169,28 +169,28 @@ func (r *reuse) gc() {
case <-ticker.C:
now := time.Now()
r.mutex.Lock()
for key, conn := range r.globalListeners {
if conn.ShouldGarbageCollect(now) {
conn.Close()
for key, tr := range r.globalListeners {
if tr.ShouldGarbageCollect(now) {
tr.Close()
delete(r.globalListeners, key)
}
}
for key, conn := range r.globalDialers {
if conn.ShouldGarbageCollect(now) {
conn.Close()
for key, tr := range r.globalDialers {
if tr.ShouldGarbageCollect(now) {
tr.Close()
delete(r.globalDialers, key)
}
}
for ukey, conns := range r.unicast {
for key, conn := range conns {
if conn.ShouldGarbageCollect(now) {
conn.Close()
delete(conns, key)
for ukey, trs := range r.unicast {
for key, tr := range trs {
if tr.ShouldGarbageCollect(now) {
tr.Close()
delete(trs, key)
}
}
if len(conns) == 0 {
if len(trs) == 0 {
delete(r.unicast, ukey)
// If we've dropped all connections with a unicast binding,
// If we've dropped all transports with a unicast binding,
// assume our routes may have changed.
if len(r.unicast) == 0 {
r.routes = nil
Expand Down Expand Up @@ -236,27 +236,27 @@ func (r *reuse) TransportForDial(network string, raddr *net.UDPAddr) (*refcounte

func (r *reuse) transportForDialLocked(network string, source *net.IP) (*refcountedTransport, error) {
if source != nil {
// We already have at least one suitable connection...
if conns, ok := r.unicast[source.String()]; ok {
// We already have at least one suitable transport...
if trs, ok := r.unicast[source.String()]; ok {
// ... we don't care which port we're dialing from. Just use the first.
for _, c := range conns {
return c, nil
for _, tr := range trs {
return tr, nil
}
}
}

// Use a connection listening on 0.0.0.0 (or ::).
// Use a transport listening on 0.0.0.0 (or ::).
// Again, we don't care about the port number.
for _, conn := range r.globalListeners {
return conn, nil
for _, tr := range r.globalListeners {
return tr, nil
}

// Use a connection we've previously dialed from
for _, conn := range r.globalDialers {
return conn, nil
// Use a transport we've previously dialed from
for _, tr := range r.globalDialers {
return tr, nil
}

// We don't have a connection that we can use for dialing.
// We don't have a transport that we can use for dialing.
// Dial a new connection from a random port.
var addr *net.UDPAddr
switch network {
Expand Down Expand Up @@ -284,17 +284,17 @@ func (r *reuse) TransportForListen(network string, laddr *net.UDPAddr) (*refcoun
r.mutex.Lock()
defer r.mutex.Unlock()

// Check if we can reuse a connection we have already dialed out from.
// We reuse a connection from globalDialers when the requested port is 0 or the requested
// Check if we can reuse a transport we have already dialed out from.
// We reuse a transport from globalDialers when the requested port is 0 or the requested
// port is already in the globalDialers.
// If we are reusing a connection from globalDialers, we move the globalDialers entry to
// If we are reusing a transport from globalDialers, we move the globalDialers entry to
// globalListeners
if laddr.IP.IsUnspecified() {
var rTr *refcountedTransport
var localAddr *net.UDPAddr

if laddr.Port == 0 {
// the requested port is 0, we can reuse any connection
// the requested port is 0, we can reuse any transport
for _, tr := range r.globalDialers {
rTr = tr
localAddr = rTr.LocalAddr().(*net.UDPAddr)
Expand Down

0 comments on commit 7773888

Please sign in to comment.