From 9c45f5eaca5c5b7a837942461b0e814370d9d3f7 Mon Sep 17 00:00:00 2001 From: Cayman Date: Mon, 14 Aug 2023 17:27:13 -0400 Subject: [PATCH] feat: add message byte batching --- package.json | 1 + src/config.ts | 18 +++++++++++++++++- src/muxer.ts | 36 +++++++++++++++++++++++++++--------- 3 files changed, 45 insertions(+), 10 deletions(-) diff --git a/package.json b/package.json index 9a2728c..d825884 100644 --- a/package.json +++ b/package.json @@ -174,6 +174,7 @@ "@libp2p/interface": "^0.1.0", "@libp2p/logger": "^3.0.0", "abortable-iterator": "^5.0.1", + "it-batched-bytes": "^2.0.3", "it-foreach": "^2.0.3", "it-pipe": "^3.0.1", "it-pushable": "^3.2.0", diff --git a/src/config.ts b/src/config.ts index 887e461..8b102da 100644 --- a/src/config.ts +++ b/src/config.ts @@ -52,6 +52,18 @@ export interface Config { * This ensures that a single stream doesn't hog a connection. */ maxMessageSize: number + + /** + * Each byte array written into a multiplexed stream is converted to one or + * more messages which are sent as byte arrays to the remote node. Sending + * lots of small messages can be expensive - use this setting to batch up + * the serialized bytes of all messages sent during the current tick up to + * this limit to send in one go similar to Nagle's algorithm. N.b. you + * should benchmark your application carefully when using this setting as it + * may cause the opposite of the desired effect. Omit this setting to send + * all messages as they become available. (default: 0) + */ + minSendBytes: number } export const defaultConfig: Config = { @@ -62,7 +74,8 @@ export const defaultConfig: Config = { maxOutboundStreams: 1_000, initialStreamWindowSize: INITIAL_STREAM_WINDOW, maxStreamWindowSize: MAX_STREAM_WINDOW, - maxMessageSize: 64 * 1024 + maxMessageSize: 64 * 1024, + minSendBytes: 0 } export function verifyConfig (config: Config): void { @@ -87,4 +100,7 @@ export function verifyConfig (config: Config): void { if (config.maxMessageSize < 1024) { throw new CodeError('MaxMessageSize must be greater than a kilobyte', ERR_INVALID_CONFIG) } + if (config.minSendBytes < 0) { + throw new CodeError('MinSendBytes must be greater or equal to 0', ERR_INVALID_CONFIG) + } } diff --git a/src/muxer.ts b/src/muxer.ts index 9f3f6ac..eddaee8 100644 --- a/src/muxer.ts +++ b/src/muxer.ts @@ -1,8 +1,9 @@ import { CodeError } from '@libp2p/interface/errors' import { logger, type Logger } from '@libp2p/logger' import { abortableSource } from 'abortable-iterator' +import batchedBytes from 'it-batched-bytes' import { pipe } from 'it-pipe' -import { pushable, type Pushable } from 'it-pushable' +import { pushableV, type PushableV } from 'it-pushable' import { type Config, defaultConfig, verifyConfig } from './config.js' import { ERR_BOTH_CLIENTS, ERR_INVALID_FRAME, ERR_MAX_OUTBOUND_STREAMS_EXCEEDED, ERR_MUXER_LOCAL_CLOSED, ERR_MUXER_REMOTE_CLOSED, ERR_NOT_MATCHING_PING, ERR_STREAM_ALREADY_EXISTS, ERR_UNREQUESTED_PING, PROTOCOL_ERRORS } from './constants.js' import { Decoder } from './decode.js' @@ -13,7 +14,7 @@ import type { AbortOptions } from '@libp2p/interface' import type { Stream } from '@libp2p/interface/connection' import type { StreamMuxer, StreamMuxerFactory, StreamMuxerInit } from '@libp2p/interface/stream-muxer' import type { Sink, Source } from 'it-stream-types' -import type { Uint8ArrayList } from 'uint8arraylist' +import { Uint8ArrayList } from 'uint8arraylist' const YAMUX_PROTOCOL_ID = '/yamux/1.0.0' const CLOSE_TIMEOUT = 500 @@ -43,12 +44,13 @@ export interface CloseOptions extends AbortOptions { export class YamuxMuxer implements StreamMuxer { protocol = YAMUX_PROTOCOL_ID - source: Pushable + source: AsyncGenerator sink: Sink, Promise> private readonly config: Config private readonly log?: Logger + private readonly _source: PushableV /** Used to close the muxer from either the sink or source */ private readonly closeController: AbortController @@ -80,7 +82,7 @@ export class YamuxMuxer implements StreamMuxer { constructor (init: YamuxMuxerInit) { this.client = init.direction === 'outbound' - this.config = { ...defaultConfig, ...init } + const config = this.config = { ...defaultConfig, ...init } this.log = this.config.log verifyConfig(this.config) @@ -91,7 +93,7 @@ export class YamuxMuxer implements StreamMuxer { this._streams = new Map() - this.source = pushable({ + this._source = pushableV({ onEnd: (): void => { this.log?.trace('muxer source ended') @@ -100,6 +102,22 @@ export class YamuxMuxer implements StreamMuxer { }) } }) + this.source = pipe( + this._source, + config.minSendBytes === 0 + ? async function * (source) { + for await (const bufs of source) { + yield new Uint8ArrayList(...bufs).subarray() + } + } + : async function * (source) { + yield * batchedBytes(source, { + size: config.minSendBytes, + serialize: (bufs, list) => { list.appendAll(bufs) } + + }) + } + ) this.sink = async (source: Source): Promise => { source = abortableSource( @@ -322,7 +340,7 @@ export class YamuxMuxer implements StreamMuxer { this.closeController.abort() // stop the source - this.source.end() + this._source.end() } /** Create a new stream */ @@ -538,10 +556,10 @@ export class YamuxMuxer implements StreamMuxer { if (data === undefined) { throw new CodeError('invalid frame', ERR_INVALID_FRAME) } - this.source.push(encodeHeader(header)) - this.source.push(data) + this._source.push(encodeHeader(header)) + this._source.push(data) } else { - this.source.push(encodeHeader(header)) + this._source.push(encodeHeader(header)) } }