diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java b/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java index 357268fa051e0..63e2957aacc78 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java @@ -58,6 +58,11 @@ public final class RepositoryData { */ public static final long UNKNOWN_REPO_GEN = -2L; + /** + * The generation value indicating that the repository generation could not be determined. + */ + public static final long CORRUPTED_REPO_GEN = -3L; + /** * An instance initialized for an empty repository. */ diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index b56a22e99845b..b9016394ca514 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -31,6 +31,7 @@ import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.RateLimiter; import org.apache.lucene.util.SetOnce; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.StepListener; @@ -184,6 +185,14 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp */ public static final Setting COMPRESS_SETTING = Setting.boolSetting("compress", true, Setting.Property.NodeScope); + /** + * When set to {@code true}, {@link #bestEffortConsistency} will be set to {@code true} and concurrent modifications of the repository + * contents will not result in the repository being marked as corrupted. + * Note: This setting is intended as a backwards compatibility solution for 7.x and will go away in 8. 
+ */ + public static final Setting ALLOW_CONCURRENT_MODIFICATION = + Setting.boolSetting("allow_concurrent_modifications", false, Setting.Property.Deprecated); + private final boolean compress; private final RateLimiter snapshotRateLimiter; @@ -216,6 +225,34 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp private final ClusterService clusterService; + /** + * Flag that is set to {@code true} if this instance is started with {@link #metadata} that has a higher value for + * {@link RepositoryMetaData#pendingGeneration()} than for {@link RepositoryMetaData#generation()} indicating a full cluster restart + * potentially accounting for the last {@code index-N} write in the cluster state. + * Note: While it is true that this value could also be set to {@code true} for an instance on a node that is just joining the cluster + * during a new {@code index-N} write, this does not present a problem. The node will still load the correct {@link RepositoryData} in + * all cases and simply do a redundant listing of the repository contents if it tries to load {@link RepositoryData} and falls back + * to {@link #latestIndexBlobId()} to validate the value of {@link RepositoryMetaData#generation()}. + */ + private boolean uncleanStart; + + /** + * This flag indicates that the repository can not exclusively rely on the value stored in {@link #latestKnownRepoGen} to determine the + * latest repository generation but must inspect its physical contents as well via {@link #latestIndexBlobId()}.
+ * This flag is set in the following situations: + * <ul> + * <li>The {@link #uncleanStart} flag is set to {@code true}</li> + * <li>The repository is read-only, i.e. {@link #isReadOnly()} returns {@code true}</li> + * <li>The cluster contains nodes older than {@link RepositoryMetaData#REPO_GEN_IN_CS_VERSION}</li> + * <li>The value of {@link RepositoryMetaData#generation()} for this repository is {@link RepositoryData#UNKNOWN_REPO_GEN}</li> + * <li>The {@link #ALLOW_CONCURRENT_MODIFICATION} setting is {@code true} for this repository</li> + * </ul> + */ + private volatile boolean bestEffortConsistency; + /** * Constructs new BlobStoreRepository * @param metadata The metadata for this repository including name and settings @@ -249,6 +286,8 @@ protected BlobStoreRepository( @Override protected void doStart() { + uncleanStart = metadata.pendingGeneration() > RepositoryData.EMPTY_REPO_GEN && + metadata.generation() != metadata.pendingGeneration(); ByteSizeValue chunkSize = chunkSize(); if (chunkSize != null && chunkSize.getBytes() <= 0) { throw new IllegalArgumentException("the chunk size cannot be negative: [" + chunkSize + "]"); @@ -279,29 +318,42 @@ protected void doClose() { // #latestKnownRepoGen if a newer than currently known generation is found @Override public void updateState(ClusterState state) { - if (readOnly) { + metadata = getRepoMetaData(state); + uncleanStart = uncleanStart && metadata.generation() != metadata.pendingGeneration(); + bestEffortConsistency = uncleanStart || isReadOnly() + || state.nodes().getMinNodeVersion().before(RepositoryMetaData.REPO_GEN_IN_CS_VERSION) + || metadata.generation() == RepositoryData.UNKNOWN_REPO_GEN || ALLOW_CONCURRENT_MODIFICATION.get(metadata.settings()); + if (isReadOnly()) { // No need to waste cycles, no operations can run against a read-only repository return; } - long bestGenerationFromCS = RepositoryData.EMPTY_REPO_GEN; - final SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE); - if (snapshotsInProgress != null) { - bestGenerationFromCS = bestGeneration(snapshotsInProgress.entries()); - } - final SnapshotDeletionsInProgress deletionsInProgress = state.custom(SnapshotDeletionsInProgress.TYPE); - // Don't use generation from the delete task if we already found a generation for an in progress snapshot.
- // In this case, the generation points at the generation the repo will be in after the snapshot finishes so it may not yet exist - if (bestGenerationFromCS == RepositoryData.EMPTY_REPO_GEN && deletionsInProgress != null) { - bestGenerationFromCS = bestGeneration(deletionsInProgress.getEntries()); - } - final RepositoryCleanupInProgress cleanupInProgress = state.custom(RepositoryCleanupInProgress.TYPE); - if (bestGenerationFromCS == RepositoryData.EMPTY_REPO_GEN && cleanupInProgress != null) { - bestGenerationFromCS = bestGeneration(cleanupInProgress.entries()); + if (bestEffortConsistency) { + long bestGenerationFromCS = RepositoryData.EMPTY_REPO_GEN; + final SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE); + if (snapshotsInProgress != null) { + bestGenerationFromCS = bestGeneration(snapshotsInProgress.entries()); + } + final SnapshotDeletionsInProgress deletionsInProgress = state.custom(SnapshotDeletionsInProgress.TYPE); + // Don't use generation from the delete task if we already found a generation for an in progress snapshot. 
+ // In this case, the generation points at the generation the repo will be in after the snapshot finishes so it may not yet + // exist + if (bestGenerationFromCS == RepositoryData.EMPTY_REPO_GEN && deletionsInProgress != null) { + bestGenerationFromCS = bestGeneration(deletionsInProgress.getEntries()); + } + final RepositoryCleanupInProgress cleanupInProgress = state.custom(RepositoryCleanupInProgress.TYPE); + if (bestGenerationFromCS == RepositoryData.EMPTY_REPO_GEN && cleanupInProgress != null) { + bestGenerationFromCS = bestGeneration(cleanupInProgress.entries()); + } + final long finalBestGen = Math.max(bestGenerationFromCS, metadata.generation()); + latestKnownRepoGen.updateAndGet(known -> Math.max(known, finalBestGen)); + } else { + final long previousBest = latestKnownRepoGen.getAndSet(metadata.generation()); + if (previousBest != metadata.generation()) { + assert metadata.generation() == RepositoryData.CORRUPTED_REPO_GEN || previousBest < metadata.generation() : + "Illegal move from repository generation [" + previousBest + "] to generation [" + metadata.generation() + "]"; + logger.debug("Updated repository generation from [{}] to [{}]", previousBest, metadata.generation()); + } } - - metadata = getRepoMetaData(state); - final long finalBestGen = Math.max(bestGenerationFromCS, metadata.generation()); - latestKnownRepoGen.updateAndGet(known -> Math.max(known, finalBestGen)); } private long bestGeneration(Collection operations) { @@ -446,7 +498,12 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolea */ private RepositoryData safeRepositoryData(long repositoryStateId, Map rootBlobs) { final long generation = latestGeneration(rootBlobs.keySet()); - final long genToLoad = latestKnownRepoGen.updateAndGet(known -> Math.max(known, repositoryStateId)); + final long genToLoad; + if (bestEffortConsistency) { + genToLoad = latestKnownRepoGen.updateAndGet(known -> Math.max(known, repositoryStateId)); + } else { + genToLoad = 
latestKnownRepoGen.get(); + } if (genToLoad > generation) { // It's always a possibility to not see the latest index-N in the listing here on an eventually consistent blob store, just // debug log it. Any blobs leaked as a result of an inconsistent listing here will be cleaned up in a subsequent cleanup or @@ -983,36 +1040,106 @@ public void endVerification(String seed) { // Tracks the latest known repository generation in a best-effort way to detect inconsistent listing of root level index-N blobs // and concurrent modifications. - private final AtomicLong latestKnownRepoGen = new AtomicLong(RepositoryData.EMPTY_REPO_GEN); + private final AtomicLong latestKnownRepoGen = new AtomicLong(RepositoryData.UNKNOWN_REPO_GEN); @Override public void getRepositoryData(ActionListener listener) { - ActionListener.completeWith(listener, () -> { - // Retry loading RepositoryData in a loop in case we run into concurrent modifications of the repository. - while (true) { + if (latestKnownRepoGen.get() == RepositoryData.CORRUPTED_REPO_GEN) { + listener.onFailure(corruptedStateException(null)); + return; + } + // Retry loading RepositoryData in a loop in case we run into concurrent modifications of the repository. 
+ while (true) { + final long genToLoad; + if (bestEffortConsistency) { + // We're only using #latestKnownRepoGen as a hint in this mode and listing repo contents as a secondary way of trying + // to find a higher generation final long generation; try { generation = latestIndexBlobId(); } catch (IOException ioe) { throw new RepositoryException(metadata.name(), "Could not determine repository generation from root blobs", ioe); } - final long genToLoad = latestKnownRepoGen.updateAndGet(known -> Math.max(known, generation)); + genToLoad = latestKnownRepoGen.updateAndGet(known -> Math.max(known, generation)); if (genToLoad > generation) { logger.info("Determined repository generation [" + generation + "] from repository contents but correct generation must be at least [" + genToLoad + "]"); } - try { - return getRepositoryData(genToLoad); - } catch (RepositoryException e) { - if (genToLoad != latestKnownRepoGen.get()) { - logger.warn("Failed to load repository data generation [" + genToLoad + - "] because a concurrent operation moved the current generation to [" + latestKnownRepoGen.get() + "]", e); - continue; - } + } else { + // We only rely on the generation tracked in #latestKnownRepoGen which is exclusively updated from the cluster state + genToLoad = latestKnownRepoGen.get(); + } + try { + listener.onResponse(getRepositoryData(genToLoad)); + return; + } catch (RepositoryException e) { + if (genToLoad != latestKnownRepoGen.get()) { + logger.warn("Failed to load repository data generation [" + genToLoad + + "] because a concurrent operation moved the current generation to [" + latestKnownRepoGen.get() + "]", e); + continue; + } + if (bestEffortConsistency == false && ExceptionsHelper.unwrap(e, NoSuchFileException.class) != null) { + // We did not find the expected index-N even though the cluster state continues to point at the missing value + // of N so we mark this repository as corrupted. 
+ markRepoCorrupted(genToLoad, e, + ActionListener.wrap(v -> listener.onFailure(corruptedStateException(e)), listener::onFailure)); + return; + } else { throw e; } } - }); + } + } + + private RepositoryException corruptedStateException(@Nullable Exception cause) { + return new RepositoryException(metadata.name(), + "Could not read repository data because the contents of the repository do not match its " + + "expected state. This is likely the result of either concurrently modifying the contents of the " + + "repository by a process other than this cluster or an issue with the repository's underlying " + + "storage. The repository has been disabled to prevent corrupting its contents. To re-enable it " + + "and continue using it please remove the repository from the cluster and add it again to make " + + "the cluster recover the known state of the repository from its physical contents.", cause); + } + + /** + * Marks the repository as corrupted. This puts the repository in a state where its tracked value for + * {@link RepositoryMetaData#pendingGeneration()} is unchanged while its value for {@link RepositoryMetaData#generation()} is set to + * {@link RepositoryData#CORRUPTED_REPO_GEN}. In this state, the repository can not be used any longer and must be removed and + * recreated after the problem that led to it being marked as corrupted has been fixed.
+ * + * @param corruptedGeneration generation that failed to load because the index file was not found but that should have loaded + * @param originalException exception that led to the failure to load the {@code index-N} blob + * @param listener listener to invoke once done + */ + private void markRepoCorrupted(long corruptedGeneration, Exception originalException, ActionListener listener) { + assert corruptedGeneration != RepositoryData.UNKNOWN_REPO_GEN; + assert bestEffortConsistency == false; + clusterService.submitStateUpdateTask("mark repository corrupted [" + metadata.name() + "][" + corruptedGeneration + "]", + new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + final RepositoriesMetaData state = currentState.metaData().custom(RepositoriesMetaData.TYPE); + final RepositoryMetaData repoState = state.repository(metadata.name()); + if (repoState.generation() != corruptedGeneration) { + throw new IllegalStateException("Tried to mark repo generation [" + corruptedGeneration + + "] as corrupted but its state concurrently changed to [" + repoState + "]"); + } + return ClusterState.builder(currentState).metaData(MetaData.builder(currentState.metaData()).putCustom( + RepositoriesMetaData.TYPE, state.withUpdatedGeneration( + metadata.name(), RepositoryData.CORRUPTED_REPO_GEN, repoState.pendingGeneration())).build()).build(); + } + + @Override + public void onFailure(String source, Exception e) { + listener.onFailure(new RepositoryException(metadata.name(), "Failed marking repository state as corrupted", + ExceptionsHelper.useOrSuppress(e, originalException))); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + listener.onResponse(null); + } + }); } private RepositoryData getRepositoryData(long indexGen) { @@ -1029,11 +1156,13 @@ private RepositoryData getRepositoryData(long indexGen) { return RepositoryData.snapshotsFromXContent(parser, indexGen);
} } catch (IOException ioe) { - // If we fail to load the generation we tracked in latestKnownRepoGen we reset it. - // This is done as a fail-safe in case a user manually deletes the contents of the repository in which case subsequent - // operations must start from the EMPTY_REPO_GEN again - if (latestKnownRepoGen.compareAndSet(indexGen, RepositoryData.EMPTY_REPO_GEN)) { - logger.warn("Resetting repository generation tracker because we failed to read generation [" + indexGen + "]", ioe); + if (bestEffortConsistency) { + // If we fail to load the generation we tracked in latestKnownRepoGen we reset it. + // This is done as a fail-safe in case a user manually deletes the contents of the repository in which case subsequent + // operations must start from the EMPTY_REPO_GEN again + if (latestKnownRepoGen.compareAndSet(indexGen, RepositoryData.EMPTY_REPO_GEN)) { + logger.warn("Resetting repository generation tracker because we failed to read generation [" + indexGen + "]", ioe); + } } throw new RepositoryException(metadata.name(), "could not read repository data from index blob", ioe); } @@ -1085,13 +1214,12 @@ public ClusterState execute(ClusterState currentState) { final RepositoryMetaData meta = getRepoMetaData(currentState); final String repoName = metadata.name(); final long genInState = meta.generation(); - // TODO: Remove all usages of this variable, instead initialize the generation when loading RepositoryData - final boolean uninitializedMeta = meta.generation() == RepositoryData.UNKNOWN_REPO_GEN; + final boolean uninitializedMeta = meta.generation() == RepositoryData.UNKNOWN_REPO_GEN || bestEffortConsistency; if (uninitializedMeta == false && meta.pendingGeneration() != genInState) { logger.info("Trying to write new repository data over unfinished write, repo [{}] is at " + "safe generation [{}] and pending generation [{}]", meta.name(), genInState, meta.pendingGeneration()); } - assert expectedGen == RepositoryData.EMPTY_REPO_GEN || 
RepositoryData.UNKNOWN_REPO_GEN == meta.generation() + assert expectedGen == RepositoryData.EMPTY_REPO_GEN || uninitializedMeta || expectedGen == meta.generation() : "Expected non-empty generation [" + expectedGen + "] does not match generation tracked in [" + meta + "]"; // If we run into the empty repo generation for the expected gen, the repo is assumed to have been cleared of @@ -1102,7 +1230,8 @@ public ClusterState execute(ClusterState currentState) { // even if a repository has been manually cleared of all contents we will never reuse the same repository generation. // This is motivated by the consistency behavior the S3 based blob repository implementation has to support which does // not offer any consistency guarantees when it comes to overwriting the same blob name with different content. - newGen = uninitializedMeta ? expectedGen + 1: metadata.pendingGeneration() + 1; + final long nextPendingGen = metadata.pendingGeneration() + 1; + newGen = uninitializedMeta ? Math.max(expectedGen + 1, nextPendingGen) : nextPendingGen; assert newGen > latestKnownRepoGen.get() : "Attempted new generation [" + newGen + "] must be larger than latest known generation [" + latestKnownRepoGen.get() + "]"; return ClusterState.builder(currentState).metaData(MetaData.builder(currentState.getMetaData()) diff --git a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java index 432091b81e1ec..40e17a81be40f 100644 --- a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java @@ -27,6 +27,7 @@ import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingHelper; +import 
org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.internal.io.IOUtils; @@ -193,13 +194,16 @@ public void testSnapshotWithConflictingName() throws IOException { private Repository createRepository() { Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build(); RepositoryMetaData repositoryMetaData = new RepositoryMetaData(randomAlphaOfLength(10), FsRepository.TYPE, settings); - final FsRepository repository = new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry(), - BlobStoreTestUtil.mockClusterService(repositoryMetaData)) { + final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(repositoryMetaData); + final FsRepository repository = new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry(), clusterService) { @Override protected void assertSnapshotOrGenericThread() { // eliminate thread name check as we create repo manually } }; + clusterService.addStateApplier(event -> repository.updateState(event.state())); + // Apply state once to initialize repo properly like RepositoriesService would + repository.updateState(clusterService.state()); repository.start(); return repository; } diff --git a/server/src/test/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java b/server/src/test/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java new file mode 100644 index 0000000000000..2a860eda3f972 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java @@ -0,0 +1,247 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. 
Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.snapshots; + +import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.metadata.RepositoriesMetaData; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.repositories.RepositoriesService; +import org.elasticsearch.repositories.Repository; +import org.elasticsearch.repositories.RepositoryData; +import org.elasticsearch.repositories.RepositoryException; +import org.elasticsearch.repositories.blobstore.BlobStoreRepository; + +import java.nio.file.Files; +import java.nio.file.Path; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; + +public class CorruptedBlobStoreRepositoryIT extends AbstractSnapshotIntegTestCase { + + public void testConcurrentlyChangeRepositoryContents() 
throws Exception { + Client client = client(); + + Path repo = randomRepoPath(); + final String repoName = "test-repo"; + logger.info("--> creating repository at {}", repo.toAbsolutePath()); + assertAcked(client.admin().cluster().preparePutRepository(repoName) + .setType("fs").setSettings(Settings.builder() + .put("location", repo) + .put("compress", false) + .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES))); + + createIndex("test-idx-1", "test-idx-2"); + logger.info("--> indexing some data"); + indexRandom(true, + client().prepareIndex("test-idx-1").setSource("foo", "bar"), + client().prepareIndex("test-idx-2").setSource("foo", "bar")); + + final String snapshot = "test-snap"; + + logger.info("--> creating snapshot"); + CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot(repoName, snapshot) + .setWaitForCompletion(true).setIndices("test-idx-*").get(); + assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0)); + assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), + equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())); + + logger.info("--> move index-N blob to next generation"); + final RepositoryData repositoryData = + getRepositoryData(internalCluster().getMasterNodeInstance(RepositoriesService.class).repository(repoName)); + Files.move(repo.resolve("index-" + repositoryData.getGenId()), repo.resolve("index-" + (repositoryData.getGenId() + 1))); + + assertRepositoryBlocked(client, repoName, snapshot); + + if (randomBoolean()) { + logger.info("--> move index-N blob back to initial generation"); + Files.move(repo.resolve("index-" + (repositoryData.getGenId() + 1)), repo.resolve("index-" + repositoryData.getGenId())); + + logger.info("--> verify repository remains blocked"); + assertRepositoryBlocked(client, repoName, snapshot); + } + + logger.info("--> remove repository"); + 
assertAcked(client.admin().cluster().prepareDeleteRepository(repoName)); + + logger.info("--> recreate repository"); + assertAcked(client.admin().cluster().preparePutRepository(repoName) + .setType("fs").setSettings(Settings.builder() + .put("location", repo) + .put("compress", false) + .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES))); + + logger.info("--> delete snapshot"); + client.admin().cluster().prepareDeleteSnapshot(repoName, snapshot).get(); + + logger.info("--> make sure snapshot doesn't exist"); + expectThrows(SnapshotMissingException.class, () -> client.admin().cluster().prepareGetSnapshots(repoName) + .addSnapshots(snapshot).get().getSnapshots(repoName)); + } + + public void testConcurrentlyChangeRepositoryContentsInBwCMode() throws Exception { + Client client = client(); + + Path repo = randomRepoPath(); + final String repoName = "test-repo"; + logger.info("--> creating repository at {}", repo.toAbsolutePath()); + assertAcked(client.admin().cluster().preparePutRepository(repoName) + .setType("fs").setSettings(Settings.builder() + .put("location", repo) + .put("compress", false) + .put(BlobStoreRepository.ALLOW_CONCURRENT_MODIFICATION.getKey(), true) + .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES))); + + createIndex("test-idx-1", "test-idx-2"); + logger.info("--> indexing some data"); + indexRandom(true, + client().prepareIndex("test-idx-1").setSource("foo", "bar"), + client().prepareIndex("test-idx-2").setSource("foo", "bar")); + + final String snapshot = "test-snap"; + + logger.info("--> creating snapshot"); + CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot(repoName, snapshot) + .setWaitForCompletion(true).setIndices("test-idx-*").get(); + assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0)); + assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), + 
equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())); + + final Repository repository = internalCluster().getMasterNodeInstance(RepositoriesService.class).repository(repoName); + + logger.info("--> move index-N blob to next generation"); + final RepositoryData repositoryData = getRepositoryData(repository); + final long beforeMoveGen = repositoryData.getGenId(); + Files.move(repo.resolve("index-" + beforeMoveGen), repo.resolve("index-" + (beforeMoveGen + 1))); + + logger.info("--> verify index-N blob is found at the new location"); + assertThat(getRepositoryData(repository).getGenId(), is(beforeMoveGen + 1)); + + logger.info("--> delete snapshot"); + client.admin().cluster().prepareDeleteSnapshot(repoName, snapshot).get(); + + logger.info("--> verify index-N blob is found at the expected location"); + assertThat(getRepositoryData(repository).getGenId(), is(beforeMoveGen + 2)); + + logger.info("--> make sure snapshot doesn't exist"); + expectThrows(SnapshotMissingException.class, () -> client.admin().cluster().prepareGetSnapshots(repoName) + .addSnapshots(snapshot).get().getSnapshots(repoName)); + } + + public void testFindDanglingLatestGeneration() throws Exception { + Path repo = randomRepoPath(); + final String repoName = "test-repo"; + logger.info("--> creating repository at {}", repo.toAbsolutePath()); + assertAcked(client().admin().cluster().preparePutRepository(repoName) + .setType("fs").setSettings(Settings.builder() + .put("location", repo) + .put("compress", false) + .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES))); + + createIndex("test-idx-1", "test-idx-2"); + logger.info("--> indexing some data"); + indexRandom(true, + client().prepareIndex("test-idx-1").setSource("foo", "bar"), + client().prepareIndex("test-idx-2").setSource("foo", "bar")); + + final String snapshot = "test-snap"; + + logger.info("--> creating snapshot"); + CreateSnapshotResponse createSnapshotResponse = 
client().admin().cluster().prepareCreateSnapshot(repoName, snapshot) + .setWaitForCompletion(true).setIndices("test-idx-*").get(); + assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0)); + assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), + equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())); + + final Repository repository = internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName); + + logger.info("--> move index-N blob to next generation"); + final RepositoryData repositoryData = getRepositoryData(repository); + final long beforeMoveGen = repositoryData.getGenId(); + Files.move(repo.resolve("index-" + beforeMoveGen), repo.resolve("index-" + (beforeMoveGen + 1))); + + logger.info("--> set next generation as pending in the cluster state"); + final PlainActionFuture csUpdateFuture = PlainActionFuture.newFuture(); + internalCluster().getCurrentMasterNodeInstance(ClusterService.class).submitStateUpdateTask("set pending generation", + new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + return ClusterState.builder(currentState).metaData(MetaData.builder(currentState.getMetaData()) + .putCustom(RepositoriesMetaData.TYPE, + currentState.metaData().custom(RepositoriesMetaData.TYPE).withUpdatedGeneration( + repository.getMetadata().name(), beforeMoveGen, beforeMoveGen + 1)).build()).build(); + } + + @Override + public void onFailure(String source, Exception e) { + csUpdateFuture.onFailure(e); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + csUpdateFuture.onResponse(null); + } + } + ); + csUpdateFuture.get(); + + logger.info("--> full cluster restart"); + internalCluster().fullRestart(); + ensureGreen(); + + Repository repositoryAfterRestart = internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName); + + 
logger.info("--> verify index-N blob is found at the new location"); + assertThat(getRepositoryData(repositoryAfterRestart).getGenId(), is(beforeMoveGen + 1)); + + logger.info("--> delete snapshot"); + client().admin().cluster().prepareDeleteSnapshot(repoName, snapshot).get(); + + logger.info("--> verify index-N blob is found at the expected location"); + assertThat(getRepositoryData(repositoryAfterRestart).getGenId(), is(beforeMoveGen + 2)); + + logger.info("--> make sure snapshot doesn't exist"); + expectThrows(SnapshotMissingException.class, () -> client().admin().cluster().prepareGetSnapshots(repoName) + .addSnapshots(snapshot).get().getSnapshots(repoName)); + } + + private void assertRepositoryBlocked(Client client, String repo, String existingSnapshot) { + logger.info("--> try to delete snapshot"); + final RepositoryException repositoryException3 = expectThrows(RepositoryException.class, + () -> client.admin().cluster().prepareDeleteSnapshot(repo, existingSnapshot).execute().actionGet()); + assertThat(repositoryException3.getMessage(), + containsString("Could not read repository data because the contents of the repository do not match its expected state.")); + + logger.info("--> try to create snapshot"); + final RepositoryException repositoryException4 = expectThrows(RepositoryException.class, + () -> client.admin().cluster().prepareCreateSnapshot(repo, existingSnapshot).execute().actionGet()); + assertThat(repositoryException4.getMessage(), + containsString("Could not read repository data because the contents of the repository do not match its expected state.")); + } +} diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java index 47e2626a3b7f3..7443eaded77a7 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java +++ 
b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java @@ -140,6 +140,8 @@ public void testOverwriteSnapshotInfoBlob() { try (BlobStoreRepository repository = new MockEventuallyConsistentRepository(metaData, xContentRegistry(), clusterService, blobStoreContext, random())) { clusterService.addStateApplier(event -> repository.updateState(event.state())); + // Apply state once to initialize repo properly like RepositoriesService would + repository.updateState(clusterService.state()); repository.start(); // We create a snap- blob for snapshot "foo" in the first generation diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/AbstractThirdPartyRepositoryTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/AbstractThirdPartyRepositoryTestCase.java index c6e76ac7174ed..efc0e653edf5c 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/AbstractThirdPartyRepositoryTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/AbstractThirdPartyRepositoryTestCase.java @@ -76,6 +76,7 @@ public void setUp() throws Exception { @Override public void tearDown() throws Exception { deleteAndAssertEmpty(getRepository().basePath()); + client().admin().cluster().prepareDeleteRepository("test-repo").get(); super.tearDown(); } @@ -169,8 +170,6 @@ protected void assertBlobsByPrefix(BlobPath path, String prefix, Map currentState = new AtomicReference<>(initialState); + // Setting local node as master so it may update the repository metadata in the cluster state + final DiscoveryNode localNode = new DiscoveryNode("", buildNewFakeTransportAddress(), Version.CURRENT); + final AtomicReference currentState = new AtomicReference<>( + ClusterState.builder(initialState).nodes( + DiscoveryNodes.builder().add(localNode).masterNodeId(localNode.getId()).localNodeId(localNode.getId()).build()).build()); when(clusterService.state()).then(invocationOnMock -> 
currentState.get()); final List appliers = new CopyOnWriteArrayList<>(); doAnswer(invocation -> { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java index dba66e0b1b1db..bf6512cc531e2 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java @@ -32,6 +32,7 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.lucene.uid.Versions; @@ -352,8 +353,12 @@ private Environment createEnvironment() { private Repository createRepository() { Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build(); RepositoryMetaData repositoryMetaData = new RepositoryMetaData(randomAlphaOfLength(10), FsRepository.TYPE, settings); - return new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry(), - BlobStoreTestUtil.mockClusterService(repositoryMetaData)); + final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(repositoryMetaData); + final Repository repository = new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry(), clusterService); + clusterService.addStateApplier(e -> repository.updateState(e.state())); + // Apply state once to initialize repo properly like RepositoriesService would + repository.updateState(clusterService.state()); + return repository; } private static void runAsSnapshot(ThreadPool pool, Runnable runnable) {