Skip to content
This repository has been archived by the owner on Aug 29, 2023. It is now read-only.

Commit

Permalink
Close TCP server on maxConnections
Browse files Browse the repository at this point in the history
  • Loading branch information
dapplion committed Oct 14, 2022
1 parent f5c6d00 commit 507c19d
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 15 deletions.
101 changes: 87 additions & 14 deletions src/listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import { toMultiaddrConnection } from './socket-to-conn.js'
import { CODE_P2P } from './constants.js'
import {
getMultiaddrs,
multiaddrToNetConfig
multiaddrToNetConfig,
NetConfig
} from './utils.js'
import { EventEmitter, CustomEvent } from '@libp2p/interfaces/events'
import type { MultiaddrConnection, Connection } from '@libp2p/interface-connection'
Expand All @@ -25,15 +26,27 @@ async function attemptClose (maConn: MultiaddrConnection) {
}
}

export interface LimitServerConnectionsOpts {
acceptBelow: number
rejectAbove: number
onListenError?: (err: Error) => void
}

interface Context extends TCPCreateListenerOptions {
handler?: (conn: Connection) => void
upgrader: Upgrader
socketInactivityTimeout?: number
socketCloseTimeout?: number
maxConnections?: number
limitServerConnections?: LimitServerConnectionsOpts
}

type Status = {started: false} | {started: true, listeningAddr: Multiaddr, peerId: string | null }
type Status = {started: false} | {
started: true
listeningAddr: Multiaddr
peerId: string | null
netConfig: NetConfig
}

export class TCPListener extends EventEmitter<ListenerEvents> implements Listener {
private readonly server: net.Server
Expand Down Expand Up @@ -89,12 +102,33 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene

socket.once('close', () => {
this.connections.delete(maConn)

if (
this.context.limitServerConnections != null &&
this.connections.size < this.context.limitServerConnections.acceptBelow
) {
// The most likely case of error is if the port taken by this application is binded by
// another process during the time the server if closed. In that case there's not much
// we can do. netListen() will be called again every time a connection is dropped, which
// acts as an eventual retry mechanism. onListenError allows the consumer act on this.
this.netListen().catch(e => {
log.error('error attempting to listen server once connection count under limit', e)
this.context.limitServerConnections?.onListenError?.(e as Error)
})
}
})

if (this.context.handler != null) {
this.context.handler(conn)
}

if (
this.context.limitServerConnections != null &&
this.connections.size >= this.context.limitServerConnections.rejectAbove
) {
this.netClose()
}

this.dispatchEvent(new CustomEvent<Connection>('connection', { detail: conn }))
})
.catch(async err => {
Expand Down Expand Up @@ -148,21 +182,21 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
}

async listen (ma: Multiaddr) {
if (this.status.started) {
throw Error('server is already listening')
}

const peerId = ma.getPeerId()
const listeningAddr = peerId == null ? ma.decapsulateCode(CODE_P2P) : ma

this.status = { started: true, listeningAddr, peerId }
this.status = {
started: true,
listeningAddr,
peerId,
netConfig: multiaddrToNetConfig(listeningAddr)
}

return await new Promise<void>((resolve, reject) => {
const options = multiaddrToNetConfig(listeningAddr)
this.server.listen(options, (err?: any) => {
if (err != null) {
return reject(err)
}
log('Listening on %s', this.server.address())
resolve()
})
})
await this.netListen()
}

async close () {
Expand All @@ -174,8 +208,47 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
Array.from(this.connections.values()).map(async maConn => await attemptClose(maConn))
)

await this.netClose()
}

private async netListen (): Promise<void> {
if (!this.status.started || this.server.listening) {
return
}

const netConfig = this.status.netConfig

await new Promise<void>((resolve, reject) => {
this.server.close(err => (err != null) ? reject(err) : resolve())
// NOTE: 'listening' event is only fired on success. Any error such as port already binded, is emitted via 'error'
this.server.once('error', reject)
this.server.listen(netConfig, resolve)
})

log('Listening on %s', this.server.address())
}

private netClose (): void {
if (!this.status.started || !this.server.listening) {
return
}

log('Closing server on %s', this.server.address())

// NodeJS implementation tracks listening status with `this._handle` property.
// - Server.close() sets this._handle to null immediately. If this._handle is null, ERR_SERVER_NOT_RUNNING is thrown
// - Server.listening returns `this._handle !== null` https://github.com/nodejs/node/blob/386d761943bb1b217fba27d6b80b658c23009e60/lib/net.js#L1675
// - Server.listen() if `this._handle !== null` throws ERR_SERVER_ALREADY_LISTEN
//
// NOTE: Both listen and close are technically not async actions, so it's not necessary to track
// states 'pending-close' or 'pending-listen'

// From docs https://nodejs.org/api/net.html#serverclosecallback
// Stops the server from accepting new connections and keeps existing connections.
// 'close' event is emitted only emitted when all connections are ended.
// The optional callback will be called once the 'close' event occurs.
//
// NOTE: Since we want to keep existing connections and have checked `!this.server.listening` it's not necessary
// to pass a callback to close.
this.server.close()
}
}
4 changes: 3 additions & 1 deletion src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import path from 'path'

const ProtoFamily = { ip4: 'IPv4', ip6: 'IPv6' }

export function multiaddrToNetConfig (addr: Multiaddr): ListenOptions | (IpcSocketConnectOpts & TcpSocketConnectOpts) {
export type NetConfig = ListenOptions | (IpcSocketConnectOpts & TcpSocketConnectOpts)

export function multiaddrToNetConfig (addr: Multiaddr): NetConfig {
const listenPath = addr.getPath()

// unix socket listening
Expand Down

0 comments on commit 507c19d

Please sign in to comment.