Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

feat: add message byte batching #235

Merged
merged 3 commits into from
Nov 25, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ Creates a factory that can be used to create new muxers.

- `maxMsgSize` - a number that defines how large mplex data messages can be in bytes, if messages are larger than this they will be sent as multiple messages (default: 1048576 - e.g. 1MB)
- `maxUnprocessedMessageQueueSize` - a number that limits the size of the unprocessed input buffer (default: 4194304 - e.g. 4MB)
- `minSendBytes` - if set, message bytes from the current tick will be batched up to this amount before being yielded by the muxer source; if the next tick begins before the batch is full, all available bytes will be yielded immediately
- `maxInboundStreams` - a number that defines how many incoming streams are allowed per connection (default: 1024)
- `maxOutboundStreams` - a number that defines how many outgoing streams are allowed per connection (default: 1024)
- `maxStreamBufferSize` - a number that defines how large the message buffer is allowed to grow (default: 1024 \* 1024 \* 4 - e.g. 4MB)
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@
"any-signal": "^3.0.0",
"benchmark": "^2.1.4",
"err-code": "^3.0.1",
"it-batched-bytes": "^1.0.0",
"it-pipe": "^2.0.3",
"it-pushable": "^3.1.0",
"it-stream-types": "^1.0.4",
Expand Down
30 changes: 21 additions & 9 deletions src/encode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import varint from 'varint'
import { Uint8ArrayList } from 'uint8arraylist'
import { allocUnsafe } from './alloc-unsafe.js'
import { Message, MessageTypes } from './message-types.js'
import batchedBytes from 'it-batched-bytes'

const POOL_SIZE = 10 * 1024

