Skip to content
This repository has been archived by the owner on Aug 29, 2023. It is now read-only.

Commit

Permalink
fix: port listener to ES6 class syntax (#214)
Browse files Browse the repository at this point in the history
Co-authored-by: achingbrain <alex@achingbrain.net>
  • Loading branch information
dapplion and achingbrain authored Oct 11, 2022
1 parent 493219c commit af7b8e2
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 97 deletions.
10 changes: 5 additions & 5 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ import * as mafmt from '@multiformats/mafmt'
import errCode from 'err-code'
import { logger } from '@libp2p/logger'
import { toMultiaddrConnection } from './socket-to-conn.js'
import { createListener } from './listener.js'
import { TCPListener } from './listener.js'
import { multiaddrToNetConfig } from './utils.js'
import { AbortError } from '@libp2p/interfaces/errors'
import { CODE_CIRCUIT, CODE_P2P, CODE_UNIX } from './constants.js'
import { CreateListenerOptions, DialOptions, symbol, Transport } from '@libp2p/interface-transport'
import { CreateListenerOptions, DialOptions, Listener, symbol, Transport } from '@libp2p/interface-transport'
import type { AbortOptions, Multiaddr } from '@multiformats/multiaddr'
import type { Socket, IpcSocketConnectOpts, TcpSocketConnectOpts } from 'net'
import type { Connection } from '@libp2p/interface-connection'
Expand Down Expand Up @@ -155,8 +155,8 @@ export class TCP implements Transport {
* anytime a new incoming Connection has been successfully upgraded via
* `upgrader.upgradeInbound`.
*/
createListener (options: TCPCreateListenerOptions) {
return createListener({
createListener (options: TCPCreateListenerOptions): Listener {
return new TCPListener({
...options,
socketInactivityTimeout: this.opts.inboundSocketInactivityTimeout,
socketCloseTimeout: this.opts.socketCloseTimeout
Expand All @@ -166,7 +166,7 @@ export class TCP implements Transport {
/**
* Takes a list of `Multiaddr`s and returns only valid TCP addresses
*/
filter (multiaddrs: Multiaddr[]) {
filter (multiaddrs: Multiaddr[]): Multiaddr[] {
multiaddrs = Array.isArray(multiaddrs) ? multiaddrs : [multiaddrs]

return multiaddrs.filter(ma => {
Expand Down
176 changes: 84 additions & 92 deletions src/listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,12 @@ import {
} from './utils.js'
import { EventEmitter, CustomEvent } from '@libp2p/interfaces/events'
import type { MultiaddrConnection, Connection } from '@libp2p/interface-connection'
import type { Upgrader, Listener } from '@libp2p/interface-transport'
import type { Server } from 'net'
import type { Upgrader, Listener, ListenerEvents } from '@libp2p/interface-transport'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { TCPCreateListenerOptions } from './index.js'

const log = logger('libp2p:tcp:listener')

interface ServerWithMultiaddrConnections extends Server {
__connections: MultiaddrConnection[]
}

/**
* Attempts to close the given maConn. If a failure occurs, it will be logged
*/
Expand All @@ -37,20 +32,29 @@ interface Context extends TCPCreateListenerOptions {
socketCloseTimeout?: number
}

/**
* Create listener
*/
export function createListener (context: Context) {
const {
handler, upgrader, socketInactivityTimeout, socketCloseTimeout
} = context
type Status = {started: false} | {started: true, listeningAddr: Multiaddr, peerId: string | null }

export class TCPListener extends EventEmitter<ListenerEvents> implements Listener {
private readonly server: net.Server
/** Keep track of open connections to destroy in case of timeout */
private readonly connections = new Set<MultiaddrConnection>()

context.keepAlive = context.keepAlive ?? true
private status: Status = { started: false }

let peerId: string | null
let listeningAddr: Multiaddr
constructor (private readonly context: Context) {
super()

context.keepAlive = context.keepAlive ?? true

this.server = net.createServer(context, this.onSocket.bind(this))

this.server
.on('listening', () => this.dispatchEvent(new CustomEvent('listening')))
.on('error', err => this.dispatchEvent(new CustomEvent<Error>('error', { detail: err })))
.on('close', () => this.dispatchEvent(new CustomEvent('close')))
}

const server: ServerWithMultiaddrConnections = Object.assign(net.createServer(context, socket => {
private onSocket (socket: net.Socket) {
// Avoid uncaught errors caused by unstable connections
socket.on('error', err => {
log('socket error', err)
Expand All @@ -59,9 +63,9 @@ export function createListener (context: Context) {
let maConn: MultiaddrConnection
try {
maConn = toMultiaddrConnection(socket, {
listeningAddr,
socketInactivityTimeout,
socketCloseTimeout
listeningAddr: this.status.started ? this.status.listeningAddr : undefined,
socketInactivityTimeout: this.context.socketInactivityTimeout,
socketCloseTimeout: this.context.socketCloseTimeout
})
} catch (err) {
log.error('inbound connection failed', err)
Expand All @@ -70,16 +74,20 @@ export function createListener (context: Context) {

log('new inbound connection %s', maConn.remoteAddr)
try {
upgrader.upgradeInbound(maConn)
this.context.upgrader.upgradeInbound(maConn)
.then((conn) => {
log('inbound connection %s upgraded', maConn.remoteAddr)
trackConn(server, maConn, socket)
this.connections.add(maConn)

if (handler != null) {
handler(conn)
socket.once('close', () => {
this.connections.delete(maConn)
})

if (this.context.handler != null) {
this.context.handler(conn)
}

listener.dispatchEvent(new CustomEvent<Connection>('connection', { detail: conn }))
this.dispatchEvent(new CustomEvent<Connection>('connection', { detail: conn }))
})
.catch(async err => {
log.error('inbound connection failed', err)
Expand All @@ -97,85 +105,69 @@ export function createListener (context: Context) {
log.error('closing inbound connection failed', err)
})
}
}),
// Keep track of open connections to destroy in case of timeout
{ __connections: [] })
}

const listener: Listener = Object.assign(new EventEmitter(), {
getAddrs: () => {
let addrs: Multiaddr[] = []
const address = server.address()
getAddrs () {
if (!this.status.started) {
return []
}

if (address == null) {
return []
}
let addrs: Multiaddr[] = []
const address = this.server.address()
const { listeningAddr, peerId } = this.status

if (typeof address === 'string') {
addrs = [listeningAddr]
} else {
try {
// Because TCP will only return the IPv6 version
// we need to capture from the passed multiaddr
if (listeningAddr.toString().startsWith('/ip4')) {
addrs = addrs.concat(getMultiaddrs('ip4', address.address, address.port))
} else if (address.family === 'IPv6') {
addrs = addrs.concat(getMultiaddrs('ip6', address.address, address.port))
}
} catch (err) {
log.error('could not turn %s:%s into multiaddr', address.address, address.port, err)
if (address == null) {
return []
}

if (typeof address === 'string') {
addrs = [listeningAddr]
} else {
try {
// Because TCP will only return the IPv6 version
// we need to capture from the passed multiaddr
if (listeningAddr.toString().startsWith('/ip4')) {
addrs = addrs.concat(getMultiaddrs('ip4', address.address, address.port))
} else if (address.family === 'IPv6') {
addrs = addrs.concat(getMultiaddrs('ip6', address.address, address.port))
}
} catch (err) {
log.error('could not turn %s:%s into multiaddr', address.address, address.port, err)
}
}

return addrs.map(ma => peerId != null ? ma.encapsulate(`/p2p/${peerId}`) : ma)
},
listen: async (ma: Multiaddr) => {
listeningAddr = ma
peerId = ma.getPeerId()

if (peerId == null) {
listeningAddr = ma.decapsulateCode(CODE_P2P)
}
return addrs.map(ma => peerId != null ? ma.encapsulate(`/p2p/${peerId}`) : ma)
}

return await new Promise<void>((resolve, reject) => {
const options = multiaddrToNetConfig(listeningAddr)
server.listen(options, (err?: any) => {
if (err != null) {
return reject(err)
}
log('Listening on %s', server.address())
resolve()
})
})
},
close: async () => {
if (!server.listening) {
return
}
async listen (ma: Multiaddr) {
const peerId = ma.getPeerId()
const listeningAddr = peerId == null ? ma.decapsulateCode(CODE_P2P) : ma

await Promise.all(
server.__connections.map(async maConn => await attemptClose(maConn))
)
this.status = { started: true, listeningAddr, peerId }

await new Promise<void>((resolve, reject) => {
server.close(err => (err != null) ? reject(err) : resolve())
return await new Promise<void>((resolve, reject) => {
const options = multiaddrToNetConfig(listeningAddr)
this.server.listen(options, (err?: any) => {
if (err != null) {
return reject(err)
}
log('Listening on %s', this.server.address())
resolve()
})
}
})

server
.on('listening', () => listener.dispatchEvent(new CustomEvent('listening')))
.on('error', err => listener.dispatchEvent(new CustomEvent<Error>('error', { detail: err })))
.on('close', () => listener.dispatchEvent(new CustomEvent('close')))
})
}

return listener
}
async close () {
if (!this.server.listening) {
return
}

function trackConn (server: ServerWithMultiaddrConnections, maConn: MultiaddrConnection, socket: net.Socket) {
server.__connections.push(maConn)
await Promise.all(
Array.from(this.connections.values()).map(async maConn => await attemptClose(maConn))
)

const untrackConn = () => {
server.__connections = server.__connections.filter(c => c !== maConn)
await new Promise<void>((resolve, reject) => {
this.server.close(err => (err != null) ? reject(err) : resolve())
})
}

socket.once('close', untrackConn)
}

0 comments on commit af7b8e2

Please sign in to comment.