Skip to content

Commit

Permalink
Fix the bug of listner intermediary state
Browse files Browse the repository at this point in the history
  • Loading branch information
nazarhussain committed Sep 18, 2023
1 parent 7f029e8 commit 20b7929
Showing 1 changed file with 57 additions and 21 deletions.
78 changes: 57 additions & 21 deletions packages/transport-tcp/src/listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,21 @@ export interface TCPListenerMetrics {
events: CounterGroup
}

type Status = { started: false } | {
started: true
enum TCPListenerStatusCode {
/**
* When server object is initialized but we don't know the listening address yet or
* the server object is stopped manually, can be resumed only by calling listen()
**/
INERT = 'inert',
/* When listener is aware of the address but the server is not started listening */
INITIALIZED = 'initializing',
LISTENING = 'listening',
/* During the connection limits */
PAUSED = 'paused',
}

type Status = { code: TCPListenerStatusCode.INERT } | {
code: Exclude<TCPListenerStatusCode, TCPListenerStatusCode.INERT>
listeningAddr: Multiaddr
peerId: string | null
netConfig: NetConfig
Expand All @@ -66,7 +79,7 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
private readonly server: net.Server
/** Keep track of open connections to destroy in case of timeout */
private readonly connections = new Set<MultiaddrConnection>()
private status: Status = { started: false }
private status: Status = { code: TCPListenerStatusCode.INERT }
private metrics?: TCPListenerMetrics
private addr: string

Expand Down Expand Up @@ -144,6 +157,8 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
this.dispatchEvent(new CustomEvent<Error>('error', { detail: err }))
})
.on('close', () => {
if(this.status.code === TCPListenerStatusCode.PAUSED) return

this.metrics?.status.update({
[this.addr]: SERVER_STATUS_DOWN
})
Expand All @@ -152,6 +167,9 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
}

private onSocket (socket: net.Socket): void {
if(this.status.code === TCPListenerStatusCode.INERT) {
throw new Error('Server is is not listening yet')
}
// Avoid uncaught errors caused by unstable connections
socket.on('error', err => {
log('socket error', err)
Expand All @@ -161,7 +179,7 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
let maConn: MultiaddrConnection
try {
maConn = toMultiaddrConnection(socket, {
listeningAddr: this.status.started ? this.status.listeningAddr : undefined,
listeningAddr: this.status.code ? this.status.listeningAddr : undefined,
socketInactivityTimeout: this.context.socketInactivityTimeout,
socketCloseTimeout: this.context.socketCloseTimeout,
metrics: this.metrics?.events,
Expand Down Expand Up @@ -191,7 +209,7 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
// another process during the time the server if closed. In that case there's not much
// we can do. netListen() will be called again every time a connection is dropped, which
// acts as an eventual retry mechanism. onListenError allows the consumer act on this.
this.netListen().catch(e => {
this.resume().catch(e => {
log.error('error attempting to listen server once connection count under limit', e)
this.context.closeServerOnMaxConnections?.onListenError?.(e as Error)
})
Expand All @@ -206,7 +224,9 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
this.context.closeServerOnMaxConnections != null &&
this.connections.size >= this.context.closeServerOnMaxConnections.closeAbove
) {
this.netClose()
this.pause(false).catch(e => {
log.error('error attempting to close server once connection count over limit', e)
})
}

this.dispatchEvent(new CustomEvent<Connection>('connection', { detail: conn }))
Expand All @@ -232,7 +252,7 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
}

getAddrs (): Multiaddr[] {
if (!this.status.started) {
if (this.status.code === TCPListenerStatusCode.INERT) {
return []
}

Expand Down Expand Up @@ -264,7 +284,7 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
}

async listen (ma: Multiaddr): Promise<void> {
if (this.status.started) {
if (this.status.code === TCPListenerStatusCode.LISTENING || this.status.code === TCPListenerStatusCode.PAUSED ) {
throw Error('server is already listening')
}

Expand All @@ -273,26 +293,31 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
const { backlog } = this.context

this.status = {
started: true,
code: TCPListenerStatusCode.INITIALIZED,
listeningAddr,
peerId,
netConfig: multiaddrToNetConfig(listeningAddr, { backlog })
}

await this.netListen()
await this.resume()
}

async close (): Promise<void> {
await Promise.all(
Array.from(this.connections.values()).map(async maConn => { await attemptClose(maConn) })
)

// netClose already checks if server.listening
this.netClose()
await this.pause(true)
}

private async netListen (): Promise<void> {
if (!this.status.started || this.server.listening) {
/**
* Can resume a stopped or start an inert server
*/
private async resume (): Promise<void> {
if (
!(this.status.code === TCPListenerStatusCode.INITIALIZED ||
this.status.code === TCPListenerStatusCode.PAUSED) ||
this.server.listening) {
return
}

Expand All @@ -303,12 +328,20 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
this.server.once('error', reject)
this.server.listen(netConfig, resolve)
})

this.status = { ...this.status, code: TCPListenerStatusCode.LISTENING }
log('Listening on %s', this.server.address())
}

private netClose (): void {
if (!this.status.started || !this.server.listening) {
private async pause (permanent: boolean): Promise<void> {
if(this.status.code === TCPListenerStatusCode.PAUSED && permanent) {
this.status = { code: TCPListenerStatusCode.INERT }
return
}

if (
!(this.status.code === TCPListenerStatusCode.INITIALIZED ||
this.status.code === TCPListenerStatusCode.LISTENING) ||
!this.server.listening) {
return
}

Expand All @@ -326,9 +359,12 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
// Stops the server from accepting new connections and keeps existing connections.
// 'close' event is emitted only emitted when all connections are ended.
// The optional callback will be called once the 'close' event occurs.
//
// NOTE: Since we want to keep existing connections and have checked `!this.server.listening` it's not necessary
// to pass a callback to close.
this.server.close()

// We need to set this status before closing server, so other procedures are aware
// during the time the server is closing
this.status = permanent ? { code: TCPListenerStatusCode.INERT } : { ...this.status, code: TCPListenerStatusCode.PAUSED }
await new Promise<void>((resolve, reject) => {
this.server.close( err => { err ? reject(err) : resolve() })
})
}
}

0 comments on commit 20b7929

Please sign in to comment.