diff --git a/package.json b/package.json index c69a7e7..20d2a7e 100644 --- a/package.json +++ b/package.json @@ -148,7 +148,7 @@ }, "dependencies": { "@libp2p/interface-connection": "^5.0.0", - "@libp2p/interface-stream-muxer": "^4.0.0", + "@libp2p/interface-stream-muxer": "^4.1.2", "@libp2p/interfaces": "^3.2.0", "@libp2p/logger": "^2.0.0", "abortable-iterator": "^5.0.0", @@ -163,7 +163,7 @@ "varint": "^6.0.0" }, "devDependencies": { - "@libp2p/interface-stream-muxer-compliance-tests": "^7.0.0", + "@libp2p/interface-stream-muxer-compliance-tests": "^7.0.3", "@types/varint": "^6.0.0", "aegir": "^39.0.7", "cborg": "^1.8.1", diff --git a/src/stream.ts b/src/stream.ts index ef03f16..14d705e 100644 --- a/src/stream.ts +++ b/src/stream.ts @@ -1,23 +1,9 @@ -import { CodeError } from '@libp2p/interfaces/errors' -import { logger } from '@libp2p/logger' -import { abortableSource } from 'abortable-iterator' -import { anySignal } from 'any-signal' -import { pushable } from 'it-pushable' +import { AbstractStream, type AbstractStreamInit } from '@libp2p/interface-stream-muxer/stream' import { Uint8ArrayList } from 'uint8arraylist' import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' import { MAX_MSG_SIZE } from './decode.js' import { InitiatorMessageTypes, ReceiverMessageTypes } from './message-types.js' import type { Message } from './message-types.js' -import type { MplexStream } from './mplex.js' -import type { StreamTimeline } from '@libp2p/interface-connection' -import type { Source } from 'it-stream-types' - -const log = logger('libp2p:mplex:stream') - -const ERR_STREAM_RESET = 'ERR_STREAM_RESET' -const ERR_STREAM_ABORT = 'ERR_STREAM_ABORT' -const ERR_SINK_ENDED = 'ERR_SINK_ENDED' -const ERR_DOUBLE_SINK = 'ERR_DOUBLE_SINK' export interface Options { id: number @@ -28,226 +14,58 @@ export interface Options { maxMsgSize?: number } -export function createStream (options: Options): MplexStream { - const { id, name, send, onEnd, type = 'initiator', maxMsgSize = MAX_MSG_SIZE } = options +interface MplexStreamInit extends AbstractStreamInit { + streamId: number + name: string + send: (msg: Message) => void +} - const abortController = new AbortController() - const resetController = new AbortController() - const closeController = new AbortController() - const Types = type === 'initiator' ? InitiatorMessageTypes : ReceiverMessageTypes - const externalId = type === 'initiator' ? (`i${id}`) : `r${id}` - const streamName = `${name == null ? id : name}` +class MplexStream extends AbstractStream { + private readonly name: string + private readonly streamId: number + private readonly send: (msg: Message) => void + private readonly types: Record - let sourceEnded = false - let sinkEnded = false - let sinkSunk = false - let endErr: Error | undefined + constructor (init: MplexStreamInit) { + super(init) - const timeline: StreamTimeline = { - open: Date.now() + this.types = init.direction === 'outbound' ? InitiatorMessageTypes : ReceiverMessageTypes + this.send = init.send + this.name = init.name + this.streamId = init.streamId } - const onSourceEnd = (err?: Error): void => { - if (sourceEnded) { - return - } - - sourceEnded = true - log.trace('%s stream %s source end - err: %o', type, streamName, err) - - if (err != null && endErr == null) { - endErr = err - } - - if (sinkEnded) { - stream.stat.timeline.close = Date.now() - - if (onEnd != null) { - onEnd(endErr) - } - } + sendNewStream (): void { + this.send({ id: this.streamId, type: InitiatorMessageTypes.NEW_STREAM, data: new Uint8ArrayList(uint8ArrayFromString(this.name)) }) } - const onSinkEnd = (err?: Error): void => { - if (sinkEnded) { - return - } - - sinkEnded = true - log.trace('%s stream %s sink end - err: %o', type, streamName, err) - - if (err != null && endErr == null) { - endErr = err - } - - if (sourceEnded) { - timeline.close = Date.now() - - if (onEnd != null) { - onEnd(endErr) - } - } + sendData (data: Uint8ArrayList): void { + this.send({ id: this.streamId, type: this.types.MESSAGE, data }) } - const streamSource = pushable({ - onEnd: onSourceEnd - }) - - const stream: MplexStream = { - // Close for both Reading and Writing - close: () => { - log.trace('%s stream %s close', type, streamName) - - stream.closeRead() - stream.closeWrite() - }, - - // Close for reading - closeRead: () => { - log.trace('%s stream %s closeRead', type, streamName) - - if (sourceEnded) { - return - } - - streamSource.end() - }, - - // Close for writing - closeWrite: () => { - log.trace('%s stream %s closeWrite', type, streamName) - - if (sinkEnded) { - return - } - - closeController.abort() - - try { - send({ id, type: Types.CLOSE }) - } catch (err) { - log.trace('%s stream %s error sending close', type, name, err) - } - - onSinkEnd() - }, - - // Close for reading and writing (local error) - abort: (err: Error) => { - log.trace('%s stream %s abort', type, streamName, err) - // End the source with the passed error - streamSource.end(err) - abortController.abort() - onSinkEnd(err) - }, - - // Close immediately for reading and writing (remote error) - reset: () => { - const err = new CodeError('stream reset', ERR_STREAM_RESET) - resetController.abort() - streamSource.end(err) - onSinkEnd(err) - }, - - sink: async (source: Source) => { - if (sinkSunk) { - throw new CodeError('sink already called on stream', ERR_DOUBLE_SINK) - } - - sinkSunk = true - - if (sinkEnded) { - throw new CodeError('stream closed for writing', ERR_SINK_ENDED) - } - - const signal = anySignal([ - abortController.signal, - resetController.signal, - closeController.signal - ]) - - try { - source = abortableSource(source, signal) - - if (type === 'initiator') { // If initiator, open a new stream - send({ id, type: InitiatorMessageTypes.NEW_STREAM, data: new Uint8ArrayList(uint8ArrayFromString(streamName)) }) - } - - for await (let data of source) { - while (data.length > 0) { - if (data.length <= maxMsgSize) { - send({ id, type: Types.MESSAGE, data: data instanceof Uint8Array ? new Uint8ArrayList(data) : data }) - break - } - data = data instanceof Uint8Array ? new Uint8ArrayList(data) : data - send({ id, type: Types.MESSAGE, data: data.sublist(0, maxMsgSize) }) - data.consume(maxMsgSize) - } - } - } catch (err: any) { - if (err.type === 'aborted' && err.message === 'The operation was aborted') { - if (closeController.signal.aborted) { - return - } - - if (resetController.signal.aborted) { - err.message = 'stream reset' - err.code = ERR_STREAM_RESET - } - - if (abortController.signal.aborted) { - err.message = 'stream aborted' - err.code = ERR_STREAM_ABORT - } - } - - // Send no more data if this stream was remotely reset - if (err.code === ERR_STREAM_RESET) { - log.trace('%s stream %s reset', type, name) - } else { - log.trace('%s stream %s error', type, name, err) - try { - send({ id, type: Types.RESET }) - } catch (err) { - log.trace('%s stream %s error sending reset', type, name, err) - } - } - - streamSource.end(err) - onSinkEnd(err) - return - } finally { - signal.clear() - } - - try { - send({ id, type: Types.CLOSE }) - } catch (err) { - log.trace('%s stream %s error sending close', type, name, err) - } - - onSinkEnd() - }, - - source: streamSource, - - sourcePush: (data: Uint8ArrayList) => { - streamSource.push(data) - }, - - sourceReadableLength () { - return streamSource.readableLength - }, - - stat: { - direction: type === 'initiator' ? 'outbound' : 'inbound', - timeline - }, + sendReset (): void { + this.send({ id: this.streamId, type: this.types.RESET }) + } - metadata: {}, + sendCloseWrite (): void { + this.send({ id: this.streamId, type: this.types.CLOSE }) + } - id: externalId + sendCloseRead (): void { + // mplex does not support close read, only close write } +} + +export function createStream (options: Options): MplexStream { + const { id, name, send, onEnd, type = 'initiator', maxMsgSize = MAX_MSG_SIZE } = options - return stream + return new MplexStream({ + id: type === 'initiator' ? (`i${id}`) : `r${id}`, + streamId: id, + name: `${name == null ? id : name}`, + direction: type === 'initiator' ? 'outbound' : 'inbound', + maxDataSize: maxMsgSize, + onEnd, + send + }) } diff --git a/test/stream.spec.ts b/test/stream.spec.ts index 43ddd5f..ee6e97e 100644 --- a/test/stream.spec.ts +++ b/test/stream.spec.ts @@ -113,7 +113,7 @@ async function streamPair (n: number, onInitiatorMessage?: onMessage, onReceiver } }), receiver - ) + ).catch(() => {}) try { await pipe( @@ -296,7 +296,8 @@ describe('stream', () => { } } - await pipe(input, stream) + await expect(pipe(input, stream)).to.eventually.be + .rejected.with.property('message', error.message) const resetMsg = msgs[msgs.length - 1] @@ -321,7 +322,8 @@ describe('stream', () => { } } - await pipe(input, stream) + await expect(pipe(input, stream)).to.eventually.be.rejected + .with.property('message', error.message) const resetMsg = msgs[msgs.length - 1]