Skip to content

Commit

Permalink
ReactorNetty HttpClient sometimes can't get Tomcat early error 400 re…
Browse files Browse the repository at this point in the history
…sponse (#2864)

In scenarios where the ReactorNetty HttpClient writes a substantial POST request to a Tomcat server through HttpClient.send(Mono), and HTTP/1.1 plain is used, and Tomcat promptly responds with an early "400 Bad Request" status before reading the full request body bytes, an issue arises. The HttpClient, in such cases, delays reading the response until after the entire request body has been flushed. Consequently, instead of correctly reporting the early "400 Bad Request" status, it might mistakenly trigger a "Connection prematurely closed BEFORE response" error, because at some point the connection is closed by the server while the client is still writing the request body.

To address this problem, this patch activates read interest in the channel when it becomes unwritable. This modification significantly improves the situation in most cases, especially considering Tomcat's default configuration, which includes the "maxSwallowSize" property. With this configuration, Tomcat continues reading request body bytes after sending the "400 Bad Request" response (up to 2 MB) before closing the connection, allowing the HttpClient ample time to consume the "400 Bad Request" status and deliver it to the user as expected. But for servers which close the connection immediately, a TCP/RST might be sent to the client, and in this case the patch cannot always work because TCP/RST is not reliable and any unread data from the HttpClient OS may be dropped.

Fixes #2825
  • Loading branch information
pderop authored Sep 15, 2023
1 parent ae0c49f commit 50b24b3
Show file tree
Hide file tree
Showing 7 changed files with 268 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,14 @@ protected String asDebugLogMessage(Object o) {
return o.toString();
}

/**
* React on Channel writability change.
*
* @since 1.0.37
*/
protected void onWritabilityChanged() {
}

@Override
public boolean isPersistent() {
return connection.isPersistent();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,14 @@ final public void exceptionCaught(ChannelHandlerContext ctx, Throwable err) {
}
}

@Override
final public void channelWritabilityChanged(ChannelHandlerContext ctx) {
ChannelOperations<?, ?> ops = ChannelOperations.get(ctx.channel());
if (ops != null) {
ops.onWritabilityChanged();
}
}

static void safeRelease(Object msg) {
if (msg instanceof ReferenceCounted) {
ReferenceCounted referenceCounted = (ReferenceCounted) msg;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,16 @@ protected final boolean markSentBody() {
return HTTP_STATE.compareAndSet(this, HEADERS_SENT, BODY_SENT);
}

/**
* Has Body been sent
*
* @return true if body has been sent
* @since 1.0.37
*/
protected final boolean hasSentBody() {
return statusAndHeadersSent == BODY_SENT;
}

/**
* Mark the headers and body sent
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021-2022 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2021-2023 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -63,6 +63,10 @@ abstract class AbstractHttpClientMetricsHandler extends ChannelDuplexHandler {

final Function<String, String> uriTagValue;

int lastReadSeq;

int lastWriteSeq;

protected AbstractHttpClientMetricsHandler(@Nullable Function<String, String> uriTagValue) {
this.uriTagValue = uriTagValue;
}
Expand All @@ -77,6 +81,8 @@ protected AbstractHttpClientMetricsHandler(AbstractHttpClientMetricsHandler copy
this.path = copy.path;
this.status = copy.status;
this.uriTagValue = copy.uriTagValue;
this.lastWriteSeq = copy.lastWriteSeq;
this.lastReadSeq = copy.lastReadSeq;
}

@Override
Expand All @@ -90,10 +96,15 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
dataSent += extractProcessedDataFromBuffer(msg);

if (msg instanceof LastHttpContent) {
int currentLastWriteSeq = lastWriteSeq;
SocketAddress address = ctx.channel().remoteAddress();
promise.addListener(future -> {
try {
recordWrite(address);
// Record write, unless channelRead has already done it (because an early full response has been received)
if (currentLastWriteSeq == lastWriteSeq) {
lastWriteSeq = (lastWriteSeq + 1) & 0x7F_FF_FF_FF;
recordWrite(address);
}
}
catch (RuntimeException e) {
if (log.isWarnEnabled()) {
Expand Down Expand Up @@ -126,6 +137,13 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
dataReceived += extractProcessedDataFromBuffer(msg);

if (msg instanceof LastHttpContent) {
// Detect if we have received an early response before the request has been fully flushed.
// In this case, invoke recordwrite now (because next we will reset all class fields).
lastReadSeq = (lastReadSeq + 1) & 0x7F_FF_FF_FF;
if ((lastReadSeq > lastWriteSeq) || (lastReadSeq == 0 && lastWriteSeq == Integer.MAX_VALUE)) {
lastWriteSeq = (lastWriteSeq + 1) & 0x7F_FF_FF_FF;
recordWrite(ctx.channel().remoteAddress());
}
recordRead(ctx.channel().remoteAddress());
reset();
}
Expand Down Expand Up @@ -223,5 +241,6 @@ private void reset() {
dataSent = 0;
dataReceivedTime = 0;
dataSentTime = 0;
// don't reset lastWriteSeq and lastReadSeq, which must be incremented for ever
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import io.netty.handler.codec.http.multipart.HttpDataFactory;
import io.netty.handler.codec.http.multipart.HttpPostRequestEncoder;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler;
import io.netty.handler.codec.http2.Http2StreamChannel;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.util.ReferenceCountUtil;
Expand Down Expand Up @@ -524,6 +525,46 @@ else if (version.equals(HttpVersion.HTTP_1_1)) {
throw new IllegalStateException(version.protocolName() + " not supported");
}

/**
* React on channel unwritability event while the http client request is being written.
*
* <p>When using plain HTTP/1.1 and {@code HttpClient.send(Mono<ByteBuf>)}, if the socket becomes unwritable while writing,
* we need to request for reads. This is necessary to read any early server response, such as a 400 bad request followed
* by a socket close, while the request is still being written. Else, a "premature close exception before response" may be reported
* to the user, causing confusion about the server's early response.
*
* <p>There is no need to request for reading in other cases
* (H2/H2C/H1S/WebSocket), because in these cases the read interest has already been requested, or auto-read is enabled
*
* <p>Important notes:
* <p>
* - If the connection is unwritable and {@code send(Flux<ByteBuf>)} has been used, then {@code hasSentBody()} will
* always return false, because when {@code send(Flux<ByteBuf>)} is used, {@code hasSentBody()} can only return true
* if the request is fully written (see {@link #onOutboundComplete()} method which invokes {@code markSentBody()}
* and sets the state to BODY_SENT).
* So if channel is unwritable and {@code hasSentBody()} returns true, it means that {@code send(Mono<ByteBuf>)} has
* been used (see {@link HttpOperations#send(Publisher)} where {@code markSentHeaderAndBody(b)} is setting
* the state to BODY_SENT when the Publisher is a Mono).
*
* <p>- When the channel is unwritable, a channel read() has already been requested or is in auto-read if:
* <ul><li> Secure mode is used (Netty SslHandler requests read() when flushing).</li>
* <li>HTTP2 is used.</li>
* <li>WebSocket is used.</li>
* </ul>
*
* <p>See GH-2825 for more info
*/
@Override
protected void onWritabilityChanged() {
if (!isSecure &&
!channel().isWritable() && !channel().config().isAutoRead() &&
hasSentBody() &&
!(channel() instanceof Http2StreamChannel) &&
!isWebsocket()) {
channel().read();
}
}

@Override
protected void afterMarkSentHeaders() {
//Noop
Expand Down
39 changes: 38 additions & 1 deletion reactor-netty-http/src/test/java/reactor/netty/TomcatServer.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2022 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2019-2023 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -27,6 +27,7 @@
import javax.servlet.http.Part;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter;
import java.util.Collection;

Expand All @@ -35,6 +36,8 @@
*/
public class TomcatServer {
static final String TOMCAT_BASE_DIR = "./build/tomcat";
public static final String TOO_LARGE = "Request payload too large";
public static final int PAYLOAD_MAX = 5000000;

final Tomcat tomcat;

Expand Down Expand Up @@ -82,6 +85,7 @@ public void createDefaultContext() {
addServlet(ctx, new StatusServlet(), "/status/*");
addServlet(ctx, new MultipartServlet(), "/multipart")
.setMultipartConfigElement(new MultipartConfigElement(""));
addServlet(ctx, new PayloadSizeServlet(), "/payload-size");
}

public void createContext(HttpServlet servlet, String mapping) {
Expand Down Expand Up @@ -163,4 +167,37 @@ protected void service(HttpServletRequest req, HttpServletResponse resp) throws
}
}
}

static final class PayloadSizeServlet extends HttpServlet {

@Override
protected void service(HttpServletRequest req, HttpServletResponse resp) throws IOException {
InputStream in = req.getInputStream();
int count = 0;
int n;

while ((n = in.read()) != -1) {
count += n;
if (count >= PAYLOAD_MAX) {
// By default, Tomcat is configured with maxSwallowSize=2 MB (see https://tomcat.apache.org/tomcat-9.0-doc/config/http.html)
// This means that once the 400 bad request is sent, the client will still be able to continue writing (if it is currently writing)
// up to 2 MB. So, it is very likely that the client will be blocked and it will then be able to consume the 400 bad request and
// close itself the connection.
sendResponse(resp, TOO_LARGE, HttpServletResponse.SC_BAD_REQUEST);
return;
}
}

sendResponse(resp, String.valueOf(count), HttpServletResponse.SC_OK);
}

private void sendResponse(HttpServletResponse resp, String message, int status) throws IOException {
resp.setStatus(status);
resp.setHeader("Transfer-Encoding", "chunked");
resp.setHeader("Content-Type", "text/plain");
PrintWriter out = resp.getWriter();
out.print(message);
out.flush();
}
}
}
Loading

0 comments on commit 50b24b3

Please sign in to comment.