Skip to content

Commit

Permalink
Add changes to block non-paginated calls in cat shards, indices and s…
Browse files Browse the repository at this point in the history
…egments

Signed-off-by: Sumit Bansal <sumitsb@amazon.com>
  • Loading branch information
sumitasr committed Sep 19, 2024
1 parent b2a7136 commit 973f3be
Show file tree
Hide file tree
Showing 12 changed files with 472 additions and 37 deletions.
9 changes: 7 additions & 2 deletions server/src/main/java/org/opensearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.ActionPlugin.ActionHandler;
import org.opensearch.rest.NamedRoute;
import org.opensearch.rest.RequestLimitSettings;
import org.opensearch.rest.RestController;
import org.opensearch.rest.RestHandler;
import org.opensearch.rest.RestHeaderDefinition;
Expand Down Expand Up @@ -528,6 +529,7 @@ public class ActionModule extends AbstractModule {
private final RequestValidators<IndicesAliasesRequest> indicesAliasesRequestRequestValidators;
private final ThreadPool threadPool;
private final ExtensionsManager extensionsManager;
private final RequestLimitSettings requestLimitSettings;

public ActionModule(
Settings settings,
Expand Down Expand Up @@ -580,6 +582,7 @@ public ActionModule(
);

restController = new RestController(headers, restWrapper, nodeClient, circuitBreakerService, usageService, identityService);
requestLimitSettings = new RequestLimitSettings(clusterSettings, settings);
}

public Map<String, ActionHandler<?, ?>> getActions() {
Expand Down Expand Up @@ -960,8 +963,8 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
registerHandler.accept(new RestClusterManagerAction());
registerHandler.accept(new RestNodesAction());
registerHandler.accept(new RestTasksAction(nodesInCluster));
registerHandler.accept(new RestIndicesAction());
registerHandler.accept(new RestSegmentsAction());
registerHandler.accept(new RestIndicesAction(requestLimitSettings));
registerHandler.accept(new RestSegmentsAction(requestLimitSettings));
// Fully qualified to prevent interference with rest.action.count.RestCountAction
registerHandler.accept(new org.opensearch.rest.action.cat.RestCountAction());
// Fully qualified to prevent interference with rest.action.indices.RestRecoveryAction
Expand Down Expand Up @@ -1048,6 +1051,8 @@ protected void configure() {

// register dynamic ActionType -> transportAction Map used by NodeClient
bind(DynamicActionRegistry.class).toInstance(dynamicActionRegistry);

bind(RequestLimitSettings.class).toInstance(requestLimitSettings);
}

public ActionFilters getActionFilters() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@ public class CatShardsRequest extends ClusterManagerNodeReadRequest<CatShardsReq

private String[] indices;
private TimeValue cancelAfterTimeInterval;
private boolean requestLimitCheckSupported;

public CatShardsRequest() {}

public CatShardsRequest(StreamInput in) throws IOException {
super(in);
this.requestLimitCheckSupported = false;
}

@Override
Expand All @@ -55,6 +57,14 @@ public TimeValue getCancelAfterTimeInterval() {
return this.cancelAfterTimeInterval;
}

public void setRequestLimitCheckSupported(final boolean requestLimitCheckSupported) {
this.requestLimitCheckSupported = requestLimitCheckSupported;
}

public boolean isRequestLimitCheckSupported() {
return this.requestLimitCheckSupported;
}

@Override
public ClusterAdminTask createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new ClusterAdminTask(id, type, action, parentTaskId, headers, this.cancelAfterTimeInterval);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,15 @@
import org.opensearch.common.inject.Inject;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.action.NotifyOnceListener;
import org.opensearch.core.common.breaker.CircuitBreaker;
import org.opensearch.core.common.breaker.CircuitBreakingException;
import org.opensearch.rest.RequestLimitSettings;
import org.opensearch.tasks.CancellableTask;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;

import static org.opensearch.rest.RequestLimitSettings.BlockAction.CAT_SHARDS;

/**
* Perform cat shards action
*
Expand All @@ -31,11 +36,18 @@
public class TransportCatShardsAction extends HandledTransportAction<CatShardsRequest, CatShardsResponse> {

private final NodeClient client;
private final RequestLimitSettings requestLimitSettings;

@Inject
public TransportCatShardsAction(NodeClient client, TransportService transportService, ActionFilters actionFilters) {
public TransportCatShardsAction(
NodeClient client,
TransportService transportService,
ActionFilters actionFilters,
RequestLimitSettings requestLimitSettings
) {
super(CatShardsAction.NAME, transportService, actionFilters, CatShardsRequest::new);
this.client = client;
this.requestLimitSettings = requestLimitSettings;
}

@Override
Expand Down Expand Up @@ -73,6 +85,10 @@ protected void innerOnFailure(Exception e) {
client.admin().cluster().state(clusterStateRequest, new ActionListener<ClusterStateResponse>() {
@Override
public void onResponse(ClusterStateResponse clusterStateResponse) {
if (shardsRequest.isRequestLimitCheckSupported()
&& requestLimitSettings.isCircuitLimitBreached(clusterStateResponse.getState(), CAT_SHARDS)) {
listener.onFailure(new CircuitBreakingException("Too many shards requested.", CircuitBreaker.Durability.TRANSIENT));
}
catShardsResponse.setClusterStateResponse(clusterStateResponse);
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
indicesStatsRequest.setShouldCancelOnTimeout(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.repositories.fs.FsRepository;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.RequestLimitSettings;
import org.opensearch.script.ScriptService;
import org.opensearch.search.SearchService;
import org.opensearch.search.aggregations.MultiBucketConsumerService;
Expand Down Expand Up @@ -793,7 +794,11 @@ public void apply(Settings value, Settings current, Settings previous) {
WorkloadManagementSettings.NODE_LEVEL_CPU_REJECTION_THRESHOLD,
WorkloadManagementSettings.NODE_LEVEL_CPU_CANCELLATION_THRESHOLD,
WorkloadManagementSettings.NODE_LEVEL_MEMORY_REJECTION_THRESHOLD,
WorkloadManagementSettings.NODE_LEVEL_MEMORY_CANCELLATION_THRESHOLD
WorkloadManagementSettings.NODE_LEVEL_MEMORY_CANCELLATION_THRESHOLD,

RequestLimitSettings.CAT_INDICES_LIMIT_SETTING,
RequestLimitSettings.CAT_SHARDS_LIMIT_SETTING,
RequestLimitSettings.CAT_SEGMENTS_LIMIT_SETTING
)
)
);
Expand Down
149 changes: 149 additions & 0 deletions server/src/main/java/org/opensearch/rest/RequestLimitSettings.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.rest;

import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.rest.action.cat.RestIndicesAction;
import org.opensearch.rest.action.cat.RestSegmentsAction;
import org.opensearch.rest.action.cat.RestShardsAction;

import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;

/**
* Class to define dynamic settings for putting circuit breakers on the actions and functions to evaluate if block is required.
*/
public class RequestLimitSettings {

/**
* Enum to represent action names against whom we need to perform limit checks.
*/
public enum BlockAction {
CAT_INDICES,
CAT_SHARDS,
CAT_SEGMENTS
}

private volatile int catIndicesLimit;
private volatile int catShardsLimit;
private volatile int catSegmentsLimit;

/**
* Setting to enable circuit breaker on {@link RestIndicesAction}. The limit will be applied on number of indices.
*/
public static final Setting<Integer> CAT_INDICES_LIMIT_SETTING = Setting.intSetting(
"cat.indices.limit",
-1,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

/**
* Setting to enable circuit breaker on {@link RestShardsAction}. The limit will be applied on number of shards.
*/
public static final Setting<Integer> CAT_SHARDS_LIMIT_SETTING = Setting.intSetting(
"cat.shards.limit",
-1,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

/**
* Setting to enable circuit breaker on {@link RestSegmentsAction}. The limit will be applied on number of indices.
*/
public static final Setting<Integer> CAT_SEGMENTS_LIMIT_SETTING = Setting.intSetting(
"cat.segments.limit",
-1,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

public RequestLimitSettings(ClusterSettings clusterSettings, Settings settings) {
setCatShardsLimitSetting(CAT_SHARDS_LIMIT_SETTING.get(settings));
setCatIndicesLimitSetting(CAT_INDICES_LIMIT_SETTING.get(settings));
setCatSegmentsLimitSetting(CAT_SEGMENTS_LIMIT_SETTING.get(settings));

clusterSettings.addSettingsUpdateConsumer(CAT_SHARDS_LIMIT_SETTING, this::setCatShardsLimitSetting);
clusterSettings.addSettingsUpdateConsumer(CAT_INDICES_LIMIT_SETTING, this::setCatIndicesLimitSetting);
clusterSettings.addSettingsUpdateConsumer(CAT_SEGMENTS_LIMIT_SETTING, this::setCatSegmentsLimitSetting);
}

/**
* Method to check if the circuit breaker limit has reached for an action.
* The limits are controlled via dynamic settings.
*
* @param clusterState {@link ClusterState}
* @param actionToCheck {@link BlockAction}
* @return True/False
*/
public boolean isCircuitLimitBreached(final ClusterState clusterState, final BlockAction actionToCheck) {
if (Objects.isNull(clusterState)) return false;
switch (actionToCheck) {
case CAT_INDICES:
if (catIndicesLimit <= 0) return false;
int indicesCount = getTotalIndices(clusterState);
if (indicesCount > catIndicesLimit) return true;
break;
case CAT_SHARDS:
if (catShardsLimit <= 0) return false;
int totalShards = getTotalShards(clusterState);
if (totalShards > catShardsLimit) return true;
break;
case CAT_SEGMENTS:
if (catSegmentsLimit <= 0) return false;
if (getTotalIndices(clusterState) > catSegmentsLimit) return true;
break;
}
return false;
}

private void setCatShardsLimitSetting(final int catShardsLimit) {
this.catShardsLimit = catShardsLimit;
}

private void setCatIndicesLimitSetting(final int catIndicesLimit) {
this.catIndicesLimit = catIndicesLimit;
}

private void setCatSegmentsLimitSetting(final int catSegmentsLimit) {
this.catSegmentsLimit = catSegmentsLimit;
}

private static int getTotalIndices(final ClusterState clusterState) {
return chainWalk(() -> clusterState.getMetadata().getIndices().size(), 0);
}

private static int getTotalShards(final ClusterState clusterState) {
final RoutingTable routingTable = clusterState.getRoutingTable();
final Map<String, IndexRoutingTable> indexRoutingTableMap = routingTable.getIndicesRouting();
int totalShards = 0;
for (final Map.Entry<String, IndexRoutingTable> entry : indexRoutingTableMap.entrySet()) {
for (final Map.Entry<Integer, IndexShardRoutingTable> indexShardRoutingTableEntry : entry.getValue().getShards().entrySet()) {
totalShards += indexShardRoutingTableEntry.getValue().getShards().size();
}
}
return totalShards;
}

// TODO: Evaluate if we can move this to common util.
private static <T> T chainWalk(Supplier<T> supplier, T defaultValue) {
try {
return supplier.get();
} catch (NullPointerException e) {
return defaultValue;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.opensearch.core.rest.RestStatus;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RequestLimitSettings;
import org.opensearch.rest.RestRequest;

import java.io.IOException;
Expand Down Expand Up @@ -98,4 +99,12 @@ protected Set<String> responseParams() {
return RESPONSE_PARAMS;
}

/**
* Method to check if limits defined in {@link RequestLimitSettings} are applicable to an action.
*
* @return True / False status
*/
protected boolean isRequestLimitCheckSupported() {
return false;
}
}
Loading

0 comments on commit 973f3be

Please sign in to comment.