Skip to content
This repository has been archived by the owner on Aug 29, 2023. It is now read-only.

Commit

Permalink
fix: destroy sockets on close (#204)
Browse files Browse the repository at this point in the history
* fix: destroy sockets on close

We call `.end` on a socket and wait for the `close` event, but calling `.end` only closes the writable end of the socket.

To close both ends we either need to wait for the remote to close their writable end or we need to `.destroy` our socket.  If we call `.destroy` all data is lost and no more I/O occurs.

The change here is to call `.end` then check to see if we have any outgoing writes, if we do, wait for the `drain` event which means the outgoing data has been sent, then call `.destroy`, otherwise call `.destroy` immediately.

At the same time use a timer to call `.destroy` if the `drain` event never arrives.

It also set up the `timeout` event for the socket to allow closing the socket after a period of inactivity.

Three new constructor options are added to control the behvaiour:

- `inboundSocketInactivityTimeout` the socket will be closed after this many ms of not sending/recieving data (default 30s)
- `outboundSocketInactivityTimeout` the socket will be closed after this many ms of not sending/recieving data (default 30s)
- `socketCloseTimeout` how long to wait for the `drain` event (default 2s)

Fixes #201

* chore: account for networking differences

* chore: ignore client errors
  • Loading branch information
achingbrain authored Aug 31, 2022
1 parent 3ac4be4 commit e8b8f2e
Show file tree
Hide file tree
Showing 7 changed files with 570 additions and 46 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,10 @@ const upgrader = {
upgradeOutbound: maConn => maConn
}

const tcp = new TCP({ upgrader })
const tcp = new TCP()

const listener = tcp.createListener({
upgrader,
handler: (socket) => {
console.log('new connection opened')
pipe(
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@
"aegir": "^37.0.4",
"it-all": "^1.0.6",
"it-pipe": "^2.0.3",
"p-defer": "^4.0.0",
"sinon": "^14.0.0",
"uint8arrays": "^3.0.0"
}
Expand Down
3 changes: 3 additions & 0 deletions src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,6 @@ export const CODE_CIRCUIT = 290

// Time to wait for a connection to close gracefully before destroying it manually
export const CLOSE_TIMEOUT = 2000

// Close the socket if there is no activity after this long in ms
export const SOCKET_TIMEOUT = 30000
36 changes: 34 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,30 @@ import type { Connection } from '@libp2p/interface-connection'

const log = logger('libp2p:tcp')

export interface TCPOptions {
/**
* An optional number in ms that is used as an inactivity timeout after which the socket will be closed
*/
inboundSocketInactivityTimeout?: number

/**
* An optional number in ms that is used as an inactivity timeout after which the socket will be closed
*/
outboundSocketInactivityTimeout?: number

/**
* When closing a socket, wait this long for it to close gracefully before it is closed more forcibly
*/
socketCloseTimeout?: number
}

export class TCP implements Transport {
private readonly opts: TCPOptions

constructor (options: TCPOptions = {}) {
this.opts = options
}

get [symbol] (): true {
return true
}
Expand All @@ -32,7 +55,12 @@ export class TCP implements Transport {
log('socket error', err)
})

const maConn = toMultiaddrConnection(socket, { remoteAddr: ma, signal: options.signal })
const maConn = toMultiaddrConnection(socket, {
remoteAddr: ma,
signal: options.signal,
socketInactivityTimeout: this.opts.outboundSocketInactivityTimeout,
socketCloseTimeout: this.opts.socketCloseTimeout
})
log('new outbound connection %s', maConn.remoteAddr)
const conn = await options.upgrader.upgradeOutbound(maConn)
log('outbound connection %s upgraded', maConn.remoteAddr)
Expand Down Expand Up @@ -108,7 +136,11 @@ export class TCP implements Transport {
* `upgrader.upgradeInbound`.
*/
createListener (options: CreateListenerOptions) {
return createListener(options)
return createListener({
...options,
socketInactivityTimeout: this.opts.inboundSocketInactivityTimeout,
socketCloseTimeout: this.opts.socketCloseTimeout
})
}

/**
Expand Down
14 changes: 10 additions & 4 deletions src/listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,16 @@ async function attemptClose (maConn: MultiaddrConnection) {
interface Context {
handler?: (conn: Connection) => void
upgrader: Upgrader
socketInactivityTimeout?: number
socketCloseTimeout?: number
}

/**
* Create listener
*/
export function createListener (context: Context) {
const {
handler, upgrader
handler, upgrader, socketInactivityTimeout, socketCloseTimeout
} = context

let peerId: string | null
Expand All @@ -53,7 +55,11 @@ export function createListener (context: Context) {

let maConn: MultiaddrConnection
try {
maConn = toMultiaddrConnection(socket, { listeningAddr })
maConn = toMultiaddrConnection(socket, {
listeningAddr,
socketInactivityTimeout,
socketCloseTimeout
})
} catch (err) {
log.error('inbound connection failed', err)
return
Expand Down Expand Up @@ -139,9 +145,9 @@ export function createListener (context: Context) {
return
}

await Promise.all([
await Promise.all(
server.__connections.map(async maConn => await attemptClose(maConn))
])
)

await new Promise<void>((resolve, reject) => {
server.close(err => (err != null) ? reject(err) : resolve())
Expand Down
130 changes: 91 additions & 39 deletions src/socket-to-conn.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ import { logger } from '@libp2p/logger'
// @ts-expect-error no types
import toIterable from 'stream-to-it'
import { ipPortToMultiaddr as toMultiaddr } from '@libp2p/utils/ip-port-to-multiaddr'
import { CLOSE_TIMEOUT } from './constants.js'
import { CLOSE_TIMEOUT, SOCKET_TIMEOUT } from './constants.js'
import errCode from 'err-code'
import type { Socket } from 'net'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { MultiaddrConnection } from '@libp2p/interface-connection'
Expand All @@ -15,6 +16,8 @@ interface ToConnectionOptions {
remoteAddr?: Multiaddr
localAddr?: Multiaddr
signal?: AbortSignal
socketInactivityTimeout?: number
socketCloseTimeout?: number
}

/**
Expand All @@ -23,6 +26,8 @@ interface ToConnectionOptions {
*/
export const toMultiaddrConnection = (socket: Socket, options?: ToConnectionOptions) => {
options = options ?? {}
const inactivityTimeout = options.socketInactivityTimeout ?? SOCKET_TIMEOUT
const closeTimeout = options.socketCloseTimeout ?? CLOSE_TIMEOUT

// Check if we are connected on a unix path
if (options.listeningAddr?.getPath() != null) {
Expand All @@ -33,22 +38,51 @@ export const toMultiaddrConnection = (socket: Socket, options?: ToConnectionOpti
options.localAddr = options.remoteAddr
}

const remoteAddr = options.remoteAddr ?? toMultiaddr(socket.remoteAddress ?? '', socket.remotePort ?? '')
const { host, port } = remoteAddr.toOptions()
const { sink, source } = toIterable.duplex(socket)

// by default there is no timeout
// https://nodejs.org/dist/latest-v16.x/docs/api/net.html#socketsettimeouttimeout-callback
socket.setTimeout(inactivityTimeout, () => {
log('%s:%s socket read timeout', host, port)

// only destroy with an error if the remote has not sent the FIN message
let err: Error | undefined
if (socket.readable) {
err = errCode(new Error('Socket read timeout'), 'ERR_SOCKET_READ_TIMEOUT')
}

// if the socket times out due to inactivity we must manually close the connection
// https://nodejs.org/dist/latest-v16.x/docs/api/net.html#event-timeout
socket.destroy(err)
})

socket.once('close', () => {
log('%s:%s socket closed', host, port)

// In instances where `close` was not explicitly called,
// such as an iterable stream ending, ensure we have set the close
// timeline
if (maConn.timeline.close == null) {
maConn.timeline.close = Date.now()
}
})

socket.once('end', () => {
// the remote sent a FIN packet which means no more data will be sent
// https://nodejs.org/dist/latest-v16.x/docs/api/net.html#event-end
log('socket ended', maConn.remoteAddr.toString())
})

const maConn: MultiaddrConnection = {
async sink (source) {
if ((options?.signal) != null) {
source = abortableSource(source, options.signal)
}

try {
await sink((async function * () {
for await (const chunk of source) {
// Convert BufferList to Buffer
// Sink in StreamMuxer define argument as Uint8Array so chunk type infers as number which can't be sliced
yield Buffer.isBuffer(chunk) ? chunk : chunk.slice()
}
})())
await sink(source)
} catch (err: any) {
// If aborted we can safely ignore
if (err.type !== 'aborted') {
Expand All @@ -58,66 +92,84 @@ export const toMultiaddrConnection = (socket: Socket, options?: ToConnectionOpti
log(err)
}
}

// we have finished writing, send the FIN message
socket.end()
},

// Missing Type for "abortable"
source: (options.signal != null) ? abortableSource(source, options.signal) : source,

// If the remote address was passed, use it - it may have the peer ID encapsulated
remoteAddr: options.remoteAddr ?? toMultiaddr(socket.remoteAddress ?? '', socket.remotePort ?? ''),
remoteAddr,

timeline: { open: Date.now() },

async close () {
if (socket.destroyed) return
if (socket.destroyed) {
log('%s:%s socket was already destroyed when trying to close', host, port)
return
}

return await new Promise((resolve, reject) => {
log('%s:%s closing socket', host, port)
await new Promise<void>((resolve, reject) => {
const start = Date.now()

// Attempt to end the socket. If it takes longer to close than the
// timeout, destroy it manually.
const timeout = setTimeout(() => {
const { host, port } = maConn.remoteAddr.toOptions()
log(
'timeout closing socket to %s:%s after %dms, destroying it manually',
host,
port,
Date.now() - start
)

if (socket.destroyed) {
log('%s:%s is already destroyed', host, port)
resolve()
} else {
socket.destroy()
}
log('%s:%s socket close timeout after %dms, destroying it manually', host, port, Date.now() - start)

resolve()
}, CLOSE_TIMEOUT).unref()
// will trigger 'error' and 'close' events that resolves promise
socket.destroy(errCode(new Error('Socket close timeout'), 'ERR_SOCKET_CLOSE_TIMEOUT'))
}
}, closeTimeout).unref()

socket.once('close', () => {
log('%s:%s socket closed', host, port)
// socket completely closed
clearTimeout(timeout)
resolve()
})
socket.end((err?: Error & { code?: string }) => {
clearTimeout(timeout)
maConn.timeline.close = Date.now()
if (err != null) {
return reject(err)
socket.once('error', (err: Error) => {
log('%s:%s socket error', host, port, err)

// error closing socket
if (maConn.timeline.close == null) {
maConn.timeline.close = Date.now()
}
resolve()

if (socket.destroyed) {
clearTimeout(timeout)
}

reject(err)
})

// shorten inactivity timeout
socket.setTimeout(closeTimeout)

// close writable end of the socket
socket.end()

if (socket.writableLength > 0) {
// there are outgoing bytes waiting to be sent
socket.once('drain', () => {
log('%s:%s socket drained', host, port)

// all bytes have been sent we can destroy the socket (maybe) before the timeout
socket.destroy()
})
} else {
// nothing to send, destroy immediately
socket.destroy()
}
})
}
}

socket.once('close', () => {
// In instances where `close` was not explicitly called,
// such as an iterable stream ending, ensure we have set the close
// timeline
if (maConn.timeline.close == null) {
maConn.timeline.close = Date.now()
}
})

return maConn
}
Loading

0 comments on commit e8b8f2e

Please sign in to comment.