Skip to content

Commit

Permalink
fix: refactor connection opening and closing (#2735)
Browse files Browse the repository at this point in the history
Simplifies connection opening/closing and catches some instances where
we were not destroying sockets for incoming and outgoing connections,
which caused a file descriptor leak.
  • Loading branch information
achingbrain authored Sep 30, 2024
1 parent 58784ab commit 24fa1d5
Show file tree
Hide file tree
Showing 8 changed files with 195 additions and 171 deletions.
4 changes: 2 additions & 2 deletions packages/integration-tests/test/circuit-relay.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -602,8 +602,8 @@ describe('circuit-relay', () => {
await deferred.promise

// should have closed connections to remote and to relay
expect(events[0].detail.remotePeer.toString()).to.equal(remote.peerId.toString())
expect(events[1].detail.remotePeer.toString()).to.equal(relay1.peerId.toString())
expect(events[0].detail.remotePeer.toString()).to.equal(relay1.peerId.toString())
expect(events[1].detail.remotePeer.toString()).to.equal(remote.peerId.toString())
})

it('should remove the relay event listener when the relay stops', async () => {
Expand Down
3 changes: 2 additions & 1 deletion packages/transport-tcp/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@
"@multiformats/mafmt": "^12.1.6",
"@multiformats/multiaddr": "^12.2.3",
"@types/sinon": "^17.0.3",
"p-defer": "^4.0.1",
"progress-events": "^1.0.0",
"race-event": "^1.3.0",
"stream-to-it": "^1.0.1"
},
"devDependencies": {
Expand All @@ -74,7 +76,6 @@
"aegir": "^44.0.1",
"it-all": "^3.0.6",
"it-pipe": "^3.0.1",
"p-defer": "^4.0.1",
"sinon": "^18.0.0",
"uint8arrays": "^5.1.0",
"wherearewe": "^2.0.1"
Expand Down
2 changes: 1 addition & 1 deletion packages/transport-tcp/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ export const CODE_UNIX = 400
export const CLOSE_TIMEOUT = 500

// Close the socket if there is no activity after this long in ms
export const SOCKET_TIMEOUT = 5 * 60000 // 5 mins
export const SOCKET_TIMEOUT = 2 * 60000 // 2 mins
3 changes: 2 additions & 1 deletion packages/transport-tcp/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ export interface TCPComponents {
}

export interface TCPMetrics {
dialerEvents: CounterGroup<'error' | 'timeout' | 'connect' | 'abort'>
events: CounterGroup<'error' | 'timeout' | 'connect' | 'abort'>
errors: CounterGroup<'outbound_to_connection' | 'outbound_upgrade'>
}

export function tcp (init: TCPOptions = {}): (components: TCPComponents) => Transport {
Expand Down
17 changes: 9 additions & 8 deletions packages/transport-tcp/src/listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,20 +163,19 @@ export class TCPListener extends TypedEventEmitter<ListenerEvents> implements Li
this.safeDispatchEvent('close')
}
})
.on('drop', () => {
this.metrics?.events.increment({ [`${this.addr} drop`]: true })
})
}

