diff --git a/benchmark/net/tcp-raw-c2s.js b/benchmark/net/tcp-raw-c2s.js index 2be3bb3b538ffd..4b6dd9c3f2b145 100644 --- a/benchmark/net/tcp-raw-c2s.js +++ b/benchmark/net/tcp-raw-c2s.js @@ -118,7 +118,7 @@ function client(type, len) { fail(err, 'write'); } - function afterWrite(err, handle, req) { + function afterWrite(err, handle) { if (err) fail(err, 'write'); diff --git a/benchmark/net/tcp-raw-pipe.js b/benchmark/net/tcp-raw-pipe.js index 2fc03f08cd4a90..dfde3d40b50b55 100644 --- a/benchmark/net/tcp-raw-pipe.js +++ b/benchmark/net/tcp-raw-pipe.js @@ -51,7 +51,7 @@ function main({ dur, len, type }) { if (err) fail(err, 'write'); - writeReq.oncomplete = function(status, handle, req, err) { + writeReq.oncomplete = function(status, handle, err) { if (err) fail(err, 'write'); }; @@ -130,7 +130,7 @@ function main({ dur, len, type }) { fail(err, 'write'); } - function afterWrite(err, handle, req) { + function afterWrite(err, handle) { if (err) fail(err, 'write'); diff --git a/benchmark/net/tcp-raw-s2c.js b/benchmark/net/tcp-raw-s2c.js index 339f5e393d9254..fe0bffd8127ec7 100644 --- a/benchmark/net/tcp-raw-s2c.js +++ b/benchmark/net/tcp-raw-s2c.js @@ -74,14 +74,14 @@ function main({ dur, len, type }) { fail(err, 'write'); } else if (!writeReq.async) { process.nextTick(function() { - afterWrite(null, clientHandle, writeReq); + afterWrite(0, clientHandle); }); } } - function afterWrite(status, handle, req, err) { - if (err) - fail(err, 'write'); + function afterWrite(status, handle) { + if (status) + fail(status, 'write'); while (clientHandle.writeQueueSize === 0) write(); diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index f3f953e35d67d5..9247292ff47fce 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -1396,20 +1396,19 @@ function trackWriteState(stream, bytes) { session[kHandle].chunksSentSinceLastWrite = 0; } -function afterDoStreamWrite(status, handle, req) { +function afterDoStreamWrite(status, handle) { const stream = handle[kOwner]; const session = stream[kSession]; stream[kUpdateTimer](); - const { bytes } = req; + const { bytes } = this; stream[kState].writeQueueSize -= bytes; if (session !== undefined) session[kState].writeQueueSize -= bytes; - if (typeof req.callback === 'function') - req.callback(null); - req.handle = undefined; + if (typeof this.callback === 'function') + this.callback(null); } function streamOnResume() { diff --git a/lib/internal/wrap_js_stream.js b/lib/internal/wrap_js_stream.js index aed26ed8c6976a..feacab267bea4d 100644 --- a/lib/internal/wrap_js_stream.js +++ b/lib/internal/wrap_js_stream.js @@ -115,9 +115,9 @@ class JSStreamWrap extends Socket { const handle = this._handle; - this.stream.end(() => { - // Ensure that write was dispatched - setImmediate(() => { + setImmediate(() => { + // Ensure that write is dispatched asynchronously. + this.stream.end(() => { this.finishShutdown(handle, 0); }); }); diff --git a/lib/net.js b/lib/net.js index 286f4d46b74fdf..09ad917ad0b7f6 100644 --- a/lib/net.js +++ b/lib/net.js @@ -323,7 +323,7 @@ function onSocketFinish() { } -function afterShutdown(status, handle, req) { +function afterShutdown(status, handle) { var self = handle.owner; debug('afterShutdown destroyed=%j', self.destroyed, @@ -842,12 +842,12 @@ protoGetter('bytesWritten', function bytesWritten() { }); -function afterWrite(status, handle, req, err) { +function afterWrite(status, handle, err) { var self = handle.owner; if (self !== process.stderr && self !== process.stdout) debug('afterWrite', status); - if (req.async) + if (this.async) self[kLastWriteQueueSize] = 0; // callback may come after call to destroy. @@ -857,9 +857,9 @@ function afterWrite(status, handle, req, err) { } if (status < 0) { - var ex = errnoException(status, 'write', req.error); + var ex = errnoException(status, 'write', this.error); debug('write failure', ex); - self.destroy(ex, req.cb); + self.destroy(ex, this.cb); return; } @@ -868,8 +868,8 @@ function afterWrite(status, handle, req, err) { if (self !== process.stderr && self !== process.stdout) debug('afterWrite call cb'); - if (req.cb) - req.cb.call(undefined); + if (this.cb) + this.cb.call(undefined); } diff --git a/src/env.h b/src/env.h index 8540b439b67333..be8791b5344471 100644 --- a/src/env.h +++ b/src/env.h @@ -303,6 +303,7 @@ class ModuleWrap; V(script_context_constructor_template, v8::FunctionTemplate) \ V(script_data_constructor_function, v8::Function) \ V(secure_context_constructor_template, v8::FunctionTemplate) \ + V(shutdown_wrap_constructor_function, v8::Function) \ V(tcp_constructor_template, v8::FunctionTemplate) \ V(tick_callback_function, v8::Function) \ V(tls_wrap_constructor_function, v8::Function) \ diff --git a/src/js_stream.cc b/src/js_stream.cc index 9e67a2094ded89..3ba6a254cfc03e 100644 --- a/src/js_stream.cc +++ b/src/js_stream.cc @@ -91,8 +91,6 @@ int JSStream::DoShutdown(ShutdownWrap* req_wrap) { req_wrap->object() }; - req_wrap->Dispatched(); - TryCatch try_catch(env()->isolate()); Local value; int value_int = UV_EPROTO; @@ -127,8 +125,6 @@ int JSStream::DoWrite(WriteWrap* w, bufs_arr }; - w->Dispatched(); - TryCatch try_catch(env()->isolate()); Local value; int value_int = UV_EPROTO; @@ -154,9 +150,8 @@ void JSStream::New(const FunctionCallbackInfo& args) { template void JSStream::Finish(const FunctionCallbackInfo& args) { - Wrap* w; CHECK(args[0]->IsObject()); - ASSIGN_OR_RETURN_UNWRAP(&w, args[0].As()); + Wrap* w = static_cast(StreamReq::FromObject(args[0].As())); w->Done(args[1]->Int32Value()); } diff --git a/src/node_http2.cc b/src/node_http2.cc index 2f688a4b352a15..6f59c119e53a6b 100644 --- a/src/node_http2.cc +++ b/src/node_http2.cc @@ -1552,18 +1552,9 @@ void Http2Session::SendPendingData() { chunks_sent_since_last_write_++; - // DoTryWrite may modify both the buffer list start itself and the - // base pointers/length of the individual buffers. - uv_buf_t* writebufs = *bufs; - if (stream_->DoTryWrite(&writebufs, &count) != 0 || count == 0) { - // All writes finished synchronously, nothing more to do here. - ClearOutgoing(0); - return; - } - - WriteWrap* req = AllocateSend(); - if (stream_->DoWrite(req, writebufs, count, nullptr) != 0) { - req->Dispose(); + StreamWriteResult res = underlying_stream()->Write(*bufs, count); + if (!res.async) { + ClearOutgoing(res.err); } DEBUG_HTTP2SESSION2(this, "wants data in return? %d", @@ -1649,15 +1640,6 @@ inline void Http2Session::SetChunksSinceLastWrite(size_t n) { chunks_sent_since_last_write_ = n; } -// Allocates the data buffer used to pass outbound data to the i/o stream. -WriteWrap* Http2Session::AllocateSend() { - HandleScope scope(env()->isolate()); - Local obj = - env()->write_wrap_constructor_function() - ->NewInstance(env()->context()).ToLocalChecked(); - return WriteWrap::New(env(), obj, static_cast(stream_)); -} - // Callback used to receive inbound data from the i/o stream void Http2Session::OnStreamRead(ssize_t nread, const uv_buf_t& buf) { Http2Scope h2scope(this); @@ -1833,20 +1815,15 @@ inline void Http2Stream::Close(int32_t code) { DEBUG_HTTP2STREAM2(this, "closed with code %d", code); } - -inline void Http2Stream::Shutdown() { - CHECK(!this->IsDestroyed()); - Http2Scope h2scope(this); - flags_ |= NGHTTP2_STREAM_FLAG_SHUT; - CHECK_NE(nghttp2_session_resume_data(session_->session(), id_), - NGHTTP2_ERR_NOMEM); - DEBUG_HTTP2STREAM(this, "writable side shutdown"); -} - int Http2Stream::DoShutdown(ShutdownWrap* req_wrap) { CHECK(!this->IsDestroyed()); - req_wrap->Dispatched(); - Shutdown(); + { + Http2Scope h2scope(this); + flags_ |= NGHTTP2_STREAM_FLAG_SHUT; + CHECK_NE(nghttp2_session_resume_data(session_->session(), id_), + NGHTTP2_ERR_NOMEM); + DEBUG_HTTP2STREAM(this, "writable side shutdown"); + } req_wrap->Done(0); return 0; } @@ -2038,7 +2015,6 @@ inline int Http2Stream::DoWrite(WriteWrap* req_wrap, CHECK_EQ(send_handle, nullptr); Http2Scope h2scope(this); session_->SetChunksSinceLastWrite(); - req_wrap->Dispatched(); if (!IsWritable()) { req_wrap->Done(UV_EOF); return 0; diff --git a/src/node_http2.h b/src/node_http2.h index b22539f5119919..217c19c09287af 100644 --- a/src/node_http2.h +++ b/src/node_http2.h @@ -605,9 +605,6 @@ class Http2Stream : public AsyncWrap, inline void Close(int32_t code); - // Shutdown the writable side of the stream - inline void Shutdown(); - // Destroy this stream instance and free all held memory. inline void Destroy(); @@ -822,6 +819,10 @@ class Http2Session : public AsyncWrap, public StreamListener { inline void EmitStatistics(); + inline StreamBase* underlying_stream() { + return static_cast(stream_); + } + void Start(); void Stop(); void Close(uint32_t code = NGHTTP2_NO_ERROR, @@ -911,8 +912,6 @@ class Http2Session : public AsyncWrap, public StreamListener { template static void GetSettings(const FunctionCallbackInfo& args); - WriteWrap* AllocateSend(); - uv_loop_t* event_loop() const { return env()->event_loop(); } diff --git a/src/req_wrap-inl.h b/src/req_wrap-inl.h index 6d0c57cd81d367..4a7984e649c733 100644 --- a/src/req_wrap-inl.h +++ b/src/req_wrap-inl.h @@ -33,6 +33,11 @@ void ReqWrap::Dispatched() { req_.data = this; } +template +ReqWrap* ReqWrap::from_req(T* req) { + return ContainerOf(&ReqWrap::req_, req); +} + } // namespace node #endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS diff --git a/src/req_wrap.h b/src/req_wrap.h index ddd0840aad2ab6..656be38dcea943 100644 --- a/src/req_wrap.h +++ b/src/req_wrap.h @@ -20,6 +20,8 @@ class ReqWrap : public AsyncWrap { inline void Dispatched(); // Call this after the req has been dispatched. T* req() { return &req_; } + static ReqWrap* from_req(T* req); + private: friend class Environment; friend int GenDebugSymbols(); diff --git a/src/stream_base-inl.h b/src/stream_base-inl.h index 76922c1d8af77d..b479e04bae4c8a 100644 --- a/src/stream_base-inl.h +++ b/src/stream_base-inl.h @@ -25,6 +25,25 @@ using v8::Value; using AsyncHooks = Environment::AsyncHooks; +inline void StreamReq::AttachToObject(v8::Local req_wrap_obj) { + CHECK_EQ(req_wrap_obj->GetAlignedPointerFromInternalField(kStreamReqField), + nullptr); + req_wrap_obj->SetAlignedPointerInInternalField(kStreamReqField, this); +} + +inline StreamReq* StreamReq::FromObject(v8::Local req_wrap_obj) { + return static_cast( + req_wrap_obj->GetAlignedPointerFromInternalField(kStreamReqField)); +} + +inline void StreamReq::Dispose() { + object()->SetAlignedPointerInInternalField(kStreamReqField, nullptr); + delete this; +} + +inline v8::Local StreamReq::object() { + return GetAsyncWrap()->object(); +} inline StreamListener::~StreamListener() { if (stream_ != nullptr) @@ -36,6 +55,15 @@ inline void StreamListener::PassReadErrorToPreviousListener(ssize_t nread) { previous_listener_->OnStreamRead(nread, uv_buf_init(nullptr, 0)); } +inline void StreamListener::OnStreamAfterShutdown(ShutdownWrap* w, int status) { + CHECK_NE(previous_listener_, nullptr); + previous_listener_->OnStreamAfterShutdown(w, status); +} + +inline void StreamListener::OnStreamAfterWrite(WriteWrap* w, int status) { + CHECK_NE(previous_listener_, nullptr); + previous_listener_->OnStreamAfterWrite(w, status); +} inline StreamResource::~StreamResource() { while (listener_ != nullptr) { @@ -93,6 +121,9 @@ inline void StreamResource::EmitAfterWrite(WriteWrap* w, int status) { listener_->OnStreamAfterWrite(w, status); } +inline void StreamResource::EmitAfterShutdown(ShutdownWrap* w, int status) { + listener_->OnStreamAfterShutdown(w, status); +} inline StreamBase::StreamBase(Environment* env) : env_(env) { PushStreamListener(&default_listener_); @@ -102,6 +133,150 @@ inline Environment* StreamBase::stream_env() const { return env_; } +inline void StreamBase::AfterWrite(WriteWrap* req_wrap, int status) { + AfterRequest(req_wrap, [&]() { + EmitAfterWrite(req_wrap, status); + }); +} + +inline void StreamBase::AfterShutdown(ShutdownWrap* req_wrap, int status) { + AfterRequest(req_wrap, [&]() { + EmitAfterShutdown(req_wrap, status); + }); +} + +template +inline void StreamBase::AfterRequest(Wrap* req_wrap, EmitEvent emit) { + Environment* env = stream_env(); + + v8::HandleScope handle_scope(env->isolate()); + v8::Context::Scope context_scope(env->context()); + + emit(); + req_wrap->Dispose(); +} + +inline int StreamBase::Shutdown(v8::Local req_wrap_obj) { + Environment* env = stream_env(); + if (req_wrap_obj.IsEmpty()) { + req_wrap_obj = + env->shutdown_wrap_constructor_function() + ->NewInstance(env->context()).ToLocalChecked(); + } + + AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope( + env, GetAsyncWrap()->get_async_id()); + ShutdownWrap* req_wrap = CreateShutdownWrap(req_wrap_obj); + int err = DoShutdown(req_wrap); + + if (err != 0) { + req_wrap->Dispose(); + } + + const char* msg = Error(); + if (msg != nullptr) { + req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg)); + ClearError(); + } + + return err; +} + +inline StreamWriteResult StreamBase::Write( + uv_buf_t* bufs, + size_t count, + uv_stream_t* send_handle, + v8::Local req_wrap_obj) { + Environment* env = stream_env(); + int err; + if (send_handle == nullptr) { + err = DoTryWrite(&bufs, &count); + if (err != 0 || count == 0) { + return StreamWriteResult { false, err, nullptr }; + } + } + + if (req_wrap_obj.IsEmpty()) { + req_wrap_obj = + env->write_wrap_constructor_function() + ->NewInstance(env->context()).ToLocalChecked(); + } + + AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope( + env, GetAsyncWrap()->get_async_id()); + WriteWrap* req_wrap = CreateWriteWrap(req_wrap_obj); + + err = DoWrite(req_wrap, bufs, count, send_handle); + bool async = err == 0; + + if (!async) { + req_wrap->Dispose(); + req_wrap = nullptr; + } + + const char* msg = Error(); + if (msg != nullptr) { + req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg)); + ClearError(); + } + + req_wrap_obj->Set(env->async(), v8::Boolean::New(env->isolate(), async)); + + return StreamWriteResult { async, err, req_wrap }; +} + +template +SimpleShutdownWrap::SimpleShutdownWrap( + StreamBase* stream, + v8::Local req_wrap_obj) + : ShutdownWrap(stream, req_wrap_obj), + OtherBase(stream->stream_env(), + req_wrap_obj, + AsyncWrap::PROVIDER_SHUTDOWNWRAP) { + Wrap(req_wrap_obj, static_cast(this)); +} + +template +SimpleShutdownWrap::~SimpleShutdownWrap() { + ClearWrap(static_cast(this)->object()); + if (kResetPersistent) { + auto& persistent = static_cast(this)->persistent(); + CHECK_EQ(persistent.IsEmpty(), false); + persistent.Reset(); + } +} + +inline ShutdownWrap* StreamBase::CreateShutdownWrap( + v8::Local object) { + return new SimpleShutdownWrap(this, object); +} + +template +SimpleWriteWrap::SimpleWriteWrap( + StreamBase* stream, + v8::Local req_wrap_obj) + : WriteWrap(stream, req_wrap_obj), + OtherBase(stream->stream_env(), + req_wrap_obj, + AsyncWrap::PROVIDER_WRITEWRAP) { + Wrap(req_wrap_obj, static_cast(this)); +} + +template +SimpleWriteWrap::~SimpleWriteWrap() { + ClearWrap(static_cast(this)->object()); + if (kResetPersistent) { + auto& persistent = static_cast(this)->persistent(); + CHECK_EQ(persistent.IsEmpty(), false); + persistent.Reset(); + } +} + +inline WriteWrap* StreamBase::CreateWriteWrap( + v8::Local object) { + return new SimpleWriteWrap(this, object); +} + template void StreamBase::AddMethods(Environment* env, Local t, @@ -230,38 +405,35 @@ inline void ShutdownWrap::OnDone(int status) { stream()->AfterShutdown(this, status); } - -WriteWrap* WriteWrap::New(Environment* env, - Local obj, - StreamBase* wrap, - size_t extra) { - size_t storage_size = ROUND_UP(sizeof(WriteWrap), kAlignSize) + extra; - char* storage = new char[storage_size]; - - return new(storage) WriteWrap(env, obj, wrap, storage_size); +inline void WriteWrap::SetAllocatedStorage(char* data, size_t size) { + CHECK_EQ(storage_, nullptr); + storage_ = data; + storage_size_ = size; } - -void WriteWrap::Dispose() { - this->~WriteWrap(); - delete[] reinterpret_cast(this); -} - - -char* WriteWrap::Extra(size_t offset) { - return reinterpret_cast(this) + - ROUND_UP(sizeof(*this), kAlignSize) + - offset; +inline char* WriteWrap::Storage() { + return storage_; } -size_t WriteWrap::ExtraSize() const { - return storage_size_ - ROUND_UP(sizeof(*this), kAlignSize); +inline size_t WriteWrap::StorageSize() const { + return storage_size_; } inline void WriteWrap::OnDone(int status) { stream()->AfterWrite(this, status); } +inline void StreamReq::Done(int status, const char* error_str) { + AsyncWrap* async_wrap = GetAsyncWrap(); + Environment* env = async_wrap->env(); + if (error_str != nullptr) { + async_wrap->object()->Set(env->error_string(), + OneByteString(env->isolate(), error_str)); + } + + OnDone(status); +} + } // namespace node #endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS diff --git a/src/stream_base.cc b/src/stream_base.cc index 8bdcebe88ab19f..9ad9fd5bcb4a46 100644 --- a/src/stream_base.cc +++ b/src/stream_base.cc @@ -34,6 +34,11 @@ template int StreamBase::WriteString( const FunctionCallbackInfo& args); +struct Free { + void operator()(char* ptr) const { free(ptr); } +}; + + int StreamBase::ReadStartJS(const FunctionCallbackInfo& args) { return ReadStart(); } @@ -45,45 +50,10 @@ int StreamBase::ReadStopJS(const FunctionCallbackInfo& args) { int StreamBase::Shutdown(const FunctionCallbackInfo& args) { - Environment* env = Environment::GetCurrent(args); - CHECK(args[0]->IsObject()); Local req_wrap_obj = args[0].As(); - AsyncWrap* wrap = GetAsyncWrap(); - CHECK_NE(wrap, nullptr); - AsyncHooks::DefaultTriggerAsyncIdScope(env, wrap->get_async_id()); - ShutdownWrap* req_wrap = new ShutdownWrap(env, - req_wrap_obj, - this); - - int err = DoShutdown(req_wrap); - if (err) - delete req_wrap; - return err; -} - - -void StreamBase::AfterShutdown(ShutdownWrap* req_wrap, int status) { - Environment* env = req_wrap->env(); - - // The wrap and request objects should still be there. - CHECK_EQ(req_wrap->persistent().IsEmpty(), false); - - HandleScope handle_scope(env->isolate()); - Context::Scope context_scope(env->context()); - - Local req_wrap_obj = req_wrap->object(); - Local argv[3] = { - Integer::New(env->isolate(), status), - GetObject(), - req_wrap_obj - }; - - if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromJust()) - req_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv); - - delete req_wrap; + return Shutdown(req_wrap_obj); } @@ -104,19 +74,14 @@ int StreamBase::Writev(const FunctionCallbackInfo& args) { count = chunks->Length() >> 1; MaybeStackBuffer bufs(count); - uv_buf_t* buf_list = *bufs; size_t storage_size = 0; uint32_t bytes = 0; size_t offset; - WriteWrap* req_wrap; - int err; if (!all_buffers) { // Determine storage size first for (size_t i = 0; i < count; i++) { - storage_size = ROUND_UP(storage_size, WriteWrap::kAlignSize); - Local chunk = chunks->Get(i * 2); if (Buffer::HasInstance(chunk)) @@ -145,20 +110,11 @@ int StreamBase::Writev(const FunctionCallbackInfo& args) { bufs[i].len = Buffer::Length(chunk); bytes += bufs[i].len; } - - // Try writing immediately without allocation - err = DoTryWrite(&buf_list, &count); - if (err != 0 || count == 0) - goto done; } - { - AsyncWrap* wrap = GetAsyncWrap(); - CHECK_NE(wrap, nullptr); - AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(env, - wrap->get_async_id()); - req_wrap = WriteWrap::New(env, req_wrap_obj, this, storage_size); - } + std::unique_ptr storage; + if (storage_size > 0) + storage = std::unique_ptr(Malloc(storage_size)); offset = 0; if (!all_buffers) { @@ -174,9 +130,8 @@ int StreamBase::Writev(const FunctionCallbackInfo& args) { } // Write string - offset = ROUND_UP(offset, WriteWrap::kAlignSize); CHECK_LE(offset, storage_size); - char* str_storage = req_wrap->Extra(offset); + char* str_storage = storage.get() + offset; size_t str_size = storage_size - offset; Local string = chunk->ToString(env->context()).ToLocalChecked(); @@ -192,35 +147,17 @@ int StreamBase::Writev(const FunctionCallbackInfo& args) { offset += str_size; bytes += str_size; } - - err = DoTryWrite(&buf_list, &count); - if (err != 0 || count == 0) { - req_wrap->Dispatched(); - req_wrap->Dispose(); - goto done; - } } - err = DoWrite(req_wrap, buf_list, count, nullptr); - req_wrap_obj->Set(env->async(), True(env->isolate())); - - if (err) - req_wrap->Dispose(); - - done: - const char* msg = Error(); - if (msg != nullptr) { - req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg)); - ClearError(); - } + StreamWriteResult res = Write(*bufs, count, nullptr, req_wrap_obj); req_wrap_obj->Set(env->bytes_string(), Number::New(env->isolate(), bytes)); - - return err; + if (res.wrap != nullptr && storage) { + res.wrap->SetAllocatedStorage(storage.release(), storage_size); + } + return res.err; } - - int StreamBase::WriteBuffer(const FunctionCallbackInfo& args) { CHECK(args[0]->IsObject()); @@ -232,49 +169,20 @@ int StreamBase::WriteBuffer(const FunctionCallbackInfo& args) { } Local req_wrap_obj = args[0].As(); - const char* data = Buffer::Data(args[1]); - size_t length = Buffer::Length(args[1]); - WriteWrap* req_wrap; uv_buf_t buf; - buf.base = const_cast(data); - buf.len = length; - - // Try writing immediately without allocation - uv_buf_t* bufs = &buf; - size_t count = 1; - int err = DoTryWrite(&bufs, &count); - if (err != 0) - goto done; - if (count == 0) - goto done; - CHECK_EQ(count, 1); - - // Allocate, or write rest - { - AsyncWrap* wrap = GetAsyncWrap(); - CHECK_NE(wrap, nullptr); - AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(env, - wrap->get_async_id()); - req_wrap = WriteWrap::New(env, req_wrap_obj, this); - } + buf.base = Buffer::Data(args[1]); + buf.len = Buffer::Length(args[1]); - err = DoWrite(req_wrap, bufs, count, nullptr); - req_wrap_obj->Set(env->async(), True(env->isolate())); - req_wrap_obj->Set(env->buffer_string(), args[1]); + StreamWriteResult res = Write(&buf, 1, nullptr, req_wrap_obj); - if (err) - req_wrap->Dispose(); + if (res.async) + req_wrap_obj->Set(env->context(), env->buffer_string(), args[1]).FromJust(); + req_wrap_obj->Set(env->context(), env->bytes_string(), + Integer::NewFromUnsigned(env->isolate(), buf.len)) + .FromJust(); - done: - const char* msg = Error(); - if (msg != nullptr) { - req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg)); - ClearError(); - } - req_wrap_obj->Set(env->bytes_string(), - Integer::NewFromUnsigned(env->isolate(), length)); - return err; + return res.err; } @@ -305,8 +213,6 @@ int StreamBase::WriteString(const FunctionCallbackInfo& args) { return UV_ENOBUFS; // Try writing immediately if write size isn't too big - WriteWrap* req_wrap; - char* data; char stack_storage[16384]; // 16kb size_t data_size; uv_buf_t buf; @@ -325,36 +231,33 @@ int StreamBase::WriteString(const FunctionCallbackInfo& args) { size_t count = 1; err = DoTryWrite(&bufs, &count); - // Failure - if (err != 0) - goto done; - - // Success - if (count == 0) - goto done; + // Immediate failure or success + if (err != 0 || count == 0) { + req_wrap_obj->Set(env->context(), env->async(), False(env->isolate())) + .FromJust(); + req_wrap_obj->Set(env->context(), + env->bytes_string(), + Integer::NewFromUnsigned(env->isolate(), data_size)) + .FromJust(); + return err; + } // Partial write CHECK_EQ(count, 1); } - { - AsyncWrap* wrap = GetAsyncWrap(); - CHECK_NE(wrap, nullptr); - AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(env, - wrap->get_async_id()); - req_wrap = WriteWrap::New(env, req_wrap_obj, this, storage_size); - } - - data = req_wrap->Extra(); + std::unique_ptr data; if (try_write) { // Copy partial data - memcpy(data, buf.base, buf.len); + data = std::unique_ptr(Malloc(buf.len)); + memcpy(data.get(), buf.base, buf.len); data_size = buf.len; } else { // Write it + data = std::unique_ptr(Malloc(storage_size)); data_size = StringBytes::Write(env->isolate(), - data, + data.get(), storage_size, string, enc); @@ -362,78 +265,36 @@ int StreamBase::WriteString(const FunctionCallbackInfo& args) { CHECK_LE(data_size, storage_size); - buf = uv_buf_init(data, data_size); - - if (!IsIPCPipe()) { - err = DoWrite(req_wrap, &buf, 1, nullptr); - } else { - uv_handle_t* send_handle = nullptr; - - if (!send_handle_obj.IsEmpty()) { - HandleWrap* wrap; - ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL); - send_handle = wrap->GetHandle(); - // Reference LibuvStreamWrap instance to prevent it from being garbage - // collected before `AfterWrite` is called. - CHECK_EQ(false, req_wrap->persistent().IsEmpty()); - req_wrap_obj->Set(env->handle_string(), send_handle_obj); - } - - err = DoWrite( - req_wrap, - &buf, - 1, - reinterpret_cast(send_handle)); + buf = uv_buf_init(data.get(), data_size); + + uv_stream_t* send_handle = nullptr; + + if (IsIPCPipe() && !send_handle_obj.IsEmpty()) { + // TODO(addaleax): This relies on the fact that HandleWrap comes first + // as a superclass of each individual subclass. + // There are similar assumptions in other places in the code base. + // A better idea would be having all BaseObject's internal pointers + // refer to the BaseObject* itself; this would require refactoring + // throughout the code base but makes Node rely much less on C++ quirks. + HandleWrap* wrap; + ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL); + send_handle = reinterpret_cast(wrap->GetHandle()); + // Reference LibuvStreamWrap instance to prevent it from being garbage + // collected before `AfterWrite` is called. + req_wrap_obj->Set(env->handle_string(), send_handle_obj); } - req_wrap_obj->Set(env->async(), True(env->isolate())); + StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj); - if (err) - req_wrap->Dispose(); + req_wrap_obj->Set(env->context(), env->bytes_string(), + Integer::NewFromUnsigned(env->isolate(), data_size)) + .FromJust(); - done: - const char* msg = Error(); - if (msg != nullptr) { - req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg)); - ClearError(); + if (res.wrap != nullptr) { + res.wrap->SetAllocatedStorage(data.release(), data_size); } - req_wrap_obj->Set(env->bytes_string(), - Integer::NewFromUnsigned(env->isolate(), data_size)); - return err; -} - - -void StreamBase::AfterWrite(WriteWrap* req_wrap, int status) { - Environment* env = req_wrap->env(); - - HandleScope handle_scope(env->isolate()); - Context::Scope context_scope(env->context()); - - // The wrap and request objects should still be there. - CHECK_EQ(req_wrap->persistent().IsEmpty(), false); - - // Unref handle property - Local req_wrap_obj = req_wrap->object(); - req_wrap_obj->Delete(env->context(), env->handle_string()).FromJust(); - EmitAfterWrite(req_wrap, status); - - Local argv[] = { - Integer::New(env->isolate(), status), - GetObject(), - req_wrap_obj, - Undefined(env->isolate()) - }; - - const char* msg = Error(); - if (msg != nullptr) { - argv[3] = OneByteString(env->isolate(), msg); - ClearError(); - } - - if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromJust()) - req_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv); - req_wrap->Dispose(); + return res.err; } @@ -510,4 +371,39 @@ void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) { stream->CallJSOnreadMethod(nread, obj); } + +void ReportWritesToJSStreamListener::OnStreamAfterReqFinished( + StreamReq* req_wrap, int status) { + StreamBase* stream = static_cast(stream_); + Environment* env = stream->stream_env(); + AsyncWrap* async_wrap = req_wrap->GetAsyncWrap(); + Local req_wrap_obj = async_wrap->object(); + + Local argv[] = { + Integer::New(env->isolate(), status), + stream->GetObject(), + Undefined(env->isolate()) + }; + + const char* msg = stream->Error(); + if (msg != nullptr) { + argv[2] = OneByteString(env->isolate(), msg); + stream->ClearError(); + } + + if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromJust()) + async_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv); +} + +void ReportWritesToJSStreamListener::OnStreamAfterWrite( + WriteWrap* req_wrap, int status) { + OnStreamAfterReqFinished(req_wrap, status); +} + +void ReportWritesToJSStreamListener::OnStreamAfterShutdown( + ShutdownWrap* req_wrap, int status) { + OnStreamAfterReqFinished(req_wrap, status); +} + + } // namespace node diff --git a/src/stream_base.h b/src/stream_base.h index f18b6bda0a08a4..59b8ee7b7221f0 100644 --- a/src/stream_base.h +++ b/src/stream_base.h @@ -14,114 +14,75 @@ namespace node { // Forward declarations +class ShutdownWrap; +class WriteWrap; class StreamBase; class StreamResource; -template +struct StreamWriteResult { + bool async; + int err; + WriteWrap* wrap; +}; + + class StreamReq { public: - explicit StreamReq(StreamBase* stream) : stream_(stream) { + static constexpr int kStreamReqField = 1; + + explicit StreamReq(StreamBase* stream, + v8::Local req_wrap_obj) : stream_(stream) { + AttachToObject(req_wrap_obj); } - inline void Done(int status, const char* error_str = nullptr) { - Base* req = static_cast(this); - Environment* env = req->env(); - if (error_str != nullptr) { - req->object()->Set(env->error_string(), - OneByteString(env->isolate(), error_str)); - } + virtual ~StreamReq() {} + virtual AsyncWrap* GetAsyncWrap() = 0; + v8::Local object(); - req->OnDone(status); - } + void Done(int status, const char* error_str = nullptr); + void Dispose(); inline StreamBase* stream() const { return stream_; } + static StreamReq* FromObject(v8::Local req_wrap_obj); + + protected: + virtual void OnDone(int status) = 0; + + void AttachToObject(v8::Local req_wrap_obj); + private: StreamBase* const stream_; }; -class ShutdownWrap : public ReqWrap, - public StreamReq { +class ShutdownWrap : public StreamReq { public: - ShutdownWrap(Environment* env, - v8::Local req_wrap_obj, - StreamBase* stream) - : ReqWrap(env, req_wrap_obj, AsyncWrap::PROVIDER_SHUTDOWNWRAP), - StreamReq(stream) { - Wrap(req_wrap_obj, this); - } + ShutdownWrap(StreamBase* stream, + v8::Local req_wrap_obj) + : StreamReq(stream, req_wrap_obj) { } - ~ShutdownWrap() { - ClearWrap(object()); - } - - static ShutdownWrap* from_req(uv_shutdown_t* req) { - return ContainerOf(&ShutdownWrap::req_, req); - } - - size_t self_size() const override { return sizeof(*this); } - - inline void OnDone(int status); // Just calls stream()->AfterShutdown() + void OnDone(int status) override; // Just calls stream()->AfterShutdown() }; -class WriteWrap : public ReqWrap, - public StreamReq { +class WriteWrap : public StreamReq { public: - static inline WriteWrap* New(Environment* env, - v8::Local obj, - StreamBase* stream, - size_t extra = 0); - inline void Dispose(); - inline char* Extra(size_t offset = 0); - inline size_t ExtraSize() const; - - size_t self_size() const override { return storage_size_; } - - static WriteWrap* from_req(uv_write_t* req) { - return ContainerOf(&WriteWrap::req_, req); - } + char* Storage(); + size_t StorageSize() const; + void SetAllocatedStorage(char* data, size_t size); - static const size_t kAlignSize = 16; - - WriteWrap(Environment* env, - v8::Local obj, - StreamBase* stream) - : ReqWrap(env, obj, AsyncWrap::PROVIDER_WRITEWRAP), - StreamReq(stream), - storage_size_(0) { - Wrap(obj, this); - } - - inline void OnDone(int status); // Just calls stream()->AfterWrite() - - protected: - WriteWrap(Environment* env, - v8::Local obj, - StreamBase* stream, - size_t storage_size) - : ReqWrap(env, obj, AsyncWrap::PROVIDER_WRITEWRAP), - StreamReq(stream), - storage_size_(storage_size) { - Wrap(obj, this); - } + WriteWrap(StreamBase* stream, + v8::Local req_wrap_obj) + : StreamReq(stream, req_wrap_obj) { } ~WriteWrap() { - ClearWrap(object()); + free(storage_); } - void* operator new(size_t size) = delete; - void* operator new(size_t size, char* storage) { return storage; } - - // This is just to keep the compiler happy. It should never be called, since - // we don't use exceptions in node. - void operator delete(void* ptr, char* storage) { UNREACHABLE(); } + void OnDone(int status) override; // Just calls stream()->AfterWrite() private: - // People should not be using the non-placement new and delete operator on a - // WriteWrap. Ensure this never happens. - void operator delete(void* ptr) { UNREACHABLE(); } - - const size_t storage_size_; + char* storage_ = nullptr; + size_t storage_size_ = 0; }; @@ -147,15 +108,23 @@ class StreamListener { // `OnStreamRead()` is called when data is available on the socket and has // been read into the buffer provided by `OnStreamAlloc()`. // The `buf` argument is the return value of `uv_buf_t`, or may be a buffer - // with base nullpptr in case of an error. + // with base nullptr in case of an error. // `nread` is the number of read bytes (which is at most the buffer length), // or, if negative, a libuv error code. virtual void OnStreamRead(ssize_t nread, const uv_buf_t& buf) = 0; - // This is called once a Write has finished. `status` may be 0 or, + // This is called once a write has finished. `status` may be 0 or, // if negative, a libuv error code. - virtual void OnStreamAfterWrite(WriteWrap* w, int status) {} + // By default, this is simply passed on to the previous listener + // (and raises an assertion if there is none). + virtual void OnStreamAfterWrite(WriteWrap* w, int status); + + // This is called once a shutdown has finished. `status` may be 0 or, + // if negative, a libuv error code. + // By default, this is simply passed on to the previous listener + // (and raises an assertion if there is none). + virtual void OnStreamAfterShutdown(ShutdownWrap* w, int status); // This is called immediately before the stream is destroyed. virtual void OnStreamDestroy() {} @@ -174,9 +143,21 @@ class StreamListener { }; +// An (incomplete) stream listener class that calls the `.oncomplete()` +// method of the JS objects associated with the wrap objects. +class ReportWritesToJSStreamListener : public StreamListener { + public: + void OnStreamAfterWrite(WriteWrap* w, int status) override; + void OnStreamAfterShutdown(ShutdownWrap* w, int status) override; + + private: + void OnStreamAfterReqFinished(StreamReq* req_wrap, int status); +}; + + // A default emitter that just pushes data chunks as Buffer instances to // JS land via the handle’s .ondata method. -class EmitToJSStreamListener : public StreamListener { +class EmitToJSStreamListener : public ReportWritesToJSStreamListener { public: void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override; }; @@ -188,20 +169,31 @@ class StreamResource { public: virtual ~StreamResource(); - virtual int DoShutdown(ShutdownWrap* req_wrap) = 0; - virtual int DoTryWrite(uv_buf_t** bufs, size_t* count); - virtual int DoWrite(WriteWrap* w, - uv_buf_t* bufs, - size_t count, - uv_stream_t* send_handle) = 0; + // These need to be implemented on the readable side of this stream: // Start reading from the underlying resource. This is called by the consumer - // when more data is desired. + // when more data is desired. Use `EmitAlloc()` and `EmitData()` to + // pass data along to the consumer. virtual int ReadStart() = 0; // Stop reading from the underlying resource. This is called by the // consumer when its buffers are full and no more data can be handled. virtual int ReadStop() = 0; + // These need to be implemented on the writable side of this stream: + // All of these methods may return an error code synchronously. + // In that case, the finish callback should *not* be called. + + // Perform a shutdown operation, and call req_wrap->Done() when finished. + virtual int DoShutdown(ShutdownWrap* req_wrap) = 0; + // Try to write as much data as possible synchronously, and modify + // `*bufs` and `*count` accordingly. This is a no-op by default. + virtual int DoTryWrite(uv_buf_t** bufs, size_t* count); + // Perform a write of data, and call req_wrap->Done() when finished. + virtual int DoWrite(WriteWrap* w, + uv_buf_t* bufs, + size_t count, + uv_stream_t* send_handle) = 0; + // Optionally, this may provide an error message to be used for // failing writes. virtual const char* Error() const; @@ -223,6 +215,8 @@ class StreamResource { void EmitRead(ssize_t nread, const uv_buf_t& buf = uv_buf_init(nullptr, 0)); // Call the current listener's OnStreamAfterWrite() method. void EmitAfterWrite(WriteWrap* w, int status); + // Call the current listener's OnStreamAfterShutdown() method. + void EmitAfterShutdown(ShutdownWrap* w, int status); StreamListener* listener_ = nullptr; uint64_t bytes_read_ = 0; @@ -251,21 +245,40 @@ class StreamBase : public StreamResource { void CallJSOnreadMethod(ssize_t nread, v8::Local buf); - // These are called by the respective {Write,Shutdown}Wrap class. - virtual void AfterShutdown(ShutdownWrap* req, int status); - virtual void AfterWrite(WriteWrap* req, int status); - // This is named `stream_env` to avoid name clashes, because a lot of // subclasses are also `BaseObject`s. Environment* stream_env() const; - protected: - explicit StreamBase(Environment* env); + // Shut down the current stream. This request can use an existing + // ShutdownWrap object (that was created in JS), or a new one will be created. + int Shutdown(v8::Local req_wrap_obj = v8::Local()); + + // Write data to the current stream. This request can use an existing + // WriteWrap object (that was created in JS), or a new one will be created. + // This will first try to write synchronously using `DoTryWrite()`, then + // asynchronously using `DoWrite()`. + // If the return value indicates a synchronous completion, no callback will + // be invoked. + StreamWriteResult Write( + uv_buf_t* bufs, + size_t count, + uv_stream_t* send_handle = nullptr, + v8::Local req_wrap_obj = v8::Local()); + + // These can be overridden by subclasses to get more specific wrap instances. + // For example, a subclass Foo could create a FooWriteWrap or FooShutdownWrap + // (inheriting from ShutdownWrap/WriteWrap) that has extra fields, like + // an associated libuv request. + virtual ShutdownWrap* CreateShutdownWrap(v8::Local object); + virtual WriteWrap* CreateWriteWrap(v8::Local object); // One of these must be implemented virtual AsyncWrap* GetAsyncWrap() = 0; virtual v8::Local GetObject(); + protected: + explicit StreamBase(Environment* env); + // JS Methods int ReadStartJS(const v8::FunctionCallbackInfo& args); int ReadStopJS(const v8::FunctionCallbackInfo& args); @@ -292,6 +305,43 @@ class StreamBase : public StreamResource { private: Environment* env_; EmitToJSStreamListener default_listener_; + + // These are called by the respective {Write,Shutdown}Wrap class. + void AfterShutdown(ShutdownWrap* req, int status); + void AfterWrite(WriteWrap* req, int status); + + template + void AfterRequest(Wrap* req_wrap, EmitEvent emit); + + friend class WriteWrap; + friend class ShutdownWrap; +}; + + +// These are helpers for creating `ShutdownWrap`/`WriteWrap` instances. +// `OtherBase` must have a constructor that matches the `AsyncWrap` +// constructors’s (Environment*, Local, AsyncWrap::Provider) signature +// and be a subclass of `AsyncWrap`. +template +class SimpleShutdownWrap : public ShutdownWrap, public OtherBase { + public: + SimpleShutdownWrap(StreamBase* stream, + v8::Local req_wrap_obj); + ~SimpleShutdownWrap(); + + AsyncWrap* GetAsyncWrap() override { return this; } + size_t self_size() const override { return sizeof(*this); } +}; + +template +class SimpleWriteWrap : public WriteWrap, public OtherBase { + public: + SimpleWriteWrap(StreamBase* stream, + v8::Local req_wrap_obj); + ~SimpleWriteWrap(); + + AsyncWrap* GetAsyncWrap() override { return this; } + size_t self_size() const override { return sizeof(*this) + StorageSize(); } }; } // namespace node diff --git a/src/stream_wrap.cc b/src/stream_wrap.cc index bc10cf80e828f1..e1df9edd39e151 100644 --- a/src/stream_wrap.cc +++ b/src/stream_wrap.cc @@ -61,19 +61,22 @@ void LibuvStreamWrap::Initialize(Local target, [](const FunctionCallbackInfo& args) { CHECK(args.IsConstructCall()); ClearWrap(args.This()); + args.This()->SetAlignedPointerInInternalField( + StreamReq::kStreamReqField, nullptr); }; Local sw = FunctionTemplate::New(env->isolate(), is_construct_call_callback); - sw->InstanceTemplate()->SetInternalFieldCount(1); + sw->InstanceTemplate()->SetInternalFieldCount(StreamReq::kStreamReqField + 1); Local wrapString = FIXED_ONE_BYTE_STRING(env->isolate(), "ShutdownWrap"); sw->SetClassName(wrapString); AsyncWrap::AddWrapMethods(env, sw); target->Set(wrapString, sw->GetFunction()); + env->set_shutdown_wrap_constructor_function(sw->GetFunction()); Local ww = FunctionTemplate::New(env->isolate(), is_construct_call_callback); - ww->InstanceTemplate()->SetInternalFieldCount(1); + ww->InstanceTemplate()->SetInternalFieldCount(StreamReq::kStreamReqField + 1); Local writeWrapString = FIXED_ONE_BYTE_STRING(env->isolate(), "WriteWrap"); ww->SetClassName(writeWrapString); @@ -261,8 +264,20 @@ void LibuvStreamWrap::SetBlocking(const FunctionCallbackInfo& args) { args.GetReturnValue().Set(uv_stream_set_blocking(wrap->stream(), enable)); } +typedef SimpleShutdownWrap, false> LibuvShutdownWrap; +typedef SimpleWriteWrap, false> LibuvWriteWrap; -int LibuvStreamWrap::DoShutdown(ShutdownWrap* req_wrap) { +ShutdownWrap* LibuvStreamWrap::CreateShutdownWrap(Local object) { + return new LibuvShutdownWrap(this, object); +} + +WriteWrap* LibuvStreamWrap::CreateWriteWrap(Local object) { + return new LibuvWriteWrap(this, object); +} + + +int LibuvStreamWrap::DoShutdown(ShutdownWrap* req_wrap_) { + LibuvShutdownWrap* req_wrap = static_cast(req_wrap_); int err; err = uv_shutdown(req_wrap->req(), stream(), AfterUvShutdown); req_wrap->Dispatched(); @@ -271,7 +286,8 @@ int LibuvStreamWrap::DoShutdown(ShutdownWrap* req_wrap) { void LibuvStreamWrap::AfterUvShutdown(uv_shutdown_t* req, int status) { - ShutdownWrap* req_wrap = ShutdownWrap::from_req(req); + LibuvShutdownWrap* req_wrap = static_cast( + LibuvShutdownWrap::from_req(req)); CHECK_NE(req_wrap, nullptr); HandleScope scope(req_wrap->env()->isolate()); Context::Scope context_scope(req_wrap->env()->context()); @@ -319,10 +335,11 @@ int LibuvStreamWrap::DoTryWrite(uv_buf_t** bufs, size_t* count) { } -int LibuvStreamWrap::DoWrite(WriteWrap* w, - uv_buf_t* bufs, - size_t count, - uv_stream_t* send_handle) { +int LibuvStreamWrap::DoWrite(WriteWrap* req_wrap, + uv_buf_t* bufs, + size_t count, + uv_stream_t* send_handle) { + LibuvWriteWrap* w = static_cast(req_wrap); int r; if (send_handle == nullptr) { r = uv_write(w->req(), stream(), bufs, count, AfterUvWrite); @@ -349,7 +366,8 @@ int LibuvStreamWrap::DoWrite(WriteWrap* w, void LibuvStreamWrap::AfterUvWrite(uv_write_t* req, int status) { - WriteWrap* req_wrap = WriteWrap::from_req(req); + LibuvWriteWrap* req_wrap = static_cast( + LibuvWriteWrap::from_req(req)); CHECK_NE(req_wrap, nullptr); HandleScope scope(req_wrap->env()->isolate()); Context::Scope context_scope(req_wrap->env()->context()); diff --git a/src/stream_wrap.h b/src/stream_wrap.h index e5ad25b91e6fea..a97e8ba10f91d5 100644 --- a/src/stream_wrap.h +++ b/src/stream_wrap.h @@ -73,6 +73,9 @@ class LibuvStreamWrap : public HandleWrap, public StreamBase { return stream()->type == UV_TCP; } + ShutdownWrap* CreateShutdownWrap(v8::Local object) override; + WriteWrap* CreateWriteWrap(v8::Local object) override; + protected: LibuvStreamWrap(Environment* env, v8::Local object, diff --git a/src/tls_wrap.cc b/src/tls_wrap.cc index 561a9785f28edc..109a08003e73af 100644 --- a/src/tls_wrap.cc +++ b/src/tls_wrap.cc @@ -292,37 +292,29 @@ void TLSWrap::EncOut() { for (size_t i = 0; i < count; i++) buf[i] = uv_buf_init(data[i], size[i]); - int err = stream_->DoTryWrite(&bufs, &count); - if (err != 0) { - InvokeQueued(err); - } else if (count == 0) { - env()->SetImmediate([](Environment* env, void* data) { - NODE_COUNT_NET_BYTES_SENT(write_size_); - static_cast(data)->OnStreamAfterWrite(nullptr, 0); - }, this, object()); + StreamWriteResult res = underlying_stream()->Write(bufs, count); + if (res.err != 0) { + InvokeQueued(res.err); return; } - Local req_wrap_obj = - env()->write_wrap_constructor_function() - ->NewInstance(env()->context()).ToLocalChecked(); - WriteWrap* write_req = WriteWrap::New(env(), - req_wrap_obj, - static_cast(stream_)); + NODE_COUNT_NET_BYTES_SENT(write_size_); - err = stream_->DoWrite(write_req, buf, count, nullptr); - - // Ignore errors, this should be already handled in js - if (err) { - write_req->Dispose(); - InvokeQueued(err); - } else { - NODE_COUNT_NET_BYTES_SENT(write_size_); + if (!res.async) { + // Simulate asynchronous finishing, TLS cannot handle this at the moment. + env()->SetImmediate([](Environment* env, void* data) { + static_cast(data)->OnStreamAfterWrite(nullptr, 0); + }, this, object()); } } void TLSWrap::OnStreamAfterWrite(WriteWrap* req_wrap, int status) { + // Report back to the previous listener as well. This is only needed for the + // "empty" writes that are passed through directly to the underlying stream. + if (req_wrap != nullptr) + previous_listener_->OnStreamAfterWrite(req_wrap, status); + if (ssl_ == nullptr) status = UV_ECANCELED; @@ -520,24 +512,24 @@ AsyncWrap* TLSWrap::GetAsyncWrap() { bool TLSWrap::IsIPCPipe() { - return static_cast(stream_)->IsIPCPipe(); + return underlying_stream()->IsIPCPipe(); } int TLSWrap::GetFD() { - return static_cast(stream_)->GetFD(); + return underlying_stream()->GetFD(); } bool TLSWrap::IsAlive() { return ssl_ != nullptr && stream_ != nullptr && - static_cast(stream_)->IsAlive(); + underlying_stream()->IsAlive(); } bool TLSWrap::IsClosing() { - return static_cast(stream_)->IsClosing(); + return underlying_stream()->IsClosing(); } @@ -587,6 +579,17 @@ int TLSWrap::DoWrite(WriteWrap* w, // However, if there is any data that should be written to the socket, // the callback should not be invoked immediately if (BIO_pending(enc_out_) == 0) { + // We destroy the current WriteWrap* object and create a new one that + // matches the underlying stream, rather than the TLSWrap itself. + + // Note: We cannot simply use w->object() because of the "optimized" + // way in which we read persistent handles; the JS object itself might be + // destroyed by w->Dispose(), and the Local we have is not a + // "real" handle in the sense the V8 is aware of its existence. + Local req_wrap_obj = + w->GetAsyncWrap()->persistent().Get(env()->isolate()); + w->Dispose(); + w = underlying_stream()->CreateWriteWrap(req_wrap_obj); return stream_->DoWrite(w, bufs, count, send_handle); } } @@ -594,7 +597,6 @@ int TLSWrap::DoWrite(WriteWrap* w, // Store the current write wrap CHECK_EQ(current_write_, nullptr); current_write_ = w; - w->Dispatched(); // Write queued data if (empty) { @@ -684,6 +686,11 @@ void TLSWrap::OnStreamRead(ssize_t nread, const uv_buf_t& buf) { } +ShutdownWrap* TLSWrap::CreateShutdownWrap(Local req_wrap_object) { + return underlying_stream()->CreateShutdownWrap(req_wrap_object); +} + + int TLSWrap::DoShutdown(ShutdownWrap* req_wrap) { crypto::MarkPopErrorOnReturn mark_pop_error_on_return; diff --git a/src/tls_wrap.h b/src/tls_wrap.h index a1f0b99e86beec..afd19c027e7007 100644 --- a/src/tls_wrap.h +++ b/src/tls_wrap.h @@ -65,6 +65,8 @@ class TLSWrap : public AsyncWrap, int ReadStart() override; int ReadStop() override; + ShutdownWrap* CreateShutdownWrap( + v8::Local req_wrap_object) override; int DoShutdown(ShutdownWrap* req_wrap) override; int DoWrite(WriteWrap* w, uv_buf_t* bufs, @@ -78,6 +80,10 @@ class TLSWrap : public AsyncWrap, size_t self_size() const override { return sizeof(*this); } protected: + inline StreamBase* underlying_stream() { + return static_cast(stream_); + } + static const int kClearOutChunkSize = 16384; // Maximum number of bytes for hello parser diff --git a/test/parallel/test-tcp-wrap-connect.js b/test/parallel/test-tcp-wrap-connect.js index c2746bca64d198..9f5560a385086a 100644 --- a/test/parallel/test-tcp-wrap-connect.js +++ b/test/parallel/test-tcp-wrap-connect.js @@ -23,10 +23,10 @@ function makeConnection() { const err = client.shutdown(shutdownReq); assert.strictEqual(err, 0); - shutdownReq.oncomplete = function(status, client_, req_) { + shutdownReq.oncomplete = function(status, client_, error) { assert.strictEqual(0, status); assert.strictEqual(client, client_); - assert.strictEqual(shutdownReq, req_); + assert.strictEqual(error, undefined); shutdownCount++; client.close(); };