From 34e0e32b21a9d4460e27c680b0d4c5ad92dd1603 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 26 Feb 2020 07:59:51 -0700 Subject: [PATCH] Fix RemoteConnectionManager size() method Currently the remote connection manager will delegate the size() call to the underlying cluster connection manager. This introduces the possibility that call will return 1 before the nodeConnection method has been triggered to add the connection to the remote connection list. This can cause issues, as the ensureConnected method checks the connection managers size and executes synchronously if the size is > 0. This leads to a potential cluster not connected exception while we are still waiting for the connection opened callback to be triggered. This commit fixes this issue by using the remote connection manager's size to report the connection manager's size. Fixes #52029. --- .../elasticsearch/transport/RemoteClusterAwareClient.java | 2 +- .../elasticsearch/transport/RemoteConnectionManager.java | 6 +++++- .../elasticsearch/transport/SniffConnectionStrategy.java | 3 +-- .../elasticsearch/transport/RemoteClusterClientTests.java | 1 - 4 files changed, 7 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java index 69b8a1f68a682..0d7aa755746cd 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java @@ -45,7 +45,7 @@ final class RemoteClusterAwareClient extends AbstractClient { @Override protected void doExecute(ActionType action, Request request, ActionListener listener) { - remoteClusterService.ensureConnected(clusterAlias, ActionListener.wrap(res -> { + remoteClusterService.ensureConnected(clusterAlias, ActionListener.wrap(v -> { Transport.Connection connection; if (request instanceof RemoteClusterAwareRequest) { DiscoveryNode preferredTargetNode = ((RemoteClusterAwareRequest) request).getPreferredTargetNode(); diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionManager.java b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionManager.java index e9e294779f188..9ea0952551cef 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionManager.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionManager.java @@ -110,7 +110,11 @@ public Transport.Connection getAnyRemoteConnection() { @Override public int size() { - return delegate.size(); + // Although we use a delegate instance, we report the connection manager size based on the + // RemoteConnectionManager's knowledge of the connections. This is because there is a brief window + // in between the time when the connection is added to the delegate map, and the time when + // nodeConnected is called. + return this.connections.size(); } @Override diff --git a/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java index 1c0c9b4f0b3bd..f5fa74ca612f9 100644 --- a/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java @@ -349,8 +349,7 @@ public void onResponse(Void aVoid) { @Override public void onFailure(Exception e) { - if (e instanceof ConnectTransportException || - e instanceof IllegalStateException) { + if (e instanceof ConnectTransportException || e instanceof IllegalStateException) { // ISE if we fail the handshake with an version incompatible node // fair enough we can't connect just move on logger.debug(() -> new ParameterizedMessage("failed to connect to node {}", node), e); diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterClientTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterClientTests.java index 9d2d7b8dd5782..ed71b7f85c863 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterClientTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterClientTests.java @@ -74,7 +74,6 @@ public void testConnectAndExecuteRequest() throws Exception { } } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/52029") public void testEnsureWeReconnect() throws Exception { Settings remoteSettings = Settings.builder().put(ClusterName.CLUSTER_NAME_SETTING.getKey(), "foo_bar_cluster").build(); try (MockTransportService remoteTransport = startTransport("remote_node", Collections.emptyList(), Version.CURRENT, threadPool,