Expand Down Expand Up @@ -55,18 +56,29 @@ const encoder = new Encoder()
/**
* Encode and yield one or more messages
*/
export async function * encode (source: Source<Message | Message[]>) {
achingbrain marked this conversation as resolved.
Show resolved Hide resolved
for await (const msg of source) {
const list = new Uint8ArrayList()
export async function * encode (source: Source<Message[]>, minSendBytes: number = 0) {
if (minSendBytes == null || minSendBytes === 0) {
// just send the messages
for await (const messages of source) {
const list = new Uint8ArrayList()

if (Array.isArray(msg)) {
for (const m of msg) {
encoder.write(m, list)
for (const msg of messages) {
encoder.write(msg, list)
}
} else {
encoder.write(msg, list)

yield list.subarray()
}

yield list.subarray()
return
}

// batch messages up for sending
yield * batchedBytes(source, {
size: minSendBytes,
serialize: (obj, list) => {
for (const m of obj) {
encoder.write(m, list)
}
}
})
}
12 changes: 12 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,18 @@ export interface MplexInit {
*/
maxUnprocessedMessageQueueSize?: number

/**
 * Each byte array written into a multiplexed stream is converted to one or
 * more messages which are sent as byte arrays to the remote node. Sending
 * lots of small messages can be expensive, so use this setting to batch up
 * the serialized bytes of all messages sent during the current tick, up to
 * this limit, and send them in one go, similar to Nagle's algorithm. N.B. you
 * should benchmark your application carefully when using this setting as it
 * may cause the opposite of the desired effect. Omit this setting to send
 * all messages as they become available. (default: undefined)
 */
minSendBytes?: number

/**
* The maximum number of multiplexed streams that can be open at any
* one time. A request to open more than this will have a stream
Expand Down
2 changes: 1 addition & 1 deletion src/mplex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ export class MplexStreamMuxer implements StreamMuxer {
onEnd
})

return Object.assign(encode(source), {
return Object.assign(encode(source, this._init.minSendBytes), {
push: source.push,
end: source.end,
return: source.return
Expand Down
16 changes: 8 additions & 8 deletions test/coder.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import { Uint8ArrayList } from 'uint8arraylist'

describe('coder', () => {
it('should encode header', async () => {
const source: Message[] = [{ id: 17, type: 0, data: new Uint8ArrayList(uint8ArrayFromString('17')) }]
const source: Message[][] = [[{ id: 17, type: 0, data: new Uint8ArrayList(uint8ArrayFromString('17')) }]]

const data = uint8ArrayConcat(await all(encode(source)))

Expand All @@ -29,34 +29,34 @@ describe('coder', () => {
})

it('should encode several msgs into buffer', async () => {
const source: Message[] = [
const source: Message[][] = [[
{ id: 17, type: 0, data: new Uint8ArrayList(uint8ArrayFromString('17')) },
{ id: 19, type: 0, data: new Uint8ArrayList(uint8ArrayFromString('19')) },
{ id: 21, type: 0, data: new Uint8ArrayList(uint8ArrayFromString('21')) }
]
]]

const data = uint8ArrayConcat(await all(encode(source)))

expect(data).to.equalBytes(uint8ArrayFromString('88010231379801023139a801023231', 'base16'))
})

it('should encode from Uint8ArrayList', async () => {
const source: NewStreamMessage[] = [{
const source: NewStreamMessage[][] = [[{
id: 17,
type: 0,
data: new Uint8ArrayList(
uint8ArrayFromString(Math.random().toString()),
uint8ArrayFromString(Math.random().toString())
)
}]
}]]

const data = uint8ArrayConcat(await all(encode(source)))

expect(data).to.equalBytes(
uint8ArrayConcat([
uint8ArrayFromString('8801', 'base16'),
Uint8Array.from([source[0].data.length]),
source[0].data instanceof Uint8Array ? source[0].data : source[0].data.slice()
Uint8Array.from([source[0][0].data.length]),
source[0][0].data instanceof Uint8Array ? source[0][0].data : source[0][0].data.slice()
])
)
})
Expand All @@ -77,7 +77,7 @@ describe('coder', () => {
})

it('should encode zero length body msg', async () => {
const source: Message[] = [{ id: 17, type: 0 }]
const source: Message[][] = [[{ id: 17, type: 0 }]]

const data = uint8ArrayConcat(await all(encode(source)))

Expand Down
79 changes: 69 additions & 10 deletions test/mplex.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,38 +43,41 @@ describe('mplex', () => {

// max out the streams for this connection
for (let i = 0; i < maxInboundStreams; i++) {
const source: NewStreamMessage[] = [{
const source: NewStreamMessage[][] = [[{
id: i,
type: 0,
data: new Uint8ArrayList(uint8ArrayFromString('17'))
}]
}]]

const data = uint8ArrayConcat(await all(encode(source)))

stream.push(data)
}

// simulate a new incoming stream
const source: NewStreamMessage[] = [{
const source: NewStreamMessage[][] = [[{
id: 11,
type: 0,
data: new Uint8ArrayList(uint8ArrayFromString('17'))
}]
}]]

const data = uint8ArrayConcat(await all(encode(source)))

stream.push(data)
stream.end()

const bufs: Uint8Array[] = []
const sinkDone = pDefer()

void Promise.resolve().then(async () => {
for await (const buf of muxer.source) {
bufs.push(buf)
}
sinkDone.resolve()
})

await muxer.sink(stream)
await sinkDone.promise

const messages = await all(decode()(bufs))

Expand All @@ -89,13 +92,13 @@ describe('mplex', () => {
const id = 17

// simulate a new incoming stream that sends lots of data
const input: Source<Message> = (async function * send () {
const input: Source<Message[]> = (async function * send () {
const newStreamMessage: NewStreamMessage = {
id,
type: MessageTypes.NEW_STREAM,
data: new Uint8ArrayList(new Uint8Array(1024))
}
yield newStreamMessage
yield [newStreamMessage]

await delay(10)

Expand All @@ -105,7 +108,7 @@ describe('mplex', () => {
type: MessageTypes.MESSAGE_INITIATOR,
data: new Uint8ArrayList(new Uint8Array(1024 * 1000))
}
yield dataMessage
yield [dataMessage]

sent++

Expand All @@ -118,7 +121,7 @@ describe('mplex', () => {
id,
type: MessageTypes.CLOSE_INITIATOR
}
yield closeMessage
yield [closeMessage]
})()

// create the muxer
Expand All @@ -135,8 +138,8 @@ describe('mplex', () => {
streamSourceError.reject(new Error('Stream source did not error'))
})
.catch(err => {
// should have errored before all messages were sent
expect(sent).to.equal(2)
// should have errored before all 102 messages were sent
expect(sent).to.be.lessThan(10)
streamSourceError.resolve(err)
})
}
Expand All @@ -162,4 +165,60 @@ describe('mplex', () => {
expect(messages).to.have.nested.property('[0].id', id)
expect(messages).to.have.nested.property('[0].type', MessageTypes.RESET_RECEIVER)
})

it('should batch bytes to send', async () => {
const minSendBytes = 10

// input bytes, smaller than batch size
const input: Uint8Array[] = [
Uint8Array.from([0, 1, 2, 3, 4]),
Uint8Array.from([0, 1, 2, 3, 4]),
Uint8Array.from([0, 1, 2, 3, 4])
]

// create the muxer
const factory = mplex({
minSendBytes
})()
const muxer = factory.createStreamMuxer({})

// collect outgoing mplex messages
const muxerFinished = pDefer()
let output: Uint8Array[] = []
void Promise.resolve().then(async () => {
output = await all(muxer.source)
muxerFinished.resolve()
})

// create a stream
const stream = await muxer.newStream()
const streamFinished = pDefer()
// send messages over the stream
void Promise.resolve().then(async () => {
await stream.sink(async function * () {
yield * input
}())
stream.close()
streamFinished.resolve()
})

// wait for all data to be sent over the stream
await streamFinished.promise

// close the muxer
await muxer.sink([])

// wait for all output to be collected
await muxerFinished.promise

// last message is unbatched
const closeMessage = output.pop()
expect(closeMessage).to.have.lengthOf(2)

// all other messages should be above or equal to the batch size
expect(output).to.have.lengthOf(2)
for (const buf of output) {
expect(buf).to.have.length.that.is.at.least(minSendBytes)
}
})
})
16 changes: 6 additions & 10 deletions test/restrict-size.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ describe('restrict size', () => {
it('should throw when size is too big', async () => {
const maxSize = 32

const input: Message[] = [
const input: Message[][] = [[
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(8)) },
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(16)) },
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(maxSize)) },
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(64)) }
]
]]

const output: Message[] = []

Expand All @@ -37,10 +37,6 @@ describe('restrict size', () => {
)
} catch (err: any) {
expect(err).to.have.property('code', 'ERR_MSG_TOO_BIG')
expect(output).to.have.length(3)
expect(output[0]).to.deep.equal(input[0])
expect(output[1]).to.deep.equal(input[1])
expect(output[2]).to.deep.equal(input[2])
return
}
throw new Error('did not restrict size')
Expand All @@ -51,30 +47,30 @@ describe('restrict size', () => {
id: 4,
type: MessageTypes.CLOSE_RECEIVER
}
const input: Message[] = [message]
const input: Message[][] = [[message]]

const output = await pipe(
input,
encode,
decode(32),
async (source) => await all(source)
)
expect(output).to.deep.equal(input)
expect(output).to.deep.equal(input[0])
})

it('should throw when unprocessed message queue size is too big', async () => {
const maxMessageSize = 32
const maxUnprocessedMessageQueueSize = 64

const input: Message[] = [
const input: Message[][] = [[
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(16)) },
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(16)) },
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(16)) },
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(16)) },
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(16)) },
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(16)) },
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(16)) }
]
]]

const output: Message[] = []

Expand Down