Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Close Index API] Mark shard copy as stale if needed during shard verification #36755

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.replication.ReplicationOperation;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
Expand All @@ -39,6 +40,8 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.util.function.Consumer;

public class TransportVerifyShardBeforeCloseAction extends TransportReplicationAction<
TransportVerifyShardBeforeCloseAction.ShardRequest, TransportVerifyShardBeforeCloseAction.ShardRequest, ReplicationResponse> {

Expand Down Expand Up @@ -109,6 +112,30 @@ private void executeShardOperation(final IndexShard indexShard) {
logger.debug("{} shard is ready for closing", shardId);
}

@Override
protected ReplicationOperation.Replicas<ShardRequest> newReplicasProxy(final long primaryTerm) {
return new VerifyShardBeforeCloseActionReplicasProxy(primaryTerm);
}

/**
* A {@link ReplicasProxy} that marks as stale the shards that are unavailable during the verification
* and the flush of the shard. This is done to ensure that such shards won't be later promoted as primary
* or reopened in an unverified state with potential non flushed translog operations.
*/
class VerifyShardBeforeCloseActionReplicasProxy extends ReplicasProxy {

VerifyShardBeforeCloseActionReplicasProxy(final long primaryTerm) {
super(primaryTerm);
}

@Override
public void markShardCopyAsStaleIfNeeded(final ShardId shardId, final String allocationId, final Runnable onSuccess,
final Consumer<Exception> onPrimaryDemoted, final Consumer<Exception> onIgnoredFailure) {
shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, true, "mark copy as stale", null,
createShardActionListener(onSuccess, onPrimaryDemoted, onIgnoredFailure));
}
}

