Skip to content

Commit

Permalink
fix: remove abortable iterator (#488)
Browse files Browse the repository at this point in the history
AbortableSource is slow because it races promises against every chunk
causing extra async work.

It's only really necessary if we're going to pass the source off to
another component.

Here we don't do that so it's simpler to just add a listener for the
abort event and close the stream.
  • Loading branch information
achingbrain authored Feb 5, 2024
1 parent b4e6a8d commit e39b2e2
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 11 deletions.
1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@
"@libp2p/peer-id": "^4.0.5",
"@libp2p/pubsub": "^9.0.8",
"@multiformats/multiaddr": "^12.1.14",
"abortable-iterator": "^5.0.1",
"denque": "^2.1.0",
"it-length-prefixed": "^9.0.4",
"it-pipe": "^3.0.1",
Expand Down
28 changes: 18 additions & 10 deletions src/stream.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { abortableSource } from 'abortable-iterator'
import { encode, decode } from 'it-length-prefixed'
import { pipe } from 'it-pipe'
import { pushable, type Pushable } from 'it-pushable'
Expand All @@ -25,8 +24,15 @@ export class OutboundStream {
this.closeController = new AbortController()
this.maxBufferSize = opts.maxBufferSize ?? Infinity

this.closeController.signal.addEventListener('abort', () => {
rawStream.close()
.catch(err => {
rawStream.abort(err)
})
})

pipe(
abortableSource(this.pushable, this.closeController.signal, { returnOnAbort: true }),
this.pushable,
this.rawStream
).catch(errCallback)
}
Expand Down Expand Up @@ -59,7 +65,6 @@ export class OutboundStream {
this.closeController.abort()
// similar to pushable.end() but clear the internal buffer
await this.pushable.return()
await this.rawStream.close()
}
}

Expand All @@ -73,17 +78,20 @@ export class InboundStream {
this.rawStream = rawStream
this.closeController = new AbortController()

this.source = abortableSource(
pipe(this.rawStream, (source) => decode(source, opts)),
this.closeController.signal,
{
returnOnAbort: true
}
this.closeController.signal.addEventListener('abort', () => {
rawStream.close()
.catch(err => {
rawStream.abort(err)
})
})

this.source = pipe(
this.rawStream,
(source) => decode(source, opts)
)
}

async close (): Promise<void> {
this.closeController.abort()
await this.rawStream.close()
}
}

0 comments on commit e39b2e2

Please sign in to comment.