Skip to content

Commit

Permalink
Auto-release of read-only-allow-delete block when disk utilization fa…
Browse files Browse the repository at this point in the history
…lls below the low watermark. Relates to elastic#39334
  • Loading branch information
Bukhtawar committed May 26, 2019
1 parent 5d837fa commit c90c84f
Show file tree
Hide file tree
Showing 6 changed files with 300 additions and 41 deletions.
6 changes: 4 additions & 2 deletions docs/reference/modules/cluster/disk_allocator.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@ Elasticsearch enforces a read-only index block
(`index.blocks.read_only_allow_delete`) on every index that has one or more
shards allocated on the node that has at least one disk exceeding the flood
stage. This is a last resort to prevent nodes from running out of disk space.
The index block must be released manually once there is enough disk space
available to allow indexing operations to continue.
The index block is automatically released once the disk utilization falls below
the high watermark.
The automatic release can however be disabled in 7.x through a system property
`es.disk.auto_release_flood_stage_block`

NOTE: You can not mix the usage of percentage values and byte values within
these settings. Either all are set to percentage values, or all are set to byte
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@

package org.elasticsearch.cluster.routing.allocation;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import com.carrotsearch.hppc.ObjectLookupContainer;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
Expand All @@ -34,9 +37,11 @@
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets;
Expand All @@ -55,6 +60,7 @@ public class DiskThresholdMonitor {
private final Set<String> nodeHasPassedWatermark = Sets.newConcurrentHashSet();
private final Supplier<ClusterState> clusterStateSupplier;
private long lastRunNS;
private final DeprecationLogger deprecationLogger = new DeprecationLogger(logger);

public DiskThresholdMonitor(Settings settings, Supplier<ClusterState> clusterStateSupplier, ClusterSettings clusterSettings,
Client client) {
Expand Down Expand Up @@ -109,13 +115,24 @@ public void onNewInfo(ClusterInfo info) {
}
ClusterState state = clusterStateSupplier.get();
Set<String> indicesToMarkReadOnly = new HashSet<>();
RoutingNodes routingNodes = state.getRoutingNodes();
Map<String, Boolean> indexAutoReleaseEligibility = new HashMap<>();
// Ensure we release indices on nodes that have a usage response from node stats
markNodesMissingUsageIneligibleForRelease(routingNodes, usages, indexAutoReleaseEligibility);
for (ObjectObjectCursor<String, DiskUsage> entry : usages) {
String node = entry.key;
DiskUsage usage = entry.value;
warnAboutDiskIfNeeded(usage);
RoutingNode routingNode = state.getRoutingNodes().node(node);
// Only unblock index if all nodes that contain shards of it are below the high disk watermark
if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes()
|| usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh()) {
markEligiblityForAutoRelease(routingNode, indexAutoReleaseEligibility, false);
} else {
markEligiblityForAutoRelease(routingNode, indexAutoReleaseEligibility, true);
}
if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdFloodStage().getBytes() ||
usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdFloodStage()) {
RoutingNode routingNode = state.getRoutingNodes().node(node);
if (routingNode != null) { // this might happen if we haven't got the full cluster-state yet?!
for (ShardRouting routing : routingNode) {
indicesToMarkReadOnly.add(routing.index().getName());
Expand Down Expand Up @@ -159,17 +176,59 @@ public void onNewInfo(ClusterInfo info) {
logger.info("rerouting shards: [{}]", explanation);
reroute();
}

// Get set of indices that are eligible to be automatically unblocked
// Only collect indices that are currently blocked
Set<String> indicesToAutoRelease = indexAutoReleaseEligibility.entrySet().stream()
.filter(Map.Entry::getValue)
.map(Map.Entry::getKey)
.filter(index -> state.getBlocks().hasIndexBlock(index, IndexMetaData.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK))
.collect(Collectors.toCollection(HashSet::new));

if (indicesToAutoRelease.isEmpty() == false) {
if (diskThresholdSettings.isAutoReleaseIndexEnabled()) {
logger.info("Releasing read-only allow delete block on indices: [{}]", indicesToAutoRelease);
updateIndicesReadOnly(indicesToAutoRelease, false);
} else {
deprecationLogger.deprecated("es.disk.auto_release_flood_stage_block will be removed in 8.0.0");
}
}
indicesToMarkReadOnly.removeIf(index -> state.getBlocks().indexBlocked(ClusterBlockLevel.WRITE, index));
if (indicesToMarkReadOnly.isEmpty() == false) {
markIndicesReadOnly(indicesToMarkReadOnly);
updateIndicesReadOnly(indicesToMarkReadOnly, true);
}
}
}


private void markNodesMissingUsageIneligibleForRelease(RoutingNodes routingNodes, ImmutableOpenMap<String, DiskUsage> usages,
Map<String, Boolean> indexAutoReleaseEligibility) {
if (routingNodes.size() != usages.size()) {
for (RoutingNode routingNode : routingNodes) {
if (!usages.keys().contains(routingNode.nodeId())) {
markEligiblityForAutoRelease(routingNode, indexAutoReleaseEligibility, false);
}
}
}

}

private void markEligiblityForAutoRelease(RoutingNode routingNode, Map<String, Boolean> indexAutoReleaseEligibility,
boolean eligible) {
if (routingNode != null) {
for (ShardRouting routing : routingNode) {
String indexName = routing.index().getName();
boolean value = indexAutoReleaseEligibility.getOrDefault(indexName, true);
indexAutoReleaseEligibility.put(indexName, value && eligible);
}
}
}

protected void markIndicesReadOnly(Set<String> indicesToMarkReadOnly) {
protected void updateIndicesReadOnly(Set<String> indicesToUpdate, boolean readOnly) {
// set read-only block but don't block on the response
client.admin().indices().prepareUpdateSettings(indicesToMarkReadOnly.toArray(Strings.EMPTY_ARRAY)).
setSettings(Settings.builder().put(IndexMetaData.SETTING_READ_ONLY_ALLOW_DELETE, true).build()).execute();
String value = readOnly ? Boolean.TRUE.toString() : null;
client.admin().indices().prepareUpdateSettings(indicesToUpdate.toArray(Strings.EMPTY_ARRAY)).
setSettings(Settings.builder().put(IndexMetaData.SETTING_READ_ONLY_ALLOW_DELETE, value).build()).execute();
}

protected void reroute() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,20 @@ public class DiskThresholdSettings {
private volatile TimeValue rerouteInterval;
private volatile Double freeDiskThresholdFloodStage;
private volatile ByteSizeValue freeBytesThresholdFloodStage;
private static boolean autoReleaseIndexEnabled;

static {
final String AUTO_RELEASE_INDEX_ENABLED_KEY = "es.disk.auto_release_flood_stage_block";
final String property = System.getProperty(AUTO_RELEASE_INDEX_ENABLED_KEY);
if (property == null) {
autoReleaseIndexEnabled = true;
} else if (Boolean.FALSE.toString().equals(property)){
autoReleaseIndexEnabled = false;
} else {
throw new IllegalArgumentException(AUTO_RELEASE_INDEX_ENABLED_KEY + " may only be unset or set to [false] but was [" +
property + "]");
}
}

public DiskThresholdSettings(Settings settings, ClusterSettings clusterSettings) {
final String lowWatermark = CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.get(settings);
Expand Down Expand Up @@ -286,6 +300,10 @@ public ByteSizeValue getFreeBytesThresholdFloodStage() {
return freeBytesThresholdFloodStage;
}

public boolean isAutoReleaseIndexEnabled() {
return autoReleaseIndexEnabled;
}

public boolean includeRelocations() {
return includeRelocations;
}
Expand Down
Loading

0 comments on commit c90c84f

Please sign in to comment.