From 246f021500a59092b8dc7339bc750e4c5def56c1 Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Fri, 26 Oct 2018 09:42:32 -0400 Subject: [PATCH 1/2] Fix line length for bootstrap/client/discovery/gateway files Removes the checkstyle suppressions for files in org.elasticsearch.bootstrap/client/discovery/gateway packages. Relates to #34884 --- .../resources/checkstyle_suppressions.xml | 25 -------- .../elasticsearch/bootstrap/JNANatives.java | 3 +- .../client/support/AbstractClient.java | 3 +- .../client/transport/TransportClient.java | 6 +- .../elasticsearch/discovery/Discovery.java | 3 +- .../discovery/DiscoverySettings.java | 6 +- .../discovery/zen/ZenDiscovery.java | 61 +++++++++++++------ .../gateway/GatewayAllocator.java | 18 ++++-- .../gateway/GatewayMetaState.java | 19 ++++-- .../elasticsearch/gateway/GatewayService.java | 6 +- .../gateway/LocalAllocateDangledIndices.java | 18 ++++-- .../gateway/PrimaryShardAllocator.java | 17 ++++-- .../gateway/ReplicaShardAllocator.java | 12 ++-- .../client/AbstractClientHeadersTestCase.java | 12 ++-- ...usterStatePublishResponseHandlerTests.java | 6 +- .../discovery/zen/ZenDiscoveryUnitTests.java | 23 ++++--- .../gateway/GatewayServiceTests.java | 3 +- .../gateway/MetaDataStateFormatTests.java | 27 +++++--- .../gateway/MetaDataWriteDataNodesIT.java | 24 +++++--- .../gateway/PrimaryShardAllocatorTests.java | 37 +++++++---- .../gateway/PriorityComparatorTests.java | 15 +++-- .../gateway/QuorumGatewayIT.java | 6 +- .../gateway/RecoveryFromGatewayIT.java | 45 +++++++++----- .../gateway/ReplicaShardAllocatorTests.java | 34 +++++++---- .../gateway/ReusePeerRecoverySharedTest.java | 9 ++- 25 files changed, 276 insertions(+), 162 deletions(-) diff --git a/buildSrc/src/main/resources/checkstyle_suppressions.xml b/buildSrc/src/main/resources/checkstyle_suppressions.xml index 592c1512d60cf..05117f46bc20b 100644 --- a/buildSrc/src/main/resources/checkstyle_suppressions.xml +++ b/buildSrc/src/main/resources/checkstyle_suppressions.xml @@ -178,10 +178,6 @@ - - - - @@ -239,15 +235,6 @@ - - - - - - - - - @@ -395,7 +382,6 @@ - @@ -477,20 +463,9 @@ - - - - - - - - - - - diff --git a/server/src/main/java/org/elasticsearch/bootstrap/JNANatives.java b/server/src/main/java/org/elasticsearch/bootstrap/JNANatives.java index 8e86f6aa4b779..232da7e253420 100644 --- a/server/src/main/java/org/elasticsearch/bootstrap/JNANatives.java +++ b/server/src/main/java/org/elasticsearch/bootstrap/JNANatives.java @@ -95,7 +95,8 @@ static void tryMlockall() { logger.warn("This can result in part of the JVM being swapped out."); if (errno == JNACLibrary.ENOMEM) { if (rlimitSuccess) { - logger.warn("Increase RLIMIT_MEMLOCK, soft limit: {}, hard limit: {}", rlimitToString(softLimit), rlimitToString(hardLimit)); + logger.warn("Increase RLIMIT_MEMLOCK, soft limit: {}, hard limit: {}", rlimitToString(softLimit), + rlimitToString(hardLimit)); if (Constants.LINUX) { // give specific instructions for the linux case to make it easy String user = System.getProperty("user.name"); diff --git a/server/src/main/java/org/elasticsearch/client/support/AbstractClient.java b/server/src/main/java/org/elasticsearch/client/support/AbstractClient.java index 105fd3fcc498b..7f2e6681294a7 100644 --- a/server/src/main/java/org/elasticsearch/client/support/AbstractClient.java +++ b/server/src/main/java/org/elasticsearch/client/support/AbstractClient.java @@ -752,7 +752,8 @@ public ActionFuture updateSettings(final ClusterU } @Override - public void updateSettings(final ClusterUpdateSettingsRequest request, final ActionListener listener) { + public void updateSettings(final ClusterUpdateSettingsRequest request, + final ActionListener listener) { execute(ClusterUpdateSettingsAction.INSTANCE, request, listener); } diff --git a/server/src/main/java/org/elasticsearch/client/transport/TransportClient.java b/server/src/main/java/org/elasticsearch/client/transport/TransportClient.java index 39829615fb3fe..b6c57a39a5aee 100644 --- a/server/src/main/java/org/elasticsearch/client/transport/TransportClient.java +++ b/server/src/main/java/org/elasticsearch/client/transport/TransportClient.java @@ -76,7 +76,8 @@ /** * The transport client allows to create a client that is not part of the cluster, but simply connects to one - * or more nodes directly by adding their respective addresses using {@link #addTransportAddress(org.elasticsearch.common.transport.TransportAddress)}. + * or more nodes directly by adding their respective addresses using + * {@link #addTransportAddress(org.elasticsearch.common.transport.TransportAddress)}. *

