Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Only compress responses if request was compressed #36867

Merged
merged 7 commits into from
Dec 21, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions docs/reference/modules/transport.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,34 @@ and ensuring that the keepalive interval is shorter than any timeout that might
cause idle connections to be closed, or by setting `transport.ping_schedule` if
keepalives cannot be configured.

[float]
==== Transport Compression

[float]
===== Request Compresssion

By default, the `transport.compress` setting is `false` and network-level
request compression is disabled between nodes in the cluster. This default
normally makes sense for local cluster communication as compression has a
noticeable CPU cost and local clusters tend to be set up with fast network
connections between nodes.

The `transport.compress` setting always configures local cluster request
compression and is the fallback setting for remote cluster request compression.
If you want to configure remote request compression differently than local
request compression, you can set it on a per-remote cluster basis using the
<<remote-cluster-settings,`cluster.remote.${cluster_alias}.transport.compress` setting>>.


[float]
===== Response Compression

The compression settings do not configure compression for responses. {es} will
compress a response if the inbound request was compressed--even when compression
is not enabled. Similarly, {es} will not compress a response if the inbound
request was uncompressed--even when compression is enabled.


[float]
=== Transport Tracer

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
// this lock is here to make sure we close this transport and disconnect all the client nodes
// connections while no connect operations is going on
private final ReadWriteLock closeLock = new ReentrantReadWriteLock();
private final boolean compressAllResponses;
private volatile BoundTransportAddress boundAddress;
private final String transportName;

