Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Race in Concurrent Snapshot Delete and Create #37612

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1118,14 +1118,20 @@ public void deleteSnapshot(final String repositoryName, final String snapshotNam
.filter(s -> s.getName().equals(snapshotName))
.findFirst();
// if nothing found by the same name, then look in the cluster state for current in progress snapshots
long repoGenId = repositoryData.getGenId();
if (matchedEntry.isPresent() == false) {
matchedEntry = currentSnapshots(repositoryName, Collections.emptyList()).stream()
.map(e -> e.snapshot().getSnapshotId()).filter(s -> s.getName().equals(snapshotName)).findFirst();
Optional<SnapshotsInProgress.Entry> matchedInProgress = currentSnapshots(repositoryName, Collections.emptyList()).stream()
.filter(s -> s.snapshot().getSnapshotId().getName().equals(snapshotName)).findFirst();
if (matchedInProgress.isPresent()) {
matchedEntry = matchedInProgress.map(s -> s.snapshot().getSnapshotId());
// Derive repository generation if a snapshot is in progress because it will increment the generation when it finishes
repoGenId = matchedInProgress.get().getRepositoryStateId() + 1L;
}
}
if (matchedEntry.isPresent() == false) {
throw new SnapshotMissingException(repositoryName, snapshotName);
}
deleteSnapshot(new Snapshot(repositoryName, matchedEntry.get()), listener, repositoryData.getGenId(), immediatePriority);
deleteSnapshot(new Snapshot(repositoryName, matchedEntry.get()), listener, repoGenId, immediatePriority);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
import org.elasticsearch.action.admin.cluster.repositories.put.TransportPutRepositoryAction;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotAction;
import org.elasticsearch.action.admin.cluster.snapshots.create.TransportCreateSnapshotAction;
import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotAction;
import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest;
import org.elasticsearch.action.admin.cluster.snapshots.delete.TransportDeleteSnapshotAction;
import org.elasticsearch.action.admin.indices.create.CreateIndexAction;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
Expand Down Expand Up @@ -196,6 +199,53 @@ public void testSuccessfulSnapshot() {
assertEquals(0, snapshotInfo.failedShards());
}

public void testConcurrentSnapshotCreateAndDelete() {
setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10));

String repoName = "repo";
String snapshotName = "snapshot";
final String index = "test";

final int shards = randomIntBetween(1, 10);

TestClusterNode masterNode =
testClusterNodes.currentMaster(testClusterNodes.nodes.values().iterator().next().clusterService.state());
final AtomicBoolean createdSnapshot = new AtomicBoolean();
masterNode.client.admin().cluster().preparePutRepository(repoName)
.setType(FsRepository.TYPE).setSettings(Settings.builder().put("location", randomAlphaOfLength(10)))
.execute(
assertNoFailureListener(
() -> masterNode.client.admin().indices().create(
new CreateIndexRequest(index).waitForActiveShards(ActiveShardCount.ALL).settings(
Settings.builder()
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), shards)
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0)),
assertNoFailureListener(
() -> masterNode.client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
.execute(assertNoFailureListener(
() -> masterNode.client.admin().cluster().deleteSnapshot(
new DeleteSnapshotRequest(repoName, snapshotName),
assertNoFailureListener(() -> masterNode.client.admin().cluster()
.prepareCreateSnapshot(repoName, snapshotName).execute(
assertNoFailureListener(() -> createdSnapshot.set(true))
)))))))));

deterministicTaskQueue.runAllRunnableTasks();

assertTrue(createdSnapshot.get());
SnapshotsInProgress finalSnapshotsInProgress = masterNode.clusterService.state().custom(SnapshotsInProgress.TYPE);
assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false));
final Repository repository = masterNode.repositoriesService.repository(repoName);
Collection<SnapshotId> snapshotIds = repository.getRepositoryData().getSnapshotIds();
assertThat(snapshotIds, hasSize(1));

final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotIds.iterator().next());
assertEquals(SnapshotState.SUCCESS, snapshotInfo.state());
assertThat(snapshotInfo.indices(), containsInAnyOrder(index));
assertEquals(shards, snapshotInfo.successfulShards());
assertEquals(0, snapshotInfo.failedShards());
}

private void startCluster() {
final ClusterState initialClusterState =
new ClusterState.Builder(ClusterName.DEFAULT).nodes(testClusterNodes.randomDiscoveryNodes()).build();
Expand Down Expand Up @@ -519,6 +569,11 @@ allocationService, new AliasValidator(), environment, indexScopedSettings,
transportService, clusterService, threadPool,
snapshotsService, actionFilters, indexNameExpressionResolver
));
actions.put(DeleteSnapshotAction.INSTANCE,
new TransportDeleteSnapshotAction(
transportService, clusterService, threadPool,
snapshotsService, actionFilters, indexNameExpressionResolver
));
client.initialize(actions, () -> clusterService.localNode().getId(), transportService.getRemoteClusterService());
}

Expand Down