Skip to content

Commit

Permalink
feat: add message byte batching
Browse files Browse the repository at this point in the history
  • Loading branch information
wemeetagain committed Aug 14, 2023
1 parent e4fac56 commit 9c45f5e
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 10 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@
"@libp2p/interface": "^0.1.0",
"@libp2p/logger": "^3.0.0",
"abortable-iterator": "^5.0.1",
"it-batched-bytes": "^2.0.3",
"it-foreach": "^2.0.3",
"it-pipe": "^3.0.1",
"it-pushable": "^3.2.0",
Expand Down
18 changes: 17 additions & 1 deletion src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,18 @@ export interface Config {
* This ensures that a single stream doesn't hog a connection.
*/
maxMessageSize: number

/**
* Each byte array written into a multiplexed stream is converted to one or
* more messages which are sent as byte arrays to the remote node. Sending
* lots of small messages can be expensive - use this setting to batch up
* the serialized bytes of all messages sent during the current tick up to
* this limit to send in one go similar to Nagle's algorithm. N.b. you
* should benchmark your application carefully when using this setting as it
* may cause the opposite of the desired effect. Omit this setting to send
* all messages as they become available. (default: 0)
*/
minSendBytes: number
}

export const defaultConfig: Config = {
Expand All @@ -62,7 +74,8 @@ export const defaultConfig: Config = {
maxOutboundStreams: 1_000,
initialStreamWindowSize: INITIAL_STREAM_WINDOW,
maxStreamWindowSize: MAX_STREAM_WINDOW,
maxMessageSize: 64 * 1024
maxMessageSize: 64 * 1024,
minSendBytes: 0
}

export function verifyConfig (config: Config): void {
Expand All @@ -87,4 +100,7 @@ export function verifyConfig (config: Config): void {
if (config.maxMessageSize < 1024) {
throw new CodeError('MaxMessageSize must be greater than a kilobyte', ERR_INVALID_CONFIG)
}
if (config.minSendBytes < 0) {
throw new CodeError('MinSendBytes must be greater or equal to 0', ERR_INVALID_CONFIG)
}
}
36 changes: 27 additions & 9 deletions src/muxer.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { CodeError } from '@libp2p/interface/errors'
import { logger, type Logger } from '@libp2p/logger'
import { abortableSource } from 'abortable-iterator'
import batchedBytes from 'it-batched-bytes'
import { pipe } from 'it-pipe'
import { pushable, type Pushable } from 'it-pushable'
import { pushableV, type PushableV } from 'it-pushable'
import { type Config, defaultConfig, verifyConfig } from './config.js'
import { ERR_BOTH_CLIENTS, ERR_INVALID_FRAME, ERR_MAX_OUTBOUND_STREAMS_EXCEEDED, ERR_MUXER_LOCAL_CLOSED, ERR_MUXER_REMOTE_CLOSED, ERR_NOT_MATCHING_PING, ERR_STREAM_ALREADY_EXISTS, ERR_UNREQUESTED_PING, PROTOCOL_ERRORS } from './constants.js'
import { Decoder } from './decode.js'
Expand All @@ -13,7 +14,7 @@ import type { AbortOptions } from '@libp2p/interface'
import type { Stream } from '@libp2p/interface/connection'
import type { StreamMuxer, StreamMuxerFactory, StreamMuxerInit } from '@libp2p/interface/stream-muxer'
import type { Sink, Source } from 'it-stream-types'
import type { Uint8ArrayList } from 'uint8arraylist'
import { Uint8ArrayList } from 'uint8arraylist'

const YAMUX_PROTOCOL_ID = '/yamux/1.0.0'
const CLOSE_TIMEOUT = 500
Expand Down Expand Up @@ -43,12 +44,13 @@ export interface CloseOptions extends AbortOptions {

export class YamuxMuxer implements StreamMuxer {
protocol = YAMUX_PROTOCOL_ID
source: Pushable<Uint8Array>
source: AsyncGenerator<Uint8Array>
sink: Sink<Source<Uint8ArrayList | Uint8Array>, Promise<void>>

private readonly config: Config
private readonly log?: Logger

private readonly _source: PushableV<Uint8Array>
/** Used to close the muxer from either the sink or source */
private readonly closeController: AbortController

Expand Down Expand Up @@ -80,7 +82,7 @@ export class YamuxMuxer implements StreamMuxer {

constructor (init: YamuxMuxerInit) {
this.client = init.direction === 'outbound'
this.config = { ...defaultConfig, ...init }
const config = this.config = { ...defaultConfig, ...init }
this.log = this.config.log
verifyConfig(this.config)

Expand All @@ -91,7 +93,7 @@ export class YamuxMuxer implements StreamMuxer {

this._streams = new Map()

this.source = pushable({
this._source = pushableV({
onEnd: (): void => {
this.log?.trace('muxer source ended')

Expand All @@ -100,6 +102,22 @@ export class YamuxMuxer implements StreamMuxer {
})
}
})
this.source = pipe(
this._source,
config.minSendBytes === 0
? async function * (source) {
for await (const bufs of source) {
yield new Uint8ArrayList(...bufs).subarray()
}
}
: async function * (source) {
yield * batchedBytes(source, {
size: config.minSendBytes,
serialize: (bufs, list) => { list.appendAll(bufs) }

})
}
)

this.sink = async (source: Source<Uint8ArrayList | Uint8Array>): Promise<void> => {
source = abortableSource(
Expand Down Expand Up @@ -322,7 +340,7 @@ export class YamuxMuxer implements StreamMuxer {
this.closeController.abort()

// stop the source
this.source.end()
this._source.end()
}

/** Create a new stream */
Expand Down Expand Up @@ -538,10 +556,10 @@ export class YamuxMuxer implements StreamMuxer {
if (data === undefined) {
throw new CodeError('invalid frame', ERR_INVALID_FRAME)
}
this.source.push(encodeHeader(header))
this.source.push(data)
this._source.push(encodeHeader(header))
this._source.push(data)
} else {
this.source.push(encodeHeader(header))
this._source.push(encodeHeader(header))
}
}

Expand Down

0 comments on commit 9c45f5e

Please sign in to comment.