Skip to content

Commit

Permalink
feat!: allow stream muxers and connection encrypters to yield lists (#…
Browse files Browse the repository at this point in the history
…2256)

Updates the stream type for `MultiaddrConnection` to `Uint8Array | Uint8ArrayList` - this lets us yield `Uint8ArrayList`s from stream muxers and connection encrypters instead of having to copy the list contents into a new `Uint8Array` every time.

This lowers the connection latency slightly and increases stream throughput according to the [perf test results](https://observablehq.com/@libp2p-workspace/performance-dashboard?branch=fa6fd4179febbd14ed92d4a7e83d52f729a3af07).

BREAKING CHANGE: the `minSendBytes` option has been removed from Mplex since the transport can now decide how to optimise sending data
  • Loading branch information
achingbrain authored Nov 25, 2023
1 parent ac7bc38 commit 4a474d5
Show file tree
Hide file tree
Showing 35 changed files with 220 additions and 283 deletions.
4 changes: 1 addition & 3 deletions packages/connection-encrypter-plaintext/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,7 @@
"@libp2p/interface": "^0.1.2",
"@libp2p/peer-id": "^3.0.6",
"@multiformats/multiaddr": "^12.1.10",
"it-handshake": "^4.1.3",
"it-length-prefixed": "^9.0.3",
"it-map": "^3.0.4",
"it-protobuf-stream": "^1.1.1",
"it-stream-types": "^2.0.1",
"protons-runtime": "^5.0.0",
"uint8arraylist": "^2.4.3"
Expand Down
84 changes: 39 additions & 45 deletions packages/connection-encrypter-plaintext/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,49 +22,53 @@

import { UnexpectedPeerError, InvalidCryptoExchangeError } from '@libp2p/interface/errors'
import { peerIdFromBytes, peerIdFromKeys } from '@libp2p/peer-id'
import { handshake } from 'it-handshake'
import * as lp from 'it-length-prefixed'
import map from 'it-map'
import { pbStream } from 'it-protobuf-stream'
import { Exchange, KeyType } from './pb/proto.js'
import type { ComponentLogger, Logger } from '@libp2p/interface'
import type { MultiaddrConnection } from '@libp2p/interface/connection'
import type { ConnectionEncrypter, SecuredConnection } from '@libp2p/interface/connection-encrypter'
import type { PeerId } from '@libp2p/interface/peer-id'
import type { Duplex, Source } from 'it-stream-types'
import type { Duplex } from 'it-stream-types'
import type { Uint8ArrayList } from 'uint8arraylist'

const PROTOCOL = '/plaintext/2.0.0'

function lpEncodeExchange (exchange: Exchange): Uint8ArrayList {
const pb = Exchange.encode(exchange)

return lp.encode.single(pb)
}

export interface PlaintextComponents {
logger: ComponentLogger
}

export interface PlaintextInit {
/**
* The peer id exchange must complete within this many milliseconds
* (default: 1000)
*/
timeout?: number
}

class Plaintext implements ConnectionEncrypter {
public protocol: string = PROTOCOL
private readonly log: Logger
private readonly timeout: number

constructor (components: PlaintextComponents) {
constructor (components: PlaintextComponents, init: PlaintextInit = {}) {
this.log = components.logger.forComponent('libp2p:plaintext')
this.timeout = init.timeout ?? 1000
}

async secureInbound (localId: PeerId, conn: Duplex<AsyncGenerator<Uint8Array>, Source<Uint8Array>, Promise<void>>, remoteId?: PeerId): Promise<SecuredConnection> {
async secureInbound <Stream extends Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> = MultiaddrConnection> (localId: PeerId, conn: Stream, remoteId?: PeerId): Promise<SecuredConnection<Stream>> {
return this._encrypt(localId, conn, remoteId)
}

async secureOutbound (localId: PeerId, conn: Duplex<AsyncGenerator<Uint8Array>, Source<Uint8Array>, Promise<void>>, remoteId?: PeerId): Promise<SecuredConnection> {
async secureOutbound <Stream extends Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> = MultiaddrConnection> (localId: PeerId, conn: Stream, remoteId?: PeerId): Promise<SecuredConnection<Stream>> {
return this._encrypt(localId, conn, remoteId)
}

/**
* Encrypt connection
*/
async _encrypt (localId: PeerId, conn: Duplex<AsyncGenerator<Uint8Array>, Source<Uint8Array>, Promise<void>>, remoteId?: PeerId): Promise<SecuredConnection> {
const shake = handshake(conn)
async _encrypt <Stream extends Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> = MultiaddrConnection> (localId: PeerId, conn: Stream, remoteId?: PeerId): Promise<SecuredConnection<Stream>> {
const signal = AbortSignal.timeout(this.timeout)
const pb = pbStream(conn).pb(Exchange)

let type = KeyType.RSA

Expand All @@ -75,45 +79,40 @@ class Plaintext implements ConnectionEncrypter {
}

// Encode the public key and write it to the remote peer
shake.write(
lpEncodeExchange({
id: localId.toBytes(),
pubkey: {
Type: type,
Data: localId.publicKey ?? new Uint8Array(0)
}
}).subarray()
)
await pb.write({
id: localId.toBytes(),
pubkey: {
Type: type,
Data: localId.publicKey ?? new Uint8Array(0)
}
}, {
signal
})

this.log('write pubkey exchange to peer %p', remoteId)

// Get the Exchange message
const response = (await lp.decode.fromReader(shake.reader).next()).value

if (response == null) {
throw new Error('Did not read response')
}

const id = Exchange.decode(response)
this.log('read pubkey exchange from peer %p', remoteId)
const response = await pb.read({
signal
})

let peerId
try {
if (id.pubkey == null) {
if (response.pubkey == null) {
throw new Error('Public key missing')
}

if (id.pubkey.Data.length === 0) {
if (response.pubkey.Data.length === 0) {
throw new Error('Public key data too short')
}

if (id.id == null) {
if (response.id == null) {
throw new Error('Remote id missing')
}

peerId = await peerIdFromKeys(id.pubkey.Data)
peerId = await peerIdFromKeys(response.pubkey.Data)

if (!peerId.equals(peerIdFromBytes(id.id))) {
if (!peerId.equals(peerIdFromBytes(response.id))) {
throw new Error('Public key did not match id')
}
} catch (err: any) {
Expand All @@ -127,18 +126,13 @@ class Plaintext implements ConnectionEncrypter {

this.log('plaintext key exchange completed successfully with peer %p', peerId)

shake.rest()

return {
conn: {
sink: shake.stream.sink,
source: map(shake.stream.source, (buf) => buf.subarray())
},
conn: pb.unwrap().unwrap(),
remotePeer: peerId
}
}
}

export function plaintext (): (components: PlaintextComponents) => ConnectionEncrypter {
return (components) => new Plaintext(components)
export function plaintext (init?: PlaintextInit): (components: PlaintextComponents) => ConnectionEncrypter {
return (components) => new Plaintext(components, init)
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ export default (common: TestSetup<ConnectionEncrypter>): void => {
// Send some data and collect the result
const input = uint8ArrayFromString('data to encrypt')
const result = await pipe(
[input],
async function * () {
yield input
},
outboundResult.conn,
async (source) => all(source)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ import { multiaddr } from '@multiformats/multiaddr'
import { duplexPair } from 'it-pair/duplex'
import type { MultiaddrConnection } from '@libp2p/interface/connection'
import type { Duplex, Source } from 'it-stream-types'
import type { Uint8ArrayList } from 'uint8arraylist'

export function createMaConnPair (): [MultiaddrConnection, MultiaddrConnection] {
const [local, remote] = duplexPair<Uint8Array>()
const [local, remote] = duplexPair<Uint8Array | Uint8ArrayList>()

function duplexToMaConn (duplex: Duplex<AsyncGenerator<Uint8Array>, Source<Uint8Array>, Promise<void>>): MultiaddrConnection {
function duplexToMaConn (duplex: Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>, Source<Uint8Array | Uint8ArrayList>, Promise<void>>): MultiaddrConnection {
const output: MultiaddrConnection = {
...duplex,
close: async () => {},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ export interface Peer {
}

export function multiaddrConnectionPair (a: { peerId: PeerId, registrar: Registrar }, b: { peerId: PeerId, registrar: Registrar }): [ MultiaddrConnection, MultiaddrConnection ] {
const [peerBtoPeerA, peerAtoPeerB] = duplexPair<Uint8Array>()
const [peerBtoPeerA, peerAtoPeerB] = duplexPair<Uint8Array | Uint8ArrayList>()

return [
mockMultiaddrConnection(peerAtoPeerB, b.peerId),
Expand Down
3 changes: 2 additions & 1 deletion packages/interface-compliance-tests/src/mocks/duplex.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type { Duplex, Source } from 'it-stream-types'
import type { Uint8ArrayList } from 'uint8arraylist'

export function mockDuplex (): Duplex<AsyncGenerator<Uint8Array>, Source<Uint8Array>, Promise<void>> {
export function mockDuplex (): Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>, Source<Uint8Array | Uint8ArrayList>, Promise<void>> {
return {
source: (async function * () {
yield * []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ import type { MultiaddrConnection } from '@libp2p/interface/connection'
import type { PeerId } from '@libp2p/interface/peer-id'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { Duplex } from 'it-stream-types'
import type { Uint8ArrayList } from 'uint8arraylist'

export function mockMultiaddrConnection (source: Duplex<AsyncGenerator<Uint8Array>> & Partial<MultiaddrConnection>, peerId: PeerId): MultiaddrConnection {
export function mockMultiaddrConnection (source: Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> & Partial<MultiaddrConnection>, peerId: PeerId): MultiaddrConnection {
const maConn: MultiaddrConnection = {
async close () {

Expand Down Expand Up @@ -36,7 +37,7 @@ export function mockMultiaddrConnPair (opts: MockMultiaddrConnPairOptions): { in
const { addrs, remotePeer } = opts
const controller = new AbortController()
const [localAddr, remoteAddr] = addrs
const [inboundStream, outboundStream] = duplexPair<Uint8Array>()
const [inboundStream, outboundStream] = duplexPair<Uint8Array | Uint8ArrayList>()

const outbound: MultiaddrConnection = {
...outboundStream,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ async function drainAndClose (stream: Duplex<any>): Promise<void> {
export default (common: TestSetup<StreamMuxerFactory>): void => {
describe('base', () => {
it('Open a stream from the dialer', async () => {
const p = duplexPair<Uint8Array>()
const p = duplexPair<Uint8Array | Uint8ArrayList>()
const dialerFactory = await common.setup()
const dialer = dialerFactory.createStreamMuxer({ direction: 'outbound' })
const onStreamPromise: DeferredPromise<Stream> = defer()
Expand Down Expand Up @@ -75,7 +75,7 @@ export default (common: TestSetup<StreamMuxerFactory>): void => {
})

it('Open a stream from the listener', async () => {
const p = duplexPair<Uint8Array>()
const p = duplexPair<Uint8Array | Uint8ArrayList>()
const onStreamPromise: DeferredPromise<Stream> = defer()
const dialerFactory = await common.setup()
const dialer = dialerFactory.createStreamMuxer({
Expand Down Expand Up @@ -106,7 +106,7 @@ export default (common: TestSetup<StreamMuxerFactory>): void => {
})

it('Open a stream on both sides', async () => {
const p = duplexPair<Uint8Array>()
const p = duplexPair<Uint8Array | Uint8ArrayList>()
const onDialerStreamPromise: DeferredPromise<Stream> = defer()
const onListenerStreamPromise: DeferredPromise<Stream> = defer()
const dialerFactory = await common.setup()
Expand Down Expand Up @@ -146,7 +146,7 @@ export default (common: TestSetup<StreamMuxerFactory>): void => {

it('Open a stream on one side, write, open a stream on the other side', async () => {
const toString = (source: Source<Uint8ArrayList>): AsyncGenerator<string> => map(source, (u) => uint8ArrayToString(u.subarray()))
const p = duplexPair<Uint8Array>()
const p = duplexPair<Uint8Array | Uint8ArrayList>()
const onDialerStreamPromise: DeferredPromise<Stream> = defer()
const onListenerStreamPromise: DeferredPromise<Stream> = defer()
const dialerFactory = await common.setup()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ export default (common: TestSetup<StreamMuxerFactory>): void => {
}
})

const p = duplexPair<Uint8Array>()
const p = duplexPair<Uint8Array | Uint8ArrayList>()
void pipe(p[0], dialer, p[0])
void pipe(p[1], listener, p[1])

Expand All @@ -104,7 +104,7 @@ export default (common: TestSetup<StreamMuxerFactory>): void => {

// Pause, and then close the dialer
await delay(50)
await pipe([], dialer, drain)
await pipe(async function * () {}, dialer, drain)

expect(openedStreams).to.have.equal(expectedStreams)
expect(dialer.streams).to.have.lengthOf(0)
Expand All @@ -126,7 +126,7 @@ export default (common: TestSetup<StreamMuxerFactory>): void => {
}
})

const p = duplexPair<Uint8Array>()
const p = duplexPair<Uint8Array | Uint8ArrayList>()
void pipe(p[0], dialer, p[0])
void pipe(p[1], listener, p[1])

Expand Down Expand Up @@ -169,7 +169,7 @@ export default (common: TestSetup<StreamMuxerFactory>): void => {
}
})

const p = duplexPair<Uint8Array>()
const p = duplexPair<Uint8Array | Uint8ArrayList>()
void pipe(p[0], dialer, p[0])
void pipe(p[1], listener, p[1])

Expand Down Expand Up @@ -225,7 +225,7 @@ export default (common: TestSetup<StreamMuxerFactory>): void => {
})

it('closing one of the muxed streams doesn\'t close others', async () => {
const p = duplexPair<Uint8Array>()
const p = duplexPair<Uint8Array | Uint8ArrayList>()
const dialerFactory = await common.setup()
const dialer = dialerFactory.createStreamMuxer({ direction: 'outbound' })

Expand Down Expand Up @@ -276,7 +276,7 @@ export default (common: TestSetup<StreamMuxerFactory>): void => {
it('can close a stream for writing', async () => {
const deferred = pDefer<Error>()

const p = duplexPair<Uint8Array>()
const p = duplexPair<Uint8Array | Uint8ArrayList>()
const dialerFactory = await common.setup()
const dialer = dialerFactory.createStreamMuxer({ direction: 'outbound' })
const data = [randomBuffer(), randomBuffer()]
Expand Down Expand Up @@ -321,7 +321,7 @@ export default (common: TestSetup<StreamMuxerFactory>): void => {

it('can close a stream for reading', async () => {
const deferred = pDefer<Uint8ArrayList[]>()
const p = duplexPair<Uint8Array>()
const p = duplexPair<Uint8Array | Uint8ArrayList>()
const dialerFactory = await common.setup()
const dialer = dialerFactory.createStreamMuxer({ direction: 'outbound' })
const data = [randomBuffer(), randomBuffer()].map(d => new Uint8ArrayList(d))
Expand Down Expand Up @@ -387,7 +387,7 @@ export default (common: TestSetup<StreamMuxerFactory>): void => {
it('should wait for all data to be sent when closing streams', async () => {
const deferred = pDefer<Message>()

const p = duplexPair<Uint8Array>()
const p = duplexPair<Uint8Array | Uint8ArrayList>()
const dialerFactory = await common.setup()
const dialer = dialerFactory.createStreamMuxer({ direction: 'outbound' })

Expand Down Expand Up @@ -429,7 +429,7 @@ export default (common: TestSetup<StreamMuxerFactory>): void => {
it('should abort closing a stream with outstanding data to read', async () => {
const deferred = pDefer<Message>()
const p = duplexPair<Uint8Array>()
const p = duplexPair<Uint8Array | Uint8ArrayList>()
const dialerFactory = await common.setup()
const dialer = dialerFactory.createStreamMuxer({ direction: 'outbound' })
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import type { StreamMuxer, StreamMuxerInit } from '@libp2p/interface/stream-muxer'

export default async (createMuxer: (init?: StreamMuxerInit) => Promise<StreamMuxer>, nStreams: number, nMsg: number, limit?: number): Promise<void> => {
const [dialerSocket, listenerSocket] = duplexPair<Uint8Array>()
const [dialerSocket, listenerSocket] = duplexPair<Uint8Array | Uint8ArrayList>()

const msg = new Uint8ArrayList(uint8ArrayFromString('simple msg'))

Expand Down
12 changes: 7 additions & 5 deletions packages/interface/src/connection-encrypter/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import type { MultiaddrConnection } from '../connection/index.js'
import type { PeerId } from '../peer-id/index.js'
import type { Duplex, Source } from 'it-stream-types'
import type { Duplex } from 'it-stream-types'
import type { Uint8ArrayList } from 'uint8arraylist'

/**
* A libp2p connection encrypter module must be compliant to this interface
Expand All @@ -13,18 +15,18 @@ export interface ConnectionEncrypter<Extension = unknown> {
* pass it for extra verification, otherwise it will be determined during
* the handshake.
*/
secureOutbound(localPeer: PeerId, connection: Duplex<AsyncGenerator<Uint8Array>, Source<Uint8Array>, Promise<void>>, remotePeer?: PeerId): Promise<SecuredConnection<Extension>>
secureOutbound <Stream extends Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> = MultiaddrConnection> (localPeer: PeerId, connection: Stream, remotePeer?: PeerId): Promise<SecuredConnection<Stream, Extension>>

/**
* Decrypt incoming data. If the remote PeerId is known,
* pass it for extra verification, otherwise it will be determined during
* the handshake
*/
secureInbound(localPeer: PeerId, connection: Duplex<AsyncGenerator<Uint8Array>, Source<Uint8Array>, Promise<void>>, remotePeer?: PeerId): Promise<SecuredConnection<Extension>>
secureInbound <Stream extends Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> = MultiaddrConnection> (localPeer: PeerId, connection: Stream, remotePeer?: PeerId): Promise<SecuredConnection<Stream, Extension>>
}

export interface SecuredConnection<Extension = unknown> {
conn: Duplex<AsyncGenerator<Uint8Array>, Source<Uint8Array>, Promise<void>>
export interface SecuredConnection<Stream = any, Extension = unknown> {
conn: Stream
remoteExtensions?: Extension
remotePeer: PeerId
}
2 changes: 1 addition & 1 deletion packages/interface/src/connection/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ export interface MultiaddrConnectionTimeline {
* a peer. It is a low-level primitive and is the raw connection
* without encryption or stream multiplexing.
*/
export interface MultiaddrConnection extends Duplex<AsyncGenerator<Uint8Array>, Source<Uint8Array>, Promise<void>> {
export interface MultiaddrConnection extends Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> {
/**
* Gracefully close the connection. All queued data will be written to the
* underlying transport.
Expand Down
Loading

0 comments on commit 4a474d5

Please sign in to comment.