From 4e2a49df22430316140cd37a96fc3b6a8f95b76a Mon Sep 17 00:00:00 2001
From: Alex Potsides
Date: Fri, 25 Nov 2022 12:01:13 +0000
Subject: [PATCH] feat: add message byte batching (#235)

* feat: add message byte batching

Adds a new setting `minSendBytes` that is `undefined` by default.

If `undefined`, all messages sent through multiplexed streams will be
serialized and sent over the wire immediately.

If set to a number, it will be used as a byte value, and the serialized
bytes of all messages sent during the current tick will be buffered up
to that value. Once either the buffer length hits that value or the
next tick begins, all bytes in the buffer will be sent over the wire.

* chore: add readme note
---
 README.md          |  1 +
 package.json       |  1 +
 src/encode.ts      | 28 +++++++++++++++++-----
 src/index.ts       | 12 ++++++++++
 src/mplex.ts       |  2 +-
 test/mplex.spec.ts | 59 ++++++++++++++++++++++++++++++++++++++++++++++
 6 files changed, 96 insertions(+), 7 deletions(-)

diff --git a/README.md b/README.md
index 2a66b60..651fcdb 100644
--- a/README.md
+++ b/README.md
@@ -71,6 +71,7 @@ Creates a factory that can be used to create new muxers.
 
 - `maxMsgSize` - a number that defines how large mplex data messages can be in bytes, if messages are larger than this they will be sent as multiple messages (default: 1048576 - e.g. 1MB)
 - `maxUnprocessedMessageQueueSize` - a number that limits the size of the unprocessed input buffer (default: 4194304 - e.g. 4MB)
+- `minSendBytes` - if set, message bytes from the current tick will be batched up to this amount before being yielded by the muxer source, unless the next tick begins in which case all available bytes will be yielded
 - `maxInboundStreams` - a number that defines how many incoming streams are allowed per connection (default: 1024)
 - `maxOutboundStreams` - a number that defines how many outgoing streams are allowed per connection (default: 1024)
 - `maxStreamBufferSize` - a number that defines how large the message buffer is allowed to grow (default: 1024 \* 1024 \* 4 - e.g. 4MB)
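
For orientation, a sketch of how the option documented in the README entry above might be configured. The `@libp2p/mplex` import path is an assumption; the `mplex()` and `createStreamMuxer()` calls and the `minSendBytes` option follow the API exercised by the test later in this patch.

```ts
// Hypothetical usage sketch: batch outgoing message bytes up to 1 KiB per
// tick before they are yielded by the muxer source.
import { mplex } from '@libp2p/mplex' // assumed import path

const factory = mplex({
  minSendBytes: 1024
})()

// streams created by this muxer will have their small outgoing messages
// coalesced into fewer, larger writes
const muxer = factory.createStreamMuxer({})
```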
diff --git a/package.json b/package.json
index 2807c7b..8504ea8 100644
--- a/package.json
+++ b/package.json
@@ -153,6 +153,7 @@
     "any-signal": "^3.0.0",
     "benchmark": "^2.1.4",
     "err-code": "^3.0.1",
+    "it-batched-bytes": "^1.0.0",
     "it-pushable": "^3.1.0",
     "it-stream-types": "^1.0.4",
     "rate-limiter-flexible": "^2.3.9",
diff --git a/src/encode.ts b/src/encode.ts
index edef35a..7b9f55a 100644
--- a/src/encode.ts
+++ b/src/encode.ts
@@ -3,6 +3,7 @@ import varint from 'varint'
 import { Uint8ArrayList } from 'uint8arraylist'
 import { allocUnsafe } from './alloc-unsafe.js'
 import { Message, MessageTypes } from './message-types.js'
+import batchedBytes from 'it-batched-bytes'
 
 const POOL_SIZE = 10 * 1024
 
@@ -55,14 +56,29 @@ const encoder = new Encoder()
 /**
  * Encode and yield one or more messages
  */
-export async function * encode (source: Source) {
-  for await (const msgs of source) {
-    const list = new Uint8ArrayList()
+export async function * encode (source: Source, minSendBytes: number = 0) {
+  if (minSendBytes == null || minSendBytes === 0) {
+    // just send the messages
+    for await (const messages of source) {
+      const list = new Uint8ArrayList()
 
-    for (const msg of msgs) {
-      encoder.write(msg, list)
+      for (const msg of messages) {
+        encoder.write(msg, list)
+      }
+
+      yield list.subarray()
     }
 
-    yield list.subarray()
+    return
   }
+
+  // batch messages up for sending
+  yield * batchedBytes(source, {
+    size: minSendBytes,
+    serialize: (obj, list) => {
+      for (const m of obj) {
+        encoder.write(m, list)
+      }
+    }
+  })
 }
diff --git a/src/index.ts b/src/index.ts
index da98e48..8a72a21 100644
--- a/src/index.ts
+++ b/src/index.ts
@@ -19,6 +19,18 @@ export interface MplexInit {
    */
  maxUnprocessedMessageQueueSize?: number
 
+  /**
+   * Each byte array written into a multiplexed stream is converted to one or
+   * more messages which are sent as byte arrays to the remote node. Sending
+   * lots of small messages can be expensive - use this setting to batch up
+   * the serialized bytes of all messages sent during the current tick up to
+   * this limit to send in one go similar to Nagle's algorithm. N.b. you
+   * should benchmark your application carefully when using this setting as it
+   * may cause the opposite of the desired effect. Omit this setting to send
+   * all messages as they become available. (default: undefined)
+   */
+  minSendBytes?: number
+
   /**
    * The maximum number of multiplexed streams that can be open at any
    * one time. A request to open more than this will have a stream
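
The `MplexInit` comment added above describes the batching behaviour in prose. Below is a minimal, self-contained sketch of that idea, not the `it-batched-bytes` implementation this patch depends on: it coalesces chunks until the threshold is reached and flushes the remainder when the source ends, whereas the real dependency also flushes whatever has accumulated once the current tick ends. The `batchBytes` name is invented for illustration.

```ts
// Illustrative sketch only: coalesce small byte chunks into buffers of at
// least `minSendBytes` before yielding them downstream.
async function * batchBytes (
  source: AsyncIterable<Uint8Array>,
  minSendBytes: number
): AsyncGenerator<Uint8Array, void, undefined> {
  let batch: Uint8Array[] = []
  let batchedLength = 0

  const flush = (): Uint8Array => {
    // concatenate the accumulated chunks into a single buffer
    const out = new Uint8Array(batchedLength)
    let offset = 0

    for (const chunk of batch) {
      out.set(chunk, offset)
      offset += chunk.byteLength
    }

    batch = []
    batchedLength = 0

    return out
  }

  for await (const chunk of source) {
    batch.push(chunk)
    batchedLength += chunk.byteLength

    if (batchedLength >= minSendBytes) {
      // threshold reached - send everything gathered so far in one go
      yield flush()
    }
  }

  if (batchedLength > 0) {
    // the source ended - send any remaining bytes
    yield flush()
  }
}
```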
diff --git a/src/mplex.ts b/src/mplex.ts
index 175af70..c4f981a 100644
--- a/src/mplex.ts
+++ b/src/mplex.ts
@@ -231,7 +231,7 @@ export class MplexStreamMuxer implements StreamMuxer {
       onEnd
     })
 
-    return Object.assign(encode(source), {
+    return Object.assign(encode(source, this._init.minSendBytes), {
       push: source.push,
       end: source.end,
       return: source.return
diff --git a/test/mplex.spec.ts b/test/mplex.spec.ts
index 1fa32bc..7443a99 100644
--- a/test/mplex.spec.ts
+++ b/test/mplex.spec.ts
@@ -67,14 +67,17 @@ describe('mplex', () => {
     stream.end()
 
     const bufs: Uint8Array[] = []
+    const sinkDone = pDefer()
 
     void Promise.resolve().then(async () => {
       for await (const buf of muxer.source) {
         bufs.push(buf)
       }
+      sinkDone.resolve()
     })
 
     await muxer.sink(stream)
+    await sinkDone.promise
 
     const messages = await all(decode()(bufs))
 
@@ -162,4 +165,60 @@
     expect(messages).to.have.nested.property('[0].id', id)
     expect(messages).to.have.nested.property('[0].type', MessageTypes.RESET_RECEIVER)
   })
+
+  it('should batch bytes to send', async () => {
+    const minSendBytes = 10
+
+    // input bytes, smaller than batch size
+    const input: Uint8Array[] = [
+      Uint8Array.from([0, 1, 2, 3, 4]),
+      Uint8Array.from([0, 1, 2, 3, 4]),
+      Uint8Array.from([0, 1, 2, 3, 4])
+    ]
+
+    // create the muxer
+    const factory = mplex({
+      minSendBytes
+    })()
+    const muxer = factory.createStreamMuxer({})
+
+    // collect outgoing mplex messages
+    const muxerFinished = pDefer()
+    let output: Uint8Array[] = []
+    void Promise.resolve().then(async () => {
+      output = await all(muxer.source)
+      muxerFinished.resolve()
+    })
+
+    // create a stream
+    const stream = await muxer.newStream()
+    const streamFinished = pDefer()
+    // send messages over the stream
+    void Promise.resolve().then(async () => {
+      await stream.sink(async function * () {
+        yield * input
+      }())
+      stream.close()
+      streamFinished.resolve()
+    })
+
+    // wait for all data to be sent over the stream
+    await streamFinished.promise
+
+    // close the muxer
+    await muxer.sink([])
+
+    // wait for all output to be collected
+    await muxerFinished.promise
+
+    // last message is unbatched
+    const closeMessage = output.pop()
+    expect(closeMessage).to.have.lengthOf(2)
+
+    // all other messages should be above or equal to the batch size
+    expect(output).to.have.lengthOf(2)
+    for (const buf of output) {
+      expect(buf).to.have.length.that.is.at.least(minSendBytes)
+    }
+  })
 })
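
As a rough sanity check on the expectations in the new test, the numbers work out as follows under the standard mplex framing of a varint header, a varint data length, then the payload (both varints fit in a single byte for stream id 0). The constants below are illustrative and not part of the patch.

```ts
// Each 5-byte input chunk becomes a 7-byte message frame on the wire, while
// the trailing close frame carries no payload and serializes to 2 bytes.
const dataFrameBytes = 1 + 1 + 5  // header varint + length varint + payload
const closeFrameBytes = 1 + 1     // header varint + zero-length varint

// Three 7-byte data frames exceed the 10-byte batch size, so they are
// coalesced into buffers of at least `minSendBytes`, whereas the lone 2-byte
// close frame is emitted on its own - hence the `lengthOf(2)` assertion on
// the popped message and the `at.least(minSendBytes)` check on the rest.
```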