Skip to content

Commit

Permalink
Ensure Expect header is handled correctly with HTTP/2 (#2916)
Browse files Browse the repository at this point in the history
  • Loading branch information
violetagg authored Oct 2, 2023
1 parent 44454d2 commit 4617303
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@
import io.netty.handler.codec.http.HttpContentDecompressor;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http.cookie.ClientCookieDecoder;
import io.netty.handler.codec.http.cookie.ClientCookieEncoder;
import io.netty.handler.codec.http2.Http2ClientUpgradeCodec;
Expand All @@ -60,6 +63,7 @@
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.resolver.AddressResolverGroup;
import io.netty.util.ReferenceCountUtil;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import reactor.netty.ChannelPipelineConfigurer;
Expand Down Expand Up @@ -635,7 +639,8 @@ static void configureHttp11OrH2CleartextPipeline(
Http2ClientUpgradeCodec upgradeCodec = new Http2ClientUpgradeCodec(http2FrameCodec,
new H2CleartextCodec(http2FrameCodec, opsFactory, acceptGzip, metricsRecorder, uriTagValue));

HttpClientUpgradeHandler upgradeHandler = new HttpClientUpgradeHandler(httpClientCodec, upgradeCodec, decoder.h2cMaxContentLength());
HttpClientUpgradeHandler upgradeHandler =
new ReactorNettyHttpClientUpgradeHandler(httpClientCodec, upgradeCodec, decoder.h2cMaxContentLength());

p.addBefore(NettyPipeline.ReactiveBridge, null, httpClientCodec)
.addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.H2CUpgradeHandler, upgradeHandler)
Expand Down Expand Up @@ -1021,6 +1026,36 @@ public void onUncaughtException(Connection connection, Throwable error) {
}
}

static final class ReactorNettyHttpClientUpgradeHandler extends HttpClientUpgradeHandler {

boolean is100Continue;

ReactorNettyHttpClientUpgradeHandler(SourceCodec sourceCodec, UpgradeCodec upgradeCodec, int maxContentLength) {
super(sourceCodec, upgradeCodec, maxContentLength);
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof HttpResponse) {
if (HttpResponseStatus.CONTINUE.equals(((HttpResponse) msg).status())) {
is100Continue = true;
ReferenceCountUtil.release(msg);
ctx.read();
return;
}
is100Continue = false;
}

if (is100Continue && msg instanceof LastHttpContent) {
is100Continue = false;
((LastHttpContent) msg).release();
return;
}

super.channelRead(ctx, msg);
}
}

static final class StreamConnectionObserver implements ConnectionObserver {

final Context context;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http.cookie.ServerCookieDecoder;
import io.netty.handler.codec.http.cookie.ServerCookieEncoder;
Expand Down Expand Up @@ -168,6 +170,10 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
//"FutureReturnValueIgnored" this is deliberate
ctx.write(new DefaultHttpContent((ByteBuf) msg), promise);
}
else if (msg instanceof HttpResponse && HttpResponseStatus.CONTINUE.equals(((HttpResponse) msg).status())) {
//"FutureReturnValueIgnored" this is deliberate
ctx.write(msg, promise);
}
else {
//"FutureReturnValueIgnored" this is deliberate
ChannelFuture f = ctx.write(msg, promise);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018-2022 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2018-2023 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,7 @@
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http2.Http2DataFrame;
import io.netty.handler.codec.http2.Http2HeadersFrame;
import reactor.netty.channel.ChannelOperations;
Expand Down Expand Up @@ -59,6 +60,13 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
boolean lastContent = false;
if (msg instanceof Http2HeadersFrame) {
final Http2HeadersFrame responseHeaders = (Http2HeadersFrame) msg;

if (HttpResponseStatus.CONTINUE.codeAsText().contentEquals(responseHeaders.headers().status())) {
//"FutureReturnValueIgnored" this is deliberate
ctx.write(msg, promise);
return;
}

lastContent = responseHeaders.isEndStream();

accessLogArgProvider.responseHeaders(responseHeaders)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http2.Http2Connection;
Expand Down Expand Up @@ -757,6 +758,37 @@ else if (clientProtocols.length == 2 && clientProtocols[1] == HttpProtocol.H2C)
}
}

@ParameterizedCompatibleCombinationsTest
void test100Continue(HttpServer server, HttpClient client) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
disposableServer =
server.handle((req, res) -> req.receive()
.aggregate()
.asString()
.flatMap(s -> {
latch.countDown();
return res.sendString(Mono.just(s))
.then();
}))
.bindNow();

Tuple2<String, Integer> content =
client.port(disposableServer.port())
.headers(h -> h.add(HttpHeaderNames.EXPECT, HttpHeaderValues.CONTINUE))
.post()
.uri("/")
.send(ByteBufFlux.fromString(Flux.just("1", "2", "3", "4", "5")))
.responseSingle((res, bytes) -> bytes.asString()
.zipWith(Mono.just(res.status().code())))
.block(Duration.ofSeconds(5));

assertThat(latch.await(30, TimeUnit.SECONDS)).as("latch await").isTrue();

assertThat(content).isNotNull();
assertThat(content.getT1()).isEqualTo("12345");
assertThat(content.getT2()).isEqualTo(200);
}

static final class IdleTimeoutTestChannelInboundHandler extends ChannelInboundHandlerAdapter {

final CountDownLatch latch = new CountDownLatch(1);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017-2021 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2017-2023 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,8 +24,6 @@

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
Expand All @@ -38,7 +36,6 @@
import reactor.netty.http.server.HttpServer;
import reactor.netty.resources.ConnectionProvider;
import reactor.test.StepVerifier;
import reactor.util.function.Tuple2;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
Expand Down Expand Up @@ -299,38 +296,6 @@ public void webSocketRespondsToRequestsFromClients() {
}
*/

@Test
void test100Continue() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
disposableServer =
createServer()
.handle((req, res) -> req.receive()
.aggregate()
.asString()
.flatMap(s -> {
latch.countDown();
return res.sendString(Mono.just(s))
.then();
}))
.bindNow();

Tuple2<String, Integer> content =
createClient(disposableServer.port())
.headers(h -> h.add(HttpHeaderNames.EXPECT, HttpHeaderValues.CONTINUE))
.post()
.uri("/")
.send(ByteBufFlux.fromString(Flux.just("1", "2", "3", "4", "5")))
.responseSingle((res, bytes) -> bytes.asString()
.zipWith(Mono.just(res.status().code())))
.block(Duration.ofSeconds(5));

Assertions.assertThat(latch.await(30, TimeUnit.SECONDS)).as("latch await").isTrue();

Assertions.assertThat(content).isNotNull();
Assertions.assertThat(content.getT1()).isEqualTo("12345");
Assertions.assertThat(content.getT2()).isEqualTo(200);
}

@Test
void streamAndPoolExplicitCompression() {
Sinks.Many<String> ep = Sinks.unsafe().many().unicast().onBackpressureBuffer();
Expand Down

0 comments on commit 4617303

Please sign in to comment.