diff --git a/package.json b/package.json index 932ca4d..3cb2fac 100644 --- a/package.json +++ b/package.json @@ -146,11 +146,11 @@ "@libp2p/interfaces": "^3.2.0", "@libp2p/logger": "^2.0.0", "abortable-iterator": "^4.0.2", - "it-first": "^2.0.0", + "it-first": "^3.0.1", "it-handshake": "^4.1.2", - "it-length-prefixed": "^8.0.3", + "it-length-prefixed": "^9.0.0", "it-merge": "^3.0.0", - "it-pipe": "^2.0.4", + "it-pipe": "^3.0.0", "it-pushable": "^3.1.0", "it-reader": "^6.0.1", "it-stream-types": "^1.0.4", @@ -160,10 +160,10 @@ }, "devDependencies": { "@types/varint": "^6.0.0", - "aegir": "^37.2.0", + "aegir": "^38.1.8", "iso-random-stream": "^2.0.2", "it-all": "^3.0.1", - "it-map": "^2.0.0", + "it-map": "^3.0.2", "it-pair": "^2.0.3", "p-timeout": "^6.0.0", "timeout-abort-controller": "^3.0.0", diff --git a/src/multistream.ts b/src/multistream.ts index cd2b932..48b50e6 100644 --- a/src/multistream.ts +++ b/src/multistream.ts @@ -28,7 +28,7 @@ export function encode (buffer: Uint8Array | Uint8ArrayList): Uint8ArrayList { /** * `write` encodes and writes a single buffer */ -export function write (writer: Pushable, buffer: Uint8Array | Uint8ArrayList, options: MultistreamSelectInit = {}) { +export function write (writer: Pushable, buffer: Uint8Array | Uint8ArrayList, options: MultistreamSelectInit = {}): void { const encoded = encode(buffer) if (options.writeBytes === true) { @@ -41,7 +41,7 @@ export function write (writer: Pushable, buffer: Uint8Array | Uint8ArrayLis /** * `writeAll` behaves like `write`, except it encodes an array of items as a single write */ -export function writeAll (writer: Pushable, buffers: Uint8Array[], options: MultistreamSelectInit = {}) { +export function writeAll (writer: Pushable, buffers: Uint8Array[], options: MultistreamSelectInit = {}): void { const list = new Uint8ArrayList() for (const buf of buffers) { @@ -71,13 +71,13 @@ export async function read (reader: Reader, options?: AbortOptions): Promise { + const onLength = (l: number): void => { byteLength = l } const buf = await pipe( input, - lp.decode({ onLength, maxDataLength: MAX_PROTOCOL_LENGTH }), + (source) => lp.decode(source, { onLength, maxDataLength: MAX_PROTOCOL_LENGTH }), async (source) => await first(source) ) @@ -93,7 +93,7 @@ export async function read (reader: Reader, options?: AbortOptions): Promise { const buf = await read(reader, options) return uint8ArrayToString(buf.subarray()) diff --git a/src/select.ts b/src/select.ts index d45244f..b7c0805 100644 --- a/src/select.ts +++ b/src/select.ts @@ -122,23 +122,25 @@ export function lazySelect (stream: Duplex, protocol: string): ProtocolStre let negotiated = false return { stream: { - sink: async source => await stream.sink((async function * () { - let first = true - for await (const chunk of merge(source, negotiateTrigger)) { - if (first) { - first = false - negotiated = true - negotiateTrigger.end() - const p1 = uint8ArrayFromString(PROTOCOL_ID) - const p2 = uint8ArrayFromString(protocol) - const list = new Uint8ArrayList(multistream.encode(p1), multistream.encode(p2)) - if (chunk.length > 0) list.append(chunk) - yield * list - } else { - yield chunk + sink: async source => { + await stream.sink((async function * () { + let first = true + for await (const chunk of merge(source, negotiateTrigger)) { + if (first) { + first = false + negotiated = true + negotiateTrigger.end() + const p1 = uint8ArrayFromString(PROTOCOL_ID) + const p2 = uint8ArrayFromString(protocol) + const list = new Uint8ArrayList(multistream.encode(p1), multistream.encode(p2)) + if (chunk.length > 0) list.append(chunk) + yield * list + } else { + yield chunk + } } - } - })()), + })()) + }, source: (async function * () { if (!negotiated) negotiateTrigger.push(new Uint8Array()) const byteReader = reader(stream.source) diff --git a/test/listener.spec.ts b/test/listener.spec.ts index 85bbbd0..028f9cf 100644 --- a/test/listener.spec.ts +++ b/test/listener.spec.ts @@ -120,7 +120,7 @@ describe('Listener', () => { // Decode each of the protocols from the reader const lsProtocols = await pipe( protocolsReader, - Lp.decode(), + (source) => Lp.decode(source), // Stringify and remove the newline (source) => map(source, (buf) => uint8ArrayToString(buf.subarray()).trim()), async (source) => await all(source)