Add WARN Logging on Slow Transport Message Handling #62444

Merged
Changes from 1 commit
@@ -355,6 +355,7 @@ public void apply(Settings value, Settings current, Settings previous) {
TransportSettings.CONNECTIONS_PER_NODE_PING,
TransportSettings.TRACE_LOG_EXCLUDE_SETTING,
TransportSettings.TRACE_LOG_INCLUDE_SETTING,
TransportSettings.SLOW_OPERATION_THRESHOLD_SETTING,
NetworkService.NETWORK_SERVER,
NetworkService.GLOBAL_NETWORK_HOST_SETTING,
NetworkService.GLOBAL_NETWORK_BIND_HOST_SETTING,
6 changes: 6 additions & 0 deletions server/src/main/java/org/elasticsearch/transport/Header.java
@@ -108,4 +108,10 @@ void finishParsingHeader(StreamInput input) throws IOException {
this.actionName = RESPONSE_NAME;
}
}

@Override
public String toString() {
return "Header{" + networkMessageSize + "}{" + version + "}{" + requestId + "}{" + isRequest() + "}{" + isError() + "}{"
+ isHandshake() + "}{" + isCompressed() + "}{" + actionName + "}";
}
}
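
For illustration, with made-up field values a header rendered by this toString would read roughly:

    Header{55}{7.10.0}{42}{true}{false}{false}{false}{indices:data/read/search}

i.e. network message size, version, request ID, isRequest, isError, isHandshake, isCompressed, and action name. This is the representation that ends up in the slow-handling warning via InboundMessage.toString further down.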
@@ -29,6 +29,7 @@
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.threadpool.ThreadPool;
@@ -51,6 +52,8 @@ public class InboundHandler {

private volatile TransportMessageListener messageListener = TransportMessageListener.NOOP_LISTENER;

private volatile long slowLogThresholdMs = Long.MAX_VALUE;

InboundHandler(ThreadPool threadPool, OutboundHandler outboundHandler, NamedWriteableRegistry namedWriteableRegistry,
TransportHandshaker handshaker, TransportKeepAlive keepAlive, Transport.RequestHandlers requestHandlers,
Transport.ResponseHandlers responseHandlers) {
@@ -71,6 +74,10 @@ void setMessageListener(TransportMessageListener listener) {
}
}

void setSlowLogThreshold(TimeValue slowLogThreshold) {
Member Author:

Admittedly the chain of setters here is a little hacky but:

  1. Doing it this way made this a much much smaller change than passing the cluster settings directly to the InboundHandler
  2. I'd do a follow-up of this logic for the outbound path and that would require handling the threshold setting in the TransportService anyway.

this.slowLogThresholdMs = slowLogThreshold.getMillis();
}

void inboundMessage(TcpChannel channel, InboundMessage message) throws Exception {
channel.getChannelStats().markAccessed(threadPool.relativeTimeInMillis());
TransportLogger.logInboundMessage(channel, message);
Contributor:

Could we move the timing up to this level, since the logging on this line may also be a source of slowness?

Member Author:

++
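
A rough sketch of what moving the timing up to inboundMessage could look like — assuming inboundMessage simply delegates to messageReceived, and with the corresponding startTime line removed from messageReceived; this is an illustration, not the follow-up commit:

    void inboundMessage(TcpChannel channel, InboundMessage message) throws Exception {
        // Start the clock before the trace logging so any slowness there is measured as well.
        final long startTime = threadPool.relativeTimeInMillis();
        channel.getChannelStats().markAccessed(threadPool.relativeTimeInMillis());
        TransportLogger.logInboundMessage(channel, message);
        try {
            messageReceived(channel, message);
        } finally {
            final long took = threadPool.relativeTimeInMillis() - startTime;
            final long logThreshold = slowLogThresholdMs;
            if (logThreshold > 0 && took > logThreshold) {
                logger.warn("Slow handling of transport message [{}] took [{}ms]", message, took);
            }
        }
    }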

@@ -90,6 +97,7 @@ private void messageReceived(TcpChannel channel, InboundMessage message) throws
final Header header = message.getHeader();
assert header.needsToReadVariableHeader() == false;

final long startTime = threadPool.relativeTimeInMillis();
ThreadContext threadContext = threadPool.getThreadContext();
try (ThreadContext.StoredContext existing = threadContext.stashContext()) {
// Place the context with the headers from the message
@@ -138,6 +146,12 @@ private void messageReceived(TcpChannel channel, InboundMessage message) throws
}
}
}
} finally {
final long took = threadPool.relativeTimeInMillis() - startTime;
final long logThreshold = slowLogThresholdMs;
if (logThreshold > 0 && took > logThreshold) {
logger.warn("Slow handling of transport message [{}] took [{}ms]", message, took);
Contributor:

It'd be useful to include the phrase "warn threshold", as it is a handy search term in the logs when investigating general slowness/instability problems:

Suggested change:
-            logger.warn("Slow handling of transport message [{}] took [{}ms]", message, took);
+            logger.warn("handling inbound transport message [{}] took [{}ms] which is above the warn threshold of [{}ms]", message, took, logThreshold);

Member Author:

++

}
}
}

@@ -105,4 +105,9 @@ public void close() {
IOUtils.closeWhileHandlingException(streamInput);
Releasables.closeWhileHandlingException(content, breakerRelease);
}

@Override
public String toString() {
return "InboundMessage{" + header + "}";
}
}
@@ -183,6 +183,11 @@ public synchronized void setMessageListener(TransportMessageListener listener) {
inboundHandler.setMessageListener(listener);
}

@Override
public void setSlowLogThreshold(TimeValue slowLogThreshold) {
inboundHandler.setSlowLogThreshold(slowLogThreshold);
}

public final class NodeChannels extends CloseableConnection {
private final Map<TransportRequestOptions.Type, ConnectionProfile.ConnectionTypeHandle> typeMapping;
private final List<TcpChannel> channels;
@@ -25,6 +25,7 @@
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.ConcurrentMapLong;
@@ -50,6 +51,9 @@ default <Request extends TransportRequest> void registerRequestHandler(RequestHa

void setMessageListener(TransportMessageListener listener);

default void setSlowLogThreshold(TimeValue slowLogThreshold) {
}

default boolean isSecure() {
return false;
}
@@ -154,6 +154,7 @@ public TransportService(Settings settings, Transport transport, ThreadPool threa
Function<BoundTransportAddress, DiscoveryNode> localNodeFactory, @Nullable ClusterSettings clusterSettings,
Set<String> taskHeaders, ConnectionManager connectionManager) {
this.transport = transport;
transport.setSlowLogThreshold(TransportSettings.SLOW_OPERATION_THRESHOLD_SETTING.get(settings));
this.threadPool = threadPool;
this.localNodeFactory = localNodeFactory;
this.connectionManager = connectionManager;
@@ -173,6 +174,7 @@ public TransportService(Settings settings, Transport transport, ThreadPool threa
if (remoteClusterClient) {
remoteClusterService.listenForUpdates(clusterSettings);
}
clusterSettings.addSettingsUpdateConsumer(TransportSettings.SLOW_OPERATION_THRESHOLD_SETTING, transport::setSlowLogThreshold);
}
registerRequestHandler(
HANDSHAKE_ACTION_NAME,
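Putting the pieces above together — the "chain of setters" mentioned in the earlier review thread — the threshold propagates from the setting down to the inbound path roughly like this (a condensed sketch of the diff, not verbatim code):

    // TransportService: read the initial value and register for dynamic updates
    transport.setSlowLogThreshold(TransportSettings.SLOW_OPERATION_THRESHOLD_SETTING.get(settings));
    clusterSettings.addSettingsUpdateConsumer(TransportSettings.SLOW_OPERATION_THRESHOLD_SETTING, transport::setSlowLogThreshold);

    // TcpTransport: forward to the inbound handler
    @Override
    public void setSlowLogThreshold(TimeValue slowLogThreshold) {
        inboundHandler.setSlowLogThreshold(slowLogThreshold);
    }

    // InboundHandler: store as a volatile millisecond value that is read on every inbound message
    void setSlowLogThreshold(TimeValue slowLogThreshold) {
        this.slowLogThresholdMs = slowLogThreshold.getMillis();
    }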
@@ -132,6 +132,12 @@ public final class TransportSettings {
Arrays.asList("internal:coordination/fault_detection/*"),
Function.identity(), Setting.Property.Dynamic, Setting.Property.NodeScope);

// Time that processing an inbound message on a transport thread may take at the most before a warning is logged
public static final Setting<TimeValue> SLOW_OPERATION_THRESHOLD_SETTING =
Setting.positiveTimeSetting("transport.slow_operation_logging_threshold", TimeValue.timeValueMillis(300),
Member Author:

300ms since our default resolution on the timer thread is 200ms. This is already plenty IMO (3 requests per second isn't great on a transport thread) but not so low that any cgroup throttling or other system slowness instantly results in massive log spam.

Contributor:

I'm still a bit concerned about log spam -- we aren't aware of anything that blocks the transport threads for so long today but that might just be because we don't surface it yet, and some configs might be hitting this a lot. #processors threads times 3 messages per second is too much for my taste. WDYT about a simple rate limit to avoid logging more than one of these per ten seconds or something? That'd be enough to point us in the right direction.

Member Author:

I think it might be kind of nice to be able to see bursts here (thinking about spotting something like CPU throttling), e.g. a period where there's a bunch of logging across multiple threads for all kinds of messages without a clear pattern. I guess we could use something like the log4j burst filter to capture that kind of thing and still rate limit, but what about this:
We could just do something like a hard 5s timeout above which we always WARN and make this a DEBUG at 300ms?

Contributor:

I'd be happy with 5s -- I think this would surface problems bad enough to drop nodes from the cluster without risking too much junk.

No need for a separate debug logging threshold IMO -- given that it would need user involvement to see more events in the logs, they may as well just reduce transport.slow_operation_logging_threshold.

Member Author:

Fair point -> pushed the change to 5s :)

Setting.Property.Dynamic, Setting.Property.NodeScope);


private TransportSettings() {
}
}
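
The rate limit floated in the thread above was not adopted (the default threshold was raised to 5s instead), but as an illustration of the idea, a minimal throttle that permits at most one warning per interval could look roughly like this. The class and field names are hypothetical and not part of this PR:

    import java.util.concurrent.atomic.AtomicLong;

    // Hypothetical throttle: permits at most one slow-message warning per interval.
    final class SlowLogThrottle {
        private final long intervalMillis;
        // Initialised far enough in the past that the first warning is always permitted.
        private final AtomicLong lastWarnMillis = new AtomicLong(Long.MIN_VALUE / 2);

        SlowLogThrottle(long intervalMillis) {
            this.intervalMillis = intervalMillis;
        }

        // Returns true if a warning may be emitted at the given (relative) timestamp.
        boolean tryAcquire(long nowMillis) {
            final long last = lastWarnMillis.get();
            return nowMillis - last >= intervalMillis && lastWarnMillis.compareAndSet(last, nowMillis);
        }
    }

InboundHandler could then guard the logger.warn call with something like tryAcquire(threadPool.relativeTimeInMillis()), trading per-message detail for bounded log volume — which is the burst-visibility concern raised in the discussion above.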
@@ -35,6 +35,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.test.ESTestCase;
@@ -240,6 +241,43 @@ public void testClosesChannelOnErrorInHandshakeWithIncompatibleVersion() throws
}
}

public void testLogsSlowInboundProcessing() throws Exception {
final MockLogAppender mockAppender = new MockLogAppender();
mockAppender.start();
mockAppender.addExpectation(
new MockLogAppender.SeenEventExpectation(
"expected message",
InboundHandler.class.getCanonicalName(),
Level.WARN,
"Slow handling of transport message "));
final Logger inboundHandlerLogger = LogManager.getLogger(InboundHandler.class);
Loggers.addAppender(inboundHandlerLogger, mockAppender);

handler.setSlowLogThreshold(TimeValue.timeValueMillis(5L));
try {
final Version remoteVersion = Version.CURRENT;
final long requestId = randomNonNegativeLong();
final Header requestHeader = new Header(between(0, 100), requestId,
TransportStatus.setRequest(TransportStatus.setHandshake((byte) 0)), remoteVersion);
final InboundMessage requestMessage =
new InboundMessage(requestHeader, ReleasableBytesReference.wrap(BytesArray.EMPTY), () -> {
try {
TimeUnit.SECONDS.sleep(1L);
} catch (InterruptedException e) {
throw new AssertionError(e);
}
});
requestHeader.actionName = TransportHandshaker.HANDSHAKE_ACTION_NAME;
requestHeader.headers = Tuple.tuple(Map.of(), Map.of());
handler.inboundMessage(channel, requestMessage);
assertNotNull(channel.getMessageCaptor().get());
mockAppender.assertAllExpectationsMatched();
} finally {
Loggers.removeAppender(inboundHandlerLogger, mockAppender);
mockAppender.stop();
}
}

private static InboundMessage unreadableInboundHandshake(Version remoteVersion, Header requestHeader) {
return new InboundMessage(requestHeader, ReleasableBytesReference.wrap(BytesArray.EMPTY), () -> { }) {
@Override