From 42f5f1022d2a42f5af3b1ddd2410284061ae9b27 Mon Sep 17 00:00:00 2001 From: Bharathwaj G Date: Wed, 3 Aug 2022 13:54:38 +0530 Subject: [PATCH 01/10] pit segments service layer changes Signed-off-by: Bharathwaj G --- .../org/opensearch/action/ActionModule.java | 3 + .../indices/segments/PitSegmentsAction.java | 24 ++ .../indices/segments/PitSegmentsRequest.java | 79 ++++++ .../segments/TransportPitSegmentsAction.java | 259 ++++++++++++++++++ .../node/TransportBroadcastByNodeAction.java | 9 +- .../java/org/opensearch/client/Client.java | 7 + .../client/support/AbstractClient.java | 7 + .../cluster/routing/ShardRouting.java | 4 +- .../search/internal/PitReaderContext.java | 23 ++ .../action/search/PitTestsUtil.java | 16 +- .../search/CreatePitSingleNodeTests.java | 16 +- .../opensearch/search/PitMultiNodeTests.java | 67 +++++ 12 files changed, 509 insertions(+), 5 deletions(-) create mode 100644 server/src/main/java/org/opensearch/action/admin/indices/segments/PitSegmentsAction.java create mode 100644 server/src/main/java/org/opensearch/action/admin/indices/segments/PitSegmentsRequest.java create mode 100644 server/src/main/java/org/opensearch/action/admin/indices/segments/TransportPitSegmentsAction.java diff --git a/server/src/main/java/org/opensearch/action/ActionModule.java b/server/src/main/java/org/opensearch/action/ActionModule.java index 2f470d7603869..3a23f2578cb58 100644 --- a/server/src/main/java/org/opensearch/action/ActionModule.java +++ b/server/src/main/java/org/opensearch/action/ActionModule.java @@ -163,7 +163,9 @@ import org.opensearch.action.admin.indices.rollover.RolloverAction; import org.opensearch.action.admin.indices.rollover.TransportRolloverAction; import org.opensearch.action.admin.indices.segments.IndicesSegmentsAction; +import org.opensearch.action.admin.indices.segments.PitSegmentsAction; import org.opensearch.action.admin.indices.segments.TransportIndicesSegmentsAction; +import org.opensearch.action.admin.indices.segments.TransportPitSegmentsAction; import org.opensearch.action.admin.indices.settings.get.GetSettingsAction; import org.opensearch.action.admin.indices.settings.get.TransportGetSettingsAction; import org.opensearch.action.admin.indices.settings.put.TransportUpdateSettingsAction; @@ -667,6 +669,7 @@ public void reg actions.register(CreatePitAction.INSTANCE, TransportCreatePitAction.class); actions.register(GetAllPitsAction.INSTANCE, TransportGetAllPitsAction.class); actions.register(DeletePitAction.INSTANCE, TransportDeletePitAction.class); + actions.register(PitSegmentsAction.INSTANCE, TransportPitSegmentsAction.class); return unmodifiableMap(actions.getRegistry()); } diff --git a/server/src/main/java/org/opensearch/action/admin/indices/segments/PitSegmentsAction.java b/server/src/main/java/org/opensearch/action/admin/indices/segments/PitSegmentsAction.java new file mode 100644 index 0000000000000..b52ef32a91b16 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/indices/segments/PitSegmentsAction.java @@ -0,0 +1,24 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.indices.segments; + +import org.opensearch.action.ActionType; + +/** + * Action for retrieving segment information for PITs + */ +public class PitSegmentsAction extends ActionType { + + public static final PitSegmentsAction INSTANCE = new PitSegmentsAction(); + public static final String NAME = "indices:monitor/point_in_time/segments"; + + private PitSegmentsAction() { + super(NAME, IndicesSegmentResponse::new); + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/indices/segments/PitSegmentsRequest.java b/server/src/main/java/org/opensearch/action/admin/indices/segments/PitSegmentsRequest.java new file mode 100644 index 0000000000000..05186bb8e2d13 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/indices/segments/PitSegmentsRequest.java @@ -0,0 +1,79 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.indices.segments; + +import org.opensearch.action.support.broadcast.BroadcastRequest; +import org.opensearch.common.Strings; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; + +/** + * Transport request for retrieving PITs segment information + */ +public class PitSegmentsRequest extends BroadcastRequest { + + protected boolean verbose = false; + private Collection pitIds; + + public PitSegmentsRequest() { + this(Strings.EMPTY_ARRAY); + } + + public PitSegmentsRequest(StreamInput in) throws IOException { + super(in); + pitIds = Arrays.asList(in.readStringArray()); + verbose = in.readBoolean(); + } + + public PitSegmentsRequest(String... indices) { + super(indices); + pitIds = Collections.emptyList(); + } + + /** + * true if detailed information about each segment should be returned, + * false otherwise. + */ + public boolean verbose() { + return verbose; + } + + /** + * Sets the verbose option. + * @see #verbose() + */ + public void verbose(boolean v) { + verbose = v; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + if (pitIds == null) { + out.writeVInt(0); + } else { + out.writeStringArray(pitIds.toArray(new String[pitIds.size()])); + } + out.writeBoolean(verbose); + + } + + public Collection getPitIds() { + return pitIds; + } + + public void setPitIds(Collection pitIds) { + this.pitIds = pitIds; + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/indices/segments/TransportPitSegmentsAction.java b/server/src/main/java/org/opensearch/action/admin/indices/segments/TransportPitSegmentsAction.java new file mode 100644 index 0000000000000..a8caefba9ac27 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/indices/segments/TransportPitSegmentsAction.java @@ -0,0 +1,259 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.action.admin.indices.segments; + +import org.opensearch.action.ActionListener; +import org.opensearch.action.search.ListPitInfo; +import org.opensearch.action.search.PitService; +import org.opensearch.action.search.SearchContextId; +import org.opensearch.action.search.SearchContextIdForNode; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.DefaultShardOperationFailedException; +import org.opensearch.action.support.broadcast.node.TransportBroadcastByNodeAction; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.block.ClusterBlockException; +import org.opensearch.cluster.block.ClusterBlockLevel; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.routing.AllocationId; +import org.opensearch.cluster.routing.PlainShardsIterator; +import org.opensearch.cluster.routing.RecoverySource; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.cluster.routing.ShardsIterator; +import org.opensearch.cluster.routing.UnassignedInfo; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.Strings; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.io.stream.NamedWriteableRegistry; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.xcontent.XContentBuilder; +import org.opensearch.index.shard.ShardId; +import org.opensearch.indices.IndicesService; +import org.opensearch.search.SearchService; +import org.opensearch.search.internal.PitReaderContext; +import org.opensearch.tasks.Task; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.opensearch.action.search.SearchContextId.decode; + +/** + * Transport action for retrieving segment information of PITs + */ +public class TransportPitSegmentsAction extends TransportBroadcastByNodeAction { + private final ClusterService clusterService; + private final IndicesService indicesService; + private final SearchService searchService; + private final NamedWriteableRegistry namedWriteableRegistry; + private final TransportService transportService; + private final PitService pitService; + + @Inject + public TransportPitSegmentsAction( + ClusterService clusterService, + TransportService transportService, + IndicesService indicesService, + ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, + SearchService searchService, + NamedWriteableRegistry namedWriteableRegistry, + PitService pitService + ) { + super( + PitSegmentsAction.NAME, + clusterService, + transportService, + actionFilters, + indexNameExpressionResolver, + PitSegmentsRequest::new, + ThreadPool.Names.MANAGEMENT + ); + this.clusterService = clusterService; + this.indicesService = indicesService; + this.searchService = searchService; + this.namedWriteableRegistry = namedWriteableRegistry; + this.transportService = transportService; + this.pitService = pitService; + } + + /** + * Execute PIT segments flow for all PITs or request PIT IDs + */ + @Override + protected void doExecute(Task task, PitSegmentsRequest request, ActionListener listener) { + if (request.getPitIds().isEmpty()) { + pitService.getAllPits(ActionListener.wrap(response -> { + request.setPitIds(response.getPitInfos().stream().map(ListPitInfo::getPitId).collect(Collectors.toList())); + getDoExecute(task, request, listener); + }, listener::onFailure)); + } else { + getDoExecute(task, request, listener); + } + } + + private void getDoExecute(Task task, PitSegmentsRequest request, ActionListener listener) { + super.doExecute(task, request, listener); + } + + /** + * This adds list of shards on which we need to retrieve pit segments details + * @param clusterState the cluster state + * @param request the underlying request + * @param concreteIndices the concrete indices on which to execute the operation + */ + @Override + protected ShardsIterator shards(ClusterState clusterState, PitSegmentsRequest request, String[] concreteIndices) { + final ArrayList iterators = new ArrayList<>(); + for (String pitId : request.getPitIds()) { + SearchContextId searchContext = decode(namedWriteableRegistry, pitId); + for (Map.Entry entry : searchContext.shards().entrySet()) { + final SearchContextIdForNode perNode = entry.getValue(); + if (Strings.isEmpty(perNode.getClusterAlias())) { + final ShardId shardId = entry.getKey(); + iterators.add( + new PitAwareShardRouting( + pitId, + shardId, + perNode.getNode(), + null, + true, + ShardRoutingState.STARTED, + null, + null, + null, + -1L + ) + ); + } + } + } + return new PlainShardsIterator(iterators); + } + + @Override + protected ClusterBlockException checkGlobalBlock(ClusterState state, PitSegmentsRequest request) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ); + } + + @Override + protected ClusterBlockException checkRequestBlock(ClusterState state, PitSegmentsRequest countRequest, String[] concreteIndices) { + return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_READ, concreteIndices); + } + + @Override + protected ShardSegments readShardResult(StreamInput in) throws IOException { + return new ShardSegments(in); + } + + @Override + protected IndicesSegmentResponse newResponse( + PitSegmentsRequest request, + int totalShards, + int successfulShards, + int failedShards, + List results, + List shardFailures, + ClusterState clusterState + ) { + return new IndicesSegmentResponse( + results.toArray(new ShardSegments[results.size()]), + totalShards, + successfulShards, + failedShards, + shardFailures + ); + } + + @Override + protected PitSegmentsRequest readRequestFrom(StreamInput in) throws IOException { + return new PitSegmentsRequest(in); + } + + @Override + public List getShardsFromInputStream(StreamInput in) throws IOException { + return in.readList(PitAwareShardRouting::new); + } + + /** + * This retrieves segment details of PIT context + * @param request the node-level request + * @param shardRouting the shard on which to execute the operation + */ + @Override + protected ShardSegments shardOperation(PitSegmentsRequest request, ShardRouting shardRouting) { + PitAwareShardRouting pitAwareShardRouting = (PitAwareShardRouting) shardRouting; + SearchContextIdForNode searchContextIdForNode = decode(namedWriteableRegistry, pitAwareShardRouting.getPitId()).shards() + .get(shardRouting.shardId()); + PitReaderContext pitReaderContext = searchService.getPitReaderContext(searchContextIdForNode.getSearchContextId()); + if (pitReaderContext == null) return new ShardSegments(shardRouting, new ArrayList<>()); + return new ShardSegments(pitReaderContext.getShardRouting(), pitReaderContext.getSegments()); + } + + /** + * This holds PIT id which is used to perform broadcast operation in PIT shards to retrieve segments information + */ + public class PitAwareShardRouting extends ShardRouting { + + private final String pitId; + + public PitAwareShardRouting(StreamInput in) throws IOException { + super(in); + this.pitId = in.readString(); + } + + public PitAwareShardRouting( + String pitId, + ShardId shardId, + String currentNodeId, + String relocatingNodeId, + boolean primary, + ShardRoutingState state, + RecoverySource recoverySource, + UnassignedInfo unassignedInfo, + AllocationId allocationId, + long expectedShardSize + ) { + super( + shardId, + currentNodeId, + relocatingNodeId, + primary, + state, + recoverySource, + unassignedInfo, + allocationId, + expectedShardSize + ); + this.pitId = pitId; + } + + public String getPitId() { + return pitId; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(pitId); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + super.toXContent(builder, params); + builder.field("pitId", pitId); + return builder.endObject(); + } + } +} diff --git a/server/src/main/java/org/opensearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java b/server/src/main/java/org/opensearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java index f849be4db4e2b..3d133cd1f7909 100644 --- a/server/src/main/java/org/opensearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java +++ b/server/src/main/java/org/opensearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java @@ -532,6 +532,13 @@ private void onShardOperation( } } + /** + * This method reads ShardRouting from input stream + */ + public List getShardsFromInputStream(StreamInput in) throws IOException { + return in.readList(ShardRouting::new); + } + /** * A node request * @@ -547,7 +554,7 @@ public class NodeRequest extends TransportRequest implements IndicesRequest { public NodeRequest(StreamInput in) throws IOException { super(in); indicesLevelRequest = readRequestFrom(in); - shards = in.readList(ShardRouting::new); + shards = getShardsFromInputStream(in); nodeId = in.readString(); } diff --git a/server/src/main/java/org/opensearch/client/Client.java b/server/src/main/java/org/opensearch/client/Client.java index 1d3bbfcba43f9..94043d5c3c89f 100644 --- a/server/src/main/java/org/opensearch/client/Client.java +++ b/server/src/main/java/org/opensearch/client/Client.java @@ -34,6 +34,8 @@ import org.opensearch.action.ActionFuture; import org.opensearch.action.ActionListener; +import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse; +import org.opensearch.action.admin.indices.segments.PitSegmentsRequest; import org.opensearch.action.bulk.BulkRequest; import org.opensearch.action.bulk.BulkRequestBuilder; import org.opensearch.action.bulk.BulkResponse; @@ -339,6 +341,11 @@ public interface Client extends OpenSearchClient, Releasable { */ void deletePits(DeletePitRequest deletePITRequest, ActionListener listener); + /** + * Get information of segments of one or more PITs + */ + void pitSegments(PitSegmentsRequest pitSegmentsRequest, ActionListener listener); + /** * Performs multiple search requests. */ diff --git a/server/src/main/java/org/opensearch/client/support/AbstractClient.java b/server/src/main/java/org/opensearch/client/support/AbstractClient.java index f99454a8a8913..104ae723da2df 100644 --- a/server/src/main/java/org/opensearch/client/support/AbstractClient.java +++ b/server/src/main/java/org/opensearch/client/support/AbstractClient.java @@ -237,6 +237,8 @@ import org.opensearch.action.admin.indices.segments.IndicesSegmentsAction; import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest; import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequestBuilder; +import org.opensearch.action.admin.indices.segments.PitSegmentsAction; +import org.opensearch.action.admin.indices.segments.PitSegmentsRequest; import org.opensearch.action.admin.indices.settings.get.GetSettingsAction; import org.opensearch.action.admin.indices.settings.get.GetSettingsRequest; import org.opensearch.action.admin.indices.settings.get.GetSettingsRequestBuilder; @@ -590,6 +592,11 @@ public void deletePits(final DeletePitRequest deletePITRequest, final ActionList execute(DeletePitAction.INSTANCE, deletePITRequest, listener); } + @Override + public void pitSegments(final PitSegmentsRequest request, final ActionListener listener) { + execute(PitSegmentsAction.INSTANCE, request, listener); + } + @Override public ActionFuture multiSearch(MultiSearchRequest request) { return execute(MultiSearchAction.INSTANCE, request); diff --git a/server/src/main/java/org/opensearch/cluster/routing/ShardRouting.java b/server/src/main/java/org/opensearch/cluster/routing/ShardRouting.java index 7dec8f9c84a89..e3aa2a666d454 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/ShardRouting.java +++ b/server/src/main/java/org/opensearch/cluster/routing/ShardRouting.java @@ -54,7 +54,7 @@ * * @opensearch.internal */ -public final class ShardRouting implements Writeable, ToXContentObject { +public class ShardRouting implements Writeable, ToXContentObject { /** * Used if shard size is not available @@ -78,7 +78,7 @@ public final class ShardRouting implements Writeable, ToXContentObject { * A constructor to internally create shard routing instances, note, the internal flag should only be set to true * by either this class or tests. Visible for testing. */ - ShardRouting( + protected ShardRouting( ShardId shardId, String currentNodeId, String relocatingNodeId, diff --git a/server/src/main/java/org/opensearch/search/internal/PitReaderContext.java b/server/src/main/java/org/opensearch/search/internal/PitReaderContext.java index 98e84136a8847..4968e6f6fa7cf 100644 --- a/server/src/main/java/org/opensearch/search/internal/PitReaderContext.java +++ b/server/src/main/java/org/opensearch/search/internal/PitReaderContext.java @@ -9,12 +9,16 @@ package org.opensearch.search.internal; import org.apache.lucene.util.SetOnce; +import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.common.lease.Releasable; import org.opensearch.common.lease.Releasables; import org.opensearch.index.IndexService; import org.opensearch.index.engine.Engine; +import org.opensearch.index.engine.Segment; import org.opensearch.index.shard.IndexShard; +import java.util.List; + /** * PIT reader context containing PIT specific information such as pit id, create time etc. */ @@ -24,6 +28,15 @@ public class PitReaderContext extends ReaderContext { private final SetOnce pitId = new SetOnce<>(); // Creation time of PIT contexts which helps users to differentiate between multiple PIT reader contexts private final SetOnce creationTime = new SetOnce<>(); + /** + * Shard routing at the time of creation of PIT Reader Context + */ + private final ShardRouting shardRouting; + + /** + * Encapsulates segments constituting the shard at the time of creation of PIT Reader Context. + */ + private final List segments; public PitReaderContext( ShardSearchContextId id, @@ -34,6 +47,8 @@ public PitReaderContext( boolean singleSession ) { super(id, indexService, indexShard, searcherSupplier, keepAliveInMillis, singleSession); + shardRouting = indexShard.routingEntry(); + segments = indexShard.segments(true); } public String getPitId() { @@ -67,4 +82,12 @@ public long getCreationTime() { public void setCreationTime(final long creationTime) { this.creationTime.set(creationTime); } + + public ShardRouting getShardRouting() { + return shardRouting; + } + + public List getSegments() { + return segments; + } } diff --git a/server/src/test/java/org/opensearch/action/search/PitTestsUtil.java b/server/src/test/java/org/opensearch/action/search/PitTestsUtil.java index 433cd9dfa3e89..26a2b2daf47f9 100644 --- a/server/src/test/java/org/opensearch/action/search/PitTestsUtil.java +++ b/server/src/test/java/org/opensearch/action/search/PitTestsUtil.java @@ -14,6 +14,9 @@ import org.opensearch.action.ActionFuture; import org.opensearch.action.admin.cluster.state.ClusterStateRequest; import org.opensearch.action.admin.cluster.state.ClusterStateResponse; +import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse; +import org.opensearch.action.admin.indices.segments.PitSegmentsAction; +import org.opensearch.action.admin.indices.segments.PitSegmentsRequest; import org.opensearch.client.Client; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.util.concurrent.AtomicArray; @@ -33,6 +36,7 @@ import java.util.Map; import java.util.concurrent.ExecutionException; +import static org.junit.Assert.assertTrue; import static org.opensearch.test.OpenSearchTestCase.between; import static org.opensearch.test.OpenSearchTestCase.randomAlphaOfLength; import static org.opensearch.test.OpenSearchTestCase.randomBoolean; @@ -107,7 +111,7 @@ public static void assertUsingGetAllPits(Client client, String id, long creation GetAllPitNodesRequest getAllPITNodesRequest = new GetAllPitNodesRequest(disNodesArr); ActionFuture execute1 = client.execute(GetAllPitsAction.INSTANCE, getAllPITNodesRequest); GetAllPitNodesResponse getPitResponse = execute1.get(); - Assert.assertTrue(getPitResponse.getPitInfos().get(0).getPitId().contains(id)); + assertTrue(getPitResponse.getPitInfos().get(0).getPitId().contains(id)); Assert.assertEquals(getPitResponse.getPitInfos().get(0).getCreationTime(), creationTime); } @@ -128,4 +132,14 @@ public static void assertGetAllPitsEmpty(Client client) throws ExecutionExceptio GetAllPitNodesResponse getPitResponse = execute1.get(); Assert.assertEquals(0, getPitResponse.getPitInfos().size()); } + + public static void assertSegments(boolean isEmpty, Client client) { + IndicesSegmentResponse indicesSegmentResponse = client.execute(PitSegmentsAction.INSTANCE, new PitSegmentsRequest()).actionGet(); + assertTrue(indicesSegmentResponse.getShardFailures() == null || indicesSegmentResponse.getShardFailures().length == 0); + if (isEmpty) { + assertTrue(indicesSegmentResponse.getIndices().size() == 0); + } else { + assertTrue(indicesSegmentResponse.getIndices().size() != 0); + } + } } diff --git a/server/src/test/java/org/opensearch/search/CreatePitSingleNodeTests.java b/server/src/test/java/org/opensearch/search/CreatePitSingleNodeTests.java index b28423c3a8657..c7764a0f99099 100644 --- a/server/src/test/java/org/opensearch/search/CreatePitSingleNodeTests.java +++ b/server/src/test/java/org/opensearch/search/CreatePitSingleNodeTests.java @@ -30,6 +30,7 @@ import java.util.concurrent.ExecutionException; import static org.hamcrest.CoreMatchers.equalTo; +import static org.opensearch.action.search.PitTestsUtil.assertSegments; import static org.opensearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE; import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder; import static org.opensearch.index.query.QueryBuilders.matchAllQuery; @@ -65,6 +66,7 @@ public void testCreatePITSuccess() throws ExecutionException, InterruptedExcepti ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); CreatePitResponse pitResponse = execute.get(); PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime()); + assertSegments(false, client()); client().prepareIndex("index").setId("2").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); SearchResponse searchResponse = client().prepareSearch("index") .setSize(2) @@ -75,6 +77,7 @@ public void testCreatePITSuccess() throws ExecutionException, InterruptedExcepti SearchService service = getInstanceFromNode(SearchService.class); assertEquals(2, service.getActiveContexts()); service.doClose(); // this kills the keep-alive reaper we have to reset the node after this test + assertSegments(true, client()); } public void testCreatePITWithMultipleIndicesSuccess() throws ExecutionException, InterruptedException { @@ -89,9 +92,11 @@ public void testCreatePITWithMultipleIndicesSuccess() throws ExecutionException, ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); CreatePitResponse response = execute.get(); PitTestsUtil.assertUsingGetAllPits(client(), response.getId(), response.getCreationTime()); + assertSegments(false, client()); assertEquals(4, response.getSuccessfulShards()); assertEquals(4, service.getActiveContexts()); service.doClose(); + assertSegments(true, client()); } public void testCreatePITWithShardReplicasSuccess() throws ExecutionException, InterruptedException { @@ -103,6 +108,7 @@ public void testCreatePITWithShardReplicasSuccess() throws ExecutionException, I ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); CreatePitResponse pitResponse = execute.get(); PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime()); + assertSegments(false, client()); client().prepareIndex("index").setId("2").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); SearchResponse searchResponse = client().prepareSearch("index") .setSize(2) @@ -113,6 +119,7 @@ public void testCreatePITWithShardReplicasSuccess() throws ExecutionException, I SearchService service = getInstanceFromNode(SearchService.class); assertEquals(2, service.getActiveContexts()); service.doClose(); + assertSegments(true, client()); } public void testCreatePITWithNonExistentIndex() { @@ -128,6 +135,7 @@ public void testCreatePITWithNonExistentIndex() { assertTrue(ex.getMessage().contains("no such index [index1]")); assertEquals(0, service.getActiveContexts()); + assertSegments(true, client()); service.doClose(); } @@ -148,6 +156,7 @@ public void testCreatePITOnCloseIndex() throws ExecutionException, InterruptedEx SearchService service = getInstanceFromNode(SearchService.class); assertEquals(0, service.getActiveContexts()); PitTestsUtil.assertGetAllPitsEmpty(client()); + assertSegments(true, client()); service.doClose(); } @@ -171,6 +180,7 @@ public void testPitSearchOnDeletedIndex() throws ExecutionException, Interrupted SearchService service = getInstanceFromNode(SearchService.class); PitTestsUtil.assertGetAllPitsEmpty(client()); assertEquals(0, service.getActiveContexts()); + assertSegments(true, client()); service.doClose(); } @@ -196,6 +206,7 @@ public void testPitSearchOnCloseIndex() throws ExecutionException, InterruptedEx ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); CreatePitResponse pitResponse = execute.get(); PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime()); + assertSegments(false, client()); SearchService service = getInstanceFromNode(SearchService.class); assertEquals(2, service.getActiveContexts()); client().admin().indices().prepareClose("index").get(); @@ -208,7 +219,7 @@ public void testPitSearchOnCloseIndex() throws ExecutionException, InterruptedEx assertTrue(ex.shardFailures()[0].reason().contains("SearchContextMissingException")); assertEquals(0, service.getActiveContexts()); PitTestsUtil.assertGetAllPitsEmpty(client()); - + assertSegments(true, client()); // PIT reader contexts are lost after close, verifying it with open index api client().admin().indices().prepareOpen("index").get(); ex = expectThrows(SearchPhaseExecutionException.class, () -> { @@ -465,6 +476,7 @@ public void testPitAfterUpdateIndex() throws Exception { service.doClose(); assertEquals(0, service.getActiveContexts()); PitTestsUtil.assertGetAllPitsEmpty(client()); + assertSegments(true, client()); } } @@ -477,6 +489,7 @@ public void testConcurrentSearches() throws Exception { ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); CreatePitResponse pitResponse = execute.get(); PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime()); + assertSegments(false, client()); Thread[] threads = new Thread[5]; CountDownLatch latch = new CountDownLatch(threads.length); @@ -508,5 +521,6 @@ public void testConcurrentSearches() throws Exception { service.doClose(); assertEquals(0, service.getActiveContexts()); PitTestsUtil.assertGetAllPitsEmpty(client()); + assertSegments(true, client()); } } diff --git a/server/src/test/java/org/opensearch/search/PitMultiNodeTests.java b/server/src/test/java/org/opensearch/search/PitMultiNodeTests.java index f93a43a027da7..a4391a5ee248b 100644 --- a/server/src/test/java/org/opensearch/search/PitMultiNodeTests.java +++ b/server/src/test/java/org/opensearch/search/PitMultiNodeTests.java @@ -17,6 +17,9 @@ import org.opensearch.action.LatchedActionListener; import org.opensearch.action.admin.cluster.state.ClusterStateRequest; import org.opensearch.action.admin.cluster.state.ClusterStateResponse; +import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse; +import org.opensearch.action.admin.indices.segments.PitSegmentsAction; +import org.opensearch.action.admin.indices.segments.PitSegmentsRequest; import org.opensearch.action.search.CreatePitAction; import org.opensearch.action.search.CreatePitRequest; import org.opensearch.action.search.CreatePitResponse; @@ -49,6 +52,7 @@ import java.util.stream.Collectors; import static org.hamcrest.Matchers.containsString; +import static org.opensearch.action.search.PitTestsUtil.assertSegments; import static org.opensearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; @@ -82,6 +86,7 @@ public void testPit() throws Exception { assertEquals(2, searchResponse.getSuccessfulShards()); assertEquals(2, searchResponse.getTotalShards()); PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime()); + assertSegments(false, client()); } public void testCreatePitWhileNodeDropWithAllowPartialCreationFalse() throws Exception { @@ -108,6 +113,7 @@ public Settings onNodeStopped(String nodeName) throws Exception { ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); CreatePitResponse pitResponse = execute.get(); PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime()); + assertSegments(false, client()); assertEquals(1, pitResponse.getSuccessfulShards()); assertEquals(2, pitResponse.getTotalShards()); SearchResponse searchResponse = client().prepareSearch("index") @@ -138,6 +144,7 @@ public Settings onNodeStopped(String nodeName) throws Exception { assertEquals(0, searchResponse.getSkippedShards()); assertEquals(2, searchResponse.getTotalShards()); PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime()); + assertSegments(false, client()); return super.onNodeStopped(nodeName); } }); @@ -454,4 +461,64 @@ public void onFailure(Exception e) {} } } + public void testConcurrentGetSegmentsWithDeletes() throws InterruptedException, ExecutionException { + CreatePitRequest createPitRequest = new CreatePitRequest(TimeValue.timeValueDays(1), true); + createPitRequest.setIndices(new String[] { "index" }); + List pitIds = new ArrayList<>(); + String id = client().execute(CreatePitAction.INSTANCE, createPitRequest).get().getId(); + pitIds.add(id); + DeletePitRequest deletePITRequest = new DeletePitRequest(pitIds); + AtomicInteger numSuccess = new AtomicInteger(); + TestThreadPool testThreadPool = null; + try { + testThreadPool = new TestThreadPool(PitMultiNodeTests.class.getName()); + int concurrentRuns = randomIntBetween(20, 50); + + List operationThreads = new ArrayList<>(); + CountDownLatch countDownLatch = new CountDownLatch(concurrentRuns); + long randomDeleteThread = randomLongBetween(0, concurrentRuns - 1); + for (int i = 0; i < concurrentRuns; i++) { + int currentThreadIteration = i; + Runnable thread = () -> { + if (currentThreadIteration == randomDeleteThread) { + LatchedActionListener listener = new LatchedActionListener<>(new ActionListener() { + @Override + public void onResponse(IndicesSegmentResponse indicesSegmentResponse) { + if (indicesSegmentResponse.getShardFailures() == null + || indicesSegmentResponse.getShardFailures().length == 0) { + numSuccess.incrementAndGet(); + } + } + + @Override + public void onFailure(Exception e) {} + }, countDownLatch); + client().execute(PitSegmentsAction.INSTANCE, new PitSegmentsRequest(), listener); + } else { + LatchedActionListener listener = new LatchedActionListener<>(new ActionListener() { + @Override + public void onResponse(DeletePitResponse deletePitResponse) { + if (deletePitResponse.getDeletePitResults().get(0).isSuccessful()) { + numSuccess.incrementAndGet(); + } + } + + @Override + public void onFailure(Exception e) {} + }, countDownLatch); + client().execute(DeletePitAction.INSTANCE, deletePITRequest, listener); + } + }; + operationThreads.add(thread); + } + TestThreadPool finalTestThreadPool = testThreadPool; + operationThreads.forEach(runnable -> finalTestThreadPool.executor("generic").execute(runnable)); + countDownLatch.await(); + assertEquals(concurrentRuns, numSuccess.get()); + + } finally { + ThreadPool.terminate(testThreadPool, 500, TimeUnit.MILLISECONDS); + } + } + } From 1c702de1dbdd67aaa741b07f2bc5d76c0e72819e Mon Sep 17 00:00:00 2001 From: Bharathwaj G Date: Wed, 3 Aug 2022 22:03:05 +0530 Subject: [PATCH 02/10] Addressing comment Signed-off-by: Bharathwaj G --- .../indices/segments/PitSegmentsRequest.java | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/admin/indices/segments/PitSegmentsRequest.java b/server/src/main/java/org/opensearch/action/admin/indices/segments/PitSegmentsRequest.java index 05186bb8e2d13..bef69df6af595 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/segments/PitSegmentsRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/segments/PitSegmentsRequest.java @@ -17,14 +17,15 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.List; /** * Transport request for retrieving PITs segment information */ public class PitSegmentsRequest extends BroadcastRequest { - protected boolean verbose = false; - private Collection pitIds; + boolean verbose = false; + private List pitIds; public PitSegmentsRequest() { this(Strings.EMPTY_ARRAY); @@ -45,15 +46,15 @@ public PitSegmentsRequest(String... indices) { * true if detailed information about each segment should be returned, * false otherwise. */ - public boolean verbose() { + public boolean isVerbose() { return verbose; } /** * Sets the verbose option. - * @see #verbose() + * @see #isVerbose() */ - public void verbose(boolean v) { + public void isVerbose(boolean v) { verbose = v; } @@ -70,10 +71,10 @@ public void writeTo(StreamOutput out) throws IOException { } public Collection getPitIds() { - return pitIds; + return Collections.unmodifiableList(pitIds); } - public void setPitIds(Collection pitIds) { + public void setPitIds(List pitIds) { this.pitIds = pitIds; } } From babab923961153a2adba2ad67461de94ea96397a Mon Sep 17 00:00:00 2001 From: Bharathwaj G Date: Thu, 4 Aug 2022 11:01:30 +0530 Subject: [PATCH 03/10] Addressing comment Signed-off-by: Bharathwaj G --- .../indices/segments/PitSegmentsRequest.java | 4 +- .../opensearch/search/PitMultiNodeTests.java | 65 ------------------- 2 files changed, 1 insertion(+), 68 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/admin/indices/segments/PitSegmentsRequest.java b/server/src/main/java/org/opensearch/action/admin/indices/segments/PitSegmentsRequest.java index bef69df6af595..0683c5a1424cb 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/segments/PitSegmentsRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/segments/PitSegmentsRequest.java @@ -23,7 +23,6 @@ * Transport request for retrieving PITs segment information */ public class PitSegmentsRequest extends BroadcastRequest { - boolean verbose = false; private List pitIds; @@ -54,7 +53,7 @@ public boolean isVerbose() { * Sets the verbose option. * @see #isVerbose() */ - public void isVerbose(boolean v) { + public void setVerbose(boolean v) { verbose = v; } @@ -67,7 +66,6 @@ public void writeTo(StreamOutput out) throws IOException { out.writeStringArray(pitIds.toArray(new String[pitIds.size()])); } out.writeBoolean(verbose); - } public Collection getPitIds() { diff --git a/server/src/test/java/org/opensearch/search/PitMultiNodeTests.java b/server/src/test/java/org/opensearch/search/PitMultiNodeTests.java index a4391a5ee248b..326ac067afafc 100644 --- a/server/src/test/java/org/opensearch/search/PitMultiNodeTests.java +++ b/server/src/test/java/org/opensearch/search/PitMultiNodeTests.java @@ -17,9 +17,6 @@ import org.opensearch.action.LatchedActionListener; import org.opensearch.action.admin.cluster.state.ClusterStateRequest; import org.opensearch.action.admin.cluster.state.ClusterStateResponse; -import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse; -import org.opensearch.action.admin.indices.segments.PitSegmentsAction; -import org.opensearch.action.admin.indices.segments.PitSegmentsRequest; import org.opensearch.action.search.CreatePitAction; import org.opensearch.action.search.CreatePitRequest; import org.opensearch.action.search.CreatePitResponse; @@ -98,7 +95,6 @@ public Settings onNodeStopped(String nodeName) throws Exception { ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); ExecutionException ex = expectThrows(ExecutionException.class, execute::get); assertTrue(ex.getMessage().contains("Failed to execute phase [create_pit]")); - assertTrue(ex.getMessage().contains("Partial shards failure")); return super.onNodeStopped(nodeName); } }); @@ -144,7 +140,6 @@ public Settings onNodeStopped(String nodeName) throws Exception { assertEquals(0, searchResponse.getSkippedShards()); assertEquals(2, searchResponse.getTotalShards()); PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime()); - assertSegments(false, client()); return super.onNodeStopped(nodeName); } }); @@ -461,64 +456,4 @@ public void onFailure(Exception e) {} } } - public void testConcurrentGetSegmentsWithDeletes() throws InterruptedException, ExecutionException { - CreatePitRequest createPitRequest = new CreatePitRequest(TimeValue.timeValueDays(1), true); - createPitRequest.setIndices(new String[] { "index" }); - List pitIds = new ArrayList<>(); - String id = client().execute(CreatePitAction.INSTANCE, createPitRequest).get().getId(); - pitIds.add(id); - DeletePitRequest deletePITRequest = new DeletePitRequest(pitIds); - AtomicInteger numSuccess = new AtomicInteger(); - TestThreadPool testThreadPool = null; - try { - testThreadPool = new TestThreadPool(PitMultiNodeTests.class.getName()); - int concurrentRuns = randomIntBetween(20, 50); - - List operationThreads = new ArrayList<>(); - CountDownLatch countDownLatch = new CountDownLatch(concurrentRuns); - long randomDeleteThread = randomLongBetween(0, concurrentRuns - 1); - for (int i = 0; i < concurrentRuns; i++) { - int currentThreadIteration = i; - Runnable thread = () -> { - if (currentThreadIteration == randomDeleteThread) { - LatchedActionListener listener = new LatchedActionListener<>(new ActionListener() { - @Override - public void onResponse(IndicesSegmentResponse indicesSegmentResponse) { - if (indicesSegmentResponse.getShardFailures() == null - || indicesSegmentResponse.getShardFailures().length == 0) { - numSuccess.incrementAndGet(); - } - } - - @Override - public void onFailure(Exception e) {} - }, countDownLatch); - client().execute(PitSegmentsAction.INSTANCE, new PitSegmentsRequest(), listener); - } else { - LatchedActionListener listener = new LatchedActionListener<>(new ActionListener() { - @Override - public void onResponse(DeletePitResponse deletePitResponse) { - if (deletePitResponse.getDeletePitResults().get(0).isSuccessful()) { - numSuccess.incrementAndGet(); - } - } - - @Override - public void onFailure(Exception e) {} - }, countDownLatch); - client().execute(DeletePitAction.INSTANCE, deletePITRequest, listener); - } - }; - operationThreads.add(thread); - } - TestThreadPool finalTestThreadPool = testThreadPool; - operationThreads.forEach(runnable -> finalTestThreadPool.executor("generic").execute(runnable)); - countDownLatch.await(); - assertEquals(concurrentRuns, numSuccess.get()); - - } finally { - ThreadPool.terminate(testThreadPool, 500, TimeUnit.MILLISECONDS); - } - } - } From adf80d67b51d93fdd3414216ebcb74e9e63d2088 Mon Sep 17 00:00:00 2001 From: Bharathwaj G Date: Thu, 4 Aug 2022 20:15:47 +0530 Subject: [PATCH 04/10] Addressing comment Signed-off-by: Bharathwaj G --- .../action/admin/indices/segments/PitSegmentsRequest.java | 2 +- .../org/opensearch/search/internal/PitReaderContext.java | 3 ++- .../java/org/opensearch/action/search/PitTestsUtil.java | 7 ++----- 3 files changed, 5 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/admin/indices/segments/PitSegmentsRequest.java b/server/src/main/java/org/opensearch/action/admin/indices/segments/PitSegmentsRequest.java index 0683c5a1424cb..3a277b565747e 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/segments/PitSegmentsRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/segments/PitSegmentsRequest.java @@ -23,7 +23,7 @@ * Transport request for retrieving PITs segment information */ public class PitSegmentsRequest extends BroadcastRequest { - boolean verbose = false; + private boolean verbose = false; private List pitIds; public PitSegmentsRequest() { diff --git a/server/src/main/java/org/opensearch/search/internal/PitReaderContext.java b/server/src/main/java/org/opensearch/search/internal/PitReaderContext.java index 4968e6f6fa7cf..b24a8a4172e29 100644 --- a/server/src/main/java/org/opensearch/search/internal/PitReaderContext.java +++ b/server/src/main/java/org/opensearch/search/internal/PitReaderContext.java @@ -17,6 +17,7 @@ import org.opensearch.index.engine.Segment; import org.opensearch.index.shard.IndexShard; +import java.util.Collections; import java.util.List; /** @@ -88,6 +89,6 @@ public ShardRouting getShardRouting() { } public List getSegments() { - return segments; + return Collections.unmodifiableList(segments); } } diff --git a/server/src/test/java/org/opensearch/action/search/PitTestsUtil.java b/server/src/test/java/org/opensearch/action/search/PitTestsUtil.java index 26a2b2daf47f9..8f315261eb3bf 100644 --- a/server/src/test/java/org/opensearch/action/search/PitTestsUtil.java +++ b/server/src/test/java/org/opensearch/action/search/PitTestsUtil.java @@ -36,6 +36,7 @@ import java.util.Map; import java.util.concurrent.ExecutionException; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.opensearch.test.OpenSearchTestCase.between; import static org.opensearch.test.OpenSearchTestCase.randomAlphaOfLength; @@ -136,10 +137,6 @@ public static void assertGetAllPitsEmpty(Client client) throws ExecutionExceptio public static void assertSegments(boolean isEmpty, Client client) { IndicesSegmentResponse indicesSegmentResponse = client.execute(PitSegmentsAction.INSTANCE, new PitSegmentsRequest()).actionGet(); assertTrue(indicesSegmentResponse.getShardFailures() == null || indicesSegmentResponse.getShardFailures().length == 0); - if (isEmpty) { - assertTrue(indicesSegmentResponse.getIndices().size() == 0); - } else { - assertTrue(indicesSegmentResponse.getIndices().size() != 0); - } + assertEquals(indicesSegmentResponse.getIndices().isEmpty(), isEmpty); } } From 1eb68d749325266929e3ea5d3464e1c015da992f Mon Sep 17 00:00:00 2001 From: Bharathwaj G Date: Fri, 5 Aug 2022 09:55:32 +0530 Subject: [PATCH 05/10] addressing review comments Signed-off-by: Bharathwaj G --- .../segments/TransportPitSegmentsAction.java | 28 ++++++++++--------- .../node/TransportBroadcastByNodeAction.java | 4 +-- 2 files changed, 17 insertions(+), 15 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/admin/indices/segments/TransportPitSegmentsAction.java b/server/src/main/java/org/opensearch/action/admin/indices/segments/TransportPitSegmentsAction.java index a8caefba9ac27..165a3acb05499 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/segments/TransportPitSegmentsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/segments/TransportPitSegmentsAction.java @@ -96,17 +96,13 @@ protected void doExecute(Task task, PitSegmentsRequest request, ActionListener { request.setPitIds(response.getPitInfos().stream().map(ListPitInfo::getPitId).collect(Collectors.toList())); - getDoExecute(task, request, listener); + super.doExecute(task, request, listener); }, listener::onFailure)); } else { - getDoExecute(task, request, listener); + super.doExecute(task, request, listener); } } - private void getDoExecute(Task task, PitSegmentsRequest request, ActionListener listener) { - super.doExecute(task, request, listener); - } - /** * This adds list of shards on which we need to retrieve pit segments details * @param clusterState the cluster state @@ -120,6 +116,7 @@ protected ShardsIterator shards(ClusterState clusterState, PitSegmentsRequest re SearchContextId searchContext = decode(namedWriteableRegistry, pitId); for (Map.Entry entry : searchContext.shards().entrySet()) { final SearchContextIdForNode perNode = entry.getValue(); + // check if node is part of local cluster if (Strings.isEmpty(perNode.getClusterAlias())) { final ShardId shardId = entry.getKey(); iterators.add( @@ -182,7 +179,7 @@ protected PitSegmentsRequest readRequestFrom(StreamInput in) throws IOException } @Override - public List getShardsFromInputStream(StreamInput in) throws IOException { + public List getShardRoutingsFromInputStream(StreamInput in) throws IOException { return in.readList(PitAwareShardRouting::new); } @@ -193,12 +190,17 @@ public List getShardsFromInputStream(StreamInput in) throws IOExce */ @Override protected ShardSegments shardOperation(PitSegmentsRequest request, ShardRouting shardRouting) { - PitAwareShardRouting pitAwareShardRouting = (PitAwareShardRouting) shardRouting; - SearchContextIdForNode searchContextIdForNode = decode(namedWriteableRegistry, pitAwareShardRouting.getPitId()).shards() - .get(shardRouting.shardId()); - PitReaderContext pitReaderContext = searchService.getPitReaderContext(searchContextIdForNode.getSearchContextId()); - if (pitReaderContext == null) return new ShardSegments(shardRouting, new ArrayList<>()); - return new ShardSegments(pitReaderContext.getShardRouting(), pitReaderContext.getSegments()); + if (shardRouting instanceof PitAwareShardRouting) { + PitAwareShardRouting pitAwareShardRouting = (PitAwareShardRouting) shardRouting; + SearchContextIdForNode searchContextIdForNode = decode(namedWriteableRegistry, pitAwareShardRouting.getPitId()).shards() + .get(shardRouting.shardId()); + PitReaderContext pitReaderContext = searchService.getPitReaderContext(searchContextIdForNode.getSearchContextId()); + if (pitReaderContext == null) { + return new ShardSegments(shardRouting, new ArrayList<>()); + } + return new ShardSegments(pitReaderContext.getShardRouting(), pitReaderContext.getSegments()); + } + return new ShardSegments(shardRouting, new ArrayList<>()); } /** diff --git a/server/src/main/java/org/opensearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java b/server/src/main/java/org/opensearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java index 3d133cd1f7909..9e353a35831d0 100644 --- a/server/src/main/java/org/opensearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java +++ b/server/src/main/java/org/opensearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java @@ -535,7 +535,7 @@ private void onShardOperation( /** * This method reads ShardRouting from input stream */ - public List getShardsFromInputStream(StreamInput in) throws IOException { + public List getShardRoutingsFromInputStream(StreamInput in) throws IOException { return in.readList(ShardRouting::new); } @@ -554,7 +554,7 @@ public class NodeRequest extends TransportRequest implements IndicesRequest { public NodeRequest(StreamInput in) throws IOException { super(in); indicesLevelRequest = readRequestFrom(in); - shards = getShardsFromInputStream(in); + shards = getShardRoutingsFromInputStream(in); nodeId = in.readString(); } From 14071beb1162f3fad260ed6d9c8c14d98569f26d Mon Sep 17 00:00:00 2001 From: Bharathwaj G Date: Tue, 9 Aug 2022 08:19:08 +0530 Subject: [PATCH 06/10] addressing comment Signed-off-by: Bharathwaj G --- .../action/admin/indices/segments/PitSegmentsRequest.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/admin/indices/segments/PitSegmentsRequest.java b/server/src/main/java/org/opensearch/action/admin/indices/segments/PitSegmentsRequest.java index 3a277b565747e..d0146c0c52f7b 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/segments/PitSegmentsRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/segments/PitSegmentsRequest.java @@ -60,11 +60,7 @@ public void setVerbose(boolean v) { @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - if (pitIds == null) { - out.writeVInt(0); - } else { - out.writeStringArray(pitIds.toArray(new String[pitIds.size()])); - } + out.writeStringArrayNullable((pitIds == null) ? null : pitIds.toArray(new String[pitIds.size()])); out.writeBoolean(verbose); } From 5e0647935c85f34343e582ffbc758384c4f60499 Mon Sep 17 00:00:00 2001 From: Bharathwaj G Date: Thu, 11 Aug 2022 17:16:27 +0530 Subject: [PATCH 07/10] Addressing comments Signed-off-by: Bharathwaj G --- .../indices/segments/TransportPitSegmentsAction.java | 2 +- .../opensearch/action/search/CreatePitResponse.java | 2 +- .../org/opensearch/action/search/PitTestsUtil.java | 11 ++++++++++- .../java/org/opensearch/search/PitMultiNodeTests.java | 2 +- 4 files changed, 13 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/admin/indices/segments/TransportPitSegmentsAction.java b/server/src/main/java/org/opensearch/action/admin/indices/segments/TransportPitSegmentsAction.java index 165a3acb05499..a1c8818215f95 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/segments/TransportPitSegmentsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/segments/TransportPitSegmentsAction.java @@ -254,7 +254,7 @@ public void writeTo(StreamOutput out) throws IOException { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { super.toXContent(builder, params); - builder.field("pitId", pitId); + builder.field("pit_id", pitId); return builder.endObject(); } } diff --git a/server/src/main/java/org/opensearch/action/search/CreatePitResponse.java b/server/src/main/java/org/opensearch/action/search/CreatePitResponse.java index 25eb9aff9e3d7..dd197a37f8616 100644 --- a/server/src/main/java/org/opensearch/action/search/CreatePitResponse.java +++ b/server/src/main/java/org/opensearch/action/search/CreatePitResponse.java @@ -28,7 +28,7 @@ * Create point in time response with point in time id and shard success / failures */ public class CreatePitResponse extends ActionResponse implements StatusToXContentObject { - private static final ParseField ID = new ParseField("id"); + private static final ParseField ID = new ParseField("pit_id"); private static final ParseField CREATION_TIME = new ParseField("creation_time"); // point in time id diff --git a/server/src/test/java/org/opensearch/action/search/PitTestsUtil.java b/server/src/test/java/org/opensearch/action/search/PitTestsUtil.java index 8f315261eb3bf..efb5a7d23022d 100644 --- a/server/src/test/java/org/opensearch/action/search/PitTestsUtil.java +++ b/server/src/test/java/org/opensearch/action/search/PitTestsUtil.java @@ -134,9 +134,18 @@ public static void assertGetAllPitsEmpty(Client client) throws ExecutionExceptio Assert.assertEquals(0, getPitResponse.getPitInfos().size()); } - public static void assertSegments(boolean isEmpty, Client client) { + public static void assertSegments(boolean isEmpty, String index, long expectedShardSize, Client client) { IndicesSegmentResponse indicesSegmentResponse = client.execute(PitSegmentsAction.INSTANCE, new PitSegmentsRequest()).actionGet(); assertTrue(indicesSegmentResponse.getShardFailures() == null || indicesSegmentResponse.getShardFailures().length == 0); assertEquals(indicesSegmentResponse.getIndices().isEmpty(), isEmpty); + if (!isEmpty) { + assertTrue(indicesSegmentResponse.getIndices().get(index) != null); + assertTrue(indicesSegmentResponse.getIndices().get(index).getIndex().equalsIgnoreCase(index)); + assertEquals(expectedShardSize, indicesSegmentResponse.getIndices().get(index).getShards().size()); + } + } + + public static void assertSegments(boolean isEmpty, Client client) { + assertSegments(isEmpty, "index", 2, client); } } diff --git a/server/src/test/java/org/opensearch/search/PitMultiNodeTests.java b/server/src/test/java/org/opensearch/search/PitMultiNodeTests.java index 229dc68453a24..d29ccf5b97138 100644 --- a/server/src/test/java/org/opensearch/search/PitMultiNodeTests.java +++ b/server/src/test/java/org/opensearch/search/PitMultiNodeTests.java @@ -114,7 +114,7 @@ public Settings onNodeStopped(String nodeName) throws Exception { ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); CreatePitResponse pitResponse = execute.get(); PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime()); - assertSegments(false, client()); + assertSegments(false, "index", 1, client()); assertEquals(1, pitResponse.getSuccessfulShards()); assertEquals(2, pitResponse.getTotalShards()); SearchResponse searchResponse = client().prepareSearch("index") From 51ef2401888bef37acb0fe7e12370d130e13feeb Mon Sep 17 00:00:00 2001 From: Bharathwaj G Date: Tue, 16 Aug 2022 22:58:12 +0530 Subject: [PATCH 08/10] addressing comments Signed-off-by: Bharathwaj G --- .../admin/indices/segments/TransportPitSegmentsAction.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/action/admin/indices/segments/TransportPitSegmentsAction.java b/server/src/main/java/org/opensearch/action/admin/indices/segments/TransportPitSegmentsAction.java index a1c8818215f95..e089380bb5e6e 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/segments/TransportPitSegmentsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/segments/TransportPitSegmentsAction.java @@ -199,8 +199,9 @@ protected ShardSegments shardOperation(PitSegmentsRequest request, ShardRouting return new ShardSegments(shardRouting, new ArrayList<>()); } return new ShardSegments(pitReaderContext.getShardRouting(), pitReaderContext.getSegments()); + } else { + throw new IllegalArgumentException("Shard routing is not of PitAwareShardRouting type"); } - return new ShardSegments(shardRouting, new ArrayList<>()); } /** From 903588e1b2ae8588e32b257f464c8d0d74384f7d Mon Sep 17 00:00:00 2001 From: Bharathwaj G Date: Thu, 18 Aug 2022 12:56:59 +0530 Subject: [PATCH 09/10] Addressing comments Signed-off-by: Bharathwaj G --- .../segments/TransportPitSegmentsAction.java | 20 +++++++++---------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/admin/indices/segments/TransportPitSegmentsAction.java b/server/src/main/java/org/opensearch/action/admin/indices/segments/TransportPitSegmentsAction.java index e089380bb5e6e..cd9f2014187ad 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/segments/TransportPitSegmentsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/segments/TransportPitSegmentsAction.java @@ -43,6 +43,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -190,18 +191,15 @@ public List getShardRoutingsFromInputStream(StreamInput in) throws */ @Override protected ShardSegments shardOperation(PitSegmentsRequest request, ShardRouting shardRouting) { - if (shardRouting instanceof PitAwareShardRouting) { - PitAwareShardRouting pitAwareShardRouting = (PitAwareShardRouting) shardRouting; - SearchContextIdForNode searchContextIdForNode = decode(namedWriteableRegistry, pitAwareShardRouting.getPitId()).shards() - .get(shardRouting.shardId()); - PitReaderContext pitReaderContext = searchService.getPitReaderContext(searchContextIdForNode.getSearchContextId()); - if (pitReaderContext == null) { - return new ShardSegments(shardRouting, new ArrayList<>()); - } - return new ShardSegments(pitReaderContext.getShardRouting(), pitReaderContext.getSegments()); - } else { - throw new IllegalArgumentException("Shard routing is not of PitAwareShardRouting type"); + assert shardRouting instanceof PitAwareShardRouting; + PitAwareShardRouting pitAwareShardRouting = (PitAwareShardRouting) shardRouting; + SearchContextIdForNode searchContextIdForNode = decode(namedWriteableRegistry, pitAwareShardRouting.getPitId()).shards() + .get(shardRouting.shardId()); + PitReaderContext pitReaderContext = searchService.getPitReaderContext(searchContextIdForNode.getSearchContextId()); + if (pitReaderContext == null) { + return new ShardSegments(shardRouting, Collections.emptyList()); } + return new ShardSegments(pitReaderContext.getShardRouting(), pitReaderContext.getSegments()); } /** From 78c645bbb4f08586b92156d29271421a9ac2201f Mon Sep 17 00:00:00 2001 From: Bharathwaj G Date: Mon, 22 Aug 2022 16:48:32 +0530 Subject: [PATCH 10/10] Adding '_all' as option to get all segments Signed-off-by: Bharathwaj G --- .../indices/segments/PitSegmentsRequest.java | 31 +++++++++++++------ .../segments/TransportPitSegmentsAction.java | 5 +-- .../action/search/PitTestsUtil.java | 3 +- 3 files changed, 27 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/admin/indices/segments/PitSegmentsRequest.java b/server/src/main/java/org/opensearch/action/admin/indices/segments/PitSegmentsRequest.java index d0146c0c52f7b..84f5e5ad6a1e8 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/segments/PitSegmentsRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/segments/PitSegmentsRequest.java @@ -8,23 +8,26 @@ package org.opensearch.action.admin.indices.segments; +import org.opensearch.action.ActionRequestValidationException; import org.opensearch.action.support.broadcast.BroadcastRequest; import org.opensearch.common.Strings; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.List; +import static org.opensearch.action.ValidateActions.addValidationError; + /** * Transport request for retrieving PITs segment information */ public class PitSegmentsRequest extends BroadcastRequest { private boolean verbose = false; - private List pitIds; + private final List pitIds = new ArrayList<>(); public PitSegmentsRequest() { this(Strings.EMPTY_ARRAY); @@ -32,13 +35,13 @@ public PitSegmentsRequest() { public PitSegmentsRequest(StreamInput in) throws IOException { super(in); - pitIds = Arrays.asList(in.readStringArray()); + pitIds.addAll(Arrays.asList(in.readStringArray())); verbose = in.readBoolean(); } - public PitSegmentsRequest(String... indices) { - super(indices); - pitIds = Collections.emptyList(); + public PitSegmentsRequest(String... pitIds) { + super(pitIds); + this.pitIds.addAll(Arrays.asList(pitIds)); } /** @@ -64,11 +67,21 @@ public void writeTo(StreamOutput out) throws IOException { out.writeBoolean(verbose); } - public Collection getPitIds() { + public List getPitIds() { return Collections.unmodifiableList(pitIds); } - public void setPitIds(List pitIds) { - this.pitIds = pitIds; + public void clearAndSetPitIds(List pitIds) { + this.pitIds.clear(); + this.pitIds.addAll(pitIds); + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = null; + if (pitIds == null || pitIds.isEmpty()) { + validationException = addValidationError("no pit ids specified", validationException); + } + return validationException; } } diff --git a/server/src/main/java/org/opensearch/action/admin/indices/segments/TransportPitSegmentsAction.java b/server/src/main/java/org/opensearch/action/admin/indices/segments/TransportPitSegmentsAction.java index cd9f2014187ad..9d4ece74a7270 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/segments/TransportPitSegmentsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/segments/TransportPitSegmentsAction.java @@ -94,9 +94,10 @@ public TransportPitSegmentsAction( */ @Override protected void doExecute(Task task, PitSegmentsRequest request, ActionListener listener) { - if (request.getPitIds().isEmpty()) { + List pitIds = request.getPitIds(); + if (pitIds.size() == 1 && "_all".equals(pitIds.get(0))) { pitService.getAllPits(ActionListener.wrap(response -> { - request.setPitIds(response.getPitInfos().stream().map(ListPitInfo::getPitId).collect(Collectors.toList())); + request.clearAndSetPitIds(response.getPitInfos().stream().map(ListPitInfo::getPitId).collect(Collectors.toList())); super.doExecute(task, request, listener); }, listener::onFailure)); } else { diff --git a/server/src/test/java/org/opensearch/action/search/PitTestsUtil.java b/server/src/test/java/org/opensearch/action/search/PitTestsUtil.java index efb5a7d23022d..60a31c62dc32d 100644 --- a/server/src/test/java/org/opensearch/action/search/PitTestsUtil.java +++ b/server/src/test/java/org/opensearch/action/search/PitTestsUtil.java @@ -135,7 +135,8 @@ public static void assertGetAllPitsEmpty(Client client) throws ExecutionExceptio } public static void assertSegments(boolean isEmpty, String index, long expectedShardSize, Client client) { - IndicesSegmentResponse indicesSegmentResponse = client.execute(PitSegmentsAction.INSTANCE, new PitSegmentsRequest()).actionGet(); + PitSegmentsRequest pitSegmentsRequest = new PitSegmentsRequest("_all"); + IndicesSegmentResponse indicesSegmentResponse = client.execute(PitSegmentsAction.INSTANCE, pitSegmentsRequest).actionGet(); assertTrue(indicesSegmentResponse.getShardFailures() == null || indicesSegmentResponse.getShardFailures().length == 0); assertEquals(indicesSegmentResponse.getIndices().isEmpty(), isEmpty); if (!isEmpty) {