From f0156895dfe4a958aeb49be1173012390a7d396e Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Fri, 14 Oct 2022 09:43:06 -0500 Subject: [PATCH 1/2] Reduce async iterator loops per package in _createSink --- src/decode.ts | 2 +- src/mplex.ts | 35 ++++++++++++------ src/restrict-size.ts | 36 ------------------ src/stream.ts | 2 +- test/restrict-size.spec.ts | 75 +++++++++++++++++++++++--------------- 5 files changed, 70 insertions(+), 80 deletions(-) delete mode 100644 src/restrict-size.ts diff --git a/src/decode.ts b/src/decode.ts index 94aaf40..3ed3d85 100644 --- a/src/decode.ts +++ b/src/decode.ts @@ -10,7 +10,7 @@ interface MessageHeader { length: number } -class Decoder { +export class Decoder { private readonly _buffer: Uint8ArrayList private _headerInfo: MessageHeader | null diff --git a/src/mplex.ts b/src/mplex.ts index 2d44172..b8742cc 100644 --- a/src/mplex.ts +++ b/src/mplex.ts @@ -1,11 +1,9 @@ -import { pipe } from 'it-pipe' import { pushableV } from 'it-pushable' import { abortableSource } from 'abortable-iterator' import { encode } from './encode.js' -import { decode } from './decode.js' -import { restrictSize } from './restrict-size.js' +import { Decoder } from './decode.js' import { MessageTypes, MessageTypeNames, Message } from './message-types.js' -import { createStream } from './stream.js' +import { createStream, MAX_MSG_SIZE } from './stream.js' import { toString as uint8ArrayToString } from 'uint8arrays' import { logger } from '@libp2p/logger' import errCode from 'err-code' @@ -202,16 +200,29 @@ export class MplexStreamMuxer implements StreamMuxer { source = abortableSource(source, anySignal(abortSignals)) try { - await pipe( - source, - decode, - restrictSize(this._init.maxMsgSize), - async source => { - for await (const msg of source) { - await this._handleIncoming(msg) + const decoder = new Decoder() + const maxSize = this._init.maxMsgSize ?? MAX_MSG_SIZE + + for await (const chunk of source) { + // decode + const msgs = decoder.write(chunk) + if (msgs.length === 0) { + // eslint-disable-next-line no-continue + continue + } + + // restrict size + for (const msg of msgs) { + if ( + (msg.type === MessageTypes.NEW_STREAM || msg.type === MessageTypes.MESSAGE_INITIATOR || msg.type === MessageTypes.MESSAGE_RECEIVER) && + msg.data.byteLength > maxSize + ) { + throw Object.assign(new Error('message size too large!'), { code: 'ERR_MSG_TOO_BIG' }) } + + await this._handleIncoming(msg) } - ) + } this._source.end() } catch (err: any) { diff --git a/src/restrict-size.ts b/src/restrict-size.ts deleted file mode 100644 index e91d9c9..0000000 --- a/src/restrict-size.ts +++ /dev/null @@ -1,36 +0,0 @@ -import { Message, MessageTypes } from './message-types.js' -import type { Source, Transform } from 'it-stream-types' - -export const MAX_MSG_SIZE = 1 << 20 // 1MB - -/** - * Creates an iterable transform that restricts message sizes to - * the given maximum size. - */ -export function restrictSize (max?: number): Transform { - const maxSize = max ?? MAX_MSG_SIZE - - const checkSize = (msg: Message) => { - if (msg.type !== MessageTypes.NEW_STREAM && msg.type !== MessageTypes.MESSAGE_INITIATOR && msg.type !== MessageTypes.MESSAGE_RECEIVER) { - return - } - - if (msg.data.byteLength > maxSize) { - throw Object.assign(new Error('message size too large!'), { code: 'ERR_MSG_TOO_BIG' }) - } - } - - return (source: Source) => { - return (async function * restrictSize () { - for await (const msg of source) { - if (Array.isArray(msg)) { - msg.forEach(checkSize) - yield * msg - } else { - checkSize(msg) - yield msg - } - } - })() - } -} diff --git a/src/stream.ts b/src/stream.ts index bfc29cf..ece9a79 100644 --- a/src/stream.ts +++ b/src/stream.ts @@ -1,7 +1,6 @@ import { abortableSource } from 'abortable-iterator' import { pushable } from 'it-pushable' import errCode from 'err-code' -import { MAX_MSG_SIZE } from './restrict-size.js' import { anySignal } from 'any-signal' import { InitiatorMessageTypes, ReceiverMessageTypes } from './message-types.js' import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' @@ -14,6 +13,7 @@ import type { MplexStream } from './mplex.js' const log = logger('libp2p:mplex:stream') +export const MAX_MSG_SIZE = 1 << 20 // 1MB const ERR_STREAM_RESET = 'ERR_STREAM_RESET' const ERR_STREAM_ABORT = 'ERR_STREAM_ABORT' const ERR_SINK_ENDED = 'ERR_SINK_ENDED' diff --git a/test/restrict-size.spec.ts b/test/restrict-size.spec.ts index b744207..c7dba4c 100644 --- a/test/restrict-size.spec.ts +++ b/test/restrict-size.spec.ts @@ -3,56 +3,71 @@ import { expect } from 'aegir/chai' import { pipe } from 'it-pipe' import randomBytes from 'iso-random-stream/src/random.js' -import all from 'it-all' -import drain from 'it-drain' -import each from 'it-foreach' import { Message, MessageTypes } from '../src/message-types.js' -import { restrictSize } from '../src/restrict-size.js' import { Uint8ArrayList } from 'uint8arraylist' +import { MplexStream, MplexStreamMuxer } from '../src/mplex.js' +import { encode } from '../src/encode.js' describe('restrict-size', () => { - it('should throw when size is too big', async () => { - const maxSize = 32 + const maxMsgSize = 32 + it('should throw when size is too big', async () => { const input: Message[] = [ { id: 0, type: 1, data: new Uint8ArrayList(randomBytes(8)) }, - { id: 0, type: 1, data: new Uint8ArrayList(randomBytes(maxSize)) }, + { id: 0, type: 1, data: new Uint8ArrayList(randomBytes(maxMsgSize)) }, { id: 0, type: 1, data: new Uint8ArrayList(randomBytes(64)) }, { id: 0, type: 1, data: new Uint8ArrayList(randomBytes(16)) } ] const output: Message[] = [] + const streamMuxer = new MplexStreamMuxer({ maxMsgSize }) + let abortError: Error | null = null - try { - await pipe( - input, - restrictSize(maxSize), - (source) => each(source, chunk => { - output.push(chunk) - }), - async (source) => await drain(source) - ) - } catch (err: any) { - expect(err).to.have.property('code', 'ERR_MSG_TOO_BIG') - expect(output).to.have.length(2) - expect(output[0]).to.deep.equal(input[0]) - expect(output[1]).to.deep.equal(input[1]) - return + // Mutate _handleIncoming to capture output + streamMuxer._handleIncoming = async (msg) => { + output.push(msg) } - throw new Error('did not restrict size') + + // Note: in current MplexStreamMuxer it's very hard to access sink errors. + // The simplest way currently is to add a mock stream that will be aborted + // on MplexStreamMuxer.close() + // eslint-disable-next-line @typescript-eslint/consistent-type-assertions + const mockStream = { + abort: (err) => { + abortError = err + } + } as MplexStream + // eslint-disable-next-line @typescript-eslint/dot-notation + streamMuxer['_streams'].initiators.set(0, mockStream) + + await pipe( + input, + encode, + streamMuxer.sink + ) + + if (abortError === null) throw Error('did not restrict size') + expect(abortError).to.have.property('code', 'ERR_MSG_TOO_BIG') + expect(output).to.have.length(2) + expect(output[0]).to.deep.equal(input[0]) + expect(output[1]).to.deep.equal(input[1]) }) it('should allow message with no data property', async () => { - const message: Message = { - id: 4, - type: MessageTypes.CLOSE_RECEIVER + const input: Message[] = [{ id: 4, type: MessageTypes.CLOSE_RECEIVER }] + + const output: Message[] = [] + const streamMuxer = new MplexStreamMuxer({ maxMsgSize }) + + // Mutate _handleIncoming to capture output + streamMuxer._handleIncoming = async (msg) => { + output.push(msg) } - const input: Message[] = [message] - const output = await pipe( + await pipe( input, - restrictSize(32), - async (source) => await all(source) + encode, + streamMuxer.sink ) expect(output).to.deep.equal(input) }) From 011dd0ff819143b16e8fe013a11361353d58b7e0 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Thu, 24 Nov 2022 16:08:38 +0000 Subject: [PATCH 2/2] chore: fix whitespace --- test/restrict-size.spec.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/restrict-size.spec.ts b/test/restrict-size.spec.ts index 461f35a..f24efd2 100644 --- a/test/restrict-size.spec.ts +++ b/test/restrict-size.spec.ts @@ -122,4 +122,4 @@ describe('restrict size', () => { } throw new Error('did not restrict size') }) -}) \ No newline at end of file +})