diff --git a/package.json b/package.json index b4052b7..709e9a4 100644 --- a/package.json +++ b/package.json @@ -148,8 +148,9 @@ "lint": "aegir lint", "dep-check": "aegir dep-check", "build": "aegir build", - "test": "aegir test -t browser", + "test": "aegir test -t browser -t webworker", "test:chrome": "aegir test -t browser --cov", + "test:chrome-webworker": "aegir test -t webworker", "release": "aegir release", "docs": "aegir docs" }, diff --git a/src/index.ts b/src/index.ts index 60342a5..e54c01a 100644 --- a/src/index.ts +++ b/src/index.ts @@ -11,13 +11,12 @@ import type { Duplex, Source } from 'it-stream-types' import type { StreamMuxerFactory, StreamMuxerInit, StreamMuxer } from '@libp2p/interface-stream-muxer' import { Uint8ArrayList } from 'uint8arraylist' -const log = logger('libp2p:webtransport') declare global { - interface Window { - WebTransport: any - } + var WebTransport: any } +const log = logger('libp2p:webtransport') + // @ts-expect-error - Not easy to combine these types. const multibaseDecoder = Object.values(bases).map(b => b.decoder).reduce((d, b) => d.or(b)) @@ -265,7 +264,7 @@ export interface WebTransportComponents { peerId: PeerId } -class WebTransport implements Transport { +class WebTransportTransport implements Transport { private readonly components: WebTransportComponents private readonly config: Required @@ -299,7 +298,7 @@ class WebTransport implements Transport { throw new Error('Expected multiaddr to contain certhashes') } - const wt = new window.WebTransport(`${url}/.well-known/libp2p-webtransport?type=noise`, { + const wt = new WebTransport(`${url}/.well-known/libp2p-webtransport?type=noise`, { serverCertificateHashes: certhashes.map(certhash => ({ algorithm: 'sha-256', value: certhash.digest @@ -349,7 +348,7 @@ class WebTransport implements Transport { return await options.upgrader.upgradeOutbound(maConn, { skipEncryption: true, muxerFactory: this.webtransportMuxer(wt), skipProtection: true }) } - async authenticateWebTransport (wt: typeof window.WebTransport, localPeer: PeerId, remotePeer: PeerId, certhashes: Array>): Promise { + async authenticateWebTransport (wt: InstanceType, localPeer: PeerId, remotePeer: PeerId, certhashes: Array>): Promise { const stream = await wt.createBidirectionalStream() const writer = stream.writable.getWriter() const reader = stream.readable.getReader() @@ -359,7 +358,14 @@ class WebTransport implements Transport { source: (async function * () { while (true) { const val = await reader.read() - yield val.value + + if (val.value != null) { + yield val.value + } + + if (val.done === true) { + break + } } })(), sink: async function (source: Source) { @@ -390,7 +396,7 @@ class WebTransport implements Transport { return true } - webtransportMuxer (wt: typeof window.WebTransport): StreamMuxerFactory { + webtransportMuxer (wt: InstanceType): StreamMuxerFactory { let streamIDCounter = 0 const config = this.config return { @@ -411,9 +417,11 @@ class WebTransport implements Transport { const reader = wt.incomingBidirectionalStreams.getReader() while (true) { const { done, value: wtStream } = await reader.read() + if (done === true) { break } + if (activeStreams.length >= config.maxInboundStreams) { // We've reached our limit, close this stream. wtStream.writable.close().catch((err: Error) => { @@ -482,5 +490,5 @@ class WebTransport implements Transport { } export function webTransport (init: WebTransportInit = {}): (components: WebTransportComponents) => Transport { - return (components: WebTransportComponents) => new WebTransport(components, init) + return (components: WebTransportComponents) => new WebTransportTransport(components, init) }