diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConfig.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConfig.java index fc34d84514..da3fc5bf31 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConfig.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConfig.java @@ -44,6 +44,9 @@ import io.netty.handler.codec.http.HttpContentDecompressor; import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.LastHttpContent; import io.netty.handler.codec.http.cookie.ClientCookieDecoder; import io.netty.handler.codec.http.cookie.ClientCookieEncoder; import io.netty.handler.codec.http2.Http2ClientUpgradeCodec; @@ -60,6 +63,7 @@ import io.netty.handler.ssl.SslHandler; import io.netty.handler.timeout.ReadTimeoutHandler; import io.netty.resolver.AddressResolverGroup; +import io.netty.util.ReferenceCountUtil; import org.reactivestreams.Publisher; import reactor.core.publisher.Mono; import reactor.netty.ChannelPipelineConfigurer; @@ -635,7 +639,8 @@ static void configureHttp11OrH2CleartextPipeline( Http2ClientUpgradeCodec upgradeCodec = new Http2ClientUpgradeCodec(http2FrameCodec, new H2CleartextCodec(http2FrameCodec, opsFactory, acceptGzip, metricsRecorder, uriTagValue)); - HttpClientUpgradeHandler upgradeHandler = new HttpClientUpgradeHandler(httpClientCodec, upgradeCodec, decoder.h2cMaxContentLength()); + HttpClientUpgradeHandler upgradeHandler = + new ReactorNettyHttpClientUpgradeHandler(httpClientCodec, upgradeCodec, decoder.h2cMaxContentLength()); p.addBefore(NettyPipeline.ReactiveBridge, null, httpClientCodec) .addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.H2CUpgradeHandler, upgradeHandler) @@ -1021,6 +1026,36 @@ public void onUncaughtException(Connection connection, Throwable error) { } } + static final class ReactorNettyHttpClientUpgradeHandler extends HttpClientUpgradeHandler { + + boolean is100Continue; + + ReactorNettyHttpClientUpgradeHandler(SourceCodec sourceCodec, UpgradeCodec upgradeCodec, int maxContentLength) { + super(sourceCodec, upgradeCodec, maxContentLength); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof HttpResponse) { + if (HttpResponseStatus.CONTINUE.equals(((HttpResponse) msg).status())) { + is100Continue = true; + ReferenceCountUtil.release(msg); + ctx.read(); + return; + } + is100Continue = false; + } + + if (is100Continue && msg instanceof LastHttpContent) { + is100Continue = false; + ((LastHttpContent) msg).release(); + return; + } + + super.channelRead(ctx, msg); + } + } + static final class StreamConnectionObserver implements ConnectionObserver { final Context context; diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/server/Http2StreamBridgeServerHandler.java b/reactor-netty-http/src/main/java/reactor/netty/http/server/Http2StreamBridgeServerHandler.java index d58a39f485..78a2b8f4d6 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/server/Http2StreamBridgeServerHandler.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/server/Http2StreamBridgeServerHandler.java @@ -31,6 +31,8 @@ import io.netty.handler.codec.http.DefaultHttpContent; import io.netty.handler.codec.http.HttpObject; import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.LastHttpContent; import io.netty.handler.codec.http.cookie.ServerCookieDecoder; import io.netty.handler.codec.http.cookie.ServerCookieEncoder; @@ -168,6 +170,10 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) //"FutureReturnValueIgnored" this is deliberate ctx.write(new DefaultHttpContent((ByteBuf) msg), promise); } + else if (msg instanceof HttpResponse && HttpResponseStatus.CONTINUE.equals(((HttpResponse) msg).status())) { + //"FutureReturnValueIgnored" this is deliberate + ctx.write(msg, promise); + } else { //"FutureReturnValueIgnored" this is deliberate ChannelFuture f = ctx.write(msg, promise); diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/server/logging/AccessLogHandlerH2.java b/reactor-netty-http/src/main/java/reactor/netty/http/server/logging/AccessLogHandlerH2.java index 4493ea9ea6..004716ad35 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/server/logging/AccessLogHandlerH2.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/server/logging/AccessLogHandlerH2.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2018-2022 VMware, Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2018-2023 VMware, Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,7 @@ import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; +import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http2.Http2DataFrame; import io.netty.handler.codec.http2.Http2HeadersFrame; import reactor.netty.channel.ChannelOperations; @@ -59,6 +60,13 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) boolean lastContent = false; if (msg instanceof Http2HeadersFrame) { final Http2HeadersFrame responseHeaders = (Http2HeadersFrame) msg; + + if (HttpResponseStatus.CONTINUE.codeAsText().contentEquals(responseHeaders.headers().status())) { + //"FutureReturnValueIgnored" this is deliberate + ctx.write(msg, promise); + return; + } + lastContent = responseHeaders.isEndStream(); accessLogArgProvider.responseHeaders(responseHeaders) diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/HttpProtocolsTests.java b/reactor-netty-http/src/test/java/reactor/netty/http/HttpProtocolsTests.java index 01af444975..b7a09a6e40 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/HttpProtocolsTests.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/HttpProtocolsTests.java @@ -27,6 +27,7 @@ import io.netty.channel.ChannelPipeline; import io.netty.handler.codec.http.HttpContent; import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpHeaderValues; import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.LastHttpContent; import io.netty.handler.codec.http2.Http2Connection; @@ -757,6 +758,37 @@ else if (clientProtocols.length == 2 && clientProtocols[1] == HttpProtocol.H2C) } } + @ParameterizedCompatibleCombinationsTest + void test100Continue(HttpServer server, HttpClient client) throws Exception { + CountDownLatch latch = new CountDownLatch(1); + disposableServer = + server.handle((req, res) -> req.receive() + .aggregate() + .asString() + .flatMap(s -> { + latch.countDown(); + return res.sendString(Mono.just(s)) + .then(); + })) + .bindNow(); + + Tuple2 content = + client.port(disposableServer.port()) + .headers(h -> h.add(HttpHeaderNames.EXPECT, HttpHeaderValues.CONTINUE)) + .post() + .uri("/") + .send(ByteBufFlux.fromString(Flux.just("1", "2", "3", "4", "5"))) + .responseSingle((res, bytes) -> bytes.asString() + .zipWith(Mono.just(res.status().code()))) + .block(Duration.ofSeconds(5)); + + assertThat(latch.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); + + assertThat(content).isNotNull(); + assertThat(content.getT1()).isEqualTo("12345"); + assertThat(content.getT2()).isEqualTo(200); + } + static final class IdleTimeoutTestChannelInboundHandler extends ChannelInboundHandlerAdapter { final CountDownLatch latch = new CountDownLatch(1); diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/HttpTests.java b/reactor-netty-http/src/test/java/reactor/netty/http/HttpTests.java index 3ab904a499..62ed2b6197 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/HttpTests.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/HttpTests.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2021 VMware, Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2017-2023 VMware, Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,8 +24,6 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; -import io.netty.handler.codec.http.HttpHeaderNames; -import io.netty.handler.codec.http.HttpHeaderValues; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; import reactor.core.publisher.Flux; @@ -38,7 +36,6 @@ import reactor.netty.http.server.HttpServer; import reactor.netty.resources.ConnectionProvider; import reactor.test.StepVerifier; -import reactor.util.function.Tuple2; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; @@ -299,38 +296,6 @@ public void webSocketRespondsToRequestsFromClients() { } */ - @Test - void test100Continue() throws Exception { - CountDownLatch latch = new CountDownLatch(1); - disposableServer = - createServer() - .handle((req, res) -> req.receive() - .aggregate() - .asString() - .flatMap(s -> { - latch.countDown(); - return res.sendString(Mono.just(s)) - .then(); - })) - .bindNow(); - - Tuple2 content = - createClient(disposableServer.port()) - .headers(h -> h.add(HttpHeaderNames.EXPECT, HttpHeaderValues.CONTINUE)) - .post() - .uri("/") - .send(ByteBufFlux.fromString(Flux.just("1", "2", "3", "4", "5"))) - .responseSingle((res, bytes) -> bytes.asString() - .zipWith(Mono.just(res.status().code()))) - .block(Duration.ofSeconds(5)); - - Assertions.assertThat(latch.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); - - Assertions.assertThat(content).isNotNull(); - Assertions.assertThat(content.getT1()).isEqualTo("12345"); - Assertions.assertThat(content.getT2()).isEqualTo(200); - } - @Test void streamAndPoolExplicitCompression() { Sinks.Many ep = Sinks.unsafe().many().unicast().onBackpressureBuffer();