From e8b8f2eaf547640f2566b18a8d061912965f2a55 Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Wed, 31 Aug 2022 16:37:36 +0200 Subject: [PATCH] fix: destroy sockets on close (#204) * fix: destroy sockets on close We call `.end` on a socket and wait for the `close` event, but calling `.end` only closes the writable end of the socket. To close both ends we either need to wait for the remote to close their writable end or we need to `.destroy` our socket. If we call `.destroy` all data is lost and no more I/O occurs. The change here is to call `.end` then check to see if we have any outgoing writes, if we do, wait for the `drain` event which means the outgoing data has been sent, then call `.destroy`, otherwise call `.destroy` immediately. At the same time use a timer to call `.destroy` if the `drain` event never arrives. It also set up the `timeout` event for the socket to allow closing the socket after a period of inactivity. Three new constructor options are added to control the behvaiour: - `inboundSocketInactivityTimeout` the socket will be closed after this many ms of not sending/recieving data (default 30s) - `outboundSocketInactivityTimeout` the socket will be closed after this many ms of not sending/recieving data (default 30s) - `socketCloseTimeout` how long to wait for the `drain` event (default 2s) Fixes #201 * chore: account for networking differences * chore: ignore client errors --- README.md | 3 +- package.json | 1 + src/constants.ts | 3 + src/index.ts | 36 ++- src/listener.ts | 14 +- src/socket-to-conn.ts | 130 +++++++---- test/socket-to-conn.spec.ts | 429 ++++++++++++++++++++++++++++++++++++ 7 files changed, 570 insertions(+), 46 deletions(-) create mode 100644 test/socket-to-conn.spec.ts diff --git a/README.md b/README.md index ebca2c9..231725b 100644 --- a/README.md +++ b/README.md @@ -47,9 +47,10 @@ const upgrader = { upgradeOutbound: maConn => maConn } -const tcp = new TCP({ upgrader }) +const tcp = new TCP() const listener = tcp.createListener({ + upgrader, handler: (socket) => { console.log('new connection opened') pipe( diff --git a/package.json b/package.json index ff43d8d..c44a79e 100644 --- a/package.json +++ b/package.json @@ -153,6 +153,7 @@ "aegir": "^37.0.4", "it-all": "^1.0.6", "it-pipe": "^2.0.3", + "p-defer": "^4.0.0", "sinon": "^14.0.0", "uint8arrays": "^3.0.0" } diff --git a/src/constants.ts b/src/constants.ts index 75362ae..af76483 100644 --- a/src/constants.ts +++ b/src/constants.ts @@ -4,3 +4,6 @@ export const CODE_CIRCUIT = 290 // Time to wait for a connection to close gracefully before destroying it manually export const CLOSE_TIMEOUT = 2000 + +// Close the socket if there is no activity after this long in ms +export const SOCKET_TIMEOUT = 30000 diff --git a/src/index.ts b/src/index.ts index 7dfc012..35aa136 100644 --- a/src/index.ts +++ b/src/index.ts @@ -15,7 +15,30 @@ import type { Connection } from '@libp2p/interface-connection' const log = logger('libp2p:tcp') +export interface TCPOptions { + /** + * An optional number in ms that is used as an inactivity timeout after which the socket will be closed + */ + inboundSocketInactivityTimeout?: number + + /** + * An optional number in ms that is used as an inactivity timeout after which the socket will be closed + */ + outboundSocketInactivityTimeout?: number + + /** + * When closing a socket, wait this long for it to close gracefully before it is closed more forcibly + */ + socketCloseTimeout?: number +} + export class TCP implements Transport { + private readonly opts: TCPOptions + + constructor (options: TCPOptions = {}) { + this.opts = options + } + get [symbol] (): true { return true } @@ -32,7 +55,12 @@ export class TCP implements Transport { log('socket error', err) }) - const maConn = toMultiaddrConnection(socket, { remoteAddr: ma, signal: options.signal }) + const maConn = toMultiaddrConnection(socket, { + remoteAddr: ma, + signal: options.signal, + socketInactivityTimeout: this.opts.outboundSocketInactivityTimeout, + socketCloseTimeout: this.opts.socketCloseTimeout + }) log('new outbound connection %s', maConn.remoteAddr) const conn = await options.upgrader.upgradeOutbound(maConn) log('outbound connection %s upgraded', maConn.remoteAddr) @@ -108,7 +136,11 @@ export class TCP implements Transport { * `upgrader.upgradeInbound`. */ createListener (options: CreateListenerOptions) { - return createListener(options) + return createListener({ + ...options, + socketInactivityTimeout: this.opts.inboundSocketInactivityTimeout, + socketCloseTimeout: this.opts.socketCloseTimeout + }) } /** diff --git a/src/listener.ts b/src/listener.ts index 2e54c32..5cdb937 100644 --- a/src/listener.ts +++ b/src/listener.ts @@ -32,6 +32,8 @@ async function attemptClose (maConn: MultiaddrConnection) { interface Context { handler?: (conn: Connection) => void upgrader: Upgrader + socketInactivityTimeout?: number + socketCloseTimeout?: number } /** @@ -39,7 +41,7 @@ interface Context { */ export function createListener (context: Context) { const { - handler, upgrader + handler, upgrader, socketInactivityTimeout, socketCloseTimeout } = context let peerId: string | null @@ -53,7 +55,11 @@ export function createListener (context: Context) { let maConn: MultiaddrConnection try { - maConn = toMultiaddrConnection(socket, { listeningAddr }) + maConn = toMultiaddrConnection(socket, { + listeningAddr, + socketInactivityTimeout, + socketCloseTimeout + }) } catch (err) { log.error('inbound connection failed', err) return @@ -139,9 +145,9 @@ export function createListener (context: Context) { return } - await Promise.all([ + await Promise.all( server.__connections.map(async maConn => await attemptClose(maConn)) - ]) + ) await new Promise((resolve, reject) => { server.close(err => (err != null) ? reject(err) : resolve()) diff --git a/src/socket-to-conn.ts b/src/socket-to-conn.ts index 57cc3a2..4e6ff63 100644 --- a/src/socket-to-conn.ts +++ b/src/socket-to-conn.ts @@ -3,7 +3,8 @@ import { logger } from '@libp2p/logger' // @ts-expect-error no types import toIterable from 'stream-to-it' import { ipPortToMultiaddr as toMultiaddr } from '@libp2p/utils/ip-port-to-multiaddr' -import { CLOSE_TIMEOUT } from './constants.js' +import { CLOSE_TIMEOUT, SOCKET_TIMEOUT } from './constants.js' +import errCode from 'err-code' import type { Socket } from 'net' import type { Multiaddr } from '@multiformats/multiaddr' import type { MultiaddrConnection } from '@libp2p/interface-connection' @@ -15,6 +16,8 @@ interface ToConnectionOptions { remoteAddr?: Multiaddr localAddr?: Multiaddr signal?: AbortSignal + socketInactivityTimeout?: number + socketCloseTimeout?: number } /** @@ -23,6 +26,8 @@ interface ToConnectionOptions { */ export const toMultiaddrConnection = (socket: Socket, options?: ToConnectionOptions) => { options = options ?? {} + const inactivityTimeout = options.socketInactivityTimeout ?? SOCKET_TIMEOUT + const closeTimeout = options.socketCloseTimeout ?? CLOSE_TIMEOUT // Check if we are connected on a unix path if (options.listeningAddr?.getPath() != null) { @@ -33,8 +38,43 @@ export const toMultiaddrConnection = (socket: Socket, options?: ToConnectionOpti options.localAddr = options.remoteAddr } + const remoteAddr = options.remoteAddr ?? toMultiaddr(socket.remoteAddress ?? '', socket.remotePort ?? '') + const { host, port } = remoteAddr.toOptions() const { sink, source } = toIterable.duplex(socket) + // by default there is no timeout + // https://nodejs.org/dist/latest-v16.x/docs/api/net.html#socketsettimeouttimeout-callback + socket.setTimeout(inactivityTimeout, () => { + log('%s:%s socket read timeout', host, port) + + // only destroy with an error if the remote has not sent the FIN message + let err: Error | undefined + if (socket.readable) { + err = errCode(new Error('Socket read timeout'), 'ERR_SOCKET_READ_TIMEOUT') + } + + // if the socket times out due to inactivity we must manually close the connection + // https://nodejs.org/dist/latest-v16.x/docs/api/net.html#event-timeout + socket.destroy(err) + }) + + socket.once('close', () => { + log('%s:%s socket closed', host, port) + + // In instances where `close` was not explicitly called, + // such as an iterable stream ending, ensure we have set the close + // timeline + if (maConn.timeline.close == null) { + maConn.timeline.close = Date.now() + } + }) + + socket.once('end', () => { + // the remote sent a FIN packet which means no more data will be sent + // https://nodejs.org/dist/latest-v16.x/docs/api/net.html#event-end + log('socket ended', maConn.remoteAddr.toString()) + }) + const maConn: MultiaddrConnection = { async sink (source) { if ((options?.signal) != null) { @@ -42,13 +82,7 @@ export const toMultiaddrConnection = (socket: Socket, options?: ToConnectionOpti } try { - await sink((async function * () { - for await (const chunk of source) { - // Convert BufferList to Buffer - // Sink in StreamMuxer define argument as Uint8Array so chunk type infers as number which can't be sliced - yield Buffer.isBuffer(chunk) ? chunk : chunk.slice() - } - })()) + await sink(source) } catch (err: any) { // If aborted we can safely ignore if (err.type !== 'aborted') { @@ -58,66 +92,84 @@ export const toMultiaddrConnection = (socket: Socket, options?: ToConnectionOpti log(err) } } + + // we have finished writing, send the FIN message + socket.end() }, - // Missing Type for "abortable" source: (options.signal != null) ? abortableSource(source, options.signal) : source, // If the remote address was passed, use it - it may have the peer ID encapsulated - remoteAddr: options.remoteAddr ?? toMultiaddr(socket.remoteAddress ?? '', socket.remotePort ?? ''), + remoteAddr, timeline: { open: Date.now() }, async close () { - if (socket.destroyed) return + if (socket.destroyed) { + log('%s:%s socket was already destroyed when trying to close', host, port) + return + } - return await new Promise((resolve, reject) => { + log('%s:%s closing socket', host, port) + await new Promise((resolve, reject) => { const start = Date.now() // Attempt to end the socket. If it takes longer to close than the // timeout, destroy it manually. const timeout = setTimeout(() => { - const { host, port } = maConn.remoteAddr.toOptions() - log( - 'timeout closing socket to %s:%s after %dms, destroying it manually', - host, - port, - Date.now() - start - ) - if (socket.destroyed) { log('%s:%s is already destroyed', host, port) + resolve() } else { - socket.destroy() - } + log('%s:%s socket close timeout after %dms, destroying it manually', host, port, Date.now() - start) - resolve() - }, CLOSE_TIMEOUT).unref() + // will trigger 'error' and 'close' events that resolves promise + socket.destroy(errCode(new Error('Socket close timeout'), 'ERR_SOCKET_CLOSE_TIMEOUT')) + } + }, closeTimeout).unref() socket.once('close', () => { + log('%s:%s socket closed', host, port) + // socket completely closed clearTimeout(timeout) resolve() }) - socket.end((err?: Error & { code?: string }) => { - clearTimeout(timeout) - maConn.timeline.close = Date.now() - if (err != null) { - return reject(err) + socket.once('error', (err: Error) => { + log('%s:%s socket error', host, port, err) + + // error closing socket + if (maConn.timeline.close == null) { + maConn.timeline.close = Date.now() } - resolve() + + if (socket.destroyed) { + clearTimeout(timeout) + } + + reject(err) }) + + // shorten inactivity timeout + socket.setTimeout(closeTimeout) + + // close writable end of the socket + socket.end() + + if (socket.writableLength > 0) { + // there are outgoing bytes waiting to be sent + socket.once('drain', () => { + log('%s:%s socket drained', host, port) + + // all bytes have been sent we can destroy the socket (maybe) before the timeout + socket.destroy() + }) + } else { + // nothing to send, destroy immediately + socket.destroy() + } }) } } - socket.once('close', () => { - // In instances where `close` was not explicitly called, - // such as an iterable stream ending, ensure we have set the close - // timeline - if (maConn.timeline.close == null) { - maConn.timeline.close = Date.now() - } - }) - return maConn } diff --git a/test/socket-to-conn.spec.ts b/test/socket-to-conn.spec.ts new file mode 100644 index 0000000..49c339f --- /dev/null +++ b/test/socket-to-conn.spec.ts @@ -0,0 +1,429 @@ +import { expect } from 'aegir/chai' +import { createServer, Socket, Server } from 'net' +import defer from 'p-defer' +import { toMultiaddrConnection } from '../src/socket-to-conn.js' +import os from 'os' +import type { ServerOpts, SocketConstructorOpts } from 'net' + +async function setup (opts?: { server?: ServerOpts, client?: SocketConstructorOpts }) { + const serverListening = defer() + + const server = createServer(opts?.server) + server.listen(0, () => { + serverListening.resolve() + }) + + await serverListening.promise + + const serverSocket = defer() + const clientSocket = defer() + + server.once('connection', (socket) => { + serverSocket.resolve(socket) + }) + + const address = server.address() + + if (address == null || typeof address === 'string') { + throw new Error('Wrong socket type') + } + + const client = new Socket(opts?.client) + client.once('connect', () => { + clientSocket.resolve(client) + }) + client.connect(address.port, address.address) + + return { + server, + serverSocket: await serverSocket.promise, + clientSocket: await clientSocket.promise + } +} + +describe('socket-to-conn', () => { + let server: Server + let clientSocket: Socket + let serverSocket: Socket + + afterEach(async () => { + if (serverSocket != null) { + serverSocket.destroy() + } + + if (clientSocket != null) { + clientSocket.destroy() + } + + if (server != null) { + server.close() + } + }) + + it('should destroy a socket that is closed by the client', async () => { + ({ server, clientSocket, serverSocket } = await setup()) + + // promise that is resolved when client socket is closed + const clientClosed = defer() + + // promise that is resolved when client socket errors + const clientErrored = defer() + + // promise that is resolved when our outgoing socket is closed + const serverClosed = defer() + + // promise that is resolved when our outgoing socket errors + const serverErrored = defer() + + const inboundMaConn = toMultiaddrConnection(serverSocket, { + socketInactivityTimeout: 100 + }) + expect(inboundMaConn.timeline.open).to.be.ok() + expect(inboundMaConn.timeline.close).to.not.be.ok() + + clientSocket.once('close', () => { + clientClosed.resolve(true) + }) + clientSocket.once('error', err => { + clientErrored.resolve(err) + }) + + serverSocket.once('close', () => { + serverClosed.resolve(true) + }) + serverSocket.once('error', err => { + serverErrored.resolve(err) + }) + + // send some data between the client and server + clientSocket.write('hello') + serverSocket.write('goodbye') + + // close the client for writing + clientSocket.end() + + // server socket was closed for reading and writing + await expect(serverClosed.promise).to.eventually.be.true() + + // the connection closing was recorded + expect(inboundMaConn.timeline.close).to.be.a('number') + + // server socket is destroyed + expect(serverSocket.destroyed).to.be.true() + }) + + it('should destroy a socket that is forcibly closed by the client', async () => { + ({ server, clientSocket, serverSocket } = await setup()) + + // promise that is resolved when our outgoing socket is closed + const serverClosed = defer() + + // promise that is resolved when our outgoing socket errors + const serverErrored = defer() + + const inboundMaConn = toMultiaddrConnection(serverSocket, { + socketInactivityTimeout: 100 + }) + expect(inboundMaConn.timeline.open).to.be.ok() + expect(inboundMaConn.timeline.close).to.not.be.ok() + + serverSocket.once('close', () => { + serverClosed.resolve(true) + }) + serverSocket.once('error', err => { + serverErrored.resolve(err) + }) + + // send some data between the client and server + clientSocket.write('hello') + serverSocket.write('goodbye') + + // close the client for reading and writing immediately + clientSocket.destroy() + + // client closed the connection - error code is platform specific + if (os.platform() === 'linux') { + await expect(serverErrored.promise).to.eventually.have.property('code', 'ERR_SOCKET_READ_TIMEOUT') + } else { + await expect(serverErrored.promise).to.eventually.have.property('code', 'ECONNRESET') + } + + // server socket was closed for reading and writing + await expect(serverClosed.promise).to.eventually.be.true() + + // the connection closing was recorded + expect(inboundMaConn.timeline.close).to.be.a('number') + + // server socket is destroyed + expect(serverSocket.destroyed).to.be.true() + }) + + it('should destroy a socket that is half-closed by the client', async () => { + ({ server, clientSocket, serverSocket } = await setup({ + client: { + allowHalfOpen: true + } + })) + + // promise that is resolved when our outgoing socket is closed + const serverClosed = defer() + + // promise that is resolved when our outgoing socket errors + const serverErrored = defer() + + const inboundMaConn = toMultiaddrConnection(serverSocket, { + socketInactivityTimeout: 100 + }) + expect(inboundMaConn.timeline.open).to.be.ok() + expect(inboundMaConn.timeline.close).to.not.be.ok() + + serverSocket.once('close', () => { + serverClosed.resolve(true) + }) + serverSocket.once('error', err => { + serverErrored.resolve(err) + }) + + // send some data between the client and server + clientSocket.write('hello') + serverSocket.write('goodbye') + + // close the client for writing + clientSocket.end() + + // server socket was closed for reading and writing + await expect(serverClosed.promise).to.eventually.be.true() + + // remote stopped sending us data + await expect(serverErrored.promise).to.eventually.have.property('code', 'ERR_SOCKET_READ_TIMEOUT') + + // the connection closing was recorded + expect(inboundMaConn.timeline.close).to.be.a('number') + + // server socket is destroyed + expect(serverSocket.destroyed).to.be.true() + }) + + it('should destroy a socket after sinking', async () => { + ({ server, clientSocket, serverSocket } = await setup()) + + // promise that is resolved when our outgoing socket is closed + const serverClosed = defer() + + // promise that is resolved when our outgoing socket errors + const serverErrored = defer() + + const inboundMaConn = toMultiaddrConnection(serverSocket, { + socketInactivityTimeout: 100 + }) + expect(inboundMaConn.timeline.open).to.be.ok() + expect(inboundMaConn.timeline.close).to.not.be.ok() + + serverSocket.once('close', () => { + serverClosed.resolve(true) + }) + serverSocket.once('error', err => { + serverErrored.resolve(err) + }) + + // send some data between the client and server + await inboundMaConn.sink([ + Uint8Array.from([0, 1, 2, 3]) + ]) + + // server socket should no longer be writable + expect(serverSocket.writable).to.be.false() + + // server socket was closed for reading and writing + await expect(serverClosed.promise).to.eventually.be.true() + + // remote didn't send us any data + await expect(serverErrored.promise).to.eventually.have.property('code', 'ERR_SOCKET_READ_TIMEOUT') + + // the connection closing was recorded + expect(inboundMaConn.timeline.close).to.be.a('number') + + // server socket is destroyed + expect(serverSocket.destroyed).to.be.true() + }) + + it('should destroy a socket when containing MultiaddrConnection is closed', async () => { + ({ server, clientSocket, serverSocket } = await setup()) + + // promise that is resolved when our outgoing socket is closed + const serverClosed = defer() + + const inboundMaConn = toMultiaddrConnection(serverSocket, { + socketInactivityTimeout: 100, + socketCloseTimeout: 10 + }) + expect(inboundMaConn.timeline.open).to.be.ok() + expect(inboundMaConn.timeline.close).to.not.be.ok() + + clientSocket.once('error', () => {}) + + serverSocket.once('close', () => { + serverClosed.resolve(true) + }) + + // send some data between the client and server + clientSocket.write('hello') + serverSocket.write('goodbye') + + await inboundMaConn.close() + + // server socket was closed for reading and writing + await expect(serverClosed.promise).to.eventually.be.true() + + // the connection closing was recorded + expect(inboundMaConn.timeline.close).to.be.a('number') + + // server socket is destroyed + expect(serverSocket.destroyed).to.be.true() + }) + + it('should destroy a socket by timeout when containing MultiaddrConnection is closed', async () => { + ({ server, clientSocket, serverSocket } = await setup({ + server: { + allowHalfOpen: true + } + })) + + // promise that is resolved when our outgoing socket is closed + const serverClosed = defer() + + const inboundMaConn = toMultiaddrConnection(serverSocket, { + socketInactivityTimeout: 100, + socketCloseTimeout: 10 + }) + expect(inboundMaConn.timeline.open).to.be.ok() + expect(inboundMaConn.timeline.close).to.not.be.ok() + + clientSocket.once('error', () => {}) + + serverSocket.once('close', () => { + serverClosed.resolve(true) + }) + + // send some data between the client and server + clientSocket.write('hello') + serverSocket.write('goodbye') + + await inboundMaConn.close() + + // server socket was closed for reading and writing + await expect(serverClosed.promise).to.eventually.be.true() + + // the connection closing was recorded + expect(inboundMaConn.timeline.close).to.be.a('number') + + // server socket is destroyed + expect(serverSocket.destroyed).to.be.true() + }) + + it('should destroy a socket by timeout when containing MultiaddrConnection is closed but remote keeps sending data', async () => { + ({ server, clientSocket, serverSocket } = await setup({ + server: { + allowHalfOpen: true + } + })) + + // promise that is resolved when our outgoing socket is closed + const serverClosed = defer() + + const inboundMaConn = toMultiaddrConnection(serverSocket, { + socketInactivityTimeout: 500, + socketCloseTimeout: 100 + }) + expect(inboundMaConn.timeline.open).to.be.ok() + expect(inboundMaConn.timeline.close).to.not.be.ok() + + clientSocket.once('error', () => {}) + + serverSocket.once('close', () => { + serverClosed.resolve(true) + }) + + // send some data between the client and server + clientSocket.write('hello') + serverSocket.write('goodbye') + + setInterval(() => { + clientSocket.write(`some data ${Date.now()}`) + }, 10).unref() + + await inboundMaConn.close() + + // server socket was closed for reading and writing + await expect(serverClosed.promise).to.eventually.be.true() + + // the connection closing was recorded + expect(inboundMaConn.timeline.close).to.be.a('number') + + // server socket is destroyed + expect(serverSocket.destroyed).to.be.true() + }) + + it('should destroy a socket by timeout when containing MultiaddrConnection is closed but closing remote times out', async () => { + ({ server, clientSocket, serverSocket } = await setup()) + + // promise that is resolved when our outgoing socket is closed + const serverClosed = defer() + + // promise that is resolved when our outgoing socket errors + const serverErrored = defer() + + let maConnCloseError: Error | undefined + + const inboundMaConn = toMultiaddrConnection(serverSocket, { + socketInactivityTimeout: 100, + socketCloseTimeout: 100 + }) + expect(inboundMaConn.timeline.open).to.be.ok() + expect(inboundMaConn.timeline.close).to.not.be.ok() + + clientSocket.once('error', () => {}) + + serverSocket.once('close', () => { + serverClosed.resolve(true) + }) + serverSocket.once('error', err => { + serverErrored.resolve(err) + }) + + // send some data between the client and server + clientSocket.write('hello') + serverSocket.write('goodbye') + + // stop reading data + clientSocket.pause() + + // have to write enough data quickly enough to overwhelm the client + while (serverSocket.writableLength < 1024) { + serverSocket.write('goodbyeeeeeeeeeeeeee') + } + + await inboundMaConn.close().catch(err => { + // should throw this error + maConnCloseError = err + }) + + // server socket should no longer be writable + expect(serverSocket.writable).to.be.false() + + // server socket was closed for reading and writing + await expect(serverClosed.promise).to.eventually.be.true() + + // remote didn't read our data + await expect(serverErrored.promise).to.eventually.have.property('code', 'ERR_SOCKET_CLOSE_TIMEOUT') + + // closing should have thrown + expect(maConnCloseError).to.have.property('code', 'ERR_SOCKET_CLOSE_TIMEOUT') + + // the connection closing was recorded + expect(inboundMaConn.timeline.close).to.be.a('number') + + // server socket is destroyed + expect(serverSocket.destroyed).to.be.true() + }) +})