Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

fix: use abstract stream class from muxer interface #279

Merged
merged 4 commits into from
May 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@
},
"dependencies": {
"@libp2p/interface-connection": "^5.0.0",
"@libp2p/interface-stream-muxer": "^4.0.0",
"@libp2p/interface-stream-muxer": "^4.1.2",
"@libp2p/interfaces": "^3.2.0",
"@libp2p/logger": "^2.0.0",
"abortable-iterator": "^5.0.0",
Expand All @@ -163,7 +163,7 @@
"varint": "^6.0.0"
},
"devDependencies": {
"@libp2p/interface-stream-muxer-compliance-tests": "^7.0.0",
"@libp2p/interface-stream-muxer-compliance-tests": "^7.0.3",
"@types/varint": "^6.0.0",
"aegir": "^39.0.7",
"cborg": "^1.8.1",
Expand Down
266 changes: 42 additions & 224 deletions src/stream.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,9 @@
import { CodeError } from '@libp2p/interfaces/errors'
import { logger } from '@libp2p/logger'
import { abortableSource } from 'abortable-iterator'
import { anySignal } from 'any-signal'
import { pushable } from 'it-pushable'
import { AbstractStream, type AbstractStreamInit } from '@libp2p/interface-stream-muxer/stream'
import { Uint8ArrayList } from 'uint8arraylist'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { MAX_MSG_SIZE } from './decode.js'
import { InitiatorMessageTypes, ReceiverMessageTypes } from './message-types.js'
import type { Message } from './message-types.js'
import type { MplexStream } from './mplex.js'
import type { StreamTimeline } from '@libp2p/interface-connection'
import type { Source } from 'it-stream-types'

const log = logger('libp2p:mplex:stream')

const ERR_STREAM_RESET = 'ERR_STREAM_RESET'
const ERR_STREAM_ABORT = 'ERR_STREAM_ABORT'
const ERR_SINK_ENDED = 'ERR_SINK_ENDED'
const ERR_DOUBLE_SINK = 'ERR_DOUBLE_SINK'

export interface Options {
id: number
Expand All @@ -28,226 +14,58 @@ export interface Options {
maxMsgSize?: number
}

export function createStream (options: Options): MplexStream {
const { id, name, send, onEnd, type = 'initiator', maxMsgSize = MAX_MSG_SIZE } = options
interface MplexStreamInit extends AbstractStreamInit {
streamId: number
name: string
send: (msg: Message) => void
}

const abortController = new AbortController()
const resetController = new AbortController()
const closeController = new AbortController()
const Types = type === 'initiator' ? InitiatorMessageTypes : ReceiverMessageTypes
const externalId = type === 'initiator' ? (`i${id}`) : `r${id}`
const streamName = `${name == null ? id : name}`
class MplexStream extends AbstractStream {
private readonly name: string
private readonly streamId: number
private readonly send: (msg: Message) => void
private readonly types: Record<string, number>

let sourceEnded = false
let sinkEnded = false
let sinkSunk = false
let endErr: Error | undefined
constructor (init: MplexStreamInit) {
super(init)

const timeline: StreamTimeline = {
open: Date.now()
this.types = init.direction === 'outbound' ? InitiatorMessageTypes : ReceiverMessageTypes
this.send = init.send
this.name = init.name
this.streamId = init.streamId
}

const onSourceEnd = (err?: Error): void => {
if (sourceEnded) {
return
}

sourceEnded = true
log.trace('%s stream %s source end - err: %o', type, streamName, err)

if (err != null && endErr == null) {
endErr = err
}

if (sinkEnded) {
stream.stat.timeline.close = Date.now()

if (onEnd != null) {
onEnd(endErr)
}
}
sendNewStream (): void {
this.send({ id: this.streamId, type: InitiatorMessageTypes.NEW_STREAM, data: new Uint8ArrayList(uint8ArrayFromString(this.name)) })
}

const onSinkEnd = (err?: Error): void => {
if (sinkEnded) {
return
}

sinkEnded = true
log.trace('%s stream %s sink end - err: %o', type, streamName, err)

if (err != null && endErr == null) {
endErr = err
}

if (sourceEnded) {
timeline.close = Date.now()

if (onEnd != null) {
onEnd(endErr)
}
}
sendData (data: Uint8ArrayList): void {
this.send({ id: this.streamId, type: this.types.MESSAGE, data })
}

const streamSource = pushable<Uint8ArrayList>({
onEnd: onSourceEnd
})

const stream: MplexStream = {
// Close for both Reading and Writing
close: () => {
log.trace('%s stream %s close', type, streamName)

stream.closeRead()
stream.closeWrite()
},

// Close for reading
closeRead: () => {
log.trace('%s stream %s closeRead', type, streamName)

if (sourceEnded) {
return
}

streamSource.end()
},

// Close for writing
closeWrite: () => {
log.trace('%s stream %s closeWrite', type, streamName)

if (sinkEnded) {
return
}

closeController.abort()

try {
send({ id, type: Types.CLOSE })
} catch (err) {
log.trace('%s stream %s error sending close', type, name, err)
}

onSinkEnd()
},

// Close for reading and writing (local error)
abort: (err: Error) => {
log.trace('%s stream %s abort', type, streamName, err)
// End the source with the passed error
streamSource.end(err)
abortController.abort()
onSinkEnd(err)
},

// Close immediately for reading and writing (remote error)
reset: () => {
const err = new CodeError('stream reset', ERR_STREAM_RESET)
resetController.abort()
streamSource.end(err)
onSinkEnd(err)
},

sink: async (source: Source<Uint8ArrayList | Uint8Array>) => {
if (sinkSunk) {
throw new CodeError('sink already called on stream', ERR_DOUBLE_SINK)
}

sinkSunk = true

if (sinkEnded) {
throw new CodeError('stream closed for writing', ERR_SINK_ENDED)
}

const signal = anySignal([
abortController.signal,
resetController.signal,
closeController.signal
])

try {
source = abortableSource(source, signal)

if (type === 'initiator') { // If initiator, open a new stream
send({ id, type: InitiatorMessageTypes.NEW_STREAM, data: new Uint8ArrayList(uint8ArrayFromString(streamName)) })
}

for await (let data of source) {
while (data.length > 0) {
if (data.length <= maxMsgSize) {
send({ id, type: Types.MESSAGE, data: data instanceof Uint8Array ? new Uint8ArrayList(data) : data })
break
}
data = data instanceof Uint8Array ? new Uint8ArrayList(data) : data
send({ id, type: Types.MESSAGE, data: data.sublist(0, maxMsgSize) })
data.consume(maxMsgSize)
}
}
} catch (err: any) {
if (err.type === 'aborted' && err.message === 'The operation was aborted') {
if (closeController.signal.aborted) {
return
}

if (resetController.signal.aborted) {
err.message = 'stream reset'
err.code = ERR_STREAM_RESET
}

if (abortController.signal.aborted) {
err.message = 'stream aborted'
err.code = ERR_STREAM_ABORT
}
}

// Send no more data if this stream was remotely reset
if (err.code === ERR_STREAM_RESET) {
log.trace('%s stream %s reset', type, name)
} else {
log.trace('%s stream %s error', type, name, err)
try {
send({ id, type: Types.RESET })
} catch (err) {
log.trace('%s stream %s error sending reset', type, name, err)
}
}

streamSource.end(err)
onSinkEnd(err)
return
} finally {
signal.clear()
}

try {
send({ id, type: Types.CLOSE })
} catch (err) {
log.trace('%s stream %s error sending close', type, name, err)
}

onSinkEnd()
},

source: streamSource,

sourcePush: (data: Uint8ArrayList) => {
streamSource.push(data)
},

sourceReadableLength () {
return streamSource.readableLength
},

stat: {
direction: type === 'initiator' ? 'outbound' : 'inbound',
timeline
},
sendReset (): void {
this.send({ id: this.streamId, type: this.types.RESET })
}

metadata: {},
sendCloseWrite (): void {
this.send({ id: this.streamId, type: this.types.CLOSE })
}

id: externalId
sendCloseRead (): void {
// mplex does not support close read, only close write
}
}

export function createStream (options: Options): MplexStream {
const { id, name, send, onEnd, type = 'initiator', maxMsgSize = MAX_MSG_SIZE } = options

return stream
return new MplexStream({
id: type === 'initiator' ? (`i${id}`) : `r${id}`,
streamId: id,
name: `${name == null ? id : name}`,
direction: type === 'initiator' ? 'outbound' : 'inbound',
maxDataSize: maxMsgSize,
onEnd,
send
})
}
8 changes: 5 additions & 3 deletions test/stream.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ async function streamPair (n: number, onInitiatorMessage?: onMessage, onReceiver
}
}),
receiver
)
).catch(() => {})

try {
await pipe(
Expand Down Expand Up @@ -296,7 +296,8 @@ describe('stream', () => {
}
}

await pipe(input, stream)
await expect(pipe(input, stream)).to.eventually.be
.rejected.with.property('message', error.message)

const resetMsg = msgs[msgs.length - 1]

Expand All @@ -321,7 +322,8 @@ describe('stream', () => {
}
}

await pipe(input, stream)
await expect(pipe(input, stream)).to.eventually.be.rejected
.with.property('message', error.message)

const resetMsg = msgs[msgs.length - 1]

Expand Down