Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http2: refactor how trailers are done #19959

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions doc/api/errors.md
Original file line number Diff line number Diff line change
Expand Up @@ -1017,6 +1017,19 @@ When setting the priority for an HTTP/2 stream, the stream may be marked as
a dependency for a parent stream. This error code is used when an attempt is
made to mark a stream and dependent of itself.

<a id="ERR_HTTP2_TRAILERS_ALREADY_SENT"></a>
### ERR_HTTP2_TRAILERS_ALREADY_SENT

Trailing headers have already been sent on the `Http2Stream`.

<a id="ERR_HTTP2_TRAILERS_NOT_READY"></a>
### ERR_HTTP2_TRAILERS_NOT_READY

The `http2stream.sendTrailers()` method cannot be called until after the
`'wantTrailers'` event is emitted on an `Http2Stream` object. The
`'wantTrailers'` event will only be emitted if the `waitForTrailers` option
is set for the `Http2Stream`.

<a id="ERR_HTTP2_UNSUPPORTED_PROTOCOL"></a>
### ERR_HTTP2_UNSUPPORTED_PROTOCOL

Expand Down
191 changes: 116 additions & 75 deletions doc/api/http2.md

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions lib/internal/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -842,6 +842,11 @@ E('ERR_HTTP2_STREAM_CANCEL', 'The pending stream has been canceled', Error);
E('ERR_HTTP2_STREAM_ERROR', 'Stream closed with error code %s', Error);
E('ERR_HTTP2_STREAM_SELF_DEPENDENCY',
'A stream cannot depend on itself', Error);
E('ERR_HTTP2_TRAILERS_ALREADY_SENT',
'Trailing headers have already been sent', Error);
E('ERR_HTTP2_TRAILERS_NOT_READY',
'Trailing headers cannot be sent until after the wantTrailers event is ' +
'emitted', Error);
E('ERR_HTTP2_UNSUPPORTED_PROTOCOL', 'protocol "%s" is unsupported.', Error);
E('ERR_HTTP_HEADERS_SENT',
'Cannot %s headers after they are sent to the client', Error);
Expand Down
7 changes: 6 additions & 1 deletion lib/internal/http2/compat.js
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,10 @@ class Http2ServerRequest extends Readable {
}
}

function onStreamTrailersReady() {
this[kStream].sendTrailers(this[kTrailers]);
}