public static class ShardRequest extends ReplicationRequest<ShardRequest> {

ShardRequest(){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,55 @@
*/
package org.elasticsearch.action.admin.indices.close;

import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.replication.ReplicationOperation;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.action.support.replication.TransportReplicationAction.ConcreteShardRequest;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ReplicationGroup;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;

import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.state;
import static org.elasticsearch.cluster.metadata.MetaDataIndexStateService.INDEX_CLOSED_BLOCK;
import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
import static org.elasticsearch.test.ClusterServiceUtils.setState;
import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.instanceOf;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
Expand All @@ -47,9 +75,17 @@

public class TransportVerifyShardBeforeCloseActionTests extends ESTestCase {

private static ThreadPool threadPool;

private IndexShard indexShard;
private TransportVerifyShardBeforeCloseAction action;
private ClusterService clusterService;
private CapturingTransport transport;

@BeforeClass
public static void beforeClass() {
threadPool = new TestThreadPool(getTestClass().getName());
}

@Override
@Before
Expand All @@ -64,13 +100,32 @@ public void setUp() throws Exception {
final ShardId shardId = new ShardId("index", "_na_", randomIntBetween(0, 3));
when(indexShard.shardId()).thenReturn(shardId);

clusterService = mock(ClusterService.class);
when(clusterService.state()).thenReturn(new ClusterState.Builder(new ClusterName("test"))
clusterService = createClusterService(threadPool);
setState(clusterService, new ClusterState.Builder(clusterService.state())
.blocks(ClusterBlocks.builder().addIndexBlock("index", INDEX_CLOSED_BLOCK).build()).build());

action = new TransportVerifyShardBeforeCloseAction(Settings.EMPTY, mock(TransportService.class), clusterService,
mock(IndicesService.class), mock(ThreadPool.class), mock(ShardStateAction.class), mock(ActionFilters.class),
mock(IndexNameExpressionResolver.class));
transport = new CapturingTransport();
TransportService transportService = transport.createTransportService(Settings.EMPTY, threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null, Collections.emptySet());
transportService.start();
transportService.acceptIncomingRequests();

ShardStateAction shardStateAction = new ShardStateAction(clusterService, transportService, null, null, threadPool);
action = new TransportVerifyShardBeforeCloseAction(Settings.EMPTY, transportService, clusterService, mock(IndicesService.class),
mock(ThreadPool.class), shardStateAction, mock(ActionFilters.class), mock(IndexNameExpressionResolver.class));
}

@Override
@After
public void tearDown() throws Exception {
super.tearDown();
clusterService.close();
}

@AfterClass
public static void afterClass() {
ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
threadPool = null;
}

private void executeOnPrimaryOrReplica() throws Exception {
Expand Down Expand Up @@ -98,7 +153,7 @@ public void testOperationFailsWithOnGoingOps() {
}

public void testOperationFailsWithNoBlock() {
when(clusterService.state()).thenReturn(new ClusterState.Builder(new ClusterName("test")).build());
setState(clusterService, new ClusterState.Builder(new ClusterName("test")).build());

IllegalStateException exception = expectThrows(IllegalStateException.class, this::executeOnPrimaryOrReplica);
assertThat(exception.getMessage(),
Expand All @@ -119,4 +174,149 @@ public void testOperationFailsWithGlobalCheckpointNotCaughtUp() {
+ maxSeqNo + "] on index shard " + indexShard.shardId()));
verify(indexShard, times(0)).flush(any(FlushRequest.class));
}

public void testUnavailableShardsMarkedAsStale() throws Exception {
final String index = "test";
final ShardId shardId = new ShardId(index, "_na_", 0);

final int nbReplicas = randomIntBetween(1, 10);
final ShardRoutingState[] replicaStates = new ShardRoutingState[nbReplicas];
for (int i = 0; i < replicaStates.length; i++) {
replicaStates[i] = ShardRoutingState.STARTED;
}
final ClusterState clusterState = state(index, true, ShardRoutingState.STARTED, replicaStates);
setState(clusterService, clusterState);

IndexShardRoutingTable shardRoutingTable = clusterState.routingTable().index(index).shard(shardId.id());
final IndexMetaData indexMetaData = clusterState.getMetaData().index(index);
final ShardRouting primaryRouting = shardRoutingTable.primaryShard();
final long primaryTerm = indexMetaData.primaryTerm(0);

final Set<String> inSyncAllocationIds = indexMetaData.inSyncAllocationIds(0);
final Set<String> trackedShards = shardRoutingTable.getAllAllocationIds();

List<ShardRouting> unavailableShards = randomSubsetOf(randomIntBetween(1, nbReplicas), shardRoutingTable.replicaShards());
IndexShardRoutingTable.Builder shardRoutingTableBuilder = new IndexShardRoutingTable.Builder(shardRoutingTable);
unavailableShards.forEach(shardRoutingTableBuilder::removeShard);
shardRoutingTable = shardRoutingTableBuilder.build();

final ReplicationGroup replicationGroup = new ReplicationGroup(shardRoutingTable, inSyncAllocationIds, trackedShards);
assertThat(replicationGroup.getUnavailableInSyncShards().size(), greaterThan(0));

final PlainActionFuture<PrimaryResult> listener = new PlainActionFuture<>();
TransportVerifyShardBeforeCloseAction.ShardRequest request = new TransportVerifyShardBeforeCloseAction.ShardRequest(shardId);
ReplicationOperation.Replicas<TransportVerifyShardBeforeCloseAction.ShardRequest> proxy = action.newReplicasProxy(primaryTerm);
ReplicationOperation<TransportVerifyShardBeforeCloseAction.ShardRequest,
TransportVerifyShardBeforeCloseAction.ShardRequest, PrimaryResult> operation =
new ReplicationOperation<>(request, createPrimary(primaryRouting, replicationGroup), listener, proxy, logger, "test");
operation.execute();

final CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear();
assertThat(capturedRequests.length, equalTo(nbReplicas));

for (CapturingTransport.CapturedRequest capturedRequest : capturedRequests) {
final String actionName = capturedRequest.action;
if (actionName.startsWith(ShardStateAction.SHARD_FAILED_ACTION_NAME)) {
assertThat(capturedRequest.request, instanceOf(ShardStateAction.FailedShardEntry.class));
String allocationId = ((ShardStateAction.FailedShardEntry) capturedRequest.request).getAllocationId();
assertTrue(unavailableShards.stream().anyMatch(shardRouting -> shardRouting.allocationId().getId().equals(allocationId)));
transport.handleResponse(capturedRequest.requestId, TransportResponse.Empty.INSTANCE);

} else if (actionName.startsWith(TransportVerifyShardBeforeCloseAction.NAME)) {
assertThat(capturedRequest.request, instanceOf(ConcreteShardRequest.class));
String allocationId = ((ConcreteShardRequest) capturedRequest.request).getTargetAllocationID();
assertFalse(unavailableShards.stream().anyMatch(shardRouting -> shardRouting.allocationId().getId().equals(allocationId)));
assertTrue(inSyncAllocationIds.stream().anyMatch(inSyncAllocationId -> inSyncAllocationId.equals(allocationId)));
transport.handleResponse(capturedRequest.requestId, new TransportReplicationAction.ReplicaResponse(0L, 0L));

} else {
fail("Test does not support action " + capturedRequest.action);
}
}

final ReplicationResponse.ShardInfo shardInfo = listener.get().getShardInfo();
assertThat(shardInfo.getFailed(), equalTo(0));
assertThat(shardInfo.getFailures(), arrayWithSize(0));
assertThat(shardInfo.getSuccessful(), equalTo(1 + nbReplicas - unavailableShards.size()));
}

private static ReplicationOperation.Primary<
TransportVerifyShardBeforeCloseAction.ShardRequest,
TransportVerifyShardBeforeCloseAction.ShardRequest,
PrimaryResult>
createPrimary(final ShardRouting primary, final ReplicationGroup replicationGroup) {
return new ReplicationOperation.Primary<
TransportVerifyShardBeforeCloseAction.ShardRequest,
TransportVerifyShardBeforeCloseAction.ShardRequest,
PrimaryResult>() {
@Override
public ShardRouting routingEntry() {
return primary;
}

@Override
public ReplicationGroup getReplicationGroup() {
return replicationGroup;
}

@Override
public PrimaryResult perform(TransportVerifyShardBeforeCloseAction.ShardRequest request) throws Exception {
return new PrimaryResult(request);
}

@Override
public void failShard(String message, Exception exception) {

}

@Override
public void updateLocalCheckpointForShard(String allocationId, long checkpoint) {
}

@Override
public void updateGlobalCheckpointForShard(String allocationId, long globalCheckpoint) {
}

@Override
public long localCheckpoint() {
return 0;
}

@Override
public long globalCheckpoint() {
return 0;
}

@Override
public long maxSeqNoOfUpdatesOrDeletes() {
return 0;
}
};
}

private static class PrimaryResult implements ReplicationOperation.PrimaryResult<TransportVerifyShardBeforeCloseAction.ShardRequest> {

private final TransportVerifyShardBeforeCloseAction.ShardRequest replicaRequest;
private final SetOnce<ReplicationResponse.ShardInfo> shardInfo;

private PrimaryResult(final TransportVerifyShardBeforeCloseAction.ShardRequest replicaRequest) {
this.replicaRequest = replicaRequest;
this.shardInfo = new SetOnce<>();
}

@Override
public TransportVerifyShardBeforeCloseAction.ShardRequest replicaRequest() {
return replicaRequest;
}

@Override
public void setShardInfo(ReplicationResponse.ShardInfo shardInfo) {
this.shardInfo.set(shardInfo);
}

public ReplicationResponse.ShardInfo getShardInfo() {
return shardInfo.get();
}
}

}