This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

feat: add message byte batching #235

Merged 3 commits on Nov 25, 2022

1 change: 1 addition & 0 deletions README.md
@@ -71,6 +71,7 @@ Creates a factory that can be used to create new muxers.

- `maxMsgSize` - a number that defines how large mplex data messages can be in bytes; messages larger than this will be split and sent as multiple messages (default: 1048576 - e.g. 1MB)
- `maxUnprocessedMessageQueueSize` - a number that limits the size of the unprocessed input buffer (default: 4194304 - e.g. 4MB)
- `minSendBytes` - if set, message bytes produced during the current tick will be batched up to this amount before being yielded by the muxer source; if the next tick begins first, all available bytes are yielded immediately (see the example after this list)
- `maxInboundStreams` - a number that defines how many incoming streams are allowed per connection (default: 1024)
- `maxOutboundStreams` - a number that defines how many outgoing streams are allowed per connection (default: 1024)
- `maxStreamBufferSize` - a number that defines how large the message buffer is allowed to grow (default: 1024 \* 1024 \* 4 - e.g. 4MB)
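
A minimal sketch of how the new option might be passed, mirroring the factory usage in the test added by this PR; the import path from `@libp2p/mplex` and the `1024` value are illustrative assumptions:

```ts
import { mplex } from '@libp2p/mplex'

// batch outgoing message bytes into sends of at least 1 KB; bytes still
// buffered when the current tick ends are flushed immediately
const factory = mplex({
  minSendBytes: 1024
})()

const muxer = factory.createStreamMuxer({})
```
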
1 change: 1 addition & 0 deletions package.json
@@ -153,6 +153,7 @@
"any-signal": "^3.0.0",
"benchmark": "^2.1.4",
"err-code": "^3.0.1",
"it-batched-bytes": "^1.0.0",
"it-pushable": "^3.1.0",
"it-stream-types": "^1.0.4",
"rate-limiter-flexible": "^2.3.9",
28 changes: 22 additions & 6 deletions src/encode.ts
@@ -3,6 +3,7 @@ import varint from 'varint'
import { Uint8ArrayList } from 'uint8arraylist'
import { allocUnsafe } from './alloc-unsafe.js'
import { Message, MessageTypes } from './message-types.js'
import batchedBytes from 'it-batched-bytes'

const POOL_SIZE = 10 * 1024

@@ -55,14 +56,29 @@ const encoder = new Encoder()
 /**
  * Encode and yield one or more messages
  */
-export async function * encode (source: Source<Message[]>) {
-  for await (const msgs of source) {
-    const list = new Uint8ArrayList()
+export async function * encode (source: Source<Message[]>, minSendBytes: number = 0) {
+  if (minSendBytes == null || minSendBytes === 0) {
+    // just send the messages
+    for await (const messages of source) {
+      const list = new Uint8ArrayList()
 
-    for (const msg of msgs) {
-      encoder.write(msg, list)
+      for (const msg of messages) {
+        encoder.write(msg, list)
+      }
+
+      yield list.subarray()
     }
 
-    yield list.subarray()
+    return
   }
+
+  // batch messages up for sending
+  yield * batchedBytes(source, {
+    size: minSendBytes,
+    serialize: (obj, list) => {
+      for (const m of obj) {
+        encoder.write(m, list)
+      }
+    }
+  })
 }
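
The batching branch above delegates to `it-batched-bytes`. As a simplified, self-contained sketch of the underlying idea only (it ignores the library's `serialize` callback and its flush-at-end-of-tick behaviour; `batchBySize` and `concat` are illustrative names, not part of this codebase):

```ts
// Accumulate chunks until at least `minSendBytes` are buffered, then yield
// them as a single Uint8Array; whatever is left over is flushed at the end.
async function * batchBySize (source: AsyncIterable<Uint8Array>, minSendBytes: number): AsyncGenerator<Uint8Array> {
  let buffered: Uint8Array[] = []
  let bufferedBytes = 0

  for await (const chunk of source) {
    buffered.push(chunk)
    bufferedBytes += chunk.byteLength

    if (bufferedBytes >= minSendBytes) {
      yield concat(buffered, bufferedBytes)
      buffered = []
      bufferedBytes = 0
    }
  }

  // flush any remaining bytes when the source ends
  if (bufferedBytes > 0) {
    yield concat(buffered, bufferedBytes)
  }
}

function concat (chunks: Uint8Array[], length: number): Uint8Array {
  const out = new Uint8Array(length)
  let offset = 0

  for (const chunk of chunks) {
    out.set(chunk, offset)
    offset += chunk.byteLength
  }

  return out
}
```
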
12 changes: 12 additions & 0 deletions src/index.ts
@@ -19,6 +19,18 @@ export interface MplexInit {
   */
  maxUnprocessedMessageQueueSize?: number

  /**
   * Each byte array written into a multiplexed stream is converted to one or
   * more messages which are sent as byte arrays to the remote node. Sending
   * lots of small messages can be expensive - use this setting to batch up
   * the serialized bytes of all messages sent during the current tick up to
   * this limit to send in one go, similar to Nagle's algorithm. N.B. you
   * should benchmark your application carefully when using this setting as it
   * may cause the opposite of the desired effect. Omit this setting to send
   * all messages as they become available. (default: undefined)
   */
  minSendBytes?: number

  /**
   * The maximum number of multiplexed streams that can be open at any
   * one time. A request to open more than this will have a stream
2 changes: 1 addition & 1 deletion src/mplex.ts
@@ -231,7 +231,7 @@ export class MplexStreamMuxer implements StreamMuxer {
       onEnd
     })
 
-    return Object.assign(encode(source), {
+    return Object.assign(encode(source, this._init.minSendBytes), {
       push: source.push,
       end: source.end,
       return: source.return
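
For context on the change above: `encode(source, …)` consumes a pushable source of messages, and the muxer re-attaches the pushable's `push`/`end`/`return` so the returned object is both an async iterable of encoded bytes and something other code can write into. A rough sketch of that pattern, with `transform` as an illustrative stand-in for `encode`:

```ts
import { pushable } from 'it-pushable'

// an identity transform standing in for encode()
async function * transform (input: AsyncIterable<Uint8Array>): AsyncGenerator<Uint8Array> {
  for await (const buf of input) {
    yield buf
  }
}

const source = pushable<Uint8Array>()

// iterable of transformed bytes that still exposes push/end for writers
const muxerSource = Object.assign(transform(source), {
  push: source.push,
  end: source.end
})

muxerSource.push(Uint8Array.from([1, 2, 3]))
muxerSource.end()
```
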
59 changes: 59 additions & 0 deletions test/mplex.spec.ts
@@ -67,14 +67,17 @@ describe('mplex', () => {
    stream.end()

    const bufs: Uint8Array[] = []
    const sinkDone = pDefer()

    void Promise.resolve().then(async () => {
      for await (const buf of muxer.source) {
        bufs.push(buf)
      }
      sinkDone.resolve()
    })

    await muxer.sink(stream)
    await sinkDone.promise

    const messages = await all(decode()(bufs))

@@ -162,4 +165,60 @@
    expect(messages).to.have.nested.property('[0].id', id)
    expect(messages).to.have.nested.property('[0].type', MessageTypes.RESET_RECEIVER)
  })

  it('should batch bytes to send', async () => {
    const minSendBytes = 10

    // input bytes, smaller than batch size
    const input: Uint8Array[] = [
      Uint8Array.from([0, 1, 2, 3, 4]),
      Uint8Array.from([0, 1, 2, 3, 4]),
      Uint8Array.from([0, 1, 2, 3, 4])
    ]

    // create the muxer
    const factory = mplex({
      minSendBytes
    })()
    const muxer = factory.createStreamMuxer({})

    // collect outgoing mplex messages
    const muxerFinished = pDefer()
    let output: Uint8Array[] = []
    void Promise.resolve().then(async () => {
      output = await all(muxer.source)
      muxerFinished.resolve()
    })

    // create a stream
    const stream = await muxer.newStream()
    const streamFinished = pDefer()
    // send messages over the stream
    void Promise.resolve().then(async () => {
      await stream.sink(async function * () {
        yield * input
      }())
      stream.close()
      streamFinished.resolve()
    })

    // wait for all data to be sent over the stream
    await streamFinished.promise

    // close the muxer
    await muxer.sink([])

    // wait for all output to be collected
    await muxerFinished.promise

    // last message is unbatched
    const closeMessage = output.pop()
    expect(closeMessage).to.have.lengthOf(2)

    // all other messages should be above or equal to the batch size
    expect(output).to.have.lengthOf(2)
    for (const buf of output) {
      expect(buf).to.have.length.that.is.at.least(minSendBytes)
    }
  })
})