Skip to content

Commit

Permalink
refactor: remove abortable-iterator usage (#1785)
Browse files Browse the repository at this point in the history
Replaces abortable-iterator with a simple abort signal listener that just aborts the stream.

Closes #2001
Closes #2004
Closes #2020

---------

Co-authored-by: chad <chad.nehemiah94@gmail.com>
Co-authored-by: Alex Potsides <alex@achingbrain.net>
  • Loading branch information
3 people authored Sep 5, 2023
1 parent 0634e3b commit 2b755a8
Show file tree
Hide file tree
Showing 13 changed files with 108 additions and 31 deletions.
1 change: 0 additions & 1 deletion packages/libp2p/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@
"@multiformats/mafmt": "^12.1.2",
"@multiformats/multiaddr": "^12.1.5",
"@multiformats/multiaddr-matcher": "^1.0.0",
"abortable-iterator": "^5.0.1",
"any-signal": "^4.1.1",
"datastore-core": "^9.0.1",
"delay": "^6.0.0",
Expand Down
27 changes: 20 additions & 7 deletions packages/libp2p/src/autonat/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,18 @@
*/

import { setMaxListeners } from 'events'
import { CodeError } from '@libp2p/interface/errors'
import { logger } from '@libp2p/logger'
import { peerIdFromBytes } from '@libp2p/peer-id'
import { createEd25519PeerId } from '@libp2p/peer-id-factory'
import { multiaddr, protocols } from '@multiformats/multiaddr'
import { abortableDuplex } from 'abortable-iterator'
import first from 'it-first'
import * as lp from 'it-length-prefixed'
import map from 'it-map'
import parallel from 'it-parallel'
import { pipe } from 'it-pipe'
import isPrivateIp from 'private-ip'
import { codes } from '../errors.js'
import {
MAX_INBOUND_STREAMS,
MAX_OUTBOUND_STREAMS,
Expand Down Expand Up @@ -155,6 +156,12 @@ class DefaultAutoNATService implements Startable {
async handleIncomingAutonatStream (data: IncomingStreamData): Promise<void> {
const signal = AbortSignal.timeout(this.timeout)

const onAbort = (): void => {
data.stream.abort(new CodeError('handleIncomingAutonatStream timeout', codes.ERR_TIMEOUT))
}

signal.addEventListener('abort', onAbort, { once: true })

// this controller may be used while dialing lots of peers so prevent MaxListenersExceededWarning
// appearing in the console
try {
Expand All @@ -166,11 +173,10 @@ class DefaultAutoNATService implements Startable {
.map(ma => ma.toOptions().host)

try {
const source = abortableDuplex(data.stream, signal)
const self = this

await pipe(
source,
data.stream,
(source) => lp.decode(source),
async function * (stream) {
const buf = await first(stream)
Expand Down Expand Up @@ -379,12 +385,12 @@ class DefaultAutoNATService implements Startable {
})
},
(source) => lp.encode(source),
// pipe to the stream, not the abortable source other wise we
// can't tell the remote when a dial timed out..
data.stream
)
} catch (err) {
log.error('error handling incoming autonat stream', err)
} finally {
signal.removeEventListener('abort', onAbort)
}
}

Expand Down Expand Up @@ -453,6 +459,8 @@ class DefaultAutoNATService implements Startable {
const networkSegments: string[] = []

const verifyAddress = async (peer: PeerInfo): Promise<Message.DialResponse | undefined> => {
let onAbort = (): void => {}

try {
log('asking %p to verify multiaddr', peer.id)

Expand All @@ -463,12 +471,15 @@ class DefaultAutoNATService implements Startable {
const stream = await connection.newStream(this.protocol, {
signal
})
const source = abortableDuplex(stream, signal)

onAbort = () => { stream.abort(new CodeError('verifyAddress timeout', codes.ERR_TIMEOUT)) }

signal.addEventListener('abort', onAbort, { once: true })

const buf = await pipe(
[request],
(source) => lp.encode(source),
source,
stream,
(source) => lp.decode(source),
async (stream) => first(stream)
)
Expand Down Expand Up @@ -510,6 +521,8 @@ class DefaultAutoNATService implements Startable {
return response.dialResponse
} catch (err) {
log.error('error asking remote to verify multiaddr', err)
} finally {
signal.removeEventListener('abort', onAbort)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ export class ReservationStore extends EventEmitter<ReservationStoreEvents> imple
options.signal?.throwIfAborted()

log('requesting reservation from %p', connection.remotePeer)
const stream = await connection.newStream(RELAY_V2_HOP_CODEC)
const stream = await connection.newStream(RELAY_V2_HOP_CODEC, options)
const pbstr = pbStream(stream)
const hopstr = pbstr.pb(HopMessage)
await hopstr.write({ type: HopMessage.Type.RESERVE }, options)
Expand Down
25 changes: 18 additions & 7 deletions packages/libp2p/src/circuit-relay/utils.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { CodeError } from '@libp2p/interface/errors'
import { logger } from '@libp2p/logger'
import { abortableSource } from 'abortable-iterator'
import { anySignal } from 'any-signal'
import { CID } from 'multiformats/cid'
import { sha256 } from 'multiformats/hashes/sha2'
import { codes } from '../errors.js'
import { DEFAULT_DATA_LIMIT, DEFAULT_DURATION_LIMIT } from './constants.js'
import type { Limit } from './pb/index.js'
import type { Stream } from '@libp2p/interface/connection'
Expand Down Expand Up @@ -58,9 +59,13 @@ const doRelay = (src: Stream, dst: Stream, abortSignal: AbortSignal, limit: Requ
}

queueMicrotask(() => {
void dst.sink(countStreamBytes(abortableSource(src.source, signal, {
abortMessage: 'duration limit exceeded'
}), dataLimit))
const onAbort = (): void => {
dst.abort(new CodeError('duration limit exceeded', codes.ERR_TIMEOUT))
}

signal.addEventListener('abort', onAbort, { once: true })

void dst.sink(countStreamBytes(src.source, dataLimit))
.catch(err => {
log.error('error while relaying streams src -> dst', err)
abortStreams(err)
Expand All @@ -69,16 +74,21 @@ const doRelay = (src: Stream, dst: Stream, abortSignal: AbortSignal, limit: Requ
srcDstFinished = true

if (dstSrcFinished) {
signal.removeEventListener('abort', onAbort)
signal.clear()
clearTimeout(timeout)
}
})
})

queueMicrotask(() => {
void src.sink(countStreamBytes(abortableSource(dst.source, signal, {
abortMessage: 'duration limit exceeded'
}), dataLimit))
const onAbort = (): void => {
src.abort(new CodeError('duration limit exceeded', codes.ERR_TIMEOUT))
}

signal.addEventListener('abort', onAbort, { once: true })

void src.sink(countStreamBytes(dst.source, dataLimit))
.catch(err => {
log.error('error while relaying streams dst -> src', err)
abortStreams(err)
Expand All @@ -87,6 +97,7 @@ const doRelay = (src: Stream, dst: Stream, abortSignal: AbortSignal, limit: Requ
dstSrcFinished = true

if (srcDstFinished) {
signal.removeEventListener('abort', onAbort)
signal.clear()
clearTimeout(timeout)
}
Expand Down
11 changes: 8 additions & 3 deletions packages/libp2p/src/fetch/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { setMaxListeners } from 'events'
import { CodeError } from '@libp2p/interface/errors'
import { logger } from '@libp2p/logger'
import { abortableDuplex } from 'abortable-iterator'
import first from 'it-first'
import * as lp from 'it-length-prefixed'
import { pipe } from 'it-pipe'
Expand Down Expand Up @@ -140,6 +139,7 @@ class DefaultFetchService implements Startable, FetchService {
const connection = await this.components.connectionManager.openConnection(peer, options)
let signal = options.signal
let stream: Stream | undefined
let onAbort = (): void => {}

// create a timeout if no abort signal passed
if (signal == null) {
Expand All @@ -157,15 +157,19 @@ class DefaultFetchService implements Startable, FetchService {
signal
})

onAbort = () => {
stream?.abort(new CodeError('fetch timeout', codes.ERR_TIMEOUT))
}

// make stream abortable
const source = abortableDuplex(stream, signal)
signal.addEventListener('abort', onAbort, { once: true })

log('fetch %s', key)

const result = await pipe(
[FetchRequest.encode({ identifier: key })],
(source) => lp.encode(source),
source,
stream,
(source) => lp.decode(source),
async function (source) {
const buf = await first(source)
Expand Down Expand Up @@ -200,6 +204,7 @@ class DefaultFetchService implements Startable, FetchService {

return result ?? null
} finally {
signal.removeEventListener('abort', onAbort)
if (stream != null) {
await stream.close()
}
Expand Down
11 changes: 8 additions & 3 deletions packages/libp2p/src/ping/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { randomBytes } from '@libp2p/crypto'
import { CodeError } from '@libp2p/interface/errors'
import { logger } from '@libp2p/logger'
import { abortableDuplex } from 'abortable-iterator'
import first from 'it-first'
import { pipe } from 'it-pipe'
import { equals as uint8ArrayEquals } from 'uint8arrays/equals'
Expand Down Expand Up @@ -108,6 +107,7 @@ class DefaultPingService implements Startable, PingService {
const data = randomBytes(PING_LENGTH)
const connection = await this.components.connectionManager.openConnection(peer, options)
let stream: Stream | undefined
let onAbort = (): void => {}

options.signal = options.signal ?? AbortSignal.timeout(this.timeout)

Expand All @@ -117,12 +117,16 @@ class DefaultPingService implements Startable, PingService {
runOnTransientConnection: this.runOnTransientConnection
})

onAbort = () => {
stream?.abort(new CodeError('ping timeout', codes.ERR_TIMEOUT))
}

// make stream abortable
const source = abortableDuplex(stream, options.signal)
options.signal.addEventListener('abort', onAbort, { once: true })

const result = await pipe(
[data],
source,
stream,
async (source) => first(source)
)

Expand All @@ -146,6 +150,7 @@ class DefaultPingService implements Startable, PingService {

throw err
} finally {
options.signal.removeEventListener('abort', onAbort)
if (stream != null) {
await stream.close()
}
Expand Down
13 changes: 8 additions & 5 deletions packages/libp2p/src/upgrader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import { CodeError } from '@libp2p/interface/errors'
import { logger } from '@libp2p/logger'
import * as mss from '@libp2p/multistream-select'
import { peerIdFromString } from '@libp2p/peer-id'
import { abortableDuplex } from 'abortable-iterator'
import { createConnection } from './connection/index.js'
import { INBOUND_UPGRADE_TIMEOUT } from './connection-manager/constants.js'
import { codes } from './errors.js'
Expand Down Expand Up @@ -165,16 +164,18 @@ export class DefaultUpgrader implements Upgrader {

const signal = AbortSignal.timeout(this.inboundUpgradeTimeout)

const onAbort = (): void => {
maConn.abort(new CodeError('inbound upgrade timeout', codes.ERR_TIMEOUT))
}

signal.addEventListener('abort', onAbort, { once: true })

try {
// fails on node < 15.4
setMaxListeners?.(Infinity, signal)
} catch { }

try {
const abortableStream = abortableDuplex(maConn, signal)
maConn.source = abortableStream.source
maConn.sink = abortableStream.sink

if ((await this.components.connectionGater.denyInboundConnection?.(maConn)) === true) {
throw new CodeError('The multiaddr connection is blocked by gater.acceptConnection', codes.ERR_CONNECTION_INTERCEPTED)
}
Expand Down Expand Up @@ -255,6 +256,8 @@ export class DefaultUpgrader implements Upgrader {
transient: opts?.transient
})
} finally {
signal.removeEventListener('abort', onAbort)

this.components.connectionManager.afterUpgradeInbound()
}
}
Expand Down
3 changes: 3 additions & 0 deletions packages/libp2p/test/autonat/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,9 @@ describe('autonat', () => {
}

sink.end()
},
abort: (err) => {
void stream.source.throw(err)
}
}
const connection = {
Expand Down
2 changes: 1 addition & 1 deletion packages/libp2p/test/circuit-relay/utils.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ describe('circuit-relay utils', () => {

createLimitedRelay(localStream, remoteStream, controller.signal, limit)

expect(await toBuffer(received)).to.have.property('byteLength', 8)
expect(await toBuffer(received)).to.have.property('byteLength', 12)
expect(localStreamAbortSpy).to.have.property('called', true)
expect(remoteStreamAbortSpy).to.have.property('called', true)
})
Expand Down
2 changes: 1 addition & 1 deletion packages/libp2p/test/fetch/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ describe('fetch', () => {
await expect(localFetch.fetch(remoteComponents.peerId, key, {
signal
}))
.to.eventually.be.rejected.with.property('code', 'ABORT_ERR')
.to.eventually.be.rejected.with.property('code', 'ERR_TIMEOUT')

// should have closed stream
expect(newStreamSpy).to.have.property('callCount', 1)
Expand Down
2 changes: 1 addition & 1 deletion packages/libp2p/test/ping/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ describe('ping', () => {
await expect(localPing.ping(remoteComponents.peerId, {
signal
}))
.to.eventually.be.rejected.with.property('code', 'ABORT_ERR')
.to.eventually.be.rejected.with.property('code', 'ERR_TIMEOUT')

// should have closed stream
expect(newStreamSpy).to.have.property('callCount', 1)
Expand Down
38 changes: 38 additions & 0 deletions packages/libp2p/test/upgrading/upgrader.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,44 @@ describe('Upgrader', () => {
})
})

it('should clear timeout if upgrade is successful', async () => {
const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer })

localUpgrader = new DefaultUpgrader(localComponents, {
connectionEncryption: [
plaintext()()
],
muxers: [
yamux()()
],
inboundUpgradeTimeout: 1000
})
remoteUpgrader = new DefaultUpgrader(remoteComponents, {
connectionEncryption: [
plaintext()()
],
muxers: [
yamux()()
],
inboundUpgradeTimeout: 1000
})

const connections = await Promise.all([
localUpgrader.upgradeOutbound(outbound),
remoteUpgrader.upgradeInbound(inbound)
])

await delay(2000)

expect(connections).to.have.length(2)

connections.forEach(conn => {
conn.close().catch(() => {
throw new Error('Failed to close connection')
})
})
})

it('should fail if muxers do not match', async () => {
const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer })

Expand Down
2 changes: 1 addition & 1 deletion packages/protocol-perf/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ class DefaultPerfService implements Startable, PerfService {

const writeBlockSize = this.writeBlockSize

const stream = await connection.newStream([this.protocol])
const stream = await connection.newStream([this.protocol], options)

// Convert sendBytes to uint64 big endian buffer
const view = new DataView(this.databuf)
Expand Down

0 comments on commit 2b755a8

Please sign in to comment.