private onSocket (socket: net.Socket): void {
this.metrics?.events.increment({ [`${this.addr} connection`]: true })

if (this.status.code !== TCPListenerStatusCode.ACTIVE) {
socket.destroy()
throw new NotStartedError('Server is not listening yet')
}

// Avoid uncaught errors caused by unstable connections
socket.on('error', err => {
this.log('socket error', err)
this.metrics?.events.increment({ [`${this.addr} error`]: true })
})

let maConn: MultiaddrConnection
try {
maConn = toMultiaddrConnection(socket, {
Expand All @@ -185,11 +184,13 @@ export class TCPListener extends TypedEventEmitter<ListenerEvents> implements Li
socketCloseTimeout: this.context.socketCloseTimeout,
metrics: this.metrics?.events,
metricPrefix: `${this.addr} `,
logger: this.context.logger
logger: this.context.logger,
direction: 'inbound'
})
} catch (err) {
} catch (err: any) {
this.log.error('inbound connection failed', err)
this.metrics?.errors.increment({ [`${this.addr} inbound_to_connection`]: true })
socket.destroy()
return
}

Expand Down
158 changes: 81 additions & 77 deletions packages/transport-tcp/src/socket-to-conn.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import { AbortError, InvalidParametersError, TimeoutError } from '@libp2p/interface'
import { InvalidParametersError, TimeoutError } from '@libp2p/interface'
import { ipPortToMultiaddr as toMultiaddr } from '@libp2p/utils/ip-port-to-multiaddr'
import pDefer from 'p-defer'
import { raceEvent } from 'race-event'
import { duplex } from 'stream-to-it'
import { CLOSE_TIMEOUT, SOCKET_TIMEOUT } from './constants.js'
import { multiaddrToNetConfig } from './utils.js'
import type { ComponentLogger, MultiaddrConnection, CounterGroup } from '@libp2p/interface'
import type { AbortOptions, Multiaddr } from '@multiformats/multiaddr'
import type { Socket } from 'net'
import type { DeferredPromise } from 'p-defer'

interface ToConnectionOptions {
listeningAddr?: Multiaddr
Expand All @@ -16,19 +19,23 @@ interface ToConnectionOptions {
metrics?: CounterGroup
metricPrefix?: string
logger: ComponentLogger
direction: 'inbound' | 'outbound'
}

/**
* Convert a socket into a MultiaddrConnection
* https://github.com/libp2p/interface-transport#multiaddrconnection
*/
export const toMultiaddrConnection = (socket: Socket, options: ToConnectionOptions): MultiaddrConnection => {
let closePromise: Promise<void> | null = null
let closePromise: DeferredPromise<void>
const log = options.logger.forComponent('libp2p:tcp:socket')
const direction = options.direction
const metrics = options.metrics
const metricPrefix = options.metricPrefix ?? ''
const inactivityTimeout = options.socketInactivityTimeout ?? SOCKET_TIMEOUT
const closeTimeout = options.socketCloseTimeout ?? CLOSE_TIMEOUT
let timedout = false
let errored = false

// Check if we are connected on a unix path
if (options.listeningAddr?.getPath() != null) {
Expand All @@ -39,6 +46,19 @@ export const toMultiaddrConnection = (socket: Socket, options: ToConnectionOptio
options.localAddr = options.remoteAddr
}

// handle socket errors
socket.on('error', err => {
errored = true

if (!timedout) {
log.error('%s socket error - %e', direction, err)
metrics?.increment({ [`${metricPrefix}error`]: true })
}

socket.destroy()
maConn.timeline.close = Date.now()
})

let remoteAddr: Multiaddr

if (options.remoteAddr != null) {
Expand All @@ -59,37 +79,37 @@ export const toMultiaddrConnection = (socket: Socket, options: ToConnectionOptio

// by default there is no timeout
// https://nodejs.org/dist/latest-v16.x/docs/api/net.html#socketsettimeouttimeout-callback
socket.setTimeout(inactivityTimeout, () => {
log('%s socket read timeout', lOptsStr)
metrics?.increment({ [`${metricPrefix}timeout`]: true })
socket.setTimeout(inactivityTimeout)

// only destroy with an error if the remote has not sent the FIN message
let err: Error | undefined
if (socket.readable) {
err = new TimeoutError('Socket read timeout')
}
socket.once('timeout', () => {
timedout = true
log('%s %s socket read timeout', direction, lOptsStr)
metrics?.increment({ [`${metricPrefix}timeout`]: true })

// if the socket times out due to inactivity we must manually close the connection
// https://nodejs.org/dist/latest-v16.x/docs/api/net.html#event-timeout
socket.destroy(err)
socket.destroy(new TimeoutError())
maConn.timeline.close = Date.now()
})

socket.once('close', () => {
log('%s socket close', lOptsStr)
metrics?.increment({ [`${metricPrefix}close`]: true })
// record metric for clean exit
if (!timedout && !errored) {
log('%s %s socket close', direction, lOptsStr)
metrics?.increment({ [`${metricPrefix}close`]: true })
}

// In instances where `close` was not explicitly called,
// such as an iterable stream ending, ensure we have set the close
// timeline
if (maConn.timeline.close == null) {
maConn.timeline.close = Date.now()
}
socket.destroy()
maConn.timeline.close = Date.now()
})

socket.once('end', () => {
// the remote sent a FIN packet which means no more data will be sent
// https://nodejs.org/dist/latest-v16.x/docs/api/net.html#event-end
log('%s socket end', lOptsStr)
log('%s %s socket end', direction, lOptsStr)
metrics?.increment({ [`${metricPrefix}end`]: true })
})

Expand All @@ -111,7 +131,7 @@ export const toMultiaddrConnection = (socket: Socket, options: ToConnectionOptio
// If the source errored the socket will already have been destroyed by
// duplex(). If the socket errored it will already be
// destroyed. There's nothing to do here except log the error & return.
log.error('%s error in sink', lOptsStr, err)
log.error('%s %s error in sink - %e', direction, lOptsStr, err)
}
}

Expand All @@ -128,100 +148,84 @@ export const toMultiaddrConnection = (socket: Socket, options: ToConnectionOptio

async close (options: AbortOptions = {}) {
if (socket.closed) {
log('The %s socket is already closed', lOptsStr)
log('the %s %s socket is already closed', direction, lOptsStr)
return
}

if (socket.destroyed) {
log('The %s socket is already destroyed', lOptsStr)
log('the %s %s socket is already destroyed', direction, lOptsStr)
return
}

const abortSignalListener = (): void => {
socket.destroy(new AbortError('Destroying socket after timeout'))
if (closePromise != null) {
return closePromise.promise
}

try {
if (closePromise != null) {
log('The %s socket is already closing', lOptsStr)
await closePromise
return
}
closePromise = pDefer()

if (options.signal == null) {
const signal = AbortSignal.timeout(closeTimeout)
// close writable end of socket
socket.end()

options = {
...options,
signal
}
}
// convert EventEmitter to EventTarget
const eventTarget = socketToEventTarget(socket)

options.signal?.addEventListener('abort', abortSignalListener)
// don't wait forever to close
const signal = options.signal ?? AbortSignal.timeout(closeTimeout)

log('%s closing socket', lOptsStr)
closePromise = new Promise<void>((resolve, reject) => {
socket.once('close', () => {
// socket completely closed
log('%s socket closed', lOptsStr)
resolve()
// wait for any unsent data to be sent
if (socket.writableLength > 0) {
log('%s %s draining socket', direction, lOptsStr)
await raceEvent(eventTarget, 'drain', signal, {
errorEvent: 'error'
})
socket.once('error', (err: Error) => {
log('%s socket error', lOptsStr, err)

if (!socket.destroyed) {
reject(err)
}
// if socket is destroyed, 'closed' event will be emitted later to resolve the promise
})

// shorten inactivity timeout
socket.setTimeout(closeTimeout)

// close writable end of the socket
socket.end()

if (socket.writableLength > 0) {
// there are outgoing bytes waiting to be sent
socket.once('drain', () => {
log('%s socket drained', lOptsStr)
log('%s %s socket drained', direction, lOptsStr)
}

// all bytes have been sent we can destroy the socket (maybe) before the timeout
socket.destroy()
})
} else {
// nothing to send, destroy immediately, no need for the timeout
socket.destroy()
}
})
await Promise.all([
raceEvent(eventTarget, 'close', signal, {
errorEvent: 'error'
}),

await closePromise
// all bytes have been sent we can destroy the socket
socket.destroy()
])
} catch (err: any) {
this.abort(err)
} finally {
options.signal?.removeEventListener('abort', abortSignalListener)
closePromise.resolve()
}
},

abort: (err: Error) => {
log('%s socket abort due to error', lOptsStr, err)
log('%s %s socket abort due to error - %e', direction, lOptsStr, err)

// the abortSignalListener may already destroyed the socket with an error
if (!socket.destroyed) {
socket.destroy(err)
}
socket.destroy()

// closing a socket is always asynchronous (must wait for "close" event)
// but the tests expect this to be a synchronous operation so we have to
// set the close time here. the tests should be refactored to reflect
// reality.
if (maConn.timeline.close == null) {
maConn.timeline.close = Date.now()
}
maConn.timeline.close = Date.now()
},

log
}

return maConn
}

/**
 * Adapt a Node.js `EventEmitter`-style object (anything exposing
 * `addListener` / `removeListener`, e.g. a `net.Socket`) to the subset of the
 * `EventTarget` interface needed to subscribe to and unsubscribe from events.
 *
 * Only `addEventListener` and `removeEventListener` are provided; each one
 * forwards its `type` and `cb` arguments to the wrapped emitter unchanged.
 * `dispatchEvent` is intentionally not implemented, so the returned object is
 * a partial `EventTarget`.
 *
 * @param obj - the emitter to wrap
 * @returns an object exposing `addEventListener` and `removeEventListener`
 * @throws {TypeError} if `obj` is `null` or `undefined`
 */
function socketToEventTarget (obj?: any): EventTarget {
  // The parameter is declared optional, but every method below dereferences
  // it. Fail here with a clear message instead of later, inside a listener
  // registration, with an opaque "cannot read properties of undefined".
  if (obj == null) {
    throw new TypeError('socketToEventTarget requires an event emitter to wrap')
  }

  const eventTarget = {
    addEventListener: (type: any, cb: any): void => {
      obj.addListener(type, cb)
    },
    removeEventListener: (type: any, cb: any): void => {
      obj.removeListener(type, cb)
    }
  }

  // @ts-expect-error partial implementation
  return eventTarget
}
Loading

0 comments on commit 24fa1d5

Please sign in to comment.