diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsResponse.java index 57407bd61fb82..c8889c86c1df7 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsResponse.java @@ -29,15 +29,11 @@ import java.io.IOException; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.Map; public class ClusterSearchShardsResponse extends ActionResponse implements ToXContentObject { - public static final ClusterSearchShardsResponse EMPTY = new ClusterSearchShardsResponse(new ClusterSearchShardsGroup[0], - new DiscoveryNode[0], Collections.emptyMap()); - private final ClusterSearchShardsGroup[] groups; private final DiscoveryNode[] nodes; private final Map<String, AliasFilter> indicesAndFilters; diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 3f03c521df52a..30e030eca7376 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -22,10 +22,12 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup; +import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -39,6 +41,7 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; +import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.index.Index; import org.elasticsearch.index.query.Rewriteable; import org.elasticsearch.index.shard.ShardId; @@ -50,6 +53,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.transport.RemoteClusterService; +import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportService; @@ -60,8 +64,11 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.LongSupplier; @@ -195,17 +202,23 @@ protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<SearchResponse> listener) { executeSearch((SearchTask)task, timeProvider, searchRequest, localIndices, Collections.emptyList(), (clusterName, nodeId) -> null, clusterState,
Collections.emptyMap(), listener, SearchResponse.Clusters.EMPTY); } else { - remoteClusterService.collectSearchShards(searchRequest.indicesOptions(), searchRequest.preference(), - searchRequest.routing(), remoteClusterIndices, ActionListener.wrap((searchShardsResponses) -> { - List<SearchShardIterator> remoteShardIterators = new ArrayList<>(); - Map<String, AliasFilter> remoteAliasFilters = new HashMap<>(); - BiFunction<String, String, DiscoveryNode> clusterNodeLookup = processRemoteShards(searchShardsResponses, - remoteClusterIndices, remoteShardIterators, remoteAliasFilters); - SearchResponse.Clusters clusters = buildClusters(localIndices, remoteClusterIndices, searchShardsResponses); - executeSearch((SearchTask) task, timeProvider, searchRequest, localIndices, - remoteShardIterators, clusterNodeLookup, clusterState, remoteAliasFilters, listener, - clusters); - }, listener::onFailure)); + AtomicInteger skippedClusters = new AtomicInteger(0); + collectSearchShards(searchRequest.indicesOptions(), searchRequest.preference(), searchRequest.routing(), skippedClusters, + remoteClusterIndices, remoteClusterService, threadPool, + ActionListener.wrap( + searchShardsResponses -> { + List<SearchShardIterator> remoteShardIterators = new ArrayList<>(); + Map<String, AliasFilter> remoteAliasFilters = new HashMap<>(); + BiFunction<String, String, DiscoveryNode> clusterNodeLookup = processRemoteShards( + searchShardsResponses, remoteClusterIndices, remoteShardIterators, remoteAliasFilters); + int localClusters = localIndices == null ? 0 : 1; + int totalClusters = remoteClusterIndices.size() + localClusters; + int successfulClusters = searchShardsResponses.size() + localClusters; + executeSearch((SearchTask) task, timeProvider, searchRequest, localIndices, + remoteShardIterators, clusterNodeLookup, clusterState, remoteAliasFilters, listener, + new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters.get())); + }, + listener::onFailure)); } }, listener::onFailure); if (searchRequest.source() == null) { @@ -216,18 +229,56 @@ protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<SearchResponse> listener) { } } - static SearchResponse.Clusters buildClusters(OriginalIndices localIndices, Map<String, OriginalIndices> remoteIndices, - Map<String, ClusterSearchShardsResponse> searchShardsResponses) { - int localClusters = localIndices == null ? 0 : 1; - int totalClusters = remoteIndices.size() + localClusters; - int successfulClusters = localClusters; - for (ClusterSearchShardsResponse searchShardsResponse : searchShardsResponses.values()) { - if (searchShardsResponse != ClusterSearchShardsResponse.EMPTY) { - successfulClusters++; - } + static void collectSearchShards(IndicesOptions indicesOptions, String preference, String routing, AtomicInteger skippedClusters, + Map<String, OriginalIndices> remoteIndicesByCluster, RemoteClusterService remoteClusterService, + ThreadPool threadPool, ActionListener<Map<String, ClusterSearchShardsResponse>> listener) { + final CountDown responsesCountDown = new CountDown(remoteIndicesByCluster.size()); + final Map<String, ClusterSearchShardsResponse> searchShardsResponses = new ConcurrentHashMap<>(); + final AtomicReference<RemoteTransportException> transportException = new AtomicReference<>(); + for (Map.Entry<String, OriginalIndices> entry : remoteIndicesByCluster.entrySet()) { + final String clusterAlias = entry.getKey(); + boolean skipUnavailable = remoteClusterService.isSkipUnavailable(clusterAlias); + Client clusterClient = remoteClusterService.getRemoteClusterClient(threadPool, clusterAlias); + final String[] indices = entry.getValue().indices(); + ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest(indices) + .indicesOptions(indicesOptions).local(true).preference(preference).routing(routing); + clusterClient.admin().cluster().searchShards(searchShardsRequest, new ActionListener<ClusterSearchShardsResponse>() { + @Override + public void onResponse(ClusterSearchShardsResponse response) { + searchShardsResponses.put(clusterAlias, response); + maybeFinish(); + } + + @Override + public void onFailure(Exception e) { + if (skipUnavailable) { + skippedClusters.incrementAndGet(); + } else { + RemoteTransportException exception = + new RemoteTransportException("error while communicating with remote cluster [" + clusterAlias + "]", e); + if (transportException.compareAndSet(null, exception) == false) { + transportException.accumulateAndGet(exception, (previous, current) -> { + current.addSuppressed(previous); + return current; + }); + } + } + maybeFinish(); + } + + private void maybeFinish() { + if (responsesCountDown.countDown()) { + RemoteTransportException exception = transportException.get(); + if (exception == null) { + listener.onResponse(searchShardsResponses); + } else { + listener.onFailure(transportException.get()); + } + } + } + } + ); } - int skippedClusters = totalClusters - successfulClusters; - return new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters); } static BiFunction<String, String, DiscoveryNode> processRemoteShards(Map<String, ClusterSearchShardsResponse> searchShardsResponses, diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index 7ea55925262ff..d7e3de92e4028 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -25,9 +25,6 @@ import org.apache.lucene.util.SetOnce; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsAction; -import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest; -import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse; import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; @@ -62,7 +59,6 @@ import
java.util.concurrent.RejectedExecutionException; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; @@ -172,6 +168,13 @@ void updateSkipUnavailable(boolean skipUnavailable) { this.skipUnavailable = skipUnavailable; } + /** + * Returns whether this cluster is configured to be skipped when unavailable + */ + boolean isSkipUnavailable() { + return skipUnavailable; + } + @Override public void onNodeDisconnected(DiscoveryNode node) { boolean remove = connectedNodes.remove(node); @@ -181,31 +184,11 @@ public void onNodeDisconnected(DiscoveryNode node) { } } - /** - * Fetches all shards for the search request from this remote connection. This is used to later run the search on the remote end. - */ - public void fetchSearchShards(ClusterSearchShardsRequest searchRequest, - ActionListener listener) { - - final ActionListener searchShardsListener; - final Consumer onConnectFailure; - if (skipUnavailable) { - onConnectFailure = (exception) -> listener.onResponse(ClusterSearchShardsResponse.EMPTY); - searchShardsListener = ActionListener.wrap(listener::onResponse, (e) -> listener.onResponse(ClusterSearchShardsResponse.EMPTY)); - } else { - onConnectFailure = listener::onFailure; - searchShardsListener = listener; - } - // in case we have no connected nodes we try to connect and if we fail we either notify the listener or not depending on - // the skip_unavailable setting - ensureConnected(ActionListener.wrap((x) -> fetchShardsInternal(searchRequest, searchShardsListener), onConnectFailure)); - } - /** * Ensures that this cluster is connected. If the cluster is connected this operation * will invoke the listener immediately. */ - public void ensureConnected(ActionListener voidActionListener) { + void ensureConnected(ActionListener voidActionListener) { if (connectedNodes.size() == 0) { connectHandler.connect(voidActionListener); } else { @@ -213,35 +196,6 @@ public void ensureConnected(ActionListener voidActionListener) { } } - private void fetchShardsInternal(ClusterSearchShardsRequest searchShardsRequest, - final ActionListener listener) { - final DiscoveryNode node = getAnyConnectedNode(); - Transport.Connection connection = connectionManager.getConnection(node); - transportService.sendRequest(connection, ClusterSearchShardsAction.NAME, searchShardsRequest, TransportRequestOptions.EMPTY, - new TransportResponseHandler() { - - @Override - public ClusterSearchShardsResponse read(StreamInput in) throws IOException { - return new ClusterSearchShardsResponse(in); - } - - @Override - public void handleResponse(ClusterSearchShardsResponse clusterSearchShardsResponse) { - listener.onResponse(clusterSearchShardsResponse); - } - - @Override - public void handleException(TransportException e) { - listener.onFailure(e); - } - - @Override - public String executor() { - return ThreadPool.Names.SEARCH; - } - }); - } - /** * Collects all nodes on the connected cluster and returns / passes a nodeID to {@link DiscoveryNode} lookup function * that returns null if the node ID is not found. 
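With fetchSearchShards removed above, RemoteClusterConnection no longer decides what happens when a remote cluster is unreachable; that responsibility now sits in TransportSearchAction.collectSearchShards, which sends one shards request per remote cluster and folds the results back into a single listener. A minimal standalone sketch of that fan-in pattern, with simplified types, follows; the class FanInCollector and its method names are illustrative only and not part of the Elasticsearch codebase, and an AtomicInteger stands in for the CountDown utility used in the real code.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

final class FanInCollector<R> {
    private final AtomicInteger remaining;                                   // plays the role of CountDown
    private final Map<String, R> responses = new ConcurrentHashMap<>();      // per-cluster responses
    private final AtomicReference<RuntimeException> failure = new AtomicReference<>();
    private final AtomicInteger skipped = new AtomicInteger();
    private final Consumer<Map<String, R>> onSuccess;
    private final Consumer<RuntimeException> onFailure;

    FanInCollector(int clusters, Consumer<Map<String, R>> onSuccess, Consumer<RuntimeException> onFailure) {
        this.remaining = new AtomicInteger(clusters);
        this.onSuccess = onSuccess;
        this.onFailure = onFailure;
    }

    void onResponse(String clusterAlias, R response) {
        responses.put(clusterAlias, response);
        maybeFinish();
    }

    void onError(String clusterAlias, boolean skipUnavailable, Exception e) {
        if (skipUnavailable) {
            skipped.incrementAndGet();                                       // cluster is skipped, not fatal
        } else {
            RuntimeException wrapped =
                new RuntimeException("error while communicating with remote cluster [" + clusterAlias + "]", e);
            // keep a single exception, attaching later ones as suppressed
            if (failure.compareAndSet(null, wrapped) == false) {
                failure.accumulateAndGet(wrapped, (previous, current) -> {
                    current.addSuppressed(previous);
                    return current;
                });
            }
        }
        maybeFinish();
    }

    private void maybeFinish() {
        if (remaining.decrementAndGet() == 0) {                              // last response or failure completes the listener
            RuntimeException e = failure.get();
            if (e == null) {
                onSuccess.accept(responses);
            } else {
                onFailure.accept(e);
            }
        }
    }

    int skippedClusters() {
        return skipped.get();
    }
}

The key design point mirrored here is that a failure from a skip_unavailable cluster only bumps the skipped counter, while failures from other clusters are merged into one exception that is delivered only once every outstanding cluster has answered.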
diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index d9fcb01df4ce8..7d19b2eebcb1d 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -24,8 +24,6 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.OriginalIndices; -import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest; -import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.client.Client; @@ -50,10 +48,8 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.Predicate; @@ -287,7 +283,7 @@ public Map groupIndices(IndicesOptions indicesOptions, String clusterAlias = entry.getKey(); List originalIndices = entry.getValue(); originalIndicesMap.put(clusterAlias, - new OriginalIndices(originalIndices.toArray(new String[originalIndices.size()]), indicesOptions)); + new OriginalIndices(originalIndices.toArray(new String[0]), indicesOptions)); } } } else { @@ -311,55 +307,6 @@ public Set getRegisteredRemoteClusterNames() { return remoteClusters.keySet(); } - public void collectSearchShards(IndicesOptions indicesOptions, String preference, String routing, - Map remoteIndicesByCluster, - ActionListener> listener) { - final CountDown responsesCountDown = new CountDown(remoteIndicesByCluster.size()); - final Map searchShardsResponses = new ConcurrentHashMap<>(); - final AtomicReference transportException = new AtomicReference<>(); - for (Map.Entry entry : remoteIndicesByCluster.entrySet()) { - final String clusterName = entry.getKey(); - RemoteClusterConnection remoteClusterConnection = remoteClusters.get(clusterName); - if (remoteClusterConnection == null) { - throw new IllegalArgumentException("no such remote cluster: " + clusterName); - } - final String[] indices = entry.getValue().indices(); - ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest(indices) - .indicesOptions(indicesOptions).local(true).preference(preference) - .routing(routing); - remoteClusterConnection.fetchSearchShards(searchShardsRequest, - new ActionListener() { - @Override - public void onResponse(ClusterSearchShardsResponse clusterSearchShardsResponse) { - searchShardsResponses.put(clusterName, clusterSearchShardsResponse); - if (responsesCountDown.countDown()) { - RemoteTransportException exception = transportException.get(); - if (exception == null) { - listener.onResponse(searchShardsResponses); - } else { - listener.onFailure(transportException.get()); - } - } - } - - @Override - public void onFailure(Exception e) { - RemoteTransportException exception = - new RemoteTransportException("error while communicating with remote cluster [" + clusterName + "]", e); - if (transportException.compareAndSet(null, exception) == false) { - exception = transportException.accumulateAndGet(exception, (previous, current) -> { - current.addSuppressed(previous); - return current; - }); 
- } - if (responsesCountDown.countDown()) { - listener.onFailure(exception); - } - } - }); - } - } - /** * Returns a connection to the given node on the given remote cluster * @throws IllegalArgumentException if the remote cluster is unknown @@ -376,6 +323,13 @@ void ensureConnected(String clusterAlias, ActionListener listener) { getRemoteClusterConnection(clusterAlias).ensureConnected(listener); } + /** + * Returns whether the cluster identified by the provided alias is configured to be skipped when unavailable + */ + public boolean isSkipUnavailable(String clusterAlias) { + return getRemoteClusterConnection(clusterAlias).isSkipUnavailable(); + } + public Transport.Connection getConnection(String cluster) { return getRemoteClusterConnection(cluster).getConnection(); } @@ -399,7 +353,7 @@ public void listenForUpdates(ClusterSettings clusterSettings) { clusterSettings.addAffixUpdateConsumer(SEARCH_REMOTE_CLUSTER_SKIP_UNAVAILABLE, this::updateSkipUnavailable, (alias, value) -> {}); } - synchronized void updateSkipUnavailable(String clusterAlias, Boolean skipUnavailable) { + private synchronized void updateSkipUnavailable(String clusterAlias, Boolean skipUnavailable) { RemoteClusterConnection remote = this.remoteClusters.get(clusterAlias); if (remote != null) { remote.updateSkipUnavailable(skipUnavailable); @@ -510,5 +464,4 @@ public Client getRemoteClusterClient(ThreadPool threadPool, String clusterAlias) Collection getConnections() { return remoteClusters.values(); } - } diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java index 16ff4389d7c4a..1b99beee65e81 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java @@ -21,6 +21,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.LatchedActionListener; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse; @@ -38,13 +39,19 @@ import org.elasticsearch.index.query.MatchAllQueryBuilder; import org.elasticsearch.index.query.TermsQueryBuilder; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.NodeDisconnectedException; +import org.elasticsearch.transport.RemoteClusterConnectionTests; import org.elasticsearch.transport.RemoteClusterService; +import org.elasticsearch.transport.RemoteClusterServiceTests; +import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.transport.Transport; +import org.elasticsearch.transport.TransportConnectionListener; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; @@ -53,13 +60,22 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; +import 
java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; import java.util.function.Function; import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.awaitLatch; +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.startsWith; public class TransportSearchActionTests extends ESTestCase { @@ -304,41 +320,169 @@ public void close() { } } - public void testBuildClusters() { - OriginalIndices localIndices = randomBoolean() ? null : randomOriginalIndices(); - Map remoteIndices = new HashMap<>(); - Map searchShardsResponses = new HashMap<>(); - int numRemoteClusters = randomIntBetween(0, 10); - boolean onlySuccessful = randomBoolean(); - int localClusters = localIndices == null ? 0 : 1; - int total = numRemoteClusters + localClusters; - int successful = localClusters; - int skipped = 0; - for (int i = 0; i < numRemoteClusters; i++) { - String cluster = randomAlphaOfLengthBetween(5, 10); - remoteIndices.put(cluster, randomOriginalIndices()); - if (onlySuccessful || randomBoolean()) { - //whatever response counts as successful as long as it's not the empty placeholder - searchShardsResponses.put(cluster, new ClusterSearchShardsResponse(null, null, null)); - successful++; - } else { - searchShardsResponses.put(cluster, ClusterSearchShardsResponse.EMPTY); - skipped++; - } - } - SearchResponse.Clusters clusters = TransportSearchAction.buildClusters(localIndices, remoteIndices, searchShardsResponses); - assertEquals(total, clusters.getTotal()); - assertEquals(successful, clusters.getSuccessful()); - assertEquals(skipped, clusters.getSkipped()); + private MockTransportService startTransport(String id, List knownNodes) { + return RemoteClusterConnectionTests.startTransport(id, knownNodes, Version.CURRENT, threadPool); } - private static OriginalIndices randomOriginalIndices() { - int numLocalIndices = randomIntBetween(0, 5); - String[] localIndices = new String[numLocalIndices]; - for (int i = 0; i < numLocalIndices; i++) { - localIndices[i] = randomAlphaOfLengthBetween(3, 10); + public void testCollectSearchShards() throws Exception { + int numClusters = randomIntBetween(2, 10); + MockTransportService[] mockTransportServices = new MockTransportService[numClusters]; + DiscoveryNode[] nodes = new DiscoveryNode[numClusters]; + Map remoteIndicesByCluster = new HashMap<>(); + Settings.Builder builder = Settings.builder(); + for (int i = 0; i < numClusters; i++) { + List knownNodes = new CopyOnWriteArrayList<>(); + MockTransportService remoteSeedTransport = startTransport("node_remote" + i, knownNodes); + mockTransportServices[i] = remoteSeedTransport; + DiscoveryNode remoteSeedNode = remoteSeedTransport.getLocalDiscoNode(); + knownNodes.add(remoteSeedNode); + nodes[i] = remoteSeedNode; + builder.put("cluster.remote.remote" + i + ".seeds", remoteSeedNode.getAddress().toString()); + remoteIndicesByCluster.put("remote" + i, new OriginalIndices(new String[]{"index"}, IndicesOptions.lenientExpandOpen())); + } + Settings settings = builder.build(); + + try { + try (MockTransportService service = MockTransportService.createNewService(settings, Version.CURRENT, threadPool, null)) { + service.start(); + 
service.acceptIncomingRequests(); + RemoteClusterService remoteClusterService = service.getRemoteClusterService(); + { + final CountDownLatch latch = new CountDownLatch(1); + AtomicReference> response = new AtomicReference<>(); + AtomicInteger skippedClusters = new AtomicInteger(); + TransportSearchAction.collectSearchShards(IndicesOptions.lenientExpandOpen(), null, null, skippedClusters, + remoteIndicesByCluster, remoteClusterService, threadPool, + new LatchedActionListener<>(ActionListener.wrap(response::set, e -> fail("no failures expected")), latch)); + awaitLatch(latch, 5, TimeUnit.SECONDS); + assertEquals(0, skippedClusters.get()); + assertNotNull(response.get()); + Map map = response.get(); + assertEquals(numClusters, map.size()); + for (int i = 0; i < numClusters; i++) { + String clusterAlias = "remote" + i; + assertTrue(map.containsKey(clusterAlias)); + ClusterSearchShardsResponse shardsResponse = map.get(clusterAlias); + assertEquals(1, shardsResponse.getNodes().length); + } + } + { + final CountDownLatch latch = new CountDownLatch(1); + AtomicReference failure = new AtomicReference<>(); + AtomicInteger skippedClusters = new AtomicInteger(0); + TransportSearchAction.collectSearchShards(IndicesOptions.lenientExpandOpen(), "index_not_found", null, skippedClusters, + remoteIndicesByCluster, remoteClusterService, threadPool, + new LatchedActionListener<>(ActionListener.wrap(r -> fail("no response expected"), failure::set), latch)); + awaitLatch(latch, 5, TimeUnit.SECONDS); + assertEquals(0, skippedClusters.get()); + assertNotNull(failure.get()); + assertThat(failure.get(), instanceOf(RemoteTransportException.class)); + RemoteTransportException remoteTransportException = (RemoteTransportException) failure.get(); + assertEquals(RestStatus.NOT_FOUND, remoteTransportException.status()); + } + + int numDisconnectedClusters = randomIntBetween(1, numClusters); + Set disconnectedNodes = new HashSet<>(numDisconnectedClusters); + Set disconnectedNodesIndices = new HashSet<>(numDisconnectedClusters); + while (disconnectedNodes.size() < numDisconnectedClusters) { + int i = randomIntBetween(0, numClusters - 1); + if (disconnectedNodes.add(nodes[i])) { + assertTrue(disconnectedNodesIndices.add(i)); + } + } + + CountDownLatch disconnectedLatch = new CountDownLatch(numDisconnectedClusters); + RemoteClusterServiceTests.addConnectionListener(remoteClusterService, new TransportConnectionListener() { + @Override + public void onNodeDisconnected(DiscoveryNode node) { + if (disconnectedNodes.remove(node)) { + disconnectedLatch.countDown(); + } + } + }); + for (DiscoveryNode disconnectedNode : disconnectedNodes) { + service.addFailToSendNoConnectRule(disconnectedNode.getAddress()); + } + + { + final CountDownLatch latch = new CountDownLatch(1); + AtomicInteger skippedClusters = new AtomicInteger(0); + AtomicReference failure = new AtomicReference<>(); + TransportSearchAction.collectSearchShards(IndicesOptions.lenientExpandOpen(), null, null, skippedClusters, + remoteIndicesByCluster, remoteClusterService, threadPool, + new LatchedActionListener<>(ActionListener.wrap(r -> fail("no response expected"), failure::set), latch)); + awaitLatch(latch, 5, TimeUnit.SECONDS); + assertEquals(0, skippedClusters.get()); + assertNotNull(failure.get()); + assertThat(failure.get(), instanceOf(RemoteTransportException.class)); + assertThat(failure.get().getMessage(), containsString("error while communicating with remote cluster [")); + assertThat(failure.get().getCause(), instanceOf(NodeDisconnectedException.class)); + } + 
+ //setting skip_unavailable to true for all the disconnected clusters will make the request succeed again + for (int i : disconnectedNodesIndices) { + RemoteClusterServiceTests.updateSkipUnavailable(remoteClusterService, "remote" + i, true); + } + + { + final CountDownLatch latch = new CountDownLatch(1); + AtomicInteger skippedClusters = new AtomicInteger(0); + AtomicReference> response = new AtomicReference<>(); + TransportSearchAction.collectSearchShards(IndicesOptions.lenientExpandOpen(), null, null, skippedClusters, + remoteIndicesByCluster, remoteClusterService, threadPool, + new LatchedActionListener<>(ActionListener.wrap(response::set, e -> fail("no failures expected")), latch)); + awaitLatch(latch, 5, TimeUnit.SECONDS); + assertNotNull(response.get()); + Map map = response.get(); + assertEquals(numClusters - disconnectedNodesIndices.size(), map.size()); + assertEquals(skippedClusters.get(), disconnectedNodesIndices.size()); + for (int i = 0; i < numClusters; i++) { + String clusterAlias = "remote" + i; + if (disconnectedNodesIndices.contains(i)) { + assertFalse(map.containsKey(clusterAlias)); + } else { + assertNotNull(map.get(clusterAlias)); + } + } + } + + //give transport service enough time to realize that the node is down, and to notify the connection listeners + //so that RemoteClusterConnection is left with no connected nodes, hence it will retry connecting next + assertTrue(disconnectedLatch.await(5, TimeUnit.SECONDS)); + + service.clearAllRules(); + if (randomBoolean()) { + for (int i : disconnectedNodesIndices) { + if (randomBoolean()) { + RemoteClusterServiceTests.updateSkipUnavailable(remoteClusterService, "remote" + i, true); + } + + } + } + { + final CountDownLatch latch = new CountDownLatch(1); + AtomicInteger skippedClusters = new AtomicInteger(0); + AtomicReference> response = new AtomicReference<>(); + TransportSearchAction.collectSearchShards(IndicesOptions.lenientExpandOpen(), null, null, skippedClusters, + remoteIndicesByCluster, remoteClusterService, threadPool, + new LatchedActionListener<>(ActionListener.wrap(response::set, e -> fail("no failures expected")), latch)); + awaitLatch(latch, 5, TimeUnit.SECONDS); + assertEquals(0, skippedClusters.get()); + assertNotNull(response.get()); + Map map = response.get(); + assertEquals(numClusters, map.size()); + for (int i = 0; i < numClusters; i++) { + String clusterAlias = "remote" + i; + assertTrue(map.containsKey(clusterAlias)); + assertNotNull(map.get(clusterAlias)); + } + } + assertEquals(0, service.getConnectionManager().size()); + } + } finally { + for (MockTransportService mockTransportService : mockTransportServices) { + mockTransportService.close(); + } } - return new OriginalIndices(localIndices, IndicesOptions.fromOptions(randomBoolean(), - randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean())); } } diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterAwareClientTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterAwareClientTests.java new file mode 100644 index 0000000000000..1a6eaff9e5a2e --- /dev/null +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterAwareClientTests.java @@ -0,0 +1,140 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. 
Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport; + +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.LatchedActionListener; +import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest; +import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +public class RemoteClusterAwareClientTests extends ESTestCase { + + private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); + + @Override + public void tearDown() throws Exception { + super.tearDown(); + ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); + } + + private MockTransportService startTransport(String id, List knownNodes) { + return RemoteClusterConnectionTests.startTransport(id, knownNodes, Version.CURRENT, threadPool); + } + + public void testSearchShards() throws Exception { + List knownNodes = new CopyOnWriteArrayList<>(); + try (MockTransportService seedTransport = startTransport("seed_node", knownNodes); + MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes)) { + knownNodes.add(seedTransport.getLocalDiscoNode()); + knownNodes.add(discoverableTransport.getLocalDiscoNode()); + Collections.shuffle(knownNodes, random()); + Settings.Builder builder = Settings.builder(); + builder.putList("cluster.remote.cluster1.seeds", seedTransport.getLocalDiscoNode().getAddress().toString()); + try (MockTransportService service = MockTransportService.createNewService(builder.build(), Version.CURRENT, threadPool, null)) { + service.start(); + service.acceptIncomingRequests(); + + try (RemoteClusterAwareClient client = new RemoteClusterAwareClient(Settings.EMPTY, threadPool, service, "cluster1")) { + SearchRequest request = new SearchRequest("test-index"); + CountDownLatch responseLatch = new CountDownLatch(1); + AtomicReference reference = new AtomicReference<>(); + ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest("test-index") + .indicesOptions(request.indicesOptions()).local(true).preference(request.preference()) + .routing(request.routing()); + 
client.admin().cluster().searchShards(searchShardsRequest, + new LatchedActionListener<>(ActionListener.wrap(reference::set, e -> fail("no failures expected")), responseLatch)); + responseLatch.await(); + assertNotNull(reference.get()); + ClusterSearchShardsResponse clusterSearchShardsResponse = reference.get(); + assertEquals(knownNodes, Arrays.asList(clusterSearchShardsResponse.getNodes())); + } + } + } + } + + public void testSearchShardsThreadContextHeader() { + List knownNodes = new CopyOnWriteArrayList<>(); + try (MockTransportService seedTransport = startTransport("seed_node", knownNodes); + MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes)) { + knownNodes.add(seedTransport.getLocalDiscoNode()); + knownNodes.add(discoverableTransport.getLocalDiscoNode()); + Collections.shuffle(knownNodes, random()); + Settings.Builder builder = Settings.builder(); + builder.putList("cluster.remote.cluster1.seeds", seedTransport.getLocalDiscoNode().getAddress().toString()); + try (MockTransportService service = MockTransportService.createNewService(builder.build(), Version.CURRENT, threadPool, null)) { + service.start(); + service.acceptIncomingRequests(); + + try (RemoteClusterAwareClient client = new RemoteClusterAwareClient(Settings.EMPTY, threadPool, service, "cluster1")) { + SearchRequest request = new SearchRequest("test-index"); + int numThreads = 10; + ExecutorService executorService = Executors.newFixedThreadPool(numThreads); + for (int i = 0; i < numThreads; i++) { + final String threadId = Integer.toString(i); + executorService.submit(() -> { + ThreadContext threadContext = seedTransport.threadPool.getThreadContext(); + threadContext.putHeader("threadId", threadId); + AtomicReference reference = new AtomicReference<>(); + final ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest("test-index") + .indicesOptions(request.indicesOptions()).local(true).preference(request.preference()) + .routing(request.routing()); + CountDownLatch responseLatch = new CountDownLatch(1); + client.admin().cluster().searchShards(searchShardsRequest, + new LatchedActionListener<>(ActionListener.wrap( + resp -> { + reference.set(resp); + assertEquals(threadId, seedTransport.threadPool.getThreadContext().getHeader("threadId")); + }, + e -> fail("no failures expected")), responseLatch)); + try { + responseLatch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + assertNotNull(reference.get()); + ClusterSearchShardsResponse clusterSearchShardsResponse = reference.get(); + assertEquals(knownNodes, Arrays.asList(clusterSearchShardsResponse.getNodes())); + }); + } + ThreadPool.terminate(executorService, 5, TimeUnit.SECONDS); + } + } + } + } +} diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java index 02e701ed4bc86..308d330d54f61 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java @@ -29,7 +29,6 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; -import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; 
import org.elasticsearch.cluster.node.DiscoveryNode; @@ -43,7 +42,6 @@ import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.CancellableThreads; -import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.core.internal.io.IOUtils; @@ -558,7 +556,7 @@ public void run() { } } - private List>> seedNodes(final DiscoveryNode... seedNodes) { + private static List>> seedNodes(final DiscoveryNode... seedNodes) { if (seedNodes.length == 0) { return Collections.emptyList(); } else if (seedNodes.length == 1) { @@ -570,205 +568,6 @@ private List>> seedNodes(final DiscoveryNo } } - public void testFetchShards() throws Exception { - List knownNodes = new CopyOnWriteArrayList<>(); - try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT); - MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) { - DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); - knownNodes.add(seedTransport.getLocalDiscoNode()); - knownNodes.add(discoverableTransport.getLocalDiscoNode()); - Collections.shuffle(knownNodes, random()); - try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { - service.start(); - service.acceptIncomingRequests(); - final List>> seedNodes = seedNodes(seedNode); - try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes, service, Integer.MAX_VALUE, n -> true, null)) { - if (randomBoolean()) { - updateSeedNodes(connection, seedNodes); - } - if (randomBoolean()) { - connection.updateSkipUnavailable(randomBoolean()); - } - SearchRequest request = new SearchRequest("test-index"); - CountDownLatch responseLatch = new CountDownLatch(1); - AtomicReference reference = new AtomicReference<>(); - AtomicReference failReference = new AtomicReference<>(); - ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest("test-index") - .indicesOptions(request.indicesOptions()).local(true).preference(request.preference()) - .routing(request.routing()); - connection.fetchSearchShards(searchShardsRequest, - new LatchedActionListener<>(ActionListener.wrap(reference::set, failReference::set), responseLatch)); - responseLatch.await(); - assertNull(failReference.get()); - assertNotNull(reference.get()); - ClusterSearchShardsResponse clusterSearchShardsResponse = reference.get(); - assertEquals(knownNodes, Arrays.asList(clusterSearchShardsResponse.getNodes())); - assertTrue(connection.assertNoRunningConnections()); - } - } - } - } - - public void testFetchShardsThreadContextHeader() throws Exception { - List knownNodes = new CopyOnWriteArrayList<>(); - try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT); - MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) { - DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); - knownNodes.add(seedTransport.getLocalDiscoNode()); - knownNodes.add(discoverableTransport.getLocalDiscoNode()); - Collections.shuffle(knownNodes, random()); - try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { - service.start(); - 
service.acceptIncomingRequests(); - final List>> seedNodes = seedNodes(seedNode); - try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes, service, Integer.MAX_VALUE, n -> true, null)) { - SearchRequest request = new SearchRequest("test-index"); - Thread[] threads = new Thread[10]; - for (int i = 0; i < threads.length; i++) { - final String threadId = Integer.toString(i); - threads[i] = new Thread(() -> { - ThreadContext threadContext = seedTransport.threadPool.getThreadContext(); - threadContext.putHeader("threadId", threadId); - AtomicReference reference = new AtomicReference<>(); - AtomicReference failReference = new AtomicReference<>(); - final ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest("test-index") - .indicesOptions(request.indicesOptions()).local(true).preference(request.preference()) - .routing(request.routing()); - CountDownLatch responseLatch = new CountDownLatch(1); - connection.fetchSearchShards(searchShardsRequest, - new LatchedActionListener<>(ActionListener.wrap( - resp -> { - reference.set(resp); - assertEquals(threadId, seedTransport.threadPool.getThreadContext().getHeader("threadId")); - }, - failReference::set), responseLatch)); - try { - responseLatch.await(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - assertNull(failReference.get()); - assertNotNull(reference.get()); - ClusterSearchShardsResponse clusterSearchShardsResponse = reference.get(); - assertEquals(knownNodes, Arrays.asList(clusterSearchShardsResponse.getNodes())); - }); - } - for (int i = 0; i < threads.length; i++) { - threads[i].start(); - } - - for (int i = 0; i < threads.length; i++) { - threads[i].join(); - } - assertTrue(connection.assertNoRunningConnections()); - } - } - } - } - - public void testFetchShardsSkipUnavailable() throws Exception { - List knownNodes = new CopyOnWriteArrayList<>(); - try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT)) { - DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); - knownNodes.add(seedNode); - try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { - service.start(); - service.acceptIncomingRequests(); - try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null)) { - ConnectionManager connectionManager = connection.getConnectionManager(); - - SearchRequest request = new SearchRequest("test-index"); - ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest("test-index") - .indicesOptions(request.indicesOptions()).local(true).preference(request.preference()) - .routing(request.routing()); - { - CountDownLatch responseLatch = new CountDownLatch(1); - AtomicReference reference = new AtomicReference<>(); - AtomicReference failReference = new AtomicReference<>(); - connection.fetchSearchShards(searchShardsRequest, - new LatchedActionListener<>(ActionListener.wrap(reference::set, failReference::set), responseLatch)); - assertTrue(responseLatch.await(10, TimeUnit.SECONDS)); - assertNull(failReference.get()); - assertNotNull(reference.get()); - ClusterSearchShardsResponse response = reference.get(); - assertTrue(response != ClusterSearchShardsResponse.EMPTY); - assertEquals(knownNodes, Arrays.asList(response.getNodes())); - } - - CountDownLatch disconnectedLatch = new CountDownLatch(1); - 
connectionManager.addListener(new TransportConnectionListener() { - @Override - public void onNodeDisconnected(DiscoveryNode node) { - if (node.equals(seedNode)) { - disconnectedLatch.countDown(); - } - } - }); - - service.addFailToSendNoConnectRule(seedTransport); - - if (randomBoolean()) { - connection.updateSkipUnavailable(false); - } - { - CountDownLatch responseLatch = new CountDownLatch(1); - AtomicReference reference = new AtomicReference<>(); - AtomicReference failReference = new AtomicReference<>(); - connection.fetchSearchShards(searchShardsRequest, - new LatchedActionListener<>(ActionListener.wrap((s) -> { - reference.set(s); - }, failReference::set), responseLatch)); - assertTrue(responseLatch.await(10, TimeUnit.SECONDS)); - assertNotNull(failReference.get()); - assertNull(reference.get()); - assertThat(failReference.get(), instanceOf(TransportException.class)); - } - - connection.updateSkipUnavailable(true); - { - CountDownLatch responseLatch = new CountDownLatch(1); - AtomicReference reference = new AtomicReference<>(); - AtomicReference failReference = new AtomicReference<>(); - connection.fetchSearchShards(searchShardsRequest, - new LatchedActionListener<>(ActionListener.wrap(reference::set, failReference::set), responseLatch)); - assertTrue(responseLatch.await(10, TimeUnit.SECONDS)); - assertNull(failReference.get()); - assertNotNull(reference.get()); - ClusterSearchShardsResponse response = reference.get(); - assertTrue(response == ClusterSearchShardsResponse.EMPTY); - } - - //give transport service enough time to realize that the node is down, and to notify the connection listeners - //so that RemoteClusterConnection is left with no connected nodes, hence it will retry connecting next - assertTrue(disconnectedLatch.await(10, TimeUnit.SECONDS)); - - if (randomBoolean()) { - connection.updateSkipUnavailable(false); - } - - service.clearAllRules(); - //check that we reconnect once the node is back up - { - CountDownLatch responseLatch = new CountDownLatch(1); - AtomicReference reference = new AtomicReference<>(); - AtomicReference failReference = new AtomicReference<>(); - connection.fetchSearchShards(searchShardsRequest, - new LatchedActionListener<>(ActionListener.wrap(reference::set, failReference::set), responseLatch)); - assertTrue(responseLatch.await(10, TimeUnit.SECONDS)); - assertNull(failReference.get()); - assertNotNull(reference.get()); - ClusterSearchShardsResponse response = reference.get(); - assertTrue(response != ClusterSearchShardsResponse.EMPTY); - assertEquals(knownNodes, Arrays.asList(response.getNodes())); - } - } - } - } - } - public void testTriggerUpdatesConcurrently() throws IOException, InterruptedException { List knownNodes = new CopyOnWriteArrayList<>(); try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT); diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java index d5671eec21961..60f3ece86bcbe 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java @@ -20,9 +20,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.LatchedActionListener; import org.elasticsearch.action.OriginalIndices; -import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse; import 
org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Strings; @@ -33,7 +31,6 @@ import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.core.internal.io.IOUtils; -import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; import org.elasticsearch.test.transport.MockTransportService; @@ -60,9 +57,7 @@ import java.util.function.Supplier; import java.util.stream.Collectors; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.awaitLatch; import static org.hamcrest.CoreMatchers.containsString; -import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; @@ -711,172 +706,6 @@ public void onFailure(Exception e) { } } - public void testCollectSearchShards() throws Exception { - int numClusters = randomIntBetween(2, 10); - MockTransportService[] mockTransportServices = new MockTransportService[numClusters]; - DiscoveryNode[] nodes = new DiscoveryNode[numClusters]; - Map remoteIndicesByCluster = new HashMap<>(); - Settings.Builder builder = Settings.builder(); - for (int i = 0; i < numClusters; i++) { - List knownNodes = new CopyOnWriteArrayList<>(); - MockTransportService remoteSeedTransport = startTransport("node_remote" + i, knownNodes, Version.CURRENT); - mockTransportServices[i] = remoteSeedTransport; - DiscoveryNode remoteSeedNode = remoteSeedTransport.getLocalDiscoNode(); - knownNodes.add(remoteSeedNode); - nodes[i] = remoteSeedNode; - builder.put("cluster.remote.remote" + i + ".seeds", remoteSeedNode.getAddress().toString()); - remoteIndicesByCluster.put("remote" + i, new OriginalIndices(new String[]{"index"}, IndicesOptions.lenientExpandOpen())); - } - Settings settings = builder.build(); - - try { - try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { - service.start(); - service.acceptIncomingRequests(); - try (RemoteClusterService remoteClusterService = new RemoteClusterService(settings, service)) { - assertFalse(remoteClusterService.isCrossClusterSearchEnabled()); - remoteClusterService.initializeRemoteClusters(); - assertTrue(remoteClusterService.isCrossClusterSearchEnabled()); - { - final CountDownLatch latch = new CountDownLatch(1); - AtomicReference> response = new AtomicReference<>(); - AtomicReference failure = new AtomicReference<>(); - remoteClusterService.collectSearchShards(IndicesOptions.lenientExpandOpen(), null, null, remoteIndicesByCluster, - new LatchedActionListener<>(ActionListener.wrap(response::set, failure::set), latch)); - awaitLatch(latch, 5, TimeUnit.SECONDS); - assertNull(failure.get()); - assertNotNull(response.get()); - Map map = response.get(); - assertEquals(numClusters, map.size()); - for (int i = 0; i < numClusters; i++) { - String clusterAlias = "remote" + i; - assertTrue(map.containsKey(clusterAlias)); - ClusterSearchShardsResponse shardsResponse = map.get(clusterAlias); - assertEquals(1, shardsResponse.getNodes().length); - } - } - { - final CountDownLatch latch = new CountDownLatch(1); - AtomicReference> response = new AtomicReference<>(); - AtomicReference failure = new AtomicReference<>(); - remoteClusterService.collectSearchShards(IndicesOptions.lenientExpandOpen(), "index_not_found", - null, 
remoteIndicesByCluster, - new LatchedActionListener<>(ActionListener.wrap(response::set, failure::set), latch)); - awaitLatch(latch, 5, TimeUnit.SECONDS); - assertNull(response.get()); - assertNotNull(failure.get()); - assertThat(failure.get(), instanceOf(RemoteTransportException.class)); - RemoteTransportException remoteTransportException = (RemoteTransportException) failure.get(); - assertEquals(RestStatus.NOT_FOUND, remoteTransportException.status()); - } - int numDisconnectedClusters = randomIntBetween(1, numClusters); - Set disconnectedNodes = new HashSet<>(numDisconnectedClusters); - Set disconnectedNodesIndices = new HashSet<>(numDisconnectedClusters); - while(disconnectedNodes.size() < numDisconnectedClusters) { - int i = randomIntBetween(0, numClusters - 1); - if (disconnectedNodes.add(nodes[i])) { - assertTrue(disconnectedNodesIndices.add(i)); - } - } - - CountDownLatch disconnectedLatch = new CountDownLatch(numDisconnectedClusters); - for (RemoteClusterConnection connection : remoteClusterService.getConnections()) { - connection.getConnectionManager().addListener(new TransportConnectionListener() { - @Override - public void onNodeDisconnected(DiscoveryNode node) { - if (disconnectedNodes.remove(node)) { - disconnectedLatch.countDown(); - } - } - }); - } - - for (DiscoveryNode disconnectedNode : disconnectedNodes) { - service.addFailToSendNoConnectRule(disconnectedNode.getAddress()); - } - - { - final CountDownLatch latch = new CountDownLatch(1); - AtomicReference> response = new AtomicReference<>(); - AtomicReference failure = new AtomicReference<>(); - remoteClusterService.collectSearchShards(IndicesOptions.lenientExpandOpen(), null, null, remoteIndicesByCluster, - new LatchedActionListener<>(ActionListener.wrap(response::set, failure::set), latch)); - awaitLatch(latch, 5, TimeUnit.SECONDS); - assertNull(response.get()); - assertNotNull(failure.get()); - assertThat(failure.get(), instanceOf(RemoteTransportException.class)); - assertThat(failure.get().getMessage(), containsString("error while communicating with remote cluster [")); - assertThat(failure.get().getCause(), instanceOf(NodeDisconnectedException.class)); - } - - //setting skip_unavailable to true for all the disconnected clusters will make the request succeed again - for (int i : disconnectedNodesIndices) { - remoteClusterService.updateSkipUnavailable("remote" + i, true); - } - { - final CountDownLatch latch = new CountDownLatch(1); - AtomicReference> response = new AtomicReference<>(); - AtomicReference failure = new AtomicReference<>(); - remoteClusterService.collectSearchShards(IndicesOptions.lenientExpandOpen(), null, null, remoteIndicesByCluster, - new LatchedActionListener<>(ActionListener.wrap(response::set, failure::set), latch)); - awaitLatch(latch, 5, TimeUnit.SECONDS); - assertNull(failure.get()); - assertNotNull(response.get()); - Map map = response.get(); - assertEquals(numClusters, map.size()); - for (int i = 0; i < numClusters; i++) { - String clusterAlias = "remote" + i; - assertTrue(map.containsKey(clusterAlias)); - ClusterSearchShardsResponse shardsResponse = map.get(clusterAlias); - if (disconnectedNodesIndices.contains(i)) { - assertTrue(shardsResponse == ClusterSearchShardsResponse.EMPTY); - } else { - assertTrue(shardsResponse != ClusterSearchShardsResponse.EMPTY); - } - } - } - - //give transport service enough time to realize that the node is down, and to notify the connection listeners - //so that RemoteClusterConnection is left with no connected nodes, hence it will retry connecting next - 
assertTrue(disconnectedLatch.await(5, TimeUnit.SECONDS)); - - service.clearAllRules(); - if (randomBoolean()) { - for (int i : disconnectedNodesIndices) { - if (randomBoolean()) { - remoteClusterService.updateSkipUnavailable("remote" + i, true); - } - - } - } - { - final CountDownLatch latch = new CountDownLatch(1); - AtomicReference> response = new AtomicReference<>(); - AtomicReference failure = new AtomicReference<>(); - remoteClusterService.collectSearchShards(IndicesOptions.lenientExpandOpen(), null, null, remoteIndicesByCluster, - new LatchedActionListener<>(ActionListener.wrap(response::set, failure::set), latch)); - awaitLatch(latch, 5, TimeUnit.SECONDS); - assertNull(failure.get()); - assertNotNull(response.get()); - Map map = response.get(); - assertEquals(numClusters, map.size()); - for (int i = 0; i < numClusters; i++) { - String clusterAlias = "remote" + i; - assertTrue(map.containsKey(clusterAlias)); - ClusterSearchShardsResponse shardsResponse = map.get(clusterAlias); - assertNotSame(ClusterSearchShardsResponse.EMPTY, shardsResponse); - } - } - assertEquals(0, service.getConnectionManager().size()); - } - } - } finally { - for (MockTransportService mockTransportService : mockTransportServices) { - mockTransportService.close(); - } - } - } - public void testRemoteClusterSkipIfDisconnectedSetting() { { Settings settings = Settings.builder() @@ -1079,7 +908,7 @@ public void testRemoteClusterWithProxy() throws Exception { } } - private void updateRemoteCluster(RemoteClusterService service, String clusterAlias, List addresses, String proxyAddress) + private static void updateRemoteCluster(RemoteClusterService service, String clusterAlias, List addresses, String proxyAddress) throws Exception { CountDownLatch latch = new CountDownLatch(1); AtomicReference exceptionAtomicReference = new AtomicReference<>(); @@ -1093,4 +922,40 @@ private void updateRemoteCluster(RemoteClusterService service, String clusterAli throw exceptionAtomicReference.get(); } } + + public static void updateSkipUnavailable(RemoteClusterService service, String clusterAlias, boolean skipUnavailable) { + RemoteClusterConnection connection = service.getRemoteClusterConnection(clusterAlias); + connection.updateSkipUnavailable(skipUnavailable); + } + + public static void addConnectionListener(RemoteClusterService service, TransportConnectionListener listener) { + for (RemoteClusterConnection connection : service.getConnections()) { + ConnectionManager connectionManager = connection.getConnectionManager(); + connectionManager.addListener(listener); + } + } + + public void testSkipUnavailable() { + List knownNodes = new CopyOnWriteArrayList<>(); + try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT)) { + DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); + knownNodes.add(seedNode); + Settings.Builder builder = Settings.builder(); + builder.putList("cluster.remote.cluster1.seeds", seedTransport.getLocalDiscoNode().getAddress().toString()); + try (MockTransportService service = MockTransportService.createNewService(builder.build(), Version.CURRENT, threadPool, null)) { + service.start(); + service.acceptIncomingRequests(); + + assertFalse(service.getRemoteClusterService().isSkipUnavailable("cluster1")); + + if (randomBoolean()) { + updateSkipUnavailable(service.getRemoteClusterService(), "cluster1", false); + assertFalse(service.getRemoteClusterService().isSkipUnavailable("cluster1")); + } + + updateSkipUnavailable(service.getRemoteClusterService(), "cluster1", 
true); + assertTrue(service.getRemoteClusterService().isSkipUnavailable("cluster1")); + } + } + } }
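With buildClusters gone, the total/successful/skipped figures reported in SearchResponse.Clusters are derived directly from the map returned by collectSearchShards plus the skippedClusters counter. A short standalone illustration of how those counters relate follows; the class name and the numbers are hypothetical.

public final class ClustersCountersExample {
    public static void main(String[] args) {
        int remoteClusters = 3;            // remote clusters referenced by the search request
        boolean searchesLocalIndices = true;
        int respondedRemotes = 2;          // entries in the searchShardsResponses map
        int skippedRemotes = 1;            // skip_unavailable clusters that failed

        int localClusters = searchesLocalIndices ? 1 : 0;
        int total = remoteClusters + localClusters;
        int successful = respondedRemotes + localClusters;

        // Every remote cluster is accounted for exactly once: it either responded,
        // was skipped (skip_unavailable=true), or failed the whole request.
        if (total != successful + skippedRemotes) {
            throw new IllegalStateException("counters out of sync");
        }
        System.out.println("total=" + total + ", successful=" + successful + ", skipped=" + skippedRemotes);
    }
}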