Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

fix: reduce async iterator loops per package in _createSink #224

Merged
merged 3 commits into from
Nov 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,6 @@
"any-signal": "^3.0.0",
"benchmark": "^2.1.4",
"err-code": "^3.0.1",
"it-pipe": "^2.0.3",
"it-pushable": "^3.1.0",
"it-stream-types": "^1.0.4",
"rate-limiter-flexible": "^2.3.9",
Expand All @@ -172,6 +171,7 @@
"it-drain": "^2.0.0",
"it-foreach": "^1.0.0",
"it-map": "^2.0.0",
"it-pipe": "^2.0.3",
"it-to-buffer": "^3.0.0",
"p-defer": "^4.0.0",
"random-int": "^3.0.0",
Expand Down
20 changes: 1 addition & 19 deletions src/decode.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { MessageTypeNames, MessageTypes } from './message-types.js'
import { Uint8ArrayList } from 'uint8arraylist'
import type { Source } from 'it-stream-types'
import type { Message } from './message-types.js'

export const MAX_MSG_SIZE = 1 << 20 // 1MB
Expand All @@ -13,7 +12,7 @@ interface MessageHeader {
length: number
}

class Decoder {
export class Decoder {
private readonly _buffer: Uint8ArrayList
private _headerInfo: MessageHeader | null
private readonly _maxMessageSize: number
Expand Down Expand Up @@ -136,20 +135,3 @@ function readVarInt (buf: Uint8ArrayList, offset: number = 0) {
offset
}
}

/**
* Decode a chunk and yield an _array_ of decoded messages
*/
export function decode (maxMessageSize: number = MAX_MSG_SIZE, maxUnprocessedMessageQueueSize: number = MAX_MSG_QUEUE_SIZE) {
return async function * decodeMessages (source: Source<Uint8Array>): Source<Message> {
const decoder = new Decoder(maxMessageSize, maxUnprocessedMessageQueueSize)

for await (const chunk of source) {
const msgs = decoder.write(chunk)

if (msgs.length > 0) {
yield * msgs
}
}
}
}
17 changes: 7 additions & 10 deletions src/mplex.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import { pipe } from 'it-pipe'
import { pushableV } from 'it-pushable'
import { abortableSource } from 'abortable-iterator'
import { encode } from './encode.js'
import { decode } from './decode.js'
import { Decoder } from './decode.js'
import { MessageTypes, MessageTypeNames, Message } from './message-types.js'
import { createStream } from './stream.js'
import { toString as uint8ArrayToString } from 'uint8arrays'
Expand Down Expand Up @@ -201,15 +200,13 @@ export class MplexStreamMuxer implements StreamMuxer {
source = abortableSource(source, anySignal(abortSignals))

try {
await pipe(
source,
decode(this._init.maxMsgSize, this._init.maxUnprocessedMessageQueueSize),
async source => {
for await (const msg of source) {
await this._handleIncoming(msg)
}
const decoder = new Decoder(this._init.maxMsgSize, this._init.maxUnprocessedMessageQueueSize)

for await (const chunk of source) {
for (const msg of decoder.write(chunk)) {
await this._handleIncoming(msg)
}
)
}

this._source.end()
} catch (err: any) {
Expand Down
2 changes: 1 addition & 1 deletion test/coder.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import { expect } from 'aegir/chai'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { encode } from '../src/encode.js'
import { decode } from '../src/decode.js'
import { decode } from './fixtures/decode.js'
import all from 'it-all'
import { concat as uint8ArrayConcat } from 'uint8arrays/concat'
import { messageWithBytes } from './fixtures/utils.js'
Expand Down
19 changes: 19 additions & 0 deletions test/fixtures/decode.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/* eslint-env mocha */

import type { Message } from '../../src/message-types.js'
import { Decoder, MAX_MSG_QUEUE_SIZE, MAX_MSG_SIZE } from '../../src/decode.js'
import type { Source } from 'it-stream-types'

export function decode (maxMessageSize: number = MAX_MSG_SIZE, maxUnprocessedMessageQueueSize: number = MAX_MSG_QUEUE_SIZE) {
return async function * decodeMessages (source: Source<Uint8Array>): Source<Message> {
const decoder = new Decoder(maxMessageSize, maxUnprocessedMessageQueueSize)

for await (const chunk of source) {
const msgs = decoder.write(chunk)

if (msgs.length > 0) {
yield * msgs
}
}
}
}
6 changes: 3 additions & 3 deletions test/mplex.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import all from 'it-all'
import type { Source } from 'it-stream-types'
import delay from 'delay'
import pDefer from 'p-defer'
import { decode } from '../src/decode.js'
import { decode } from './fixtures/decode.js'
import { pushable } from 'it-pushable'
import { Uint8ArrayList } from 'uint8arraylist'

Expand Down Expand Up @@ -135,8 +135,8 @@ describe('mplex', () => {
streamSourceError.reject(new Error('Stream source did not error'))
})
.catch(err => {
// should have errored before all messages were sent
expect(sent).to.equal(2)
// should have errored before all 102 messages were sent
expect(sent).to.be.lessThan(10)
streamSourceError.resolve(err)
})
}
Expand Down
2 changes: 1 addition & 1 deletion test/restrict-size.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import drain from 'it-drain'
import each from 'it-foreach'
import { Message, MessageTypes } from '../src/message-types.js'
import { encode } from '../src/encode.js'
import { decode } from '../src/decode.js'
import { decode } from './fixtures/decode.js'
import { Uint8ArrayList } from 'uint8arraylist'
import toBuffer from 'it-to-buffer'

Expand Down