Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into decommission-api/pr
Browse files Browse the repository at this point in the history
Signed-off-by: Rishab Nahata <rnnahata@amazon.com>
  • Loading branch information
imRishN committed Sep 27, 2022
2 parents fbe0e7d + 9b2202a commit ef33115
Show file tree
Hide file tree
Showing 42 changed files with 1,648 additions and 221 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Add support for s390x architecture ([#4001](https://github.com/opensearch-project/OpenSearch/pull/4001))
- Github workflow for changelog verification ([#4085](https://github.com/opensearch-project/OpenSearch/pull/4085))
- Point in time rest layer changes for create and delete PIT API ([#4064](https://github.com/opensearch-project/OpenSearch/pull/4064))
- Point in time rest layer changes for list PIT and PIT segments API ([#4388](https://github.com/opensearch-project/OpenSearch/pull/4388))
- Added @dreamer-89 as an Opensearch maintainer ([#4342](https://github.com/opensearch-project/OpenSearch/pull/4342))
- Added release notes for 1.3.5 ([#4343](https://github.com/opensearch-project/OpenSearch/pull/4343))
- Added release notes for 2.2.1 ([#4344](https://github.com/opensearch-project/OpenSearch/pull/4344))
Expand Down Expand Up @@ -44,6 +45,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- [Segment Replication] Update replicas to commit SegmentInfos instead of relying on SIS files from primary shards. ([#4402](https://github.com/opensearch-project/OpenSearch/pull/4402))
- [CCR] Add getHistoryOperationsFromTranslog method to fetch the history snapshot from translogs ([#3948](https://github.com/opensearch-project/OpenSearch/pull/3948))
- [Remote Store] Change behaviour in replica recovery for remote translog enabled indices ([#4318](https://github.com/opensearch-project/OpenSearch/pull/4318))
- PUT api for weighted shard routing ([#4272](https://github.com/opensearch-project/OpenSearch/pull/4272))
- Unmute test RelocationIT.testRelocationWhileIndexingRandom ([#4580](https://github.com/opensearch-project/OpenSearch/pull/4580))
- Add DecommissionService and helper to execute awareness attribute decommissioning ([#4084](https://github.com/opensearch-project/OpenSearch/pull/4084))
- Add APIs (GET/PUT) to decommission awareness attribute ([#4261](https://github.com/opensearch-project/OpenSearch/pull/4261))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,10 @@ static Request deleteAllPits() {
return new Request(HttpDelete.METHOD_NAME, "/_search/point_in_time/_all");
}

static Request getAllPits() {
return new Request(HttpGet.METHOD_NAME, "/_search/point_in_time/_all");
}

static Request multiSearch(MultiSearchRequest multiSearchRequest) throws IOException {
Request request = new Request(HttpPost.METHOD_NAME, "/_msearch");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.opensearch.action.search.CreatePitResponse;
import org.opensearch.action.search.DeletePitRequest;
import org.opensearch.action.search.DeletePitResponse;
import org.opensearch.action.search.GetAllPitNodesResponse;
import org.opensearch.action.search.MultiSearchRequest;
import org.opensearch.action.search.MultiSearchResponse;
import org.opensearch.action.search.SearchRequest;
Expand Down Expand Up @@ -1368,6 +1369,40 @@ public final Cancellable deleteAllPitsAsync(RequestOptions options, ActionListen
);
}

/**
* Get all point in time searches using list all PITs API
*
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
*/
public final GetAllPitNodesResponse getAllPits(RequestOptions options) throws IOException {
return performRequestAndParseEntity(
new MainRequest(),
(request) -> RequestConverters.getAllPits(),
options,
GetAllPitNodesResponse::fromXContent,
emptySet()
);
}

/**
* Asynchronously get all point in time searches using list all PITs API
*
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
* @return the response
*/
public final Cancellable getAllPitsAsync(RequestOptions options, ActionListener<GetAllPitNodesResponse> listener) {
return performRequestAsyncAndParseEntity(
new MainRequest(),
(request) -> RequestConverters.getAllPits(),
options,
GetAllPitNodesResponse::fromXContent,
listener,
emptySet()
);
}

/**
* Clears one or more scroll ids using the Clear Scroll API.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
import org.opensearch.action.search.DeletePitInfo;
import org.opensearch.action.search.DeletePitRequest;
import org.opensearch.action.search.DeletePitResponse;
import org.opensearch.action.search.GetAllPitNodesResponse;
import org.opensearch.common.unit.TimeValue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
* Tests point in time API with rest high level client
Expand Down Expand Up @@ -52,21 +54,24 @@ public void indexDocuments() throws IOException {

public void testCreateAndDeletePit() throws IOException {
CreatePitRequest pitRequest = new CreatePitRequest(new TimeValue(1, TimeUnit.DAYS), true, "index");
CreatePitResponse pitResponse = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync);
assertTrue(pitResponse.getId() != null);
assertEquals(1, pitResponse.getTotalShards());
assertEquals(1, pitResponse.getSuccessfulShards());
assertEquals(0, pitResponse.getFailedShards());
assertEquals(0, pitResponse.getSkippedShards());
CreatePitResponse createPitResponse = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync);
assertTrue(createPitResponse.getId() != null);
assertEquals(1, createPitResponse.getTotalShards());
assertEquals(1, createPitResponse.getSuccessfulShards());
assertEquals(0, createPitResponse.getFailedShards());
assertEquals(0, createPitResponse.getSkippedShards());
GetAllPitNodesResponse getAllPitResponse = highLevelClient().getAllPits(RequestOptions.DEFAULT);
List<String> pits = getAllPitResponse.getPitInfos().stream().map(r -> r.getPitId()).collect(Collectors.toList());
assertTrue(pits.contains(createPitResponse.getId()));
List<String> pitIds = new ArrayList<>();
pitIds.add(pitResponse.getId());
pitIds.add(createPitResponse.getId());
DeletePitRequest deletePitRequest = new DeletePitRequest(pitIds);
DeletePitResponse deletePitResponse = execute(deletePitRequest, highLevelClient()::deletePit, highLevelClient()::deletePitAsync);
assertTrue(deletePitResponse.getDeletePitResults().get(0).isSuccessful());
assertTrue(deletePitResponse.getDeletePitResults().get(0).getPitId().equals(pitResponse.getId()));
assertTrue(deletePitResponse.getDeletePitResults().get(0).getPitId().equals(createPitResponse.getId()));
}

public void testDeleteAllPits() throws IOException {
public void testDeleteAllAndListAllPits() throws IOException {
CreatePitRequest pitRequest = new CreatePitRequest(new TimeValue(1, TimeUnit.DAYS), true, "index");
CreatePitResponse pitResponse = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync);
CreatePitResponse pitResponse1 = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync);
Expand All @@ -80,6 +85,11 @@ public void testDeleteAllPits() throws IOException {
pitResponse1 = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync);
assertTrue(pitResponse.getId() != null);
assertTrue(pitResponse1.getId() != null);
GetAllPitNodesResponse getAllPitResponse = highLevelClient().getAllPits(RequestOptions.DEFAULT);

List<String> pits = getAllPitResponse.getPitInfos().stream().map(r -> r.getPitId()).collect(Collectors.toList());
assertTrue(pits.contains(pitResponse.getId()));
assertTrue(pits.contains(pitResponse1.getId()));
ActionListener<DeletePitResponse> deletePitListener = new ActionListener<>() {
@Override
public void onResponse(DeletePitResponse response) {
Expand All @@ -95,8 +105,27 @@ public void onFailure(Exception e) {
}
}
};
final CreatePitResponse pitResponse3 = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync);

ActionListener<GetAllPitNodesResponse> getPitsListener = new ActionListener<GetAllPitNodesResponse>() {
@Override
public void onResponse(GetAllPitNodesResponse response) {
List<String> pits = response.getPitInfos().stream().map(r -> r.getPitId()).collect(Collectors.toList());
assertTrue(pits.contains(pitResponse3.getId()));
}

@Override
public void onFailure(Exception e) {
if (!(e instanceof OpenSearchStatusException)) {
throw new AssertionError("List all PITs failed", e);
}
}
};
highLevelClient().getAllPitsAsync(RequestOptions.DEFAULT, getPitsListener);
highLevelClient().deleteAllPitsAsync(RequestOptions.DEFAULT, deletePitListener);
// validate no pits case
getAllPitResponse = highLevelClient().getAllPits(RequestOptions.DEFAULT);
assertTrue(getAllPitResponse.getPitInfos().size() == 0);
highLevelClient().deleteAllPitsAsync(RequestOptions.DEFAULT, deletePitListener);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ public class RestHighLevelClientTests extends OpenSearchTestCase {
"ping",
"info",
"delete_all_pits",
"get_all_pits",
// security
"security.get_ssl_certificates",
"security.authenticate",
Expand Down Expand Up @@ -888,6 +889,7 @@ public void testApiNamingConventions() throws Exception {
"nodes.reload_secure_settings",
"search_shards",
"remote_store.restore",
"cluster.put_weighted_routing",
"cluster.put_decommission_awareness",
"cluster.get_decommission_awareness", };
List<String> booleanReturnMethods = Arrays.asList("security.enable_user", "security.disable_user", "security.change_password");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{
"cluster.put_weighted_routing": {
"documentation": {
"url": "https://opensearch.org/docs/latest/opensearch/rest-api/weighted-routing/put",
"description": "Updates weighted shard routing weights"
},
"stability": "stable",
"url": {
"paths": [
{
"path": "/_cluster/routing/awareness/{attribute}/weights",
"methods": [
"PUT"
],
"parts": {
"attribute": {
"type": "string",
"description": "Awareness attribute name"
}
}
}
]
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"get_all_pits":{
"documentation":{
"url":"https://opensearch.org/docs/latest/opensearch/rest-api/point_in_time/",
"description":"Lists all active point in time searches."
},
"stability":"stable",
"url":{
"paths":[
{
"path":"/_search/point_in_time/_all",
"methods":[
"GET"
]
}
]
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@
- match: {hits.total: 3 }
- length: {hits.hits: 1 }

- do:
get_all_pits: {}

- match: {pits.0.pit_id: $pit_id}
- match: {pits.0.keep_alive: 82800000 }

- do:
delete_pit:
body:
Expand Down Expand Up @@ -119,6 +125,12 @@
- set: {pit_id: pit_id}
- match: { _shards.failed: 0}

- do:
get_all_pits: {}

- match: {pits.0.pit_id: $pit_id}
- match: {pits.0.keep_alive: 82800000 }

- do:
delete_all_pits: {}

Expand Down
16 changes: 11 additions & 5 deletions server/src/main/java/org/opensearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@
import org.opensearch.action.admin.cluster.settings.TransportClusterUpdateSettingsAction;
import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsAction;
import org.opensearch.action.admin.cluster.shards.TransportClusterSearchShardsAction;
import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterAddWeightedRoutingAction;
import org.opensearch.action.admin.cluster.shards.routing.weighted.put.TransportAddWeightedRoutingAction;
import org.opensearch.action.admin.cluster.snapshots.clone.CloneSnapshotAction;
import org.opensearch.action.admin.cluster.snapshots.clone.TransportCloneSnapshotAction;
import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotAction;
Expand Down Expand Up @@ -242,16 +244,14 @@
import org.opensearch.action.search.ClearScrollAction;
import org.opensearch.action.search.CreatePitAction;
import org.opensearch.action.search.DeletePitAction;
import org.opensearch.action.search.GetAllPitsAction;
import org.opensearch.action.search.MultiSearchAction;
import org.opensearch.action.search.NodesGetAllPitsAction;
import org.opensearch.action.search.GetAllPitsAction;
import org.opensearch.action.search.SearchAction;
import org.opensearch.action.search.SearchScrollAction;
import org.opensearch.action.search.TransportClearScrollAction;
import org.opensearch.action.search.TransportCreatePitAction;
import org.opensearch.action.search.TransportDeletePitAction;
import org.opensearch.action.search.TransportGetAllPitsAction;
import org.opensearch.action.search.TransportNodesGetAllPitsAction;
import org.opensearch.action.search.TransportMultiSearchAction;
import org.opensearch.action.search.TransportSearchAction;
import org.opensearch.action.search.TransportSearchScrollAction;
Expand Down Expand Up @@ -300,6 +300,7 @@
import org.opensearch.rest.action.admin.cluster.RestClusterAllocationExplainAction;
import org.opensearch.rest.action.admin.cluster.RestClusterGetSettingsAction;
import org.opensearch.rest.action.admin.cluster.RestClusterHealthAction;
import org.opensearch.rest.action.admin.cluster.RestClusterPutWeightedRoutingAction;
import org.opensearch.rest.action.admin.cluster.RestClusterRerouteAction;
import org.opensearch.rest.action.admin.cluster.RestClusterSearchShardsAction;
import org.opensearch.rest.action.admin.cluster.RestClusterStateAction;
Expand Down Expand Up @@ -391,6 +392,7 @@
import org.opensearch.rest.action.cat.RestClusterManagerAction;
import org.opensearch.rest.action.cat.RestNodeAttrsAction;
import org.opensearch.rest.action.cat.RestNodesAction;
import org.opensearch.rest.action.cat.RestPitSegmentsAction;
import org.opensearch.rest.action.cat.RestPluginsAction;
import org.opensearch.rest.action.cat.RestRepositoriesAction;
import org.opensearch.rest.action.cat.RestSegmentsAction;
Expand Down Expand Up @@ -419,6 +421,7 @@
import org.opensearch.rest.action.search.RestCreatePitAction;
import org.opensearch.rest.action.search.RestDeletePitAction;
import org.opensearch.rest.action.search.RestExplainAction;
import org.opensearch.rest.action.search.RestGetAllPitsAction;
import org.opensearch.rest.action.search.RestMultiSearchAction;
import org.opensearch.rest.action.search.RestSearchAction;
import org.opensearch.rest.action.search.RestSearchScrollAction;
Expand Down Expand Up @@ -569,6 +572,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(RestoreSnapshotAction.INSTANCE, TransportRestoreSnapshotAction.class);
actions.register(SnapshotsStatusAction.INSTANCE, TransportSnapshotsStatusAction.class);

actions.register(ClusterAddWeightedRoutingAction.INSTANCE, TransportAddWeightedRoutingAction.class);
actions.register(IndicesStatsAction.INSTANCE, TransportIndicesStatsAction.class);
actions.register(IndicesSegmentsAction.INSTANCE, TransportIndicesSegmentsAction.class);
actions.register(IndicesShardStoresAction.INSTANCE, TransportIndicesShardStoresAction.class);
Expand Down Expand Up @@ -681,10 +685,9 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg

// point in time actions
actions.register(CreatePitAction.INSTANCE, TransportCreatePitAction.class);
actions.register(GetAllPitsAction.INSTANCE, TransportGetAllPitsAction.class);
actions.register(DeletePitAction.INSTANCE, TransportDeletePitAction.class);
actions.register(PitSegmentsAction.INSTANCE, TransportPitSegmentsAction.class);
actions.register(NodesGetAllPitsAction.INSTANCE, TransportNodesGetAllPitsAction.class);
actions.register(GetAllPitsAction.INSTANCE, TransportGetAllPitsAction.class);

// Remote Store
actions.register(RestoreRemoteStoreAction.INSTANCE, TransportRestoreRemoteStoreAction.class);
Expand Down Expand Up @@ -755,6 +758,7 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
registerHandler.accept(new RestCloseIndexAction());
registerHandler.accept(new RestOpenIndexAction());
registerHandler.accept(new RestAddIndexBlockAction());
registerHandler.accept(new RestClusterPutWeightedRoutingAction());

registerHandler.accept(new RestUpdateSettingsAction());
registerHandler.accept(new RestGetSettingsAction());
Expand Down Expand Up @@ -868,6 +872,8 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
// Point in time API
registerHandler.accept(new RestCreatePitAction());
registerHandler.accept(new RestDeletePitAction());
registerHandler.accept(new RestGetAllPitsAction(nodesInCluster));
registerHandler.accept(new RestPitSegmentsAction(nodesInCluster));

for (ActionPlugin plugin : actionPlugins) {
for (RestHandler handler : plugin.getRestHandlers(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.shards.routing.weighted.put;

import org.opensearch.action.ActionType;

/**
* Action to update weights for weighted round-robin shard routing policy.
*
* @opensearch.internal
*/
public final class ClusterAddWeightedRoutingAction extends ActionType<ClusterPutWeightedRoutingResponse> {

public static final ClusterAddWeightedRoutingAction INSTANCE = new ClusterAddWeightedRoutingAction();
public static final String NAME = "cluster:admin/routing/awareness/weights/put";

private ClusterAddWeightedRoutingAction() {
super(NAME, ClusterPutWeightedRoutingResponse::new);
}
}
Loading

0 comments on commit ef33115

Please sign in to comment.