Skip to content
This repository has been archived by the owner on Aug 29, 2023. It is now read-only.

Commit

Permalink
fix: update to latest interfaces (#170)
Browse files Browse the repository at this point in the history
Updates interface tests
  • Loading branch information
achingbrain authored Feb 21, 2022
1 parent 3953791 commit d8840e8
Show file tree
Hide file tree
Showing 7 changed files with 150 additions and 154 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
import { TCP } from '@libp2p/tcp'
import { Multiaddr } from '@multiformats/multiaddr'
import pipe from 'it-pipe'
import { collect } from 'streaming-iterables'
import all from 'it-all'

// A simple upgrader that just returns the MultiaddrConnection
const upgrader = {
Expand All @@ -66,7 +66,7 @@ console.log('listening')
const socket = await tcp.dial(addr)
const values = await pipe(
socket,
collect
all
)
console.log(`Value: ${values.toString()}`)

Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,9 @@
"@libp2p/interface-compliance-tests": "^1.1.2",
"@libp2p/interfaces": "^1.3.2",
"aegir": "^36.1.3",
"it-all": "^1.0.6",
"it-pipe": "^2.0.3",
"sinon": "^13.0.0",
"streaming-iterables": "^6.0.0",
"uint8arrays": "^3.0.0"
}
}
4 changes: 2 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { createListener } from './listener.js'
import { multiaddrToNetConfig } from './utils.js'
import { AbortError } from 'abortable-iterator'
import { CODE_CIRCUIT, CODE_P2P } from './constants.js'
import type { Transport, Upgrader, ListenerOptions, Listener } from '@libp2p/interfaces/transport'
import type { Transport, Upgrader, ListenerOptions } from '@libp2p/interfaces/transport'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { Socket } from 'net'

