From f0c8e9332ea6cddae799a41a4694b81d67299c19 Mon Sep 17 00:00:00 2001 From: Pierre De Rop Date: Fri, 1 Sep 2023 15:56:07 +0200 Subject: [PATCH] Avoid returning 400 bad request while client is still writing. --- .../test/java/reactor/netty/TomcatServer.java | 30 ++++++++++-- .../netty/http/client/HttpClientTest.java | 46 ++++++++----------- .../http/client/HttpClientWithTomcatTest.java | 16 +++++-- 3 files changed, 57 insertions(+), 35 deletions(-) diff --git a/reactor-netty-http/src/test/java/reactor/netty/TomcatServer.java b/reactor-netty-http/src/test/java/reactor/netty/TomcatServer.java index a73cb7f219..3282169128 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/TomcatServer.java +++ b/reactor-netty-http/src/test/java/reactor/netty/TomcatServer.java @@ -18,6 +18,7 @@ import org.apache.catalina.Context; import org.apache.catalina.Wrapper; import org.apache.catalina.startup.Tomcat; +import reactor.core.publisher.Sinks; import javax.servlet.MultipartConfigElement; import javax.servlet.ServletException; @@ -29,6 +30,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.PrintWriter; +import java.time.Duration; import java.util.Collection; /** @@ -42,6 +44,8 @@ public class TomcatServer { boolean started; + final Sinks.Empty clientBlocked = Sinks.empty(); + public TomcatServer() { this(0); } @@ -53,6 +57,10 @@ public TomcatServer(int port) { this.tomcat.setBaseDir(baseDir.getAbsolutePath()); } + public Sinks.Empty getClientBlockedSink() { + return clientBlocked; + } + public int port() { if (this.started) { return this.tomcat.getConnector().getLocalPort(); @@ -84,7 +92,7 @@ public void createDefaultContext() { addServlet(ctx, new StatusServlet(), "/status/*"); addServlet(ctx, new MultipartServlet(), "/multipart") .setMultipartConfigElement(new MultipartConfigElement("")); - addServlet(ctx, new PayloadSizeServlet(), "/payload-size"); + addServlet(ctx, new PayloadSizeServlet(clientBlocked), "/payload-size"); } public void createContext(HttpServlet servlet, String mapping) { @@ -169,23 +177,37 @@ protected void service(HttpServletRequest req, HttpServletResponse resp) throws static final class PayloadSizeServlet extends HttpServlet { - static final int MAX = 1024 * 64; + static final int MAX = 10; + + final Sinks.Empty clientBlocked; + + public PayloadSizeServlet(Sinks.Empty clientBlocked) { + this.clientBlocked = clientBlocked; + } + + void ensureClientBlocksOnWrite() { + // Ensure that the client is currently blocking on its socket write + clientBlocked.asMono().block(Duration.ofSeconds(30)); + } @Override protected void service(HttpServletRequest req, HttpServletResponse resp) throws IOException { InputStream in = req.getInputStream(); - byte[] buf = new byte[4096]; - int count = 0; + byte[] buf = new byte[10]; + int count; int n; if ((count = req.getContentLength()) != -1 && count >= MAX) { + ensureClientBlocksOnWrite(); sendResponse(resp, TOO_LARGE, HttpServletResponse.SC_BAD_REQUEST); + return; } count = 0; while ((n = in.read(buf, 0, buf.length)) != -1) { count += n; if (count >= MAX) { + ensureClientBlocksOnWrite(); sendResponse(resp, TOO_LARGE, HttpServletResponse.SC_BAD_REQUEST); return; } diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java b/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java index f19a0f9617..91599847b5 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java @@ -68,7 +68,7 @@ import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelId; -import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.EventLoopGroup; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; @@ -150,7 +150,7 @@ class HttpClientTest extends BaseHttpTest { static final Logger log = Loggers.getLogger(HttpClientTest.class); - static final byte[] PAYLOAD = String.join("", Collections.nCopies(1024 * 128, "X")) + static final byte[] PAYLOAD = String.join("", Collections.nCopies(1024 * 1024 * 5, "X")) .getBytes(Charset.defaultCharset()); static SelfSignedCertificate ssc; @@ -3422,32 +3422,18 @@ static Stream issue2825Params() { void testIssue2825(HttpProtocol serverProtocols, HttpProtocol clientProtocols, @Nullable SslProvider.ProtocolSslContextSpec serverCtx, @Nullable SslProvider.ProtocolSslContextSpec clientCtx, Supplier> payload, long bytesToSend) { - int maxSize = 1024 * 64; // 400 bad request is returned if payload exceeds this limit, and the socket is then closed - AtomicInteger accum = new AtomicInteger(); String tooLargeRequest = "Request too large"; byte[] tooLargeRequestBytes = tooLargeRequest.getBytes(Charset.defaultCharset()); - byte[] requestFullyReceivedBytes = "Request fully received".getBytes(Charset.defaultCharset()); + Sinks.Empty clientBlocked = Sinks.empty(); HttpServer httpServer = createServer() .wiretap(false) - .route(r -> r.post("/large-payload", (req, res) -> req.receive() - .takeUntil(buf -> { - String clen = req.requestHeaders().get("Content-Length"); - if (clen != null) { - int contentLength = Integer.parseInt(clen); - accum.set(contentLength); - return contentLength >= maxSize; - } - else { - return accum.addAndGet(buf.readableBytes()) >= maxSize; - } - }) - .collectList() - .flatMapMany(byteBufs -> res.status(accum.get() < maxSize ? 200 : 400) - .header("Connection", "close") - .header("Content-Type", "text/plain") - .send(Mono.just(Unpooled.wrappedBuffer(accum.get() < maxSize ? - requestFullyReceivedBytes : tooLargeRequestBytes)))))); + .route(r -> r.post("/large-payload", (req, res) -> + res.status(400) + .header("Connection", "close") + .header("Content-Type", "text/plain") + .send(clientBlocked.asMono() + .then(Mono.just(Unpooled.wrappedBuffer(tooLargeRequestBytes)))))); disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols) .bindNow(); @@ -3455,14 +3441,20 @@ void testIssue2825(HttpProtocol serverProtocols, HttpProtocol clientProtocols, AtomicReference serverAddress = new AtomicReference<>(); HttpClient client = customizeClientOptions(createClient(disposableServer.port()), clientCtx, clientProtocols) .metrics(true, ClientMetricsRecorder::reset) - .doOnConnected(conn -> serverAddress.set(conn.address())) - .disableRetry(true) - // Needed to trigger many writability change events - .doOnConnected(connection -> connection.channel().config().setOption(ChannelOption.SO_SNDBUF, 128)); + .doOnConnected(conn -> serverAddress.set(conn.address())); StepVerifier.create(client .wiretap(false) .headers(hdr -> hdr.set("Content-Type", "text/plain")) + .doOnRequest((req, conn) -> conn.addHandlerFirst(new ChannelInboundHandlerAdapter() { + @Override + final public void channelWritabilityChanged(ChannelHandlerContext ctx) { + ctx.fireChannelWritabilityChanged(); + if (!ctx.channel().isWritable()) { + clientBlocked.tryEmitEmpty(); + } + } + })) .post() .uri("/large-payload") .send(payload.get()) diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientWithTomcatTest.java b/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientWithTomcatTest.java index 9c1a60dda8..f649a9258f 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientWithTomcatTest.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientWithTomcatTest.java @@ -18,7 +18,8 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; -import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpHeaderValues; import io.netty.handler.codec.http.HttpHeaders; @@ -73,7 +74,7 @@ */ class HttpClientWithTomcatTest { private static TomcatServer tomcat; - private static final byte[] PAYLOAD = String.join("", Collections.nCopies(1024 * 128, "X")) + private static final byte[] PAYLOAD = String.join("", Collections.nCopies(1024 * 1024 * 5, "X")) .getBytes(Charset.defaultCharset()); @BeforeAll @@ -352,11 +353,18 @@ void testIssue2825_Http11(@Nullable Supplier> payload, long b HttpClient client = HttpClient.create() .port(getPort()) .wiretap(false) - .disableRetry(true) .metrics(true, ClientMetricsRecorder::reset) + .doOnRequest((req, conn) -> conn.addHandlerFirst(new ChannelInboundHandlerAdapter() { + @Override + final public void channelWritabilityChanged(ChannelHandlerContext ctx) { + ctx.fireChannelWritabilityChanged(); + if (!ctx.channel().isWritable()) { + HttpClientWithTomcatTest.tomcat.getClientBlockedSink().tryEmitEmpty(); + } + } + })) // Needed to trigger many writability change events .doOnConnected(conn -> { - conn.channel().config().setOption(ChannelOption.SO_SNDBUF, 128); serverAddress.set(conn.address()); });