Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: remove useage of _readableState/_writableState.highWaterMark #12860

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,14 @@ process.nextTick(() => {

See also: [`writable.cork()`][].

##### writable.writableHighWaterMark
<!-- YAML
added: REPLACEME
-->

Return the value of `highWaterMark` passed when constructing this
`Writable`.

##### writable.write(chunk[, encoding][, callback])
<!-- YAML
added: v0.9.4
Expand Down Expand Up @@ -879,6 +887,14 @@ to prevent memory leaks.
never closed until the Node.js process exits, regardless of the specified
options.

##### readable.readableHighWaterMark
<!-- YAML
added: REPLACEME
-->

Return the value of `highWaterMark` passed when constructing this
`Readable`.

##### readable.read([size])
<!-- YAML
added: v0.9.4
Expand Down
6 changes: 3 additions & 3 deletions lib/_http_server.js
Original file line number Diff line number Diff line change
Expand Up @@ -373,13 +373,13 @@ function connectionListener(socket) {
function updateOutgoingData(socket, state, delta) {
state.outgoingData += delta;
if (socket._paused &&
state.outgoingData < socket._writableState.highWaterMark) {
state.outgoingData < socket.writeHWM) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't correct.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

interesting enough, our test suite passed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll get it updated.

return socketOnDrain(socket, state);
}
}

function socketOnDrain(socket, state) {
var needPause = state.outgoingData > socket._writableState.highWaterMark;
var needPause = state.outgoingData > socket.writeHWM;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto.


// If we previously paused, then start reading again.
if (socket._paused && !needPause) {
Expand Down Expand Up @@ -569,7 +569,7 @@ function parserOnIncoming(server, socket, state, req, keepAlive) {
// pipelined requests that may never be resolved.
if (!socket._paused) {
var ws = socket._writableState;
if (ws.needDrain || state.outgoingData >= ws.highWaterMark) {
if (ws.needDrain || state.outgoingData >= socket.writableHighWaterMark) {
socket._paused = true;
// We also need to pause the parser, but don't do that until after
// the call to execute, because we may still be processing the last
Expand Down
23 changes: 18 additions & 5 deletions lib/_stream_duplex.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,14 @@ const Writable = require('_stream_writable');

util.inherits(Duplex, Readable);

var keys = Object.keys(Writable.prototype);
for (var v = 0; v < keys.length; v++) {
var method = keys[v];
if (!Duplex.prototype[method])
Duplex.prototype[method] = Writable.prototype[method];
{
// avoid scope creep, the keys array can then be collected
const keys = Object.keys(Writable.prototype);
for (var v = 0; v < keys.length; v++) {
const method = keys[v];
if (!Duplex.prototype[method])
Duplex.prototype[method] = Writable.prototype[method];
}
}

function Duplex(options) {
Expand All @@ -61,6 +64,16 @@ function Duplex(options) {
this.once('end', onend);
}

Object.defineProperty(Duplex.prototype, 'writableHighWaterMark', {
// making it explicit this property is not enumerable
// because otherwise some prototype manipulation in
// userland will fail
enumerable: false,
get: function() {
return this._writableState.highWaterMark;
}
});

// the no-half-open enforcer
function onend() {
// if we allow half-open state, or if the writable side ended,
Expand Down
9 changes: 9 additions & 0 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -916,6 +916,15 @@ Readable.prototype.wrap = function(stream) {
return self;
};

Object.defineProperty(Readable.prototype, 'readableHighWaterMark', {
// making it explicit this property is not enumerable
// because otherwise some prototype manipulation in
// userland will fail
enumerable: false,
get: function() {
return this._readableState.highWaterMark;
}
});

// exposed for testing purposes only.
Readable._fromList = fromList;
Expand Down
10 changes: 10 additions & 0 deletions lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,16 @@ function decodeChunk(state, chunk, encoding) {
return chunk;
}

Object.defineProperty(Writable.prototype, 'writableHighWaterMark', {
// making it explicit this property is not enumerable
// because otherwise some prototype manipulation in
// userland will fail
enumerable: false,
get: function() {
return this._writableState.highWaterMark;
}
});

// if we're already writing something, then just put this
// in the queue, and wait our turn. Otherwise, call _write
// If we return false, then we need a drain event, so set that flag.
Expand Down
2 changes: 1 addition & 1 deletion lib/fs.js
Original file line number Diff line number Diff line change
Expand Up @@ -2072,7 +2072,7 @@ ReadStream.prototype._read = function(n) {

if (!pool || pool.length - pool.used < kMinPoolSpace) {
// discard the old pool.
allocNewPool(this._readableState.highWaterMark);
allocNewPool(this.readableHighWaterMark);
}

// Grab another reference to the pool in the case that while we're
Expand Down
2 changes: 1 addition & 1 deletion test/parallel/test-http-pipeline-regr-3508.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ const server = http.createServer(function(req, res) {
res.end(chunk);
}
size += res.outputSize;
if (size <= req.socket._writableState.highWaterMark) {
if (size <= req.socket.writableHighWaterMark) {
more();
return;
}
Expand Down
2 changes: 1 addition & 1 deletion test/parallel/test-stream-big-packet.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ s1.pipe(s3);
s2.pipe(s3, { end: false });

// We must write a buffer larger than highWaterMark
const big = Buffer.alloc(s1._writableState.highWaterMark + 1, 'x');
const big = Buffer.alloc(s1.writableHighWaterMark + 1, 'x');

// Since big is larger than highWaterMark, it will be buffered internally.
assert(!s1.write(big));
Expand Down
2 changes: 1 addition & 1 deletion test/parallel/test-stream-readable-flow-recursion.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ flow(stream, 5000, function() {
process.on('exit', function(code) {
assert.strictEqual(reads, 2);
// we pushed up the high water mark
assert.strictEqual(stream._readableState.highWaterMark, 8192);
assert.strictEqual(stream.readableHighWaterMark, 8192);
// length is 0 right now, because we pulled it all out.
assert.strictEqual(stream._readableState.length, 0);
assert(!code);
Expand Down
16 changes: 12 additions & 4 deletions test/parallel/test-stream-transform-split-objectmode.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,12 @@ const parser = new Transform({ readableObjectMode: true });

assert(parser._readableState.objectMode);
assert(!parser._writableState.objectMode);
assert.strictEqual(parser._readableState.highWaterMark, 16);
assert.strictEqual(parser._writableState.highWaterMark, 16 * 1024);
assert.strictEqual(parser.readableHighWaterMark, 16);
assert.strictEqual(parser.writableHighWaterMark, 16 * 1024);
assert.strictEqual(parser.readableHighWaterMark,
parser._readableState.highWaterMark);
assert.strictEqual(parser.writableHighWaterMark,
parser._writableState.highWaterMark);

parser._transform = function(chunk, enc, callback) {
callback(null, { val: chunk[0] });
Expand All @@ -53,8 +57,12 @@ const serializer = new Transform({ writableObjectMode: true });

assert(!serializer._readableState.objectMode);
assert(serializer._writableState.objectMode);
assert.strictEqual(serializer._readableState.highWaterMark, 16 * 1024);
assert.strictEqual(serializer._writableState.highWaterMark, 16);
assert.strictEqual(serializer.readableHighWaterMark, 16 * 1024);
assert.strictEqual(serializer.writableHighWaterMark, 16);
assert.strictEqual(parser.readableHighWaterMark,
parser._readableState.highWaterMark);
assert.strictEqual(parser.writableHighWaterMark,
parser._writableState.highWaterMark);

serializer._transform = function(obj, _, callback) {
callback(null, Buffer.from([obj.val]));
Expand Down
2 changes: 1 addition & 1 deletion test/parallel/test-stream2-unpipe-leak.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,6 @@ console.error(src._readableState);
process.on('exit', function() {
src._readableState.buffer.length = 0;
console.error(src._readableState);
assert(src._readableState.length >= src._readableState.highWaterMark);
assert(src._readableState.length >= src.readableHighWaterMark);
console.log('ok');
});