Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

fix: reduce async iterator loops per package in _createSink #224

Merged
merged 3 commits into from
Nov 25, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/decode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ interface MessageHeader {
length: number
}

class Decoder {
export class Decoder {
private readonly _buffer: Uint8ArrayList
private _headerInfo: MessageHeader | null

Expand Down
35 changes: 23 additions & 12 deletions src/mplex.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
import { pipe } from 'it-pipe'
import { pushableV } from 'it-pushable'
import { abortableSource } from 'abortable-iterator'
import { encode } from './encode.js'
import { decode } from './decode.js'
import { restrictSize } from './restrict-size.js'
import { Decoder } from './decode.js'
import { MessageTypes, MessageTypeNames, Message } from './message-types.js'
import { createStream } from './stream.js'
import { createStream, MAX_MSG_SIZE } from './stream.js'
import { toString as uint8ArrayToString } from 'uint8arrays'
import { logger } from '@libp2p/logger'
import errCode from 'err-code'
Expand Down Expand Up @@ -202,16 +200,29 @@ export class MplexStreamMuxer implements StreamMuxer {
source = abortableSource(source, anySignal(abortSignals))

try {
await pipe(
source,
decode,
restrictSize(this._init.maxMsgSize),
async source => {
for await (const msg of source) {
await this._handleIncoming(msg)
const decoder = new Decoder()
const maxSize = this._init.maxMsgSize ?? MAX_MSG_SIZE

for await (const chunk of source) {
// decode
const msgs = decoder.write(chunk)
if (msgs.length === 0) {
// eslint-disable-next-line no-continue
continue
}

// restrict size
for (const msg of msgs) {
if (
(msg.type === MessageTypes.NEW_STREAM || msg.type === MessageTypes.MESSAGE_INITIATOR || msg.type === MessageTypes.MESSAGE_RECEIVER) &&
msg.data.byteLength > maxSize
) {
throw Object.assign(new Error('message size too large!'), { code: 'ERR_MSG_TOO_BIG' })
}

await this._handleIncoming(msg)
}
)
}

this._source.end()
} catch (err: any) {
Expand Down
36 changes: 0 additions & 36 deletions src/restrict-size.ts

This file was deleted.

2 changes: 1 addition & 1 deletion src/stream.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { abortableSource } from 'abortable-iterator'
import { pushable } from 'it-pushable'
import errCode from 'err-code'
import { MAX_MSG_SIZE } from './restrict-size.js'
import { anySignal } from 'any-signal'
import { InitiatorMessageTypes, ReceiverMessageTypes } from './message-types.js'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
Expand All @@ -14,6 +13,7 @@ import type { MplexStream } from './mplex.js'

const log = logger('libp2p:mplex:stream')

export const MAX_MSG_SIZE = 1 << 20 // 1MB
const ERR_STREAM_RESET = 'ERR_STREAM_RESET'
const ERR_STREAM_ABORT = 'ERR_STREAM_ABORT'
const ERR_SINK_ENDED = 'ERR_SINK_ENDED'
Expand Down
75 changes: 45 additions & 30 deletions test/restrict-size.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,56 +3,71 @@
import { expect } from 'aegir/chai'
import { pipe } from 'it-pipe'
import randomBytes from 'iso-random-stream/src/random.js'
import all from 'it-all'
import drain from 'it-drain'
import each from 'it-foreach'
import { Message, MessageTypes } from '../src/message-types.js'
import { restrictSize } from '../src/restrict-size.js'
import { Uint8ArrayList } from 'uint8arraylist'
import { MplexStream, MplexStreamMuxer } from '../src/mplex.js'
import { encode } from '../src/encode.js'

describe('restrict-size', () => {
it('should throw when size is too big', async () => {
const maxSize = 32
const maxMsgSize = 32

it('should throw when size is too big', async () => {
const input: Message[] = [
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(8)) },
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(maxSize)) },
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(maxMsgSize)) },
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(64)) },
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(16)) }
]

const output: Message[] = []
const streamMuxer = new MplexStreamMuxer({ maxMsgSize })
let abortError: Error | null = null

try {
await pipe(
input,
restrictSize(maxSize),
(source) => each(source, chunk => {
output.push(chunk)
}),
async (source) => await drain(source)
)
} catch (err: any) {
expect(err).to.have.property('code', 'ERR_MSG_TOO_BIG')
expect(output).to.have.length(2)
expect(output[0]).to.deep.equal(input[0])
expect(output[1]).to.deep.equal(input[1])
return
// Mutate _handleIncoming to capture output
streamMuxer._handleIncoming = async (msg) => {
output.push(msg)
}
throw new Error('did not restrict size')

// Note: in current MplexStreamMuxer it's very hard to access sink errors.
// The simplest way currently is to add a mock stream that will be aborted
// on MplexStreamMuxer.close()
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
const mockStream = {
abort: (err) => {
abortError = err
}
} as MplexStream
// eslint-disable-next-line @typescript-eslint/dot-notation
streamMuxer['_streams'].initiators.set(0, mockStream)

await pipe(
input,
encode,
streamMuxer.sink
)

if (abortError === null) throw Error('did not restrict size')
expect(abortError).to.have.property('code', 'ERR_MSG_TOO_BIG')
expect(output).to.have.length(2)
expect(output[0]).to.deep.equal(input[0])
expect(output[1]).to.deep.equal(input[1])
})

it('should allow message with no data property', async () => {
const message: Message = {
id: 4,
type: MessageTypes.CLOSE_RECEIVER
const input: Message[] = [{ id: 4, type: MessageTypes.CLOSE_RECEIVER }]

const output: Message[] = []
const streamMuxer = new MplexStreamMuxer({ maxMsgSize })

// Mutate _handleIncoming to capture output
streamMuxer._handleIncoming = async (msg) => {
output.push(msg)
}
const input: Message[] = [message]

const output = await pipe(
await pipe(
input,
restrictSize(32),
async (source) => await all(source)
encode,
streamMuxer.sink
)
expect(output).to.deep.equal(input)
})
Expand Down