Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for replicating closed indices #39499

Merged
merged 37 commits into from
Feb 28, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
c63fd69
[RCI] Add NoOpEngine for closed indices (#33903)
tlrx Sep 26, 2018
54d110b
[RCI] Adapt NoOpEngine to latest FrozenEngine changes
tlrx Nov 8, 2018
cae4155
Relax NoOpEngine constraints (#37413)
tlrx Jan 23, 2019
e53a9be
Fix compilation error in IndexShardIT after merge with master
tlrx Jan 29, 2019
46a6fba
Merge branch 'master' into replicated-closed-indices
tlrx Feb 1, 2019
fd1046c
Merge branch 'master' into replicated-closed-indices
tlrx Feb 4, 2019
02cc730
Allow shards of closed indices to be replicated as regular shards (#3…
tlrx Feb 4, 2019
82b4ba6
Merge branch 'master' into replicated-closed-indices
tlrx Feb 5, 2019
404d065
Merge branch 'master' into replicated-closed-indices
tlrx Feb 6, 2019
5c61cac
Merge branch 'master' into replicated-closed-indices
tlrx Feb 6, 2019
d746ef5
Merge branch 'master' into replicated-closed-indices
tlrx Feb 6, 2019
f53cd65
Merge branch 'master' into replicated-closed-indices
tlrx Feb 8, 2019
a663613
Merge branch 'master' into replicated-closed-indices
tlrx Feb 9, 2019
b9becdd
Adapt testPendingTasks() for replicated closed indices (#38326)
tlrx Feb 11, 2019
cf9a015
Adapt testIndexCanChangeCustomDataPath for replicated closed indices …
tlrx Feb 11, 2019
e845b0a
Do not schedule Refresh/Translog/GlobalCheckpoint tasks for closed in…
tlrx Feb 11, 2019
0cd3636
Merge branch 'master' into replicated-closed-indices
tlrx Feb 11, 2019
b55aca7
Merge branch 'master' into replicated-closed-indices
tlrx Feb 12, 2019
03335b2
Merge branch 'master' into replicated-closed-indices
tlrx Feb 13, 2019
040476a
Merge branch 'master' into replicated-closed-indices
tlrx Feb 15, 2019
00f1828
Mute CloseFollowerIndexIT.testCloseAndReopenFollowerIndex()
tlrx Feb 15, 2019
c484c66
Remove index routing table of closed indices in mixed versions cluste…
tlrx Feb 18, 2019
43f555b
Merge branch 'master' into replicated-closed-indices
tlrx Feb 18, 2019
b756f6c
Test the Cluster Shard Allocation Explain API with closed indices (#3…
tlrx Feb 18, 2019
538cdcd
Merge branch 'master' into replicated-closed-indices
tlrx Feb 20, 2019
0519016
Add replica to primary promotion test for closed indices (#39110)
tlrx Feb 21, 2019
05debeb
Merge branch 'master' into replicated-closed-indices
tlrx Feb 21, 2019
4fd1bb2
Adapt more tests suites to closed indices (#39186)
tlrx Feb 22, 2019
4db7fd9
Adapt the Recovery API for closed indices (#38421)
tlrx Feb 25, 2019
d0cf376
Merge branch 'master' into replicated-closed-indices
tlrx Feb 25, 2019
71f5c34
Recover closed indices after a full cluster restart (#39249)
tlrx Feb 26, 2019
3e61939
Adapt CloseFollowerIndexIT for replicated closed indices (#38767)
tlrx Feb 26, 2019
5e7a428
Adapt the Cluster Health API to closed indices (#39364)
tlrx Feb 26, 2019
3f9993d
Wait for shards to be active after closing indices (#38854)
tlrx Feb 26, 2019
1b7eb90
Merge branch 'master' into replicated-closed-indices
tlrx Feb 28, 2019
c6c42a1
Adapt NoOpEngineTests after #39006
tlrx Feb 28, 2019
79b9a5a
Amend comment
tlrx Feb 28, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.Strings;
Expand All @@ -41,6 +42,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand All @@ -59,8 +61,11 @@
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;

/**
* Tests to run before and after a full cluster restart. This is run twice,
Expand Down Expand Up @@ -951,6 +956,97 @@ public void testSoftDeletes() throws Exception {
}
}

/**
* This test creates an index in the old cluster and then closes it. When the cluster is fully restarted in a newer version,
* it verifies that the index exists and is replicated if the old version supports replication.
*/
public void testClosedIndices() throws Exception {
if (isRunningAgainstOldCluster()) {
createIndex(index, Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.build());
ensureGreen(index);

int numDocs = 0;
if (randomBoolean()) {
numDocs = between(1, 100);
for (int i = 0; i < numDocs; i++) {
final Request request = new Request("POST", "/" + index + "/_doc/" + i);
request.setJsonEntity(Strings.toString(JsonXContent.contentBuilder().startObject().field("field", "v1").endObject()));
assertOK(client().performRequest(request));
if (rarely()) {
refresh();
}
}
refresh();
}

assertTotalHits(numDocs, entityAsMap(client().performRequest(new Request("GET", "/" + index + "/_search"))));
saveInfoDocument(index + "_doc_count", Integer.toString(numDocs));
closeIndex(index);
}

if (getOldClusterVersion().onOrAfter(Version.V_8_0_0)) {
ensureGreenLongWait(index);
assertClosedIndex(index, true);
} else {
assertClosedIndex(index, false);
}

if (isRunningAgainstOldCluster() == false) {
openIndex(index);
ensureGreen(index);

final int expectedNumDocs = Integer.parseInt(loadInfoDocument(index + "_doc_count"));
assertTotalHits(expectedNumDocs, entityAsMap(client().performRequest(new Request("GET", "/" + index + "/_search"))));
}
}

/**
* Asserts that an index is closed in the cluster state. If `checkRoutingTable` is true, it also asserts
* that the index has started shards.
*/
@SuppressWarnings("unchecked")
private void assertClosedIndex(final String index, final boolean checkRoutingTable) throws IOException {
final Map<String, ?> state = entityAsMap(client().performRequest(new Request("GET", "/_cluster/state")));

final Map<String, ?> metadata = (Map<String, Object>) XContentMapValues.extractValue("metadata.indices." + index, state);
assertThat(metadata, notNullValue());
assertThat(metadata.get("state"), equalTo("close"));

final Map<String, ?> blocks = (Map<String, Object>) XContentMapValues.extractValue("blocks.indices." + index, state);
assertThat(blocks, notNullValue());
assertThat(blocks.containsKey(String.valueOf(MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID)), is(true));

final Map<String, ?> settings = (Map<String, Object>) XContentMapValues.extractValue("settings", metadata);
assertThat(settings, notNullValue());

final Map<String, ?> routingTable = (Map<String, Object>) XContentMapValues.extractValue("routing_table.indices." + index, state);
if (checkRoutingTable) {
assertThat(routingTable, notNullValue());
assertThat(Booleans.parseBoolean((String) XContentMapValues.extractValue("index.verified_before_close", settings)), is(true));
final String numberOfShards = (String) XContentMapValues.extractValue("index.number_of_shards", settings);
assertThat(numberOfShards, notNullValue());
final int nbShards = Integer.parseInt(numberOfShards);
assertThat(nbShards, greaterThanOrEqualTo(1));

for (int i = 0; i < nbShards; i++) {
final Collection<Map<String, ?>> shards =
(Collection<Map<String, ?>>) XContentMapValues.extractValue("shards." + i, routingTable);
assertThat(shards, notNullValue());
assertThat(shards.size(), equalTo(2));
for (Map<String, ?> shard : shards) {
assertThat(XContentMapValues.extractValue("shard", shard), equalTo(i));
assertThat(XContentMapValues.extractValue("state", shard), equalTo("STARTED"));
assertThat(XContentMapValues.extractValue("index", shard), equalTo(index));
}
}
} else {
assertThat(routingTable, nullValue());
assertThat(XContentMapValues.extractValue("index.verified_before_close", settings), nullValue());
}
}

private void checkSnapshot(final String snapshotName, final int count, final Version tookOnVersion) throws IOException {
// Check the snapshot metadata, especially the version
Request listSnapshotRequest = new Request("GET", "/_snapshot/repo/" + snapshotName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,20 @@
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.rest.action.document.RestIndexAction;
import org.elasticsearch.test.rest.yaml.ObjectPath;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.function.Predicate;
Expand All @@ -43,7 +48,9 @@
import static org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;

/**
* In depth testing of the recovery mechanism during a rolling restart.
Expand Down Expand Up @@ -310,4 +317,144 @@ public void testRecoveryWithSoftDeletes() throws Exception {
}
ensureGreen(index);
}

/**
* This test creates an index in the non upgraded cluster and closes it. It then checks that the index
* is effectively closed and potentially replicated (if the version the index was created on supports
* the replication of closed indices) during the rolling upgrade.
*/
public void testRecoveryClosedIndex() throws Exception {
final String indexName = "closed_index_created_on_old";
if (CLUSTER_TYPE == ClusterType.OLD) {
createIndex(indexName, Settings.builder()
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1)
// if the node with the replica is the first to be restarted, while a replica is still recovering
// then delayed allocation will kick in. When the node comes back, the master will search for a copy
// but the recovering copy will be seen as invalid and the cluster health won't return to GREEN
// before timing out
.put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms")
.put(SETTING_ALLOCATION_MAX_RETRY.getKey(), "0") // fail faster
.build());
ensureGreen(indexName);
closeIndex(indexName);
}

final Version indexVersionCreated = indexVersionCreated(indexName);
if (indexVersionCreated.onOrAfter(Version.V_8_0_0)) {
// index was created on a version that supports the replication of closed indices,
// so we expect the index to be closed and replicated
ensureGreen(indexName);
assertClosedIndex(indexName, true);
} else {
assertClosedIndex(indexName, false);
}
}

/**
* This test creates and closes a new index at every stage of the rolling upgrade. It then checks that the index
* is effectively closed and potentially replicated if the cluster supports replication of closed indices at the
* time the index was closed.
*/
public void testCloseIndexDuringRollingUpgrade() throws Exception {
final Version minimumNodeVersion = minimumNodeVersion();
final String indexName =
String.join("_", "index", CLUSTER_TYPE.toString(), Integer.toString(minimumNodeVersion.id)).toLowerCase(Locale.ROOT);

if (indexExists(indexName) == false) {
createIndex(indexName, Settings.builder()
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0)
.build());
ensureGreen(indexName);
closeIndex(indexName);
}

if (minimumNodeVersion.onOrAfter(Version.V_8_0_0)) {
// index is created on a version that supports the replication of closed indices,
// so we expect the index to be closed and replicated
ensureGreen(indexName);
assertClosedIndex(indexName, true);
} else {
assertClosedIndex(indexName, false);
}
}

/**
* Returns the version in which the given index has been created
*/
private static Version indexVersionCreated(final String indexName) throws IOException {
final Request request = new Request("GET", "/" + indexName + "/_settings");
final String versionCreatedSetting = indexName + ".settings.index.version.created";
request.addParameter("filter_path", versionCreatedSetting);

final Response response = client().performRequest(request);
return Version.fromId(Integer.parseInt(ObjectPath.createFromResponse(response).evaluate(versionCreatedSetting)));
}

/**
* Returns the minimum node version among all nodes of the cluster
*/
private static Version minimumNodeVersion() throws IOException {
final Request request = new Request("GET", "_nodes");
request.addParameter("filter_path", "nodes.*.version");

final Response response = client().performRequest(request);
final Map<String, Object> nodes = ObjectPath.createFromResponse(response).evaluate("nodes");

Version minVersion = null;
for (Map.Entry<String, Object> node : nodes.entrySet()) {
@SuppressWarnings("unchecked")
Version nodeVersion = Version.fromString((String) ((Map<String, Object>) node.getValue()).get("version"));
if (minVersion == null || minVersion.after(nodeVersion)) {
minVersion = nodeVersion;
}
}
assertNotNull(minVersion);
return minVersion;
}

/**
* Asserts that an index is closed in the cluster state. If `checkRoutingTable` is true, it also asserts
* that the index has started shards.
*/
@SuppressWarnings("unchecked")
private void assertClosedIndex(final String index, final boolean checkRoutingTable) throws IOException {
final Map<String, ?> state = entityAsMap(client().performRequest(new Request("GET", "/_cluster/state")));

final Map<String, ?> metadata = (Map<String, Object>) XContentMapValues.extractValue("metadata.indices." + index, state);
assertThat(metadata, notNullValue());
assertThat(metadata.get("state"), equalTo("close"));

final Map<String, ?> blocks = (Map<String, Object>) XContentMapValues.extractValue("blocks.indices." + index, state);
assertThat(blocks, notNullValue());
assertThat(blocks.containsKey(String.valueOf(MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID)), is(true));

final Map<String, ?> settings = (Map<String, Object>) XContentMapValues.extractValue("settings", metadata);
assertThat(settings, notNullValue());

final int numberOfShards = Integer.parseInt((String) XContentMapValues.extractValue("index.number_of_shards", settings));
final int numberOfReplicas = Integer.parseInt((String) XContentMapValues.extractValue("index.number_of_replicas", settings));

final Map<String, ?> routingTable = (Map<String, Object>) XContentMapValues.extractValue("routing_table.indices." + index, state);
if (checkRoutingTable) {
assertThat(routingTable, notNullValue());
assertThat(Booleans.parseBoolean((String) XContentMapValues.extractValue("index.verified_before_close", settings)), is(true));

for (int i = 0; i < numberOfShards; i++) {
final Collection<Map<String, ?>> shards =
(Collection<Map<String, ?>>) XContentMapValues.extractValue("shards." + i, routingTable);
assertThat(shards, notNullValue());
assertThat(shards.size(), equalTo(numberOfReplicas + 1));
for (Map<String, ?> shard : shards) {
assertThat(XContentMapValues.extractValue("shard", shard), equalTo(i));
assertThat(XContentMapValues.extractValue("state", shard), equalTo("STARTED"));
assertThat(XContentMapValues.extractValue("index", shard), equalTo(index));
}
}
} else {
assertThat(routingTable, nullValue());
assertThat(XContentMapValues.extractValue("index.verified_before_close", settings), nullValue());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@
}
},
"params": {
"expand_wildcards": {
"type" : "enum",
"options" : ["open","closed","none","all"],
"default" : "all",
"description" : "Whether to expand wildcard expression to concrete indices that are open, closed or both."
},
"level": {
"type" : "enum",
"options" : ["cluster","indices","shards"],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@
"options" : ["open","closed","none","all"],
"default" : "open",
"description" : "Whether to expand wildcard expression to concrete indices that are open, closed or both."
},
"wait_for_active_shards": {
"type" : "string",
"description" : "Sets the number of active shards to wait for before the operation returns."
}
}
},
Expand Down
Loading