From 60840005f9704fff2cd0e0fe75a52d46bd85a49f Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Mon, 8 Aug 2022 17:16:07 +0100 Subject: [PATCH 1/4] feat: add lazy select --- package.json | 1 + src/index.ts | 2 +- src/select.ts | 61 ++++++++++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 62 insertions(+), 2 deletions(-) diff --git a/package.json b/package.json index c085abf..9b13a60 100644 --- a/package.json +++ b/package.json @@ -149,6 +149,7 @@ "it-first": "^1.0.6", "it-handshake": "^4.0.1", "it-length-prefixed": "^8.0.2", + "it-merge": "^1.0.4", "it-pipe": "^2.0.3", "it-pushable": "^3.0.0", "it-reader": "^6.0.1", diff --git a/src/index.ts b/src/index.ts index 0f96da6..7ccb3c2 100644 --- a/src/index.ts +++ b/src/index.ts @@ -21,5 +21,5 @@ export interface MultistreamSelectInit extends AbortOptions { writeBytes?: boolean } -export { select } from './select.js' +export { select, lazySelect } from './select.js' export { handle } from './handle.js' diff --git a/src/select.ts b/src/select.ts index f005371..1394fea 100644 --- a/src/select.ts +++ b/src/select.ts @@ -5,7 +5,10 @@ import { handshake } from 'it-handshake' import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' import { PROTOCOL_ID } from './index.js' import type { Duplex } from 'it-stream-types' -import type { Uint8ArrayList } from 'uint8arraylist' +import { Uint8ArrayList } from 'uint8arraylist' +import { pushable } from 'it-pushable' +import merge from 'it-merge' +import { reader } from 'it-reader' import type { ByteArrayInit, ByteListInit, MultistreamSelectInit, ProtocolStream } from './index.js' const log = logger('libp2p:mss:select') @@ -58,3 +61,59 @@ export async function select (stream: Duplex, protocols: string | string[], rest() throw errCode(new Error('protocol selection failed'), 'ERR_UNSUPPORTED_PROTOCOL') } + +/** + * Lazily negotiates a protocol. + * + * It *does not* block writes waiting for the other end to respond. Instead, it + * simply assumes the negotiation went successfully and starts writing data. + * + * Use when it is known that the receiver supports the desired protocol. + */ +export function lazySelect (stream: Duplex, protocol: string): ProtocolStream { + // This is a signal to write the multistream headers if the consumer tries to + // read from the source + const readPusher = pushable() + return { + stream: { + // eslint-disable-next-line @typescript-eslint/promise-function-async + sink: source => { + return stream.sink((async function * () { + let first = true + for await (const chunk of merge(source, readPusher)) { + if (first) { + const p1 = uint8ArrayFromString(PROTOCOL_ID) + const p2 = uint8ArrayFromString(protocol) + const list = new Uint8ArrayList(multistream.encode(p1), multistream.encode(p2)) + if (chunk.length !== 0) { // chunk will be zero length if from the pushable + list.append(chunk) + } + yield list.slice() + first = false + } + yield chunk + } + })()) + }, + source: (async function * () { + readPusher.push(new Uint8Array()) + try { + const byteReader = reader(stream.source) + let response = await multistream.readString(byteReader) + if (response === PROTOCOL_ID) { + response = await multistream.readString(byteReader) + } + if (response !== protocol) { + throw errCode(new Error('protocol selection failed'), 'ERR_UNSUPPORTED_PROTOCOL') + } + for await (const chunk of byteReader) { + yield chunk.slice() + } + } catch (err) { + if (err.code !== 'ERR_UNDER_READ') throw err + } + })() + }, + protocol + } +} From bd4be1b03bca38d3430d0eda95bbd10b9aca404d Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Mon, 8 Aug 2022 18:07:16 +0100 Subject: [PATCH 2/4] fix: make work --- src/select.ts | 59 +++++++++++++++++++++++---------------------------- 1 file changed, 27 insertions(+), 32 deletions(-) diff --git a/src/select.ts b/src/select.ts index 1394fea..e5466d4 100644 --- a/src/select.ts +++ b/src/select.ts @@ -73,44 +73,39 @@ export async function select (stream: Duplex, protocols: string | string[], export function lazySelect (stream: Duplex, protocol: string): ProtocolStream { // This is a signal to write the multistream headers if the consumer tries to // read from the source - const readPusher = pushable() + const negotiateTrigger = pushable() + let negotiated = false return { stream: { // eslint-disable-next-line @typescript-eslint/promise-function-async - sink: source => { - return stream.sink((async function * () { - let first = true - for await (const chunk of merge(source, readPusher)) { - if (first) { - const p1 = uint8ArrayFromString(PROTOCOL_ID) - const p2 = uint8ArrayFromString(protocol) - const list = new Uint8ArrayList(multistream.encode(p1), multistream.encode(p2)) - if (chunk.length !== 0) { // chunk will be zero length if from the pushable - list.append(chunk) - } - yield list.slice() - first = false - } + sink: source => stream.sink((async function * () { + let first = true + for await (const chunk of merge(source, negotiateTrigger)) { + if (first) { + first = false + negotiated = true + const p1 = uint8ArrayFromString(PROTOCOL_ID) + const p2 = uint8ArrayFromString(protocol) + const list = new Uint8ArrayList(multistream.encode(p1), multistream.encode(p2)) + if (chunk.length > 0) list.append(chunk) + yield list.slice() + } else { yield chunk } - })()) - }, + } + })()), source: (async function * () { - readPusher.push(new Uint8Array()) - try { - const byteReader = reader(stream.source) - let response = await multistream.readString(byteReader) - if (response === PROTOCOL_ID) { - response = await multistream.readString(byteReader) - } - if (response !== protocol) { - throw errCode(new Error('protocol selection failed'), 'ERR_UNSUPPORTED_PROTOCOL') - } - for await (const chunk of byteReader) { - yield chunk.slice() - } - } catch (err) { - if (err.code !== 'ERR_UNDER_READ') throw err + if (!negotiated) negotiateTrigger.push(new Uint8Array()) + const byteReader = reader(stream.source) + let response = await multistream.readString(byteReader) + if (response === PROTOCOL_ID) { + response = await multistream.readString(byteReader) + } + if (response !== protocol) { + throw errCode(new Error('protocol selection failed'), 'ERR_UNSUPPORTED_PROTOCOL') + } + for await (const chunk of byteReader) { + yield chunk.slice() } })() }, From c9e27556dcd8b58123fc5acce55f0af6541b5931 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Fri, 12 Aug 2022 14:25:57 +0100 Subject: [PATCH 3/4] test: add lazySelect tests --- src/select.ts | 5 ++++- test/dialer.spec.ts | 15 ++++++++++++++ test/integration.spec.ts | 43 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 62 insertions(+), 1 deletion(-) diff --git a/src/select.ts b/src/select.ts index e5466d4..815a00f 100644 --- a/src/select.ts +++ b/src/select.ts @@ -70,7 +70,9 @@ export async function select (stream: Duplex, protocols: string | string[], * * Use when it is known that the receiver supports the desired protocol. */ -export function lazySelect (stream: Duplex, protocol: string): ProtocolStream { +export function lazySelect (stream: Duplex, protocol: string): ProtocolStream +export function lazySelect (stream: Duplex, protocol: string): ProtocolStream +export function lazySelect (stream: Duplex, protocol: string): ProtocolStream { // This is a signal to write the multistream headers if the consumer tries to // read from the source const negotiateTrigger = pushable() @@ -84,6 +86,7 @@ export function lazySelect (stream: Duplex, protocol: string): Proto if (first) { first = false negotiated = true + negotiateTrigger.end() const p1 = uint8ArrayFromString(PROTOCOL_ID) const p2 = uint8ArrayFromString(protocol) const list = new Uint8ArrayList(multistream.encode(p1), multistream.encode(p2)) diff --git a/test/dialer.spec.ts b/test/dialer.spec.ts index ded14fe..0a7ada2 100644 --- a/test/dialer.spec.ts +++ b/test/dialer.spec.ts @@ -116,4 +116,19 @@ describe('Dialer', () => { await expect(mss.select(duplex, protocol)).to.eventually.be.rejected().with.property('code', 'ERR_UNSUPPORTED_PROTOCOL') }) }) + + describe('dialer.lazySelect', () => { + it('should lazily select a single protocol', async () => { + const protocol = '/echo/1.0.0' + const duplex = pair() + + const selection = mss.lazySelect(duplex, protocol) + expect(selection.protocol).to.equal(protocol) + + // Ensure stream is usable after selection + const input = [randomBytes(10), randomBytes(64), randomBytes(3)] + const output = await pipe(input, selection.stream, async (source) => await all(source)) + expect(new Uint8ArrayList(...output).slice()).to.eql(new Uint8ArrayList(...input).slice()) + }) + }) }) diff --git a/test/integration.spec.ts b/test/integration.spec.ts index d24ea1b..e86f1d9 100644 --- a/test/integration.spec.ts +++ b/test/integration.spec.ts @@ -79,4 +79,47 @@ describe('Dialer and Listener integration', () => { ]) expect(new Uint8ArrayList(...output[0]).slice()).to.eql(new Uint8ArrayList(...input).slice()) }) + + it('should handle and lazySelect', async () => { + const protocol = '/echo/1.0.0' + const pair = duplexPair() + + const dialerSelection = mss.lazySelect(pair[0], protocol) + expect(dialerSelection.protocol).to.equal(protocol) + + // Ensure stream is usable after selection + const input = [new Uint8ArrayList(randomBytes(10), randomBytes(64), randomBytes(3))] + // Since the stream is lazy, we need to write to it before handling + const dialerOutPromise = pipe(input, dialerSelection.stream, async source => await all(source)) + + const listenerSelection = await mss.handle(pair[1], protocol) + expect(listenerSelection.protocol).to.equal(protocol) + + await pipe(listenerSelection.stream, listenerSelection.stream) + + const dialerOut = await dialerOutPromise + expect(new Uint8ArrayList(...dialerOut).slice()).to.eql(new Uint8ArrayList(...input).slice()) + }) + + it('should abort an unhandled lazySelect', async () => { + const protocol = '/echo/1.0.0' + const pair = duplexPair() + + const dialerSelection = mss.lazySelect(pair[0], protocol) + expect(dialerSelection.protocol).to.equal(protocol) + + // Ensure stream is usable after selection + const input = [new Uint8ArrayList(randomBytes(10), randomBytes(64), randomBytes(3))] + // Since the stream is lazy, we need to write to it before handling + const dialerResultPromise = pipe(input, dialerSelection.stream, async source => await all(source)) + + // The error message from this varies depending on how much data got + // written when the dialer receives the `na` response and closes the + // stream, so we just assert that this rejects. + await expect(mss.handle(pair[1], '/unhandled/1.0.0')).to.be.rejected() + + const dialerErr = await expect(dialerResultPromise).to.be.rejected() + // Dialer should fail to negotiate the single protocol + expect(dialerErr.code).to.eql('ERR_UNSUPPORTED_PROTOCOL') + }) }) From 0eb2a2bfcd7309b69106f97c7cfc1f070e02a61a Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Wed, 12 Oct 2022 16:43:28 +0100 Subject: [PATCH 4/4] chore: apply suggestions from code review --- src/select.ts | 7 +++---- test/integration.spec.ts | 6 +++--- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/src/select.ts b/src/select.ts index 815a00f..a047853 100644 --- a/src/select.ts +++ b/src/select.ts @@ -79,8 +79,7 @@ export function lazySelect (stream: Duplex, protocol: string): ProtocolStre let negotiated = false return { stream: { - // eslint-disable-next-line @typescript-eslint/promise-function-async - sink: source => stream.sink((async function * () { + sink: async source => await stream.sink((async function * () { let first = true for await (const chunk of merge(source, negotiateTrigger)) { if (first) { @@ -91,7 +90,7 @@ export function lazySelect (stream: Duplex, protocol: string): ProtocolStre const p2 = uint8ArrayFromString(protocol) const list = new Uint8ArrayList(multistream.encode(p1), multistream.encode(p2)) if (chunk.length > 0) list.append(chunk) - yield list.slice() + yield * list } else { yield chunk } @@ -108,7 +107,7 @@ export function lazySelect (stream: Duplex, protocol: string): ProtocolStre throw errCode(new Error('protocol selection failed'), 'ERR_UNSUPPORTED_PROTOCOL') } for await (const chunk of byteReader) { - yield chunk.slice() + yield * chunk } })() }, diff --git a/test/integration.spec.ts b/test/integration.spec.ts index e86f1d9..a8ad9ef 100644 --- a/test/integration.spec.ts +++ b/test/integration.spec.ts @@ -116,10 +116,10 @@ describe('Dialer and Listener integration', () => { // The error message from this varies depending on how much data got // written when the dialer receives the `na` response and closes the // stream, so we just assert that this rejects. - await expect(mss.handle(pair[1], '/unhandled/1.0.0')).to.be.rejected() + await expect(mss.handle(pair[1], '/unhandled/1.0.0')).to.eventually.be.rejected() - const dialerErr = await expect(dialerResultPromise).to.be.rejected() // Dialer should fail to negotiate the single protocol - expect(dialerErr.code).to.eql('ERR_UNSUPPORTED_PROTOCOL') + await expect(dialerResultPromise).to.eventually.be.rejected() + .with.property('code', 'ERR_UNSUPPORTED_PROTOCOL') }) })