Skip to content

Commit

Permalink
Faster access to INITIALIZING/RELOCATING shards (#47817)
Browse files Browse the repository at this point in the history
Today a couple of allocation deciders iterate through all the shards on a node
to find the `INITIALIZING` or `RELOCATING` ones, and this can slow down cluster
state updates in clusters with very high-density nodes holding many thousands
of shards even if those shards belong to closed or frozen indices. This commit
pre-computes the sets of `INITIALIZING` and `RELOCATING` shards to speed up
this search.

Closes #46941
Relates #48579

Co-authored-by: "hongju.xhj" <hongju.xhj@alibaba-inc.com>
  • Loading branch information
2 people authored and DaveCTurner committed Oct 31, 2019
1 parent 32d511d commit 30b0a4e
Show file tree
Hide file tree
Showing 3 changed files with 224 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@
import org.elasticsearch.index.shard.ShardId;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.stream.Collectors;

/**
* A {@link RoutingNode} represents a cluster node associated with a single {@link DiscoveryNode} including all shards
Expand All @@ -41,6 +44,10 @@ public class RoutingNode implements Iterable<ShardRouting> {

private final LinkedHashMap<ShardId, ShardRouting> shards; // LinkedHashMap to preserve order

private final LinkedHashSet<ShardRouting> initializingShards;

private final LinkedHashSet<ShardRouting> relocatingShards;

public RoutingNode(String nodeId, DiscoveryNode node, ShardRouting... shards) {
this(nodeId, node, buildShardRoutingMap(shards));
}
Expand All @@ -49,6 +56,16 @@ public RoutingNode(String nodeId, DiscoveryNode node, ShardRouting... shards) {
this.nodeId = nodeId;
this.node = node;
this.shards = shards;
this.relocatingShards = new LinkedHashSet<>();
this.initializingShards = new LinkedHashSet<>();
for (ShardRouting shardRouting : shards.values()) {
if (shardRouting.initializing()) {
initializingShards.add(shardRouting);
} else if (shardRouting.relocating()) {
relocatingShards.add(shardRouting);
}
}
assert invariant();
}

private static LinkedHashMap<ShardId, ShardRouting> buildShardRoutingMap(ShardRouting... shardRoutings) {
Expand Down Expand Up @@ -99,26 +116,58 @@ public int size() {
* @param shard Shard to crate on this Node
*/
void add(ShardRouting shard) {
assert invariant();
if (shards.containsKey(shard.shardId())) {
throw new IllegalStateException("Trying to add a shard " + shard.shardId() + " to a node [" + nodeId
+ "] where it already exists. current [" + shards.get(shard.shardId()) + "]. new [" + shard + "]");
}
shards.put(shard.shardId(), shard);

if (shard.initializing()) {
initializingShards.add(shard);
} else if (shard.relocating()) {
relocatingShards.add(shard);
}
assert invariant();
}

void update(ShardRouting oldShard, ShardRouting newShard) {
assert invariant();
if (shards.containsKey(oldShard.shardId()) == false) {
// Shard was already removed by routing nodes iterator
// TODO: change caller logic in RoutingNodes so that this check can go away
return;
}
ShardRouting previousValue = shards.put(newShard.shardId(), newShard);
assert previousValue == oldShard : "expected shard " + previousValue + " but was " + oldShard;

if (oldShard.initializing()) {
boolean exist = initializingShards.remove(oldShard);
assert exist : "expected shard " + oldShard + " to exist in initializingShards";
} else if (oldShard.relocating()) {
boolean exist = relocatingShards.remove(oldShard);
assert exist : "expected shard " + oldShard + " to exist in relocatingShards";
}
if (newShard.initializing()) {
initializingShards.add(newShard);
} else if (newShard.relocating()) {
relocatingShards.add(newShard);
}
assert invariant();
}

void remove(ShardRouting shard) {
assert invariant();
ShardRouting previousValue = shards.remove(shard.shardId());
assert previousValue == shard : "expected shard " + previousValue + " but was " + shard;
if (shard.initializing()) {
boolean exist = initializingShards.remove(shard);
assert exist : "expected shard " + shard + " to exist in initializingShards";
} else if (shard.relocating()) {
boolean exist = relocatingShards.remove(shard);
assert exist : "expected shard " + shard + " to exist in relocatingShards";
}
assert invariant();
}

/**
Expand All @@ -127,6 +176,14 @@ void remove(ShardRouting shard) {
* @return number of shards
*/
public int numberOfShardsWithState(ShardRoutingState... states) {
if (states.length == 1) {
if (states[0] == ShardRoutingState.INITIALIZING) {
return initializingShards.size();
} else if (states[0] == ShardRoutingState.RELOCATING) {
return relocatingShards.size();
}
}

int count = 0;
for (ShardRouting shardEntry : this) {
for (ShardRoutingState state : states) {
Expand All @@ -144,6 +201,14 @@ public int numberOfShardsWithState(ShardRoutingState... states) {
* @return List of shards
*/
public List<ShardRouting> shardsWithState(ShardRoutingState... states) {
if (states.length == 1) {
if (states[0] == ShardRoutingState.INITIALIZING) {
return new ArrayList<>(initializingShards);
} else if (states[0] == ShardRoutingState.RELOCATING) {
return new ArrayList<>(relocatingShards);
}
}

List<ShardRouting> shards = new ArrayList<>();
for (ShardRouting shardEntry : this) {
for (ShardRoutingState state : states) {
Expand All @@ -164,6 +229,26 @@ public List<ShardRouting> shardsWithState(ShardRoutingState... states) {
public List<ShardRouting> shardsWithState(String index, ShardRoutingState... states) {
List<ShardRouting> shards = new ArrayList<>();

if (states.length == 1) {
if (states[0] == ShardRoutingState.INITIALIZING) {
for (ShardRouting shardEntry : initializingShards) {
if (shardEntry.getIndexName().equals(index) == false) {
continue;
}
shards.add(shardEntry);
}
return shards;
} else if (states[0] == ShardRoutingState.RELOCATING) {
for (ShardRouting shardEntry : relocatingShards) {
if (shardEntry.getIndexName().equals(index) == false) {
continue;
}
shards.add(shardEntry);
}
return shards;
}
}

for (ShardRouting shardEntry : this) {
if (!shardEntry.getIndexName().equals(index)) {
continue;
Expand All @@ -181,14 +266,7 @@ public List<ShardRouting> shardsWithState(String index, ShardRoutingState... sta
* The number of shards on this node that will not be eventually relocated.
*/
public int numberOfOwningShards() {
int count = 0;
for (ShardRouting shardEntry : this) {
if (shardEntry.state() != ShardRoutingState.RELOCATING) {
count++;
}
}

return count;
return shards.size() - relocatingShards.size();
}

public String prettyPrint() {
Expand Down Expand Up @@ -223,4 +301,21 @@ public List<ShardRouting> copyShards() {
public boolean isEmpty() {
return shards.isEmpty();
}

private boolean invariant() {

// initializingShards must consistent with that in shards
Collection<ShardRouting> shardRoutingsInitializing =
shards.values().stream().filter(ShardRouting::initializing).collect(Collectors.toList());
assert initializingShards.size() == shardRoutingsInitializing.size();
assert initializingShards.containsAll(shardRoutingsInitializing);

// relocatingShards must consistent with that in shards
Collection<ShardRouting> shardRoutingsRelocating =
shards.values().stream().filter(ShardRouting::relocating).collect(Collectors.toList());
assert relocatingShards.size() == shardRoutingsRelocating.size();
assert relocatingShards.containsAll(shardRoutingsRelocating);

return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
Expand Down Expand Up @@ -122,10 +123,10 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
// count *just the primaries* currently doing recovery on the node and check against primariesInitialRecoveries

int primariesInRecovery = 0;
for (ShardRouting shard : node) {
for (ShardRouting shard : node.shardsWithState(ShardRoutingState.INITIALIZING)) {
// when a primary shard is INITIALIZING, it can be because of *initial recovery* or *relocation from another node*
// we only count initial recoveries here, so we need to make sure that relocating node is null
if (shard.initializing() && shard.primary() && shard.relocatingNodeId() == null) {
if (shard.primary() && shard.relocatingNodeId() == null) {
primariesInRecovery++;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.cluster.routing;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;

import java.net.InetAddress;

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;

public class RoutingNodeTests extends ESTestCase {
private ShardRouting unassignedShard0 =
TestShardRouting.newShardRouting("test", 0, "node-1", false, ShardRoutingState.STARTED);
private ShardRouting initializingShard0 =
TestShardRouting.newShardRouting("test", 1, "node-1", false, ShardRoutingState.INITIALIZING);
private ShardRouting relocatingShard0 =
TestShardRouting.newShardRouting("test", 2, "node-1", "node-2", false, ShardRoutingState.RELOCATING);
private RoutingNode routingNode;

@Override
public void setUp() throws Exception {
super.setUp();
InetAddress inetAddress = InetAddress.getByAddress("name1", new byte[] { (byte) 192, (byte) 168, (byte) 0, (byte) 1});
TransportAddress transportAddress = new TransportAddress(inetAddress, randomIntBetween(0, 65535));
DiscoveryNode discoveryNode = new DiscoveryNode("name1", "node-1", transportAddress, emptyMap(), emptySet(), Version.CURRENT);
routingNode = new RoutingNode("node1", discoveryNode, unassignedShard0, initializingShard0, relocatingShard0);
}

public void testAdd() {
ShardRouting initializingShard1 =
TestShardRouting.newShardRouting("test", 3, "node-1", false, ShardRoutingState.INITIALIZING);
ShardRouting relocatingShard0 =
TestShardRouting.newShardRouting("test", 4, "node-1", "node-2",false, ShardRoutingState.RELOCATING);
routingNode.add(initializingShard1);
routingNode.add(relocatingShard0);
assertThat(routingNode.getByShardId(new ShardId("test", IndexMetaData.INDEX_UUID_NA_VALUE, 3)), equalTo(initializingShard1));
assertThat(routingNode.getByShardId(new ShardId("test", IndexMetaData.INDEX_UUID_NA_VALUE, 4)), equalTo(relocatingShard0));
}

public void testUpdate() {
ShardRouting startedShard0 =
TestShardRouting.newShardRouting("test", 0, "node-1", false, ShardRoutingState.STARTED);
ShardRouting startedShard1 =
TestShardRouting.newShardRouting("test", 1, "node-1", "node-2",false, ShardRoutingState.RELOCATING);
ShardRouting startedShard2 =
TestShardRouting.newShardRouting("test", 2, "node-1", false, ShardRoutingState.INITIALIZING);
routingNode.update(unassignedShard0, startedShard0);
routingNode.update(initializingShard0, startedShard1);
routingNode.update(relocatingShard0, startedShard2);
assertThat(routingNode.getByShardId(new ShardId("test", IndexMetaData.INDEX_UUID_NA_VALUE, 0)).state(),
equalTo(ShardRoutingState.STARTED));
assertThat(routingNode.getByShardId(new ShardId("test", IndexMetaData.INDEX_UUID_NA_VALUE, 1)).state(),
equalTo(ShardRoutingState.RELOCATING));
assertThat(routingNode.getByShardId(new ShardId("test", IndexMetaData.INDEX_UUID_NA_VALUE, 2)).state(),
equalTo(ShardRoutingState.INITIALIZING));
}

public void testRemove() {
routingNode.remove(unassignedShard0);
routingNode.remove(initializingShard0);
routingNode.remove(relocatingShard0);
assertThat(routingNode.getByShardId(new ShardId("test", IndexMetaData.INDEX_UUID_NA_VALUE, 0)), is(nullValue()));
assertThat(routingNode.getByShardId(new ShardId("test", IndexMetaData.INDEX_UUID_NA_VALUE, 1)), is(nullValue()));
assertThat(routingNode.getByShardId(new ShardId("test", IndexMetaData.INDEX_UUID_NA_VALUE, 2)), is(nullValue()));
}

public void testNumberOfShardsWithState() {
assertThat(routingNode.numberOfShardsWithState(ShardRoutingState.INITIALIZING, ShardRoutingState.STARTED), equalTo(2));
assertThat(routingNode.numberOfShardsWithState(ShardRoutingState.STARTED), equalTo(1));
assertThat(routingNode.numberOfShardsWithState(ShardRoutingState.RELOCATING), equalTo(1));
assertThat(routingNode.numberOfShardsWithState(ShardRoutingState.INITIALIZING), equalTo(1));
}

public void testShardsWithState() {
assertThat(routingNode.shardsWithState(ShardRoutingState.INITIALIZING, ShardRoutingState.STARTED).size(), equalTo(2));
assertThat(routingNode.shardsWithState(ShardRoutingState.STARTED).size(), equalTo(1));
assertThat(routingNode.shardsWithState(ShardRoutingState.RELOCATING).size(), equalTo(1));
assertThat(routingNode.shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
}

public void testShardsWithStateInIndex() {
assertThat(routingNode.shardsWithState("test", ShardRoutingState.INITIALIZING, ShardRoutingState.STARTED).size(), equalTo(2));
assertThat(routingNode.shardsWithState("test", ShardRoutingState.STARTED).size(), equalTo(1));
assertThat(routingNode.shardsWithState("test", ShardRoutingState.RELOCATING).size(), equalTo(1));
assertThat(routingNode.shardsWithState("test", ShardRoutingState.INITIALIZING).size(), equalTo(1));
}

public void testNumberOfOwningShards() {
assertThat(routingNode.numberOfOwningShards(), equalTo(2));
}

}

0 comments on commit 30b0a4e

Please sign in to comment.