Skip to content

Commit

Permalink
fix!: upgrade to libp2p@2.x.x
Browse files Browse the repository at this point in the history
Incorporates changes necessary to upgrade to libp2p@2.x.x

BREAKING CHANGE: requires libp2p@2.x.x
  • Loading branch information
achingbrain committed Sep 9, 2024
1 parent 0a8b06f commit 9c4dadc
Show file tree
Hide file tree
Showing 10 changed files with 120 additions and 69 deletions.
18 changes: 9 additions & 9 deletions src/config.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { CodeError } from '@libp2p/interface'
import { ERR_INVALID_CONFIG, INITIAL_STREAM_WINDOW, MAX_STREAM_WINDOW } from './constants.js'
import { InvalidParametersError } from '@libp2p/interface'
import { INITIAL_STREAM_WINDOW, MAX_STREAM_WINDOW } from './constants.js'

// TOOD use config items or delete them
export interface Config {
Expand Down Expand Up @@ -58,24 +58,24 @@ export const defaultConfig: Config = {

export function verifyConfig (config: Config): void {
if (config.keepAliveInterval <= 0) {
throw new CodeError('keep-alive interval must be positive', ERR_INVALID_CONFIG)
throw new InvalidParametersError('keep-alive interval must be positive')
}
if (config.maxInboundStreams < 0) {
throw new CodeError('max inbound streams must be larger or equal 0', ERR_INVALID_CONFIG)
throw new InvalidParametersError('max inbound streams must be larger or equal 0')
}
if (config.maxOutboundStreams < 0) {
throw new CodeError('max outbound streams must be larger or equal 0', ERR_INVALID_CONFIG)
throw new InvalidParametersError('max outbound streams must be larger or equal 0')
}
if (config.initialStreamWindowSize < INITIAL_STREAM_WINDOW) {
throw new CodeError('InitialStreamWindowSize must be larger or equal 256 kB', ERR_INVALID_CONFIG)
throw new InvalidParametersError('InitialStreamWindowSize must be larger or equal 256 kB')
}
if (config.maxStreamWindowSize < config.initialStreamWindowSize) {
throw new CodeError('MaxStreamWindowSize must be larger than the InitialStreamWindowSize', ERR_INVALID_CONFIG)
throw new InvalidParametersError('MaxStreamWindowSize must be larger than the InitialStreamWindowSize')
}
if (config.maxStreamWindowSize > 2 ** 32 - 1) {
throw new CodeError('MaxStreamWindowSize must be less than equal MAX_UINT32', ERR_INVALID_CONFIG)
throw new InvalidParametersError('MaxStreamWindowSize must be less than equal MAX_UINT32')
}
if (config.maxMessageSize < 1024) {
throw new CodeError('MaxMessageSize must be greater than a kilobyte', ERR_INVALID_CONFIG)
throw new InvalidParametersError('MaxMessageSize must be greater than a kilobyte')
}
}
32 changes: 8 additions & 24 deletions src/constants.ts
Original file line number Diff line number Diff line change
@@ -1,33 +1,17 @@
// Protocol violation errors

export const ERR_INVALID_FRAME = 'ERR_INVALID_FRAME'
export const ERR_UNREQUESTED_PING = 'ERR_UNREQUESTED_PING'
export const ERR_NOT_MATCHING_PING = 'ERR_NOT_MATCHING_PING'
export const ERR_STREAM_ALREADY_EXISTS = 'ERR_STREAM_ALREADY_EXISTS'
export const ERR_DECODE_INVALID_VERSION = 'ERR_DECODE_INVALID_VERSION'
export const ERR_BOTH_CLIENTS = 'ERR_BOTH_CLIENTS'
export const ERR_RECV_WINDOW_EXCEEDED = 'ERR_RECV_WINDOW_EXCEEDED'
import { BothClientsError, DecodeInvalidVersionError, InvalidFrameError, NotMatchingPingError, ReceiveWindowExceededError, StreamAlreadyExistsError, UnrequestedPingError } from './errors.js'

export const PROTOCOL_ERRORS = new Set([
ERR_INVALID_FRAME,
ERR_UNREQUESTED_PING,
ERR_NOT_MATCHING_PING,
ERR_STREAM_ALREADY_EXISTS,
ERR_DECODE_INVALID_VERSION,
ERR_BOTH_CLIENTS,
ERR_RECV_WINDOW_EXCEEDED
InvalidFrameError.name,
UnrequestedPingError.name,
NotMatchingPingError.name,
StreamAlreadyExistsError.name,
DecodeInvalidVersionError.name,
BothClientsError.name,
ReceiveWindowExceededError.name
])

// local errors

export const ERR_INVALID_CONFIG = 'ERR_INVALID_CONFIG'
export const ERR_MUXER_LOCAL_CLOSED = 'ERR_MUXER_LOCAL_CLOSED'
export const ERR_MUXER_REMOTE_CLOSED = 'ERR_MUXER_REMOTE_CLOSED'
export const ERR_STREAM_RESET = 'ERR_STREAM_RESET'
export const ERR_STREAM_ABORT = 'ERR_STREAM_ABORT'
export const ERR_MAX_OUTBOUND_STREAMS_EXCEEDED = 'ERROR_MAX_OUTBOUND_STREAMS_EXCEEDED'
export const ERR_DECODE_IN_PROGRESS = 'ERR_DECODE_IN_PROGRESS'

/**
* INITIAL_STREAM_WINDOW is the initial stream window size.
*
Expand Down
7 changes: 3 additions & 4 deletions src/decode.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { CodeError } from '@libp2p/interface'
import { Uint8ArrayList } from 'uint8arraylist'
import { ERR_DECODE_INVALID_VERSION, ERR_DECODE_IN_PROGRESS } from './constants.js'
import { InvalidFrameError, InvalidStateError } from './errors.js'
import { type FrameHeader, FrameType, HEADER_LENGTH, YAMUX_VERSION } from './frame.js'
import type { Source } from 'it-stream-types'

Expand All @@ -15,7 +14,7 @@ const twoPow24 = 2 ** 24
*/
export function decodeHeader (data: Uint8Array): FrameHeader {
if (data[0] !== YAMUX_VERSION) {
throw new CodeError('Invalid frame version', ERR_DECODE_INVALID_VERSION)
throw new InvalidFrameError('Invalid frame version')
}
return {
type: data[1],
Expand Down Expand Up @@ -87,7 +86,7 @@ export class Decoder {
// Sanity check to ensure a header isn't read when another frame is partially decoded
// In practice this shouldn't happen
if (this.frameInProgress) {
throw new CodeError('decoding frame already in progress', ERR_DECODE_IN_PROGRESS)
throw new InvalidStateError('decoding frame already in progress')
}

if (this.buffer.length < HEADER_LENGTH) {
Expand Down
71 changes: 71 additions & 0 deletions src/errors.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
export class InvalidFrameError extends Error {
static name = 'InvalidFrameError'

constructor (message = 'The frame was invalid') {
super(message)
this.name = 'InvalidFrameError'
}
}

export class UnrequestedPingError extends Error {
static name = 'UnrequestedPingError'

constructor (message = 'Unrequested ping error') {
super(message)
this.name = 'UnrequestedPingError'
}
}

export class NotMatchingPingError extends Error {
static name = 'NotMatchingPingError'

constructor (message = 'Unrequested ping error') {
super(message)
this.name = 'NotMatchingPingError'
}
}

export class InvalidStateError extends Error {
static name = 'InvalidStateError'

constructor (message = 'Invalid state') {
super(message)
this.name = 'InvalidStateError'
}
}

export class StreamAlreadyExistsError extends Error {
static name = 'StreamAlreadyExistsError'

constructor (message = 'Strean already exists') {
super(message)
this.name = 'StreamAlreadyExistsError'
}
}

export class DecodeInvalidVersionError extends Error {
static name = 'DecodeInvalidVersionError'

constructor (message = 'Decode invalid version') {
super(message)
this.name = 'DecodeInvalidVersionError'
}
}

export class BothClientsError extends Error {
static name = 'BothClientsError'

constructor (message = 'Both clients') {
super(message)
this.name = 'BothClientsError'
}
}

export class ReceiveWindowExceededError extends Error {
static name = 'ReceiveWindowExceededError'

constructor (message = 'Receive window exceeded') {
super(message)
this.name = 'ReceiveWindowExceededError'
}
}
38 changes: 19 additions & 19 deletions src/muxer.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import { CodeError, serviceCapabilities, setMaxListeners } from '@libp2p/interface'
import { InvalidParametersError, MuxerClosedError, TooManyOutboundProtocolStreamsError, serviceCapabilities, setMaxListeners } from '@libp2p/interface'
import { getIterator } from 'get-iterator'
import { pushable, type Pushable } from 'it-pushable'
import { Uint8ArrayList } from 'uint8arraylist'
import { type Config, defaultConfig, verifyConfig } from './config.js'
import { ERR_BOTH_CLIENTS, ERR_INVALID_FRAME, ERR_MAX_OUTBOUND_STREAMS_EXCEEDED, ERR_MUXER_LOCAL_CLOSED, ERR_MUXER_REMOTE_CLOSED, ERR_NOT_MATCHING_PING, ERR_STREAM_ALREADY_EXISTS, ERR_UNREQUESTED_PING, PROTOCOL_ERRORS } from './constants.js'
import { PROTOCOL_ERRORS } from './constants.js'
import { Decoder } from './decode.js'
import { encodeHeader } from './encode.js'
import { InvalidFrameError, NotMatchingPingError, UnrequestedPingError } from './errors.js'
import { Flag, type FrameHeader, FrameType, GoAwayCode } from './frame.js'
import { StreamState, YamuxStream } from './stream.js'
import type { YamuxMuxerComponents } from './index.js'
Expand Down Expand Up @@ -139,10 +140,9 @@ export class YamuxMuxer implements StreamMuxer {
}

reason = GoAwayCode.NormalTermination
} catch (err: unknown) {
} catch (err: any) {
// either a protocol or internal error
const errCode = (err as { code: string }).code
if (PROTOCOL_ERRORS.has(errCode)) {
if (PROTOCOL_ERRORS.has(err.name)) {
this.log?.error('protocol error in sink', err)
reason = GoAwayCode.ProtocolError
} else {
Expand Down Expand Up @@ -187,18 +187,18 @@ export class YamuxMuxer implements StreamMuxer {

newStream (name?: string | undefined): YamuxStream {
if (this.remoteGoAway !== undefined) {
throw new CodeError('muxer closed remotely', ERR_MUXER_REMOTE_CLOSED)
throw new MuxerClosedError('Muxer closed remotely')
}
if (this.localGoAway !== undefined) {
throw new CodeError('muxer closed locally', ERR_MUXER_LOCAL_CLOSED)
throw new MuxerClosedError('Muxer closed locally')
}

const id = this.nextStreamID
this.nextStreamID += 2

// check against our configured maximum number of outbound streams
if (this.numOutboundStreams >= this.config.maxOutboundStreams) {
throw new CodeError('max outbound streams exceeded', ERR_MAX_OUTBOUND_STREAMS_EXCEEDED)
throw new TooManyOutboundProtocolStreamsError('max outbound streams exceeded')
}

this.log?.trace('new outgoing stream id=%s', id)
Expand All @@ -224,10 +224,10 @@ export class YamuxMuxer implements StreamMuxer {
*/
async ping (): Promise<number> {
if (this.remoteGoAway !== undefined) {
throw new CodeError('muxer closed remotely', ERR_MUXER_REMOTE_CLOSED)
throw new MuxerClosedError('Muxer closed remotely')
}
if (this.localGoAway !== undefined) {
throw new CodeError('muxer closed locally', ERR_MUXER_LOCAL_CLOSED)
throw new MuxerClosedError('Muxer closed locally')
}

// An active ping does not yet exist, handle the process here
Expand All @@ -239,7 +239,7 @@ export class YamuxMuxer implements StreamMuxer {
// this promise awaits resolution or the close controller aborting
promise: new Promise<void>((resolve, reject) => {
const closed = (): void => {
reject(new CodeError('muxer closed locally', ERR_MUXER_LOCAL_CLOSED))
reject(new MuxerClosedError('Muxer closed locally'))
}
this.closeController.signal.addEventListener('abort', closed, { once: true })
_resolve = (): void => {
Expand Down Expand Up @@ -357,7 +357,7 @@ export class YamuxMuxer implements StreamMuxer {
/** Create a new stream */
private _newStream (id: number, name: string | undefined, state: StreamState, direction: 'inbound' | 'outbound'): YamuxStream {
if (this._streams.get(id) != null) {
throw new CodeError('Stream already exists', ERR_STREAM_ALREADY_EXISTS, { id })
throw new InvalidParametersError('Stream already exists with that id')
}

const stream = new YamuxStream({
Expand Down Expand Up @@ -428,7 +428,7 @@ export class YamuxMuxer implements StreamMuxer {
{ this.handleGoAway(length); return }
default:
// Invalid state
throw new CodeError('Invalid frame type', ERR_INVALID_FRAME, { header })
throw new InvalidFrameError('Invalid frame type')
}
} else {
switch (header.type) {
Expand All @@ -437,7 +437,7 @@ export class YamuxMuxer implements StreamMuxer {
{ await this.handleStreamMessage(header, readData); return }
default:
// Invalid state
throw new CodeError('Invalid frame type', ERR_INVALID_FRAME, { header })
throw new InvalidFrameError('Invalid frame type')
}
}
}
Expand All @@ -452,18 +452,18 @@ export class YamuxMuxer implements StreamMuxer {
this.handlePingResponse(header.length)
} else {
// Invalid state
throw new CodeError('Invalid frame flag', ERR_INVALID_FRAME, { header })
throw new InvalidFrameError('Invalid frame flag')
}
}

private handlePingResponse (pingId: number): void {
if (this.activePing === undefined) {
// this ping was not requested
throw new CodeError('ping not requested', ERR_UNREQUESTED_PING)
throw new UnrequestedPingError('ping not requested')
}
if (this.activePing.id !== pingId) {
// this ping doesn't match our active ping request
throw new CodeError('ping doesn\'t match our id', ERR_NOT_MATCHING_PING)
throw new NotMatchingPingError('ping doesn\'t match our id')
}

// valid ping response
Expand Down Expand Up @@ -522,7 +522,7 @@ export class YamuxMuxer implements StreamMuxer {

private incomingStream (id: number): void {
if (this.client !== (id % 2 === 0)) {
throw new CodeError('both endpoints are clients', ERR_BOTH_CLIENTS)
throw new InvalidParametersError('Both endpoints are clients')
}
if (this._streams.has(id)) {
return
Expand Down Expand Up @@ -565,7 +565,7 @@ export class YamuxMuxer implements StreamMuxer {
this.log?.trace('sending frame %o', header)
if (header.type === FrameType.Data) {
if (data === undefined) {
throw new CodeError('invalid frame', ERR_INVALID_FRAME)
throw new InvalidFrameError('Invalid frame')
}
this.source.push(
new Uint8ArrayList(encodeHeader(header), data)
Expand Down
9 changes: 5 additions & 4 deletions src/stream.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { CodeError } from '@libp2p/interface'
import { AbortError } from '@libp2p/interface'
import { AbstractStream, type AbstractStreamInit } from '@libp2p/utils/abstract-stream'
import each from 'it-foreach'
import { ERR_RECV_WINDOW_EXCEEDED, ERR_STREAM_ABORT, INITIAL_STREAM_WINDOW } from './constants.js'
import { INITIAL_STREAM_WINDOW } from './constants.js'
import { ReceiveWindowExceededError } from './errors.js'
import { Flag, type FrameHeader, FrameType, HEADER_LENGTH } from './frame.js'
import type { Config } from './config.js'
import type { AbortOptions } from '@libp2p/interface'
Expand Down Expand Up @@ -173,7 +174,7 @@ export class YamuxStream extends AbstractStream {
let reject: (err: Error) => void
const abort = (): void => {
if (this.status === 'open' || this.status === 'closing') {
reject(new CodeError('stream aborted', ERR_STREAM_ABORT))
reject(new AbortError('Stream aborted'))
} else {
// the stream was closed already, ignore the failure to send
resolve()
Expand Down Expand Up @@ -219,7 +220,7 @@ export class YamuxStream extends AbstractStream {

// check that our recv window is not exceeded
if (this.recvWindowCapacity < header.length) {
throw new CodeError('receive window exceeded', ERR_RECV_WINDOW_EXCEEDED, { available: this.recvWindowCapacity, recv: header.length })
throw new ReceiveWindowExceededError('Receive window exceeded')
}

const data = await readData()
Expand Down
5 changes: 2 additions & 3 deletions test/codec.util.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { CodeError } from '@libp2p/interface'
import { ERR_DECODE_INVALID_VERSION } from '../src/constants.js'
import { InvalidFrameError } from '../src/errors.js'
import { type FrameHeader, HEADER_LENGTH, YAMUX_VERSION } from '../src/frame.js'

// Slower encode / decode functions that use dataview
Expand All @@ -8,7 +7,7 @@ export function decodeHeaderNaive (data: Uint8Array): FrameHeader {
const view = new DataView(data.buffer, data.byteOffset, data.byteLength)

if (view.getUint8(0) !== YAMUX_VERSION) {
throw new CodeError('Invalid frame version', ERR_DECODE_INVALID_VERSION)
throw new InvalidFrameError('Invalid frame version')
}
return {
type: view.getUint8(1),
Expand Down
3 changes: 1 addition & 2 deletions test/decode.spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
/* eslint-disable @typescript-eslint/dot-notation */
import { expect } from 'aegir/chai'
import { type Pushable, pushable } from 'it-pushable'
import { ERR_DECODE_IN_PROGRESS } from '../src/constants.js'
import { Decoder } from '../src/decode.js'
import { encodeHeader } from '../src/encode.js'
import { Flag, type FrameHeader, FrameType, GoAwayCode } from '../src/frame.js'
Expand Down Expand Up @@ -344,7 +343,7 @@ describe('Decoder', () => {
}
expect.fail('decoding another frame before the first is finished should error')
} catch (e) {
expect((e as { code: string }).code).to.equal(ERR_DECODE_IN_PROGRESS)
expect(e).to.have.property('name', 'InvalidStateError')
}
})
})
Expand Down
3 changes: 1 addition & 2 deletions test/muxer.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import { expect } from 'aegir/chai'
import { duplexPair } from 'it-pair/duplex'
import { pipe } from 'it-pipe'
import { type Uint8ArrayList } from 'uint8arraylist'
import { ERR_MUXER_LOCAL_CLOSED } from '../src/constants.js'
import { sleep, testClientServer, testYamuxMuxer, type YamuxFixture } from './util.js'

describe('muxer', () => {
Expand Down Expand Up @@ -104,7 +103,7 @@ describe('muxer', () => {

expect(() => {
client.newStream()
}).to.throw().with.property('code', ERR_MUXER_LOCAL_CLOSED, 'should not be able to open a stream after close')
}).to.throw().with.property('name', 'MuxerClosedError', 'should not be able to open a stream after close')
})

it('test keep alive', async () => {
Expand Down
Loading

0 comments on commit 9c4dadc

Please sign in to comment.