diff --git a/src/listener.ts b/src/listener.ts index 63dafed..f3ea8ea 100644 --- a/src/listener.ts +++ b/src/listener.ts @@ -4,7 +4,8 @@ import { toMultiaddrConnection } from './socket-to-conn.js' import { CODE_P2P } from './constants.js' import { getMultiaddrs, - multiaddrToNetConfig + multiaddrToNetConfig, + NetConfig } from './utils.js' import { EventEmitter, CustomEvent } from '@libp2p/interfaces/events' import type { MultiaddrConnection, Connection } from '@libp2p/interface-connection' @@ -25,15 +26,27 @@ async function attemptClose (maConn: MultiaddrConnection) { } } +export interface LimitServerConnectionsOpts { + acceptBelow: number + rejectAbove: number + onListenError?: (err: Error) => void +} + interface Context extends TCPCreateListenerOptions { handler?: (conn: Connection) => void upgrader: Upgrader socketInactivityTimeout?: number socketCloseTimeout?: number maxConnections?: number + limitServerConnections?: LimitServerConnectionsOpts } -type Status = {started: false} | {started: true, listeningAddr: Multiaddr, peerId: string | null } +type Status = {started: false} | { + started: true + listeningAddr: Multiaddr + peerId: string | null + netConfig: NetConfig +} export class TCPListener extends EventEmitter implements Listener { private readonly server: net.Server @@ -89,12 +102,33 @@ export class TCPListener extends EventEmitter implements Listene socket.once('close', () => { this.connections.delete(maConn) + + if ( + this.context.limitServerConnections != null && + this.connections.size < this.context.limitServerConnections.acceptBelow + ) { + // The most likely case of error is if the port taken by this application is binded by + // another process during the time the server if closed. In that case there's not much + // we can do. netListen() will be called again every time a connection is dropped, which + // acts as an eventual retry mechanism. onListenError allows the consumer act on this. + this.netListen().catch(e => { + log.error('error attempting to listen server once connection count under limit', e) + this.context.limitServerConnections?.onListenError?.(e as Error) + }) + } }) if (this.context.handler != null) { this.context.handler(conn) } + if ( + this.context.limitServerConnections != null && + this.connections.size >= this.context.limitServerConnections.rejectAbove + ) { + this.netClose() + } + this.dispatchEvent(new CustomEvent('connection', { detail: conn })) }) .catch(async err => { @@ -148,21 +182,21 @@ export class TCPListener extends EventEmitter implements Listene } async listen (ma: Multiaddr) { + if (this.status.started) { + throw Error('server is already listening') + } + const peerId = ma.getPeerId() const listeningAddr = peerId == null ? ma.decapsulateCode(CODE_P2P) : ma - this.status = { started: true, listeningAddr, peerId } + this.status = { + started: true, + listeningAddr, + peerId, + netConfig: multiaddrToNetConfig(listeningAddr) + } - return await new Promise((resolve, reject) => { - const options = multiaddrToNetConfig(listeningAddr) - this.server.listen(options, (err?: any) => { - if (err != null) { - return reject(err) - } - log('Listening on %s', this.server.address()) - resolve() - }) - }) + await this.netListen() } async close () { @@ -174,8 +208,47 @@ export class TCPListener extends EventEmitter implements Listene Array.from(this.connections.values()).map(async maConn => await attemptClose(maConn)) ) + await this.netClose() + } + + private async netListen (): Promise { + if (!this.status.started || this.server.listening) { + return + } + + const netConfig = this.status.netConfig + await new Promise((resolve, reject) => { - this.server.close(err => (err != null) ? reject(err) : resolve()) + // NOTE: 'listening' event is only fired on success. Any error such as port already binded, is emitted via 'error' + this.server.once('error', reject) + this.server.listen(netConfig, resolve) }) + + log('Listening on %s', this.server.address()) + } + + private netClose (): void { + if (!this.status.started || !this.server.listening) { + return + } + + log('Closing server on %s', this.server.address()) + + // NodeJS implementation tracks listening status with `this._handle` property. + // - Server.close() sets this._handle to null immediately. If this._handle is null, ERR_SERVER_NOT_RUNNING is thrown + // - Server.listening returns `this._handle !== null` https://github.com/nodejs/node/blob/386d761943bb1b217fba27d6b80b658c23009e60/lib/net.js#L1675 + // - Server.listen() if `this._handle !== null` throws ERR_SERVER_ALREADY_LISTEN + // + // NOTE: Both listen and close are technically not async actions, so it's not necessary to track + // states 'pending-close' or 'pending-listen' + + // From docs https://nodejs.org/api/net.html#serverclosecallback + // Stops the server from accepting new connections and keeps existing connections. + // 'close' event is emitted only emitted when all connections are ended. + // The optional callback will be called once the 'close' event occurs. + // + // NOTE: Since we want to keep existing connections and have checked `!this.server.listening` it's not necessary + // to pass a callback to close. + this.server.close() } } diff --git a/src/utils.ts b/src/utils.ts index dcdad4a..502c9b1 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -6,7 +6,9 @@ import path from 'path' const ProtoFamily = { ip4: 'IPv4', ip6: 'IPv6' } -export function multiaddrToNetConfig (addr: Multiaddr): ListenOptions | (IpcSocketConnectOpts & TcpSocketConnectOpts) { +export type NetConfig = ListenOptions | (IpcSocketConnectOpts & TcpSocketConnectOpts) + +export function multiaddrToNetConfig (addr: Multiaddr): NetConfig { const listenPath = addr.getPath() // unix socket listening