From 24fa1d5af3be19f60f31261e8e0242c1747da0b2 Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Mon, 30 Sep 2024 13:16:07 +0100 Subject: [PATCH] fix: refactor connection opening and closing (#2735) Simplifies connection opening/closing, catches some instances where we were not destroying sockets for incoming and outgoing connections which caused a file descriptor leak. --- .../test/circuit-relay.node.ts | 4 +- packages/transport-tcp/package.json | 3 +- packages/transport-tcp/src/constants.ts | 2 +- packages/transport-tcp/src/index.ts | 3 +- packages/transport-tcp/src/listener.ts | 17 +- packages/transport-tcp/src/socket-to-conn.ts | 158 +++++++++--------- packages/transport-tcp/src/tcp.ts | 56 ++++--- .../transport-tcp/test/socket-to-conn.spec.ts | 123 +++++++------- 8 files changed, 195 insertions(+), 171 deletions(-) diff --git a/packages/integration-tests/test/circuit-relay.node.ts b/packages/integration-tests/test/circuit-relay.node.ts index b15c41a7d5..d874c5408f 100644 --- a/packages/integration-tests/test/circuit-relay.node.ts +++ b/packages/integration-tests/test/circuit-relay.node.ts @@ -602,8 +602,8 @@ describe('circuit-relay', () => { await deferred.promise // should have closed connections to remote and to relay - expect(events[0].detail.remotePeer.toString()).to.equal(remote.peerId.toString()) - expect(events[1].detail.remotePeer.toString()).to.equal(relay1.peerId.toString()) + expect(events[0].detail.remotePeer.toString()).to.equal(relay1.peerId.toString()) + expect(events[1].detail.remotePeer.toString()).to.equal(remote.peerId.toString()) }) it('should remove the relay event listener when the relay stops', async () => { diff --git a/packages/transport-tcp/package.json b/packages/transport-tcp/package.json index beb4dcc7f3..0d075606e3 100644 --- a/packages/transport-tcp/package.json +++ b/packages/transport-tcp/package.json @@ -65,7 +65,9 @@ "@multiformats/mafmt": "^12.1.6", "@multiformats/multiaddr": "^12.2.3", "@types/sinon": "^17.0.3", + "p-defer": "^4.0.1", "progress-events": "^1.0.0", + "race-event": "^1.3.0", "stream-to-it": "^1.0.1" }, "devDependencies": { @@ -74,7 +76,6 @@ "aegir": "^44.0.1", "it-all": "^3.0.6", "it-pipe": "^3.0.1", - "p-defer": "^4.0.1", "sinon": "^18.0.0", "uint8arrays": "^5.1.0", "wherearewe": "^2.0.1" diff --git a/packages/transport-tcp/src/constants.ts b/packages/transport-tcp/src/constants.ts index 6501500744..91477850d0 100644 --- a/packages/transport-tcp/src/constants.ts +++ b/packages/transport-tcp/src/constants.ts @@ -7,4 +7,4 @@ export const CODE_UNIX = 400 export const CLOSE_TIMEOUT = 500 // Close the socket if there is no activity after this long in ms -export const SOCKET_TIMEOUT = 5 * 60000 // 5 mins +export const SOCKET_TIMEOUT = 2 * 60000 // 2 mins diff --git a/packages/transport-tcp/src/index.ts b/packages/transport-tcp/src/index.ts index 9a74dd3e93..c416d409bf 100644 --- a/packages/transport-tcp/src/index.ts +++ b/packages/transport-tcp/src/index.ts @@ -123,7 +123,8 @@ export interface TCPComponents { } export interface TCPMetrics { - dialerEvents: CounterGroup<'error' | 'timeout' | 'connect' | 'abort'> + events: CounterGroup<'error' | 'timeout' | 'connect' | 'abort'> + errors: CounterGroup<'outbound_to_connection' | 'outbound_upgrade'> } export function tcp (init: TCPOptions = {}): (components: TCPComponents) => Transport { diff --git a/packages/transport-tcp/src/listener.ts b/packages/transport-tcp/src/listener.ts index ffc18bfb82..b83b35d843 100644 --- a/packages/transport-tcp/src/listener.ts +++ b/packages/transport-tcp/src/listener.ts @@ -163,20 +163,19 @@ export class TCPListener extends TypedEventEmitter implements Li this.safeDispatchEvent('close') } }) + .on('drop', () => { + this.metrics?.events.increment({ [`${this.addr} drop`]: true }) + }) } private onSocket (socket: net.Socket): void { + this.metrics?.events.increment({ [`${this.addr} connection`]: true }) + if (this.status.code !== TCPListenerStatusCode.ACTIVE) { socket.destroy() throw new NotStartedError('Server is not listening yet') } - // Avoid uncaught errors caused by unstable connections - socket.on('error', err => { - this.log('socket error', err) - this.metrics?.events.increment({ [`${this.addr} error`]: true }) - }) - let maConn: MultiaddrConnection try { maConn = toMultiaddrConnection(socket, { @@ -185,11 +184,13 @@ export class TCPListener extends TypedEventEmitter implements Li socketCloseTimeout: this.context.socketCloseTimeout, metrics: this.metrics?.events, metricPrefix: `${this.addr} `, - logger: this.context.logger + logger: this.context.logger, + direction: 'inbound' }) - } catch (err) { + } catch (err: any) { this.log.error('inbound connection failed', err) this.metrics?.errors.increment({ [`${this.addr} inbound_to_connection`]: true }) + socket.destroy() return } diff --git a/packages/transport-tcp/src/socket-to-conn.ts b/packages/transport-tcp/src/socket-to-conn.ts index 09dea95986..b2b88e7460 100644 --- a/packages/transport-tcp/src/socket-to-conn.ts +++ b/packages/transport-tcp/src/socket-to-conn.ts @@ -1,11 +1,14 @@ -import { AbortError, InvalidParametersError, TimeoutError } from '@libp2p/interface' +import { InvalidParametersError, TimeoutError } from '@libp2p/interface' import { ipPortToMultiaddr as toMultiaddr } from '@libp2p/utils/ip-port-to-multiaddr' +import pDefer from 'p-defer' +import { raceEvent } from 'race-event' import { duplex } from 'stream-to-it' import { CLOSE_TIMEOUT, SOCKET_TIMEOUT } from './constants.js' import { multiaddrToNetConfig } from './utils.js' import type { ComponentLogger, MultiaddrConnection, CounterGroup } from '@libp2p/interface' import type { AbortOptions, Multiaddr } from '@multiformats/multiaddr' import type { Socket } from 'net' +import type { DeferredPromise } from 'p-defer' interface ToConnectionOptions { listeningAddr?: Multiaddr @@ -16,6 +19,7 @@ interface ToConnectionOptions { metrics?: CounterGroup metricPrefix?: string logger: ComponentLogger + direction: 'inbound' | 'outbound' } /** @@ -23,12 +27,15 @@ interface ToConnectionOptions { * https://github.com/libp2p/interface-transport#multiaddrconnection */ export const toMultiaddrConnection = (socket: Socket, options: ToConnectionOptions): MultiaddrConnection => { - let closePromise: Promise | null = null + let closePromise: DeferredPromise const log = options.logger.forComponent('libp2p:tcp:socket') + const direction = options.direction const metrics = options.metrics const metricPrefix = options.metricPrefix ?? '' const inactivityTimeout = options.socketInactivityTimeout ?? SOCKET_TIMEOUT const closeTimeout = options.socketCloseTimeout ?? CLOSE_TIMEOUT + let timedout = false + let errored = false // Check if we are connected on a unix path if (options.listeningAddr?.getPath() != null) { @@ -39,6 +46,19 @@ export const toMultiaddrConnection = (socket: Socket, options: ToConnectionOptio options.localAddr = options.remoteAddr } + // handle socket errors + socket.on('error', err => { + errored = true + + if (!timedout) { + log.error('%s socket error - %e', direction, err) + metrics?.increment({ [`${metricPrefix}error`]: true }) + } + + socket.destroy() + maConn.timeline.close = Date.now() + }) + let remoteAddr: Multiaddr if (options.remoteAddr != null) { @@ -59,37 +79,37 @@ export const toMultiaddrConnection = (socket: Socket, options: ToConnectionOptio // by default there is no timeout // https://nodejs.org/dist/latest-v16.x/docs/api/net.html#socketsettimeouttimeout-callback - socket.setTimeout(inactivityTimeout, () => { - log('%s socket read timeout', lOptsStr) - metrics?.increment({ [`${metricPrefix}timeout`]: true }) + socket.setTimeout(inactivityTimeout) - // only destroy with an error if the remote has not sent the FIN message - let err: Error | undefined - if (socket.readable) { - err = new TimeoutError('Socket read timeout') - } + socket.once('timeout', () => { + timedout = true + log('%s %s socket read timeout', direction, lOptsStr) + metrics?.increment({ [`${metricPrefix}timeout`]: true }) // if the socket times out due to inactivity we must manually close the connection // https://nodejs.org/dist/latest-v16.x/docs/api/net.html#event-timeout - socket.destroy(err) + socket.destroy(new TimeoutError()) + maConn.timeline.close = Date.now() }) socket.once('close', () => { - log('%s socket close', lOptsStr) - metrics?.increment({ [`${metricPrefix}close`]: true }) + // record metric for clean exit + if (!timedout && !errored) { + log('%s %s socket close', direction, lOptsStr) + metrics?.increment({ [`${metricPrefix}close`]: true }) + } // In instances where `close` was not explicitly called, // such as an iterable stream ending, ensure we have set the close // timeline - if (maConn.timeline.close == null) { - maConn.timeline.close = Date.now() - } + socket.destroy() + maConn.timeline.close = Date.now() }) socket.once('end', () => { // the remote sent a FIN packet which means no more data will be sent // https://nodejs.org/dist/latest-v16.x/docs/api/net.html#event-end - log('%s socket end', lOptsStr) + log('%s %s socket end', direction, lOptsStr) metrics?.increment({ [`${metricPrefix}end`]: true }) }) @@ -111,7 +131,7 @@ export const toMultiaddrConnection = (socket: Socket, options: ToConnectionOptio // If the source errored the socket will already have been destroyed by // duplex(). If the socket errored it will already be // destroyed. There's nothing to do here except log the error & return. - log.error('%s error in sink', lOptsStr, err) + log.error('%s %s error in sink - %e', direction, lOptsStr, err) } } @@ -128,96 +148,66 @@ export const toMultiaddrConnection = (socket: Socket, options: ToConnectionOptio async close (options: AbortOptions = {}) { if (socket.closed) { - log('The %s socket is already closed', lOptsStr) + log('the %s %s socket is already closed', direction, lOptsStr) return } if (socket.destroyed) { - log('The %s socket is already destroyed', lOptsStr) + log('the %s %s socket is already destroyed', direction, lOptsStr) return } - const abortSignalListener = (): void => { - socket.destroy(new AbortError('Destroying socket after timeout')) + if (closePromise != null) { + return closePromise.promise } try { - if (closePromise != null) { - log('The %s socket is already closing', lOptsStr) - await closePromise - return - } + closePromise = pDefer() - if (options.signal == null) { - const signal = AbortSignal.timeout(closeTimeout) + // close writable end of socket + socket.end() - options = { - ...options, - signal - } - } + // convert EventEmitter to EventTarget + const eventTarget = socketToEventTarget(socket) - options.signal?.addEventListener('abort', abortSignalListener) + // don't wait forever to close + const signal = options.signal ?? AbortSignal.timeout(closeTimeout) - log('%s closing socket', lOptsStr) - closePromise = new Promise((resolve, reject) => { - socket.once('close', () => { - // socket completely closed - log('%s socket closed', lOptsStr) - resolve() + // wait for any unsent data to be sent + if (socket.writableLength > 0) { + log('%s %s draining socket', direction, lOptsStr) + await raceEvent(eventTarget, 'drain', signal, { + errorEvent: 'error' }) - socket.once('error', (err: Error) => { - log('%s socket error', lOptsStr, err) - - if (!socket.destroyed) { - reject(err) - } - // if socket is destroyed, 'closed' event will be emitted later to resolve the promise - }) - - // shorten inactivity timeout - socket.setTimeout(closeTimeout) - - // close writable end of the socket - socket.end() - - if (socket.writableLength > 0) { - // there are outgoing bytes waiting to be sent - socket.once('drain', () => { - log('%s socket drained', lOptsStr) + log('%s %s socket drained', direction, lOptsStr) + } - // all bytes have been sent we can destroy the socket (maybe) before the timeout - socket.destroy() - }) - } else { - // nothing to send, destroy immediately, no need for the timeout - socket.destroy() - } - }) + await Promise.all([ + raceEvent(eventTarget, 'close', signal, { + errorEvent: 'error' + }), - await closePromise + // all bytes have been sent we can destroy the socket + socket.destroy() + ]) } catch (err: any) { this.abort(err) } finally { - options.signal?.removeEventListener('abort', abortSignalListener) + closePromise.resolve() } }, abort: (err: Error) => { - log('%s socket abort due to error', lOptsStr, err) + log('%s %s socket abort due to error - %e', direction, lOptsStr, err) // the abortSignalListener may already destroyed the socket with an error - if (!socket.destroyed) { - socket.destroy(err) - } + socket.destroy() // closing a socket is always asynchronous (must wait for "close" event) // but the tests expect this to be a synchronous operation so we have to // set the close time here. the tests should be refactored to reflect // reality. - if (maConn.timeline.close == null) { - maConn.timeline.close = Date.now() - } + maConn.timeline.close = Date.now() }, log @@ -225,3 +215,17 @@ export const toMultiaddrConnection = (socket: Socket, options: ToConnectionOptio return maConn } + +function socketToEventTarget (obj?: any): EventTarget { + const eventTarget = { + addEventListener: (type: any, cb: any) => { + obj.addListener(type, cb) + }, + removeEventListener: (type: any, cb: any) => { + obj.removeListener(type, cb) + } + } + + // @ts-expect-error partial implementation + return eventTarget +} diff --git a/packages/transport-tcp/src/tcp.ts b/packages/transport-tcp/src/tcp.ts index 9b039988f0..ecf7b5212d 100644 --- a/packages/transport-tcp/src/tcp.ts +++ b/packages/transport-tcp/src/tcp.ts @@ -36,7 +36,7 @@ import { TCPListener } from './listener.js' import { toMultiaddrConnection } from './socket-to-conn.js' import { multiaddrToNetConfig } from './utils.js' import type { TCPComponents, TCPCreateListenerOptions, TCPDialEvents, TCPDialOptions, TCPMetrics, TCPOptions } from './index.js' -import type { Logger, Connection, Transport, Listener } from '@libp2p/interface' +import type { Logger, Connection, Transport, Listener, MultiaddrConnection } from '@libp2p/interface' import type { Multiaddr } from '@multiformats/multiaddr' import type { Socket, IpcSocketConnectOpts, TcpSocketConnectOpts } from 'net' @@ -53,7 +53,11 @@ export class TCP implements Transport { if (components.metrics != null) { this.metrics = { - dialerEvents: components.metrics.registerCounterGroup('libp2p_tcp_dialer_events_total', { + events: components.metrics.registerCounterGroup('libp2p_tcp_dialer_events_total', { + label: 'event', + help: 'Total count of TCP dialer events by type' + }), + errors: components.metrics.registerCounterGroup('libp2p_tcp_dialer_errors_total', { label: 'event', help: 'Total count of TCP dialer events by type' }) @@ -76,23 +80,28 @@ export class TCP implements Transport { // options.signal destroys the socket before 'connect' event const socket = await this._connect(ma, options) - // Avoid uncaught errors caused by unstable connections - socket.on('error', err => { - this.log('socket error', err) - }) + let maConn: MultiaddrConnection - const maConn = toMultiaddrConnection(socket, { - remoteAddr: ma, - socketInactivityTimeout: this.opts.outboundSocketInactivityTimeout, - socketCloseTimeout: this.opts.socketCloseTimeout, - metrics: this.metrics?.dialerEvents, - logger: this.components.logger - }) + try { + maConn = toMultiaddrConnection(socket, { + remoteAddr: ma, + socketInactivityTimeout: this.opts.outboundSocketInactivityTimeout, + socketCloseTimeout: this.opts.socketCloseTimeout, + metrics: this.metrics?.events, + logger: this.components.logger, + direction: 'outbound' + }) + } catch (err: any) { + this.metrics?.errors.increment({ outbound_to_connection: true }) + socket.destroy(err) + throw err + } try { this.log('new outbound connection %s', maConn.remoteAddr) return await options.upgrader.upgradeOutbound(maConn, options) } catch (err: any) { + this.metrics?.errors.increment({ outbound_upgrade: true }) this.log.error('error upgrading outbound connection', err) maConn.abort(err) throw err @@ -103,6 +112,8 @@ export class TCP implements Transport { options.signal?.throwIfAborted() options.onProgress?.(new CustomProgressEvent('tcp:open-connection')) + let rawSocket: Socket + return new Promise((resolve, reject) => { const start = Date.now() const cOpts = multiaddrToNetConfig(ma, { @@ -111,35 +122,34 @@ export class TCP implements Transport { }) as (IpcSocketConnectOpts & TcpSocketConnectOpts) this.log('dialing %a', ma) - const rawSocket = net.connect(cOpts) + rawSocket = net.connect(cOpts) const onError = (err: Error): void => { + this.log.error('dial to %a errored - %e', ma, err) const cOptsStr = cOpts.path ?? `${cOpts.host ?? ''}:${cOpts.port}` err.message = `connection error ${cOptsStr}: ${err.message}` - this.metrics?.dialerEvents.increment({ error: true }) - + this.metrics?.events.increment({ error: true }) done(err) } const onTimeout = (): void => { this.log('connection timeout %a', ma) - this.metrics?.dialerEvents.increment({ timeout: true }) + this.metrics?.events.increment({ timeout: true }) - const err = new TimeoutError(`connection timeout after ${Date.now() - start}ms`) + const err = new TimeoutError(`Connection timeout after ${Date.now() - start}ms`) // Note: this will result in onError() being called rawSocket.emit('error', err) } const onConnect = (): void => { this.log('connection opened %a', ma) - this.metrics?.dialerEvents.increment({ connect: true }) + this.metrics?.events.increment({ connect: true }) done() } const onAbort = (): void => { this.log('connection aborted %a', ma) - this.metrics?.dialerEvents.increment({ abort: true }) - rawSocket.destroy() + this.metrics?.events.increment({ abort: true }) done(new AbortError()) } @@ -167,6 +177,10 @@ export class TCP implements Transport { options.signal.addEventListener('abort', onAbort) } }) + .catch(err => { + rawSocket?.destroy() + throw err + }) } /** diff --git a/packages/transport-tcp/test/socket-to-conn.spec.ts b/packages/transport-tcp/test/socket-to-conn.spec.ts index 4750533a32..599095fadb 100644 --- a/packages/transport-tcp/test/socket-to-conn.spec.ts +++ b/packages/transport-tcp/test/socket-to-conn.spec.ts @@ -1,5 +1,4 @@ import { createServer, Socket, type Server, type ServerOpts, type SocketConstructorOpts } from 'net' -import os from 'os' import { defaultLogger } from '@libp2p/logger' import { expect } from 'aegir/chai' import defer from 'p-defer' @@ -78,7 +77,8 @@ describe('socket-to-conn', () => { const inboundMaConn = toMultiaddrConnection(serverSocket, { socketInactivityTimeout: 100, - logger: defaultLogger() + logger: defaultLogger(), + direction: 'inbound' }) expect(inboundMaConn.timeline.open).to.be.ok() expect(inboundMaConn.timeline.close).to.not.be.ok() @@ -121,11 +121,12 @@ describe('socket-to-conn', () => { const serverClosed = defer() // promise that is resolved when our outgoing socket errors - const serverErrored = defer() + const serverErrored = defer() const inboundMaConn = toMultiaddrConnection(serverSocket, { socketInactivityTimeout: 100, - logger: defaultLogger() + logger: defaultLogger(), + direction: 'inbound' }) expect(inboundMaConn.timeline.open).to.be.ok() expect(inboundMaConn.timeline.close).to.not.be.ok() @@ -144,11 +145,11 @@ describe('socket-to-conn', () => { // close the client for reading and writing immediately clientSocket.destroy() - // client closed the connection - error code is platform specific - if (os.platform() === 'linux') { - await expect(serverErrored.promise).to.eventually.have.property('name', 'TimeoutError') - } else { - await expect(serverErrored.promise).to.eventually.have.property('code', 'ECONNRESET') + const error = await serverErrored.promise + + // the error can be of either type + if (error.name !== 'TimeoutError' && error.code !== 'ECONNRESET') { + expect.fail('promise rejected with unknown error type') } // server socket was closed for reading and writing @@ -168,15 +169,18 @@ describe('socket-to-conn', () => { } })) + clientSocket.setTimeout(100) + // promise that is resolved when our outgoing socket is closed const serverClosed = defer() - // promise that is resolved when our outgoing socket errors - const serverErrored = defer() + // promise that is resolved when the incoming socket is closed + const clientClosed = defer() const inboundMaConn = toMultiaddrConnection(serverSocket, { socketInactivityTimeout: 100, - logger: defaultLogger() + logger: defaultLogger(), + direction: 'inbound' }) expect(inboundMaConn.timeline.open).to.be.ok() expect(inboundMaConn.timeline.close).to.not.be.ok() @@ -184,8 +188,12 @@ describe('socket-to-conn', () => { serverSocket.once('close', () => { serverClosed.resolve(true) }) - serverSocket.once('error', err => { - serverErrored.resolve(err) + + clientSocket.once('close', () => { + clientClosed.resolve(true) + }) + clientSocket.once('timeout', () => { + clientSocket.destroy() }) // send some data between the client and server @@ -195,11 +203,13 @@ describe('socket-to-conn', () => { // close the client for writing clientSocket.end() - // server socket was closed for reading and writing - await expect(serverClosed.promise).to.eventually.be.true() + await Promise.all([ + // server socket was closed for reading and writing + expect(serverClosed.promise).to.eventually.be.true(), - // remote stopped sending us data - await expect(serverErrored.promise).to.eventually.have.property('name', 'TimeoutError') + // remote socket was closed by server + expect(clientClosed.promise).to.eventually.be.true() + ]) // the connection closing was recorded expect(inboundMaConn.timeline.close).to.be.a('number') @@ -214,12 +224,10 @@ describe('socket-to-conn', () => { // promise that is resolved when our outgoing socket is closed const serverClosed = defer() - // promise that is resolved when our outgoing socket errors - const serverErrored = defer() - const inboundMaConn = toMultiaddrConnection(serverSocket, { socketInactivityTimeout: 100, - logger: defaultLogger() + logger: defaultLogger(), + direction: 'inbound' }) expect(inboundMaConn.timeline.open).to.be.ok() expect(inboundMaConn.timeline.close).to.not.be.ok() @@ -227,9 +235,6 @@ describe('socket-to-conn', () => { serverSocket.once('close', () => { serverClosed.resolve(true) }) - serverSocket.once('error', err => { - serverErrored.resolve(err) - }) // send some data between the client and server await inboundMaConn.sink(async function * () { @@ -242,9 +247,6 @@ describe('socket-to-conn', () => { // server socket was closed for reading and writing await expect(serverClosed.promise).to.eventually.be.true() - // remote didn't send us any data - await expect(serverErrored.promise).to.eventually.have.property('name', 'TimeoutError') - // the connection closing was recorded expect(inboundMaConn.timeline.close).to.be.a('number') @@ -261,7 +263,8 @@ describe('socket-to-conn', () => { const inboundMaConn = toMultiaddrConnection(serverSocket, { socketInactivityTimeout: 100, socketCloseTimeout: 10, - logger: defaultLogger() + logger: defaultLogger(), + direction: 'inbound' }) expect(inboundMaConn.timeline.open).to.be.ok() expect(inboundMaConn.timeline.close).to.not.be.ok() @@ -300,8 +303,6 @@ describe('socket-to-conn', () => { } }) - // spy on `.destroy()` invocations - const serverSocketDestroySpy = Sinon.spy(serverSocket, 'destroy') // promise that is resolved when our outgoing socket is closed const serverClosed = defer() const socketCloseTimeout = 10 @@ -309,7 +310,8 @@ describe('socket-to-conn', () => { const inboundMaConn = toMultiaddrConnection(proxyServerSocket, { socketInactivityTimeout: 100, socketCloseTimeout, - logger: defaultLogger() + logger: defaultLogger(), + direction: 'inbound' }) expect(inboundMaConn.timeline.open).to.be.ok() expect(inboundMaConn.timeline.close).to.not.be.ok() @@ -344,11 +346,10 @@ describe('socket-to-conn', () => { expect(serverSocket.destroyed).to.be.true() // the server socket was only closed once - expect(serverSocketDestroySpy.callCount).to.equal(1) expect(addEventListenerSpy.callCount).to.equal(1) }) - it('should destroy a socket by timeout when containing MultiaddrConnection is closed', async () => { + it('should destroy a socket when incoming MultiaddrConnection is closed', async () => { ({ server, clientSocket, serverSocket } = await setup({ server: { allowHalfOpen: true @@ -361,7 +362,8 @@ describe('socket-to-conn', () => { const inboundMaConn = toMultiaddrConnection(serverSocket, { socketInactivityTimeout: 100, socketCloseTimeout: 10, - logger: defaultLogger() + logger: defaultLogger(), + direction: 'inbound' }) expect(inboundMaConn.timeline.open).to.be.ok() expect(inboundMaConn.timeline.close).to.not.be.ok() @@ -388,7 +390,7 @@ describe('socket-to-conn', () => { expect(serverSocket.destroyed).to.be.true() }) - it('should destroy a socket by timeout when containing MultiaddrConnection is closed but remote keeps sending data', async () => { + it('should destroy a socket when incoming MultiaddrConnection is closed but remote keeps sending data', async () => { ({ server, clientSocket, serverSocket } = await setup({ server: { allowHalfOpen: true @@ -401,7 +403,8 @@ describe('socket-to-conn', () => { const inboundMaConn = toMultiaddrConnection(serverSocket, { socketInactivityTimeout: 500, socketCloseTimeout: 100, - logger: defaultLogger() + logger: defaultLogger(), + direction: 'inbound' }) expect(inboundMaConn.timeline.open).to.be.ok() expect(inboundMaConn.timeline.close).to.not.be.ok() @@ -432,58 +435,58 @@ describe('socket-to-conn', () => { expect(serverSocket.destroyed).to.be.true() }) - it('should destroy a socket by timeout when containing MultiaddrConnection is closed but closing remote times out', async () => { + it('should destroy a socket by inactivity timeout', async () => { ({ server, clientSocket, serverSocket } = await setup()) // promise that is resolved when our outgoing socket is closed const serverClosed = defer() - // promise that is resolved when our outgoing socket errors - const serverErrored = defer() + // promise that resolves when reading from the outgoing socket times out + const serverTimedOut = defer() + + const clientError = defer() const inboundMaConn = toMultiaddrConnection(serverSocket, { socketInactivityTimeout: 100, socketCloseTimeout: 100, - logger: defaultLogger() + logger: defaultLogger(), + direction: 'inbound' }) expect(inboundMaConn.timeline.open).to.be.ok() expect(inboundMaConn.timeline.close).to.not.be.ok() - clientSocket.once('error', () => {}) + clientSocket.once('error', (err) => { + clientError.resolve(err) + }) serverSocket.once('close', () => { serverClosed.resolve(true) }) - serverSocket.once('error', err => { - serverErrored.resolve(err) + serverSocket.once('timeout', () => { + serverTimedOut.resolve(true) }) // send some data between the client and server clientSocket.write('hello') serverSocket.write('goodbye') - // stop reading data - clientSocket.pause() + // ...send no more data - // have to write enough data quickly enough to overwhelm the client - while (serverSocket.writableLength < 1024) { - serverSocket.write('goodbyeeeeeeeeeeeeee') - } + // wait for server to time out socket + await Promise.all([ + // server socket timed out reading from the client + expect(serverTimedOut.promise).to.eventually.be.true(), - await inboundMaConn.close() + // server socket was closed for reading and writing + expect(serverClosed.promise).to.eventually.be.true(), + + // client connection was closed abruptly + expect(clientError.promise).to.eventually.have.property('code', 'ECONNRESET') + ]) // server socket should no longer be writable expect(serverSocket.writable).to.be.false() - // server socket was closed for reading and writing - await expect(serverClosed.promise).to.eventually.be.true() - - // remote didn't read our data - await expect(serverErrored.promise).to.eventually.have.property('name', 'AbortError') - - // the connection closing was recorded - expect(inboundMaConn.timeline.close).to.be.a('number') - // server socket is destroyed expect(serverSocket.destroyed).to.be.true() })