From dba1c264b1cf8c07b466b19e405e6d9ceb564db8 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 19 Dec 2018 15:57:31 -0700 Subject: [PATCH 1/5] Only compress responses if request was compressed This is a follow-up to some discussions around #36399. Currently we have relatively confusing compression behavior where compression can be configured for requests based on `transport.compress` or a specific setting for a remote cluster. However, we can only compress responses based on `transport.compress` as we do not know where a request is coming from (currently). This commit modifies the behavior to NEVER compress responses based on settings. Instead, a response will only be compressed if the request was compressed. This commit also updates the documentation to more clearly described transport level compression. --- docs/reference/modules/transport.asciidoc | 14 ++++++++++++++ .../org/elasticsearch/transport/TcpTransport.java | 7 ++----- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/docs/reference/modules/transport.asciidoc b/docs/reference/modules/transport.asciidoc index bc9a4fb544d67..547cf8f0a901e 100644 --- a/docs/reference/modules/transport.asciidoc +++ b/docs/reference/modules/transport.asciidoc @@ -102,6 +102,20 @@ and ensuring that the keepalive interval is shorter than any timeout that might cause idle connections to be closed, or by setting `transport.ping_schedule` if keepalives cannot be configured. +[float] +==== Transport Compression + +By default Elasticsearch network-level compression is disabled. This default +normally makes sense for local cluster communication as compression has a +noticeable CPU cost and local cluster nodes tend to have fast network connections +within a single data center. If compression is desired for remote cluster +connections only, this can be configured on a per-cluster basis using the +`cluster.remote.${cluster_alias}.transport.compress` setting. + +When compression is enabled using either `transport.compress` or for a specific +remote cluster, only the outbound requests are compressed. Elasticsearch only +compresses responses if and only-if the inbound request received was compressed. + [float] === Transport Tracer diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index 8edf97c929550..a10bad9e88b69 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -141,7 +141,6 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements // this lock is here to make sure we close this transport and disconnect all the client nodes // connections while no connect operations is going on private final ReadWriteLock closeLock = new ReentrantReadWriteLock(); - private final boolean compressAllResponses; private volatile BoundTransportAddress boundAddress; private final String transportName; @@ -166,7 +165,6 @@ public TcpTransport(String transportName, Settings settings, Version version, T this.pageCacheRecycler = pageCacheRecycler; this.circuitBreakerService = circuitBreakerService; this.namedWriteableRegistry = namedWriteableRegistry; - this.compressAllResponses = TransportSettings.TRANSPORT_COMPRESS.get(settings); this.networkService = networkService; this.transportName = transportName; this.transportLogger = new TransportLogger(); @@ -826,14 +824,13 @@ private void sendResponse( final String action, boolean compress, byte status) throws IOException { - boolean compressMessage = compress || compressAllResponses; status = TransportStatus.setResponse(status); ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(bigArrays); - CompressibleBytesOutputStream stream = new CompressibleBytesOutputStream(bStream, compressMessage); + CompressibleBytesOutputStream stream = new CompressibleBytesOutputStream(bStream, compress); boolean addedReleaseListener = false; try { - if (compressMessage) { + if (compress) { status = TransportStatus.setCompress(status); } threadPool.getThreadContext().writeTo(stream); From 02e424390f8b9fc4da9de0b495f279ec521e0fae Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 19 Dec 2018 16:12:13 -0700 Subject: [PATCH 2/5] cleanup --- docs/reference/modules/transport.asciidoc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/reference/modules/transport.asciidoc b/docs/reference/modules/transport.asciidoc index 547cf8f0a901e..d53b258e07ca8 100644 --- a/docs/reference/modules/transport.asciidoc +++ b/docs/reference/modules/transport.asciidoc @@ -107,8 +107,8 @@ keepalives cannot be configured. By default Elasticsearch network-level compression is disabled. This default normally makes sense for local cluster communication as compression has a -noticeable CPU cost and local cluster nodes tend to have fast network connections -within a single data center. If compression is desired for remote cluster +noticeable CPU cost and local clusters tend to be setup with fast network +connections between nodes. If compression is necessary for remote cluster connections only, this can be configured on a per-cluster basis using the `cluster.remote.${cluster_alias}.transport.compress` setting. From 87dfb3bb2ef39cf72c1da2197df6a3a139e0a681 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Thu, 20 Dec 2018 09:59:23 -0700 Subject: [PATCH 3/5] Add test --- docs/reference/modules/transport.asciidoc | 4 +-- .../transport/TcpTransportTests.java | 36 ++++++++++++------- .../transport/FakeTcpChannel.java | 18 ++++++++-- 3 files changed, 42 insertions(+), 16 deletions(-) diff --git a/docs/reference/modules/transport.asciidoc b/docs/reference/modules/transport.asciidoc index d53b258e07ca8..64d594a07b03a 100644 --- a/docs/reference/modules/transport.asciidoc +++ b/docs/reference/modules/transport.asciidoc @@ -113,8 +113,8 @@ connections only, this can be configured on a per-cluster basis using the `cluster.remote.${cluster_alias}.transport.compress` setting. When compression is enabled using either `transport.compress` or for a specific -remote cluster, only the outbound requests are compressed. Elasticsearch only -compresses responses if and only-if the inbound request received was compressed. +remote cluster, only the outbound requests are compressed. Elasticsearch compresses +responses if and only-if the inbound request received was compressed. [float] === Transport Tracer diff --git a/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java b/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java index 7cf8ed67d6335..dac20f9427c5c 100644 --- a/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java @@ -34,6 +34,7 @@ import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; +import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; import org.elasticsearch.threadpool.TestThreadPool; @@ -41,13 +42,14 @@ import java.io.IOException; import java.io.StreamCorruptedException; +import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.core.IsInstanceOf.instanceOf; +import static org.mockito.Mockito.mock; /** Unit tests for {@link TcpTransport} */ public class TcpTransportTests extends ESTestCase { @@ -184,11 +186,11 @@ public void testEnsureVersionCompatibility() { + version.minimumCompatibilityVersion() + "]", ise.getMessage()); } - public void testCompressRequest() throws IOException { + public void testCompressRequestAndResponse() throws IOException { final boolean compressed = randomBoolean(); Req request = new Req(randomRealisticUnicodeOfLengthBetween(10, 100)); ThreadPool threadPool = new TestThreadPool(TcpTransportTests.class.getName()); - AtomicReference messageCaptor = new AtomicReference<>(); + AtomicReference requestCaptor = new AtomicReference<>(); try { TcpTransport transport = new TcpTransport("test", Settings.EMPTY, Version.CURRENT, threadPool, PageCacheRecycler.NON_RECYCLING_INSTANCE, new NoneCircuitBreakerService(), null, null) { @@ -200,7 +202,7 @@ protected FakeServerChannel bind(String name, InetSocketAddress address) throws @Override protected FakeTcpChannel initiateChannel(DiscoveryNode node) throws IOException { - return new FakeTcpChannel(true, messageCaptor); + return new FakeTcpChannel(false, requestCaptor); } @Override @@ -215,7 +217,7 @@ public Releasable openConnection(DiscoveryNode node, ConnectionProfile profile, int numConnections = profile.getNumConnections(); ArrayList fakeChannels = new ArrayList<>(numConnections); for (int i = 0; i < numConnections; ++i) { - fakeChannels.add(new FakeTcpChannel(false, messageCaptor)); + fakeChannels.add(new FakeTcpChannel(false, requestCaptor)); } listener.onResponse(new NodeChannels(node, fakeChannels, profile, Version.CURRENT)); return () -> CloseableChannel.closeChannels(fakeChannels, false); @@ -233,11 +235,20 @@ public Releasable openConnection(DiscoveryNode node, ConnectionProfile profile, transport.openConnection(node, profileBuilder.build(), future); Transport.Connection connection = future.actionGet(); connection.sendRequest(42, "foobar", request, TransportRequestOptions.EMPTY); + transport.registerRequestHandler(new RequestHandlerRegistry<>("foobar", Req::new, mock(TaskManager.class), + (request1, channel, task) -> channel.sendResponse(TransportResponse.Empty.INSTANCE), ThreadPool.Names.SAME, + true, true)); - BytesReference reference = messageCaptor.get(); + BytesReference reference = requestCaptor.get(); assertNotNull(reference); - StreamInput streamIn = reference.streamInput(); + AtomicReference responseCaptor = new AtomicReference<>(); + InetSocketAddress address = new InetSocketAddress(InetAddress.getLocalHost(), 0); + FakeTcpChannel responseChannel = new FakeTcpChannel(true, address, address, responseCaptor); + transport.messageReceived(reference.slice(6, reference.length() - 6), responseChannel); + + + StreamInput streamIn = responseCaptor.get().streamInput(); streamIn.skip(TcpHeader.MARKER_BYTES_SIZE); @SuppressWarnings("unused") int len = streamIn.readInt(); @@ -247,17 +258,14 @@ public Releasable openConnection(DiscoveryNode node, ConnectionProfile profile, Version version = Version.fromId(streamIn.readInt()); assertEquals(Version.CURRENT, version); assertEquals(compressed, TransportStatus.isCompress(status)); + assertFalse(TransportStatus.isRequest(status)); if (compressed) { final int bytesConsumed = TcpHeader.HEADER_SIZE; streamIn = CompressorFactory.compressor(reference.slice(bytesConsumed, reference.length() - bytesConsumed)) .streamInput(streamIn); } threadPool.getThreadContext().readHeaders(streamIn); - assertThat(streamIn.readStringArray(), equalTo(new String[0])); // features - assertEquals("foobar", streamIn.readString()); - Req readReq = new Req(""); - readReq.readFrom(streamIn); - assertEquals(request.value, readReq.value); + TransportResponse.Empty.INSTANCE.readFrom(streamIn); } finally { ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); @@ -297,6 +305,10 @@ private Req(String value) { this.value = value; } + private Req(StreamInput in) throws IOException { + value = in.readString(); + } + @Override public void readFrom(StreamInput in) throws IOException { value = in.readString(); diff --git a/test/framework/src/main/java/org/elasticsearch/transport/FakeTcpChannel.java b/test/framework/src/main/java/org/elasticsearch/transport/FakeTcpChannel.java index cd598a6ca3106..63cacfbb093a8 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/FakeTcpChannel.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/FakeTcpChannel.java @@ -28,6 +28,8 @@ public class FakeTcpChannel implements TcpChannel { private final boolean isServer; + private final InetSocketAddress localAddress; + private final InetSocketAddress remoteAddress; private final String profile; private final AtomicReference messageCaptor; private final ChannelStats stats = new ChannelStats(); @@ -45,9 +47,21 @@ public FakeTcpChannel(boolean isServer, AtomicReference messageC this(isServer, "profile", messageCaptor); } + public FakeTcpChannel(boolean isServer, InetSocketAddress localAddress, InetSocketAddress remoteAddress, + AtomicReference messageCaptor) { + this(isServer, localAddress, remoteAddress,"profile", messageCaptor); + } + public FakeTcpChannel(boolean isServer, String profile, AtomicReference messageCaptor) { + this(isServer, null, null, profile, messageCaptor); + } + + public FakeTcpChannel(boolean isServer, InetSocketAddress localAddress, InetSocketAddress remoteAddress, String profile, + AtomicReference messageCaptor) { this.isServer = isServer; + this.localAddress = localAddress; + this.remoteAddress = remoteAddress; this.profile = profile; this.messageCaptor = messageCaptor; } @@ -64,12 +78,12 @@ public String getProfile() { @Override public InetSocketAddress getLocalAddress() { - return null; + return localAddress; } @Override public InetSocketAddress getRemoteAddress() { - return null; + return remoteAddress; } @Override From 9f10684bd380729c381050c992f98b3b82c12ca5 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Thu, 20 Dec 2018 10:22:29 -0700 Subject: [PATCH 4/5] Add suppress --- .../java/org/elasticsearch/transport/TcpTransportTests.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java b/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java index dac20f9427c5c..f3294366b8fe1 100644 --- a/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java @@ -23,6 +23,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.io.stream.BytesStreamOutput; @@ -186,6 +187,7 @@ public void testEnsureVersionCompatibility() { + version.minimumCompatibilityVersion() + "]", ise.getMessage()); } + @SuppressForbidden(reason = "Allow accessing localhost") public void testCompressRequestAndResponse() throws IOException { final boolean compressed = randomBoolean(); Req request = new Req(randomRealisticUnicodeOfLengthBetween(10, 100)); From fd291d463c431e292cda6efb60bd3891373ee7ff Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Thu, 20 Dec 2018 17:42:43 -0700 Subject: [PATCH 5/5] Changes --- docs/reference/modules/transport.asciidoc | 32 ++++++++++++++++------- 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/docs/reference/modules/transport.asciidoc b/docs/reference/modules/transport.asciidoc index 64d594a07b03a..a80f4a3b11b57 100644 --- a/docs/reference/modules/transport.asciidoc +++ b/docs/reference/modules/transport.asciidoc @@ -105,16 +105,30 @@ keepalives cannot be configured. [float] ==== Transport Compression -By default Elasticsearch network-level compression is disabled. This default +[float] +===== Request Compresssion + +By default, the `transport.compress` setting is `false` and network-level +request compression is disabled between nodes in the cluster. This default normally makes sense for local cluster communication as compression has a -noticeable CPU cost and local clusters tend to be setup with fast network -connections between nodes. If compression is necessary for remote cluster -connections only, this can be configured on a per-cluster basis using the -`cluster.remote.${cluster_alias}.transport.compress` setting. - -When compression is enabled using either `transport.compress` or for a specific -remote cluster, only the outbound requests are compressed. Elasticsearch compresses -responses if and only-if the inbound request received was compressed. +noticeable CPU cost and local clusters tend to be set up with fast network +connections between nodes. + +The `transport.compress` setting always configures local cluster request +compression and is the fallback setting for remote cluster request compression. +If you want to configure remote request compression differently than local +request compression, you can set it on a per-remote cluster basis using the +<>. + + +[float] +===== Response Compression + +The compression settings do not configure compression for responses. {es} will +compress a response if the inbound request was compressed--even when compression +is not enabled. Similarly, {es} will not compress a response if the inbound +request was uncompressed--even when compression is enabled. + [float] === Transport Tracer