class Http2ServerResponse extends Stream {
constructor(stream, options) {
super(options);
Expand All @@ -377,6 +381,7 @@ class Http2ServerResponse extends Stream {
stream.on('drain', onStreamDrain);
stream.on('aborted', onStreamAbortedResponse);
stream.on('close', this[kFinish].bind(this));
stream.on('wantTrailers', onStreamTrailersReady.bind(this));
}

// User land modules such as finalhandler just check truthiness of this
Expand Down Expand Up @@ -648,7 +653,7 @@ class Http2ServerResponse extends Stream {
headers[HTTP2_HEADER_STATUS] = state.statusCode;
const options = {
endStream: state.ending,
getTrailers: (trailers) => Object.assign(trailers, this[kTrailers])
waitForTrailers: true,
};
this[kStream].respond(headers, options);
}
Expand Down
85 changes: 43 additions & 42 deletions lib/internal/http2/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ const {
ERR_HTTP2_STREAM_CANCEL,
ERR_HTTP2_STREAM_ERROR,
ERR_HTTP2_STREAM_SELF_DEPENDENCY,
ERR_HTTP2_TRAILERS_ALREADY_SENT,
ERR_HTTP2_TRAILERS_NOT_READY,
ERR_HTTP2_UNSUPPORTED_PROTOCOL,
ERR_INVALID_ARG_TYPE,
ERR_INVALID_CALLBACK,
Expand Down Expand Up @@ -295,25 +297,18 @@ function tryClose(fd) {
fs.close(fd, (err) => assert.ifError(err));
}

// Called to determine if there are trailers to be sent at the end of a
// Stream. The 'getTrailers' callback is invoked and passed a holder object.
// The trailers to return are set on that object by the handler. Once the
// event handler returns, those are sent off for processing. Note that this
// is a necessarily synchronous operation. We need to know immediately if
// there are trailing headers to send.
// Called when the Http2Stream has finished sending data and is ready for
// trailers to be sent. This will only be called if the { hasOptions: true }
// option is set.
function onStreamTrailers() {
const stream = this[kOwner];
stream[kState].trailersReady = true;
if (stream.destroyed)
return [];
const trailers = Object.create(null);
stream[kState].getTrailers.call(stream, trailers);
const headersList = mapToHeaders(trailers, assertValidPseudoHeaderTrailer);
if (!Array.isArray(headersList)) {
stream.destroy(headersList);
return [];
return;
if (!stream.emit('wantTrailers')) {
// There are no listeners, send empty trailing HEADERS frame and close.
stream.sendTrailers({});
}
stream[kSentTrailers] = trailers;
return headersList;
}

// Submit an RST-STREAM frame to be sent to the remote peer.
Expand Down Expand Up @@ -527,10 +522,8 @@ function requestOnConnect(headers, options) {
if (options.endStream)
streamOptions |= STREAM_OPTION_EMPTY_PAYLOAD;

if (typeof options.getTrailers === 'function') {
if (options.waitForTrailers)
streamOptions |= STREAM_OPTION_GET_TRAILERS;
this[kState].getTrailers = options.getTrailers;
}

// ret will be either the reserved stream ID (if positive)
// or an error code (if negative)
Expand Down Expand Up @@ -1408,11 +1401,6 @@ class ClientHttp2Session extends Http2Session {
throw new ERR_INVALID_OPT_VALUE('endStream', options.endStream);
}

if (options.getTrailers !== undefined &&
typeof options.getTrailers !== 'function') {
throw new ERR_INVALID_OPT_VALUE('getTrailers', options.getTrailers);
}

const headersList = mapToHeaders(headers);
if (!Array.isArray(headersList))
throw headersList;
Expand Down Expand Up @@ -1504,7 +1492,8 @@ class Http2Stream extends Duplex {
this[kState] = {
flags: STREAM_FLAGS_PENDING,
rstCode: NGHTTP2_NO_ERROR,
writeQueueSize: 0
writeQueueSize: 0,
trailersReady: false
};

this.on('resume', streamOnResume);
Expand Down Expand Up @@ -1745,6 +1734,33 @@ class Http2Stream extends Duplex {
priorityFn();
}

sendTrailers(headers) {
if (this.destroyed || this.closed)
throw new ERR_HTTP2_INVALID_STREAM();
if (this[kSentTrailers])
throw new ERR_HTTP2_TRAILERS_ALREADY_SENT();
if (!this[kState].trailersReady)
throw new ERR_HTTP2_TRAILERS_NOT_READY();

assertIsObject(headers, 'headers');
headers = Object.assign(Object.create(null), headers);

const session = this[kSession];
debug(`Http2Stream ${this[kID]} [Http2Session ` +
`${sessionName(session[kType])}]: sending trailers`);

this[kUpdateTimer]();

const headersList = mapToHeaders(headers, assertValidPseudoHeaderTrailer);
if (!Array.isArray(headersList))
throw headersList;
this[kSentTrailers] = headers;

const ret = this[kHandle].trailers(headersList);
if (ret < 0)
this.destroy(new NghttpError(ret));
}

get closed() {
return !!(this[kState].flags & STREAM_FLAGS_CLOSED);
}
Expand Down Expand Up @@ -2208,13 +2224,8 @@ class ServerHttp2Stream extends Http2Stream {
if (options.endStream)
streamOptions |= STREAM_OPTION_EMPTY_PAYLOAD;

if (options.getTrailers !== undefined) {
if (typeof options.getTrailers !== 'function') {
throw new ERR_INVALID_OPT_VALUE('getTrailers', options.getTrailers);
}
if (options.waitForTrailers)
streamOptions |= STREAM_OPTION_GET_TRAILERS;
state.getTrailers = options.getTrailers;
}

headers = processHeaders(headers);
const statusCode = headers[HTTP2_HEADER_STATUS] |= 0;
Expand Down Expand Up @@ -2274,13 +2285,8 @@ class ServerHttp2Stream extends Http2Stream {
}

let streamOptions = 0;
if (options.getTrailers !== undefined) {
if (typeof options.getTrailers !== 'function') {
throw new ERR_INVALID_OPT_VALUE('getTrailers', options.getTrailers);
}
if (options.waitForTrailers)
streamOptions |= STREAM_OPTION_GET_TRAILERS;
this[kState].getTrailers = options.getTrailers;
}

if (typeof fd !== 'number')
throw new ERR_INVALID_ARG_TYPE('fd', 'number', fd);
Expand Down Expand Up @@ -2340,13 +2346,8 @@ class ServerHttp2Stream extends Http2Stream {
}

let streamOptions = 0;
if (options.getTrailers !== undefined) {
if (typeof options.getTrailers !== 'function') {
throw new ERR_INVALID_OPT_VALUE('getTrailers', options.getTrailers);
}
if (options.waitForTrailers)
streamOptions |= STREAM_OPTION_GET_TRAILERS;
this[kState].getTrailers = options.getTrailers;
}

const session = this[kSession];
debug(`Http2Stream ${this[kID]} [Http2Session ` +
Expand Down
100 changes: 40 additions & 60 deletions src/node_http2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1066,16 +1066,6 @@ int Http2Session::OnNghttpError(nghttp2_session* handle,
return 0;
}

// Once all of the DATA frames for a Stream have been sent, the GetTrailers
// method calls out to JavaScript to fetch the trailing headers that need
// to be sent.
void Http2Session::GetTrailers(Http2Stream* stream, uint32_t* flags) {
if (!stream->IsDestroyed() && stream->HasTrailers()) {
Http2Stream::SubmitTrailers submit_trailers{this, stream, flags};
stream->OnTrailers(submit_trailers);
}
}

uv_buf_t Http2StreamListener::OnStreamAlloc(size_t size) {
// See the comments in Http2Session::OnDataChunkReceived
// (which is the only possible call site for this method).
Expand Down Expand Up @@ -1111,25 +1101,6 @@ void Http2StreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
stream->CallJSOnreadMethod(nread, buffer);
}

Http2Stream::SubmitTrailers::SubmitTrailers(
Http2Session* session,
Http2Stream* stream,
uint32_t* flags)
: session_(session), stream_(stream), flags_(flags) { }


void Http2Stream::SubmitTrailers::Submit(nghttp2_nv* trailers,
size_t length) const {
Http2Scope h2scope(session_);
if (length == 0)
return;
DEBUG_HTTP2SESSION2(session_, "sending trailers for stream %d, count: %d",
stream_->id(), length);
*flags_ |= NGHTTP2_DATA_FLAG_NO_END_STREAM;
CHECK_EQ(
nghttp2_submit_trailer(**session_, stream_->id(), trailers, length), 0);
}


// Called by OnFrameReceived to notify JavaScript land that a complete
// HEADERS frame has been received and processed. This method converts the
Expand Down Expand Up @@ -1725,30 +1696,6 @@ nghttp2_stream* Http2Stream::operator*() {
return nghttp2_session_find_stream(**session_, id_);
}


// Calls out to JavaScript land to fetch the actual trailer headers to send
// for this stream.
void Http2Stream::OnTrailers(const SubmitTrailers& submit_trailers) {
DEBUG_HTTP2STREAM(this, "prompting for trailers");
CHECK(!this->IsDestroyed());
Isolate* isolate = env()->isolate();
HandleScope scope(isolate);
Local<Context> context = env()->context();
Context::Scope context_scope(context);

Local<Value> ret =
MakeCallback(env()->ontrailers_string(), 0, nullptr).ToLocalChecked();
if (!ret.IsEmpty() && !IsDestroyed()) {
if (ret->IsArray()) {
Local<Array> headers = ret.As<Array>();
if (headers->Length() > 0) {
Headers trailers(isolate, context, headers);
submit_trailers.Submit(*trailers, trailers.length());
}
}
}
}

void Http2Stream::Close(int32_t code) {
CHECK(!this->IsDestroyed());
flags_ |= NGHTTP2_STREAM_FLAG_CLOSED;
Expand Down Expand Up @@ -1843,6 +1790,26 @@ int Http2Stream::SubmitInfo(nghttp2_nv* nva, size_t len) {
return ret;
}

void Http2Stream::OnTrailers() {
DEBUG_HTTP2STREAM(this, "let javascript know we are ready for trailers");
CHECK(!this->IsDestroyed());
Isolate* isolate = env()->isolate();
HandleScope scope(isolate);
Local<Context> context = env()->context();
Context::Scope context_scope(context);
MakeCallback(env()->ontrailers_string(), 0, nullptr);
}

// Submit informational headers for a stream.
int Http2Stream::SubmitTrailers(nghttp2_nv* nva, size_t len) {
CHECK(!this->IsDestroyed());
Http2Scope h2scope(this);
DEBUG_HTTP2STREAM2(this, "sending %d trailers", len);
int ret = nghttp2_submit_trailer(**session_, id_, nva, len);
CHECK_NE(ret, NGHTTP2_ERR_NOMEM);
return ret;
}

// Submit a PRIORITY frame to the connected peer.
int Http2Stream::SubmitPriority(nghttp2_priority_spec* prispec,
bool silent) {
Expand Down Expand Up @@ -2068,13 +2035,10 @@ ssize_t Http2Stream::Provider::Stream::OnRead(nghttp2_session* handle,
if (stream->queue_.empty() && !stream->IsWritable()) {
DEBUG_HTTP2SESSION2(session, "no more data for stream %d", id);
*flags |= NGHTTP2_DATA_FLAG_EOF;
session->GetTrailers(stream, flags);
// If the stream or session gets destroyed during the GetTrailers
// callback, check that here and close down the stream
if (stream->IsDestroyed())
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
if (session->IsDestroyed())
return NGHTTP2_ERR_CALLBACK_FAILURE;
if (stream->HasTrailers()) {
*flags |= NGHTTP2_DATA_FLAG_NO_END_STREAM;
stream->OnTrailers();
}
}

stream->statistics_.sent_bytes += amount;
Expand Down Expand Up @@ -2361,6 +2325,21 @@ void Http2Stream::Info(const FunctionCallbackInfo<Value>& args) {
headers->Length());
}

// Submits trailing headers on the Http2Stream
void Http2Stream::Trailers(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
Local<Context> context = env->context();
Isolate* isolate = env->isolate();
Http2Stream* stream;
ASSIGN_OR_RETURN_UNWRAP(&stream, args.Holder());

Local<Array> headers = args[0].As<Array>();

Headers list(isolate, context, headers);
args.GetReturnValue().Set(stream->SubmitTrailers(*list, list.length()));
DEBUG_HTTP2STREAM2(stream, "%d trailing headers sent", headers->Length());
}

// Grab the numeric id of the Http2Stream
void Http2Stream::GetID(const FunctionCallbackInfo<Value>& args) {
Http2Stream* stream;
Expand Down Expand Up @@ -2706,6 +2685,7 @@ void Initialize(Local<Object> target,
env->SetProtoMethod(stream, "priority", Http2Stream::Priority);
env->SetProtoMethod(stream, "pushPromise", Http2Stream::PushPromise);
env->SetProtoMethod(stream, "info", Http2Stream::Info);
env->SetProtoMethod(stream, "trailers", Http2Stream::Trailers);
env->SetProtoMethod(stream, "respond", Http2Stream::Respond);
env->SetProtoMethod(stream, "rstStream", Http2Stream::RstStream);
env->SetProtoMethod(stream, "refreshState", Http2Stream::RefreshState);
Expand Down
Loading