Skip to content

Commit

Permalink
Merge #2864 into 1.1.12
Browse files Browse the repository at this point in the history
  • Loading branch information
pderop committed Sep 15, 2023
2 parents a35ea43 + 50b24b3 commit c7ba195
Show file tree
Hide file tree
Showing 7 changed files with 268 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,14 @@ protected String asDebugLogMessage(Object o) {
return o.toString();
}

/**
* React on Channel writability change.
*
* @since 1.0.37
*/
protected void onWritabilityChanged() {
}

@Override
public boolean isPersistent() {
return connection.isPersistent();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,14 @@ final public void exceptionCaught(ChannelHandlerContext ctx, Throwable err) {
}
}

@Override
final public void channelWritabilityChanged(ChannelHandlerContext ctx) {
ChannelOperations<?, ?> ops = ChannelOperations.get(ctx.channel());
if (ops != null) {
ops.onWritabilityChanged();
}
}

static void safeRelease(Object msg) {
if (msg instanceof ReferenceCounted) {
ReferenceCounted referenceCounted = (ReferenceCounted) msg;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,16 @@ protected final boolean markSentBody() {
return HTTP_STATE.compareAndSet(this, HEADERS_SENT, BODY_SENT);
}

/**
* Has Body been sent
*
* @return true if body has been sent
* @since 1.0.37
*/
protected final boolean hasSentBody() {
return statusAndHeadersSent == BODY_SENT;
}

/**
* Mark the headers and body sent
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021-2022 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2021-2023 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -64,6 +64,10 @@ abstract class AbstractHttpClientMetricsHandler extends ChannelDuplexHandler {

final Function<String, String> uriTagValue;

int lastReadSeq;

int lastWriteSeq;

protected AbstractHttpClientMetricsHandler(@Nullable Function<String, String> uriTagValue) {
this.uriTagValue = uriTagValue;
}
Expand All @@ -78,6 +82,8 @@ protected AbstractHttpClientMetricsHandler(AbstractHttpClientMetricsHandler copy
this.path = copy.path;
this.status = copy.status;
this.uriTagValue = copy.uriTagValue;
this.lastWriteSeq = copy.lastWriteSeq;
this.lastReadSeq = copy.lastReadSeq;
}

@Override
Expand All @@ -91,10 +97,15 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
dataSent += extractProcessedDataFromBuffer(msg);

if (msg instanceof LastHttpContent) {
int currentLastWriteSeq = lastWriteSeq;
SocketAddress address = ctx.channel().remoteAddress();
promise.addListener(future -> {
try {
recordWrite(address);
// Record write, unless channelRead has already done it (because an early full response has been received)
if (currentLastWriteSeq == lastWriteSeq) {
lastWriteSeq = (lastWriteSeq + 1) & 0x7F_FF_FF_FF;
recordWrite(address);
}
}
catch (RuntimeException e) {
if (log.isWarnEnabled()) {
Expand Down Expand Up @@ -128,6 +139,13 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
dataReceived += extractProcessedDataFromBuffer(msg);

if (msg instanceof LastHttpContent) {
// Detect if we have received an early response before the request has been fully flushed.
// In this case, invoke recordwrite now (because next we will reset all class fields).
lastReadSeq = (lastReadSeq + 1) & 0x7F_FF_FF_FF;
if ((lastReadSeq > lastWriteSeq) || (lastReadSeq == 0 && lastWriteSeq == Integer.MAX_VALUE)) {
lastWriteSeq = (lastWriteSeq + 1) & 0x7F_FF_FF_FF;
recordWrite(ctx.channel().remoteAddress());
}
recordRead(ctx.channel());
reset();
}
Expand Down Expand Up @@ -217,6 +235,7 @@ protected void reset() {
dataSent = 0;
dataReceivedTime = 0;
dataSentTime = 0;
// don't reset lastWriteSeq and lastReadSeq, which must be incremented for ever
}

protected void startRead(HttpResponse msg) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import io.netty.handler.codec.http.multipart.HttpDataFactory;
import io.netty.handler.codec.http.multipart.HttpPostRequestEncoder;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler;
import io.netty.handler.codec.http2.Http2StreamChannel;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.util.ReferenceCountUtil;
Expand Down Expand Up @@ -524,6 +525,46 @@ else if (version.equals(HttpVersion.HTTP_1_1)) {
throw new IllegalStateException(version.protocolName() + " not supported");
}

/**
* React on channel unwritability event while the http client request is being written.
*
* <p>When using plain HTTP/1.1 and {@code HttpClient.send(Mono<ByteBuf>)}, if the socket becomes unwritable while writing,
* we need to request for reads. This is necessary to read any early server response, such as a 400 bad request followed
* by a socket close, while the request is still being written. Else, a "premature close exception before response" may be reported
* to the user, causing confusion about the server's early response.
*
* <p>There is no need to request for reading in other cases
* (H2/H2C/H1S/WebSocket), because in these cases the read interest has already been requested, or auto-read is enabled
*
* <p>Important notes:
* <p>
* - If the connection is unwritable and {@code send(Flux<ByteBuf>)} has been used, then {@code hasSentBody()} will
* always return false, because when {@code send(Flux<ByteBuf>)} is used, {@code hasSentBody()} can only return true
* if the request is fully written (see {@link #onOutboundComplete()} method which invokes {@code markSentBody()}
* and sets the state to BODY_SENT).
* So if channel is unwritable and {@code hasSentBody()} returns true, it means that {@code send(Mono<ByteBuf>)} has
* been used (see {@link HttpOperations#send(Publisher)} where {@code markSentHeaderAndBody(b)} is setting
* the state to BODY_SENT when the Publisher is a Mono).
*
* <p>- When the channel is unwritable, a channel read() has already been requested or is in auto-read if:
* <ul><li> Secure mode is used (Netty SslHandler requests read() when flushing).</li>
* <li>HTTP2 is used.</li>
* <li>WebSocket is used.</li>
* </ul>
*
* <p>See GH-2825 for more info
*/
@Override
protected void onWritabilityChanged() {
if (!isSecure &&
!channel().isWritable() && !channel().config().isAutoRead() &&
hasSentBody() &&
!(channel() instanceof Http2StreamChannel) &&
!isWebsocket()) {
channel().read();
}
}

@Override
protected void afterMarkSentHeaders() {
//Noop
Expand Down
39 changes: 38 additions & 1 deletion reactor-netty-http/src/test/java/reactor/netty/TomcatServer.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2022 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2019-2023 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -27,6 +27,7 @@
import javax.servlet.http.Part;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter;
import java.util.Collection;

Expand All @@ -35,6 +36,8 @@
*/
public class TomcatServer {
static final String TOMCAT_BASE_DIR = "./build/tomcat";
public static final String TOO_LARGE = "Request payload too large";
public static final int PAYLOAD_MAX = 5000000;

final Tomcat tomcat;

Expand Down Expand Up @@ -82,6 +85,7 @@ public void createDefaultContext() {
addServlet(ctx, new StatusServlet(), "/status/*");
addServlet(ctx, new MultipartServlet(), "/multipart")
.setMultipartConfigElement(new MultipartConfigElement(""));
addServlet(ctx, new PayloadSizeServlet(), "/payload-size");
}

public void createContext(HttpServlet servlet, String mapping) {
Expand Down Expand Up @@ -163,4 +167,37 @@ protected void service(HttpServletRequest req, HttpServletResponse resp) throws
}
}
}

static final class PayloadSizeServlet extends HttpServlet {

@Override
protected void service(HttpServletRequest req, HttpServletResponse resp) throws IOException {
InputStream in = req.getInputStream();
int count = 0;
int n;

while ((n = in.read()) != -1) {
count += n;
if (count >= PAYLOAD_MAX) {
// By default, Tomcat is configured with maxSwallowSize=2 MB (see https://tomcat.apache.org/tomcat-9.0-doc/config/http.html)
// This means that once the 400 bad request is sent, the client will still be able to continue writing (if it is currently writing)
// up to 2 MB. So, it is very likely that the client will be blocked and it will then be able to consume the 400 bad request and
// close itself the connection.
sendResponse(resp, TOO_LARGE, HttpServletResponse.SC_BAD_REQUEST);
return;
}
}

sendResponse(resp, String.valueOf(count), HttpServletResponse.SC_OK);
}

private void sendResponse(HttpServletResponse resp, String message, int status) throws IOException {
resp.setStatus(status);
resp.setHeader("Transfer-Encoding", "chunked");
resp.setHeader("Content-Type", "text/plain");
PrintWriter out = resp.getWriter();
out.print(message);
out.flush();
}
}
}
Loading

0 comments on commit c7ba195

Please sign in to comment.