diff --git a/benchmarks/src/main/java/org/elasticsearch/benchmark/routing/allocation/AllocationBenchmark.java b/benchmarks/src/main/java/org/elasticsearch/benchmark/routing/allocation/AllocationBenchmark.java index a02a9b971a1cf..a0065662faee5 100644 --- a/benchmarks/src/main/java/org/elasticsearch/benchmark/routing/allocation/AllocationBenchmark.java +++ b/benchmarks/src/main/java/org/elasticsearch/benchmark/routing/allocation/AllocationBenchmark.java @@ -156,7 +156,8 @@ public ClusterState measureAllocation() { while (clusterState.getRoutingNodes().hasUnassignedShards()) { clusterState = strategy.applyStartedShards( clusterState, - StreamSupport.stream(clusterState.getRoutingNodes().spliterator(), false) + clusterState.getRoutingNodes() + .stream() .flatMap(shardRoutings -> StreamSupport.stream(shardRoutings.spliterator(), false)) .filter(ShardRouting::initializing) .collect(Collectors.toList()) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java index 501089910a479..6c4f30a77692a 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java @@ -34,7 +34,6 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; -import java.util.stream.StreamSupport; import static org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING; import static org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING; @@ -74,10 +73,15 @@ public void testRerouteOccursOnDiskPassingHighWatermark() throws Exception { internalCluster().startNode(Settings.builder().put(Environment.PATH_DATA_SETTING.getKey(), createTempDir())); } - final List nodeIds = StreamSupport.stream( - client().admin().cluster().prepareState().get().getState().getRoutingNodes().spliterator(), - false - ).map(RoutingNode::nodeId).collect(Collectors.toList()); + final List nodeIds = client().admin() + .cluster() + .prepareState() + .get() + .getState() + .getRoutingNodes() + .stream() + .map(RoutingNode::nodeId) + .collect(Collectors.toList()); final MockInternalClusterInfoService clusterInfoService = getMockInternalClusterInfoService(); clusterInfoService.setUpdateFrequency(TimeValue.timeValueMillis(200)); @@ -149,10 +153,15 @@ public void testAutomaticReleaseOfIndexBlock() throws Exception { internalCluster().startNode(Settings.builder().put(Environment.PATH_DATA_SETTING.getKey(), createTempDir())); } - final List nodeIds = StreamSupport.stream( - client().admin().cluster().prepareState().get().getState().getRoutingNodes().spliterator(), - false - ).map(RoutingNode::nodeId).collect(Collectors.toList()); + final List nodeIds = client().admin() + .cluster() + .prepareState() + .get() + .getState() + .getRoutingNodes() + .stream() + .map(RoutingNode::nodeId) + .collect(Collectors.toList()); final MockInternalClusterInfoService clusterInfoService = getMockInternalClusterInfoService(); clusterInfoService.setUpdateFrequency(TimeValue.timeValueMillis(200)); @@ -270,10 +279,15 @@ public void testOnlyMovesEnoughShardsToDropBelowHighWatermark() throws Exception ) ); - final List nodeIds = StreamSupport.stream( - client().admin().cluster().prepareState().get().getState().getRoutingNodes().spliterator(), - false - ).map(RoutingNode::nodeId).collect(Collectors.toList()); + final List nodeIds = client().admin() + .cluster() + .prepareState() + .get() + .getState() + .getRoutingNodes() + .stream() + .map(RoutingNode::nodeId) + .collect(Collectors.toList()); assertAcked(prepareCreate("test").setSettings(Settings.builder().put("number_of_shards", 6).put("number_of_replicas", 0))); @@ -329,10 +343,15 @@ public void testDoesNotExceedLowWatermarkWhenRebalancing() throws Exception { final MockInternalClusterInfoService clusterInfoService = getMockInternalClusterInfoService(); - final List nodeIds = StreamSupport.stream( - client().admin().cluster().prepareState().get().getState().getRoutingNodes().spliterator(), - false - ).map(RoutingNode::nodeId).collect(Collectors.toList()); + final List nodeIds = client().admin() + .cluster() + .prepareState() + .get() + .getState() + .getRoutingNodes() + .stream() + .map(RoutingNode::nodeId) + .collect(Collectors.toList()); internalCluster().getCurrentMasterNodeInstance(ClusterService.class).addListener(event -> { assertThat(event.state().getRoutingNodes().node(nodeIds.get(2)).size(), lessThanOrEqualTo(1)); @@ -437,10 +456,15 @@ public void testMovesShardsOffSpecificDataPathAboveWatermark() throws Exception ) ); - final List nodeIds = StreamSupport.stream( - client().admin().cluster().prepareState().get().getState().getRoutingNodes().spliterator(), - false - ).map(RoutingNode::nodeId).collect(Collectors.toList()); + final List nodeIds = client().admin() + .cluster() + .prepareState() + .get() + .getState() + .getRoutingNodes() + .stream() + .map(RoutingNode::nodeId) + .collect(Collectors.toList()); assertAcked(prepareCreate("test").setSettings(Settings.builder().put("number_of_shards", 6).put("number_of_replicas", 0))); diff --git a/server/src/internalClusterTest/java/org/elasticsearch/indexlifecycle/IndexLifecycleActionIT.java b/server/src/internalClusterTest/java/org/elasticsearch/indexlifecycle/IndexLifecycleActionIT.java index 59346a5f4eebe..e9b84453594a8 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/indexlifecycle/IndexLifecycleActionIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/indexlifecycle/IndexLifecycleActionIT.java @@ -25,7 +25,6 @@ import java.util.Set; import java.util.stream.Collectors; -import java.util.stream.StreamSupport; import static org.elasticsearch.client.internal.Requests.clusterHealthRequest; import static org.elasticsearch.client.internal.Requests.createIndexRequest; @@ -239,9 +238,7 @@ private String getLocalNodeId(String name) { } private void assertNodesPresent(RoutingNodes routingNodes, String... nodes) { - final Set keySet = StreamSupport.stream(routingNodes.spliterator(), false) - .map(RoutingNode::nodeId) - .collect(Collectors.toSet()); + final Set keySet = routingNodes.stream().map(RoutingNode::nodeId).collect(Collectors.toSet()); assertThat(keySet, containsInAnyOrder(nodes)); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java index a5f3f696576c3..30bf51d0f801a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java @@ -27,6 +27,7 @@ import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; +import java.util.AbstractCollection; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collections; @@ -44,7 +45,6 @@ import java.util.Set; import java.util.function.Predicate; import java.util.stream.Collectors; -import java.util.stream.StreamSupport; /** * {@link RoutingNodes} represents a copy the routing information contained in the {@link ClusterState cluster state}. @@ -60,7 +60,7 @@ *
  • {@link #failShard} fails/cancels an assigned shard. * */ -public class RoutingNodes implements Iterable { +public class RoutingNodes extends AbstractCollection { private final Map nodesToShards; @@ -299,10 +299,7 @@ public Set getAttributeValues(String attributeName) { : Thread.currentThread().getName() + " should be the master service thread"; return attributeValuesByAttribute.computeIfAbsent( attributeName, - ignored -> StreamSupport.stream(this.spliterator(), false) - .map(r -> r.node().getAttributes().get(attributeName)) - .filter(Objects::nonNull) - .collect(Collectors.toSet()) + ignored -> stream().map(r -> r.node().getAttributes().get(attributeName)).filter(Objects::nonNull).collect(Collectors.toSet()) ); } @@ -869,6 +866,7 @@ private ShardRouting movePrimaryToUnassignedAndDemoteToReplica(ShardRouting shar /** * Returns the number of routing nodes */ + @Override public int size() { return nodesToShards.size(); } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java index 0226ee84d94a4..81609a010a5c3 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java @@ -47,7 +47,6 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; -import java.util.stream.StreamSupport; /** * Listens for a node to go over the high watermark and kicks off an empty @@ -375,7 +374,8 @@ public void onNewInfo(ClusterInfo info) { } // Generate a map of node name to ID so we can use it to look up node replacement targets - final Map nodeNameToId = StreamSupport.stream(state.getRoutingNodes().spliterator(), false) + final Map nodeNameToId = state.getRoutingNodes() + .stream() .collect(Collectors.toMap(rn -> rn.node().getName(), RoutingNode::nodeId, (s1, s2) -> s2)); // Calculate both the source node id and the target node id of a "replace" type shutdown diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AwarenessAllocationTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AwarenessAllocationTests.java index 9e725a398445a..c94be662ffea7 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AwarenessAllocationTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AwarenessAllocationTests.java @@ -34,7 +34,6 @@ import java.util.HashMap; import java.util.Map; import java.util.function.UnaryOperator; -import java.util.stream.StreamSupport; import static java.util.Collections.emptyMap; import static java.util.Collections.singletonList; @@ -1098,7 +1097,8 @@ private void testExplanation( new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) ); - final RoutingNode emptyNode = StreamSupport.stream(clusterState.getRoutingNodes().spliterator(), false) + final RoutingNode emptyNode = clusterState.getRoutingNodes() + .stream() .filter(RoutingNode::isEmpty) .findFirst() .orElseThrow(AssertionError::new); diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/SameShardRoutingTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/SameShardRoutingTests.java index 419c1729d3329..050b5d4da56ce 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/SameShardRoutingTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/SameShardRoutingTests.java @@ -32,7 +32,6 @@ import java.util.Collections; import java.util.List; -import java.util.stream.StreamSupport; import static java.util.Collections.emptyMap; import static java.util.Collections.singletonList; @@ -205,12 +204,14 @@ public void testSameHostCheckWithExplain() { new ClusterSettings(sameHostSetting, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) ); - final RoutingNode emptyNode = StreamSupport.stream(clusterState.getRoutingNodes().spliterator(), false) + final RoutingNode emptyNode = clusterState.getRoutingNodes() + .stream() .filter(node -> node.getByShardId(unassignedShard.shardId()) == null) .findFirst() .orElseThrow(AssertionError::new); - final RoutingNode otherNode = StreamSupport.stream(clusterState.getRoutingNodes().spliterator(), false) + final RoutingNode otherNode = clusterState.getRoutingNodes() + .stream() .filter(node -> node != emptyNode) .findFirst() .orElseThrow(AssertionError::new); diff --git a/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderService.java b/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderService.java index 7c387b729d98f..c5188e22aecdf 100644 --- a/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderService.java +++ b/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderService.java @@ -344,7 +344,8 @@ boolean needsThisTier(ShardRouting shard, RoutingAllocation allocation) { return false; } IndexMetadata indexMetadata = indexMetadata(shard, allocation); - Set decisionTypes = StreamSupport.stream(allocation.routingNodes().spliterator(), false) + Set decisionTypes = allocation.routingNodes() + .stream() .map( node -> dataTierAllocationDecider.shouldFilter( indexMetadata, @@ -369,7 +370,8 @@ boolean needsThisTier(ShardRouting shard, RoutingAllocation allocation) { allocation.debugDecision(true); try { // check that it does not belong on any existing node, i.e., there must be only a tier like reason it cannot be allocated - return StreamSupport.stream(allocation.routingNodes().spliterator(), false) + return allocation.routingNodes() + .stream() .anyMatch(node -> isFilterTierOnlyDecision(allocationDeciders.canAllocate(shard, node, allocation), indexMetadata)); } finally { allocation.debugDecision(false);