Skip to content

Commit

Permalink
Make RoutingNodes behave like a collection
Browse files Browse the repository at this point in the history
Extend it from `AbstractCollection` instead of `Iterable` analogous to #83453
  • Loading branch information
arteam committed Feb 4, 2022
1 parent eb055bc commit 78913c5
Show file tree
Hide file tree
Showing 8 changed files with 64 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ public ClusterState measureAllocation() {
while (clusterState.getRoutingNodes().hasUnassignedShards()) {
clusterState = strategy.applyStartedShards(
clusterState,
StreamSupport.stream(clusterState.getRoutingNodes().spliterator(), false)
clusterState.getRoutingNodes()
.stream()
.flatMap(shardRoutings -> StreamSupport.stream(shardRoutings.spliterator(), false))
.filter(ShardRouting::initializing)
.collect(Collectors.toList())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING;
import static org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING;
Expand Down Expand Up @@ -74,10 +73,15 @@ public void testRerouteOccursOnDiskPassingHighWatermark() throws Exception {
internalCluster().startNode(Settings.builder().put(Environment.PATH_DATA_SETTING.getKey(), createTempDir()));
}

final List<String> nodeIds = StreamSupport.stream(
client().admin().cluster().prepareState().get().getState().getRoutingNodes().spliterator(),
false
).map(RoutingNode::nodeId).collect(Collectors.toList());
final List<String> nodeIds = client().admin()
.cluster()
.prepareState()
.get()
.getState()
.getRoutingNodes()
.stream()
.map(RoutingNode::nodeId)
.collect(Collectors.toList());

final MockInternalClusterInfoService clusterInfoService = getMockInternalClusterInfoService();
clusterInfoService.setUpdateFrequency(TimeValue.timeValueMillis(200));
Expand Down Expand Up @@ -149,10 +153,15 @@ public void testAutomaticReleaseOfIndexBlock() throws Exception {
internalCluster().startNode(Settings.builder().put(Environment.PATH_DATA_SETTING.getKey(), createTempDir()));
}

final List<String> nodeIds = StreamSupport.stream(
client().admin().cluster().prepareState().get().getState().getRoutingNodes().spliterator(),
false
).map(RoutingNode::nodeId).collect(Collectors.toList());
final List<String> nodeIds = client().admin()
.cluster()
.prepareState()
.get()
.getState()
.getRoutingNodes()
.stream()
.map(RoutingNode::nodeId)
.collect(Collectors.toList());

final MockInternalClusterInfoService clusterInfoService = getMockInternalClusterInfoService();
clusterInfoService.setUpdateFrequency(TimeValue.timeValueMillis(200));
Expand Down Expand Up @@ -270,10 +279,15 @@ public void testOnlyMovesEnoughShardsToDropBelowHighWatermark() throws Exception
)
);

final List<String> nodeIds = StreamSupport.stream(
client().admin().cluster().prepareState().get().getState().getRoutingNodes().spliterator(),
false
).map(RoutingNode::nodeId).collect(Collectors.toList());
final List<String> nodeIds = client().admin()
.cluster()
.prepareState()
.get()
.getState()
.getRoutingNodes()
.stream()
.map(RoutingNode::nodeId)
.collect(Collectors.toList());

assertAcked(prepareCreate("test").setSettings(Settings.builder().put("number_of_shards", 6).put("number_of_replicas", 0)));

Expand Down Expand Up @@ -329,10 +343,15 @@ public void testDoesNotExceedLowWatermarkWhenRebalancing() throws Exception {

final MockInternalClusterInfoService clusterInfoService = getMockInternalClusterInfoService();

final List<String> nodeIds = StreamSupport.stream(
client().admin().cluster().prepareState().get().getState().getRoutingNodes().spliterator(),
false
).map(RoutingNode::nodeId).collect(Collectors.toList());
final List<String> nodeIds = client().admin()
.cluster()
.prepareState()
.get()
.getState()
.getRoutingNodes()
.stream()
.map(RoutingNode::nodeId)
.collect(Collectors.toList());

internalCluster().getCurrentMasterNodeInstance(ClusterService.class).addListener(event -> {
assertThat(event.state().getRoutingNodes().node(nodeIds.get(2)).size(), lessThanOrEqualTo(1));
Expand Down Expand Up @@ -437,10 +456,15 @@ public void testMovesShardsOffSpecificDataPathAboveWatermark() throws Exception
)
);

final List<String> nodeIds = StreamSupport.stream(
client().admin().cluster().prepareState().get().getState().getRoutingNodes().spliterator(),
false
).map(RoutingNode::nodeId).collect(Collectors.toList());
final List<String> nodeIds = client().admin()
.cluster()
.prepareState()
.get()
.getState()
.getRoutingNodes()
.stream()
.map(RoutingNode::nodeId)
.collect(Collectors.toList());

assertAcked(prepareCreate("test").setSettings(Settings.builder().put("number_of_shards", 6).put("number_of_replicas", 0)));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@

import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.elasticsearch.client.internal.Requests.clusterHealthRequest;
import static org.elasticsearch.client.internal.Requests.createIndexRequest;
Expand Down Expand Up @@ -239,9 +238,7 @@ private String getLocalNodeId(String name) {
}

private void assertNodesPresent(RoutingNodes routingNodes, String... nodes) {
final Set<String> keySet = StreamSupport.stream(routingNodes.spliterator(), false)
.map(RoutingNode::nodeId)
.collect(Collectors.toSet());
final Set<String> keySet = routingNodes.stream().map(RoutingNode::nodeId).collect(Collectors.toSet());
assertThat(keySet, containsInAnyOrder(nodes));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;

import java.util.AbstractCollection;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -44,7 +45,6 @@
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

/**
* {@link RoutingNodes} represents a copy the routing information contained in the {@link ClusterState cluster state}.
Expand All @@ -60,7 +60,7 @@
* <li> {@link #failShard} fails/cancels an assigned shard.
* </ul>
*/
public class RoutingNodes implements Iterable<RoutingNode> {
public class RoutingNodes extends AbstractCollection<RoutingNode> {

private final Map<String, RoutingNode> nodesToShards;

Expand Down Expand Up @@ -299,10 +299,7 @@ public Set<String> getAttributeValues(String attributeName) {
: Thread.currentThread().getName() + " should be the master service thread";
return attributeValuesByAttribute.computeIfAbsent(
attributeName,
ignored -> StreamSupport.stream(this.spliterator(), false)
.map(r -> r.node().getAttributes().get(attributeName))
.filter(Objects::nonNull)
.collect(Collectors.toSet())
ignored -> stream().map(r -> r.node().getAttributes().get(attributeName)).filter(Objects::nonNull).collect(Collectors.toSet())
);
}

Expand Down Expand Up @@ -869,6 +866,7 @@ private ShardRouting movePrimaryToUnassignedAndDemoteToReplica(ShardRouting shar
/**
* Returns the number of routing nodes
*/
@Override
public int size() {
return nodesToShards.size();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/**
* Listens for a node to go over the high watermark and kicks off an empty
Expand Down Expand Up @@ -375,7 +374,8 @@ public void onNewInfo(ClusterInfo info) {
}

// Generate a map of node name to ID so we can use it to look up node replacement targets
final Map<String, String> nodeNameToId = StreamSupport.stream(state.getRoutingNodes().spliterator(), false)
final Map<String, String> nodeNameToId = state.getRoutingNodes()
.stream()
.collect(Collectors.toMap(rn -> rn.node().getName(), RoutingNode::nodeId, (s1, s2) -> s2));

// Calculate both the source node id and the target node id of a "replace" type shutdown
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;
import java.util.stream.StreamSupport;

import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonList;
Expand Down Expand Up @@ -1098,7 +1097,8 @@ private void testExplanation(
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
);

final RoutingNode emptyNode = StreamSupport.stream(clusterState.getRoutingNodes().spliterator(), false)
final RoutingNode emptyNode = clusterState.getRoutingNodes()
.stream()
.filter(RoutingNode::isEmpty)
.findFirst()
.orElseThrow(AssertionError::new);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@

import java.util.Collections;
import java.util.List;
import java.util.stream.StreamSupport;

import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonList;
Expand Down Expand Up @@ -205,12 +204,14 @@ public void testSameHostCheckWithExplain() {
new ClusterSettings(sameHostSetting, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
);

final RoutingNode emptyNode = StreamSupport.stream(clusterState.getRoutingNodes().spliterator(), false)
final RoutingNode emptyNode = clusterState.getRoutingNodes()
.stream()
.filter(node -> node.getByShardId(unassignedShard.shardId()) == null)
.findFirst()
.orElseThrow(AssertionError::new);

final RoutingNode otherNode = StreamSupport.stream(clusterState.getRoutingNodes().spliterator(), false)
final RoutingNode otherNode = clusterState.getRoutingNodes()
.stream()
.filter(node -> node != emptyNode)
.findFirst()
.orElseThrow(AssertionError::new);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,8 @@ boolean needsThisTier(ShardRouting shard, RoutingAllocation allocation) {
return false;
}
IndexMetadata indexMetadata = indexMetadata(shard, allocation);
Set<Decision.Type> decisionTypes = StreamSupport.stream(allocation.routingNodes().spliterator(), false)
Set<Decision.Type> decisionTypes = allocation.routingNodes()
.stream()
.map(
node -> dataTierAllocationDecider.shouldFilter(
indexMetadata,
Expand All @@ -369,7 +370,8 @@ boolean needsThisTier(ShardRouting shard, RoutingAllocation allocation) {
allocation.debugDecision(true);
try {
// check that it does not belong on any existing node, i.e., there must be only a tier like reason it cannot be allocated
return StreamSupport.stream(allocation.routingNodes().spliterator(), false)
return allocation.routingNodes()
.stream()
.anyMatch(node -> isFilterTierOnlyDecision(allocationDeciders.canAllocate(shard, node, allocation), indexMetadata));
} finally {
allocation.debugDecision(false);
Expand Down

0 comments on commit 78913c5

Please sign in to comment.