Add changes to Point in time segments API service layer #4105

Merged on Aug 23, 2022 (18 commits)
Changes from 6 commits
@@ -163,7 +163,9 @@
import org.opensearch.action.admin.indices.rollover.RolloverAction;
import org.opensearch.action.admin.indices.rollover.TransportRolloverAction;
import org.opensearch.action.admin.indices.segments.IndicesSegmentsAction;
import org.opensearch.action.admin.indices.segments.PitSegmentsAction;
import org.opensearch.action.admin.indices.segments.TransportIndicesSegmentsAction;
import org.opensearch.action.admin.indices.segments.TransportPitSegmentsAction;
import org.opensearch.action.admin.indices.settings.get.GetSettingsAction;
import org.opensearch.action.admin.indices.settings.get.TransportGetSettingsAction;
import org.opensearch.action.admin.indices.settings.put.TransportUpdateSettingsAction;
@@ -667,6 +669,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(CreatePitAction.INSTANCE, TransportCreatePitAction.class);
actions.register(GetAllPitsAction.INSTANCE, TransportGetAllPitsAction.class);
actions.register(DeletePitAction.INSTANCE, TransportDeletePitAction.class);
actions.register(PitSegmentsAction.INSTANCE, TransportPitSegmentsAction.class);
Member

The action is not clear from the name. Why not GetPitSegmentsAction? That would make it consistent with the previous action names.

Contributor Author

The naming is consistent with 'IndicesSegmentsAction', which is similar to this API. Should we still consider renaming?


return unmodifiableMap(actions.getRegistry());
}
@@ -0,0 +1,24 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.indices.segments;

import org.opensearch.action.ActionType;

/**
* Action for retrieving segment information for PITs
*/
public class PitSegmentsAction extends ActionType<IndicesSegmentResponse> {

public static final PitSegmentsAction INSTANCE = new PitSegmentsAction();
public static final String NAME = "indices:monitor/point_in_time/segments";

private PitSegmentsAction() {
super(NAME, IndicesSegmentResponse::new);
}
}
@@ -0,0 +1,78 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.indices.segments;

import org.opensearch.action.support.broadcast.BroadcastRequest;
import org.opensearch.common.Strings;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

/**
* Transport request for retrieving PITs segment information
*/
public class PitSegmentsRequest extends BroadcastRequest<PitSegmentsRequest> {
private boolean verbose = false;
private List<String> pitIds;

public PitSegmentsRequest() {
this(Strings.EMPTY_ARRAY);
}

public PitSegmentsRequest(StreamInput in) throws IOException {
super(in);
pitIds = Arrays.asList(in.readStringArray());
verbose = in.readBoolean();
}

public PitSegmentsRequest(String... indices) {
super(indices);
pitIds = Collections.emptyList();
}

/**
* <code>true</code> if detailed information about each segment should be returned,
* <code>false</code> otherwise.
*/
public boolean isVerbose() {
return verbose;
}

/**
* Sets the <code>verbose</code> option.
* @see #isVerbose()
*/
public void setVerbose(boolean v) {
verbose = v;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
if (pitIds == null) {
Collaborator

Please use out.writeStringArrayNullable

Contributor Author

But that works only on an array; this is a list.

Collaborator

But this is what you do anyway: pitIds.toArray(new String[pitIds.size()])?

Contributor Author

But if pitIds == null, pitIds.toArray will throw an NPE. Am I understanding this wrong?

Collaborator

Yes, it will: out.writeStringArrayNullable((pitIds == null) ? null : pitIds.toArray(new String[pitIds.size()]));

out.writeVInt(0);
} else {
out.writeStringArray(pitIds.toArray(new String[pitIds.size()]));
}
out.writeBoolean(verbose);
}
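
The thread above converges on folding the null check into a single nullable-array write. A minimal sketch of that shape, using `java.io.DataOutputStream` as a stand-in for OpenSearch's `StreamOutput`. The `writeStringArrayNullable` helper below is a local, hypothetical imitation written for this example (it assumes a null array is written as a zero-length array); it is not the OpenSearch method itself.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;

final class NullableListWireSketch {

    // Hypothetical stand-in: a null array is written as length 0, same as an empty array.
    static void writeStringArrayNullable(DataOutputStream out, String[] values) throws IOException {
        if (values == null) {
            out.writeInt(0);
            return;
        }
        out.writeInt(values.length);
        for (String value : values) {
            out.writeUTF(value);
        }
    }

    // Reads the length prefix, then that many strings.
    static String[] readStringArray(DataInputStream in) throws IOException {
        int length = in.readInt();
        String[] values = new String[length];
        for (int i = 0; i < length; i++) {
            values[i] = in.readUTF();
        }
        return values;
    }

    // Serializes a possibly-null list with one call and reads it back.
    static List<String> roundTrip(List<String> pitIds) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                writeStringArrayNullable(out, pitIds == null ? null : pitIds.toArray(new String[0]));
            }
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return Arrays.asList(readStringArray(in));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(null));
        System.out.println(roundTrip(Arrays.asList("pit-a", "pit-b")));
    }
}
```

Under this assumption a null list comes back as an empty list, so the receiver cannot tell the two apart. That appears acceptable here, since the transport action treats an empty list as "all PITs".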

public Collection<String> getPitIds() {
Contributor (@pranikum, Aug 3, 2022)

UnmodifiableList? Also, I don't see it used anywhere as anything other than a List, so we can return List itself.

return Collections.unmodifiableList(pitIds);
}
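
One way to address the comment above is to return `List` and make the stored list immutable at the point it is set. The sketch below is an illustration only: `PitIdHolderSketch` is a hypothetical class, not part of the PR, and treating a null argument as "no IDs" is an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class PitIdHolderSketch {
    private List<String> pitIds = Collections.emptyList();

    // Returning List (not Collection) keeps index access available to callers.
    List<String> getPitIds() {
        return pitIds;
    }

    // Copy defensively so later changes to the caller's list do not leak in;
    // null is treated as "no IDs" (an assumption made for this sketch).
    void setPitIds(List<String> ids) {
        this.pitIds = ids == null
            ? Collections.<String>emptyList()
            : Collections.unmodifiableList(new ArrayList<>(ids));
    }

    public static void main(String[] args) {
        PitIdHolderSketch holder = new PitIdHolderSketch();
        List<String> source = new ArrayList<>(Arrays.asList("pit-a"));
        holder.setPitIds(source);
        source.add("pit-b"); // does not affect the holder
        System.out.println(holder.getPitIds());
    }
}
```

Wrapping once in the setter also means the getter can never hit a null field, which the version in the diff would do after `setPitIds(null)`.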

public void setPitIds(List<String> pitIds) {
this.pitIds = pitIds;
}
}
@@ -0,0 +1,261 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.action.admin.indices.segments;

import org.opensearch.action.ActionListener;
import org.opensearch.action.search.ListPitInfo;
import org.opensearch.action.search.PitService;
import org.opensearch.action.search.SearchContextId;
import org.opensearch.action.search.SearchContextIdForNode;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.DefaultShardOperationFailedException;
import org.opensearch.action.support.broadcast.node.TransportBroadcastByNodeAction;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.block.ClusterBlockLevel;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.routing.AllocationId;
import org.opensearch.cluster.routing.PlainShardsIterator;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.ShardsIterator;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Strings;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.io.stream.NamedWriteableRegistry;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.IndicesService;
import org.opensearch.search.SearchService;
import org.opensearch.search.internal.PitReaderContext;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.opensearch.action.search.SearchContextId.decode;

/**
* Transport action for retrieving segment information of PITs
*/
public class TransportPitSegmentsAction extends TransportBroadcastByNodeAction<PitSegmentsRequest, IndicesSegmentResponse, ShardSegments> {
private final ClusterService clusterService;
private final IndicesService indicesService;
private final SearchService searchService;
private final NamedWriteableRegistry namedWriteableRegistry;
private final TransportService transportService;
private final PitService pitService;

@Inject
public TransportPitSegmentsAction(
ClusterService clusterService,
TransportService transportService,
IndicesService indicesService,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
SearchService searchService,
NamedWriteableRegistry namedWriteableRegistry,
PitService pitService
) {
super(
PitSegmentsAction.NAME,
clusterService,
transportService,
actionFilters,
indexNameExpressionResolver,
PitSegmentsRequest::new,
ThreadPool.Names.MANAGEMENT
);
this.clusterService = clusterService;
this.indicesService = indicesService;
this.searchService = searchService;
this.namedWriteableRegistry = namedWriteableRegistry;
this.transportService = transportService;
this.pitService = pitService;
}

/**
* Execute PIT segments flow for all PITs or request PIT IDs
*/
@Override
protected void doExecute(Task task, PitSegmentsRequest request, ActionListener<IndicesSegmentResponse> listener) {
if (request.getPitIds().isEmpty()) {
Member

I am not sure about supporting all PIT IDs for this API. When calling this API, the user knows which PIT IDs need to be queried to get the segment info.

Contributor Author

Similar to the 'indices segments API', which supports a list of indices or all indices, we've structured this one the same way to be consistent.

Collaborator

How do we decide whether we should return results for all PITs?

Contributor Author

It is based on the request: if a PIT ID is part of the API call, we return results only for that PIT. Otherwise we return results for all PITs.

Collaborator

Shouldn't all be explicit instead?

pitService.getAllPits(ActionListener.wrap(response -> {
request.setPitIds(response.getPitInfos().stream().map(ListPitInfo::getPitId).collect(Collectors.toList()));
super.doExecute(task, request, listener);
}, listener::onFailure));
} else {
super.doExecute(task, request, listener);
}
}
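
The diff above treats an empty ID list as "all PITs". The reviewer's alternative, making "all" explicit, could look roughly like the sketch below. Everything in it is hypothetical: the `_all` sentinel, the `PitIdResolverSketch` class, and the synchronous `Supplier` that stands in for the asynchronous `pitService.getAllPits(...)` call are assumptions made for illustration, not what the PR implements.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

final class PitIdResolverSketch {
    // Hypothetical sentinel; the diff shown here does not define one.
    static final String ALL = "_all";

    // Resolves the requested IDs: the sentinel expands to every PIT,
    // an empty request is rejected instead of silently meaning "all".
    static List<String> resolve(List<String> requested, Supplier<List<String>> allPits) {
        if (requested == null || requested.isEmpty()) {
            throw new IllegalArgumentException("no PIT IDs given; pass \"" + ALL + "\" to query every PIT");
        }
        if (requested.size() == 1 && ALL.equals(requested.get(0))) {
            return allPits.get();
        }
        return requested;
    }

    public static void main(String[] args) {
        Supplier<List<String>> allPits = () -> Arrays.asList("pit-a", "pit-b", "pit-c");
        System.out.println(resolve(Collections.singletonList(ALL), allPits));
        System.out.println(resolve(Collections.singletonList("pit-b"), allPits));
    }
}
```

The trade-off is the one debated in the thread: implicit "all" matches the indices segments API, while an explicit sentinel avoids fanning out to every PIT when a caller simply forgot to pass IDs.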

/**
* This adds list of shards on which we need to retrieve pit segments details
* @param clusterState the cluster state
* @param request the underlying request
* @param concreteIndices the concrete indices on which to execute the operation
*/
@Override
protected ShardsIterator shards(ClusterState clusterState, PitSegmentsRequest request, String[] concreteIndices) {
final ArrayList<ShardRouting> iterators = new ArrayList<>();
for (String pitId : request.getPitIds()) {
SearchContextId searchContext = decode(namedWriteableRegistry, pitId);
for (Map.Entry<ShardId, SearchContextIdForNode> entry : searchContext.shards().entrySet()) {
final SearchContextIdForNode perNode = entry.getValue();
// check if node is part of local cluster
if (Strings.isEmpty(perNode.getClusterAlias())) {
final ShardId shardId = entry.getKey();
iterators.add(
new PitAwareShardRouting(
pitId,
shardId,
perNode.getNode(),
null,
true,
ShardRoutingState.STARTED,
null,
null,
null,
-1L
)
);
}
}
}
return new PlainShardsIterator(iterators);
}
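
The filtering step in `shards(...)` can be shown in isolation. The sketch below uses plain strings and a small hypothetical `PerNode` class in place of `ShardId` and `SearchContextIdForNode`; it only illustrates the rule that an entry with a null or empty cluster alias is treated as belonging to the local cluster.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class LocalShardFilterSketch {

    // Hypothetical stand-in for the per-shard entry decoded from a PIT ID.
    static final class PerNode {
        final String clusterAlias; // null or empty means the local cluster
        final String nodeId;

        PerNode(String clusterAlias, String nodeId) {
            this.clusterAlias = clusterAlias;
            this.nodeId = nodeId;
        }
    }

    // Keeps only shards whose context lives in the local cluster.
    static List<String> localTargets(Map<String, PerNode> shardsOfPit) {
        List<String> targets = new ArrayList<>();
        for (Map.Entry<String, PerNode> entry : shardsOfPit.entrySet()) {
            String alias = entry.getValue().clusterAlias;
            if (alias == null || alias.isEmpty()) {
                targets.add(entry.getKey() + "@" + entry.getValue().nodeId);
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        Map<String, PerNode> shards = new LinkedHashMap<>();
        shards.put("[logs][0]", new PerNode(null, "node-1"));
        shards.put("[logs][1]", new PerNode("remote", "node-9"));
        shards.put("[logs][2]", new PerNode("", "node-2"));
        System.out.println(localTargets(shards));
    }
}
```

Shards held by a remote cluster are skipped silently in this sketch, as they are in the diff, so the response covers local shards only.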

@Override
protected ClusterBlockException checkGlobalBlock(ClusterState state, PitSegmentsRequest request) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
}
Comment on lines +144 to +147
Collaborator

Should this be READ instead of METADATA_READ?

Contributor Author

METADATA_READ is used in index segments as well. READ is used in operations like search, multisearch, etc.


@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, PitSegmentsRequest countRequest, String[] concreteIndices) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_READ, concreteIndices);
}
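
The practical difference between the two levels discussed above is which blocks stop the request. The toy model below is a sketch under stated assumptions: the `Level` enum only mirrors the two `ClusterBlockLevel` names mentioned in the thread, and `Block` and `isBlocked` are hypothetical helpers, not OpenSearch classes.

```java
import java.util.EnumSet;
import java.util.Set;

final class BlockLevelSketch {
    // Mirrors only the two level names that appear in the review thread.
    enum Level { READ, METADATA_READ }

    // Hypothetical block: a name plus the levels it forbids.
    static final class Block {
        final String name;
        final Set<Level> levels;

        Block(String name, Set<Level> levels) {
            this.name = name;
            this.levels = levels;
        }
    }

    // A request is rejected if any active block covers the level it checks.
    static boolean isBlocked(Level requested, Block... active) {
        for (Block block : active) {
            if (block.levels.contains(requested)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Block readBlock = new Block("read-block", EnumSet.of(Level.READ));
        System.out.println("search blocked: " + isBlocked(Level.READ, readBlock));
        System.out.println("segments blocked: " + isBlocked(Level.METADATA_READ, readBlock));
    }
}
```

In this model, an action that checks METADATA_READ still runs while a READ-only block is active, which matches the author's point that segment inspection is grouped with metadata operations and search is not.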

@Override
protected ShardSegments readShardResult(StreamInput in) throws IOException {
return new ShardSegments(in);
}

@Override
protected IndicesSegmentResponse newResponse(
PitSegmentsRequest request,
int totalShards,
int successfulShards,
int failedShards,
List<ShardSegments> results,
List<DefaultShardOperationFailedException> shardFailures,
ClusterState clusterState
) {
return new IndicesSegmentResponse(
results.toArray(new ShardSegments[results.size()]),
totalShards,
successfulShards,
failedShards,
shardFailures
);
}

@Override
protected PitSegmentsRequest readRequestFrom(StreamInput in) throws IOException {
return new PitSegmentsRequest(in);
}

@Override
public List<ShardRouting> getShardRoutingsFromInputStream(StreamInput in) throws IOException {
return in.readList(PitAwareShardRouting::new);
}

/**
* This retrieves segment details of PIT context
* @param request the node-level request
* @param shardRouting the shard on which to execute the operation
*/
@Override
protected ShardSegments shardOperation(PitSegmentsRequest request, ShardRouting shardRouting) {
if (shardRouting instanceof PitAwareShardRouting) {
Collaborator

Should this be an assertion instead of an if condition?

PitAwareShardRouting pitAwareShardRouting = (PitAwareShardRouting) shardRouting;
SearchContextIdForNode searchContextIdForNode = decode(namedWriteableRegistry, pitAwareShardRouting.getPitId()).shards()
.get(shardRouting.shardId());
PitReaderContext pitReaderContext = searchService.getPitReaderContext(searchContextIdForNode.getSearchContextId());
if (pitReaderContext == null) {
return new ShardSegments(shardRouting, new ArrayList<>());
Collaborator

Collections.emptyList() instead?

}
return new ShardSegments(pitReaderContext.getShardRouting(), pitReaderContext.getSegments());
}
return new ShardSegments(shardRouting, new ArrayList<>());
}
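
The `Collections.emptyList()` suggestion above trades a fresh allocation for a shared immutable instance. A minimal sketch of the difference, with a hypothetical `segmentsOrEmpty` helper and `String` standing in for the segment type. Whether the swap is safe in the PR depends on nothing downstream mutating the returned list, which this sketch does not verify.

```java
import java.util.Collections;
import java.util.List;

final class EmptyListSketch {

    // Returns the given segments, or a shared immutable empty list when there are none.
    static List<String> segmentsOrEmpty(List<String> segments) {
        return segments == null ? Collections.<String>emptyList() : segments;
    }

    public static void main(String[] args) {
        List<String> first = segmentsOrEmpty(null);
        List<String> second = segmentsOrEmpty(null);

        // Both calls hand back the same instance, so nothing is allocated per call.
        System.out.println("same instance: " + (first == second));

        // The shared list rejects mutation.
        try {
            first.add("segment_0");
        } catch (UnsupportedOperationException e) {
            System.out.println("empty list is immutable");
        }
    }
}
```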

/**
* This holds PIT id which is used to perform broadcast operation in PIT shards to retrieve segments information
*/
public class PitAwareShardRouting extends ShardRouting {

private final String pitId;

public PitAwareShardRouting(StreamInput in) throws IOException {
super(in);
this.pitId = in.readString();
}

public PitAwareShardRouting(
String pitId,
ShardId shardId,
String currentNodeId,
String relocatingNodeId,
boolean primary,
ShardRoutingState state,
RecoverySource recoverySource,
UnassignedInfo unassignedInfo,
AllocationId allocationId,
long expectedShardSize
) {
super(
shardId,
currentNodeId,
relocatingNodeId,
primary,
state,
recoverySource,
unassignedInfo,
allocationId,
expectedShardSize
);
this.pitId = pitId;
}

public String getPitId() {
return pitId;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(pitId);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
super.toXContent(builder, params);
builder.field("pitId", pitId);
Collaborator

We have inconsistent field namings for PIT ids:

  • pit_id in https://github.com/opensearch-project/OpenSearch/blob/main/server/src/main/java/org/opensearch/action/search/DeletePitRequest.java#L79
  • id in https://github.com/opensearch-project/OpenSearch/blob/main/server/src/main/java/org/opensearch/action/search/CreatePitResponse.java#L31
  • pitId here

We should stick to same conventions everywhere.

Contributor Author

Changed to pit_id

return builder.endObject();
}
}
}