* The transport client important modules used is the {@link org.elasticsearch.common.network.NetworkModule} which is * started in client mode (only connects, no bind). @@ -223,7 +224,8 @@ private static ClientTemplate buildTemplate(Settings providedSettings, Settings transportService.start(); transportService.acceptIncomingRequests(); - ClientTemplate transportClient = new ClientTemplate(injector, pluginLifecycleComponents, nodesService, proxy, namedWriteableRegistry); + ClientTemplate transportClient = new ClientTemplate(injector, pluginLifecycleComponents, nodesService, proxy, + namedWriteableRegistry); resourcesToClose.clear(); return transportClient; } finally { diff --git a/server/src/main/java/org/elasticsearch/discovery/Discovery.java b/server/src/main/java/org/elasticsearch/discovery/Discovery.java index b58f61bac89bb..a2035a93a4f2f 100644 --- a/server/src/main/java/org/elasticsearch/discovery/Discovery.java +++ b/server/src/main/java/org/elasticsearch/discovery/Discovery.java @@ -43,7 +43,8 @@ public interface Discovery extends LifecycleComponent { * The {@link AckListener} allows to keep track of the ack received from nodes, and verify whether * they updated their own cluster state or not. * - * The method is guaranteed to throw a {@link FailedToCommitClusterStateException} if the change is not committed and should be rejected. + * The method is guaranteed to throw a {@link FailedToCommitClusterStateException} if the change is not + * committed and should be rejected. * Any other exception signals the something wrong happened but the change is committed. */ void publish(ClusterChangedEvent clusterChangedEvent, AckListener ackListener); diff --git a/server/src/main/java/org/elasticsearch/discovery/DiscoverySettings.java b/server/src/main/java/org/elasticsearch/discovery/DiscoverySettings.java index e9a83678f8a2c..5b7613587cd6f 100644 --- a/server/src/main/java/org/elasticsearch/discovery/DiscoverySettings.java +++ b/server/src/main/java/org/elasticsearch/discovery/DiscoverySettings.java @@ -37,8 +37,10 @@ public class DiscoverySettings extends AbstractComponent { public static final int NO_MASTER_BLOCK_ID = 2; - public static final ClusterBlock NO_MASTER_BLOCK_ALL = new ClusterBlock(NO_MASTER_BLOCK_ID, "no master", true, true, false, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL); - public static final ClusterBlock NO_MASTER_BLOCK_WRITES = new ClusterBlock(NO_MASTER_BLOCK_ID, "no master", true, false, false, RestStatus.SERVICE_UNAVAILABLE, EnumSet.of(ClusterBlockLevel.WRITE, ClusterBlockLevel.METADATA_WRITE)); + public static final ClusterBlock NO_MASTER_BLOCK_ALL = new ClusterBlock(NO_MASTER_BLOCK_ID, "no master", true, true, false, + RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL); + public static final ClusterBlock NO_MASTER_BLOCK_WRITES = new ClusterBlock(NO_MASTER_BLOCK_ID, "no master", true, false, false, + RestStatus.SERVICE_UNAVAILABLE, EnumSet.of(ClusterBlockLevel.WRITE, ClusterBlockLevel.METADATA_WRITE)); /** * sets the timeout for a complete publishing cycle, including both sending and committing. the master * will continue to process the next cluster state update after this time has elapsed diff --git a/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index a68557adb9d03..a6d9970b0008b 100644 --- a/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -302,7 +302,8 @@ protected void doStop() { try { membership.sendLeaveRequest(nodes.getLocalNode(), possibleMaster); } catch (Exception e) { - logger.debug(() -> new ParameterizedMessage("failed to send leave request from master [{}] to possible master [{}]", nodes.getMasterNode(), possibleMaster), e); + logger.debug(() -> new ParameterizedMessage("failed to send leave request from master [{}] to possible master [{}]", + nodes.getMasterNode(), possibleMaster), e); } } } @@ -520,16 +521,19 @@ private boolean joinElectedMaster(DiscoveryNode masterNode) { final Throwable unwrap = ExceptionsHelper.unwrapCause(e); if (unwrap instanceof NotMasterException) { if (++joinAttempt == this.joinRetryAttempts) { - logger.info("failed to send join request to master [{}], reason [{}], tried [{}] times", masterNode, ExceptionsHelper.detailedMessage(e), joinAttempt); + logger.info("failed to send join request to master [{}], reason [{}], tried [{}] times", masterNode, + ExceptionsHelper.detailedMessage(e), joinAttempt); return false; } else { - logger.trace("master {} failed with [{}]. retrying... (attempts done: [{}])", masterNode, ExceptionsHelper.detailedMessage(e), joinAttempt); + logger.trace("master {} failed with [{}]. retrying... (attempts done: [{}])", masterNode, + ExceptionsHelper.detailedMessage(e), joinAttempt); } } else { if (logger.isTraceEnabled()) { logger.trace(() -> new ParameterizedMessage("failed to send join request to master [{}]", masterNode), e); } else { - logger.info("failed to send join request to master [{}], reason [{}]", masterNode, ExceptionsHelper.detailedMessage(e)); + logger.info("failed to send join request to master [{}], reason [{}]", masterNode, + ExceptionsHelper.detailedMessage(e)); } return false; } @@ -557,7 +561,8 @@ void setCommittedState(ClusterState clusterState) { } // visible for testing - public static class NodeRemovalClusterStateTaskExecutor implements ClusterStateTaskExecutor, ClusterStateTaskListener { + public static class NodeRemovalClusterStateTaskExecutor implements ClusterStateTaskExecutor, + ClusterStateTaskListener { private final AllocationService allocationService; private final ElectMasterService electMasterService; @@ -696,7 +701,8 @@ private void handleMinimumMasterNodesChanged(final int minimumMasterNodes) { synchronized (stateMutex) { // check if we have enough master nodes, if not, we need to move into joining the cluster again if (!electMaster.hasEnoughMasterNodes(committedState.get().nodes())) { - rejoin("not enough master nodes on change of minimum_master_nodes from [" + prevMinimumMasterNode + "] to [" + minimumMasterNodes + "]"); + rejoin("not enough master nodes on change of minimum_master_nodes from [" + prevMinimumMasterNode + "] to [" + + minimumMasterNodes + "]"); } } } @@ -734,10 +740,12 @@ boolean processNextCommittedClusterState(String reason) { } assert newClusterState.nodes().getMasterNode() != null : "received a cluster state without a master"; - assert !newClusterState.blocks().hasGlobalBlock(discoverySettings.getNoMasterBlock()) : "received a cluster state with a master block"; + assert !newClusterState.blocks().hasGlobalBlock(discoverySettings.getNoMasterBlock()) : + "received a cluster state with a master block"; if (currentState.nodes().isLocalNodeElectedMaster() && newClusterState.nodes().isLocalNodeElectedMaster() == false) { - handleAnotherMaster(currentState, newClusterState.nodes().getMasterNode(), newClusterState.version(), "via a new cluster state"); + handleAnotherMaster(currentState, newClusterState.nodes().getMasterNode(), newClusterState.version(), + "via a new cluster state"); return false; } @@ -826,15 +834,18 @@ public static boolean shouldIgnoreOrRejectNewClusterState(Logger logger, Cluster // reject cluster states that are not new from the same master if (currentState.supersedes(newClusterState) || - (newClusterState.nodes().getMasterNodeId().equals(currentState.nodes().getMasterNodeId()) && currentState.version() == newClusterState.version())) { + (newClusterState.nodes().getMasterNodeId().equals(currentState.nodes().getMasterNodeId()) && + currentState.version() == newClusterState.version())) { // if the new state has a smaller version, and it has the same master node, then no need to process it - logger.debug("received a cluster state that is not newer than the current one, ignoring (received {}, current {})", newClusterState.version(), currentState.version()); + logger.debug("received a cluster state that is not newer than the current one, ignoring (received {}, current {})", + newClusterState.version(), currentState.version()); return true; } // reject older cluster states if we are following a master if (currentState.nodes().getMasterNodeId() != null && newClusterState.version() < currentState.version()) { - logger.debug("received a cluster state that has a lower version than the current one, ignoring (received {}, current {})", newClusterState.version(), currentState.version()); + logger.debug("received a cluster state that has a lower version than the current one, ignoring (received {}, current {})", + newClusterState.version(), currentState.version()); return true; } return false; @@ -850,8 +861,10 @@ public static void validateStateIsFromCurrentMaster(Logger logger, DiscoveryNode return; } if (!currentNodes.getMasterNodeId().equals(newClusterState.nodes().getMasterNodeId())) { - logger.warn("received a cluster state from a different master than the current one, rejecting (received {}, current {})", newClusterState.nodes().getMasterNode(), currentNodes.getMasterNode()); - throw new IllegalStateException("cluster state from a different master than the current one, rejecting (received " + newClusterState.nodes().getMasterNode() + ", current " + currentNodes.getMasterNode() + ")"); + logger.warn("received a cluster state from a different master than the current one, rejecting (received {}, current {})", + newClusterState.nodes().getMasterNode(), currentNodes.getMasterNode()); + throw new IllegalStateException("cluster state from a different master than the current one, rejecting (received " + + newClusterState.nodes().getMasterNode() + ", current " + currentNodes.getMasterNode() + ")"); } } @@ -941,13 +954,15 @@ private DiscoveryNode findMaster() { return null; } } else { - assert !activeMasters.contains(localNode) : "local node should never be elected as master when other nodes indicate an active master"; + assert !activeMasters.contains(localNode) : + "local node should never be elected as master when other nodes indicate an active master"; // lets tie break between discovered nodes return electMaster.tieBreakActiveMasters(activeMasters); } } - static List filterPingResponses(List fullPingResponses, boolean masterElectionIgnoreNonMasters, Logger logger) { + static List filterPingResponses(List fullPingResponses, + boolean masterElectionIgnoreNonMasters, Logger logger) { List pingResponses; if (masterElectionIgnoreNonMasters) { pingResponses = fullPingResponses.stream().filter(ping -> ping.node().isMasterNode()).collect(Collectors.toList()); @@ -1004,7 +1019,8 @@ private boolean localNodeMaster() { return clusterState().nodes().isLocalNodeElectedMaster(); } - private void handleAnotherMaster(ClusterState localClusterState, final DiscoveryNode otherMaster, long otherClusterStateVersion, String reason) { + private void handleAnotherMaster(ClusterState localClusterState, final DiscoveryNode otherMaster, long otherClusterStateVersion, + String reason) { assert localClusterState.nodes().isLocalNodeElectedMaster() : "handleAnotherMaster called but current node is not a master"; assert Thread.holdsLock(stateMutex); @@ -1012,13 +1028,16 @@ private void handleAnotherMaster(ClusterState localClusterState, final Discovery rejoin("zen-disco-discovered another master with a new cluster_state [" + otherMaster + "][" + reason + "]"); } else { // TODO: do this outside mutex - logger.warn("discovered [{}] which is also master but with an older cluster_state, telling [{}] to rejoin the cluster ([{}])", otherMaster, otherMaster, reason); + logger.warn("discovered [{}] which is also master but with an older cluster_state, telling [{}] to rejoin the cluster ([{}])", + otherMaster, otherMaster, reason); try { // make sure we're connected to this node (connect to node does nothing if we're already connected) // since the network connections are asymmetric, it may be that we received a state but have disconnected from the node // in the past (after a master failure, for example) transportService.connectToNode(otherMaster); - transportService.sendRequest(otherMaster, DISCOVERY_REJOIN_ACTION_NAME, new RejoinClusterRequest(localClusterState.nodes().getLocalNodeId()), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { + transportService.sendRequest(otherMaster, DISCOVERY_REJOIN_ACTION_NAME, + new RejoinClusterRequest(localClusterState.nodes().getLocalNodeId()), + new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { @Override public void handleException(TransportException exp) { @@ -1140,10 +1159,12 @@ public void onPingReceived(final NodesFaultDetection.PingRequest pingRequest) { } if (pingsWhileMaster.incrementAndGet() < maxPingsFromAnotherMaster) { - logger.trace("got a ping from another master {}. current ping count: [{}]", pingRequest.masterNode(), pingsWhileMaster.get()); + logger.trace("got a ping from another master {}. current ping count: [{}]", pingRequest.masterNode(), + pingsWhileMaster.get()); return; } - logger.debug("got a ping from another master {}. resolving who should rejoin. current ping count: [{}]", pingRequest.masterNode(), pingsWhileMaster.get()); + logger.debug("got a ping from another master {}. resolving who should rejoin. current ping count: [{}]", + pingRequest.masterNode(), pingsWhileMaster.get()); synchronized (stateMutex) { ClusterState currentState = committedState.get(); if (currentState.nodes().isLocalNodeElectedMaster()) { diff --git a/server/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java b/server/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java index c616716b86a97..4e7266a68320d 100644 --- a/server/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java +++ b/server/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java @@ -48,8 +48,10 @@ public class GatewayAllocator extends AbstractComponent { private final PrimaryShardAllocator primaryShardAllocator; private final ReplicaShardAllocator replicaShardAllocator; - private final ConcurrentMap> asyncFetchStarted = ConcurrentCollections.newConcurrentMap(); - private final ConcurrentMap> asyncFetchStore = ConcurrentCollections.newConcurrentMap(); + private final ConcurrentMap> + asyncFetchStarted = ConcurrentCollections.newConcurrentMap(); + private final ConcurrentMap> + asyncFetchStore = ConcurrentCollections.newConcurrentMap(); @Inject public GatewayAllocator(Settings settings, ClusterService clusterService, RoutingService routingService, @@ -161,9 +163,11 @@ class InternalPrimaryShardAllocator extends PrimaryShardAllocator { } @Override - protected AsyncShardFetch.FetchResult fetchData(ShardRouting shard, RoutingAllocation allocation) { + protected AsyncShardFetch.FetchResult + fetchData(ShardRouting shard, RoutingAllocation allocation) { AsyncShardFetch fetch = - asyncFetchStarted.computeIfAbsent(shard.shardId(), shardId -> new InternalAsyncFetch<>(logger, "shard_started", shardId, startedAction)); + asyncFetchStarted.computeIfAbsent(shard.shardId(), + shardId -> new InternalAsyncFetch<>(logger, "shard_started", shardId, startedAction)); AsyncShardFetch.FetchResult shardState = fetch.fetchData(allocation.nodes(), allocation.getIgnoreNodes(shard.shardId())); @@ -184,9 +188,11 @@ class InternalReplicaShardAllocator extends ReplicaShardAllocator { } @Override - protected AsyncShardFetch.FetchResult fetchData(ShardRouting shard, RoutingAllocation allocation) { + protected AsyncShardFetch.FetchResult + fetchData(ShardRouting shard, RoutingAllocation allocation) { AsyncShardFetch fetch = - asyncFetchStore.computeIfAbsent(shard.shardId(), shardId -> new InternalAsyncFetch<>(logger, "shard_store", shard.shardId(), storeAction)); + asyncFetchStore.computeIfAbsent(shard.shardId(), + shardId -> new InternalAsyncFetch<>(logger, "shard_store", shard.shardId(), storeAction)); AsyncShardFetch.FetchResult shardStores = fetch.fetchData(allocation.nodes(), allocation.getIgnoreNodes(shard.shardId())); if (shardStores.hasData()) { diff --git a/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java b/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java index 46ff2f960e7cf..fd65d17b1d8cc 100644 --- a/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java +++ b/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java @@ -166,7 +166,8 @@ public void applyClusterState(ClusterChangedEvent event) { relevantIndices = getRelevantIndices(event.state(), event.previousState(), previouslyWrittenIndices); - final Iterable writeInfo = resolveStatesToBeWritten(previouslyWrittenIndices, relevantIndices, previousMetaData, event.state().metaData()); + final Iterable writeInfo = resolveStatesToBeWritten(previouslyWrittenIndices, relevantIndices, + previousMetaData, event.state().metaData()); // check and write changes in indices for (IndexMetaWriteInfo indexMetaWrite : writeInfo) { try { @@ -303,11 +304,14 @@ private void ensureNoPre019ShardState(NodeEnvironment nodeEnv) throws IOExceptio * * @param previouslyWrittenIndices A list of indices for which the state was already written before * @param potentiallyUnwrittenIndices The list of indices for which state should potentially be written - * @param previousMetaData The last meta data we know of. meta data for all indices in previouslyWrittenIndices list is persisted now + * @param previousMetaData The last meta data we know of. meta data for all indices in previouslyWrittenIndices list is + * persisted now * @param newMetaData The new metadata * @return iterable over all indices states that should be written to disk */ - public static Iterable resolveStatesToBeWritten(Set previouslyWrittenIndices, Set potentiallyUnwrittenIndices, MetaData previousMetaData, MetaData newMetaData) { + public static Iterable resolveStatesToBeWritten(Set previouslyWrittenIndices, + Set potentiallyUnwrittenIndices, + MetaData previousMetaData, MetaData newMetaData) { List indicesToWrite = new ArrayList<>(); for (Index index : potentiallyUnwrittenIndices) { IndexMetaData newIndexMetaData = newMetaData.getIndexSafe(index); @@ -316,7 +320,8 @@ public static Iterable resolveStatesToBeWri if (previouslyWrittenIndices.contains(index) == false || previousIndexMetaData == null) { writeReason = "freshly created"; } else if (previousIndexMetaData.getVersion() != newIndexMetaData.getVersion()) { - writeReason = "version changed from [" + previousIndexMetaData.getVersion() + "] to [" + newIndexMetaData.getVersion() + "]"; + writeReason = "version changed from [" + previousIndexMetaData.getVersion() + "] to [" + + newIndexMetaData.getVersion() + "]"; } if (writeReason != null) { indicesToWrite.add(new GatewayMetaState.IndexMetaWriteInfo(newIndexMetaData, previousIndexMetaData, writeReason)); @@ -325,7 +330,8 @@ public static Iterable resolveStatesToBeWri return indicesToWrite; } - public static Set getRelevantIndicesOnDataOnlyNode(ClusterState state, ClusterState previousState, Set previouslyWrittenIndices) { + public static Set getRelevantIndicesOnDataOnlyNode(ClusterState state, ClusterState previousState, + Set previouslyWrittenIndices) { RoutingNode newRoutingNode = state.getRoutingNodes().node(state.nodes().getLocalNodeId()); if (newRoutingNode == null) { throw new IllegalStateException("cluster state does not contain this node - cannot write index meta state"); @@ -334,7 +340,8 @@ public static Set getRelevantIndicesOnDataOnlyNode(ClusterState state, Cl for (ShardRouting routing : newRoutingNode) { indices.add(routing.index()); } - // we have to check the meta data also: closed indices will not appear in the routing table, but we must still write the state if we have it written on disk previously + // we have to check the meta data also: closed indices will not appear in the routing table, but we must still write the state if + // we have it written on disk previously for (IndexMetaData indexMetaData : state.metaData()) { boolean isOrWasClosed = indexMetaData.getState().equals(IndexMetaData.State.CLOSE); // if the index is open we might still have to write the state if it just transitioned from closed to open diff --git a/server/src/main/java/org/elasticsearch/gateway/GatewayService.java b/server/src/main/java/org/elasticsearch/gateway/GatewayService.java index d77031218179c..ff42e278a7568 100644 --- a/server/src/main/java/org/elasticsearch/gateway/GatewayService.java +++ b/server/src/main/java/org/elasticsearch/gateway/GatewayService.java @@ -64,7 +64,8 @@ public class GatewayService extends AbstractLifecycleComponent implements Cluste public static final Setting RECOVER_AFTER_MASTER_NODES_SETTING = Setting.intSetting("gateway.recover_after_master_nodes", 0, 0, Property.NodeScope); - public static final ClusterBlock STATE_NOT_RECOVERED_BLOCK = new ClusterBlock(1, "state not recovered / initialized", true, true, false, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL); + public static final ClusterBlock STATE_NOT_RECOVERED_BLOCK = new ClusterBlock(1, "state not recovered / initialized", true, true, + false, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL); public static final TimeValue DEFAULT_RECOVER_AFTER_TIME_IF_EXPECTED_NODES_IS_SET = TimeValue.timeValueMinutes(5); @@ -185,7 +186,8 @@ public void clusterChanged(final ClusterChangedEvent event) { } else if (expectedDataNodes != -1 && (nodes.getDataNodes().size() < expectedDataNodes)) { // does not meet the expected... enforceRecoverAfterTime = true; reason = "expecting [" + expectedDataNodes + "] data nodes, but only have [" + nodes.getDataNodes().size() + "]"; - } else if (expectedMasterNodes != -1 && (nodes.getMasterNodes().size() < expectedMasterNodes)) { // does not meet the expected... + } else if (expectedMasterNodes != -1 && (nodes.getMasterNodes().size() < expectedMasterNodes)) { // does not meet the + // expected... enforceRecoverAfterTime = true; reason = "expecting [" + expectedMasterNodes + "] master nodes, but only have [" + nodes.getMasterNodes().size() + "]"; } diff --git a/server/src/main/java/org/elasticsearch/gateway/LocalAllocateDangledIndices.java b/server/src/main/java/org/elasticsearch/gateway/LocalAllocateDangledIndices.java index 7bc2e38dde024..52d0e12cd21e1 100644 --- a/server/src/main/java/org/elasticsearch/gateway/LocalAllocateDangledIndices.java +++ b/server/src/main/java/org/elasticsearch/gateway/LocalAllocateDangledIndices.java @@ -71,7 +71,8 @@ public LocalAllocateDangledIndices(Settings settings, TransportService transport this.clusterService = clusterService; this.allocationService = allocationService; this.metaDataIndexUpgradeService = metaDataIndexUpgradeService; - transportService.registerRequestHandler(ACTION_NAME, AllocateDangledRequest::new, ThreadPool.Names.SAME, new AllocateDangledRequestHandler()); + transportService.registerRequestHandler(ACTION_NAME, AllocateDangledRequest::new, ThreadPool.Names.SAME, + new AllocateDangledRequestHandler()); } public void allocateDangled(Collection indices, final Listener listener) { @@ -81,7 +82,8 @@ public void allocateDangled(Collection indices, final Listener li listener.onFailure(new MasterNotDiscoveredException("no master to send allocate dangled request")); return; } - AllocateDangledRequest request = new AllocateDangledRequest(clusterService.localNode(), indices.toArray(new IndexMetaData[indices.size()])); + AllocateDangledRequest request = new AllocateDangledRequest(clusterService.localNode(), + indices.toArray(new IndexMetaData[indices.size()])); transportService.sendRequest(masterNode, ACTION_NAME, request, new TransportResponseHandler() { @Override public AllocateDangledResponse newInstance() { @@ -157,15 +159,18 @@ public ClusterState execute(ClusterState currentState) { minIndexCompatibilityVersion); } catch (Exception ex) { // upgrade failed - adding index as closed - logger.warn(() -> new ParameterizedMessage("found dangled index [{}] on node [{}]. This index cannot be upgraded to the latest version, adding as closed", indexMetaData.getIndex(), request.fromNode), ex); - upgradedIndexMetaData = IndexMetaData.builder(indexMetaData).state(IndexMetaData.State.CLOSE).version(indexMetaData.getVersion() + 1).build(); + logger.warn(() -> new ParameterizedMessage("found dangled index [{}] on node [{}]. This index cannot be " + + "upgraded to the latest version, adding as closed", indexMetaData.getIndex(), request.fromNode), ex); + upgradedIndexMetaData = IndexMetaData.builder(indexMetaData).state(IndexMetaData.State.CLOSE) + .version(indexMetaData.getVersion() + 1).build(); } metaData.put(upgradedIndexMetaData, false); blocks.addBlocks(upgradedIndexMetaData); if (upgradedIndexMetaData.getState() == IndexMetaData.State.OPEN) { routingTableBuilder.addAsFromDangling(upgradedIndexMetaData); } - sb.append("[").append(upgradedIndexMetaData.getIndex()).append("/").append(upgradedIndexMetaData.getState()).append("]"); + sb.append("[").append(upgradedIndexMetaData.getIndex()).append("/").append(upgradedIndexMetaData.getState()) + .append("]"); } if (!importNeeded) { return currentState; @@ -173,7 +178,8 @@ public ClusterState execute(ClusterState currentState) { logger.info("auto importing dangled indices {} from [{}]", sb.toString(), request.fromNode); RoutingTable routingTable = routingTableBuilder.build(); - ClusterState updatedState = ClusterState.builder(currentState).metaData(metaData).blocks(blocks).routingTable(routingTable).build(); + ClusterState updatedState = ClusterState.builder(currentState).metaData(metaData).blocks(blocks) + .routingTable(routingTable).build(); // now, reroute return allocationService.reroute( diff --git a/server/src/main/java/org/elasticsearch/gateway/PrimaryShardAllocator.java b/server/src/main/java/org/elasticsearch/gateway/PrimaryShardAllocator.java index e63f349b55c3e..d79f23be72bf7 100644 --- a/server/src/main/java/org/elasticsearch/gateway/PrimaryShardAllocator.java +++ b/server/src/main/java/org/elasticsearch/gateway/PrimaryShardAllocator.java @@ -257,9 +257,13 @@ protected static NodeShardsResult buildNodeShardsResult(ShardRouting shard, bool } else { final String finalAllocationId = allocationId; if (nodeShardState.storeException() instanceof ShardLockObtainFailedException) { - logger.trace(() -> new ParameterizedMessage("[{}] on node [{}] has allocation id [{}] but the store can not be opened as it's locked, treating as valid shard", shard, nodeShardState.getNode(), finalAllocationId), nodeShardState.storeException()); + logger.trace(() -> new ParameterizedMessage("[{}] on node [{}] has allocation id [{}] but the store can not be " + + "opened as it's locked, treating as valid shard", shard, nodeShardState.getNode(), finalAllocationId), + nodeShardState.storeException()); } else { - logger.trace(() -> new ParameterizedMessage("[{}] on node [{}] has allocation id [{}] but the store can not be opened, treating as no allocation id", shard, nodeShardState.getNode(), finalAllocationId), nodeShardState.storeException()); + logger.trace(() -> new ParameterizedMessage("[{}] on node [{}] has allocation id [{}] but the store can not be " + + "opened, treating as no allocation id", shard, nodeShardState.getNode(), finalAllocationId), + nodeShardState.storeException()); allocationId = null; } } @@ -267,7 +271,8 @@ protected static NodeShardsResult buildNodeShardsResult(ShardRouting shard, bool if (allocationId != null) { assert nodeShardState.storeException() == null || nodeShardState.storeException() instanceof ShardLockObtainFailedException : - "only allow store that can be opened or that throws a ShardLockObtainFailedException while being opened but got a store throwing " + nodeShardState.storeException(); + "only allow store that can be opened or that throws a ShardLockObtainFailedException while being opened but got a " + + "store throwing " + nodeShardState.storeException(); numberOfAllocationsFound++; if (matchAnyShard || inSyncAllocationIds.contains(nodeShardState.allocationId())) { nodeShardStates.add(nodeShardState); @@ -280,7 +285,8 @@ protected static NodeShardsResult buildNodeShardsResult(ShardRouting shard, bool // prefer shards with matching allocation ids Comparator matchingAllocationsFirst = Comparator.comparing( (NodeGatewayStartedShards state) -> inSyncAllocationIds.contains(state.allocationId())).reversed(); - comparator = matchingAllocationsFirst.thenComparing(NO_STORE_EXCEPTION_FIRST_COMPARATOR).thenComparing(PRIMARY_FIRST_COMPARATOR); + comparator = matchingAllocationsFirst.thenComparing(NO_STORE_EXCEPTION_FIRST_COMPARATOR) + .thenComparing(PRIMARY_FIRST_COMPARATOR); } else { comparator = NO_STORE_EXCEPTION_FIRST_COMPARATOR.thenComparing(PRIMARY_FIRST_COMPARATOR); } @@ -288,7 +294,8 @@ protected static NodeShardsResult buildNodeShardsResult(ShardRouting shard, bool nodeShardStates.sort(comparator); if (logger.isTraceEnabled()) { - logger.trace("{} candidates for allocation: {}", shard, nodeShardStates.stream().map(s -> s.getNode().getName()).collect(Collectors.joining(", "))); + logger.trace("{} candidates for allocation: {}", shard, nodeShardStates.stream().map(s -> s.getNode().getName()) + .collect(Collectors.joining(", "))); } return new NodeShardsResult(nodeShardStates, numberOfAllocationsFound); } diff --git a/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java b/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java index b91637e072fa7..777e8e31505d7 100644 --- a/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java +++ b/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java @@ -121,10 +121,13 @@ public void processExistingRecoveries(RoutingAllocation allocation) { logger.debug("cancelling allocation of replica on [{}], sync id match found on node [{}]", currentNode, nodeWithHighestMatch); UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.REALLOCATED_REPLICA, - "existing allocation of replica to [" + currentNode + "] cancelled, sync id match found on node ["+ nodeWithHighestMatch + "]", - null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), false, UnassignedInfo.AllocationStatus.NO_ATTEMPT); + "existing allocation of replica to [" + currentNode + "] cancelled, sync id match found on node ["+ + nodeWithHighestMatch + "]", + null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), false, + UnassignedInfo.AllocationStatus.NO_ATTEMPT); // don't cancel shard in the loop as it will cause a ConcurrentModificationException - shardCancellationActions.add(() -> routingNodes.failShard(logger, shard, unassignedInfo, metaData.getIndexSafe(shard.index()), allocation.changes())); + shardCancellationActions.add(() -> routingNodes.failShard(logger, shard, unassignedInfo, + metaData.getIndexSafe(shard.index()), allocation.changes())); } } } @@ -298,7 +301,8 @@ private List augmentExplanationsWithStoreInfo(Map data) { + private TransportNodesListShardStoreMetaData.StoreFilesMetaData findStore(ShardRouting shard, RoutingAllocation allocation, + AsyncShardFetch.FetchResult data) { assert shard.currentNodeId() != null; DiscoveryNode primaryNode = allocation.nodes().get(shard.currentNodeId()); if (primaryNode == null) { diff --git a/server/src/test/java/org/elasticsearch/client/AbstractClientHeadersTestCase.java b/server/src/test/java/org/elasticsearch/client/AbstractClientHeadersTestCase.java index 31f6963536c50..e7f32aa524972 100644 --- a/server/src/test/java/org/elasticsearch/client/AbstractClientHeadersTestCase.java +++ b/server/src/test/java/org/elasticsearch/client/AbstractClientHeadersTestCase.java @@ -106,18 +106,22 @@ public void testActions() { client.prepareGet("idx", "type", "id").execute(new AssertingActionListener<>(GetAction.NAME, client.threadPool())); client.prepareSearch().execute(new AssertingActionListener<>(SearchAction.NAME, client.threadPool())); client.prepareDelete("idx", "type", "id").execute(new AssertingActionListener<>(DeleteAction.NAME, client.threadPool())); - client.admin().cluster().prepareDeleteStoredScript("id").execute(new AssertingActionListener<>(DeleteStoredScriptAction.NAME, client.threadPool())); - client.prepareIndex("idx", "type", "id").setSource("source", XContentType.JSON).execute(new AssertingActionListener<>(IndexAction.NAME, client.threadPool())); + client.admin().cluster().prepareDeleteStoredScript("id") + .execute(new AssertingActionListener<>(DeleteStoredScriptAction.NAME, client.threadPool())); + client.prepareIndex("idx", "type", "id").setSource("source", XContentType.JSON) + .execute(new AssertingActionListener<>(IndexAction.NAME, client.threadPool())); // choosing arbitrary cluster admin actions to test client.admin().cluster().prepareClusterStats().execute(new AssertingActionListener<>(ClusterStatsAction.NAME, client.threadPool())); - client.admin().cluster().prepareCreateSnapshot("repo", "bck").execute(new AssertingActionListener<>(CreateSnapshotAction.NAME, client.threadPool())); + client.admin().cluster().prepareCreateSnapshot("repo", "bck") + .execute(new AssertingActionListener<>(CreateSnapshotAction.NAME, client.threadPool())); client.admin().cluster().prepareReroute().execute(new AssertingActionListener<>(ClusterRerouteAction.NAME, client.threadPool())); // choosing arbitrary indices admin actions to test client.admin().indices().prepareCreate("idx").execute(new AssertingActionListener<>(CreateIndexAction.NAME, client.threadPool())); client.admin().indices().prepareStats().execute(new AssertingActionListener<>(IndicesStatsAction.NAME, client.threadPool())); - client.admin().indices().prepareClearCache("idx1", "idx2").execute(new AssertingActionListener<>(ClearIndicesCacheAction.NAME, client.threadPool())); + client.admin().indices().prepareClearCache("idx1", "idx2") + .execute(new AssertingActionListener<>(ClearIndicesCacheAction.NAME, client.threadPool())); client.admin().indices().prepareFlush().execute(new AssertingActionListener<>(FlushAction.NAME, client.threadPool())); } diff --git a/server/src/test/java/org/elasticsearch/discovery/BlockingClusterStatePublishResponseHandlerTests.java b/server/src/test/java/org/elasticsearch/discovery/BlockingClusterStatePublishResponseHandlerTests.java index 9504344236b86..da9a4d6c2bf19 100644 --- a/server/src/test/java/org/elasticsearch/discovery/BlockingClusterStatePublishResponseHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/BlockingClusterStatePublishResponseHandlerTests.java @@ -47,7 +47,8 @@ private static class PublishResponder extends AbstractRunnable { final Logger logger; final BlockingClusterStatePublishResponseHandler handler; - PublishResponder(boolean fail, DiscoveryNode node, CyclicBarrier barrier, Logger logger, BlockingClusterStatePublishResponseHandler handler) { + PublishResponder(boolean fail, DiscoveryNode node, CyclicBarrier barrier, Logger logger, + BlockingClusterStatePublishResponseHandler handler) { this.fail = fail; this.node = node; @@ -80,7 +81,8 @@ public void testConcurrentAccess() throws InterruptedException { allNodes[i] = node; } - BlockingClusterStatePublishResponseHandler handler = new BlockingClusterStatePublishResponseHandler(new HashSet<>(Arrays.asList(allNodes))); + BlockingClusterStatePublishResponseHandler handler = + new BlockingClusterStatePublishResponseHandler(new HashSet<>(Arrays.asList(allNodes))); int firstRound = randomIntBetween(5, nodeCount - 1); Thread[] threads = new Thread[firstRound]; diff --git a/server/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java b/server/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java index 2c4fb0c7e8db2..b82ddde7eca71 100644 --- a/server/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java @@ -106,13 +106,16 @@ public void testShouldIgnoreNewClusterState() { currentState.version(2); newState.version(1); - assertTrue("should ignore, because new state's version is lower to current state's version", shouldIgnoreOrRejectNewClusterState(logger, currentState.build(), newState.build())); + assertTrue("should ignore, because new state's version is lower to current state's version", + shouldIgnoreOrRejectNewClusterState(logger, currentState.build(), newState.build())); currentState.version(1); newState.version(1); - assertTrue("should ignore, because new state's version is equal to current state's version", shouldIgnoreOrRejectNewClusterState(logger, currentState.build(), newState.build())); + assertTrue("should ignore, because new state's version is equal to current state's version", + shouldIgnoreOrRejectNewClusterState(logger, currentState.build(), newState.build())); currentState.version(1); newState.version(2); - assertFalse("should not ignore, because new state's version is higher to current state's version", shouldIgnoreOrRejectNewClusterState(logger, currentState.build(), newState.build())); + assertFalse("should not ignore, because new state's version is higher to current state's version", + shouldIgnoreOrRejectNewClusterState(logger, currentState.build(), newState.build())); currentNodes = DiscoveryNodes.builder(); currentNodes.masterNodeId("b").add(new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT)); @@ -144,7 +147,8 @@ public void testShouldIgnoreNewClusterState() { currentState.version(1); newState.version(2); } - assertFalse("should not ignore, because current state doesn't have a master", shouldIgnoreOrRejectNewClusterState(logger, currentState.build(), newState.build())); + assertFalse("should not ignore, because current state doesn't have a master", + shouldIgnoreOrRejectNewClusterState(logger, currentState.build(), newState.build())); } public void testFilterNonMasterPingResponse() { @@ -311,8 +315,10 @@ public void onNewClusterState(String source, Supplier clusterState listener.onSuccess(source); } }; - ZenDiscovery zenDiscovery = new ZenDiscovery(settings, threadPool, service, new NamedWriteableRegistry(ClusterModule.getNamedWriteables()), - masterService, clusterApplier, clusterSettings, hostsResolver -> Collections.emptyList(), ESAllocationTestCase.createAllocationService(), + ZenDiscovery zenDiscovery = new ZenDiscovery(settings, threadPool, service, + new NamedWriteableRegistry(ClusterModule.getNamedWriteables()), + masterService, clusterApplier, clusterSettings, hostsResolver -> Collections.emptyList(), + ESAllocationTestCase.createAllocationService(), Collections.emptyList()); zenDiscovery.start(); return zenDiscovery; @@ -341,8 +347,9 @@ public void testValidateOnUnsupportedIndexVersionCreated() throws Exception { (() -> localNode, ZenDiscovery.addBuiltInJoinValidators(Collections.emptyList())); final boolean incompatible = randomBoolean(); IndexMetaData indexMetaData = IndexMetaData.builder("test").settings(Settings.builder() - .put(SETTING_VERSION_CREATED, incompatible ? VersionUtils.getPreviousVersion(Version.CURRENT.minimumIndexCompatibilityVersion()) - : VersionUtils.randomVersionBetween(random(), Version.CURRENT.minimumIndexCompatibilityVersion(), Version.CURRENT)) + .put(SETTING_VERSION_CREATED, + incompatible ? VersionUtils.getPreviousVersion(Version.CURRENT.minimumIndexCompatibilityVersion()) + : VersionUtils.randomVersionBetween(random(), Version.CURRENT.minimumIndexCompatibilityVersion(), Version.CURRENT)) .put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0) .put(SETTING_CREATION_DATE, System.currentTimeMillis())) .state(IndexMetaData.State.OPEN) diff --git a/server/src/test/java/org/elasticsearch/gateway/GatewayServiceTests.java b/server/src/test/java/org/elasticsearch/gateway/GatewayServiceTests.java index 3e4a3dce09153..7cf8bf5e1644b 100644 --- a/server/src/test/java/org/elasticsearch/gateway/GatewayServiceTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/GatewayServiceTests.java @@ -58,7 +58,8 @@ public void testDefaultRecoverAfterTime() throws IOException { // ensure settings override default TimeValue timeValue = TimeValue.timeValueHours(3); // ensure default is set when setting expected_nodes - service = createService(Settings.builder().put("gateway.expected_nodes", 1).put("gateway.recover_after_time", timeValue.toString())); + service = createService(Settings.builder().put("gateway.expected_nodes", 1).put("gateway.recover_after_time", + timeValue.toString())); assertThat(service.recoverAfterTime().millis(), Matchers.equalTo(timeValue.millis())); } } diff --git a/server/src/test/java/org/elasticsearch/gateway/MetaDataStateFormatTests.java b/server/src/test/java/org/elasticsearch/gateway/MetaDataStateFormatTests.java index 8f89e59003cb1..330947b21e983 100644 --- a/server/src/test/java/org/elasticsearch/gateway/MetaDataStateFormatTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/MetaDataStateFormatTests.java @@ -102,7 +102,8 @@ public void testReadWriteState() throws IOException { } final long id = addDummyFiles("foo-", dirs); Format format = new Format("foo-"); - DummyState state = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 1000), randomInt(), randomLong(), randomDouble(), randomBoolean()); + DummyState state = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 1000), randomInt(), randomLong(), + randomDouble(), randomBoolean()); format.write(state, dirs); for (Path file : dirs) { Path[] list = content("*", file); @@ -116,7 +117,8 @@ public void testReadWriteState() throws IOException { DummyState read = format.read(NamedXContentRegistry.EMPTY, list[0]); assertThat(read, equalTo(state)); } - DummyState state2 = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 1000), randomInt(), randomLong(), randomDouble(), randomBoolean()); + DummyState state2 = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 1000), randomInt(), randomLong(), + randomDouble(), randomBoolean()); format.write(state2, dirs); for (Path file : dirs) { @@ -142,7 +144,8 @@ public void testVersionMismatch() throws IOException { final long id = addDummyFiles("foo-", dirs); Format format = new Format("foo-"); - DummyState state = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 1000), randomInt(), randomLong(), randomDouble(), randomBoolean()); + DummyState state = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 1000), randomInt(), randomLong(), + randomDouble(), randomBoolean()); format.write(state, dirs); for (Path file : dirs) { Path[] list = content("*", file); @@ -165,7 +168,8 @@ public void testCorruption() throws IOException { } final long id = addDummyFiles("foo-", dirs); Format format = new Format("foo-"); - DummyState state = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 1000), randomInt(), randomLong(), randomDouble(), randomBoolean()); + DummyState state = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 1000), randomInt(), randomLong(), + randomDouble(), randomBoolean()); format.write(state, dirs); for (Path file : dirs) { Path[] list = content("*", file); @@ -207,7 +211,8 @@ public static void corruptFile(Path file, Logger logger) throws IOException { byte newValue = (byte) ~oldValue; bb.put(0, newValue); raf.write(bb, filePointer); - logger.debug("Corrupting file {} -- flipping at position {} from {} to {} ", fileToCorrupt.getFileName().toString(), filePointer, Integer.toHexString(oldValue), Integer.toHexString(newValue)); + logger.debug("Corrupting file {} -- flipping at position {} from {} to {} ", fileToCorrupt.getFileName().toString(), + filePointer, Integer.toHexString(oldValue), Integer.toHexString(newValue)); } long checksumAfterCorruption; long actualChecksumAfterCorruption; @@ -221,7 +226,8 @@ public static void corruptFile(Path file, Logger logger) throws IOException { msg.append("Checksum before: [").append(checksumBeforeCorruption).append("]"); msg.append(" after: [").append(checksumAfterCorruption).append("]"); msg.append(" checksum value after corruption: ").append(actualChecksumAfterCorruption).append("]"); - msg.append(" file: ").append(fileToCorrupt.getFileName().toString()).append(" length: ").append(dir.fileLength(fileToCorrupt.getFileName().toString())); + msg.append(" file: ").append(fileToCorrupt.getFileName().toString()).append(" length: ") + .append(dir.fileLength(fileToCorrupt.getFileName().toString())); logger.debug("{}", msg.toString()); assumeTrue("Checksum collision - " + msg.toString(), checksumAfterCorruption != checksumBeforeCorruption // collision @@ -243,7 +249,8 @@ public void testLoadState() throws IOException { Files.createDirectories(dirs[i].resolve(MetaDataStateFormat.STATE_DIR_NAME)); for (int j = 0; j < numStates; j++) { format.write(meta.get(j), dirs[i]); - if (randomBoolean() && (j < numStates - 1 || dirs.length > 0 && i != 0)) { // corrupt a file that we do not necessarily need here.... + if (randomBoolean() && (j < numStates - 1 || dirs.length > 0 && i != 0)) { // corrupt a file that we do not necessarily + // need here.... Path file = dirs[i].resolve(MetaDataStateFormat.STATE_DIR_NAME).resolve("global-" + j + ".st"); corruptedFiles.add(file); MetaDataStateFormatTests.corruptFile(file, logger); @@ -320,7 +327,8 @@ private MetaData randomMeta() throws IOException { private IndexMetaData.Builder indexBuilder(String index) throws IOException { return IndexMetaData.builder(index) - .settings(settings(Version.CURRENT).put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 10)).put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomIntBetween(0, 5))); + .settings(settings(Version.CURRENT).put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 10)) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomIntBetween(0, 5))); } @@ -471,7 +479,8 @@ public long addDummyFiles(String prefix, Path... paths) throws IOException { } else { realId = Math.max(realId, id); } - try (OutputStream stream = Files.newOutputStream(stateDir.resolve(actualPrefix + id + MetaDataStateFormat.STATE_FILE_EXTENSION))) { + try (OutputStream stream = + Files.newOutputStream(stateDir.resolve(actualPrefix + id + MetaDataStateFormat.STATE_FILE_EXTENSION))) { stream.write(0); } } diff --git a/server/src/test/java/org/elasticsearch/gateway/MetaDataWriteDataNodesIT.java b/server/src/test/java/org/elasticsearch/gateway/MetaDataWriteDataNodesIT.java index f2bacc154bf46..81d1442727de7 100644 --- a/server/src/test/java/org/elasticsearch/gateway/MetaDataWriteDataNodesIT.java +++ b/server/src/test/java/org/elasticsearch/gateway/MetaDataWriteDataNodesIT.java @@ -63,7 +63,8 @@ public void testMetaIsRemovedIfAllShardsFromIndexRemoved() throws Exception { String node2 = nodeNames.get(1); String index = "index"; - assertAcked(prepareCreate(index).setSettings(Settings.builder().put("index.number_of_replicas", 0).put(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", node1))); + assertAcked(prepareCreate(index).setSettings(Settings.builder().put("index.number_of_replicas", 0) + .put(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", node1))); index(index, "_doc", "1", jsonBuilder().startObject().field("text", "some text").endObject()); ensureGreen(); assertIndexInMetaState(node1, index); @@ -72,7 +73,8 @@ public void testMetaIsRemovedIfAllShardsFromIndexRemoved() throws Exception { assertIndexInMetaState(masterNode, index); logger.debug("relocating index..."); - client().admin().indices().prepareUpdateSettings(index).setSettings(Settings.builder().put(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", node2)).get(); + client().admin().indices().prepareUpdateSettings(index).setSettings(Settings.builder() + .put(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", node2)).get(); client().admin().cluster().prepareHealth().setWaitForNoRelocatingShards(true).get(); ensureGreen(); assertIndexDirectoryDeleted(node1, resolveIndex); @@ -109,11 +111,13 @@ public void testMetaWrittenWhenIndexIsClosedAndMetaUpdated() throws Exception { .endObject()).get(); GetMappingsResponse getMappingsResponse = client().admin().indices().prepareGetMappings(index).addTypes("_doc").get(); - assertNotNull(((Map) (getMappingsResponse.getMappings().get(index).get("_doc").getSourceAsMap().get("properties"))).get("integer_field")); + assertNotNull(((Map) (getMappingsResponse.getMappings().get(index).get("_doc").getSourceAsMap().get("properties"))) + .get("integer_field")); // make sure it was also written on red node although index is closed ImmutableOpenMap indicesMetaData = getIndicesMetaDataOnNode(dataNode); - assertNotNull(((Map) (indicesMetaData.get(index).getMappings().get("_doc").getSourceAsMap().get("properties"))).get("integer_field")); + assertNotNull(((Map) (indicesMetaData.get(index).getMappings().get("_doc").getSourceAsMap().get("properties"))) + .get("integer_field")); assertThat(indicesMetaData.get(index).getState(), equalTo(IndexMetaData.State.CLOSE)); /* Try the same and see if this also works if node was just restarted. @@ -134,11 +138,13 @@ public void testMetaWrittenWhenIndexIsClosedAndMetaUpdated() throws Exception { .endObject()).get(); getMappingsResponse = client().admin().indices().prepareGetMappings(index).addTypes("_doc").get(); - assertNotNull(((Map) (getMappingsResponse.getMappings().get(index).get("_doc").getSourceAsMap().get("properties"))).get("float_field")); + assertNotNull(((Map) (getMappingsResponse.getMappings().get(index).get("_doc").getSourceAsMap().get("properties"))) + .get("float_field")); // make sure it was also written on red node although index is closed indicesMetaData = getIndicesMetaDataOnNode(dataNode); - assertNotNull(((Map) (indicesMetaData.get(index).getMappings().get("_doc").getSourceAsMap().get("properties"))).get("float_field")); + assertNotNull(((Map) (indicesMetaData.get(index).getMappings().get("_doc").getSourceAsMap().get("properties"))) + .get("float_field")); assertThat(indicesMetaData.get(index).getState(), equalTo(IndexMetaData.State.CLOSE)); // finally check that meta data is also written of index opened again @@ -152,7 +158,8 @@ public void testMetaWrittenWhenIndexIsClosedAndMetaUpdated() throws Exception { protected void assertIndexDirectoryDeleted(final String nodeName, final Index index) throws Exception { assertBusy(() -> { logger.info("checking if index directory exists..."); - assertFalse("Expecting index directory of " + index + " to be deleted from node " + nodeName, indexDirectoryExists(nodeName, index)); + assertFalse("Expecting index directory of " + index + " to be deleted from node " + nodeName, + indexDirectoryExists(nodeName, index)); } ); } @@ -161,7 +168,8 @@ protected void assertIndexInMetaState(final String nodeName, final String indexN assertBusy(() -> { logger.info("checking if meta state exists..."); try { - assertTrue("Expecting meta state of index " + indexName + " to be on node " + nodeName, getIndicesMetaDataOnNode(nodeName).containsKey(indexName)); + assertTrue("Expecting meta state of index " + indexName + " to be on node " + nodeName, + getIndicesMetaDataOnNode(nodeName).containsKey(indexName)); } catch (Exception e) { logger.info("failed to load meta state", e); fail("could not load meta state"); diff --git a/server/src/test/java/org/elasticsearch/gateway/PrimaryShardAllocatorTests.java b/server/src/test/java/org/elasticsearch/gateway/PrimaryShardAllocatorTests.java index ae643b7f094c2..ac35d8d136e27 100644 --- a/server/src/test/java/org/elasticsearch/gateway/PrimaryShardAllocatorTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/PrimaryShardAllocatorTests.java @@ -118,7 +118,8 @@ public void testNoAllocationFound() { } /** - * Tests when the node returns data with a shard allocation id that does not match active allocation ids, it will be moved to ignore unassigned. + * Tests when the node returns data with a shard allocation id that does not match active allocation ids, it will be moved to ignore + * unassigned. */ public void testNoMatchingAllocationIdFound() { RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED, "id2"); @@ -155,9 +156,11 @@ public void testShardLockObtainFailedException() { assertThat(allocation.routingNodesChanged(), equalTo(true)); assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); - assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node1.getId())); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), + equalTo(node1.getId())); // check that allocation id is reused - assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).allocationId().getId(), equalTo("allocId1")); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).allocationId().getId(), + equalTo("allocId1")); assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW); } @@ -177,9 +180,11 @@ public void testShardLockObtainFailedExceptionPreferOtherValidCopies() { assertThat(allocation.routingNodesChanged(), equalTo(true)); assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); - assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node2.getId())); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), + equalTo(node2.getId())); // check that allocation id is reused - assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).allocationId().getId(), equalTo(allocId2)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).allocationId().getId(), + equalTo(allocId2)); assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW); } @@ -187,16 +192,18 @@ public void testShardLockObtainFailedExceptionPreferOtherValidCopies() { * Tests that when there is a node to allocate the shard to, it will be allocated to it. */ public void testFoundAllocationAndAllocating() { - final RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), randomFrom(CLUSTER_RECOVERED, INDEX_REOPENED), - "allocId1"); + final RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), + randomFrom(CLUSTER_RECOVERED, INDEX_REOPENED), "allocId1"); testAllocator.addData(node1, "allocId1", randomBoolean()); testAllocator.allocateUnassigned(allocation); assertThat(allocation.routingNodesChanged(), equalTo(true)); assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); - assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node1.getId())); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), + equalTo(node1.getId())); // check that allocation id is reused - assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).allocationId().getId(), equalTo("allocId1")); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).allocationId().getId(), + equalTo("allocId1")); assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW); } @@ -284,7 +291,8 @@ public void testPreferAllocatingPreviousPrimary() { assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); DiscoveryNode allocatedNode = node1HasPrimaryShard ? node1 : node2; - assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(allocatedNode.getId())); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), + equalTo(allocatedNode.getId())); assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW); } @@ -315,7 +323,8 @@ public void testFoundAllocationButNoDecider() { assertThat(allocation.routingNodesChanged(), equalTo(true)); assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); - assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node1.getId())); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), + equalTo(node1.getId())); assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW); } @@ -475,12 +484,14 @@ public TestAllocator addData(DiscoveryNode node, String allocationId, boolean pr if (data == null) { data = new HashMap<>(); } - data.put(node, new TransportNodesListGatewayStartedShards.NodeGatewayStartedShards(node, allocationId, primary, storeException)); + data.put(node, + new TransportNodesListGatewayStartedShards.NodeGatewayStartedShards(node, allocationId, primary, storeException)); return this; } @Override - protected AsyncShardFetch.FetchResult fetchData(ShardRouting shard, RoutingAllocation allocation) { + protected AsyncShardFetch.FetchResult + fetchData(ShardRouting shard, RoutingAllocation allocation) { return new AsyncShardFetch.FetchResult<>(shardId, data, Collections.emptySet()); } } diff --git a/server/src/test/java/org/elasticsearch/gateway/PriorityComparatorTests.java b/server/src/test/java/org/elasticsearch/gateway/PriorityComparatorTests.java index 3de96448a4a3e..d5ce6644f0ec5 100644 --- a/server/src/test/java/org/elasticsearch/gateway/PriorityComparatorTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/PriorityComparatorTests.java @@ -111,7 +111,8 @@ public void testPriorityComparatorSort() { for (int i = 0; i < indices.length; i++) { if (frequently()) { - indices[i] = new IndexMeta("idx_2015_04_" + String.format(Locale.ROOT, "%02d", i), randomIntBetween(1, 1000), randomIntBetween(1, 10000)); + indices[i] = new IndexMeta("idx_2015_04_" + String.format(Locale.ROOT, "%02d", i), randomIntBetween(1, 1000), + randomIntBetween(1, 10000)); } else { // sometimes just use defaults indices[i] = new IndexMeta("idx_2015_04_" + String.format(Locale.ROOT, "%02d", i)); } @@ -121,7 +122,8 @@ public void testPriorityComparatorSort() { for (int i = 0; i < numShards; i++) { IndexMeta indexMeta = randomFrom(indices); shards.add(TestShardRouting.newShardRouting(indexMeta.name, randomIntBetween(1, 5), null, null, - randomBoolean(), ShardRoutingState.UNASSIGNED, new UnassignedInfo(randomFrom(UnassignedInfo.Reason.values()), "foobar"))); + randomBoolean(), ShardRoutingState.UNASSIGNED, new UnassignedInfo(randomFrom(UnassignedInfo.Reason.values()), + "foobar"))); } shards.sort(new PriorityComparator() { @Override @@ -138,13 +140,16 @@ protected Settings getIndexSettings(Index index) { if (prevMeta.priority == currentMeta.priority) { if (prevMeta.creationDate == currentMeta.creationDate) { if (prevMeta.name.equals(currentMeta.name) == false) { - assertTrue("indexName mismatch, expected:" + currentMeta.name + " after " + prevMeta.name + " " + prevMeta.name.compareTo(currentMeta.name), prevMeta.name.compareTo(currentMeta.name) > 0); + assertTrue("indexName mismatch, expected:" + currentMeta.name + " after " + prevMeta.name + " " + + prevMeta.name.compareTo(currentMeta.name), prevMeta.name.compareTo(currentMeta.name) > 0); } } else { - assertTrue("creationDate mismatch, expected:" + currentMeta.creationDate + " after " + prevMeta.creationDate, prevMeta.creationDate > currentMeta.creationDate); + assertTrue("creationDate mismatch, expected:" + currentMeta.creationDate + " after " + prevMeta.creationDate, + prevMeta.creationDate > currentMeta.creationDate); } } else { - assertTrue("priority mismatch, expected:" + currentMeta.priority + " after " + prevMeta.priority, prevMeta.priority > currentMeta.priority); + assertTrue("priority mismatch, expected:" + currentMeta.priority + " after " + prevMeta.priority, + prevMeta.priority > currentMeta.priority); } } previous = routing; diff --git a/server/src/test/java/org/elasticsearch/gateway/QuorumGatewayIT.java b/server/src/test/java/org/elasticsearch/gateway/QuorumGatewayIT.java index 3abaff3295924..b16e2e2f6c505 100644 --- a/server/src/test/java/org/elasticsearch/gateway/QuorumGatewayIT.java +++ b/server/src/test/java/org/elasticsearch/gateway/QuorumGatewayIT.java @@ -75,12 +75,14 @@ public void doAfterNodes(int numNodes, final Client activeClient) throws Excepti if (numNodes == 1) { assertTrue(awaitBusy(() -> { logger.info("--> running cluster_health (wait for the shards to startup)"); - ClusterHealthResponse clusterHealth = activeClient.admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForNodes("2").waitForActiveShards(test.numPrimaries * 2)).actionGet(); + ClusterHealthResponse clusterHealth = activeClient.admin().cluster().health(clusterHealthRequest() + .waitForYellowStatus().waitForNodes("2").waitForActiveShards(test.numPrimaries * 2)).actionGet(); logger.info("--> done cluster_health, status {}", clusterHealth.getStatus()); return (!clusterHealth.isTimedOut()) && clusterHealth.getStatus() == ClusterHealthStatus.YELLOW; }, 30, TimeUnit.SECONDS)); logger.info("--> one node is closed -- index 1 document into the remaining nodes"); - activeClient.prepareIndex("test", "type1", "3").setSource(jsonBuilder().startObject().field("field", "value3").endObject()).get(); + activeClient.prepareIndex("test", "type1", "3").setSource(jsonBuilder().startObject().field("field", "value3") + .endObject()).get(); assertNoFailures(activeClient.admin().indices().prepareRefresh().get()); for (int i = 0; i < 10; i++) { assertHitCount(activeClient.prepareSearch().setSize(0).setQuery(matchAllQuery()).get(), 3L); diff --git a/server/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java b/server/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java index b0b6c35f92a1a..a8f2cfab2b79b 100644 --- a/server/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java +++ b/server/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java @@ -145,7 +145,8 @@ private Map assertAndCapturePrimaryTerms(Map pre } else { assertThat("number of terms changed for index [" + index + "]", current.length, equalTo(previous.length)); for (int shard = 0; shard < current.length; shard++) { - assertThat("primary term didn't increase for [" + index + "][" + shard + "]", current[shard], greaterThan(previous[shard])); + assertThat("primary term didn't increase for [" + index + "][" + shard + "]", current[shard], + greaterThan(previous[shard])); } result.put(index, current); } @@ -158,7 +159,8 @@ public void testSingleNodeNoFlush() throws Exception { internalCluster().startNode(); String mapping = Strings.toString(XContentFactory.jsonBuilder().startObject().startObject("type1") - .startObject("properties").startObject("field").field("type", "text").endObject().startObject("num").field("type", "integer").endObject().endObject() + .startObject("properties").startObject("field").field("type", "text").endObject().startObject("num").field("type", "integer") + .endObject().endObject() .endObject().endObject()); // note: default replica settings are tied to #data nodes-1 which is 0 here. We can do with 1 in this test. int numberOfShards = numberOfShards(); @@ -243,9 +245,11 @@ public void testSingleNodeNoFlush() throws Exception { public void testSingleNodeWithFlush() throws Exception { internalCluster().startNode(); - client().prepareIndex("test", "type1", "1").setSource(jsonBuilder().startObject().field("field", "value1").endObject()).execute().actionGet(); + client().prepareIndex("test", "type1", "1").setSource(jsonBuilder().startObject().field("field", "value1").endObject()).execute() + .actionGet(); flush(); - client().prepareIndex("test", "type1", "2").setSource(jsonBuilder().startObject().field("field", "value2").endObject()).execute().actionGet(); + client().prepareIndex("test", "type1", "2").setSource(jsonBuilder().startObject().field("field", "value2").endObject()).execute() + .actionGet(); refresh(); assertHitCount(client().prepareSearch().setSize(0).setQuery(matchAllQuery()).execute().actionGet(), 2); @@ -280,9 +284,11 @@ public void testTwoNodeFirstNodeCleared() throws Exception { final String firstNode = internalCluster().startNode(); internalCluster().startNode(); - client().prepareIndex("test", "type1", "1").setSource(jsonBuilder().startObject().field("field", "value1").endObject()).execute().actionGet(); + client().prepareIndex("test", "type1", "1").setSource(jsonBuilder().startObject().field("field", "value1").endObject()).execute() + .actionGet(); flush(); - client().prepareIndex("test", "type1", "2").setSource(jsonBuilder().startObject().field("field", "value2").endObject()).execute().actionGet(); + client().prepareIndex("test", "type1", "2").setSource(jsonBuilder().startObject().field("field", "value2").endObject()).execute() + .actionGet(); refresh(); logger.info("Running Cluster Health (wait for the shards to startup)"); @@ -321,9 +327,11 @@ public void testLatestVersionLoaded() throws Exception { internalCluster().startNodes(2, Settings.builder().put("gateway.recover_after_nodes", 2).build()); assertAcked(client().admin().indices().prepareCreate("test")); - client().prepareIndex("test", "type1", "1").setSource(jsonBuilder().startObject().field("field", "value1").endObject()).execute().actionGet(); + client().prepareIndex("test", "type1", "1").setSource(jsonBuilder().startObject().field("field", "value1").endObject()).execute() + .actionGet(); client().admin().indices().prepareFlush().execute().actionGet(); - client().prepareIndex("test", "type1", "2").setSource(jsonBuilder().startObject().field("field", "value2").endObject()).execute().actionGet(); + client().prepareIndex("test", "type1", "2").setSource(jsonBuilder().startObject().field("field", "value2").endObject()).execute() + .actionGet(); client().admin().indices().prepareRefresh().execute().actionGet(); logger.info("--> running cluster_health (wait for the shards to startup)"); @@ -340,7 +348,8 @@ public void testLatestVersionLoaded() throws Exception { internalCluster().stopRandomDataNode(); logger.info("--> one node is closed - start indexing data into the second one"); - client().prepareIndex("test", "type1", "3").setSource(jsonBuilder().startObject().field("field", "value3").endObject()).execute().actionGet(); + client().prepareIndex("test", "type1", "3").setSource(jsonBuilder().startObject().field("field", "value3").endObject()).execute() + .actionGet(); // TODO: remove once refresh doesn't fail immediately if there a master block: // https://github.com/elastic/elasticsearch/issues/9997 // client().admin().cluster().prepareHealth("test").setWaitForYellowStatus().get(); @@ -361,7 +370,8 @@ public void testLatestVersionLoaded() throws Exception { .startObject("field2").field("type", "keyword").field("store", true).endObject() .endObject().endObject().endObject()) .execute().actionGet(); - client().admin().indices().prepareAliases().addAlias("test", "test_alias", QueryBuilders.termQuery("field", "value")).execute().actionGet(); + client().admin().indices().prepareAliases().addAlias("test", "test_alias", QueryBuilders.termQuery("field", "value")).execute() + .actionGet(); logger.info("--> stopping the second node"); internalCluster().stopRandomDataNode(); @@ -476,10 +486,13 @@ public Settings onNodeStopped(String nodeName) throws Exception { assertThat("bytes should have been recovered", recoveryState.getIndex().recoveredBytes(), equalTo(recovered)); assertThat("data should have been reused", recoveryState.getIndex().reusedBytes(), greaterThan(0L)); // we have to recover the segments file since we commit the translog ID on engine startup - assertThat("all existing files should be reused, byte count mismatch", recoveryState.getIndex().reusedBytes(), equalTo(reused)); + assertThat("all existing files should be reused, byte count mismatch", recoveryState.getIndex().reusedBytes(), + equalTo(reused)); assertThat(recoveryState.getIndex().reusedBytes(), equalTo(recoveryState.getIndex().totalBytes() - recovered)); - assertThat("the segment from the last round of indexing should be recovered", recoveryState.getIndex().recoveredFileCount(), equalTo(filesRecovered)); - assertThat("all existing files should be reused, file count mismatch", recoveryState.getIndex().reusedFileCount(), equalTo(filesReused)); + assertThat("the segment from the last round of indexing should be recovered", recoveryState.getIndex().recoveredFileCount(), + equalTo(filesRecovered)); + assertThat("all existing files should be reused, file count mismatch", recoveryState.getIndex().reusedFileCount(), + equalTo(filesReused)); assertThat(recoveryState.getIndex().reusedFileCount(), equalTo(recoveryState.getIndex().totalFileCount() - filesRecovered)); assertThat("> 0 files should be reused", recoveryState.getIndex().reusedFileCount(), greaterThan(0)); assertThat("no translog ops should be recovered", recoveryState.getTranslog().recoveredOperations(), equalTo(0)); @@ -498,12 +511,14 @@ public void testRecoveryDifferentNodeOrderStartup() throws Exception { // we need different data paths so we make sure we start the second node fresh final Path pathNode1 = createTempDir(); - final String node_1 = internalCluster().startNode(Settings.builder().put(Environment.PATH_DATA_SETTING.getKey(), pathNode1).build()); + final String node_1 = + internalCluster().startNode(Settings.builder().put(Environment.PATH_DATA_SETTING.getKey(), pathNode1).build()); client().prepareIndex("test", "type1", "1").setSource("field", "value").execute().actionGet(); final Path pathNode2 = createTempDir(); - final String node_2 = internalCluster().startNode(Settings.builder().put(Environment.PATH_DATA_SETTING.getKey(), pathNode2).build()); + final String node_2 = + internalCluster().startNode(Settings.builder().put(Environment.PATH_DATA_SETTING.getKey(), pathNode2).build()); ensureGreen(); Map primaryTerms = assertAndCapturePrimaryTerms(null); diff --git a/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java index f53c8da2f2d96..a63a76e7154f1 100644 --- a/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java @@ -95,7 +95,8 @@ public void testNoAsyncFetchData() { * the shard allocator to allocate it. There isn't a copy around to find anyhow. */ public void testNoAsyncFetchOnIndexCreation() { - RoutingAllocation allocation = onePrimaryOnNode1And1Replica(yesAllocationDeciders(), Settings.EMPTY, UnassignedInfo.Reason.INDEX_CREATED); + RoutingAllocation allocation = onePrimaryOnNode1And1Replica(yesAllocationDeciders(), Settings.EMPTY, + UnassignedInfo.Reason.INDEX_CREATED); testAllocator.clean(); testAllocator.allocateUnassigned(allocation); assertThat(testAllocator.getFetchDataCalledAndClean(), equalTo(false)); @@ -108,7 +109,8 @@ public void testNoAsyncFetchOnIndexCreation() { * and find a better copy for the shard. */ public void testAsyncFetchOnAnythingButIndexCreation() { - UnassignedInfo.Reason reason = RandomPicks.randomFrom(random(), EnumSet.complementOf(EnumSet.of(UnassignedInfo.Reason.INDEX_CREATED))); + UnassignedInfo.Reason reason = RandomPicks.randomFrom(random(), + EnumSet.complementOf(EnumSet.of(UnassignedInfo.Reason.INDEX_CREATED))); RoutingAllocation allocation = onePrimaryOnNode1And1Replica(yesAllocationDeciders(), Settings.EMPTY, reason); testAllocator.clean(); testAllocator.allocateUnassigned(allocation); @@ -125,7 +127,8 @@ public void testSimpleFullMatchAllocation() { .addData(nodeToMatch, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)); testAllocator.allocateUnassigned(allocation); assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); - assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(nodeToMatch.getId())); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), + equalTo(nodeToMatch.getId())); } /** @@ -138,7 +141,8 @@ public void testSyncIdMatch() { .addData(nodeToMatch, "MATCH", new StoreFileMetaData("file1", 10, "NO_MATCH_CHECKSUM" ,MIN_SUPPORTED_LUCENE_VERSION)); testAllocator.allocateUnassigned(allocation); assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); - assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(nodeToMatch.getId())); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), + equalTo(nodeToMatch.getId())); } /** @@ -151,7 +155,8 @@ public void testFileChecksumMatch() { .addData(nodeToMatch, "NO_MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)); testAllocator.allocateUnassigned(allocation); assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); - assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(nodeToMatch.getId())); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), + equalTo(nodeToMatch.getId())); } /** @@ -198,7 +203,8 @@ public void testNoMatchingFilesForReplicaOnAnyNode() { * moves to the ignore unassigned list. */ public void testNoOrThrottleDecidersRemainsInUnassigned() { - RoutingAllocation allocation = onePrimaryOnNode1And1Replica(randomBoolean() ? noAllocationDeciders() : throttleAllocationDeciders()); + RoutingAllocation allocation = + onePrimaryOnNode1And1Replica(randomBoolean() ? noAllocationDeciders() : throttleAllocationDeciders()); testAllocator.addData(node1, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)) .addData(node2, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)); testAllocator.allocateUnassigned(allocation); @@ -246,12 +252,14 @@ public void testDelayedAllocation() { assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId)); allocation = onePrimaryOnNode1And1Replica(yesAllocationDeciders(), - Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.timeValueHours(1)).build(), UnassignedInfo.Reason.NODE_LEFT); + Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), + TimeValue.timeValueHours(1)).build(), UnassignedInfo.Reason.NODE_LEFT); testAllocator.addData(node2, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)); testAllocator.allocateUnassigned(allocation); assertThat(allocation.routingNodesChanged(), equalTo(true)); assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); - assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node2.getId())); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), + equalTo(node2.getId())); } public void testCancelRecoveryBetterSyncId() { @@ -330,7 +338,9 @@ private RoutingAllocation onePrimaryOnNode1And1ReplicaRecovering(AllocationDecid .add(IndexRoutingTable.builder(shardId.getIndex()) .addIndexShard(new IndexShardRoutingTable.Builder(shardId) .addShard(primaryShard) - .addShard(TestShardRouting.newShardRouting(shardId, node2.getId(), null, false, ShardRoutingState.INITIALIZING, new UnassignedInfo(UnassignedInfo.Reason.CLUSTER_RECOVERED, null))) + .addShard(TestShardRouting.newShardRouting(shardId, node2.getId(), null, false, + ShardRoutingState.INITIALIZING, + new UnassignedInfo(UnassignedInfo.Reason.CLUSTER_RECOVERED, null))) .build()) ) .build(); @@ -380,13 +390,15 @@ public TestAllocator addData(DiscoveryNode node, String syncId, StoreFileMetaDat } @Override - protected AsyncShardFetch.FetchResult fetchData(ShardRouting shard, RoutingAllocation allocation) { + protected AsyncShardFetch.FetchResult + fetchData(ShardRouting shard, RoutingAllocation allocation) { fetchDataCalled.set(true); Map tData = null; if (data != null) { tData = new HashMap<>(); for (Map.Entry entry : data.entrySet()) { - tData.put(entry.getKey(), new TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData(entry.getKey(), entry.getValue())); + tData.put(entry.getKey(), + new TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData(entry.getKey(), entry.getValue())); } } return new AsyncShardFetch.FetchResult<>(shardId, tData, Collections.emptySet()); diff --git a/server/src/test/java/org/elasticsearch/gateway/ReusePeerRecoverySharedTest.java b/server/src/test/java/org/elasticsearch/gateway/ReusePeerRecoverySharedTest.java index 81be3057b0161..847c1801510a1 100644 --- a/server/src/test/java/org/elasticsearch/gateway/ReusePeerRecoverySharedTest.java +++ b/server/src/test/java/org/elasticsearch/gateway/ReusePeerRecoverySharedTest.java @@ -87,7 +87,8 @@ public static void testCase(Settings indexSettings, Runnable restartCluster, Log // Disable allocations while we are closing nodes client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder() - .put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), EnableAllocationDecider.Allocation.NONE)).get(); + .put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), + EnableAllocationDecider.Allocation.NONE)).get(); logger.info("--> full cluster restart"); restartCluster.run(); @@ -102,7 +103,8 @@ public static void testCase(Settings indexSettings, Runnable restartCluster, Log logger.info("--> disabling allocation while the cluster is shut down{}", useSyncIds ? "" : " a second time"); // Disable allocations while we are closing nodes client().admin().cluster().prepareUpdateSettings().setTransientSettings( - Settings.builder().put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), EnableAllocationDecider.Allocation.NONE)) + Settings.builder().put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), + EnableAllocationDecider.Allocation.NONE)) .get(); logger.info("--> full cluster restart"); restartCluster.run(); @@ -138,7 +140,8 @@ public static void testCase(Settings indexSettings, Runnable restartCluster, Log } else { if (useSyncIds && !recoveryState.getPrimary()) { logger.info("--> replica shard {} recovered from {} to {} using sync id, recovered {}, reuse {}", - recoveryState.getShardId().getId(), recoveryState.getSourceNode().getName(), recoveryState.getTargetNode().getName(), + recoveryState.getShardId().getId(), recoveryState.getSourceNode().getName(), + recoveryState.getTargetNode().getName(), recoveryState.getIndex().recoveredBytes(), recoveryState.getIndex().reusedBytes()); } assertThat(recoveryState.getIndex().recoveredBytes(), equalTo(0L)); From 7c92de8cdd4599800efb2139dcae20e71e3b3c35 Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Fri, 26 Oct 2018 14:22:01 -0400 Subject: [PATCH 2/2] Address review comments --- .../java/org/elasticsearch/discovery/zen/ZenDiscovery.java | 4 ++-- .../main/java/org/elasticsearch/gateway/GatewayService.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index a6d9970b0008b..398dd4088e51a 100644 --- a/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -561,8 +561,8 @@ void setCommittedState(ClusterState clusterState) { } // visible for testing - public static class NodeRemovalClusterStateTaskExecutor implements ClusterStateTaskExecutor, - ClusterStateTaskListener { + public static class NodeRemovalClusterStateTaskExecutor + implements ClusterStateTaskExecutor, ClusterStateTaskListener { private final AllocationService allocationService; private final ElectMasterService electMasterService; diff --git a/server/src/main/java/org/elasticsearch/gateway/GatewayService.java b/server/src/main/java/org/elasticsearch/gateway/GatewayService.java index ff42e278a7568..e8e442ab64b84 100644 --- a/server/src/main/java/org/elasticsearch/gateway/GatewayService.java +++ b/server/src/main/java/org/elasticsearch/gateway/GatewayService.java @@ -186,8 +186,8 @@ public void clusterChanged(final ClusterChangedEvent event) { } else if (expectedDataNodes != -1 && (nodes.getDataNodes().size() < expectedDataNodes)) { // does not meet the expected... enforceRecoverAfterTime = true; reason = "expecting [" + expectedDataNodes + "] data nodes, but only have [" + nodes.getDataNodes().size() + "]"; - } else if (expectedMasterNodes != -1 && (nodes.getMasterNodes().size() < expectedMasterNodes)) { // does not meet the - // expected... + } else if (expectedMasterNodes != -1 && (nodes.getMasterNodes().size() < expectedMasterNodes)) { + // does not meet the expected... enforceRecoverAfterTime = true; reason = "expecting [" + expectedMasterNodes + "] master nodes, but only have [" + nodes.getMasterNodes().size() + "]"; }