diff --git a/package.json b/package.json index 9213c9f..f43ea3d 100644 --- a/package.json +++ b/package.json @@ -153,7 +153,6 @@ "any-signal": "^3.0.0", "benchmark": "^2.1.4", "err-code": "^3.0.1", - "it-pipe": "^2.0.3", "it-pushable": "^3.1.0", "it-stream-types": "^1.0.4", "rate-limiter-flexible": "^2.3.9", @@ -172,6 +171,7 @@ "it-drain": "^2.0.0", "it-foreach": "^1.0.0", "it-map": "^2.0.0", + "it-pipe": "^2.0.3", "it-to-buffer": "^3.0.0", "p-defer": "^4.0.0", "random-int": "^3.0.0", diff --git a/src/decode.ts b/src/decode.ts index 3ef8a5d..94d5812 100644 --- a/src/decode.ts +++ b/src/decode.ts @@ -1,6 +1,5 @@ import { MessageTypeNames, MessageTypes } from './message-types.js' import { Uint8ArrayList } from 'uint8arraylist' -import type { Source } from 'it-stream-types' import type { Message } from './message-types.js' export const MAX_MSG_SIZE = 1 << 20 // 1MB @@ -13,7 +12,7 @@ interface MessageHeader { length: number } -class Decoder { +export class Decoder { private readonly _buffer: Uint8ArrayList private _headerInfo: MessageHeader | null private readonly _maxMessageSize: number @@ -136,20 +135,3 @@ function readVarInt (buf: Uint8ArrayList, offset: number = 0) { offset } } - -/** - * Decode a chunk and yield an _array_ of decoded messages - */ -export function decode (maxMessageSize: number = MAX_MSG_SIZE, maxUnprocessedMessageQueueSize: number = MAX_MSG_QUEUE_SIZE) { - return async function * decodeMessages (source: Source): Source { - const decoder = new Decoder(maxMessageSize, maxUnprocessedMessageQueueSize) - - for await (const chunk of source) { - const msgs = decoder.write(chunk) - - if (msgs.length > 0) { - yield * msgs - } - } - } -} diff --git a/src/mplex.ts b/src/mplex.ts index b80b187..175af70 100644 --- a/src/mplex.ts +++ b/src/mplex.ts @@ -1,8 +1,7 @@ -import { pipe } from 'it-pipe' import { pushableV } from 'it-pushable' import { abortableSource } from 'abortable-iterator' import { encode } from './encode.js' -import { decode } from './decode.js' +import { Decoder } from './decode.js' import { MessageTypes, MessageTypeNames, Message } from './message-types.js' import { createStream } from './stream.js' import { toString as uint8ArrayToString } from 'uint8arrays' @@ -201,15 +200,13 @@ export class MplexStreamMuxer implements StreamMuxer { source = abortableSource(source, anySignal(abortSignals)) try { - await pipe( - source, - decode(this._init.maxMsgSize, this._init.maxUnprocessedMessageQueueSize), - async source => { - for await (const msg of source) { - await this._handleIncoming(msg) - } + const decoder = new Decoder(this._init.maxMsgSize, this._init.maxUnprocessedMessageQueueSize) + + for await (const chunk of source) { + for (const msg of decoder.write(chunk)) { + await this._handleIncoming(msg) } - ) + } this._source.end() } catch (err: any) { diff --git a/test/coder.spec.ts b/test/coder.spec.ts index 45f1264..4425c83 100644 --- a/test/coder.spec.ts +++ b/test/coder.spec.ts @@ -4,7 +4,7 @@ import { expect } from 'aegir/chai' import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' import { encode } from '../src/encode.js' -import { decode } from '../src/decode.js' +import { decode } from './fixtures/decode.js' import all from 'it-all' import { concat as uint8ArrayConcat } from 'uint8arrays/concat' import { messageWithBytes } from './fixtures/utils.js' diff --git a/test/fixtures/decode.ts b/test/fixtures/decode.ts new file mode 100644 index 0000000..8b3a1ad --- /dev/null +++ b/test/fixtures/decode.ts @@ -0,0 +1,19 @@ +/* eslint-env mocha */ + +import type { Message } from '../../src/message-types.js' +import { Decoder, MAX_MSG_QUEUE_SIZE, MAX_MSG_SIZE } from '../../src/decode.js' +import type { Source } from 'it-stream-types' + +export function decode (maxMessageSize: number = MAX_MSG_SIZE, maxUnprocessedMessageQueueSize: number = MAX_MSG_QUEUE_SIZE) { + return async function * decodeMessages (source: Source): Source { + const decoder = new Decoder(maxMessageSize, maxUnprocessedMessageQueueSize) + + for await (const chunk of source) { + const msgs = decoder.write(chunk) + + if (msgs.length > 0) { + yield * msgs + } + } + } +} diff --git a/test/mplex.spec.ts b/test/mplex.spec.ts index f2e58b2..dd65c9f 100644 --- a/test/mplex.spec.ts +++ b/test/mplex.spec.ts @@ -11,7 +11,7 @@ import all from 'it-all' import type { Source } from 'it-stream-types' import delay from 'delay' import pDefer from 'p-defer' -import { decode } from '../src/decode.js' +import { decode } from './fixtures/decode.js' import { pushable } from 'it-pushable' import { Uint8ArrayList } from 'uint8arraylist' @@ -135,8 +135,8 @@ describe('mplex', () => { streamSourceError.reject(new Error('Stream source did not error')) }) .catch(err => { - // should have errored before all messages were sent - expect(sent).to.equal(2) + // should have errored before all 102 messages were sent + expect(sent).to.be.lessThan(10) streamSourceError.resolve(err) }) } diff --git a/test/restrict-size.spec.ts b/test/restrict-size.spec.ts index 2067e98..f24efd2 100644 --- a/test/restrict-size.spec.ts +++ b/test/restrict-size.spec.ts @@ -8,7 +8,7 @@ import drain from 'it-drain' import each from 'it-foreach' import { Message, MessageTypes } from '../src/message-types.js' import { encode } from '../src/encode.js' -import { decode } from '../src/decode.js' +import { decode } from './fixtures/decode.js' import { Uint8ArrayList } from 'uint8arraylist' import toBuffer from 'it-to-buffer'