diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 5da66d1134bdd..35b0dcb481424 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -355,6 +355,7 @@ public void apply(Settings value, Settings current, Settings previous) { TransportSettings.CONNECTIONS_PER_NODE_PING, TransportSettings.TRACE_LOG_EXCLUDE_SETTING, TransportSettings.TRACE_LOG_INCLUDE_SETTING, + TransportSettings.SLOW_OPERATION_THRESHOLD_SETTING, NetworkService.NETWORK_SERVER, NetworkService.GLOBAL_NETWORK_HOST_SETTING, NetworkService.GLOBAL_NETWORK_BIND_HOST_SETTING, diff --git a/server/src/main/java/org/elasticsearch/transport/Header.java b/server/src/main/java/org/elasticsearch/transport/Header.java index 5b6dd2aaecbd4..23e664212c38d 100644 --- a/server/src/main/java/org/elasticsearch/transport/Header.java +++ b/server/src/main/java/org/elasticsearch/transport/Header.java @@ -108,4 +108,10 @@ void finishParsingHeader(StreamInput input) throws IOException { this.actionName = RESPONSE_NAME; } } + + @Override + public String toString() { + return "Header{" + networkMessageSize + "}{" + version + "}{" + requestId + "}{" + isRequest() + "}{" + isError() + "}{" + + isHandshake() + "}{" + isCompressed() + "}{" + actionName + "}"; + } } diff --git a/server/src/main/java/org/elasticsearch/transport/InboundHandler.java b/server/src/main/java/org/elasticsearch/transport/InboundHandler.java index 047de1557552c..8e446848b62a5 100644 --- a/server/src/main/java/org/elasticsearch/transport/InboundHandler.java +++ b/server/src/main/java/org/elasticsearch/transport/InboundHandler.java @@ -29,6 +29,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.threadpool.ThreadPool; @@ -51,6 +52,8 @@ public class InboundHandler { private volatile TransportMessageListener messageListener = TransportMessageListener.NOOP_LISTENER; + private volatile long slowLogThresholdMs = Long.MAX_VALUE; + InboundHandler(ThreadPool threadPool, OutboundHandler outboundHandler, NamedWriteableRegistry namedWriteableRegistry, TransportHandshaker handshaker, TransportKeepAlive keepAlive, Transport.RequestHandlers requestHandlers, Transport.ResponseHandlers responseHandlers) { @@ -71,21 +74,26 @@ void setMessageListener(TransportMessageListener listener) { } } + void setSlowLogThreshold(TimeValue slowLogThreshold) { + this.slowLogThresholdMs = slowLogThreshold.getMillis(); + } + void inboundMessage(TcpChannel channel, InboundMessage message) throws Exception { - channel.getChannelStats().markAccessed(threadPool.relativeTimeInMillis()); + final long startTime = threadPool.relativeTimeInMillis(); + channel.getChannelStats().markAccessed(startTime); TransportLogger.logInboundMessage(channel, message); if (message.isPing()) { keepAlive.receiveKeepAlive(channel); } else { - messageReceived(channel, message); + messageReceived(channel, message, startTime); } } // Empty stream constant to avoid instantiating a new stream for empty messages. private static final StreamInput EMPTY_STREAM_INPUT = new ByteBufferStreamInput(ByteBuffer.wrap(BytesRef.EMPTY_BYTES)); - private void messageReceived(TcpChannel channel, InboundMessage message) throws IOException { + private void messageReceived(TcpChannel channel, InboundMessage message, long startTime) throws IOException { final InetSocketAddress remoteAddress = channel.getRemoteAddress(); final Header header = message.getHeader(); assert header.needsToReadVariableHeader() == false; @@ -138,6 +146,13 @@ private void messageReceived(TcpChannel channel, InboundMessage message) throws } } } + } finally { + final long took = threadPool.relativeTimeInMillis() - startTime; + final long logThreshold = slowLogThresholdMs; + if (logThreshold > 0 && took > logThreshold) { + logger.warn("handling inbound transport message [{}] took [{}ms] which is above the warn threshold of [{}ms]", + message, took, logThreshold); + } } } diff --git a/server/src/main/java/org/elasticsearch/transport/InboundMessage.java b/server/src/main/java/org/elasticsearch/transport/InboundMessage.java index 0d2e46578cdd7..b39d755bbc083 100644 --- a/server/src/main/java/org/elasticsearch/transport/InboundMessage.java +++ b/server/src/main/java/org/elasticsearch/transport/InboundMessage.java @@ -105,4 +105,9 @@ public void close() { IOUtils.closeWhileHandlingException(streamInput); Releasables.closeWhileHandlingException(content, breakerRelease); } + + @Override + public String toString() { + return "InboundMessage{" + header + "}"; + } } diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index 40289374fef39..7b268ea768bb0 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -183,6 +183,11 @@ public synchronized void setMessageListener(TransportMessageListener listener) { inboundHandler.setMessageListener(listener); } + @Override + public void setSlowLogThreshold(TimeValue slowLogThreshold) { + inboundHandler.setSlowLogThreshold(slowLogThreshold); + } + public final class NodeChannels extends CloseableConnection { private final Map typeMapping; private final List channels; diff --git a/server/src/main/java/org/elasticsearch/transport/Transport.java b/server/src/main/java/org/elasticsearch/transport/Transport.java index c131406b2b473..8a4f286ec420b 100644 --- a/server/src/main/java/org/elasticsearch/transport/Transport.java +++ b/server/src/main/java/org/elasticsearch/transport/Transport.java @@ -25,6 +25,7 @@ import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.Maps; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ConcurrentMapLong; @@ -50,6 +51,9 @@ default void registerRequestHandler(RequestHa void setMessageListener(TransportMessageListener listener); + default void setSlowLogThreshold(TimeValue slowLogThreshold) { + } + default boolean isSecure() { return false; } diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index efc88e6d27e34..1e60239c2e6e4 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -154,6 +154,7 @@ public TransportService(Settings settings, Transport transport, ThreadPool threa Function localNodeFactory, @Nullable ClusterSettings clusterSettings, Set taskHeaders, ConnectionManager connectionManager) { this.transport = transport; + transport.setSlowLogThreshold(TransportSettings.SLOW_OPERATION_THRESHOLD_SETTING.get(settings)); this.threadPool = threadPool; this.localNodeFactory = localNodeFactory; this.connectionManager = connectionManager; @@ -173,6 +174,7 @@ public TransportService(Settings settings, Transport transport, ThreadPool threa if (remoteClusterClient) { remoteClusterService.listenForUpdates(clusterSettings); } + clusterSettings.addSettingsUpdateConsumer(TransportSettings.SLOW_OPERATION_THRESHOLD_SETTING, transport::setSlowLogThreshold); } registerRequestHandler( HANDSHAKE_ACTION_NAME, diff --git a/server/src/main/java/org/elasticsearch/transport/TransportSettings.java b/server/src/main/java/org/elasticsearch/transport/TransportSettings.java index 7643262e6fbd7..45ddd41b307e7 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportSettings.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportSettings.java @@ -132,6 +132,12 @@ public final class TransportSettings { Arrays.asList("internal:coordination/fault_detection/*"), Function.identity(), Setting.Property.Dynamic, Setting.Property.NodeScope); + // Time that processing an inbound message on a transport thread may take at the most before a warning is logged + public static final Setting SLOW_OPERATION_THRESHOLD_SETTING = + Setting.positiveTimeSetting("transport.slow_operation_logging_threshold", TimeValue.timeValueSeconds(5), + Setting.Property.Dynamic, Setting.Property.NodeScope); + + private TransportSettings() { } } diff --git a/server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java b/server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java index 71a464efd8d37..1a8c29ee173af 100644 --- a/server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java @@ -35,6 +35,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.test.ESTestCase; @@ -240,6 +241,43 @@ public void testClosesChannelOnErrorInHandshakeWithIncompatibleVersion() throws } } + public void testLogsSlowInboundProcessing() throws Exception { + final MockLogAppender mockAppender = new MockLogAppender(); + mockAppender.start(); + mockAppender.addExpectation( + new MockLogAppender.SeenEventExpectation( + "expected message", + InboundHandler.class.getCanonicalName(), + Level.WARN, + "handling inbound transport message ")); + final Logger inboundHandlerLogger = LogManager.getLogger(InboundHandler.class); + Loggers.addAppender(inboundHandlerLogger, mockAppender); + + handler.setSlowLogThreshold(TimeValue.timeValueMillis(5L)); + try { + final Version remoteVersion = Version.CURRENT; + final long requestId = randomNonNegativeLong(); + final Header requestHeader = new Header(between(0, 100), requestId, + TransportStatus.setRequest(TransportStatus.setHandshake((byte) 0)), remoteVersion); + final InboundMessage requestMessage = + new InboundMessage(requestHeader, ReleasableBytesReference.wrap(BytesArray.EMPTY), () -> { + try { + TimeUnit.SECONDS.sleep(1L); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + }); + requestHeader.actionName = TransportHandshaker.HANDSHAKE_ACTION_NAME; + requestHeader.headers = Tuple.tuple(Map.of(), Map.of()); + handler.inboundMessage(channel, requestMessage); + assertNotNull(channel.getMessageCaptor().get()); + mockAppender.assertAllExpectationsMatched(); + } finally { + Loggers.removeAppender(inboundHandlerLogger, mockAppender); + mockAppender.stop(); + } + } + private static InboundMessage unreadableInboundHandshake(Version remoteVersion, Header requestHeader) { return new InboundMessage(requestHeader, ReleasableBytesReference.wrap(BytesArray.EMPTY), () -> { }) { @Override