Expand All @@ -166,7 +165,6 @@ public TcpTransport(String transportName, Settings settings, Version version, T
this.pageCacheRecycler = pageCacheRecycler;
this.circuitBreakerService = circuitBreakerService;
this.namedWriteableRegistry = namedWriteableRegistry;
this.compressAllResponses = TransportSettings.TRANSPORT_COMPRESS.get(settings);
this.networkService = networkService;
this.transportName = transportName;
this.transportLogger = new TransportLogger();
Expand Down Expand Up @@ -826,14 +824,13 @@ private void sendResponse(
final String action,
boolean compress,
byte status) throws IOException {
boolean compressMessage = compress || compressAllResponses;

status = TransportStatus.setResponse(status);
ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(bigArrays);
CompressibleBytesOutputStream stream = new CompressibleBytesOutputStream(bStream, compressMessage);
CompressibleBytesOutputStream stream = new CompressibleBytesOutputStream(bStream, compress);
boolean addedReleaseListener = false;
try {
if (compressMessage) {
if (compress) {
status = TransportStatus.setCompress(status);
}
threadPool.getThreadContext().writeTo(stream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
Expand All @@ -34,20 +35,22 @@
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.io.StreamCorruptedException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.mockito.Mockito.mock;

/** Unit tests for {@link TcpTransport} */
public class TcpTransportTests extends ESTestCase {
Expand Down Expand Up @@ -184,11 +187,12 @@ public void testEnsureVersionCompatibility() {
+ version.minimumCompatibilityVersion() + "]", ise.getMessage());
}

public void testCompressRequest() throws IOException {
@SuppressForbidden(reason = "Allow accessing localhost")
public void testCompressRequestAndResponse() throws IOException {
final boolean compressed = randomBoolean();
Req request = new Req(randomRealisticUnicodeOfLengthBetween(10, 100));
ThreadPool threadPool = new TestThreadPool(TcpTransportTests.class.getName());
AtomicReference<BytesReference> messageCaptor = new AtomicReference<>();
AtomicReference<BytesReference> requestCaptor = new AtomicReference<>();
try {
TcpTransport transport = new TcpTransport("test", Settings.EMPTY, Version.CURRENT, threadPool,
PageCacheRecycler.NON_RECYCLING_INSTANCE, new NoneCircuitBreakerService(), null, null) {
Expand All @@ -200,7 +204,7 @@ protected FakeServerChannel bind(String name, InetSocketAddress address) throws

@Override
protected FakeTcpChannel initiateChannel(DiscoveryNode node) throws IOException {
return new FakeTcpChannel(true, messageCaptor);
return new FakeTcpChannel(false, requestCaptor);
}

@Override
Expand All @@ -215,7 +219,7 @@ public Releasable openConnection(DiscoveryNode node, ConnectionProfile profile,
int numConnections = profile.getNumConnections();
ArrayList<TcpChannel> fakeChannels = new ArrayList<>(numConnections);
for (int i = 0; i < numConnections; ++i) {
fakeChannels.add(new FakeTcpChannel(false, messageCaptor));
fakeChannels.add(new FakeTcpChannel(false, requestCaptor));
}
listener.onResponse(new NodeChannels(node, fakeChannels, profile, Version.CURRENT));
return () -> CloseableChannel.closeChannels(fakeChannels, false);
Expand All @@ -233,11 +237,20 @@ public Releasable openConnection(DiscoveryNode node, ConnectionProfile profile,
transport.openConnection(node, profileBuilder.build(), future);
Transport.Connection connection = future.actionGet();
connection.sendRequest(42, "foobar", request, TransportRequestOptions.EMPTY);
transport.registerRequestHandler(new RequestHandlerRegistry<>("foobar", Req::new, mock(TaskManager.class),
(request1, channel, task) -> channel.sendResponse(TransportResponse.Empty.INSTANCE), ThreadPool.Names.SAME,
true, true));

BytesReference reference = messageCaptor.get();
BytesReference reference = requestCaptor.get();
assertNotNull(reference);

StreamInput streamIn = reference.streamInput();
AtomicReference<BytesReference> responseCaptor = new AtomicReference<>();
InetSocketAddress address = new InetSocketAddress(InetAddress.getLocalHost(), 0);
FakeTcpChannel responseChannel = new FakeTcpChannel(true, address, address, responseCaptor);
transport.messageReceived(reference.slice(6, reference.length() - 6), responseChannel);


StreamInput streamIn = responseCaptor.get().streamInput();
streamIn.skip(TcpHeader.MARKER_BYTES_SIZE);
@SuppressWarnings("unused")
int len = streamIn.readInt();
Expand All @@ -247,17 +260,14 @@ public Releasable openConnection(DiscoveryNode node, ConnectionProfile profile,
Version version = Version.fromId(streamIn.readInt());
assertEquals(Version.CURRENT, version);
assertEquals(compressed, TransportStatus.isCompress(status));
assertFalse(TransportStatus.isRequest(status));
if (compressed) {
final int bytesConsumed = TcpHeader.HEADER_SIZE;
streamIn = CompressorFactory.compressor(reference.slice(bytesConsumed, reference.length() - bytesConsumed))
.streamInput(streamIn);
}
threadPool.getThreadContext().readHeaders(streamIn);
assertThat(streamIn.readStringArray(), equalTo(new String[0])); // features
assertEquals("foobar", streamIn.readString());
Req readReq = new Req("");
readReq.readFrom(streamIn);
assertEquals(request.value, readReq.value);
TransportResponse.Empty.INSTANCE.readFrom(streamIn);

} finally {
ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
Expand Down Expand Up @@ -297,6 +307,10 @@ private Req(String value) {
this.value = value;
}

private Req(StreamInput in) throws IOException {
value = in.readString();
}

@Override
public void readFrom(StreamInput in) throws IOException {
value = in.readString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
public class FakeTcpChannel implements TcpChannel {

private final boolean isServer;
private final InetSocketAddress localAddress;
private final InetSocketAddress remoteAddress;
private final String profile;
private final AtomicReference<BytesReference> messageCaptor;
private final ChannelStats stats = new ChannelStats();
Expand All @@ -45,9 +47,21 @@ public FakeTcpChannel(boolean isServer, AtomicReference<BytesReference> messageC
this(isServer, "profile", messageCaptor);
}

public FakeTcpChannel(boolean isServer, InetSocketAddress localAddress, InetSocketAddress remoteAddress,
AtomicReference<BytesReference> messageCaptor) {
this(isServer, localAddress, remoteAddress,"profile", messageCaptor);
}


public FakeTcpChannel(boolean isServer, String profile, AtomicReference<BytesReference> messageCaptor) {
this(isServer, null, null, profile, messageCaptor);
}

public FakeTcpChannel(boolean isServer, InetSocketAddress localAddress, InetSocketAddress remoteAddress, String profile,
AtomicReference<BytesReference> messageCaptor) {
this.isServer = isServer;
this.localAddress = localAddress;
this.remoteAddress = remoteAddress;
this.profile = profile;
this.messageCaptor = messageCaptor;
}
Expand All @@ -64,12 +78,12 @@ public String getProfile() {

@Override
public InetSocketAddress getLocalAddress() {
return null;
return localAddress;
}

@Override
public InetSocketAddress getRemoteAddress() {
return null;
return remoteAddress;
}

@Override
Expand Down