Skip to content

Commit

Permalink
src: refactor stream callbacks and ownership
Browse files Browse the repository at this point in the history
Instead of setting individual callbacks on streams and tracking
stream ownership through a boolean `consume_` flag, always have
one specific listener object in charge of a stream, and call
methods on that object rather than generic C-style callbacks.

PR-URL: nodejs#18334
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Anatoli Papirovski <apapirovski@mac.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
  • Loading branch information
addaleax authored and MayaLekova committed May 8, 2018
1 parent 84db680 commit 85d0282
Show file tree
Hide file tree
Showing 18 changed files with 463 additions and 474 deletions.
2 changes: 1 addition & 1 deletion lib/_http_server.js
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,7 @@ function onSocketPause() {
function unconsume(parser, socket) {
if (socket._handle) {
if (parser._consumed)
parser.unconsume(socket._handle._externalStream);
parser.unconsume();
parser._consumed = false;
socket.removeListener('pause', onSocketPause);
socket.removeListener('resume', onSocketResume);
Expand Down
1 change: 1 addition & 0 deletions src/connection_wrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "connect_wrap.h"
#include "env-inl.h"
#include "pipe_wrap.h"
#include "stream_base-inl.h"
#include "stream_wrap.h"
#include "tcp_wrap.h"
#include "util-inl.h"
Expand Down
55 changes: 7 additions & 48 deletions src/js_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,55 +25,13 @@ JSStream::JSStream(Environment* env, Local<Object> obj)
StreamBase(env) {
node::Wrap(obj, this);
MakeWeak<JSStream>(this);

set_alloc_cb({ OnAllocImpl, this });
set_read_cb({ OnReadImpl, this });
}


JSStream::~JSStream() {
}


void JSStream::OnAllocImpl(size_t size, uv_buf_t* buf, void* ctx) {
buf->base = Malloc(size);
buf->len = size;
}


void JSStream::OnReadImpl(ssize_t nread,
const uv_buf_t* buf,
uv_handle_type pending,
void* ctx) {
JSStream* wrap = static_cast<JSStream*>(ctx);
CHECK_NE(wrap, nullptr);
Environment* env = wrap->env();
HandleScope handle_scope(env->isolate());
Context::Scope context_scope(env->context());

if (nread < 0) {
if (buf != nullptr && buf->base != nullptr)
free(buf->base);
wrap->EmitData(nread, Local<Object>(), Local<Object>());
return;
}

if (nread == 0) {
if (buf->base != nullptr)
free(buf->base);
return;
}

CHECK_LE(static_cast<size_t>(nread), buf->len);
char* base = node::Realloc(buf->base, nread);

CHECK_EQ(pending, UV_UNKNOWN_HANDLE);

Local<Object> obj = Buffer::New(env, base, nread).ToLocalChecked();
wrap->EmitData(nread, obj, Local<Object>());
}


AsyncWrap* JSStream::GetAsyncWrap() {
return static_cast<AsyncWrap*>(this);
}
Expand Down Expand Up @@ -212,26 +170,27 @@ void JSStream::ReadBuffer(const FunctionCallbackInfo<Value>& args) {
char* data = Buffer::Data(args[0]);
int len = Buffer::Length(args[0]);

do {
uv_buf_t buf;
// Repeatedly ask the stream's owner for memory, copy the data that we
// just read from JS into those buffers and emit them as reads.
while (len != 0) {
uv_buf_t buf = wrap->EmitAlloc(len);
ssize_t avail = len;
wrap->EmitAlloc(len, &buf);
if (static_cast<ssize_t>(buf.len) < avail)
avail = buf.len;

memcpy(buf.base, data, avail);
data += avail;
len -= avail;
wrap->EmitRead(avail, &buf);
} while (len != 0);
wrap->EmitRead(avail, buf);
}
}


void JSStream::EmitEOF(const FunctionCallbackInfo<Value>& args) {
JSStream* wrap;
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());

wrap->EmitRead(UV_EOF, nullptr);
wrap->EmitRead(UV_EOF);
}


Expand Down
Loading

0 comments on commit 85d0282

Please sign in to comment.