From bb47419eeaa3932f6b7f7c6d2b485e896ff420cc Mon Sep 17 00:00:00 2001 From: Bharathwaj G <58062316+bharath-techie@users.noreply.github.com> Date: Sun, 25 Sep 2022 18:23:58 +0530 Subject: [PATCH 1/2] Added rest layer changes for List all PITs and PIT segments (#4388) * Changes for list all and pit segments Signed-off-by: Bharathwaj G --- CHANGELOG.md | 1 + .../opensearch/client/RequestConverters.java | 4 + .../client/RestHighLevelClient.java | 35 ++++ .../java/org/opensearch/client/PitIT.java | 47 ++++- .../client/RestHighLevelClientTests.java | 1 + .../rest-api-spec/api/get_all_pits.json | 19 ++ .../rest-api-spec/test/pit/10_basic.yml | 12 ++ .../org/opensearch/action/ActionModule.java | 11 +- .../indices/segments/PitSegmentsRequest.java | 34 ++++ .../action/search/CreatePitController.java | 31 +++- .../action/search/GetAllPitNodesRequest.java | 11 -- .../action/search/GetAllPitNodesResponse.java | 53 +++++- .../action/search/GetAllPitsAction.java | 2 +- .../opensearch/action/search/ListPitInfo.java | 40 +++- .../action/search/NodesGetAllPitsAction.java | 23 --- .../opensearch/action/search/PitService.java | 13 +- .../search/TransportDeletePitAction.java | 16 +- .../search/TransportGetAllPitsAction.java | 80 ++++++-- .../TransportNodesGetAllPitsAction.java | 86 --------- .../java/org/opensearch/client/Client.java | 7 + .../client/support/AbstractClient.java | 8 + .../action/cat/RestPitSegmentsAction.java | 171 ++++++++++++++++++ .../action/search/RestDeletePitAction.java | 1 - .../action/search/RestGetAllPitsAction.java | 90 +++++++++ .../action/search/PitTestsUtil.java | 21 +++ .../search/TransportDeletePitActionTests.java | 24 --- .../search/CreatePitSingleNodeTests.java | 21 ++- .../opensearch/search/PitMultiNodeTests.java | 4 +- 28 files changed, 645 insertions(+), 221 deletions(-) create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/api/get_all_pits.json delete mode 100644 server/src/main/java/org/opensearch/action/search/NodesGetAllPitsAction.java delete mode 100644 server/src/main/java/org/opensearch/action/search/TransportNodesGetAllPitsAction.java create mode 100644 server/src/main/java/org/opensearch/rest/action/cat/RestPitSegmentsAction.java create mode 100644 server/src/main/java/org/opensearch/rest/action/search/RestGetAllPitsAction.java diff --git a/CHANGELOG.md b/CHANGELOG.md index acb8ded704a7b..b6af9b7041db3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Add support for s390x architecture ([#4001](https://github.com/opensearch-project/OpenSearch/pull/4001)) - Github workflow for changelog verification ([#4085](https://github.com/opensearch-project/OpenSearch/pull/4085)) - Point in time rest layer changes for create and delete PIT API ([#4064](https://github.com/opensearch-project/OpenSearch/pull/4064)) +- Point in time rest layer changes for list PIT and PIT segments API ([#4388](https://github.com/opensearch-project/OpenSearch/pull/4388)) - Added @dreamer-89 as an Opensearch maintainer ([#4342](https://github.com/opensearch-project/OpenSearch/pull/4342)) - Added release notes for 1.3.5 ([#4343](https://github.com/opensearch-project/OpenSearch/pull/4343)) - Added release notes for 2.2.1 ([#4344](https://github.com/opensearch-project/OpenSearch/pull/4344)) diff --git a/client/rest-high-level/src/main/java/org/opensearch/client/RequestConverters.java b/client/rest-high-level/src/main/java/org/opensearch/client/RequestConverters.java index eedc27d1d2ea7..91c339cc92c1b 100644 --- a/client/rest-high-level/src/main/java/org/opensearch/client/RequestConverters.java +++ b/client/rest-high-level/src/main/java/org/opensearch/client/RequestConverters.java @@ -498,6 +498,10 @@ static Request deleteAllPits() { return new Request(HttpDelete.METHOD_NAME, "/_search/point_in_time/_all"); } + static Request getAllPits() { + return new Request(HttpGet.METHOD_NAME, "/_search/point_in_time/_all"); + } + static Request multiSearch(MultiSearchRequest multiSearchRequest) throws IOException { Request request = new Request(HttpPost.METHOD_NAME, "/_msearch"); diff --git a/client/rest-high-level/src/main/java/org/opensearch/client/RestHighLevelClient.java b/client/rest-high-level/src/main/java/org/opensearch/client/RestHighLevelClient.java index 0c73c65f6175f..0a5880b778942 100644 --- a/client/rest-high-level/src/main/java/org/opensearch/client/RestHighLevelClient.java +++ b/client/rest-high-level/src/main/java/org/opensearch/client/RestHighLevelClient.java @@ -63,6 +63,7 @@ import org.opensearch.action.search.CreatePitResponse; import org.opensearch.action.search.DeletePitRequest; import org.opensearch.action.search.DeletePitResponse; +import org.opensearch.action.search.GetAllPitNodesResponse; import org.opensearch.action.search.MultiSearchRequest; import org.opensearch.action.search.MultiSearchResponse; import org.opensearch.action.search.SearchRequest; @@ -1368,6 +1369,40 @@ public final Cancellable deleteAllPitsAsync(RequestOptions options, ActionListen ); } + /** + * Get all point in time searches using list all PITs API + * + * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @return the response + */ + public final GetAllPitNodesResponse getAllPits(RequestOptions options) throws IOException { + return performRequestAndParseEntity( + new MainRequest(), + (request) -> RequestConverters.getAllPits(), + options, + GetAllPitNodesResponse::fromXContent, + emptySet() + ); + } + + /** + * Asynchronously get all point in time searches using list all PITs API + * + * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @param listener the listener to be notified upon request completion + * @return the response + */ + public final Cancellable getAllPitsAsync(RequestOptions options, ActionListener listener) { + return performRequestAsyncAndParseEntity( + new MainRequest(), + (request) -> RequestConverters.getAllPits(), + options, + GetAllPitNodesResponse::fromXContent, + listener, + emptySet() + ); + } + /** * Clears one or more scroll ids using the Clear Scroll API. * diff --git a/client/rest-high-level/src/test/java/org/opensearch/client/PitIT.java b/client/rest-high-level/src/test/java/org/opensearch/client/PitIT.java index 395ec6e46a7b3..cbb4db10cd519 100644 --- a/client/rest-high-level/src/test/java/org/opensearch/client/PitIT.java +++ b/client/rest-high-level/src/test/java/org/opensearch/client/PitIT.java @@ -18,12 +18,14 @@ import org.opensearch.action.search.DeletePitInfo; import org.opensearch.action.search.DeletePitRequest; import org.opensearch.action.search.DeletePitResponse; +import org.opensearch.action.search.GetAllPitNodesResponse; import org.opensearch.common.unit.TimeValue; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; /** * Tests point in time API with rest high level client @@ -52,21 +54,24 @@ public void indexDocuments() throws IOException { public void testCreateAndDeletePit() throws IOException { CreatePitRequest pitRequest = new CreatePitRequest(new TimeValue(1, TimeUnit.DAYS), true, "index"); - CreatePitResponse pitResponse = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync); - assertTrue(pitResponse.getId() != null); - assertEquals(1, pitResponse.getTotalShards()); - assertEquals(1, pitResponse.getSuccessfulShards()); - assertEquals(0, pitResponse.getFailedShards()); - assertEquals(0, pitResponse.getSkippedShards()); + CreatePitResponse createPitResponse = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync); + assertTrue(createPitResponse.getId() != null); + assertEquals(1, createPitResponse.getTotalShards()); + assertEquals(1, createPitResponse.getSuccessfulShards()); + assertEquals(0, createPitResponse.getFailedShards()); + assertEquals(0, createPitResponse.getSkippedShards()); + GetAllPitNodesResponse getAllPitResponse = highLevelClient().getAllPits(RequestOptions.DEFAULT); + List pits = getAllPitResponse.getPitInfos().stream().map(r -> r.getPitId()).collect(Collectors.toList()); + assertTrue(pits.contains(createPitResponse.getId())); List pitIds = new ArrayList<>(); - pitIds.add(pitResponse.getId()); + pitIds.add(createPitResponse.getId()); DeletePitRequest deletePitRequest = new DeletePitRequest(pitIds); DeletePitResponse deletePitResponse = execute(deletePitRequest, highLevelClient()::deletePit, highLevelClient()::deletePitAsync); assertTrue(deletePitResponse.getDeletePitResults().get(0).isSuccessful()); - assertTrue(deletePitResponse.getDeletePitResults().get(0).getPitId().equals(pitResponse.getId())); + assertTrue(deletePitResponse.getDeletePitResults().get(0).getPitId().equals(createPitResponse.getId())); } - public void testDeleteAllPits() throws IOException { + public void testDeleteAllAndListAllPits() throws IOException { CreatePitRequest pitRequest = new CreatePitRequest(new TimeValue(1, TimeUnit.DAYS), true, "index"); CreatePitResponse pitResponse = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync); CreatePitResponse pitResponse1 = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync); @@ -80,6 +85,11 @@ public void testDeleteAllPits() throws IOException { pitResponse1 = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync); assertTrue(pitResponse.getId() != null); assertTrue(pitResponse1.getId() != null); + GetAllPitNodesResponse getAllPitResponse = highLevelClient().getAllPits(RequestOptions.DEFAULT); + + List pits = getAllPitResponse.getPitInfos().stream().map(r -> r.getPitId()).collect(Collectors.toList()); + assertTrue(pits.contains(pitResponse.getId())); + assertTrue(pits.contains(pitResponse1.getId())); ActionListener deletePitListener = new ActionListener<>() { @Override public void onResponse(DeletePitResponse response) { @@ -95,8 +105,27 @@ public void onFailure(Exception e) { } } }; + final CreatePitResponse pitResponse3 = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync); + + ActionListener getPitsListener = new ActionListener() { + @Override + public void onResponse(GetAllPitNodesResponse response) { + List pits = response.getPitInfos().stream().map(r -> r.getPitId()).collect(Collectors.toList()); + assertTrue(pits.contains(pitResponse3.getId())); + } + + @Override + public void onFailure(Exception e) { + if (!(e instanceof OpenSearchStatusException)) { + throw new AssertionError("List all PITs failed", e); + } + } + }; + highLevelClient().getAllPitsAsync(RequestOptions.DEFAULT, getPitsListener); highLevelClient().deleteAllPitsAsync(RequestOptions.DEFAULT, deletePitListener); // validate no pits case + getAllPitResponse = highLevelClient().getAllPits(RequestOptions.DEFAULT); + assertTrue(getAllPitResponse.getPitInfos().size() == 0); highLevelClient().deleteAllPitsAsync(RequestOptions.DEFAULT, deletePitListener); } } diff --git a/client/rest-high-level/src/test/java/org/opensearch/client/RestHighLevelClientTests.java b/client/rest-high-level/src/test/java/org/opensearch/client/RestHighLevelClientTests.java index cdd63743f2644..ad8da7244eae0 100644 --- a/client/rest-high-level/src/test/java/org/opensearch/client/RestHighLevelClientTests.java +++ b/client/rest-high-level/src/test/java/org/opensearch/client/RestHighLevelClientTests.java @@ -135,6 +135,7 @@ public class RestHighLevelClientTests extends OpenSearchTestCase { "ping", "info", "delete_all_pits", + "get_all_pits", // security "security.get_ssl_certificates", "security.authenticate", diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/get_all_pits.json b/rest-api-spec/src/main/resources/rest-api-spec/api/get_all_pits.json new file mode 100644 index 0000000000000..544a8cb11b002 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/get_all_pits.json @@ -0,0 +1,19 @@ +{ + "get_all_pits":{ + "documentation":{ + "url":"https://opensearch.org/docs/latest/opensearch/rest-api/point_in_time/", + "description":"Lists all active point in time searches." + }, + "stability":"stable", + "url":{ + "paths":[ + { + "path":"/_search/point_in_time/_all", + "methods":[ + "GET" + ] + } + ] + } + } +} diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/pit/10_basic.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/pit/10_basic.yml index 2023bcc8f5c87..cd0c5b9126a9d 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/pit/10_basic.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/pit/10_basic.yml @@ -79,6 +79,12 @@ - match: {hits.total: 3 } - length: {hits.hits: 1 } + - do: + get_all_pits: {} + + - match: {pits.0.pit_id: $pit_id} + - match: {pits.0.keep_alive: 82800000 } + - do: delete_pit: body: @@ -119,6 +125,12 @@ - set: {pit_id: pit_id} - match: { _shards.failed: 0} + - do: + get_all_pits: {} + + - match: {pits.0.pit_id: $pit_id} + - match: {pits.0.keep_alive: 82800000 } + - do: delete_all_pits: {} diff --git a/server/src/main/java/org/opensearch/action/ActionModule.java b/server/src/main/java/org/opensearch/action/ActionModule.java index 74be544123d9f..c5fcfdd047a09 100644 --- a/server/src/main/java/org/opensearch/action/ActionModule.java +++ b/server/src/main/java/org/opensearch/action/ActionModule.java @@ -238,16 +238,14 @@ import org.opensearch.action.search.ClearScrollAction; import org.opensearch.action.search.CreatePitAction; import org.opensearch.action.search.DeletePitAction; -import org.opensearch.action.search.GetAllPitsAction; import org.opensearch.action.search.MultiSearchAction; -import org.opensearch.action.search.NodesGetAllPitsAction; +import org.opensearch.action.search.GetAllPitsAction; import org.opensearch.action.search.SearchAction; import org.opensearch.action.search.SearchScrollAction; import org.opensearch.action.search.TransportClearScrollAction; import org.opensearch.action.search.TransportCreatePitAction; import org.opensearch.action.search.TransportDeletePitAction; import org.opensearch.action.search.TransportGetAllPitsAction; -import org.opensearch.action.search.TransportNodesGetAllPitsAction; import org.opensearch.action.search.TransportMultiSearchAction; import org.opensearch.action.search.TransportSearchAction; import org.opensearch.action.search.TransportSearchScrollAction; @@ -385,6 +383,7 @@ import org.opensearch.rest.action.cat.RestClusterManagerAction; import org.opensearch.rest.action.cat.RestNodeAttrsAction; import org.opensearch.rest.action.cat.RestNodesAction; +import org.opensearch.rest.action.cat.RestPitSegmentsAction; import org.opensearch.rest.action.cat.RestPluginsAction; import org.opensearch.rest.action.cat.RestRepositoriesAction; import org.opensearch.rest.action.cat.RestSegmentsAction; @@ -413,6 +412,7 @@ import org.opensearch.rest.action.search.RestCreatePitAction; import org.opensearch.rest.action.search.RestDeletePitAction; import org.opensearch.rest.action.search.RestExplainAction; +import org.opensearch.rest.action.search.RestGetAllPitsAction; import org.opensearch.rest.action.search.RestMultiSearchAction; import org.opensearch.rest.action.search.RestSearchAction; import org.opensearch.rest.action.search.RestSearchScrollAction; @@ -675,10 +675,9 @@ public void reg // point in time actions actions.register(CreatePitAction.INSTANCE, TransportCreatePitAction.class); - actions.register(GetAllPitsAction.INSTANCE, TransportGetAllPitsAction.class); actions.register(DeletePitAction.INSTANCE, TransportDeletePitAction.class); actions.register(PitSegmentsAction.INSTANCE, TransportPitSegmentsAction.class); - actions.register(NodesGetAllPitsAction.INSTANCE, TransportNodesGetAllPitsAction.class); + actions.register(GetAllPitsAction.INSTANCE, TransportGetAllPitsAction.class); // Remote Store actions.register(RestoreRemoteStoreAction.INSTANCE, TransportRestoreRemoteStoreAction.class); @@ -858,6 +857,8 @@ public void initRestHandlers(Supplier nodesInCluster) { // Point in time API registerHandler.accept(new RestCreatePitAction()); registerHandler.accept(new RestDeletePitAction()); + registerHandler.accept(new RestGetAllPitsAction(nodesInCluster)); + registerHandler.accept(new RestPitSegmentsAction(nodesInCluster)); for (ActionPlugin plugin : actionPlugins) { for (RestHandler handler : plugin.getRestHandlers( diff --git a/server/src/main/java/org/opensearch/action/admin/indices/segments/PitSegmentsRequest.java b/server/src/main/java/org/opensearch/action/admin/indices/segments/PitSegmentsRequest.java index 84f5e5ad6a1e8..de0d390cddc4a 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/segments/PitSegmentsRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/segments/PitSegmentsRequest.java @@ -13,6 +13,7 @@ import org.opensearch.common.Strings; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.xcontent.XContentParser; import java.io.IOException; import java.util.ArrayList; @@ -84,4 +85,37 @@ public ActionRequestValidationException validate() { } return validationException; } + + public void fromXContent(XContentParser parser) throws IOException { + pitIds.clear(); + if (parser.nextToken() != XContentParser.Token.START_OBJECT) { + throw new IllegalArgumentException("Malformed content, must start with an object"); + } else { + XContentParser.Token token; + String currentFieldName = null; + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if ("pit_id".equals(currentFieldName)) { + if (token == XContentParser.Token.START_ARRAY) { + while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) { + if (token.isValue() == false) { + throw new IllegalArgumentException("pit_id array element should only contain PIT identifier"); + } + pitIds.add(parser.text()); + } + } else { + if (token.isValue() == false) { + throw new IllegalArgumentException("pit_id element should only contain PIT identifier"); + } + pitIds.add(parser.text()); + } + } else { + throw new IllegalArgumentException( + "Unknown parameter [" + currentFieldName + "] in request body or parameter is of the wrong type[" + token + "] " + ); + } + } + } + } } diff --git a/server/src/main/java/org/opensearch/action/search/CreatePitController.java b/server/src/main/java/org/opensearch/action/search/CreatePitController.java index f64dd3d7efae6..745139fd1f1e8 100644 --- a/server/src/main/java/org/opensearch/action/search/CreatePitController.java +++ b/server/src/main/java/org/opensearch/action/search/CreatePitController.java @@ -98,6 +98,11 @@ public void executeCreatePit( task.getParentTaskId(), Collections.emptyMap() ); + /** + * This is needed for cross cluster functionality to work with PITs and current ccsMinimizeRoundTrips is + * not supported for point in time + */ + searchRequest.setCcsMinimizeRoundtrips(false); /** * Phase 1 of create PIT */ @@ -193,6 +198,29 @@ void executeUpdatePitId( ); for (Map.Entry entry : contextId.shards().entrySet()) { DiscoveryNode node = nodelookup.apply(entry.getValue().getClusterAlias(), entry.getValue().getNode()); + if (node == null) { + node = this.clusterService.state().getNodes().get(entry.getValue().getNode()); + } + if (node == null) { + logger.error( + () -> new ParameterizedMessage( + "Create pit update phase for PIT ID [{}] failed " + "because node [{}] not found", + searchResponse.pointInTimeId(), + entry.getValue().getNode() + ) + ); + groupedActionListener.onFailure( + new OpenSearchException( + "Create pit update phase for PIT ID [" + + searchResponse.pointInTimeId() + + "] failed because node[" + + entry.getValue().getNode() + + "] " + + "not found" + ) + ); + return; + } try { final Transport.Connection connection = searchTransportService.getConnection(entry.getValue().getClusterAlias(), node); searchTransportService.updatePitContext( @@ -206,11 +234,12 @@ void executeUpdatePitId( groupedActionListener ); } catch (Exception e) { + String nodeName = node.getName(); logger.error( () -> new ParameterizedMessage( "Create pit update phase failed for PIT ID [{}] on node [{}]", searchResponse.pointInTimeId(), - node + nodeName ), e ); diff --git a/server/src/main/java/org/opensearch/action/search/GetAllPitNodesRequest.java b/server/src/main/java/org/opensearch/action/search/GetAllPitNodesRequest.java index 340f9b842adbf..b4ad2f6641087 100644 --- a/server/src/main/java/org/opensearch/action/search/GetAllPitNodesRequest.java +++ b/server/src/main/java/org/opensearch/action/search/GetAllPitNodesRequest.java @@ -21,22 +21,11 @@ */ public class GetAllPitNodesRequest extends BaseNodesRequest { - // Security plugin intercepts and sets the response with permitted PIT contexts - private GetAllPitNodesResponse getAllPitNodesResponse; - @Inject public GetAllPitNodesRequest(DiscoveryNode... concreteNodes) { super(concreteNodes); } - public void setGetAllPitNodesResponse(GetAllPitNodesResponse getAllPitNodesResponse) { - this.getAllPitNodesResponse = getAllPitNodesResponse; - } - - public GetAllPitNodesResponse getGetAllPitNodesResponse() { - return getAllPitNodesResponse; - } - public GetAllPitNodesRequest(StreamInput in) throws IOException { super(in); } diff --git a/server/src/main/java/org/opensearch/action/search/GetAllPitNodesResponse.java b/server/src/main/java/org/opensearch/action/search/GetAllPitNodesResponse.java index 091447798cf5f..610520a4c1f9d 100644 --- a/server/src/main/java/org/opensearch/action/search/GetAllPitNodesResponse.java +++ b/server/src/main/java/org/opensearch/action/search/GetAllPitNodesResponse.java @@ -11,10 +11,13 @@ import org.opensearch.action.FailedNodeException; import org.opensearch.action.support.nodes.BaseNodesResponse; import org.opensearch.cluster.ClusterName; +import org.opensearch.common.ParseField; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.xcontent.ConstructingObjectParser; import org.opensearch.common.xcontent.ToXContentObject; import org.opensearch.common.xcontent.XContentBuilder; +import org.opensearch.common.xcontent.XContentParser; import java.io.IOException; import java.util.ArrayList; @@ -24,6 +27,8 @@ import java.util.Set; import java.util.stream.Collectors; +import static org.opensearch.common.xcontent.ConstructingObjectParser.constructorArg; + /** * This class transforms active PIT objects from all nodes to unique PIT objects */ @@ -40,13 +45,13 @@ public GetAllPitNodesResponse(StreamInput in) throws IOException { public GetAllPitNodesResponse( ClusterName clusterName, - List getAllPitNodeResponse, + List getAllPitNodeResponseList, List failures ) { - super(clusterName, getAllPitNodeResponse, failures); + super(clusterName, getAllPitNodeResponseList, failures); Set uniquePitIds = new HashSet<>(); pitInfos.addAll( - getAllPitNodeResponse.stream() + getAllPitNodeResponseList.stream() .flatMap(p -> p.getPitInfos().stream().filter(t -> uniquePitIds.add(t.getPitId()))) .collect(Collectors.toList()) ); @@ -60,14 +65,30 @@ public GetAllPitNodesResponse(List listPitInfos, GetAllPitNodesResp pitInfos.addAll(listPitInfos); } + public GetAllPitNodesResponse( + List listPitInfos, + ClusterName clusterName, + List getAllPitNodeResponseList, + List failures + ) { + super(clusterName, getAllPitNodeResponseList, failures); + pitInfos.addAll(listPitInfos); + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); - builder.startArray("pitInfos"); + builder.startArray("pits"); for (ListPitInfo pit : pitInfos) { pit.toXContent(builder, params); } builder.endArray(); + if (!failures().isEmpty()) { + builder.startArray("failures"); + for (FailedNodeException e : failures()) { + e.toXContent(builder, params); + } + } builder.endObject(); return builder; } @@ -85,4 +106,28 @@ public void writeNodesTo(StreamOutput out, List nodes) th public List getPitInfos() { return Collections.unmodifiableList(new ArrayList<>(pitInfos)); } + + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "get_all_pits_response", + true, + (Object[] parsedObjects) -> { + @SuppressWarnings("unchecked") + List listPitInfos = (List) parsedObjects[0]; + List failures = null; + if (parsedObjects.length > 1) { + failures = (List) parsedObjects[1]; + } + if (failures == null) { + failures = new ArrayList<>(); + } + return new GetAllPitNodesResponse(listPitInfos, new ClusterName(""), new ArrayList<>(), failures); + } + ); + static { + PARSER.declareObjectArray(constructorArg(), ListPitInfo.PARSER, new ParseField("pits")); + } + + public static GetAllPitNodesResponse fromXContent(XContentParser parser) throws IOException { + return PARSER.parse(parser, null); + } } diff --git a/server/src/main/java/org/opensearch/action/search/GetAllPitsAction.java b/server/src/main/java/org/opensearch/action/search/GetAllPitsAction.java index 16e65cb785a7d..8fe901add5e3a 100644 --- a/server/src/main/java/org/opensearch/action/search/GetAllPitsAction.java +++ b/server/src/main/java/org/opensearch/action/search/GetAllPitsAction.java @@ -11,7 +11,7 @@ import org.opensearch.action.ActionType; /** - * Action type for listing all PIT reader contexts + * Action type for retrieving all PIT reader contexts from nodes */ public class GetAllPitsAction extends ActionType { public static final GetAllPitsAction INSTANCE = new GetAllPitsAction(); diff --git a/server/src/main/java/org/opensearch/action/search/ListPitInfo.java b/server/src/main/java/org/opensearch/action/search/ListPitInfo.java index 4499e7d6e8ef5..249b0a9ab3baa 100644 --- a/server/src/main/java/org/opensearch/action/search/ListPitInfo.java +++ b/server/src/main/java/org/opensearch/action/search/ListPitInfo.java @@ -8,14 +8,18 @@ package org.opensearch.action.search; +import org.opensearch.common.ParseField; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.common.io.stream.Writeable; +import org.opensearch.common.xcontent.ConstructingObjectParser; import org.opensearch.common.xcontent.ToXContentFragment; import org.opensearch.common.xcontent.XContentBuilder; import java.io.IOException; +import static org.opensearch.common.xcontent.ConstructingObjectParser.constructorArg; + /** * This holds information about pit reader context such as pit id and creation time */ @@ -36,16 +40,6 @@ public ListPitInfo(StreamInput in) throws IOException { this.keepAlive = in.readLong(); } - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(); - builder.field("pitId", pitId); - builder.field("creationTime", creationTime); - builder.field("keepAlive", keepAlive); - builder.endObject(); - return builder; - } - public String getPitId() { return pitId; } @@ -60,4 +54,30 @@ public void writeTo(StreamOutput out) throws IOException { out.writeLong(creationTime); out.writeLong(keepAlive); } + + static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "list_pit_info", + true, + args -> new ListPitInfo((String) args[0], (long) args[1], (long) args[2]) + ); + + private static final ParseField CREATION_TIME = new ParseField("creation_time"); + private static final ParseField PIT_ID = new ParseField("pit_id"); + private static final ParseField KEEP_ALIVE = new ParseField("keep_alive"); + static { + PARSER.declareString(constructorArg(), PIT_ID); + PARSER.declareLong(constructorArg(), CREATION_TIME); + PARSER.declareLong(constructorArg(), KEEP_ALIVE); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(PIT_ID.getPreferredName(), pitId); + builder.field(CREATION_TIME.getPreferredName(), creationTime); + builder.field(KEEP_ALIVE.getPreferredName(), keepAlive); + builder.endObject(); + return builder; + } + } diff --git a/server/src/main/java/org/opensearch/action/search/NodesGetAllPitsAction.java b/server/src/main/java/org/opensearch/action/search/NodesGetAllPitsAction.java deleted file mode 100644 index af41f7d49551c..0000000000000 --- a/server/src/main/java/org/opensearch/action/search/NodesGetAllPitsAction.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.action.search; - -import org.opensearch.action.ActionType; - -/** - * Action type for retrieving all PIT reader contexts from nodes - */ -public class NodesGetAllPitsAction extends ActionType { - public static final NodesGetAllPitsAction INSTANCE = new NodesGetAllPitsAction(); - public static final String NAME = "cluster:admin/point_in_time/read_from_nodes"; - - private NodesGetAllPitsAction() { - super(NAME, GetAllPitNodesResponse::new); - } -} diff --git a/server/src/main/java/org/opensearch/action/search/PitService.java b/server/src/main/java/org/opensearch/action/search/PitService.java index ff068397ad94e..f42d84477f9a3 100644 --- a/server/src/main/java/org/opensearch/action/search/PitService.java +++ b/server/src/main/java/org/opensearch/action/search/PitService.java @@ -93,7 +93,10 @@ public void deletePitContexts( for (Map.Entry> entry : nodeToContextsMap.entrySet()) { String clusterAlias = entry.getValue().get(0).getSearchContextIdForNode().getClusterAlias(); - final DiscoveryNode node = nodeLookup.apply(clusterAlias, entry.getValue().get(0).getSearchContextIdForNode().getNode()); + DiscoveryNode node = nodeLookup.apply(clusterAlias, entry.getValue().get(0).getSearchContextIdForNode().getNode()); + if (node == null) { + node = this.clusterService.state().getNodes().get(entry.getValue().get(0).getSearchContextIdForNode().getNode()); + } if (node == null) { logger.error( () -> new ParameterizedMessage("node [{}] not found", entry.getValue().get(0).getSearchContextIdForNode().getNode()) @@ -108,7 +111,8 @@ public void deletePitContexts( final Transport.Connection connection = searchTransportService.getConnection(clusterAlias, node); searchTransportService.sendFreePITContexts(connection, entry.getValue(), groupedListener); } catch (Exception e) { - logger.error(() -> new ParameterizedMessage("Delete PITs failed on node [{}]", node.getName()), e); + String nodeName = node.getName(); + logger.error(() -> new ParameterizedMessage("Delete PITs failed on node [{}]", nodeName), e); List deletePitInfos = new ArrayList<>(); for (PitSearchContextIdForNode pitSearchContextIdForNode : entry.getValue()) { deletePitInfos.add(new DeletePitInfo(false, pitSearchContextIdForNode.getPitId())); @@ -173,10 +177,11 @@ public void getAllPits(ActionListener getAllPitsListener nodes.add(node); } DiscoveryNode[] disNodesArr = nodes.toArray(new DiscoveryNode[nodes.size()]); + GetAllPitNodesRequest getAllPitNodesRequest = new GetAllPitNodesRequest(disNodesArr); transportService.sendRequest( transportService.getLocalNode(), - NodesGetAllPitsAction.NAME, - new GetAllPitNodesRequest(disNodesArr), + GetAllPitsAction.NAME, + getAllPitNodesRequest, new TransportResponseHandler() { @Override diff --git a/server/src/main/java/org/opensearch/action/search/TransportDeletePitAction.java b/server/src/main/java/org/opensearch/action/search/TransportDeletePitAction.java index 19abe2361290d..b85fe302a748f 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportDeletePitAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportDeletePitAction.java @@ -11,7 +11,6 @@ import org.opensearch.action.ActionListener; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.HandledTransportAction; -import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; import org.opensearch.common.io.stream.NamedWriteableRegistry; import org.opensearch.tasks.Task; @@ -28,9 +27,6 @@ */ public class TransportDeletePitAction extends HandledTransportAction { private final NamedWriteableRegistry namedWriteableRegistry; - private TransportSearchAction transportSearchAction; - private final ClusterService clusterService; - private final SearchTransportService searchTransportService; private final PitService pitService; @Inject @@ -38,16 +34,10 @@ public TransportDeletePitAction( TransportService transportService, ActionFilters actionFilters, NamedWriteableRegistry namedWriteableRegistry, - TransportSearchAction transportSearchAction, - ClusterService clusterService, - SearchTransportService searchTransportService, PitService pitService ) { super(DeletePitAction.NAME, transportService, actionFilters, DeletePitRequest::new); this.namedWriteableRegistry = namedWriteableRegistry; - this.transportSearchAction = transportSearchAction; - this.clusterService = clusterService; - this.searchTransportService = searchTransportService; this.pitService = pitService; } @@ -57,11 +47,7 @@ public TransportDeletePitAction( @Override protected void doExecute(Task task, DeletePitRequest request, ActionListener listener) { List pitIds = request.getPitIds(); - // when security plugin intercepts the request, if PITs are not present in the cluster the PIT IDs in request will be empty - // and in this case return empty response - if (pitIds.isEmpty()) { - listener.onResponse(new DeletePitResponse(new ArrayList<>())); - } else if (pitIds.size() == 1 && "_all".equals(pitIds.get(0))) { + if (pitIds.size() == 1 && "_all".equals(pitIds.get(0))) { deleteAllPits(listener); } else { deletePits(listener, request); diff --git a/server/src/main/java/org/opensearch/action/search/TransportGetAllPitsAction.java b/server/src/main/java/org/opensearch/action/search/TransportGetAllPitsAction.java index c8529c5b02bd4..39299f9a33b18 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportGetAllPitsAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportGetAllPitsAction.java @@ -8,31 +8,79 @@ package org.opensearch.action.search; -import org.opensearch.action.ActionListener; +import org.opensearch.action.FailedNodeException; import org.opensearch.action.support.ActionFilters; -import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.action.support.nodes.TransportNodesAction; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; -import org.opensearch.tasks.Task; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.search.SearchService; +import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; +import java.io.IOException; +import java.util.List; + /** - * Transport action to get all active PIT contexts across the cluster + * Transport action to get all active PIT contexts across all nodes */ -public class TransportGetAllPitsAction extends HandledTransportAction { - private final PitService pitService; +public class TransportGetAllPitsAction extends TransportNodesAction< + GetAllPitNodesRequest, + GetAllPitNodesResponse, + GetAllPitNodeRequest, + GetAllPitNodeResponse> { + private final SearchService searchService; @Inject - public TransportGetAllPitsAction(ActionFilters actionFilters, TransportService transportService, PitService pitService) { - super(GetAllPitsAction.NAME, transportService, actionFilters, in -> new GetAllPitNodesRequest(in)); - this.pitService = pitService; + public TransportGetAllPitsAction( + ThreadPool threadPool, + ClusterService clusterService, + TransportService transportService, + ActionFilters actionFilters, + SearchService searchService + ) { + super( + GetAllPitsAction.NAME, + threadPool, + clusterService, + transportService, + actionFilters, + GetAllPitNodesRequest::new, + GetAllPitNodeRequest::new, + ThreadPool.Names.SAME, + GetAllPitNodeResponse.class + ); + this.searchService = searchService; + } + + @Override + protected GetAllPitNodesResponse newResponse( + GetAllPitNodesRequest request, + List getAllPitNodeResponses, + List failures + ) { + return new GetAllPitNodesResponse(clusterService.getClusterName(), getAllPitNodeResponses, failures); + } + + @Override + protected GetAllPitNodeRequest newNodeRequest(GetAllPitNodesRequest request) { + return new GetAllPitNodeRequest(); + } + + @Override + protected GetAllPitNodeResponse newNodeResponse(StreamInput in) throws IOException { + return new GetAllPitNodeResponse(in); } - protected void doExecute(Task task, GetAllPitNodesRequest request, ActionListener listener) { - // If security plugin intercepts the request, it'll replace all PIT IDs with permitted PIT IDs - if (request.getGetAllPitNodesResponse() != null) { - listener.onResponse(request.getGetAllPitNodesResponse()); - } else { - pitService.getAllPits(listener); - } + /** + * This retrieves all active PITs in the node + */ + @Override + protected GetAllPitNodeResponse nodeOperation(GetAllPitNodeRequest request) { + GetAllPitNodeResponse nodeResponse = new GetAllPitNodeResponse( + transportService.getLocalNode(), + searchService.getAllPITReaderContexts() + ); + return nodeResponse; } } diff --git a/server/src/main/java/org/opensearch/action/search/TransportNodesGetAllPitsAction.java b/server/src/main/java/org/opensearch/action/search/TransportNodesGetAllPitsAction.java deleted file mode 100644 index 520830cd293f0..0000000000000 --- a/server/src/main/java/org/opensearch/action/search/TransportNodesGetAllPitsAction.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.action.search; - -import org.opensearch.action.FailedNodeException; -import org.opensearch.action.support.ActionFilters; -import org.opensearch.action.support.nodes.TransportNodesAction; -import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.inject.Inject; -import org.opensearch.common.io.stream.StreamInput; -import org.opensearch.search.SearchService; -import org.opensearch.threadpool.ThreadPool; -import org.opensearch.transport.TransportService; - -import java.io.IOException; -import java.util.List; - -/** - * Transport action to get all active PIT contexts across all nodes - */ -public class TransportNodesGetAllPitsAction extends TransportNodesAction< - GetAllPitNodesRequest, - GetAllPitNodesResponse, - GetAllPitNodeRequest, - GetAllPitNodeResponse> { - private final SearchService searchService; - - @Inject - public TransportNodesGetAllPitsAction( - ThreadPool threadPool, - ClusterService clusterService, - TransportService transportService, - ActionFilters actionFilters, - SearchService searchService - ) { - super( - NodesGetAllPitsAction.NAME, - threadPool, - clusterService, - transportService, - actionFilters, - GetAllPitNodesRequest::new, - GetAllPitNodeRequest::new, - ThreadPool.Names.SAME, - GetAllPitNodeResponse.class - ); - this.searchService = searchService; - } - - @Override - protected GetAllPitNodesResponse newResponse( - GetAllPitNodesRequest request, - List getAllPitNodeRespons, - List failures - ) { - return new GetAllPitNodesResponse(clusterService.getClusterName(), getAllPitNodeRespons, failures); - } - - @Override - protected GetAllPitNodeRequest newNodeRequest(GetAllPitNodesRequest request) { - return new GetAllPitNodeRequest(); - } - - @Override - protected GetAllPitNodeResponse newNodeResponse(StreamInput in) throws IOException { - return new GetAllPitNodeResponse(in); - } - - /** - * This retrieves all active PITs in the node - */ - @Override - protected GetAllPitNodeResponse nodeOperation(GetAllPitNodeRequest request) { - GetAllPitNodeResponse nodeResponse = new GetAllPitNodeResponse( - transportService.getLocalNode(), - searchService.getAllPITReaderContexts() - ); - return nodeResponse; - } -} diff --git a/server/src/main/java/org/opensearch/client/Client.java b/server/src/main/java/org/opensearch/client/Client.java index 94043d5c3c89f..f20f0b4246cb6 100644 --- a/server/src/main/java/org/opensearch/client/Client.java +++ b/server/src/main/java/org/opensearch/client/Client.java @@ -64,6 +64,8 @@ import org.opensearch.action.search.CreatePitResponse; import org.opensearch.action.search.DeletePitRequest; import org.opensearch.action.search.DeletePitResponse; +import org.opensearch.action.search.GetAllPitNodesRequest; +import org.opensearch.action.search.GetAllPitNodesResponse; import org.opensearch.action.search.MultiSearchRequest; import org.opensearch.action.search.MultiSearchRequestBuilder; import org.opensearch.action.search.MultiSearchResponse; @@ -341,6 +343,11 @@ public interface Client extends OpenSearchClient, Releasable { */ void deletePits(DeletePitRequest deletePITRequest, ActionListener listener); + /** + * Get all active point in time searches + */ + void getAllPits(GetAllPitNodesRequest getAllPitNodesRequest, ActionListener listener); + /** * Get information of segments of one or more PITs */ diff --git a/server/src/main/java/org/opensearch/client/support/AbstractClient.java b/server/src/main/java/org/opensearch/client/support/AbstractClient.java index bc80a2ba92bf8..21cd01bf65a45 100644 --- a/server/src/main/java/org/opensearch/client/support/AbstractClient.java +++ b/server/src/main/java/org/opensearch/client/support/AbstractClient.java @@ -335,10 +335,13 @@ import org.opensearch.action.search.DeletePitAction; import org.opensearch.action.search.DeletePitRequest; import org.opensearch.action.search.DeletePitResponse; +import org.opensearch.action.search.GetAllPitNodesRequest; +import org.opensearch.action.search.GetAllPitNodesResponse; import org.opensearch.action.search.MultiSearchAction; import org.opensearch.action.search.MultiSearchRequest; import org.opensearch.action.search.MultiSearchRequestBuilder; import org.opensearch.action.search.MultiSearchResponse; +import org.opensearch.action.search.GetAllPitsAction; import org.opensearch.action.search.SearchAction; import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchRequestBuilder; @@ -595,6 +598,11 @@ public void deletePits(final DeletePitRequest deletePITRequest, final ActionList execute(DeletePitAction.INSTANCE, deletePITRequest, listener); } + @Override + public void getAllPits(final GetAllPitNodesRequest getAllPitNodesRequest, final ActionListener listener) { + execute(GetAllPitsAction.INSTANCE, getAllPitNodesRequest, listener); + } + @Override public void pitSegments(final PitSegmentsRequest request, final ActionListener listener) { execute(PitSegmentsAction.INSTANCE, request, listener); diff --git a/server/src/main/java/org/opensearch/rest/action/cat/RestPitSegmentsAction.java b/server/src/main/java/org/opensearch/rest/action/cat/RestPitSegmentsAction.java new file mode 100644 index 0000000000000..ba9606e8eb444 --- /dev/null +++ b/server/src/main/java/org/opensearch/rest/action/cat/RestPitSegmentsAction.java @@ -0,0 +1,171 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.rest.action.cat; + +import org.opensearch.action.admin.indices.segments.IndexSegments; +import org.opensearch.action.admin.indices.segments.IndexShardSegments; +import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse; +import org.opensearch.action.admin.indices.segments.PitSegmentsAction; +import org.opensearch.action.admin.indices.segments.PitSegmentsRequest; +import org.opensearch.action.admin.indices.segments.ShardSegments; +import org.opensearch.client.node.NodeClient; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.common.Table; +import org.opensearch.index.engine.Segment; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.RestHandler; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.RestResponse; +import org.opensearch.rest.action.RestResponseListener; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; + +import static java.util.Arrays.asList; +import static java.util.Collections.unmodifiableList; +import static org.opensearch.rest.RestRequest.Method.GET; + +/** + * Rest action for pit segments + */ +public class RestPitSegmentsAction extends AbstractCatAction { + private final Supplier nodesInCluster; + + public RestPitSegmentsAction(Supplier nodesInCluster) { + super(); + this.nodesInCluster = nodesInCluster; + } + + @Override + public List routes() { + return unmodifiableList(asList(new Route(GET, "/_cat/pit_segments/_all"), new Route(GET, "/_cat/pit_segments"))); + } + + @Override + public String getName() { + return "cat_pit_segments_action"; + } + + @Override + public boolean allowSystemIndexAccessByDefault() { + return true; + } + + @Override + protected BaseRestHandler.RestChannelConsumer doCatRequest(final RestRequest request, final NodeClient client) { + String allPitIdsQualifier = "_all"; + final PitSegmentsRequest pitSegmentsRequest; + if (request.path().contains(allPitIdsQualifier)) { + pitSegmentsRequest = new PitSegmentsRequest(allPitIdsQualifier); + } else { + pitSegmentsRequest = new PitSegmentsRequest(); + try { + request.withContentOrSourceParamParserOrNull((xContentParser -> { + if (xContentParser != null) { + pitSegmentsRequest.fromXContent(xContentParser); + } + })); + } catch (IOException e) { + throw new IllegalArgumentException("Failed to parse request body", e); + } + } + return channel -> client.execute(PitSegmentsAction.INSTANCE, pitSegmentsRequest, new RestResponseListener<>(channel) { + @Override + public RestResponse buildResponse(final IndicesSegmentResponse indicesSegmentResponse) throws Exception { + final Map indicesSegments = indicesSegmentResponse.getIndices(); + Table tab = buildTable(request, indicesSegments); + return RestTable.buildResponse(tab, channel); + } + }); + } + + @Override + protected void documentation(StringBuilder sb) { + sb.append("/_cat/pit_segments\n"); + sb.append("/_cat/pit_segments/{pit_id}\n"); + } + + @Override + protected Table getTableWithHeader(RestRequest request) { + Table table = new Table(); + table.startHeaders(); + table.addCell("index", "default:true;alias:i,idx;desc:index name"); + table.addCell("shard", "default:true;alias:s,sh;desc:shard name"); + table.addCell("prirep", "alias:p,pr,primaryOrReplica;default:true;desc:primary or replica"); + table.addCell("ip", "default:true;desc:ip of node where it lives"); + table.addCell("id", "default:false;desc:unique id of node where it lives"); + table.addCell("segment", "default:true;alias:seg;desc:segment name"); + table.addCell("generation", "default:true;alias:g,gen;text-align:right;desc:segment generation"); + table.addCell("docs.count", "default:true;alias:dc,docsCount;text-align:right;desc:number of docs in segment"); + table.addCell("docs.deleted", "default:true;alias:dd,docsDeleted;text-align:right;desc:number of deleted docs in segment"); + table.addCell("size", "default:true;alias:si;text-align:right;desc:segment size in bytes"); + table.addCell("size.memory", "default:true;alias:sm,sizeMemory;text-align:right;desc:segment memory in bytes"); + table.addCell("committed", "default:true;alias:ic,isCommitted;desc:is segment committed"); + table.addCell("searchable", "default:true;alias:is,isSearchable;desc:is segment searched"); + table.addCell("version", "default:true;alias:v,ver;desc:version"); + table.addCell("compound", "default:true;alias:ico,isCompound;desc:is segment compound"); + table.endHeaders(); + return table; + } + + private Table buildTable(final RestRequest request, Map indicesSegments) { + Table table = getTableWithHeader(request); + + DiscoveryNodes nodes = this.nodesInCluster.get(); + table.startRow(); + table.addCell("index", "default:true;alias:i,idx;desc:index name"); + table.addCell("shard", "default:true;alias:s,sh;desc:shard name"); + table.addCell("prirep", "alias:p,pr,primaryOrReplica;default:true;desc:primary or replica"); + table.addCell("ip", "default:true;desc:ip of node where it lives"); + table.addCell("id", "default:false;desc:unique id of node where it lives"); + table.addCell("segment", "default:true;alias:seg;desc:segment name"); + table.addCell("generation", "default:true;alias:g,gen;text-align:right;desc:segment generation"); + table.addCell("docs.count", "default:true;alias:dc,docsCount;text-align:right;desc:number of docs in segment"); + table.addCell("docs.deleted", "default:true;alias:dd,docsDeleted;text-align:right;desc:number of deleted docs in segment"); + table.addCell("size", "default:true;alias:si;text-align:right;desc:segment size in bytes"); + table.addCell("size.memory", "default:true;alias:sm,sizeMemory;text-align:right;desc:segment memory in bytes"); + table.addCell("committed", "default:true;alias:ic,isCommitted;desc:is segment committed"); + table.addCell("searchable", "default:true;alias:is,isSearchable;desc:is segment searched"); + table.addCell("version", "default:true;alias:v,ver;desc:version"); + table.addCell("compound", "default:true;alias:ico,isCompound;desc:is segment compound"); + table.endRow(); + for (IndexSegments indexSegments : indicesSegments.values()) { + Map shards = indexSegments.getShards(); + for (IndexShardSegments indexShardSegments : shards.values()) { + ShardSegments[] shardSegments = indexShardSegments.getShards(); + for (ShardSegments shardSegment : shardSegments) { + List segments = shardSegment.getSegments(); + for (Segment segment : segments) { + table.startRow(); + table.addCell(shardSegment.getShardRouting().getIndexName()); + table.addCell(shardSegment.getShardRouting().getId()); + table.addCell(shardSegment.getShardRouting().primary() ? "p" : "r"); + table.addCell(nodes.get(shardSegment.getShardRouting().currentNodeId()).getHostAddress()); + table.addCell(shardSegment.getShardRouting().currentNodeId()); + table.addCell(segment.getName()); + table.addCell(segment.getGeneration()); + table.addCell(segment.getNumDocs()); + table.addCell(segment.getDeletedDocs()); + table.addCell(segment.getSize()); + table.addCell(0L); + table.addCell(segment.isCommitted()); + table.addCell(segment.isSearch()); + table.addCell(segment.getVersion()); + table.addCell(segment.isCompound()); + table.endRow(); + + } + } + } + } + return table; + } +} diff --git a/server/src/main/java/org/opensearch/rest/action/search/RestDeletePitAction.java b/server/src/main/java/org/opensearch/rest/action/search/RestDeletePitAction.java index 452e66f8f5018..b19a7505741cc 100644 --- a/server/src/main/java/org/opensearch/rest/action/search/RestDeletePitAction.java +++ b/server/src/main/java/org/opensearch/rest/action/search/RestDeletePitAction.java @@ -26,7 +26,6 @@ * Rest action for deleting PIT contexts */ public class RestDeletePitAction extends BaseRestHandler { - @Override public String getName() { return "delete_pit_action"; diff --git a/server/src/main/java/org/opensearch/rest/action/search/RestGetAllPitsAction.java b/server/src/main/java/org/opensearch/rest/action/search/RestGetAllPitsAction.java new file mode 100644 index 0000000000000..0e1febe9d2a61 --- /dev/null +++ b/server/src/main/java/org/opensearch/rest/action/search/RestGetAllPitsAction.java @@ -0,0 +1,90 @@ + +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.rest.action.search; + +import org.opensearch.action.search.GetAllPitNodesRequest; +import org.opensearch.action.search.GetAllPitNodesResponse; +import org.opensearch.client.node.NodeClient; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.common.xcontent.XContentBuilder; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.BytesRestResponse; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.RestResponse; +import org.opensearch.rest.RestStatus; +import org.opensearch.rest.action.RestBuilderListener; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.function.Supplier; + +import static java.util.Collections.unmodifiableList; +import static org.opensearch.rest.RestRequest.Method.GET; + +/** + * Rest action for retrieving all active PIT IDs across all nodes + */ +public class RestGetAllPitsAction extends BaseRestHandler { + + private final Supplier nodesInCluster; + + public RestGetAllPitsAction(Supplier nodesInCluster) { + super(); + this.nodesInCluster = nodesInCluster; + } + + @Override + public String getName() { + return "get_all_pit_action"; + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + final List nodes = new ArrayList<>(); + for (DiscoveryNode node : nodesInCluster.get()) { + nodes.add(node); + } + DiscoveryNode[] disNodesArr = nodes.toArray(new DiscoveryNode[nodes.size()]); + GetAllPitNodesRequest getAllPitNodesRequest = new GetAllPitNodesRequest(disNodesArr); + return channel -> client.getAllPits(getAllPitNodesRequest, new RestBuilderListener(channel) { + @Override + public RestResponse buildResponse(final GetAllPitNodesResponse getAllPITNodesResponse, XContentBuilder builder) + throws Exception { + builder.startObject(); + if (getAllPITNodesResponse.hasFailures()) { + builder.startArray("failures"); + for (int idx = 0; idx < getAllPITNodesResponse.failures().size(); idx++) { + builder.startObject(); + builder.field( + getAllPITNodesResponse.failures().get(idx).nodeId(), + getAllPITNodesResponse.failures().get(idx).getDetailedMessage() + ); + builder.endObject(); + } + builder.endArray(); + } + builder.field("pits", getAllPITNodesResponse.getPitInfos()); + builder.endObject(); + if (getAllPITNodesResponse.hasFailures() && getAllPITNodesResponse.getPitInfos().isEmpty()) { + return new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, builder); + } + return new BytesRestResponse(RestStatus.OK, builder); + } + }); + } + + @Override + public List routes() { + return unmodifiableList(Collections.singletonList(new Route(GET, "/_search/point_in_time/_all"))); + } +} diff --git a/server/src/test/java/org/opensearch/action/search/PitTestsUtil.java b/server/src/test/java/org/opensearch/action/search/PitTestsUtil.java index 60a31c62dc32d..3962a4a11fc90 100644 --- a/server/src/test/java/org/opensearch/action/search/PitTestsUtil.java +++ b/server/src/test/java/org/opensearch/action/search/PitTestsUtil.java @@ -30,6 +30,7 @@ import org.opensearch.search.internal.AliasFilter; import org.opensearch.search.internal.ShardSearchContextId; +import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -134,6 +135,22 @@ public static void assertGetAllPitsEmpty(Client client) throws ExecutionExceptio Assert.assertEquals(0, getPitResponse.getPitInfos().size()); } + public static void assertSegments(boolean isEmpty, String index, long expectedShardSize, Client client, String pitId) { + PitSegmentsRequest pitSegmentsRequest; + pitSegmentsRequest = new PitSegmentsRequest(); + List pitIds = new ArrayList<>(); + pitIds.add(pitId); + pitSegmentsRequest.clearAndSetPitIds(pitIds); + IndicesSegmentResponse indicesSegmentResponse = client.execute(PitSegmentsAction.INSTANCE, pitSegmentsRequest).actionGet(); + assertTrue(indicesSegmentResponse.getShardFailures() == null || indicesSegmentResponse.getShardFailures().length == 0); + assertEquals(indicesSegmentResponse.getIndices().isEmpty(), isEmpty); + if (!isEmpty) { + assertTrue(indicesSegmentResponse.getIndices().get(index) != null); + assertTrue(indicesSegmentResponse.getIndices().get(index).getIndex().equalsIgnoreCase(index)); + assertEquals(expectedShardSize, indicesSegmentResponse.getIndices().get(index).getShards().size()); + } + } + public static void assertSegments(boolean isEmpty, String index, long expectedShardSize, Client client) { PitSegmentsRequest pitSegmentsRequest = new PitSegmentsRequest("_all"); IndicesSegmentResponse indicesSegmentResponse = client.execute(PitSegmentsAction.INSTANCE, pitSegmentsRequest).actionGet(); @@ -149,4 +166,8 @@ public static void assertSegments(boolean isEmpty, String index, long expectedSh public static void assertSegments(boolean isEmpty, Client client) { assertSegments(isEmpty, "index", 2, client); } + + public static void assertSegments(boolean isEmpty, Client client, String pitId) { + assertSegments(isEmpty, "index", 2, client, pitId); + } } diff --git a/server/src/test/java/org/opensearch/action/search/TransportDeletePitActionTests.java b/server/src/test/java/org/opensearch/action/search/TransportDeletePitActionTests.java index bdc0440a89f69..d6de562d616fa 100644 --- a/server/src/test/java/org/opensearch/action/search/TransportDeletePitActionTests.java +++ b/server/src/test/java/org/opensearch/action/search/TransportDeletePitActionTests.java @@ -172,9 +172,6 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod transportService, actionFilters, namedWriteableRegistry, - transportSearchAction, - clusterServiceMock, - searchTransportService, pitService ); DeletePitRequest deletePITRequest = new DeletePitRequest(pitId); @@ -250,9 +247,6 @@ public void getAllPits(ActionListener getAllPitsListener transportService, actionFilters, namedWriteableRegistry, - transportSearchAction, - clusterServiceMock, - searchTransportService, pitService ); DeletePitRequest deletePITRequest = new DeletePitRequest("_all"); @@ -319,9 +313,6 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod transportService, actionFilters, namedWriteableRegistry, - transportSearchAction, - clusterServiceMock, - searchTransportService, pitService ); DeletePitRequest deletePITRequest = new DeletePitRequest(pitId); @@ -378,9 +369,6 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod transportService, actionFilters, namedWriteableRegistry, - transportSearchAction, - clusterServiceMock, - searchTransportService, pitService ); DeletePitRequest deletePITRequest = new DeletePitRequest(pitId); @@ -446,9 +434,6 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod transportService, actionFilters, namedWriteableRegistry, - transportSearchAction, - clusterServiceMock, - searchTransportService, pitService ); DeletePitRequest deletePITRequest = new DeletePitRequest(pitId); @@ -526,9 +511,6 @@ public void getAllPits(ActionListener getAllPitsListener transportService, actionFilters, namedWriteableRegistry, - transportSearchAction, - clusterServiceMock, - searchTransportService, pitService ); DeletePitRequest deletePITRequest = new DeletePitRequest("_all"); @@ -602,9 +584,6 @@ public void getAllPits(ActionListener getAllPitsListener transportService, actionFilters, namedWriteableRegistry, - transportSearchAction, - clusterServiceMock, - searchTransportService, pitService ); DeletePitRequest deletePITRequest = new DeletePitRequest("_all"); @@ -682,9 +661,6 @@ public void getAllPits(ActionListener getAllPitsListener transportService, actionFilters, namedWriteableRegistry, - transportSearchAction, - clusterServiceMock, - searchTransportService, pitService ); DeletePitRequest deletePITRequest = new DeletePitRequest("_all"); diff --git a/server/src/test/java/org/opensearch/search/CreatePitSingleNodeTests.java b/server/src/test/java/org/opensearch/search/CreatePitSingleNodeTests.java index 9a28f1800847e..ae7f795f57ee7 100644 --- a/server/src/test/java/org/opensearch/search/CreatePitSingleNodeTests.java +++ b/server/src/test/java/org/opensearch/search/CreatePitSingleNodeTests.java @@ -32,6 +32,8 @@ import org.opensearch.index.shard.IndexShard; import org.opensearch.indices.IndicesService; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -75,7 +77,7 @@ public void testCreatePITSuccess() throws ExecutionException, InterruptedExcepti ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); CreatePitResponse pitResponse = execute.get(); PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime()); - assertSegments(false, client()); + assertSegments(false, client(), pitResponse.getId()); client().prepareIndex("index").setId("2").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); SearchResponse searchResponse = client().prepareSearch("index") .setSize(2) @@ -105,7 +107,7 @@ public void testCreatePITWithMultipleIndicesSuccess() throws ExecutionException, ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); CreatePitResponse response = execute.get(); PitTestsUtil.assertUsingGetAllPits(client(), response.getId(), response.getCreationTime()); - assertSegments(false, client()); + assertSegments(false, client(), response.getId()); assertEquals(4, response.getSuccessfulShards()); assertEquals(4, service.getActiveContexts()); @@ -126,7 +128,7 @@ public void testCreatePITWithShardReplicasSuccess() throws ExecutionException, I ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); CreatePitResponse pitResponse = execute.get(); PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime()); - assertSegments(false, client()); + assertSegments(false, client(), pitResponse.getId()); client().prepareIndex("index").setId("2").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); SearchResponse searchResponse = client().prepareSearch("index") .setSize(2) @@ -228,7 +230,7 @@ public void testPitSearchOnCloseIndex() throws ExecutionException, InterruptedEx ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); CreatePitResponse pitResponse = execute.get(); PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime()); - assertSegments(false, client()); + assertSegments(false, client(), pitResponse.getId()); SearchService service = getInstanceFromNode(SearchService.class); assertEquals(2, service.getActiveContexts()); validatePitStats("index", 1, 0, 0); @@ -295,14 +297,16 @@ public void testCreatePitMoreThanMaxOpenPitContexts() throws Exception { CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), true); request.setIndices(new String[] { "index" }); SearchService service = getInstanceFromNode(SearchService.class); + List pitIds = new ArrayList<>(); try { for (int i = 0; i < 1000; i++) { - client().execute(CreatePitAction.INSTANCE, request).get(); + CreatePitResponse cpr = client().execute(CreatePitAction.INSTANCE, request).actionGet(); + if (cpr.getId() != null) pitIds.add(cpr.getId()); } } catch (Exception ex) { assertTrue( - ex.getMessage() + ((SearchPhaseExecutionException) ex).getDetailedMessage() .contains( "Trying to create too many Point In Time contexts. " + "Must be less than or equal to: [" @@ -315,7 +319,7 @@ public void testCreatePitMoreThanMaxOpenPitContexts() throws Exception { final int maxPitContexts = SearchService.MAX_OPEN_PIT_CONTEXT.get(Settings.EMPTY); validatePitStats("index", maxPitContexts, 0, 0); // deleteall - DeletePitRequest deletePITRequest = new DeletePitRequest("_all"); + DeletePitRequest deletePITRequest = new DeletePitRequest(pitIds.toArray(new String[pitIds.size()])); /** * When we invoke delete again, returns success after clearing the remaining readers. Asserting reader context @@ -567,7 +571,7 @@ public void testConcurrentSearches() throws Exception { ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); CreatePitResponse pitResponse = execute.get(); PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime()); - assertSegments(false, client()); + assertSegments(false, client(), pitResponse.getId()); Thread[] threads = new Thread[5]; CountDownLatch latch = new CountDownLatch(threads.length); @@ -603,7 +607,6 @@ public void testConcurrentSearches() throws Exception { validatePitStats("index", 0, 1, 0); validatePitStats("index", 0, 1, 1); PitTestsUtil.assertGetAllPitsEmpty(client()); - assertSegments(true, client()); } public void validatePitStats(String index, long expectedPitCurrent, long expectedPitCount, int shardId) throws ExecutionException, diff --git a/server/src/test/java/org/opensearch/search/PitMultiNodeTests.java b/server/src/test/java/org/opensearch/search/PitMultiNodeTests.java index d29ccf5b97138..a23e4141a78e4 100644 --- a/server/src/test/java/org/opensearch/search/PitMultiNodeTests.java +++ b/server/src/test/java/org/opensearch/search/PitMultiNodeTests.java @@ -86,7 +86,7 @@ public void testPit() throws Exception { assertEquals(2, searchResponse.getTotalShards()); validatePitStats("index", 2, 2); PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime()); - assertSegments(false, client()); + assertSegments(false, client(), pitResponse.getId()); } public void testCreatePitWhileNodeDropWithAllowPartialCreationFalse() throws Exception { @@ -114,7 +114,7 @@ public Settings onNodeStopped(String nodeName) throws Exception { ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); CreatePitResponse pitResponse = execute.get(); PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime()); - assertSegments(false, "index", 1, client()); + assertSegments(false, "index", 1, client(), pitResponse.getId()); assertEquals(1, pitResponse.getSuccessfulShards()); assertEquals(2, pitResponse.getTotalShards()); SearchResponse searchResponse = client().prepareSearch("index") From 9b2202aeb3a881c0c2ad72522fa1e2656d5d4d1e Mon Sep 17 00:00:00 2001 From: Anshu Agarwal Date: Tue, 27 Sep 2022 21:32:29 +0530 Subject: [PATCH 2/2] Add PUT api to update shard routing weights (#4272) * Add PUT api to update shard routing weights Signed-off-by: Anshu Agarwal --- CHANGELOG.md | 2 +- .../client/RestHighLevelClientTests.java | 3 +- .../api/cluster.put_weighted_routing.json | 25 ++ .../org/opensearch/action/ActionModule.java | 5 + .../put/ClusterAddWeightedRoutingAction.java | 26 ++ .../put/ClusterPutWeightedRoutingRequest.java | 173 +++++++++++++ ...usterPutWeightedRoutingRequestBuilder.java | 33 +++ .../ClusterPutWeightedRoutingResponse.java | 29 +++ .../TransportAddWeightedRoutingAction.java | 128 ++++++++++ .../routing/weighted/put/package-info.java | 10 + .../opensearch/client/ClusterAdminClient.java | 19 ++ .../java/org/opensearch/client/Requests.java | 10 + .../client/support/AbstractClient.java | 22 ++ .../routing/WeightedRoutingService.java | 88 +++++++ .../RestClusterPutWeightedRoutingAction.java | 58 +++++ ...ClusterPutWeightedRoutingRequestTests.java | 65 +++++ .../routing/WeightedRoutingServiceTests.java | 234 ++++++++++++++++++ ...tClusterAddWeightedRoutingActionTests.java | 76 ++++++ 18 files changed, 1004 insertions(+), 2 deletions(-) create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/api/cluster.put_weighted_routing.json create mode 100644 server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterAddWeightedRoutingAction.java create mode 100644 server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequest.java create mode 100644 server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestBuilder.java create mode 100644 server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingResponse.java create mode 100644 server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/TransportAddWeightedRoutingAction.java create mode 100644 server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/package-info.java create mode 100644 server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java create mode 100644 server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterPutWeightedRoutingAction.java create mode 100644 server/src/test/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestTests.java create mode 100644 server/src/test/java/org/opensearch/cluster/routing/WeightedRoutingServiceTests.java create mode 100644 server/src/test/java/org/opensearch/rest/action/admin/cluster/RestClusterAddWeightedRoutingActionTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index b6af9b7041db3..3b15a28c55c43 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -45,10 +45,10 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - [Segment Replication] Update replicas to commit SegmentInfos instead of relying on SIS files from primary shards. ([#4402](https://github.com/opensearch-project/OpenSearch/pull/4402)) - [CCR] Add getHistoryOperationsFromTranslog method to fetch the history snapshot from translogs ([#3948](https://github.com/opensearch-project/OpenSearch/pull/3948)) - [Remote Store] Change behaviour in replica recovery for remote translog enabled indices ([#4318](https://github.com/opensearch-project/OpenSearch/pull/4318)) +- PUT api for weighted shard routing ([#4272](https://github.com/opensearch-project/OpenSearch/pull/4272)) - Unmute test RelocationIT.testRelocationWhileIndexingRandom ([#4580](https://github.com/opensearch-project/OpenSearch/pull/4580)) - Add DecommissionService and helper to execute awareness attribute decommissioning ([#4084](https://github.com/opensearch-project/OpenSearch/pull/4084)) - ### Deprecated ### Removed diff --git a/client/rest-high-level/src/test/java/org/opensearch/client/RestHighLevelClientTests.java b/client/rest-high-level/src/test/java/org/opensearch/client/RestHighLevelClientTests.java index ad8da7244eae0..c0eb344a64dba 100644 --- a/client/rest-high-level/src/test/java/org/opensearch/client/RestHighLevelClientTests.java +++ b/client/rest-high-level/src/test/java/org/opensearch/client/RestHighLevelClientTests.java @@ -888,7 +888,8 @@ public void testApiNamingConventions() throws Exception { "nodes.usage", "nodes.reload_secure_settings", "search_shards", - "remote_store.restore", }; + "remote_store.restore", + "cluster.put_weighted_routing", }; List booleanReturnMethods = Arrays.asList("security.enable_user", "security.disable_user", "security.change_password"); Set deprecatedMethods = new HashSet<>(); deprecatedMethods.add("indices.force_merge"); diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/cluster.put_weighted_routing.json b/rest-api-spec/src/main/resources/rest-api-spec/api/cluster.put_weighted_routing.json new file mode 100644 index 0000000000000..88498517ba336 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/cluster.put_weighted_routing.json @@ -0,0 +1,25 @@ +{ + "cluster.put_weighted_routing": { + "documentation": { + "url": "https://opensearch.org/docs/latest/opensearch/rest-api/weighted-routing/put", + "description": "Updates weighted shard routing weights" + }, + "stability": "stable", + "url": { + "paths": [ + { + "path": "/_cluster/routing/awareness/{attribute}/weights", + "methods": [ + "PUT" + ], + "parts": { + "attribute": { + "type": "string", + "description": "Awareness attribute name" + } + } + } + ] + } + } +} diff --git a/server/src/main/java/org/opensearch/action/ActionModule.java b/server/src/main/java/org/opensearch/action/ActionModule.java index c5fcfdd047a09..e745e505d9fe5 100644 --- a/server/src/main/java/org/opensearch/action/ActionModule.java +++ b/server/src/main/java/org/opensearch/action/ActionModule.java @@ -79,6 +79,8 @@ import org.opensearch.action.admin.cluster.settings.TransportClusterUpdateSettingsAction; import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsAction; import org.opensearch.action.admin.cluster.shards.TransportClusterSearchShardsAction; +import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterAddWeightedRoutingAction; +import org.opensearch.action.admin.cluster.shards.routing.weighted.put.TransportAddWeightedRoutingAction; import org.opensearch.action.admin.cluster.snapshots.clone.CloneSnapshotAction; import org.opensearch.action.admin.cluster.snapshots.clone.TransportCloneSnapshotAction; import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotAction; @@ -294,6 +296,7 @@ import org.opensearch.rest.action.admin.cluster.RestClusterAllocationExplainAction; import org.opensearch.rest.action.admin.cluster.RestClusterGetSettingsAction; import org.opensearch.rest.action.admin.cluster.RestClusterHealthAction; +import org.opensearch.rest.action.admin.cluster.RestClusterPutWeightedRoutingAction; import org.opensearch.rest.action.admin.cluster.RestClusterRerouteAction; import org.opensearch.rest.action.admin.cluster.RestClusterSearchShardsAction; import org.opensearch.rest.action.admin.cluster.RestClusterStateAction; @@ -563,6 +566,7 @@ public void reg actions.register(RestoreSnapshotAction.INSTANCE, TransportRestoreSnapshotAction.class); actions.register(SnapshotsStatusAction.INSTANCE, TransportSnapshotsStatusAction.class); + actions.register(ClusterAddWeightedRoutingAction.INSTANCE, TransportAddWeightedRoutingAction.class); actions.register(IndicesStatsAction.INSTANCE, TransportIndicesStatsAction.class); actions.register(IndicesSegmentsAction.INSTANCE, TransportIndicesSegmentsAction.class); actions.register(IndicesShardStoresAction.INSTANCE, TransportIndicesShardStoresAction.class); @@ -744,6 +748,7 @@ public void initRestHandlers(Supplier nodesInCluster) { registerHandler.accept(new RestCloseIndexAction()); registerHandler.accept(new RestOpenIndexAction()); registerHandler.accept(new RestAddIndexBlockAction()); + registerHandler.accept(new RestClusterPutWeightedRoutingAction()); registerHandler.accept(new RestUpdateSettingsAction()); registerHandler.accept(new RestGetSettingsAction()); diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterAddWeightedRoutingAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterAddWeightedRoutingAction.java new file mode 100644 index 0000000000000..65c5ccca71461 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterAddWeightedRoutingAction.java @@ -0,0 +1,26 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.shards.routing.weighted.put; + +import org.opensearch.action.ActionType; + +/** + * Action to update weights for weighted round-robin shard routing policy. + * + * @opensearch.internal + */ +public final class ClusterAddWeightedRoutingAction extends ActionType { + + public static final ClusterAddWeightedRoutingAction INSTANCE = new ClusterAddWeightedRoutingAction(); + public static final String NAME = "cluster:admin/routing/awareness/weights/put"; + + private ClusterAddWeightedRoutingAction() { + super(NAME, ClusterPutWeightedRoutingResponse::new); + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequest.java new file mode 100644 index 0000000000000..af229fb12b4f0 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequest.java @@ -0,0 +1,173 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.shards.routing.weighted.put; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.OpenSearchGenerationException; +import org.opensearch.OpenSearchParseException; +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest; +import org.opensearch.cluster.routing.WeightedRouting; +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.xcontent.DeprecationHandler; +import org.opensearch.common.xcontent.NamedXContentRegistry; +import org.opensearch.common.xcontent.XContentBuilder; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.common.xcontent.XContentHelper; +import org.opensearch.common.xcontent.XContentParser; +import org.opensearch.common.xcontent.XContentType; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Locale; +import java.util.Map; + +import static org.opensearch.action.ValidateActions.addValidationError; + +/** + * Request to update weights for weighted round-robin shard routing policy. + * + * @opensearch.internal + */ +public class ClusterPutWeightedRoutingRequest extends ClusterManagerNodeRequest { + private static final Logger logger = LogManager.getLogger(ClusterPutWeightedRoutingRequest.class); + + private WeightedRouting weightedRouting; + private String attributeName; + + public ClusterPutWeightedRoutingRequest() {} + + public WeightedRouting getWeightedRouting() { + return weightedRouting; + } + + public ClusterPutWeightedRoutingRequest setWeightedRouting(WeightedRouting weightedRouting) { + this.weightedRouting = weightedRouting; + return this; + } + + public void attributeName(String attributeName) { + this.attributeName = attributeName; + } + + public ClusterPutWeightedRoutingRequest(StreamInput in) throws IOException { + super(in); + weightedRouting = new WeightedRouting(in); + } + + public ClusterPutWeightedRoutingRequest(String attributeName) { + this.attributeName = attributeName; + } + + public void setWeightedRouting(Map source) { + try { + if (source.isEmpty()) { + throw new OpenSearchParseException(("Empty request body")); + } + XContentBuilder builder = XContentFactory.jsonBuilder(); + builder.map(source); + setWeightedRouting(BytesReference.bytes(builder), builder.contentType()); + } catch (IOException e) { + throw new OpenSearchGenerationException("Failed to generate [" + source + "]", e); + } + } + + public void setWeightedRouting(BytesReference source, XContentType contentType) { + try ( + XContentParser parser = XContentHelper.createParser( + NamedXContentRegistry.EMPTY, + DeprecationHandler.THROW_UNSUPPORTED_OPERATION, + source, + contentType + ) + ) { + String attrValue = null; + Map weights = new HashMap<>(); + Double attrWeight = null; + XContentParser.Token token; + // move to the first alias + parser.nextToken(); + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + attrValue = parser.currentName(); + } else if (token == XContentParser.Token.VALUE_STRING) { + attrWeight = Double.parseDouble(parser.text()); + weights.put(attrValue, attrWeight); + } else { + throw new OpenSearchParseException( + "failed to parse weighted routing request attribute [{}], " + "unknown type", + attrWeight + ); + } + } + this.weightedRouting = new WeightedRouting(this.attributeName, weights); + } catch (IOException e) { + logger.error("error while parsing put for weighted routing request object", e); + } + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = null; + if (weightedRouting == null) { + validationException = addValidationError("Weighted routing request object is null", validationException); + } + if (weightedRouting.attributeName() == null || weightedRouting.attributeName().isEmpty()) { + validationException = addValidationError("Attribute name is missing", validationException); + } + if (weightedRouting.weights() == null || weightedRouting.weights().isEmpty()) { + validationException = addValidationError("Weights are missing", validationException); + } + int countValueWithZeroWeights = 0; + double weight; + try { + for (Object value : weightedRouting.weights().values()) { + if (value == null) { + validationException = addValidationError(("Weight is null"), validationException); + } else { + weight = Double.parseDouble(value.toString()); + countValueWithZeroWeights = (weight == 0) ? countValueWithZeroWeights + 1 : countValueWithZeroWeights; + } + } + } catch (NumberFormatException e) { + validationException = addValidationError(("Weight is not a number"), validationException); + } + if (countValueWithZeroWeights > 1) { + validationException = addValidationError( + (String.format(Locale.ROOT, "More than one [%d] value has weight set as 0", countValueWithZeroWeights)), + validationException + ); + } + return validationException; + } + + /** + * @param source weights definition from request body + * @return this request + */ + public ClusterPutWeightedRoutingRequest source(Map source) { + setWeightedRouting(source); + return this; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + weightedRouting.writeTo(out); + } + + @Override + public String toString() { + return "ClusterPutWeightedRoutingRequest{" + "weightedRouting= " + weightedRouting.toString() + "}"; + } + +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestBuilder.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestBuilder.java new file mode 100644 index 0000000000000..b437f4c54d8d6 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestBuilder.java @@ -0,0 +1,33 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.shards.routing.weighted.put; + +import org.opensearch.action.support.clustermanager.ClusterManagerNodeOperationRequestBuilder; +import org.opensearch.client.OpenSearchClient; +import org.opensearch.cluster.routing.WeightedRouting; + +/** + * Request builder to update weights for weighted round-robin shard routing policy. + * + * @opensearch.internal + */ +public class ClusterPutWeightedRoutingRequestBuilder extends ClusterManagerNodeOperationRequestBuilder< + ClusterPutWeightedRoutingRequest, + ClusterPutWeightedRoutingResponse, + ClusterPutWeightedRoutingRequestBuilder> { + public ClusterPutWeightedRoutingRequestBuilder(OpenSearchClient client, ClusterAddWeightedRoutingAction action) { + super(client, action, new ClusterPutWeightedRoutingRequest()); + } + + public ClusterPutWeightedRoutingRequestBuilder setWeightedRouting(WeightedRouting weightedRouting) { + request.setWeightedRouting(weightedRouting); + return this; + } + +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingResponse.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingResponse.java new file mode 100644 index 0000000000000..b0154aceef0c2 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingResponse.java @@ -0,0 +1,29 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.shards.routing.weighted.put; + +import org.opensearch.action.support.master.AcknowledgedResponse; +import org.opensearch.common.io.stream.StreamInput; + +import java.io.IOException; + +/** + * Response from updating weights for weighted round-robin search routing policy. + * + * @opensearch.internal + */ +public class ClusterPutWeightedRoutingResponse extends AcknowledgedResponse { + public ClusterPutWeightedRoutingResponse(boolean acknowledged) { + super(acknowledged); + } + + public ClusterPutWeightedRoutingResponse(StreamInput in) throws IOException { + super(in); + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/TransportAddWeightedRoutingAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/TransportAddWeightedRoutingAction.java new file mode 100644 index 0000000000000..8c29ab2199848 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/TransportAddWeightedRoutingAction.java @@ -0,0 +1,128 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.shards.routing.weighted.put; + +import org.opensearch.action.ActionListener; +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.block.ClusterBlockException; +import org.opensearch.cluster.block.ClusterBlockLevel; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.routing.WeightedRoutingService; +import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; + +import java.io.IOException; +import java.util.List; +import java.util.Locale; + +import static org.opensearch.action.ValidateActions.addValidationError; + +/** + * Transport action for updating weights for weighted round-robin search routing policy + * + * @opensearch.internal + */ +public class TransportAddWeightedRoutingAction extends TransportClusterManagerNodeAction< + ClusterPutWeightedRoutingRequest, + ClusterPutWeightedRoutingResponse> { + + private final WeightedRoutingService weightedRoutingService; + private volatile List awarenessAttributes; + + @Inject + public TransportAddWeightedRoutingAction( + Settings settings, + ClusterSettings clusterSettings, + TransportService transportService, + ClusterService clusterService, + WeightedRoutingService weightedRoutingService, + ThreadPool threadPool, + ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver + ) { + super( + ClusterAddWeightedRoutingAction.NAME, + transportService, + clusterService, + threadPool, + actionFilters, + ClusterPutWeightedRoutingRequest::new, + indexNameExpressionResolver + ); + this.weightedRoutingService = weightedRoutingService; + this.awarenessAttributes = AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.get(settings); + clusterSettings.addSettingsUpdateConsumer( + AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING, + this::setAwarenessAttributes + ); + } + + List getAwarenessAttributes() { + return awarenessAttributes; + } + + private void setAwarenessAttributes(List awarenessAttributes) { + this.awarenessAttributes = awarenessAttributes; + } + + @Override + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected ClusterPutWeightedRoutingResponse read(StreamInput in) throws IOException { + return new ClusterPutWeightedRoutingResponse(in); + } + + @Override + protected void clusterManagerOperation( + ClusterPutWeightedRoutingRequest request, + ClusterState state, + ActionListener listener + ) throws Exception { + verifyAwarenessAttribute(request.getWeightedRouting().attributeName()); + weightedRoutingService.registerWeightedRoutingMetadata( + request, + ActionListener.delegateFailure( + listener, + (delegatedListener, response) -> { + delegatedListener.onResponse(new ClusterPutWeightedRoutingResponse(response.isAcknowledged())); + } + ) + ); + } + + private void verifyAwarenessAttribute(String attributeName) { + if (getAwarenessAttributes().contains(attributeName) == false) { + ActionRequestValidationException validationException = null; + + validationException = addValidationError( + String.format(Locale.ROOT, "invalid awareness attribute %s requested for updating weighted routing", attributeName), + validationException + ); + throw validationException; + } + } + + @Override + protected ClusterBlockException checkBlock(ClusterPutWeightedRoutingRequest request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } + +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/package-info.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/package-info.java new file mode 100644 index 0000000000000..4f18b220cd343 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/package-info.java @@ -0,0 +1,10 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** add/update weighted-round robin shard routing weights. */ +package org.opensearch.action.admin.cluster.shards.routing.weighted.put; diff --git a/server/src/main/java/org/opensearch/client/ClusterAdminClient.java b/server/src/main/java/org/opensearch/client/ClusterAdminClient.java index 7a7b98bf724f6..2f62ab13b131c 100644 --- a/server/src/main/java/org/opensearch/client/ClusterAdminClient.java +++ b/server/src/main/java/org/opensearch/client/ClusterAdminClient.java @@ -86,6 +86,9 @@ import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsRequest; import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsRequestBuilder; import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsResponse; +import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingRequest; +import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingRequestBuilder; +import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingResponse; import org.opensearch.action.admin.cluster.snapshots.clone.CloneSnapshotRequest; import org.opensearch.action.admin.cluster.snapshots.clone.CloneSnapshotRequestBuilder; import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest; @@ -791,4 +794,20 @@ public interface ClusterAdminClient extends OpenSearchClient { * Delete specified dangling indices. */ ActionFuture deleteDanglingIndex(DeleteDanglingIndexRequest request); + + /** + * Updates weights for weighted round-robin search routing policy. + */ + ActionFuture putWeightedRouting(ClusterPutWeightedRoutingRequest request); + + /** + * Updates weights for weighted round-robin search routing policy. + */ + void putWeightedRouting(ClusterPutWeightedRoutingRequest request, ActionListener listener); + + /** + * Updates weights for weighted round-robin search routing policy. + */ + ClusterPutWeightedRoutingRequestBuilder prepareWeightedRouting(); + } diff --git a/server/src/main/java/org/opensearch/client/Requests.java b/server/src/main/java/org/opensearch/client/Requests.java index b04de7830a780..7154742de04fb 100644 --- a/server/src/main/java/org/opensearch/client/Requests.java +++ b/server/src/main/java/org/opensearch/client/Requests.java @@ -47,6 +47,7 @@ import org.opensearch.action.admin.cluster.reroute.ClusterRerouteRequest; import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsRequest; +import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingRequest; import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest; import org.opensearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest; import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest; @@ -548,4 +549,13 @@ public static DeleteSnapshotRequest deleteSnapshotRequest(String repository, Str public static SnapshotsStatusRequest snapshotsStatusRequest(String repository) { return new SnapshotsStatusRequest(repository); } + + /** + * Updates weights for weighted round-robin search routing policy + * + * @return update weight request + */ + public static ClusterPutWeightedRoutingRequest putWeightedRoutingRequest(String attributeName) { + return new ClusterPutWeightedRoutingRequest(attributeName); + } } diff --git a/server/src/main/java/org/opensearch/client/support/AbstractClient.java b/server/src/main/java/org/opensearch/client/support/AbstractClient.java index 21cd01bf65a45..efd18dd4947ad 100644 --- a/server/src/main/java/org/opensearch/client/support/AbstractClient.java +++ b/server/src/main/java/org/opensearch/client/support/AbstractClient.java @@ -110,6 +110,10 @@ import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsRequest; import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsRequestBuilder; import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsResponse; +import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterAddWeightedRoutingAction; +import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingRequest; +import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingRequestBuilder; +import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingResponse; import org.opensearch.action.admin.cluster.snapshots.clone.CloneSnapshotAction; import org.opensearch.action.admin.cluster.snapshots.clone.CloneSnapshotRequest; import org.opensearch.action.admin.cluster.snapshots.clone.CloneSnapshotRequestBuilder; @@ -1272,6 +1276,24 @@ public ActionFuture deleteDanglingIndex(DeleteDanglingInde return execute(DeleteDanglingIndexAction.INSTANCE, request); } + @Override + public ActionFuture putWeightedRouting(ClusterPutWeightedRoutingRequest request) { + return execute(ClusterAddWeightedRoutingAction.INSTANCE, request); + } + + @Override + public void putWeightedRouting( + ClusterPutWeightedRoutingRequest request, + ActionListener listener + ) { + execute(ClusterAddWeightedRoutingAction.INSTANCE, request, listener); + } + + @Override + public ClusterPutWeightedRoutingRequestBuilder prepareWeightedRouting() { + return new ClusterPutWeightedRoutingRequestBuilder(this, ClusterAddWeightedRoutingAction.INSTANCE); + } + @Override public void deleteDanglingIndex(DeleteDanglingIndexRequest request, ActionListener listener) { execute(DeleteDanglingIndexAction.INSTANCE, request, listener); diff --git a/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java b/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java new file mode 100644 index 0000000000000..da454865ac866 --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java @@ -0,0 +1,88 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.action.ActionListener; +import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingRequest; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.ClusterStateUpdateTask; +import org.opensearch.cluster.ack.ClusterStateUpdateResponse; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.metadata.WeightedRoutingMetadata; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.Priority; +import org.opensearch.common.inject.Inject; + +import org.opensearch.threadpool.ThreadPool; + +/** + * * Service responsible for updating cluster state metadata with weighted routing weights + */ +public class WeightedRoutingService { + private static final Logger logger = LogManager.getLogger(WeightedRoutingService.class); + private final ClusterService clusterService; + private final ThreadPool threadPool; + + @Inject + public WeightedRoutingService(ClusterService clusterService, ThreadPool threadPool) { + this.clusterService = clusterService; + this.threadPool = threadPool; + } + + public void registerWeightedRoutingMetadata( + final ClusterPutWeightedRoutingRequest request, + final ActionListener listener + ) { + final WeightedRoutingMetadata newWeightedRoutingMetadata = new WeightedRoutingMetadata(request.getWeightedRouting()); + clusterService.submitStateUpdateTask("update_weighted_routing", new ClusterStateUpdateTask(Priority.URGENT) { + @Override + public ClusterState execute(ClusterState currentState) { + Metadata metadata = currentState.metadata(); + Metadata.Builder mdBuilder = Metadata.builder(currentState.metadata()); + WeightedRoutingMetadata weightedRoutingMetadata = metadata.custom(WeightedRoutingMetadata.TYPE); + if (weightedRoutingMetadata == null) { + logger.info("put weighted routing weights in metadata [{}]", request.getWeightedRouting()); + weightedRoutingMetadata = new WeightedRoutingMetadata(request.getWeightedRouting()); + } else { + if (!checkIfSameWeightsInMetadata(newWeightedRoutingMetadata, weightedRoutingMetadata)) { + logger.info("updated weighted routing weights [{}] in metadata", request.getWeightedRouting()); + weightedRoutingMetadata = new WeightedRoutingMetadata(newWeightedRoutingMetadata.getWeightedRouting()); + } else { + return currentState; + } + } + mdBuilder.putCustom(WeightedRoutingMetadata.TYPE, weightedRoutingMetadata); + logger.info("building cluster state with weighted routing weights [{}]", request.getWeightedRouting()); + return ClusterState.builder(currentState).metadata(mdBuilder).build(); + } + + @Override + public void onFailure(String source, Exception e) { + logger.warn(() -> new ParameterizedMessage("failed to update cluster state for weighted routing weights [{}]", e)); + listener.onFailure(e); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + logger.debug("cluster weighted routing weights metadata change is processed by all the nodes"); + listener.onResponse(new ClusterStateUpdateResponse(true)); + } + }); + } + + private boolean checkIfSameWeightsInMetadata( + WeightedRoutingMetadata newWeightedRoutingMetadata, + WeightedRoutingMetadata oldWeightedRoutingMetadata + ) { + return newWeightedRoutingMetadata.getWeightedRouting().equals(oldWeightedRoutingMetadata.getWeightedRouting()); + } +} diff --git a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterPutWeightedRoutingAction.java b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterPutWeightedRoutingAction.java new file mode 100644 index 0000000000000..1cf44e665cf84 --- /dev/null +++ b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterPutWeightedRoutingAction.java @@ -0,0 +1,58 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.rest.action.admin.cluster; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingRequest; +import org.opensearch.client.Requests; +import org.opensearch.client.node.NodeClient; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.action.RestToXContentListener; + +import java.io.IOException; +import java.util.List; + +import static java.util.Collections.singletonList; +import static org.opensearch.rest.RestRequest.Method.PUT; + +/** + * Update Weighted Round Robin based shard routing weights + * + * @opensearch.api + * + */ +public class RestClusterPutWeightedRoutingAction extends BaseRestHandler { + + private static final Logger logger = LogManager.getLogger(RestClusterPutWeightedRoutingAction.class); + + @Override + public List routes() { + return singletonList(new Route(PUT, "/_cluster/routing/awareness/{attribute}/weights")); + } + + @Override + public String getName() { + return "put_weighted_routing_action"; + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + ClusterPutWeightedRoutingRequest putWeightedRoutingRequest = createRequest(request); + return channel -> client.admin().cluster().putWeightedRouting(putWeightedRoutingRequest, new RestToXContentListener<>(channel)); + } + + public static ClusterPutWeightedRoutingRequest createRequest(RestRequest request) throws IOException { + ClusterPutWeightedRoutingRequest putWeightedRoutingRequest = Requests.putWeightedRoutingRequest(request.param("attribute")); + request.applyContentParser(p -> putWeightedRoutingRequest.source(p.mapStrings())); + return putWeightedRoutingRequest; + } + +} diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestTests.java new file mode 100644 index 0000000000000..186e7e8638f17 --- /dev/null +++ b/server/src/test/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestTests.java @@ -0,0 +1,65 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.shards.routing.weighted.put; + +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.cluster.routing.WeightedRouting; +import org.opensearch.common.bytes.BytesArray; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.Map; + +public class ClusterPutWeightedRoutingRequestTests extends OpenSearchTestCase { + + public void testSetWeightedRoutingWeight() { + String reqString = "{\"us-east-1c\" : \"0\", \"us-east-1b\":\"1\",\"us-east-1a\":\"1\"}"; + ClusterPutWeightedRoutingRequest request = new ClusterPutWeightedRoutingRequest("zone"); + Map weights = Map.of("us-east-1a", 1.0, "us-east-1b", 1.0, "us-east-1c", 0.0); + WeightedRouting weightedRouting = new WeightedRouting("zone", weights); + request.setWeightedRouting(new BytesArray(reqString), XContentType.JSON); + assertEquals(request.getWeightedRouting(), weightedRouting); + } + + public void testValidate_ValuesAreProper() { + String reqString = "{\"us-east-1c\" : \"1\", \"us-east-1b\":\"0\",\"us-east-1a\":\"1\"}"; + ClusterPutWeightedRoutingRequest request = new ClusterPutWeightedRoutingRequest("zone"); + request.setWeightedRouting(new BytesArray(reqString), XContentType.JSON); + ActionRequestValidationException actionRequestValidationException = request.validate(); + assertNull(actionRequestValidationException); + } + + public void testValidate_TwoZonesWithZeroWeight() { + String reqString = "{\"us-east-1c\" : \"0\", \"us-east-1b\":\"0\",\"us-east-1a\":\"1\"}"; + ClusterPutWeightedRoutingRequest request = new ClusterPutWeightedRoutingRequest("zone"); + request.setWeightedRouting(new BytesArray(reqString), XContentType.JSON); + ActionRequestValidationException actionRequestValidationException = request.validate(); + assertNotNull(actionRequestValidationException); + assertTrue(actionRequestValidationException.getMessage().contains("More than one [2] value has weight set as " + "0")); + } + + public void testValidate_MissingWeights() { + String reqString = "{}"; + ClusterPutWeightedRoutingRequest request = new ClusterPutWeightedRoutingRequest("zone"); + request.setWeightedRouting(new BytesArray(reqString), XContentType.JSON); + ActionRequestValidationException actionRequestValidationException = request.validate(); + assertNotNull(actionRequestValidationException); + assertTrue(actionRequestValidationException.getMessage().contains("Weights are missing")); + } + + public void testValidate_AttributeMissing() { + String reqString = "{\"us-east-1c\" : \"0\", \"us-east-1b\":\"1\",\"us-east-1a\":\"1\"}"; + ClusterPutWeightedRoutingRequest request = new ClusterPutWeightedRoutingRequest(); + request.setWeightedRouting(new BytesArray(reqString), XContentType.JSON); + ActionRequestValidationException actionRequestValidationException = request.validate(); + assertNotNull(actionRequestValidationException); + assertTrue(actionRequestValidationException.getMessage().contains("Attribute name is missing")); + } + +} diff --git a/server/src/test/java/org/opensearch/cluster/routing/WeightedRoutingServiceTests.java b/server/src/test/java/org/opensearch/cluster/routing/WeightedRoutingServiceTests.java new file mode 100644 index 0000000000000..e5cca998d3f06 --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/routing/WeightedRoutingServiceTests.java @@ -0,0 +1,234 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing; + +import org.junit.After; +import org.junit.Before; +import org.opensearch.Version; +import org.opensearch.action.ActionListener; +import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterAddWeightedRoutingAction; +import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingRequestBuilder; +import org.opensearch.client.node.NodeClient; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.ack.ClusterStateUpdateResponse; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.metadata.WeightedRoutingMetadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodeRole; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.test.ClusterServiceUtils; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.test.transport.MockTransport; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +public class WeightedRoutingServiceTests extends OpenSearchTestCase { + private ThreadPool threadPool; + private ClusterService clusterService; + private TransportService transportService; + private WeightedRoutingService weightedRoutingService; + private ClusterSettings clusterSettings; + NodeClient client; + + final private static Set CLUSTER_MANAGER_ROLE = Collections.unmodifiableSet( + new HashSet<>(Collections.singletonList(DiscoveryNodeRole.CLUSTER_MANAGER_ROLE)) + ); + + final private static Set DATA_ROLE = Collections.unmodifiableSet( + new HashSet<>(Collections.singletonList(DiscoveryNodeRole.DATA_ROLE)) + ); + + @Override + public void setUp() throws Exception { + super.setUp(); + threadPool = new TestThreadPool("test", Settings.EMPTY); + clusterService = ClusterServiceUtils.createClusterService(threadPool); + } + + @Before + public void setUpService() { + ClusterState clusterState = ClusterState.builder(new ClusterName("test")).build(); + clusterState = addClusterManagerNodes(clusterState); + clusterState = addDataNodes(clusterState); + clusterState = setLocalNode(clusterState, "nodeA1"); + + ClusterState.Builder builder = ClusterState.builder(clusterState); + ClusterServiceUtils.setState(clusterService, builder); + + final MockTransport transport = new MockTransport(); + transportService = transport.createTransportService( + Settings.EMPTY, + threadPool, + TransportService.NOOP_TRANSPORT_INTERCEPTOR, + boundTransportAddress -> clusterService.state().nodes().get("nodes1"), + null, + Collections.emptySet() + + ); + + Settings.Builder settingsBuilder = Settings.builder() + .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey(), "zone"); + + clusterSettings = new ClusterSettings(settingsBuilder.build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + transportService.start(); + transportService.acceptIncomingRequests(); + + this.weightedRoutingService = new WeightedRoutingService(clusterService, threadPool); + client = new NodeClient(Settings.EMPTY, threadPool); + } + + @After + public void shutdown() { + clusterService.stop(); + threadPool.shutdown(); + } + + private ClusterState addDataNodes(ClusterState clusterState) { + clusterState = addDataNodeForAZone(clusterState, "zone_A", "nodeA1", "nodeA2", "nodeA3"); + clusterState = addDataNodeForAZone(clusterState, "zone_B", "nodeB1", "nodeB2", "nodeB3"); + clusterState = addDataNodeForAZone(clusterState, "zone_C", "nodeC1", "nodeC2", "nodeC3"); + return clusterState; + } + + private ClusterState addClusterManagerNodes(ClusterState clusterState) { + clusterState = addClusterManagerNodeForAZone(clusterState, "zone_A", "nodeMA"); + clusterState = addClusterManagerNodeForAZone(clusterState, "zone_B", "nodeMB"); + clusterState = addClusterManagerNodeForAZone(clusterState, "zone_C", "nodeMC"); + return clusterState; + } + + private ClusterState addDataNodeForAZone(ClusterState clusterState, String zone, String... nodeIds) { + DiscoveryNodes.Builder nodeBuilder = DiscoveryNodes.builder(clusterState.nodes()); + org.opensearch.common.collect.List.of(nodeIds) + .forEach( + nodeId -> nodeBuilder.add( + new DiscoveryNode( + nodeId, + buildNewFakeTransportAddress(), + Collections.singletonMap("zone", zone), + DATA_ROLE, + Version.CURRENT + ) + ) + ); + clusterState = ClusterState.builder(clusterState).nodes(nodeBuilder).build(); + return clusterState; + } + + private ClusterState addClusterManagerNodeForAZone(ClusterState clusterState, String zone, String... nodeIds) { + + DiscoveryNodes.Builder nodeBuilder = DiscoveryNodes.builder(clusterState.nodes()); + org.opensearch.common.collect.List.of(nodeIds) + .forEach( + nodeId -> nodeBuilder.add( + new DiscoveryNode( + nodeId, + buildNewFakeTransportAddress(), + Collections.singletonMap("zone", zone), + CLUSTER_MANAGER_ROLE, + Version.CURRENT + ) + ) + ); + clusterState = ClusterState.builder(clusterState).nodes(nodeBuilder).build(); + return clusterState; + } + + private ClusterState setLocalNode(ClusterState clusterState, String nodeId) { + DiscoveryNodes.Builder nodeBuilder = DiscoveryNodes.builder(clusterState.nodes()); + nodeBuilder.localNodeId(nodeId); + nodeBuilder.clusterManagerNodeId(nodeId); + clusterState = ClusterState.builder(clusterState).nodes(nodeBuilder).build(); + return clusterState; + } + + private ClusterState setWeightedRoutingWeights(ClusterState clusterState, Map weights) { + WeightedRouting weightedRouting = new WeightedRouting("zone", weights); + WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting); + Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata()); + metadataBuilder.putCustom(WeightedRoutingMetadata.TYPE, weightedRoutingMetadata); + clusterState = ClusterState.builder(clusterState).metadata(metadataBuilder).build(); + return clusterState; + } + + public void testRegisterWeightedRoutingMetadataWithChangedWeights() throws InterruptedException { + Map weights = Map.of("zone_A", 1.0, "zone_B", 1.0, "zone_C", 1.0); + ClusterState state = clusterService.state(); + state = setWeightedRoutingWeights(state, weights); + ClusterState.Builder builder = ClusterState.builder(state); + ClusterServiceUtils.setState(clusterService, builder); + + ClusterPutWeightedRoutingRequestBuilder request = new ClusterPutWeightedRoutingRequestBuilder( + client, + ClusterAddWeightedRoutingAction.INSTANCE + ); + WeightedRouting updatedWeightedRouting = new WeightedRouting("zone", Map.of("zone_A", 1.0, "zone_B", 0.0, "zone_C", 0.0)); + request.setWeightedRouting(updatedWeightedRouting); + final CountDownLatch countDownLatch = new CountDownLatch(1); + ActionListener listener = new ActionListener() { + @Override + public void onResponse(ClusterStateUpdateResponse clusterStateUpdateResponse) { + assertTrue(clusterStateUpdateResponse.isAcknowledged()); + assertEquals(updatedWeightedRouting, clusterService.state().metadata().weightedRoutingMetadata().getWeightedRouting()); + countDownLatch.countDown(); + } + + @Override + public void onFailure(Exception e) { + fail("request should not fail"); + } + }; + weightedRoutingService.registerWeightedRoutingMetadata(request.request(), listener); + assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); + } + + public void testRegisterWeightedRoutingMetadataWithSameWeights() throws InterruptedException { + Map weights = Map.of("zone_A", 1.0, "zone_B", 1.0, "zone_C", 1.0); + ClusterState state = clusterService.state(); + state = setWeightedRoutingWeights(state, weights); + ClusterState.Builder builder = ClusterState.builder(state); + ClusterServiceUtils.setState(clusterService, builder); + + ClusterPutWeightedRoutingRequestBuilder request = new ClusterPutWeightedRoutingRequestBuilder( + client, + ClusterAddWeightedRoutingAction.INSTANCE + ); + WeightedRouting updatedWeightedRouting = new WeightedRouting("zone", weights); + request.setWeightedRouting(updatedWeightedRouting); + final CountDownLatch countDownLatch = new CountDownLatch(1); + ActionListener listener = new ActionListener() { + @Override + public void onResponse(ClusterStateUpdateResponse clusterStateUpdateResponse) { + assertTrue(clusterStateUpdateResponse.isAcknowledged()); + assertEquals(updatedWeightedRouting, clusterService.state().metadata().weightedRoutingMetadata().getWeightedRouting()); + countDownLatch.countDown(); + } + + @Override + public void onFailure(Exception e) { + fail("request should not fail"); + } + }; + weightedRoutingService.registerWeightedRoutingMetadata(request.request(), listener); + assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); + } +} diff --git a/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestClusterAddWeightedRoutingActionTests.java b/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestClusterAddWeightedRoutingActionTests.java new file mode 100644 index 0000000000000..a4cd6224217b7 --- /dev/null +++ b/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestClusterAddWeightedRoutingActionTests.java @@ -0,0 +1,76 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.rest.action.admin.cluster; + +import com.fasterxml.jackson.core.JsonParseException; +import org.junit.Before; +import org.opensearch.OpenSearchParseException; +import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingRequest; +import org.opensearch.common.bytes.BytesArray; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.rest.RestRequest; +import org.opensearch.test.rest.FakeRestRequest; +import org.opensearch.test.rest.RestActionTestCase; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import static java.util.Collections.singletonMap; + +public class RestClusterAddWeightedRoutingActionTests extends RestActionTestCase { + private RestClusterPutWeightedRoutingAction action; + + @Before + public void setupAction() { + action = new RestClusterPutWeightedRoutingAction(); + controller().registerHandler(action); + } + + public void testCreateRequest_SupportedRequestBody() throws IOException { + String req = "{\"us-east-1c\" : \"1\", \"us-east-1d\":\"1.0\", \"us-east-1a\":\"0.0\"}"; + RestRequest restRequest = buildRestRequest(req); + ClusterPutWeightedRoutingRequest clusterPutWeightedRoutingRequest = RestClusterPutWeightedRoutingAction.createRequest(restRequest); + assertEquals("zone", clusterPutWeightedRoutingRequest.getWeightedRouting().attributeName()); + assertNotNull(clusterPutWeightedRoutingRequest.getWeightedRouting().weights()); + assertEquals("1.0", clusterPutWeightedRoutingRequest.getWeightedRouting().weights().get("us-east-1c").toString()); + assertEquals("1.0", clusterPutWeightedRoutingRequest.getWeightedRouting().weights().get("us-east-1d").toString()); + assertEquals("0.0", clusterPutWeightedRoutingRequest.getWeightedRouting().weights().get("us-east-1a").toString()); + } + + public void testCreateRequest_UnsupportedRequestBody() throws IOException { + Map params = new HashMap<>(); + String req = "[\"us-east-1c\" : \"1\", \"us-east-1d\":\"1\", \"us-east-1a\":\"0\"]"; + RestRequest restRequest = buildRestRequest(req); + assertThrows(OpenSearchParseException.class, () -> RestClusterPutWeightedRoutingAction.createRequest(restRequest)); + } + + public void testCreateRequest_MalformedRequestBody() throws IOException { + Map params = new HashMap<>(); + + String req = "{\"us-east-1c\" : \1\", \"us-east-1d\":\"1\", \"us-east-1a\":\"0\"}"; + RestRequest restRequest = buildRestRequest(req); + assertThrows(JsonParseException.class, () -> RestClusterPutWeightedRoutingAction.createRequest(restRequest)); + } + + public void testCreateRequest_EmptyRequestBody() throws IOException { + String req = "{}"; + RestRequest restRequest = buildRestRequest(req); + assertThrows(OpenSearchParseException.class, () -> RestClusterPutWeightedRoutingAction.createRequest(restRequest)); + } + + private RestRequest buildRestRequest(String content) { + return new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.PUT) + .withPath("/_cluster/routing/awareness/zone/weights") + .withParams(singletonMap("attribute", "zone")) + .withContent(new BytesArray(content), XContentType.JSON) + .build(); + } + +}