From 004cd012e1cf94f8bb948d7c350f8c06df44af72 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Tue, 29 Aug 2023 01:58:04 -0700 Subject: [PATCH] HttpClient: Include error handler on all connection attempts. (#14915) Currently we have an error handler for https connection attempts, but not for plaintext connection attempts. This leads to warnings like the following for plaintext connection errors: EXCEPTION, please implement org.jboss.netty.handler.codec.http.HttpContentDecompressor.exceptionCaught() for proper handling. This happens because if we don't add our own error handler, the last handler in the chain during a connection attempt is HttpContentDecompressor, which doesn't handle errors. The new error handler for plaintext doesn't do much: it just closes the channel. --- .../client/pool/ChannelResourceFactory.java | 87 ++++++++++++++----- .../util/http/client/JankyServersTest.java | 72 +++++++++++++++ 2 files changed, 135 insertions(+), 24 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/java/util/http/client/pool/ChannelResourceFactory.java b/processing/src/main/java/org/apache/druid/java/util/http/client/pool/ChannelResourceFactory.java index d6465be305ed..c67989458840 100644 --- a/processing/src/main/java/org/apache/druid/java/util/http/client/pool/ChannelResourceFactory.java +++ b/processing/src/main/java/org/apache/druid/java/util/http/client/pool/ChannelResourceFactory.java @@ -44,6 +44,7 @@ import org.jboss.netty.handler.ssl.SslHandler; import org.jboss.netty.util.Timer; +import javax.annotation.Nullable; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLParameters; @@ -53,13 +54,15 @@ import java.util.concurrent.TimeUnit; /** + * */ public class ChannelResourceFactory implements ResourceFactory { private static final Logger log = new Logger(ChannelResourceFactory.class); private static final long DEFAULT_SSL_HANDSHAKE_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(10); - private static final String DRUID_PROXY_HANDLER = "druid_proxyHandler"; + private static final String PROXY_HANDLER_NAME = "druid-proxy"; + private static final String ERROR_HANDLER_NAME = "druid-connection-error"; private final ClientBootstrap bootstrap; private final SSLContext sslContext; @@ -128,7 +131,7 @@ public void operationComplete(ChannelFuture f1) if (f1.isSuccess()) { final Channel channel = f1.getChannel(); channel.getPipeline().addLast( - DRUID_PROXY_HANDLER, + PROXY_HANDLER_NAME, new SimpleChannelUpstreamHandler() { @Override @@ -137,7 +140,7 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) Object msg = e.getMessage(); final ChannelPipeline pipeline = ctx.getPipeline(); - pipeline.remove(DRUID_PROXY_HANDLER); + pipeline.remove(PROXY_HANDLER_NAME); if (msg instanceof HttpResponse) { HttpResponse httpResponse = (HttpResponse) msg; @@ -217,27 +220,7 @@ public void operationComplete(ChannelFuture f2) sslHandler.setCloseOnSSLException(true); final ChannelFuture handshakeFuture = Channels.future(connectFuture.getChannel()); - connectFuture.getChannel().getPipeline().addLast( - "connectionErrorHandler", new SimpleChannelUpstreamHandler() - { - @Override - public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) - { - final Channel channel = ctx.getChannel(); - if (channel == null) { - // For the case where this pipeline is not attached yet. - handshakeFuture.setFailure(new ChannelException( - StringUtils.format("Channel is null. The context name is [%s]", ctx.getName()) - )); - return; - } - handshakeFuture.setFailure(e.getCause()); - if (channel.isOpen()) { - channel.close(); - } - } - } - ); + connectFuture.getChannel().getPipeline().addLast(ERROR_HANDLER_NAME, new ConnectionErrorHandler(handshakeFuture)); connectFuture.addListener( new ChannelFutureListener() { @@ -280,6 +263,7 @@ public void operationComplete(ChannelFuture f2) retVal = handshakeFuture; } else { + connectFuture.getChannel().getPipeline().addLast(ERROR_HANDLER_NAME, new ConnectionErrorHandler(null)); retVal = connectFuture; } @@ -308,4 +292,59 @@ public void close(ChannelFuture resource) log.trace("Closing"); resource.awaitUninterruptibly().getChannel().close(); } + + /** + * Handler that captures errors that occur while connecting. Typically superseded by other handlers after + * a connection happens, in {@link org.apache.druid.java.util.http.client.NettyHttpClient}. + * + * It's important to have this for all channels, even if {@link #future} is null, because otherwise exceptions + * that occur during connection land at {@link org.jboss.netty.handler.codec.http.HttpContentDecompressor} (the last + * handler from {@link org.apache.druid.java.util.http.client.netty.HttpClientPipelineFactory}) and are dropped on + * the floor along with a scary-looking warning like "EXCEPTION, please implement + * org.jboss.netty.handler.codec.http.HttpContentDecompressor.exceptionCaught() for proper handling." + */ + private static class ConnectionErrorHandler extends SimpleChannelUpstreamHandler + { + @Nullable + private final ChannelFuture future; + + /** + * Constructor. + * + * @param future future to attach errors to + */ + public ConnectionErrorHandler(@Nullable ChannelFuture future) + { + this.future = future; + } + + @Override + public void exceptionCaught(final ChannelHandlerContext ctx, final ExceptionEvent e) + { + final Channel channel = ctx.getChannel(); + if (channel == null) { + // For the case where this pipeline is not attached yet. + if (future != null && !future.isDone()) { + final ChannelException e2 = + new ChannelException(StringUtils.format("Channel is null. The context name is [%s]", ctx.getName())); + e2.addSuppressed(e.getCause()); + future.setFailure(e2); + } + return; + } + + if (future != null && !future.isDone()) { + future.setFailure(e.getCause()); + } + + // Close the channel if this is the last handler. Otherwise, we expect that NettyHttpClient would have added + // additional handlers to take care of the errors. + //noinspection ObjectEquality + if (channel.isOpen() && this == ctx.getPipeline().getLast()) { + channel.close(); + } + + ctx.sendUpstream(e); + } + } } diff --git a/processing/src/test/java/org/apache/druid/java/util/http/client/JankyServersTest.java b/processing/src/test/java/org/apache/druid/java/util/http/client/JankyServersTest.java index 3d12cf7a1fc2..ec54bd13500a 100644 --- a/processing/src/test/java/org/apache/druid/java/util/http/client/JankyServersTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/http/client/JankyServersTest.java @@ -296,6 +296,78 @@ public void testHttpsConnectionClosingServer() throws Throwable } } + @Test + public void testHttpConnectionRefused() throws Throwable + { + final Lifecycle lifecycle = new Lifecycle(); + try { + final HttpClientConfig config = HttpClientConfig.builder().withSslContext(SSLContext.getDefault()).build(); + final HttpClient client = HttpClientInit.createClient(config, lifecycle); + + // Need to select a port that isn't being listened to. This approach finds an unused port in a racey way. + // Hopefully it works most of the time. + final ServerSocket sock = new ServerSocket(0); + final int port = sock.getLocalPort(); + sock.close(); + + final ListenableFuture response = client + .go( + new Request(HttpMethod.GET, new URL(StringUtils.format("http://localhost:%d/", port))), + StatusResponseHandler.getInstance() + ); + + Throwable e = null; + try { + response.get(); + } + catch (ExecutionException e1) { + e = e1.getCause(); + e1.printStackTrace(); + } + + Assert.assertTrue("ChannelException thrown by 'get'", isChannelClosedException(e)); + } + finally { + lifecycle.stop(); + } + } + + @Test + public void testHttpsConnectionRefused() throws Throwable + { + final Lifecycle lifecycle = new Lifecycle(); + try { + final HttpClientConfig config = HttpClientConfig.builder().withSslContext(SSLContext.getDefault()).build(); + final HttpClient client = HttpClientInit.createClient(config, lifecycle); + + // Need to select a port that isn't being listened to. This approach finds an unused port in a racey way. + // Hopefully it works most of the time. + final ServerSocket sock = new ServerSocket(0); + final int port = sock.getLocalPort(); + sock.close(); + + final ListenableFuture response = client + .go( + new Request(HttpMethod.GET, new URL(StringUtils.format("https://localhost:%d/", port))), + StatusResponseHandler.getInstance() + ); + + Throwable e = null; + try { + response.get(); + } + catch (ExecutionException e1) { + e = e1.getCause(); + e1.printStackTrace(); + } + + Assert.assertTrue("ChannelException thrown by 'get'", isChannelClosedException(e)); + } + finally { + lifecycle.stop(); + } + } + public boolean isChannelClosedException(Throwable e) { return e instanceof ChannelException ||