This repository has been archived by the owner on Jul 21, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 30
/
stream.js
124 lines (111 loc) · 4 KB
/
stream.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
'use strict'
const abortable = require('abortable-iterator')
const AbortController = require('abort-controller')
const log = require('debug')('libp2p:mplex:stream')
const pushable = require('it-pushable')
const BufferList = require('bl/BufferList')
const errCode = require('err-code')
const { MAX_MSG_SIZE } = require('./restrict-size')
const { InitiatorMessageTypes, ReceiverMessageTypes } = require('./message-types')
const ERR_MPLEX_STREAM_RESET = 'ERR_MPLEX_STREAM_RESET'
const ERR_MPLEX_STREAM_ABORT = 'ERR_MPLEX_STREAM_ABORT'
/**
* @param {object} options
* @param {number} options.id
* @param {string} options.name
* @param {function(*)} options.send - Called to send data through the stream
* @param {function(Error)} [options.onEnd] - Called whenever the stream ends
* @param {string} [options.type] - One of ['initiator','receiver']. Defaults to 'initiator'
* @param {number} [options.maxMsgSize] - Max size of an mplex message in bytes. Writes > size are automatically split. Defaults to 1MB
* @returns {*} A muxed stream
*/
module.exports = ({ id, name, send, onEnd = () => {}, type = 'initiator', maxMsgSize = MAX_MSG_SIZE }) => {
const abortController = new AbortController()
const resetController = new AbortController()
const Types = type === 'initiator' ? InitiatorMessageTypes : ReceiverMessageTypes
const externalId = type === 'initiator' ? (`i${id}`) : `r${id}`
name = String(name == null ? id : name)
let sourceEnded = false
let sinkEnded = false
let endErr
const onSourceEnd = err => {
if (sourceEnded) return
sourceEnded = true
log('%s stream %s source end', type, name, err)
if (err && !endErr) endErr = err
if (sinkEnded) {
stream.timeline.close = Date.now()
onEnd(endErr)
}
}
const onSinkEnd = err => {
if (sinkEnded) return
sinkEnded = true
log('%s stream %s sink end', type, name, err)
if (err && !endErr) endErr = err
if (sourceEnded) {
stream.timeline.close = Date.now()
onEnd(endErr)
}
}
const stream = {
// Close for reading
close: () => stream.source.end(),
// Close for reading and writing (local error)
abort: err => {
log('%s stream %s abort', type, name, err)
// End the source with the passed error
stream.source.end(err)
abortController.abort()
onSinkEnd(err)
},
// Close immediately for reading and writing (remote error)
reset: () => {
const err = errCode(new Error('stream reset'), ERR_MPLEX_STREAM_RESET)
resetController.abort()
stream.source.end(err)
onSinkEnd(err)
},
sink: async source => {
source = abortable(source, [
{ signal: abortController.signal, options: { abortMessage: 'stream aborted', abortCode: ERR_MPLEX_STREAM_ABORT } },
{ signal: resetController.signal, options: { abortMessage: 'stream reset', abortCode: ERR_MPLEX_STREAM_RESET } }
])
if (type === 'initiator') { // If initiator, open a new stream
send({ id, type: Types.NEW_STREAM, data: name })
}
try {
for await (let data of source) {
while (data.length) {
if (data.length <= maxMsgSize) {
send({ id, type: Types.MESSAGE, data })
break
}
data = BufferList.isBufferList(data) ? data : new BufferList(data)
send({ id, type: Types.MESSAGE, data: data.shallowSlice(0, maxMsgSize) })
data.consume(maxMsgSize)
}
}
} catch (err) {
// Send no more data if this stream was remotely reset
if (err.code === ERR_MPLEX_STREAM_RESET) {
log('%s stream %s reset', type, name)
} else {
log('%s stream %s error', type, name, err)
send({ id, type: Types.RESET })
}
stream.source.end(err)
return onSinkEnd(err)
}
send({ id, type: Types.CLOSE })
onSinkEnd()
},
source: pushable(onSourceEnd),
timeline: {
open: Date.now(),
close: null
},
id: externalId
}
return stream
}