Expand Down Expand Up @@ -125,7 +125,7 @@ export class TCP implements Transport<DialOptions, ListenerOptions> {
* anytime a new incoming Connection has been successfully upgraded via
* `upgrader.upgradeInbound`.
*/
createListener (options: ListenerOptions = {}): Listener {
createListener (options: ListenerOptions = {}) {
return createListener({ upgrader: this._upgrader, ...options })
}

Expand Down
246 changes: 113 additions & 133 deletions src/listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,16 @@ import {
} from './utils.js'
import { EventEmitter, CustomEvent } from '@libp2p/interfaces'
import type { Connection } from '@libp2p/interfaces/connection'
import type { MultiaddrConnection, Upgrader, Listener, ListenerEvents, ConnectionHandler } from '@libp2p/interfaces/transport'
import type { MultiaddrConnection, Upgrader, Listener } from '@libp2p/interfaces/transport'
import type { Server } from 'net'
import type { Multiaddr } from '@multiformats/multiaddr'

const log = logger('libp2p:tcp:listener')

interface ServerWithMultiaddrConnections extends Server {
__connections: MultiaddrConnection[]
}

/**
* Attempts to close the given maConn. If a failure occurs, it will be logged
*/
Expand All @@ -30,159 +34,135 @@ interface Context {
upgrader: Upgrader
}

class TCPListener extends EventEmitter<ListenerEvents> implements Listener {
private peerId?: string
private listeningAddr?: Multiaddr
private readonly server: Server
private connections: MultiaddrConnection[]

constructor (upgrader: Upgrader, handler?: ConnectionHandler) {
super()

this.connections = []

this.server = net.createServer(socket => {
// Avoid uncaught errors caused by unstable connections
socket.on('error', err => {
log('socket error', err)
})
/**
* Create listener
*/
export function createListener (context: Context) {
const {
handler, upgrader
} = context

let maConn: MultiaddrConnection
try {
maConn = toMultiaddrConnection(socket, { listeningAddr: this.listeningAddr })
} catch (err) {
log.error('inbound connection failed', err)
return
}
let peerId: string | null
let listeningAddr: Multiaddr

log('new inbound connection %s', maConn.remoteAddr)
try {
upgrader.upgradeInbound(maConn)
.then((conn) => {
log('inbound connection %s upgraded', maConn.remoteAddr)

this.trackConn(maConn, socket)

if (handler != null) {
handler(conn)
}

this.dispatchEvent(new CustomEvent('connection', {
detail: conn
}))
})
.catch(async err => {
log.error('inbound connection failed', err)

await attemptClose(maConn)
})
.catch(err => {
log.error('closing inbound connection failed', err)
})
} catch (err) {
log.error('inbound connection failed', err)

attemptClose(maConn)
.catch(err => {
log.error('closing inbound connection failed', err)
})
}
})
this.server.on('error', err => {
this.dispatchEvent(new CustomEvent('error', {
detail: err
}))
const server: ServerWithMultiaddrConnections = Object.assign(net.createServer(socket => {
// Avoid uncaught errors caused by unstable connections
socket.on('error', err => {
log('socket error', err)
})
this.server.on('close', () => {
this.dispatchEvent(new CustomEvent('close'))
})
this.server.on('listening', () => {
this.dispatchEvent(new CustomEvent('listening'))
})
}

getAddrs () {
let addrs: Multiaddr[] = []
const address = this.server.address()

if (address == null) {
throw new Error('Listener is not ready yet')
let maConn: MultiaddrConnection
try {
maConn = toMultiaddrConnection(socket, { listeningAddr })
} catch (err) {
log.error('inbound connection failed', err)
return
}

if (typeof address === 'string') {
throw new Error('Incorrect server address type')
log('new inbound connection %s', maConn.remoteAddr)
try {
upgrader.upgradeInbound(maConn)
.then((conn) => {
log('inbound connection %s upgraded', maConn.remoteAddr)
trackConn(server, maConn, socket)

if (handler != null) {
handler(conn)
}

listener.dispatchEvent(new CustomEvent('connection', { detail: conn }))
})
.catch(async err => {
log.error('inbound connection failed', err)

await attemptClose(maConn)
})
.catch(err => {
log.error('closing inbound connection failed', err)
})
} catch (err) {
log.error('inbound connection failed', err)

attemptClose(maConn)
.catch(err => {
log.error('closing inbound connection failed', err)
})
}
}),
// Keep track of open connections to destroy in case of timeout
{ __connections: [] })

if (this.listeningAddr == null) {
throw new Error('Listener is not ready yet')
}
const listener: Listener = Object.assign(new EventEmitter(), {
getAddrs: () => {
let addrs: Multiaddr[] = []
const address = server.address()

// Because TCP will only return the IPv6 version
// we need to capture from the passed multiaddr
if (this.listeningAddr.toString().startsWith('/ip4')) {
addrs = addrs.concat(getMultiaddrs('ip4', address.address, address.port))
} else if (address.family === 'IPv6') {
addrs = addrs.concat(getMultiaddrs('ip6', address.address, address.port))
}
if (address == null) {
throw new Error('Listener is not ready yet')
}

return addrs.map(ma => this.peerId != null ? ma.encapsulate(`/p2p/${this.peerId}`) : ma)
}
if (typeof address === 'string') {
throw new Error('Incorrect server address type')
}

async listen (ma: Multiaddr) {
const peerId = ma.getPeerId()
// Because TCP will only return the IPv6 version
// we need to capture from the passed multiaddr
if (listeningAddr.toString().startsWith('/ip4')) {
addrs = addrs.concat(getMultiaddrs('ip4', address.address, address.port))
} else if (address.family === 'IPv6') {
addrs = addrs.concat(getMultiaddrs('ip6', address.address, address.port))
}

if (peerId == null) {
ma = ma.decapsulateCode(CODE_P2P)
} else {
this.peerId = peerId
}
return addrs.map(ma => peerId != null ? ma.encapsulate(`/p2p/${peerId}`) : ma)
},
listen: async (ma: Multiaddr) => {
listeningAddr = ma
peerId = ma.getPeerId()

this.listeningAddr = ma
if (peerId == null) {
listeningAddr = ma.decapsulateCode(CODE_P2P)
}

return await new Promise<void>((resolve, reject) => {
const options = multiaddrToNetConfig(ma)
this.server.listen(options, (err?: any) => {
if (err != null) {
return reject(err)
}
log('Listening on %s', this.server.address())
resolve()
return await new Promise<void>((resolve, reject) => {
const options = multiaddrToNetConfig(listeningAddr)
server.listen(options, (err?: any) => {
if (err != null) {
return reject(err)
}
log('Listening on %s', server.address())
resolve()
})
})
})
}
},
close: async () => {
if (!server.listening) {
return
}

async close () {
if (!this.server.listening) {
return
await Promise.all([
server.__connections.map(async maConn => await attemptClose(maConn))
])

await new Promise<void>((resolve, reject) => {
server.close(err => (err != null) ? reject(err) : resolve())
})
}
})

await Promise.all([
this.connections.map(async maConn => await attemptClose(maConn))
])
server
.on('listening', () => listener.dispatchEvent(new CustomEvent('listening')))
.on('error', err => listener.dispatchEvent(new CustomEvent('error', { detail: err })))
.on('close', () => listener.dispatchEvent(new CustomEvent('close')))

await new Promise<void>((resolve, reject) => {
this.server.close(err => (err != null) ? reject(err) : resolve())
})
}

trackConn (maConn: MultiaddrConnection, socket: net.Socket) {
this.connections.push(maConn)
return listener
}

const untrackConn = () => {
this.connections = this.connections.filter(c => c !== maConn)
}
function trackConn (server: ServerWithMultiaddrConnections, maConn: MultiaddrConnection, socket: net.Socket) {
server.__connections.push(maConn)

socket.once('close', untrackConn)
const untrackConn = () => {
server.__connections = server.__connections.filter(c => c !== maConn)
}
}

/**
* Create listener
*/
export function createListener (context: Context) {
const {
handler, upgrader
} = context

return new TCPListener(upgrader, handler)
socket.once('close', untrackConn)
}
2 changes: 1 addition & 1 deletion test/connection.spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { expect } from 'aegir/utils/chai.js'
import { TCP } from '../src/index.js'
import { Multiaddr } from '@multiformats/multiaddr'
import { mockUpgrader } from '@libp2p/interface-compliance-tests/transport/utils'
import { mockUpgrader } from '@libp2p/interface-compliance-tests/mocks'
import type { Connection } from '@libp2p/interfaces/connection'

describe('valid localAddr and remoteAddr', () => {
Expand Down
2 changes: 1 addition & 1 deletion test/filter.spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { expect } from 'aegir/utils/chai.js'
import { TCP } from '../src/index.js'
import { Multiaddr } from '@multiformats/multiaddr'
import { mockUpgrader } from '@libp2p/interface-compliance-tests/transport/utils'
import { mockUpgrader } from '@libp2p/interface-compliance-tests/mocks'

describe('filter addrs', () => {
const base = '/ip4/127.0.0.1'
Expand Down
Loading

0 comments on commit d8840e8

Please sign in to comment.