Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Close Index API] Propagate tasks ids between Freeze, Close and Verify Shard actions #36630

Merged
merged 4 commits into from
Jan 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,13 @@
*/
public class CloseIndexClusterStateUpdateRequest extends IndicesClusterStateUpdateRequest<CloseIndexClusterStateUpdateRequest> {

public CloseIndexClusterStateUpdateRequest() {
private final long taskId;

public CloseIndexClusterStateUpdateRequest(final long taskId) {
this.taskId = taskId;
}

public long taskId() {
return taskId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,19 @@ protected ClusterBlockException checkBlock(CloseIndexRequest request, ClusterSta
@Override
protected void masterOperation(final CloseIndexRequest request, final ClusterState state,
final ActionListener<AcknowledgedResponse> listener) {
throw new UnsupportedOperationException("The task parameter is required");
}

@Override
protected void masterOperation(final Task task, final CloseIndexRequest request, final ClusterState state,
final ActionListener<AcknowledgedResponse> listener) throws Exception {
final Index[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, request);
if (concreteIndices == null || concreteIndices.length == 0) {
listener.onResponse(new AcknowledgedResponse(true));
return;
}

final CloseIndexClusterStateUpdateRequest closeRequest = new CloseIndexClusterStateUpdateRequest()
final CloseIndexClusterStateUpdateRequest closeRequest = new CloseIndexClusterStateUpdateRequest(task.getId())
.ackTimeout(request.timeout())
.masterNodeTimeout(request.masterNodeTimeout())
.indices(concreteIndices);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

Expand Down Expand Up @@ -141,8 +142,9 @@ public static class ShardRequest extends ReplicationRequest<ShardRequest> {
ShardRequest(){
}

public ShardRequest(final ShardId shardId) {
public ShardRequest(final ShardId shardId, final TaskId parentTaskId) {
super(shardId);
setParentTask(parentTaskId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.ValidationException;
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
Expand All @@ -63,6 +62,7 @@
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.snapshots.RestoreService;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.ArrayList;
Expand Down Expand Up @@ -120,9 +120,6 @@ public void closeIndices(final CloseIndexClusterStateUpdateRequest request, fina
throw new IllegalArgumentException("Index name is required");
}

final TimeValue timeout = request.ackTimeout();
final TimeValue masterTimeout = request.masterNodeTimeout();

clusterService.submitStateUpdateTask("add-block-index-to-close " + Arrays.toString(concreteIndices),
new ClusterStateUpdateTask(Priority.URGENT) {

Expand All @@ -141,7 +138,7 @@ public void clusterStateProcessed(final String source, final ClusterState oldSta
} else {
assert blockedIndices.isEmpty() == false : "List of blocked indices is empty but cluster state was changed";
threadPool.executor(ThreadPool.Names.MANAGEMENT)
.execute(new WaitForClosedBlocksApplied(blockedIndices, timeout,
.execute(new WaitForClosedBlocksApplied(blockedIndices, request,
ActionListener.wrap(closedBlocksResults ->
clusterService.submitStateUpdateTask("close-indices", new ClusterStateUpdateTask(Priority.URGENT) {
@Override
Expand Down Expand Up @@ -176,7 +173,7 @@ public void onFailure(final String source, final Exception e) {

@Override
public TimeValue timeout() {
return masterTimeout;
return request.masterNodeTimeout();
}
}
);
Expand Down Expand Up @@ -246,18 +243,18 @@ static ClusterState addIndexClosedBlocks(final Index[] indices, final ClusterSta
class WaitForClosedBlocksApplied extends AbstractRunnable {

private final Set<Index> blockedIndices;
private final @Nullable TimeValue timeout;
private final CloseIndexClusterStateUpdateRequest request;
private final ActionListener<Map<Index, AcknowledgedResponse>> listener;

private WaitForClosedBlocksApplied(final Set<Index> blockedIndices,
final @Nullable TimeValue timeout,
final CloseIndexClusterStateUpdateRequest request,
final ActionListener<Map<Index, AcknowledgedResponse>> listener) {
if (blockedIndices == null || blockedIndices.isEmpty()) {
throw new IllegalArgumentException("Cannot wait for closed block to be applied to null or empty list of blocked indices");
}
this.blockedIndices = blockedIndices;
this.request = request;
this.listener = listener;
this.timeout = timeout;
}

@Override
Expand All @@ -271,7 +268,7 @@ protected void doRun() throws Exception {
final CountDown countDown = new CountDown(blockedIndices.size());
final ClusterState state = clusterService.state();
for (Index blockedIndex : blockedIndices) {
waitForShardsReadyForClosing(blockedIndex, state, timeout, response -> {
waitForShardsReadyForClosing(blockedIndex, state, response -> {
results.put(blockedIndex, response);
if (countDown.countDown()) {
listener.onResponse(unmodifiableMap(results));
Expand All @@ -280,7 +277,7 @@ protected void doRun() throws Exception {
}
}

private void waitForShardsReadyForClosing(final Index index, final ClusterState state, @Nullable final TimeValue timeout,
private void waitForShardsReadyForClosing(final Index index, final ClusterState state,
final Consumer<AcknowledgedResponse> onResponse) {
final IndexMetaData indexMetaData = state.metaData().index(index);
if (indexMetaData == null) {
Expand All @@ -302,7 +299,7 @@ private void waitForShardsReadyForClosing(final Index index, final ClusterState
for (IntObjectCursor<IndexShardRoutingTable> shard : shards) {
final IndexShardRoutingTable shardRoutingTable = shard.value;
final ShardId shardId = shardRoutingTable.shardId();
sendVerifyShardBeforeCloseRequest(shardRoutingTable, timeout, new NotifyOnceListener<ReplicationResponse>() {
sendVerifyShardBeforeCloseRequest(shardRoutingTable, new NotifyOnceListener<ReplicationResponse>() {
@Override
public void innerOnResponse(final ReplicationResponse replicationResponse) {
ReplicationResponse.ShardInfo shardInfo = replicationResponse.getShardInfo();
Expand All @@ -326,7 +323,7 @@ private void processIfFinished() {
}
}

private void sendVerifyShardBeforeCloseRequest(final IndexShardRoutingTable shardRoutingTable, @Nullable final TimeValue timeout,
private void sendVerifyShardBeforeCloseRequest(final IndexShardRoutingTable shardRoutingTable,
final ActionListener<ReplicationResponse> listener) {
final ShardId shardId = shardRoutingTable.shardId();
if (shardRoutingTable.primaryShard().unassigned()) {
Expand All @@ -336,10 +333,11 @@ private void sendVerifyShardBeforeCloseRequest(final IndexShardRoutingTable shar
listener.onResponse(response);
return;
}
final TaskId parentTaskId = new TaskId(clusterService.localNode().getId(), request.taskId());
final TransportVerifyShardBeforeCloseAction.ShardRequest shardRequest =
new TransportVerifyShardBeforeCloseAction.ShardRequest(shardId);
if (timeout != null) {
shardRequest.timeout(timeout);
new TransportVerifyShardBeforeCloseAction.ShardRequest(shardId, parentTaskId);
if (request.ackTimeout() != null) {
shardRequest.timeout(request.ackTimeout());
}
// TODO propagate a task id from the parent CloseIndexRequest to the ShardCloseRequests
transportVerifyShardBeforeCloseAction.execute(shardRequest, listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.elasticsearch.index.shard.ReplicationGroup;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.threadpool.TestThreadPool;
Expand Down Expand Up @@ -130,7 +131,7 @@ public static void afterClass() {

private void executeOnPrimaryOrReplica() throws Exception {
final TransportVerifyShardBeforeCloseAction.ShardRequest request =
new TransportVerifyShardBeforeCloseAction.ShardRequest(indexShard.shardId());
new TransportVerifyShardBeforeCloseAction.ShardRequest(indexShard.shardId(), new TaskId("_node_id", randomNonNegativeLong()));
if (randomBoolean()) {
assertNotNull(action.shardOperationOnPrimary(request, indexShard));
} else {
Expand Down Expand Up @@ -204,7 +205,8 @@ public void testUnavailableShardsMarkedAsStale() throws Exception {
assertThat(replicationGroup.getUnavailableInSyncShards().size(), greaterThan(0));

final PlainActionFuture<PrimaryResult> listener = new PlainActionFuture<>();
TransportVerifyShardBeforeCloseAction.ShardRequest request = new TransportVerifyShardBeforeCloseAction.ShardRequest(shardId);
TransportVerifyShardBeforeCloseAction.ShardRequest request =
new TransportVerifyShardBeforeCloseAction.ShardRequest(shardId, new TaskId(clusterService.localNode().getId(), 0L));
ReplicationOperation.Replicas<TransportVerifyShardBeforeCloseAction.ShardRequest> proxy = action.newReplicasProxy(primaryTerm);
ReplicationOperation<TransportVerifyShardBeforeCloseAction.ShardRequest,
TransportVerifyShardBeforeCloseAction.ShardRequest, PrimaryResult> operation =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,19 @@ private Index[] resolveIndices(FreezeRequest request, ClusterState state) {

@Override
protected void masterOperation(FreezeRequest request, ClusterState state, ActionListener<FreezeResponse> listener) {
throw new UnsupportedOperationException("The task parameter is required");
}

@Override
protected void masterOperation(Task task, TransportFreezeIndexAction.FreezeRequest request, ClusterState state,
ActionListener<TransportFreezeIndexAction.FreezeResponse> listener) throws Exception {
final Index[] concreteIndices = resolveIndices(request, state);
if (concreteIndices.length == 0) {
listener.onResponse(new FreezeResponse(true, true));
return;
}

final CloseIndexClusterStateUpdateRequest closeRequest = new CloseIndexClusterStateUpdateRequest()
final CloseIndexClusterStateUpdateRequest closeRequest = new CloseIndexClusterStateUpdateRequest(task.getId())
.ackTimeout(request.timeout())
.masterNodeTimeout(request.masterNodeTimeout())
.indices(concreteIndices);
Expand Down