Allow snapshotting replicated closed indices (#39644)
This adds the capability to snapshot replicated closed indices.

It also changes snapshot requests in v8.0.0 to automatically expand wildcards to closed indices, and hence to start snapshotting closed indices by default. For v7.1.0 and above, wildcards are by default only expanded to open indices; this can be changed by explicitly setting the expand_wildcards option to either all or closed.

Note that indices are always restored as open indices, even if they have been snapshotted as closed replicated indices.

Relates to #33888
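For illustration, here is a hedged sketch (not part of this commit) of how a caller on 7.x could opt in to snapshotting closed indices and then restore one. The repository, snapshot, and index names are hypothetical, client is assumed to be an org.elasticsearch.client.Client obtained elsewhere, and IndicesOptions.strictExpand() is used as the programmatic counterpart of expand_wildcards=all; on 8.0 the expansion to closed indices shown here becomes the default.

import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.snapshots.SnapshotState;

// Snapshot both open and closed indices matching the wildcard (hypothetical names).
CreateSnapshotResponse createResponse = client.admin().cluster()
    .prepareCreateSnapshot("my-repo", "snap-1")
    .setIndices("logs-*")
    .setIndicesOptions(IndicesOptions.strictExpand())   // expand wildcards to open + closed indices
    .setWaitForCompletion(true)
    .get();
assert createResponse.getSnapshotInfo().state() == SnapshotState.SUCCESS;

// Restore an index that was closed when it was snapshotted; it comes back as an open index.
RestoreSnapshotResponse restoreResponse = client.admin().cluster()
    .prepareRestoreSnapshot("my-repo", "snap-1")
    .setIndices("logs-closed")
    .setWaitForCompletion(true)
    .get();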
ywelsch committed Mar 6, 2019
1 parent e620fb2 commit fef11f7
Showing 6 changed files with 45 additions and 65 deletions.
@@ -59,13 +59,12 @@ protected CreateSnapshotResponse newResponse() {

@Override
protected ClusterBlockException checkBlock(CreateSnapshotRequest request, ClusterState state) {
// We are reading the cluster metadata and indices - so we need to check both blocks
// We only check metadata block, as we want to snapshot closed indices (which have a read block)
ClusterBlockException clusterBlockException = state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
if (clusterBlockException != null) {
return clusterBlockException;
}
return state.blocks()
.indicesBlockedException(ClusterBlockLevel.READ, indexNameExpressionResolver.concreteIndexNames(state, request));
return null;
}

@Override
@@ -44,6 +44,7 @@
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService;
import org.elasticsearch.cluster.metadata.RepositoriesMetaData;
import org.elasticsearch.cluster.routing.RecoverySource;
@@ -473,9 +474,6 @@ private void validateExistingIndex(IndexMetaData currentIndexMetaData, IndexMeta
* merging them with settings in changeSettings.
*/
private IndexMetaData updateIndexSettings(IndexMetaData indexMetaData, Settings changeSettings, String[] ignoreSettings) {
if (changeSettings.names().isEmpty() && ignoreSettings.length == 0) {
return indexMetaData;
}
Settings normalizedChangeSettings = Settings.builder()
.put(changeSettings)
.normalizePrefix(IndexMetaData.INDEX_SETTING_PREFIX)
@@ -519,6 +517,7 @@ private IndexMetaData updateIndexSettings(IndexMetaData indexMetaData, Settings
return true;
}
}));
settingsBuilder.remove(MetaDataIndexStateService.VERIFIED_BEFORE_CLOSE_SETTING.getKey());
return builder.settings(settingsBuilder).build();
}

@@ -1384,11 +1384,6 @@ private static ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus
// The index was deleted before we managed to start the snapshot - mark it as missing.
builder.put(new ShardId(indexName, IndexMetaData.INDEX_UUID_NA_VALUE, 0),
new SnapshotsInProgress.ShardSnapshotStatus(null, State.MISSING, "missing index"));
} else if (indexMetaData.getState() == IndexMetaData.State.CLOSE) {
for (int i = 0; i < indexMetaData.getNumberOfShards(); i++) {
ShardId shardId = new ShardId(indexMetaData.getIndex(), i);
builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(null, State.MISSING, "index is closed"));
}
} else {
IndexRoutingTable indexRoutingTable = clusterState.getRoutingTable().index(indexName);
for (int i = 0; i < indexMetaData.getNumberOfShards(); i++) {
@@ -23,7 +23,6 @@
import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.RestStatus;
@@ -114,11 +113,8 @@ public void testCreateSnapshotWithIndexBlocks() {
logger.info("--> creating a snapshot is blocked when an index is blocked for reads");
try {
enableIndexBlock(INDEX_NAME, SETTING_BLOCKS_READ);
assertBlocked(client().admin().cluster().prepareCreateSnapshot(REPOSITORY_NAME, "snapshot-2")
.setIndices(COMMON_INDEX_NAME_MASK), IndexMetaData.INDEX_READ_BLOCK);
logger.info("--> creating a snapshot is not blocked when an read-blocked index is not part of the snapshot");
assertThat(client().admin().cluster().prepareCreateSnapshot(REPOSITORY_NAME, "snapshot-2")
.setIndices(OTHER_INDEX_NAME).setWaitForCompletion(true).get().status(), equalTo(RestStatus.OK));
.setIndices(COMMON_INDEX_NAME_MASK).setWaitForCompletion(true).get().status(), equalTo(RestStatus.OK));
} finally {
disableIndexBlock(INDEX_NAME, SETTING_BLOCKS_READ);
}
@@ -21,7 +21,6 @@

import com.carrotsearch.hppc.IntHashSet;
import com.carrotsearch.hppc.IntSet;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
@@ -43,7 +42,6 @@
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedFunction;
@@ -102,7 +100,6 @@
import java.util.function.Consumer;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertBlocked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
@@ -530,15 +527,14 @@ public void testRestoreIndexWithMissingShards() throws Exception {
logger.info("--> create an index that will be closed");
assertAcked(prepareCreate("test-idx-closed", 1, Settings.builder().put("number_of_shards", 4)
.put("number_of_replicas", 0)));
ensureGreen("test-idx-closed");

logger.info("--> indexing some data into test-idx-all");
for (int i = 0; i < 100; i++) {
index("test-idx-all", "doc", Integer.toString(i), "foo", "bar" + i);
index("test-idx-closed", "doc", Integer.toString(i), "foo", "bar" + i);
}
refresh("test-idx-closed", "test-idx-all"); // don't refresh test-idx-some it will take 30 sec until it times out...
assertThat(client().prepareSearch("test-idx-all").setSize(0).get().getHits().getTotalHits().value, equalTo(100L));
assertThat(client().prepareSearch("test-idx-closed").setSize(0).get().getHits().getTotalHits().value, equalTo(100L));
assertAcked(client().admin().indices().prepareClose("test-idx-closed"));

logger.info("--> create an index that will have no allocated shards");
@@ -552,23 +548,17 @@ public void testRestoreIndexWithMissingShards() throws Exception {
.setType("fs").setSettings(Settings.builder().put("location", randomRepoPath())).execute().actionGet();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));

logger.info("--> start snapshot with default settings and closed index - should be blocked");
assertBlocked(client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-1")
.setIndices("test-idx-all", "test-idx-none", "test-idx-some", "test-idx-closed")
.setWaitForCompletion(true), MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID);


logger.info("--> start snapshot with default settings without a closed index - should fail");
CreateSnapshotResponse createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-1")
.setIndices("test-idx-all", "test-idx-none", "test-idx-some")
.setIndices("test-idx-all", "test-idx-none", "test-idx-some", "test-idx-closed")
.setWaitForCompletion(true).execute().actionGet();
assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo(SnapshotState.FAILED));
assertThat(createSnapshotResponse.getSnapshotInfo().reason(), containsString("Indices don't have primary shards"));

if (randomBoolean()) {
logger.info("checking snapshot completion using status");
client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-2")
.setIndices("test-idx-all", "test-idx-none", "test-idx-some")
.setIndices("test-idx-all", "test-idx-none", "test-idx-some", "test-idx-closed")
.setWaitForCompletion(false).setPartial(true).execute().actionGet();
assertBusy(() -> {
SnapshotsStatusResponse snapshotsStatusResponse = client().admin().cluster().prepareSnapshotStatus("test-repo")
@@ -585,9 +575,9 @@ public void testRestoreIndexWithMissingShards() throws Exception {
SnapshotStatus snapshotStatus = snapshotStatuses.get(0);
logger.info("State: [{}], Reason: [{}]",
createSnapshotResponse.getSnapshotInfo().state(), createSnapshotResponse.getSnapshotInfo().reason());
assertThat(snapshotStatus.getShardsStats().getTotalShards(), equalTo(18));
assertThat(snapshotStatus.getShardsStats().getDoneShards(), lessThan(12));
assertThat(snapshotStatus.getShardsStats().getDoneShards(), greaterThan(6));
assertThat(snapshotStatus.getShardsStats().getTotalShards(), equalTo(22));
assertThat(snapshotStatus.getShardsStats().getDoneShards(), lessThan(16));
assertThat(snapshotStatus.getShardsStats().getDoneShards(), greaterThan(10));

// There is slight delay between snapshot being marked as completed in the cluster state and on the file system
// After it was marked as completed in the cluster state - we need to check if it's completed on the file system as well
@@ -602,19 +592,19 @@ public void testRestoreIndexWithMissingShards() throws Exception {
} else {
logger.info("checking snapshot completion using wait_for_completion flag");
createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-2")
.setIndices("test-idx-all", "test-idx-none", "test-idx-some")
.setIndices("test-idx-all", "test-idx-none", "test-idx-some", "test-idx-closed")
.setWaitForCompletion(true).setPartial(true).execute().actionGet();
logger.info("State: [{}], Reason: [{}]",
createSnapshotResponse.getSnapshotInfo().state(), createSnapshotResponse.getSnapshotInfo().reason());
assertThat(createSnapshotResponse.getSnapshotInfo().totalShards(), equalTo(18));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), lessThan(12));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(6));
assertThat(createSnapshotResponse.getSnapshotInfo().totalShards(), equalTo(22));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), lessThan(16));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(10));
assertThat(client().admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap-2").execute().actionGet()
.getSnapshots().get(0).state(),
equalTo(SnapshotState.PARTIAL));
}

assertAcked(client().admin().indices().prepareClose("test-idx-some", "test-idx-all"));
assertAcked(client().admin().indices().prepareClose("test-idx-all"));

logger.info("--> restore incomplete snapshot - should fail");
assertThrows(client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap-2").setRestoreGlobalState(false)
@@ -654,6 +644,16 @@ public void testRestoreIndexWithMissingShards() throws Exception {

assertThat(client().prepareSearch("test-idx-some").setSize(0).get().getHits().getTotalHits().value, allOf(greaterThan(0L),
lessThan(100L)));

logger.info("--> restore snapshot for the closed index that was snapshotted completely");
restoreSnapshotResponse = client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap-2")
.setRestoreGlobalState(false).setIndices("test-idx-closed").setWaitForCompletion(true).execute().actionGet();
assertThat(restoreSnapshotResponse.getRestoreInfo(), notNullValue());
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), equalTo(4));
assertThat(restoreSnapshotResponse.getRestoreInfo().successfulShards(), equalTo(4));
assertThat(restoreSnapshotResponse.getRestoreInfo().failedShards(), equalTo(0));

assertThat(client().prepareSearch("test-idx-closed").setSize(0).get().getHits().getTotalHits().value, equalTo(100L));
}

public void testRestoreIndexWithShardsMissingInLocalGateway() throws Exception {
@@ -132,7 +132,6 @@
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAliasesExist;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAliasesMissing;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAllSuccessful;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertBlocked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertIndexTemplateExists;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertIndexTemplateMissing;
@@ -227,6 +226,11 @@ public void testBasicWorkFlow() throws Exception {
}
}

final boolean snapshotClosed = randomBoolean();
if (snapshotClosed) {
assertAcked(client.admin().indices().prepareClose(indicesToSnapshot).setWaitForActiveShards(ActiveShardCount.ALL).get());
}

logger.info("--> snapshot");
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
.setWaitForCompletion(true).setIndices(indicesToSnapshot).get();
@@ -241,6 +245,10 @@ public void testBasicWorkFlow() throws Exception {
assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfo.version(), equalTo(Version.CURRENT));

if (snapshotClosed) {
assertAcked(client.admin().indices().prepareOpen(indicesToSnapshot).setWaitForActiveShards(ActiveShardCount.ALL).get());
}

logger.info("--> delete some data");
for (int i = 0; i < 50; i++) {
client.prepareDelete("test-idx-1", "_doc", Integer.toString(i)).get();
@@ -271,6 +279,9 @@ public void testBasicWorkFlow() throws Exception {
assertHitCount(client.prepareSearch("test-idx-3").setSize(0).get(), 50L);
}

assertNull(client.admin().indices().prepareGetSettings("test-idx-1").get().getSetting("test-idx-1",
MetaDataIndexStateService.VERIFIED_BEFORE_CLOSE_SETTING.getKey()));

for (ShardStats shardStats: client().admin().indices().prepareStats(indicesToSnapshot).clear().get().getShards()) {
String historyUUID = shardStats.getCommitStats().getUserData().get(Engine.HISTORY_UUID_KEY);
ShardId shardId = shardStats.getShardRouting().shardId();
@@ -294,6 +305,9 @@ public void testBasicWorkFlow() throws Exception {
assertThat(clusterState.getMetaData().hasIndex("test-idx-1"), equalTo(true));
assertThat(clusterState.getMetaData().hasIndex("test-idx-2"), equalTo(false));

assertNull(client.admin().indices().prepareGetSettings("test-idx-1").get().getSetting("test-idx-1",
MetaDataIndexStateService.VERIFIED_BEFORE_CLOSE_SETTING.getKey()));

for (ShardStats shardStats: client().admin().indices().prepareStats(indicesToSnapshot).clear().get().getShards()) {
String historyUUID = shardStats.getCommitStats().getUserData().get(Engine.HISTORY_UUID_KEY);
ShardId shardId = shardStats.getShardRouting().shardId();
@@ -1561,33 +1575,8 @@ public void testSnapshotClosedIndex() throws Exception {
logger.info("--> snapshot");
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
.setWaitForCompletion(true).setIndices("test-idx*").get();
assertThat(createSnapshotResponse.getSnapshotInfo().indices().size(), equalTo(1));
assertThat(createSnapshotResponse.getSnapshotInfo().indices().size(), equalTo(2));
assertThat(createSnapshotResponse.getSnapshotInfo().shardFailures().size(), equalTo(0));

logger.info("--> deleting snapshot");
client.admin().cluster().prepareDeleteSnapshot("test-repo", "test-snap").get();

logger.info("--> snapshot with closed index");
assertBlocked(client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true)
.setIndices("test-idx", "test-idx-closed"), MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID);
}

public void testSnapshotSingleClosedIndex() throws Exception {
Client client = client();

logger.info("--> creating repository");
assertAcked(client.admin().cluster().preparePutRepository("test-repo")
.setType("fs").setSettings(Settings.builder()
.put("location", randomRepoPath())));

createIndex("test-idx");
ensureGreen();
logger.info("--> closing index test-idx");
assertAcked(client.admin().indices().prepareClose("test-idx"));

logger.info("--> snapshot");
assertBlocked(client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-1")
.setWaitForCompletion(true).setIndices("test-idx"), MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID);
}

public void testRenameOnRestore() throws Exception {
@@ -2483,6 +2472,7 @@ public void testCloseOrDeleteIndexDuringSnapshot() throws Exception {
} else {
waitForBlockOnAnyDataNode("test-repo", TimeValue.timeValueMinutes(1));
}
boolean closedOnPartial = false;
try {
if (allowPartial) {
// partial snapshots allow close / delete operations
@@ -2491,6 +2481,7 @@ public void testCloseOrDeleteIndexDuringSnapshot() throws Exception {
client.admin().indices().prepareDelete("test-idx-1").get();
} else {
logger.info("--> close index while partial snapshot is running");
closedOnPartial = true;
client.admin().indices().prepareClose("test-idx-1").get();
}
} else {
@@ -2525,7 +2516,7 @@ public void testCloseOrDeleteIndexDuringSnapshot() throws Exception {
logger.info("--> waiting for snapshot to finish");
CreateSnapshotResponse createSnapshotResponse = future.get();

if (allowPartial) {
if (allowPartial && closedOnPartial == false) {
logger.info("Deleted/Closed index during snapshot, but allow partial");
assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo((SnapshotState.PARTIAL)));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
