Skip to content

Commit

Permalink
Log Slowness on Sending Transport Messages (elastic#67664)
Browse files Browse the repository at this point in the history
Similar to elastic#62444 but for the outbound path.

This does not detect slowness in individual transport handler logic,
this is done via the inbound handler logging already, but instead
warns if it takes a long time to hand off the message to the relevant
transport thread and then transfer the message over the wire.
This gives some visibility into the stability of the network
connection itself and into the reasons for slow network
responses (if they are the result of slow networking on the sender).
  • Loading branch information
original-brownbear committed Jan 19, 2021
1 parent 1f2d027 commit ae0be75
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ public String toString() {
return "Netty4TcpChannel{" +
"localAddress=" + getLocalAddress() +
", remoteAddress=" + channel.remoteAddress() +
", profile=" + profile +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public String toString() {
return "TcpNioSocketChannel{" +
"localAddress=" + getLocalAddress() +
", remoteAddress=" + getRemoteAddress() +
", profile=" + profile +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.elasticsearch.common.network.CloseableChannel;
import org.elasticsearch.common.transport.NetworkExceptionHelper;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.internal.io.IOUtils;
Expand All @@ -52,6 +53,9 @@ final class OutboundHandler {
private final StatsTracker statsTracker;
private final ThreadPool threadPool;
private final BigArrays bigArrays;

private volatile long slowLogThresholdMs = Long.MAX_VALUE;

private volatile TransportMessageListener messageListener = TransportMessageListener.NOOP_LISTENER;

OutboundHandler(String nodeName, Version version, String[] features, StatsTracker statsTracker, ThreadPool threadPool,
Expand All @@ -64,6 +68,10 @@ final class OutboundHandler {
this.bigArrays = bigArrays;
}

void setSlowLogThreshold(TimeValue slowLogThreshold) {
this.slowLogThresholdMs = slowLogThreshold.getMillis();
}

void sendBytes(TcpChannel channel, BytesReference bytes, ActionListener<Void> listener) {
SendContext sendContext = new SendContext(channel, () -> bytes, listener);
try {
Expand Down Expand Up @@ -130,6 +138,7 @@ private void internalSend(TcpChannel channel, SendContext sendContext) throws IO
BytesReference reference = sendContext.get();
// stash thread context so that channel event loop is not polluted by thread context
try (ThreadContext.StoredContext existing = threadPool.getThreadContext().stashContext()) {
sendContext.startTime = threadPool.relativeTimeInMillis();
channel.sendMessage(reference, sendContext);
} catch (RuntimeException ex) {
sendContext.onFailure(ex);
Expand Down Expand Up @@ -167,6 +176,11 @@ public BytesReference get() throws IOException {
public void close() {
IOUtils.closeWhileHandlingException(bytesStreamOutput);
}

@Override
public String toString() {
return "MessageSerializer{" + message + "}";
}
}

private class SendContext extends NotifyOnceListener<Void> implements CheckedSupplier<BytesReference, IOException> {
Expand All @@ -176,6 +190,7 @@ private class SendContext extends NotifyOnceListener<Void> implements CheckedSup
private final ActionListener<Void> listener;
private final Releasable optionalReleasable;
private long messageSize = -1;
private long startTime;

private SendContext(TcpChannel channel, CheckedSupplier<BytesReference, IOException> messageSupplier,
ActionListener<Void> listener) {
Expand Down Expand Up @@ -203,6 +218,15 @@ public BytesReference get() throws IOException {
}
}

private void maybeLogSlowMessage() {
final long took = threadPool.relativeTimeInMillis() - startTime;
final long logThreshold = slowLogThresholdMs;
if (logThreshold > 0 && took > logThreshold) {
logger.warn("sending transport message [{}] of size [{}] on [{}] took [{}ms] which is above the warn threshold of [{}ms]",
messageSupplier, messageSize, channel, took, logThreshold);
}
}

@Override
protected void innerOnResponse(Void v) {
assert messageSize != -1 : "If onResponse is being called, the message should have been serialized";
Expand All @@ -221,7 +245,7 @@ protected void innerOnFailure(Exception e) {
}

private void closeAndCallback(Runnable runnable) {
Releasables.close(optionalReleasable, runnable::run);
Releasables.close(optionalReleasable, runnable::run, this::maybeLogSlowMessage);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@

abstract class OutboundMessage extends NetworkMessage {

private final Writeable message;
protected final Writeable message;

OutboundMessage(ThreadContext threadContext, Version version, byte status, long requestId, Writeable message) {
super(threadContext, version, status, requestId);
Expand Down Expand Up @@ -133,6 +133,12 @@ private static byte setStatus(boolean compress, boolean isHandshake, Writeable m

return status;
}


@Override
public String toString() {
return "Request{" + action + "}{" + requestId + "}{" + isError() + "}{" + isCompress() + "}{" + isHandshake() + "}";
}
}

static class Response extends OutboundMessage {
Expand Down Expand Up @@ -166,6 +172,12 @@ private static byte setStatus(boolean compress, boolean isHandshake, Writeable m

return status;
}

@Override
public String toString() {
return "Response{" + requestId + "}{" + isError() + "}{" + isCompress() + "}{" + isHandshake() + "}{"
+ message.getClass() + "}";
}
}

private static boolean canCompress(Writeable message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ public synchronized void setMessageListener(TransportMessageListener listener) {
@Override
public void setSlowLogThreshold(TimeValue slowLogThreshold) {
inboundHandler.setSlowLogThreshold(slowLogThreshold);
outboundHandler.setSlowLogThreshold(slowLogThreshold);
}

public final class NodeChannels extends CloseableConnection {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,13 @@

package org.elasticsearch.transport;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
Expand All @@ -30,13 +34,15 @@
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.internal.io.Streams;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.MockLogAppender;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
Expand Down Expand Up @@ -299,4 +305,39 @@ public void onResponseSent(long requestId, String action, Exception error) {

assertEquals("header_value", header.getHeaders().v1().get("header"));
}

public void testSlowLogOutboundMessage() throws Exception {
final MockLogAppender mockAppender = new MockLogAppender();
mockAppender.start();
mockAppender.addExpectation(
new MockLogAppender.SeenEventExpectation(
"expected message",
OutboundHandler.class.getCanonicalName(),
Level.WARN,
"sending transport message "));
final Logger outboundHandlerLogger = LogManager.getLogger(OutboundHandler.class);
Loggers.addAppender(outboundHandlerLogger, mockAppender);
handler.setSlowLogThreshold(TimeValue.timeValueMillis(5L));

try {
final int length = randomIntBetween(1, 100);
final PlainActionFuture<Void> f = PlainActionFuture.newFuture();
handler.sendBytes(new FakeTcpChannel() {
@Override
public void sendMessage(BytesReference reference, ActionListener<Void> listener) {
try {
TimeUnit.SECONDS.sleep(1L);
listener.onResponse(null);
} catch (InterruptedException e) {
listener.onFailure(e);
}
}
}, new BytesArray(randomByteArrayOfLength(length)), f);
f.get();
mockAppender.assertAllExpectationsMatched();
} finally {
Loggers.removeAppender(outboundHandlerLogger, mockAppender);
mockAppender.stop();
}
}
}

0 comments on commit ae0be75

Please sign in to comment.