diff --git a/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java b/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java index 67d3007e9af16..21ae48e9c77d4 100644 --- a/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java +++ b/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java @@ -1278,7 +1278,7 @@ public void testOperationBasedRecovery() throws Exception { } } flush(index, true); - ensurePeerRecoveryRetentionLeasesRenewedAndSynced(index, false); + ensurePeerRecoveryRetentionLeasesRenewedAndSynced(index); // less than 10% of the committed docs (see IndexSetting#FILE_BASED_RECOVERY_THRESHOLD_SETTING). int uncommittedDocs = randomIntBetween(0, (int) (committedDocs * 0.1)); for (int i = 0; i < uncommittedDocs; i++) { @@ -1288,7 +1288,7 @@ public void testOperationBasedRecovery() throws Exception { } else { ensureGreen(index); assertNoFileBasedRecovery(index, n -> true); - ensurePeerRecoveryRetentionLeasesRenewedAndSynced(index, true); + ensurePeerRecoveryRetentionLeasesRenewedAndSynced(index); } } @@ -1313,7 +1313,7 @@ public void testTurnOffTranslogRetentionAfterUpgraded() throws Exception { ensureGreen(index); flush(index, true); assertEmptyTranslog(index); - ensurePeerRecoveryRetentionLeasesRenewedAndSynced(index, true); + ensurePeerRecoveryRetentionLeasesRenewedAndSynced(index); } } } diff --git a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java index 429687853e897..080b15268db1d 100644 --- a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java +++ b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java @@ -511,28 +511,6 @@ private static Version indexVersionCreated(final String indexName) throws IOExce return Version.fromId(Integer.parseInt(ObjectPath.createFromResponse(response).evaluate(versionCreatedSetting))); } - /** - * Returns the minimum node version among all nodes of the cluster - */ - private static Version minimumNodeVersion() throws IOException { - final Request request = new Request("GET", "_nodes"); - request.addParameter("filter_path", "nodes.*.version"); - - final Response response = client().performRequest(request); - final Map nodes = ObjectPath.createFromResponse(response).evaluate("nodes"); - - Version minVersion = null; - for (Map.Entry node : nodes.entrySet()) { - @SuppressWarnings("unchecked") - Version nodeVersion = Version.fromString((String) ((Map) node.getValue()).get("version")); - if (minVersion == null || minVersion.after(nodeVersion)) { - minVersion = nodeVersion; - } - } - assertNotNull(minVersion); - return minVersion; - } - /** * Asserts that an index is closed in the cluster state. If `checkRoutingTable` is true, it also asserts * that the index has started shards. @@ -695,7 +673,7 @@ public void testOperationBasedRecovery() throws Exception { ensureGreen(index); indexDocs(index, 0, randomIntBetween(100, 200)); flush(index, randomBoolean()); - ensurePeerRecoveryRetentionLeasesRenewedAndSynced(index, false); + ensurePeerRecoveryRetentionLeasesRenewedAndSynced(index); // uncommitted docs must be less than 10% of committed docs (see IndexSetting#FILE_BASED_RECOVERY_THRESHOLD_SETTING). indexDocs(index, randomIntBetween(0, 100), randomIntBetween(0, 3)); } else { @@ -705,9 +683,7 @@ public void testOperationBasedRecovery() throws Exception { || nodeName.startsWith(CLUSTER_NAME + "-0") || (nodeName.startsWith(CLUSTER_NAME + "-1") && Booleans.parseBoolean(System.getProperty("tests.first_round")) == false)); indexDocs(index, randomIntBetween(0, 100), randomIntBetween(0, 3)); - if (CLUSTER_TYPE == ClusterType.UPGRADED) { - ensurePeerRecoveryRetentionLeasesRenewedAndSynced(index, true); - } + ensurePeerRecoveryRetentionLeasesRenewedAndSynced(index); } } diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java index bfe89a65d89df..aff85f10e4bf8 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java @@ -895,11 +895,10 @@ public ReplicationTracker( this.pendingInSync = new HashSet<>(); this.routingTable = null; this.replicationGroup = null; - this.hasAllPeerRecoveryRetentionLeases = indexSettings.getIndexVersionCreated().onOrAfter(Version.V_8_0_0) + this.hasAllPeerRecoveryRetentionLeases = indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_6_0) || (indexSettings.isSoftDeleteEnabled() && - (indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_6_0) || - (indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_4_0) && - indexSettings.getIndexMetaData().getState() == IndexMetaData.State.OPEN))); + indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_4_0) && + indexSettings.getIndexMetaData().getState() == IndexMetaData.State.OPEN); this.fileBasedRecoveryThreshold = IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING.get(indexSettings.getSettings()); this.safeCommitInfoSupplier = safeCommitInfoSupplier; diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java index f10f444e918ed..d5da4320ab3ee 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java @@ -56,6 +56,7 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.snapshots.SnapshotState; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.rest.yaml.ObjectPath; import org.hamcrest.Matchers; import org.junit.After; import org.junit.AfterClass; @@ -1132,7 +1133,8 @@ public void assertEmptyTranslog(String index) throws Exception { * Peer recovery retention leases are renewed and synced to replicas periodically (every 30 seconds). This ensures * that we have renewed every PRRL to the global checkpoint of the corresponding copy and properly synced to all copies. */ - public void ensurePeerRecoveryRetentionLeasesRenewedAndSynced(String index, boolean alwaysExists) throws Exception { + public void ensurePeerRecoveryRetentionLeasesRenewedAndSynced(String index) throws Exception { + boolean mustHavePRRLs = minimumNodeVersion().onOrAfter(Version.V_7_6_0); assertBusy(() -> { Map stats = entityAsMap(client().performRequest(new Request("GET", index + "/_stats?level=shards"))); @SuppressWarnings("unchecked") Map>> shards = @@ -1140,10 +1142,11 @@ public void ensurePeerRecoveryRetentionLeasesRenewedAndSynced(String index, bool for (List> shard : shards.values()) { for (Map copy : shard) { Integer globalCheckpoint = (Integer) XContentMapValues.extractValue("seq_no.global_checkpoint", copy); + assertThat(XContentMapValues.extractValue("seq_no.max_seq_no", copy), equalTo(globalCheckpoint)); assertNotNull(globalCheckpoint); @SuppressWarnings("unchecked") List> retentionLeases = (List>) XContentMapValues.extractValue("retention_leases.leases", copy); - if (alwaysExists == false && retentionLeases == null) { + if (mustHavePRRLs == false && retentionLeases == null) { continue; } assertNotNull(retentionLeases); @@ -1152,7 +1155,7 @@ public void ensurePeerRecoveryRetentionLeasesRenewedAndSynced(String index, bool assertThat(retentionLease.get("retaining_seq_no"), equalTo(globalCheckpoint + 1)); } } - if (alwaysExists) { + if (mustHavePRRLs) { List existingLeaseIds = retentionLeases.stream().map(lease -> (String) lease.get("id")) .collect(Collectors.toList()); List expectedLeaseIds = shard.stream() @@ -1165,4 +1168,26 @@ public void ensurePeerRecoveryRetentionLeasesRenewedAndSynced(String index, bool } }, 60, TimeUnit.SECONDS); } + + /** + * Returns the minimum node version among all nodes of the cluster + */ + protected static Version minimumNodeVersion() throws IOException { + final Request request = new Request("GET", "_nodes"); + request.addParameter("filter_path", "nodes.*.version"); + + final Response response = client().performRequest(request); + final Map nodes = ObjectPath.createFromResponse(response).evaluate("nodes"); + + Version minVersion = null; + for (Map.Entry node : nodes.entrySet()) { + @SuppressWarnings("unchecked") + Version nodeVersion = Version.fromString((String) ((Map) node.getValue()).get("version")); + if (minVersion == null || minVersion.after(nodeVersion)) { + minVersion = nodeVersion; + } + } + assertNotNull(minVersion); + return minVersion; + } }