Skip to content

Commit

Permalink
Avoid returning 400 bad request while client is still writing.
Browse files Browse the repository at this point in the history
  • Loading branch information
pderop committed Sep 4, 2023
1 parent 972b528 commit f0c8e93
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 35 deletions.
30 changes: 26 additions & 4 deletions reactor-netty-http/src/test/java/reactor/netty/TomcatServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.apache.catalina.Context;
import org.apache.catalina.Wrapper;
import org.apache.catalina.startup.Tomcat;
import reactor.core.publisher.Sinks;

import javax.servlet.MultipartConfigElement;
import javax.servlet.ServletException;
Expand All @@ -29,6 +30,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter;
import java.time.Duration;
import java.util.Collection;

/**
Expand All @@ -42,6 +44,8 @@ public class TomcatServer {

boolean started;

final Sinks.Empty<Void> clientBlocked = Sinks.empty();

public TomcatServer() {
this(0);
}
Expand All @@ -53,6 +57,10 @@ public TomcatServer(int port) {
this.tomcat.setBaseDir(baseDir.getAbsolutePath());
}

public Sinks.Empty<Void> getClientBlockedSink() {
return clientBlocked;
}

public int port() {
if (this.started) {
return this.tomcat.getConnector().getLocalPort();
Expand Down Expand Up @@ -84,7 +92,7 @@ public void createDefaultContext() {
addServlet(ctx, new StatusServlet(), "/status/*");
addServlet(ctx, new MultipartServlet(), "/multipart")
.setMultipartConfigElement(new MultipartConfigElement(""));
addServlet(ctx, new PayloadSizeServlet(), "/payload-size");
addServlet(ctx, new PayloadSizeServlet(clientBlocked), "/payload-size");
}

public void createContext(HttpServlet servlet, String mapping) {
Expand Down Expand Up @@ -169,23 +177,37 @@ protected void service(HttpServletRequest req, HttpServletResponse resp) throws

static final class PayloadSizeServlet extends HttpServlet {

static final int MAX = 1024 * 64;
static final int MAX = 10;

final Sinks.Empty<Void> clientBlocked;

public PayloadSizeServlet(Sinks.Empty<Void> clientBlocked) {
this.clientBlocked = clientBlocked;
}

void ensureClientBlocksOnWrite() {
// Ensure that the client is currently blocking on its socket write
clientBlocked.asMono().block(Duration.ofSeconds(30));
}

@Override
protected void service(HttpServletRequest req, HttpServletResponse resp) throws IOException {
InputStream in = req.getInputStream();
byte[] buf = new byte[4096];
int count = 0;
byte[] buf = new byte[10];
int count;
int n;

if ((count = req.getContentLength()) != -1 && count >= MAX) {
ensureClientBlocksOnWrite();
sendResponse(resp, TOO_LARGE, HttpServletResponse.SC_BAD_REQUEST);
return;
}

count = 0;
while ((n = in.read(buf, 0, buf.length)) != -1) {
count += n;
if (count >= MAX) {
ensureClientBlocksOnWrite();
sendResponse(resp, TOO_LARGE, HttpServletResponse.SC_BAD_REQUEST);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
Expand Down Expand Up @@ -150,7 +150,7 @@ class HttpClientTest extends BaseHttpTest {

static final Logger log = Loggers.getLogger(HttpClientTest.class);

static final byte[] PAYLOAD = String.join("", Collections.nCopies(1024 * 128, "X"))
static final byte[] PAYLOAD = String.join("", Collections.nCopies(1024 * 1024 * 5, "X"))
.getBytes(Charset.defaultCharset());

static SelfSignedCertificate ssc;
Expand Down Expand Up @@ -3422,47 +3422,39 @@ static Stream<Arguments> issue2825Params() {
void testIssue2825(HttpProtocol serverProtocols, HttpProtocol clientProtocols,
@Nullable SslProvider.ProtocolSslContextSpec serverCtx, @Nullable SslProvider.ProtocolSslContextSpec clientCtx,
Supplier<Publisher<ByteBuf>> payload, long bytesToSend) {
int maxSize = 1024 * 64; // 400 bad request is returned if payload exceeds this limit, and the socket is then closed
AtomicInteger accum = new AtomicInteger();
String tooLargeRequest = "Request too large";
byte[] tooLargeRequestBytes = tooLargeRequest.getBytes(Charset.defaultCharset());
byte[] requestFullyReceivedBytes = "Request fully received".getBytes(Charset.defaultCharset());
Sinks.Empty<Void> clientBlocked = Sinks.empty();

HttpServer httpServer = createServer()
.wiretap(false)
.route(r -> r.post("/large-payload", (req, res) -> req.receive()
.takeUntil(buf -> {
String clen = req.requestHeaders().get("Content-Length");
if (clen != null) {
int contentLength = Integer.parseInt(clen);
accum.set(contentLength);
return contentLength >= maxSize;
}
else {
return accum.addAndGet(buf.readableBytes()) >= maxSize;
}
})
.collectList()
.flatMapMany(byteBufs -> res.status(accum.get() < maxSize ? 200 : 400)
.header("Connection", "close")
.header("Content-Type", "text/plain")
.send(Mono.just(Unpooled.wrappedBuffer(accum.get() < maxSize ?
requestFullyReceivedBytes : tooLargeRequestBytes))))));
.route(r -> r.post("/large-payload", (req, res) ->
res.status(400)
.header("Connection", "close")
.header("Content-Type", "text/plain")
.send(clientBlocked.asMono()
.then(Mono.just(Unpooled.wrappedBuffer(tooLargeRequestBytes))))));

disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols)
.bindNow();

AtomicReference<SocketAddress> serverAddress = new AtomicReference<>();
HttpClient client = customizeClientOptions(createClient(disposableServer.port()), clientCtx, clientProtocols)
.metrics(true, ClientMetricsRecorder::reset)
.doOnConnected(conn -> serverAddress.set(conn.address()))
.disableRetry(true)
// Needed to trigger many writability change events
.doOnConnected(connection -> connection.channel().config().setOption(ChannelOption.SO_SNDBUF, 128));
.doOnConnected(conn -> serverAddress.set(conn.address()));

StepVerifier.create(client
.wiretap(false)
.headers(hdr -> hdr.set("Content-Type", "text/plain"))
.doOnRequest((req, conn) -> conn.addHandlerFirst(new ChannelInboundHandlerAdapter() {
@Override
final public void channelWritabilityChanged(ChannelHandlerContext ctx) {
ctx.fireChannelWritabilityChanged();
if (!ctx.channel().isWritable()) {
clientBlocked.tryEmitEmpty();
}
}
}))
.post()
.uri("/large-payload")
.send(payload.get())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpHeaders;
Expand Down Expand Up @@ -73,7 +74,7 @@
*/
class HttpClientWithTomcatTest {
private static TomcatServer tomcat;
private static final byte[] PAYLOAD = String.join("", Collections.nCopies(1024 * 128, "X"))
private static final byte[] PAYLOAD = String.join("", Collections.nCopies(1024 * 1024 * 5, "X"))
.getBytes(Charset.defaultCharset());

@BeforeAll
Expand Down Expand Up @@ -352,11 +353,18 @@ void testIssue2825_Http11(@Nullable Supplier<Publisher<ByteBuf>> payload, long b
HttpClient client = HttpClient.create()
.port(getPort())
.wiretap(false)
.disableRetry(true)
.metrics(true, ClientMetricsRecorder::reset)
.doOnRequest((req, conn) -> conn.addHandlerFirst(new ChannelInboundHandlerAdapter() {
@Override
final public void channelWritabilityChanged(ChannelHandlerContext ctx) {
ctx.fireChannelWritabilityChanged();
if (!ctx.channel().isWritable()) {
HttpClientWithTomcatTest.tomcat.getClientBlockedSink().tryEmitEmpty();
}
}
}))
// Needed to trigger many writability change events
.doOnConnected(conn -> {
conn.channel().config().setOption(ChannelOption.SO_SNDBUF, 128);
serverAddress.set(conn.address());
});

Expand Down

0 comments on commit f0c8e93

Please sign in to comment.