diff --git a/index.d.ts b/index.d.ts index 65faf63..f68ce8a 100644 --- a/index.d.ts +++ b/index.d.ts @@ -1,4 +1,8 @@ /// + +// Note: marking anything protected or private in the exported +// class will limit Minipass's ability to be used as the base +// for mixin classes. import { EventEmitter } from 'events' import { Stream } from 'stream' @@ -16,12 +20,6 @@ declare namespace Minipass { pipe(): any } - interface Pipe { - src: Minipass - dest: Writable - opts: PipeOptions - } - type DualIterable = Iterable & AsyncIterable type ContiguousData = Buffer | ArrayBufferLike | ArrayBufferView | string @@ -76,12 +74,6 @@ declare class Minipass< readonly emittedEnd: boolean readonly destroyed: boolean - /** - * Not technically private or readonly, but not safe to mutate. - */ - private readonly buffer: RType[] - private readonly pipes: Minipass.Pipe[] - /** * Technically writable, but mutating it can change the type, * so is not safe to do in TypeScript. diff --git a/index.js b/index.js index e8797aa..d5003ed 100644 --- a/index.js +++ b/index.js @@ -21,6 +21,8 @@ const DECODER = Symbol('decoder') const FLOWING = Symbol('flowing') const PAUSED = Symbol('paused') const RESUME = Symbol('resume') +const BUFFER = Symbol('buffer') +const PIPES = Symbol('pipes') const BUFFERLENGTH = Symbol('bufferLength') const BUFFERPUSH = Symbol('bufferPush') const BUFFERSHIFT = Symbol('bufferShift') @@ -94,8 +96,8 @@ module.exports = class Minipass extends Stream { this[FLOWING] = false // whether we're explicitly paused this[PAUSED] = false - this.pipes = [] - this.buffer = [] + this[PIPES] = [] + this[BUFFER] = [] this[OBJECTMODE] = options && options.objectMode || false if (this[OBJECTMODE]) this[ENCODING] = null @@ -114,6 +116,12 @@ module.exports = class Minipass extends Stream { this.readable = true this[BUFFERLENGTH] = 0 this[DESTROYED] = false + if (options && options.debugExposeBuffer === true) { + Object.defineProperty(this, 'buffer', { get: () => this[BUFFER] }) + } + if (options && options.debugExposePipes === true) { + Object.defineProperty(this, 'pipes', { get: () => this[PIPES] }) + } } get bufferLength () { return this[BUFFERLENGTH] } @@ -129,8 +137,8 @@ module.exports = class Minipass extends Stream { if (this[ENCODING] !== enc) { this[DECODER] = enc ? new SD(enc) : null - if (this.buffer.length) - this.buffer = this.buffer.map(chunk => this[DECODER].write(chunk)) + if (this[BUFFER].length) + this[BUFFER] = this[BUFFER].map(chunk => this[DECODER].write(chunk)) } this[ENCODING] = enc @@ -252,14 +260,14 @@ module.exports = class Minipass extends Stream { if (this[OBJECTMODE]) n = null - if (this.buffer.length > 1 && !this[OBJECTMODE]) { + if (this[BUFFER].length > 1 && !this[OBJECTMODE]) { if (this.encoding) - this.buffer = [this.buffer.join('')] + this[BUFFER] = [this[BUFFER].join('')] else - this.buffer = [Buffer.concat(this.buffer, this[BUFFERLENGTH])] + this[BUFFER] = [Buffer.concat(this[BUFFER], this[BUFFERLENGTH])] } - const ret = this[READ](n || null, this.buffer[0]) + const ret = this[READ](n || null, this[BUFFER][0]) this[MAYBE_EMIT_END]() return ret } @@ -268,14 +276,14 @@ module.exports = class Minipass extends Stream { if (n === chunk.length || n === null) this[BUFFERSHIFT]() else { - this.buffer[0] = chunk.slice(n) + this[BUFFER][0] = chunk.slice(n) chunk = chunk.slice(0, n) this[BUFFERLENGTH] -= n } this.emit('data', chunk) - if (!this.buffer.length && !this[EOF]) + if (!this[BUFFER].length && !this[EOF]) this.emit('drain') return chunk @@ -310,7 +318,7 @@ module.exports = class Minipass extends Stream { this[PAUSED] = false this[FLOWING] = true this.emit('resume') - if (this.buffer.length) + if (this[BUFFER].length) this[FLUSH]() else if (this[EOF]) this[MAYBE_EMIT_END]() @@ -344,23 +352,23 @@ module.exports = class Minipass extends Stream { this[BUFFERLENGTH] += 1 else this[BUFFERLENGTH] += chunk.length - this.buffer.push(chunk) + this[BUFFER].push(chunk) } [BUFFERSHIFT] () { - if (this.buffer.length) { + if (this[BUFFER].length) { if (this[OBJECTMODE]) this[BUFFERLENGTH] -= 1 else - this[BUFFERLENGTH] -= this.buffer[0].length + this[BUFFERLENGTH] -= this[BUFFER][0].length } - return this.buffer.shift() + return this[BUFFER].shift() } [FLUSH] (noDrain) { do {} while (this[FLUSHCHUNK](this[BUFFERSHIFT]())) - if (!noDrain && !this.buffer.length && !this[EOF]) + if (!noDrain && !this[BUFFER].length && !this[EOF]) this.emit('drain') } @@ -385,7 +393,7 @@ module.exports = class Minipass extends Stream { if (opts.end) dest.end() } else { - this.pipes.push(!opts.proxyErrors ? new Pipe(this, dest, opts) + this[PIPES].push(!opts.proxyErrors ? new Pipe(this, dest, opts) : new PipeProxyErrors(this, dest, opts)) if (this[ASYNC]) defer(() => this[RESUME]()) @@ -397,9 +405,9 @@ module.exports = class Minipass extends Stream { } unpipe (dest) { - const p = this.pipes.find(p => p.dest === dest) + const p = this[PIPES].find(p => p.dest === dest) if (p) { - this.pipes.splice(this.pipes.indexOf(p), 1) + this[PIPES].splice(this[PIPES].indexOf(p), 1) p.unpipe() } } @@ -410,7 +418,7 @@ module.exports = class Minipass extends Stream { on (ev, fn) { const ret = super.on(ev, fn) - if (ev === 'data' && !this.pipes.length && !this.flowing) + if (ev === 'data' && !this[PIPES].length && !this.flowing) this[RESUME]() else if (ev === 'readable' && this[BUFFERLENGTH] !== 0) super.emit('readable') @@ -434,7 +442,7 @@ module.exports = class Minipass extends Stream { if (!this[EMITTING_END] && !this[EMITTED_END] && !this[DESTROYED] && - this.buffer.length === 0 && + this[BUFFER].length === 0 && this[EOF]) { this[EMITTING_END] = true this.emit('end') @@ -486,7 +494,7 @@ module.exports = class Minipass extends Stream { } [EMITDATA] (data) { - for (const p of this.pipes) { + for (const p of this[PIPES]) { if (p.dest.write(data) === false) this.pause() } @@ -511,14 +519,14 @@ module.exports = class Minipass extends Stream { if (this[DECODER]) { const data = this[DECODER].end() if (data) { - for (const p of this.pipes) { + for (const p of this[PIPES]) { p.dest.write(data) } super.emit('data', data) } } - for (const p of this.pipes) { + for (const p of this[PIPES]) { p.end() } const ret = super.emit('end') @@ -625,7 +633,7 @@ module.exports = class Minipass extends Stream { this[DESTROYED] = true // throw away all buffered data, it's never coming out - this.buffer.length = 0 + this[BUFFER].length = 0 this[BUFFERLENGTH] = 0 if (typeof this.close === 'function' && !this[CLOSED]) diff --git a/test/basic.js b/test/basic.js index e8c9627..67620b1 100644 --- a/test/basic.js +++ b/test/basic.js @@ -3,7 +3,7 @@ const t = require('tap') const EE = require('events').EventEmitter t.test('some basic piping and writing', async t => { - let mp = new MiniPass({ encoding: 'base64' }) + let mp = new MiniPass({ encoding: 'base64', debugExposeBuffer: true }) t.notOk(mp.flowing) mp.flowing = true t.notOk(mp.flowing) @@ -13,7 +13,7 @@ t.test('some basic piping and writing', async t => { t.equal(mp.readable, true) t.equal(mp.writable, true) t.equal(mp.write('hello'), false) - let dest = new MiniPass() + let dest = new MiniPass({ debugExposeBuffer: true }) let sawDestData = false dest.once('data', chunk => { sawDestData = true @@ -279,7 +279,7 @@ t.test('emit works with many args', t => { }) t.test('emit drain on resume, even if no flush', t => { - const mp = new MiniPass() + const mp = new MiniPass({ debugExposeBuffer: true }) mp.encoding = 'utf8' const chunks = [] diff --git a/test/unpipe.js b/test/unpipe.js index 5a97c0a..f24ec3d 100644 --- a/test/unpipe.js +++ b/test/unpipe.js @@ -1,7 +1,7 @@ const t = require('tap') const Minipass = require('../') -const src = new Minipass({ encoding: 'utf8' }) +const src = new Minipass({ encoding: 'utf8', debugExposePipes: true }) const dest = new Minipass({ encoding: 'utf8' }) const dest2 = new Minipass({ encoding: 'utf8' }) const destOut = []