From cd9ffbaa2d5a6d402fdf318665f78b17cef84087 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Sun, 17 Sep 2017 15:13:53 +0300 Subject: [PATCH 1/8] initial version --- .../index/engine/EngineConfig.java | 16 ++++- .../index/engine/InternalEngine.java | 67 ++++++++++--------- .../elasticsearch/index/shard/IndexShard.java | 16 ++++- .../index/shard/StoreRecovery.java | 5 +- .../index/engine/InternalEngineTests.java | 63 ++++++++++++++--- .../index/shard/RefreshListenersTests.java | 2 +- .../SharedClusterSnapshotRestoreIT.java | 34 +++++++++- 7 files changed, 153 insertions(+), 50 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java index 66911ab80c723..fbc87f2279b3d 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +++ b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java @@ -71,6 +71,7 @@ public final class EngineConfig { private final List refreshListeners; @Nullable private final Sort indexSort; + private final boolean forceNewHistoryUUID; private final TranslogRecoveryRunner translogRecoveryRunner; /** @@ -115,8 +116,9 @@ public EngineConfig(OpenMode openMode, ShardId shardId, String allocationId, Thr MergePolicy mergePolicy, Analyzer analyzer, Similarity similarity, CodecService codecService, Engine.EventListener eventListener, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy, - TranslogConfig translogConfig, TimeValue flushMergesAfter, List refreshListeners, - Sort indexSort, TranslogRecoveryRunner translogRecoveryRunner) { + boolean forceNewHistoryUUID, TranslogConfig translogConfig, TimeValue flushMergesAfter, + List refreshListeners, Sort indexSort, + TranslogRecoveryRunner translogRecoveryRunner) { if (openMode == null) { throw new IllegalArgumentException("openMode must not be null"); } @@ -141,6 +143,7 @@ public EngineConfig(OpenMode openMode, ShardId shardId, String allocationId, Thr this.translogConfig = translogConfig; this.flushMergesAfter = flushMergesAfter; this.openMode = openMode; + this.forceNewHistoryUUID = forceNewHistoryUUID; this.refreshListeners = refreshListeners; this.indexSort = indexSort; this.translogRecoveryRunner = translogRecoveryRunner; @@ -300,6 +303,15 @@ public OpenMode getOpenMode() { return openMode; } + + /** + * Returns true if a new history uuid must be generated. If false, a new uuid will only be generated if no existing + * one is found. + */ + public boolean getForceNewHistoryUUID() { + return forceNewHistoryUUID; + } + @FunctionalInterface public interface TranslogRecoveryRunner { int run(Engine engine, Translog.Snapshot snapshot) throws IOException; diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index d7cf3e16069e1..c0887ab2742b9 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -177,23 +177,15 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException { switch (openMode) { case OPEN_INDEX_AND_TRANSLOG: writer = createWriter(false); - String existingHistoryUUID = loadHistoryUUIDFromCommit(writer); - if (existingHistoryUUID == null) { - historyUUID = UUIDs.randomBase64UUID(); - } else { - historyUUID = existingHistoryUUID; - } final long globalCheckpoint = Translog.readGlobalCheckpoint(engineConfig.getTranslogConfig().getTranslogPath()); seqNoStats = store.loadSeqNoStats(globalCheckpoint); break; case OPEN_INDEX_CREATE_TRANSLOG: writer = createWriter(false); - historyUUID = loadHistoryUUIDFromCommit(writer); seqNoStats = store.loadSeqNoStats(SequenceNumbers.UNASSIGNED_SEQ_NO); break; case CREATE_INDEX_AND_TRANSLOG: writer = createWriter(true); - historyUUID = UUIDs.randomBase64UUID(); seqNoStats = new SeqNoStats( SequenceNumbers.NO_OPS_PERFORMED, SequenceNumbers.NO_OPS_PERFORMED, @@ -205,9 +197,13 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException { logger.trace("recovered [{}]", seqNoStats); seqNoService = sequenceNumberService(shardId, allocationId, engineConfig.getIndexSettings(), seqNoStats); updateMaxUnsafeAutoIdTimestampFromWriter(writer); + historyUUID = loadOrGenerateHistoryUUID(writer, engineConfig.getForceNewHistoryUUID()); + Objects.requireNonNull(historyUUID, "history uuid should not be null"); indexWriter = writer; translog = openTranslog(engineConfig, writer, translogDeletionPolicy, () -> seqNoService().getGlobalCheckpoint()); assert translog.getGeneration() != null; + this.translog = translog; + updateWriterOnOpen(); } catch (IOException | TranslogCorruptedException e) { throw new EngineCreationFailureException(shardId, "failed to create engine", e); } catch (AssertionError e) { @@ -219,8 +215,6 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException { throw e; } } - - this.translog = translog; manager = createSearcherManager(); this.searcherManager = manager; this.versionMap.setManager(searcherManager); @@ -375,24 +369,32 @@ private Translog openTranslog(EngineConfig engineConfig, IndexWriter writer, Tra throw new IndexFormatTooOldException("translog", "translog has no generation nor a UUID - this might be an index from a previous version consider upgrading to N-1 first"); } } - final Translog translog = new Translog(translogConfig, translogUUID, translogDeletionPolicy, globalCheckpointSupplier); - if (translogUUID == null) { - assert openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG : "OpenMode must not be " - + EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG; - boolean success = false; - try { - commitIndexWriter(writer, translog, openMode == EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG - ? commitDataAsMap(writer).get(SYNC_COMMIT_ID) : null); - success = true; - } finally { - if (success == false) { - IOUtils.closeWhileHandlingException(translog); - } - } + return new Translog(translogConfig, translogUUID, translogDeletionPolicy, globalCheckpointSupplier); + } + + /** If needed, updates the metadata in the index writer to match the potentially new translog and history uuid */ + private void updateWriterOnOpen() throws IOException { + Objects.requireNonNull(historyUUID); + final Map commitUserData = commitDataAsMap(indexWriter); + boolean needsCommit = false; + if (historyUUID.equals(commitUserData.get(HISTORY_UUID_KEY)) == false) { + needsCommit = true; + } else { + assert config().getForceNewHistoryUUID() == false : "config forced a new history uuid but it didn't change"; + assert openMode != EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG : "new index but it already has an existing history uuid"; + } + if (translog.getTranslogUUID().equals(commitUserData.get(Translog.TRANSLOG_UUID_KEY)) == false) { + needsCommit = true; + } else { + assert openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG : "translog uuid didn't change but open mode is " + openMode; + } + if (needsCommit) { + commitIndexWriter(indexWriter, translog, openMode == EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG + ? commitUserData.get(SYNC_COMMIT_ID) : null); } - return translog; } + @Override public Translog getTranslog() { ensureOpen(); @@ -424,14 +426,17 @@ private String loadTranslogUUIDFromCommit(IndexWriter writer) throws IOException } /** - * Reads the current stored history ID from the IW commit data. If the id is not found, returns null. + * Reads the current stored history ID from the IW commit data. Generates a new UUID if not found or if generation is forced. */ - @Nullable - private String loadHistoryUUIDFromCommit(final IndexWriter writer) throws IOException { + private String loadOrGenerateHistoryUUID(final IndexWriter writer, boolean forceNew) throws IOException { String uuid = commitDataAsMap(writer).get(HISTORY_UUID_KEY); - if (uuid == null) { - assert config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_0_0_rc1) : - "index was created after 6_0_0_rc1 but has no history uuid"; + if (uuid == null || forceNew) { + assert + uuid != null || + openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG || + config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_0_0_rc1) : + "existing index was created after 6_0_0_rc1 but has no history uuid"; + uuid = UUIDs.randomBase64UUID(); } return uuid; } diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index dd47be5a141dd..2f8262c656e25 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2080,10 +2080,24 @@ private DocumentMapperForType docMapper(String type) { private EngineConfig newEngineConfig(EngineConfig.OpenMode openMode) { Sort indexSort = indexSortSupplier.get(); + final boolean forceNewHistoryUUID; + switch (shardRouting.recoverySource().getType()) { + case EMPTY_STORE: + case EXISTING_STORE: + case PEER: + forceNewHistoryUUID = false; + break; + case SNAPSHOT: + case LOCAL_SHARDS: + forceNewHistoryUUID = true; + break; + default: + throw new AssertionError("unknown recovery type: [" + shardRouting.recoverySource().getType() + "]"); + } return new EngineConfig(openMode, shardId, shardRouting.allocationId().getId(), threadPool, indexSettings, warmer, store, indexSettings.getMergePolicy(), mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, - indexCache.query(), cachingPolicy, translogConfig, + indexCache.query(), cachingPolicy, forceNewHistoryUUID, translogConfig, IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()), Arrays.asList(refreshListeners, new RefreshMetricUpdater(refreshMetric)), indexSort, this::runTranslogRecovery); diff --git a/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java index 63b7bc0805581..e5053fc7882e0 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java +++ b/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java @@ -35,12 +35,10 @@ import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource; -import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.Index; -import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.engine.InternalEngine; import org.elasticsearch.index.mapper.MapperService; @@ -164,11 +162,10 @@ void addIndices( * document-level semantics. */ writer.setLiveCommitData(() -> { - final HashMap liveCommitData = new HashMap<>(4); + final HashMap liveCommitData = new HashMap<>(3); liveCommitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo)); liveCommitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(maxSeqNo)); liveCommitData.put(InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp)); - liveCommitData.put(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID()); return liveCommitData.entrySet().iterator(); }); writer.commit(); diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 0ea47392d5c21..5da108eff70cc 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -20,7 +20,6 @@ package org.elasticsearch.index.engine; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; - import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -82,7 +81,6 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; @@ -271,8 +269,8 @@ public EngineConfig copy(EngineConfig config, EngineConfig.OpenMode openMode, An return new EngineConfig(openMode, config.getShardId(), config.getAllocationId(), config.getThreadPool(), config.getIndexSettings(), config.getWarmer(), config.getStore(), config.getMergePolicy(), analyzer, config.getSimilarity(), new CodecService(null, logger), config.getEventListener(), config.getQueryCache(), config.getQueryCachingPolicy(), - config.getTranslogConfig(), config.getFlushMergesAfter(), config.getRefreshListeners(), config.getIndexSort(), - config.getTranslogRecoveryRunner()); + config.getForceNewHistoryUUID(), config.getTranslogConfig(), config.getFlushMergesAfter(), config.getRefreshListeners(), + config.getIndexSort(), config.getTranslogRecoveryRunner()); } @Override @@ -453,7 +451,7 @@ public void onFailedEngine(String reason, @Nullable Exception e) { refreshListener == null ? emptyList() : Collections.singletonList(refreshListener); EngineConfig config = new EngineConfig(openMode, shardId, allocationId.getId(), threadPool, indexSettings, null, store, mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener, - IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, + IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), false, translogConfig, TimeValue.timeValueMinutes(5), refreshListenerList, indexSort, handler); return config; @@ -2797,8 +2795,8 @@ public void testRecoverFromForeignTranslog() throws IOException { EngineConfig brokenConfig = new EngineConfig(EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, shardId, allocationId.getId(), threadPool, config.getIndexSettings(), null, store, newMergePolicy(), config.getAnalyzer(), config.getSimilarity(), new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(), - IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5), config.getRefreshListeners(), - null, config.getTranslogRecoveryRunner()); + IndexSearcher.getDefaultQueryCachingPolicy(), false, translogConfig, TimeValue.timeValueMinutes(5), + config.getRefreshListeners(), null, config.getTranslogRecoveryRunner()); try { InternalEngine internalEngine = new InternalEngine(brokenConfig); @@ -2810,7 +2808,7 @@ public void testRecoverFromForeignTranslog() throws IOException { assertVisibleCount(engine, numDocs, false); } - public void testRecoverFromStoreSetsHistoryUUIDIfNeeded() throws IOException { + public void testHistoryUUIDIsSetIfMissing() throws IOException { final int numDocs = randomIntBetween(0, 3); for (int i = 0; i < numDocs; i++) { ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null); @@ -2843,11 +2841,56 @@ public void testRecoverFromStoreSetsHistoryUUIDIfNeeded() throws IOException { .put(defaultSettings.getSettings()) .put(IndexMetaData.SETTING_VERSION_CREATED, Version.V_6_0_0_beta1) .build()); - engine = createEngine(indexSettings, store, primaryTranslogDir, newMergePolicy(), null); - assertVisibleCount(engine, numDocs, false); + + EngineConfig config = engine.config(); + + EngineConfig newConfig = new EngineConfig( + randomBoolean() ? EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG : EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG, + shardId, allocationId.getId(), + threadPool, indexSettings, null, store, newMergePolicy(), config.getAnalyzer(), config.getSimilarity(), + new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(), + IndexSearcher.getDefaultQueryCachingPolicy(), false, config.getTranslogConfig(), TimeValue.timeValueMinutes(5), + config.getRefreshListeners(), null, config.getTranslogRecoveryRunner()); + engine = new InternalEngine(newConfig); + if (newConfig.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { + engine.recoverFromTranslog(); + assertVisibleCount(engine, numDocs, false); + } else { + assertVisibleCount(engine, 0, false); + } assertThat(engine.getHistoryUUID(), notNullValue()); } + public void testHistoryUUIDCanBeForced() throws IOException { + final int numDocs = randomIntBetween(0, 3); + for (int i = 0; i < numDocs; i++) { + ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null); + Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false); + Engine.IndexResult index = engine.index(firstIndexRequest); + assertThat(index.getVersion(), equalTo(1L)); + } + assertVisibleCount(engine, numDocs); + final String oldHistoryUUID = engine.getHistoryUUID(); + engine.close(); + EngineConfig config = engine.config(); + + EngineConfig newConfig = new EngineConfig( + randomBoolean() ? EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG : EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG, + shardId, allocationId.getId(), + threadPool, config.getIndexSettings(), null, store, newMergePolicy(), config.getAnalyzer(), config.getSimilarity(), + new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(), + IndexSearcher.getDefaultQueryCachingPolicy(), true, config.getTranslogConfig(), TimeValue.timeValueMinutes(5), + config.getRefreshListeners(), null, config.getTranslogRecoveryRunner()); + engine = new InternalEngine(newConfig); + if (newConfig.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { + engine.recoverFromTranslog(); + assertVisibleCount(engine, numDocs, false); + } else { + assertVisibleCount(engine, 0, false); + } + assertThat(engine.getHistoryUUID(), not(equalTo(oldHistoryUUID))); + } + public void testShardNotAvailableExceptionWhenEngineClosedConcurrently() throws IOException, InterruptedException { AtomicReference exception = new AtomicReference<>(); String operation = randomFrom("optimize", "refresh", "flush"); diff --git a/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index 01893a99ae4e3..1f24d0b079dd1 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -119,7 +119,7 @@ public void onFailedEngine(String reason, @Nullable Exception e) { }; EngineConfig config = new EngineConfig(EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG, shardId, allocationId, threadPool, indexSettings, null, store, newMergePolicy(), iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), - eventListener, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, + eventListener, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), false, translogConfig, TimeValue.timeValueMinutes(5), Collections.singletonList(listeners), null, null); engine = new InternalEngine(config); listeners.setTranslog(engine.getTranslog()); diff --git a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java index 601ca1b8210d3..e2edb33fafa3f 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java +++ b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java @@ -36,6 +36,7 @@ import org.elasticsearch.action.admin.cluster.storedscripts.GetStoredScriptResponse; import org.elasticsearch.action.admin.indices.flush.FlushResponse; import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse; +import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.ingest.DeletePipelineRequest; @@ -66,6 +67,7 @@ import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.InvalidIndexNameException; @@ -119,6 +121,7 @@ import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThan; +import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.startsWith; @@ -170,8 +173,23 @@ public void testBasicWorkFlow() throws Exception { flushResponseFuture = client.admin().indices().prepareFlush(indices).execute(); } } + + final String[] indicesToSnapshot = {"test-idx-*", "-test-idx-3"}; + + logger.info("--> capturing history UUIDs"); + final Map historyUUIDs = new HashMap<>(); + for (ShardStats shardStats: client().admin().indices().prepareStats(indicesToSnapshot).clear().get().getShards()) { + String historyUUID = shardStats.getCommitStats().getUserData().get(Engine.HISTORY_UUID_KEY); + ShardId shardId = shardStats.getShardRouting().shardId(); + if (historyUUIDs.containsKey(shardId)) { + assertThat(shardStats.getShardRouting() + " has a different history uuid", historyUUID, equalTo(historyUUIDs.get(shardId))); + } else { + historyUUIDs.put(shardId, historyUUID); + } + } + logger.info("--> snapshot"); - CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx-*", "-test-idx-3").get(); + CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices(indicesToSnapshot).get(); assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0)); assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())); @@ -211,6 +229,13 @@ public void testBasicWorkFlow() throws Exception { assertHitCount(client.prepareSearch("test-idx-3").setSize(0).get(), 50L); } + for (ShardStats shardStats: client().admin().indices().prepareStats(indicesToSnapshot).clear().get().getShards()) { + String historyUUID = shardStats.getCommitStats().getUserData().get(Engine.HISTORY_UUID_KEY); + ShardId shardId = shardStats.getShardRouting().shardId(); + assertThat(shardStats.getShardRouting() + " doesn't have a history uuid", historyUUID, notNullValue()); + assertThat(shardStats.getShardRouting() + " doesn't have a new history", historyUUID, not(equalTo(historyUUIDs.get(shardId)))); + } + // Test restore after index deletion logger.info("--> delete indices"); cluster().wipeIndices("test-idx-1", "test-idx-2"); @@ -226,6 +251,13 @@ public void testBasicWorkFlow() throws Exception { assertThat(clusterState.getMetaData().hasIndex("test-idx-1"), equalTo(true)); assertThat(clusterState.getMetaData().hasIndex("test-idx-2"), equalTo(false)); + for (ShardStats shardStats: client().admin().indices().prepareStats(indicesToSnapshot).clear().get().getShards()) { + String historyUUID = shardStats.getCommitStats().getUserData().get(Engine.HISTORY_UUID_KEY); + ShardId shardId = shardStats.getShardRouting().shardId(); + assertThat(shardStats.getShardRouting() + " doesn't have a history uuid", historyUUID, notNullValue()); + assertThat(shardStats.getShardRouting() + " doesn't have a new history", historyUUID, not(equalTo(historyUUIDs.get(shardId)))); + } + if (flushResponseFuture != null) { // Finish flush flushResponseFuture.actionGet(); From ab13a767f9bcafd89f6bcb5c575499a036d2b8b3 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Mon, 18 Sep 2017 10:52:54 +0200 Subject: [PATCH 2/8] better assertions for non existing uuids --- .../java/org/elasticsearch/index/engine/InternalEngine.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index c0887ab2742b9..665f73fed5d09 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -432,7 +432,7 @@ private String loadOrGenerateHistoryUUID(final IndexWriter writer, boolean force String uuid = commitDataAsMap(writer).get(HISTORY_UUID_KEY); if (uuid == null || forceNew) { assert - uuid != null || + forceNew || // recovery from a local store creates an index that doesn't have yet a history_uuid openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG || config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_0_0_rc1) : "existing index was created after 6_0_0_rc1 but has no history uuid"; From da5f84853cc4d4234bcae190228c2907bfc080c1 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Mon, 18 Sep 2017 11:36:58 +0200 Subject: [PATCH 3/8] add recovery IT --- .../elasticsearch/upgrades/RecoveryIT.java | 110 ++++++++++++++++++ 1 file changed, 110 insertions(+) create mode 100644 qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java diff --git a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java new file mode 100644 index 0000000000000..ec1be6d486ee3 --- /dev/null +++ b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java @@ -0,0 +1,110 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.upgrades; + +import org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; +import org.elasticsearch.client.Response; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.rest.ESRestTestCase; +import org.elasticsearch.test.rest.yaml.ObjectPath; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.notNullValue; + +public class RecoveryIT extends ESRestTestCase { + + private enum CLUSTER_TYPE { + OLD, + MIXED, + UPGRADED; + + public static CLUSTER_TYPE parse(String value) { + switch (value) { + case "old_cluster": + return OLD; + case "mixed_cluster": + return MIXED; + case "upgraded_cluster": + return UPGRADED; + default: + throw new AssertionError("unknown cluster type: " + value); + } + } + } + + private final CLUSTER_TYPE clusterType = CLUSTER_TYPE.parse(System.getProperty("tests.rest.suite")); + + private void assertOK(Response response) { + assertThat(response.getStatusLine().getStatusCode(), anyOf(equalTo(200), equalTo(201))); + } + + private void ensureGreen() throws IOException { + Map params = new HashMap<>(); + params.put("wait_for_status", "green"); + params.put("wait_for_no_relocating_shards", "true"); + assertOK(client().performRequest("GET", "_cluster/health", params)); + } + + private void createIndex(String name, Settings settings) throws IOException { + assertOK(client().performRequest("PUT", name, Collections.emptyMap(), + new StringEntity("{ \"settings\": " + Strings.toString(settings) + " }", ContentType.APPLICATION_JSON))); + } + + + public void testHistoryUUIDIsGenerated() throws Exception { + final String index = "index_history_uuid"; + if (clusterType == CLUSTER_TYPE.OLD) { + Settings.Builder settings = Settings.builder() + .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1); + createIndex(index, settings.build()); + + } else if (clusterType == CLUSTER_TYPE.UPGRADED) { + ensureGreen(); + Response response = client().performRequest("GET", index + "/_stats", Collections.singletonMap("level", "shards")); + assertOK(response); + ObjectPath objectPath = ObjectPath.createFromResponse(response); + List shardStats = objectPath.evaluate(index + ".shards.0"); + assertThat(shardStats, hasSize(2)); + String expectHistoryUUID = null; + for (int shard = 0; shard < 2; shard++) { + String nodeID = objectPath.evaluate(index + ".shard." + shard + ".routing.node"); + String historyUUID = objectPath.evaluate(index + ".shard." + shard + ".commit.user_data.history_uuid"); + assertThat("no history uuid found for shard on " + nodeID, historyUUID, notNullValue()); + if (expectHistoryUUID == null) { + expectHistoryUUID = historyUUID; + } else { + assertThat("different history uuid found for shard on " + nodeID, historyUUID, equalTo(expectHistoryUUID)); + } + } + } + } + +} From 171aa4566dcc1a5e4ffa6c442a8e3b8dcf406c16 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Mon, 18 Sep 2017 14:48:33 +0200 Subject: [PATCH 4/8] preserve indices --- .../test/java/org/elasticsearch/upgrades/RecoveryIT.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java index ec1be6d486ee3..612400b201e02 100644 --- a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java +++ b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java @@ -40,6 +40,11 @@ public class RecoveryIT extends ESRestTestCase { + @Override + protected boolean preserveIndicesUponCompletion() { + return true; + } + private enum CLUSTER_TYPE { OLD, MIXED, @@ -85,7 +90,6 @@ public void testHistoryUUIDIsGenerated() throws Exception { .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1); createIndex(index, settings.build()); - } else if (clusterType == CLUSTER_TYPE.UPGRADED) { ensureGreen(); Response response = client().performRequest("GET", index + "/_stats", Collections.singletonMap("level", "shards")); From efdf3f623d912983360b0c5d847d5e804c7c5e96 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Mon, 18 Sep 2017 16:10:40 +0200 Subject: [PATCH 5/8] fix paths --- .../test/java/org/elasticsearch/upgrades/RecoveryIT.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java index 612400b201e02..ee52069ac4aaf 100644 --- a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java +++ b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java @@ -95,12 +95,12 @@ public void testHistoryUUIDIsGenerated() throws Exception { Response response = client().performRequest("GET", index + "/_stats", Collections.singletonMap("level", "shards")); assertOK(response); ObjectPath objectPath = ObjectPath.createFromResponse(response); - List shardStats = objectPath.evaluate(index + ".shards.0"); + List shardStats = objectPath.evaluate("indices." + index + ".shards.0"); assertThat(shardStats, hasSize(2)); String expectHistoryUUID = null; for (int shard = 0; shard < 2; shard++) { - String nodeID = objectPath.evaluate(index + ".shard." + shard + ".routing.node"); - String historyUUID = objectPath.evaluate(index + ".shard." + shard + ".commit.user_data.history_uuid"); + String nodeID = objectPath.evaluate("indices." + index + ".shards.0." + shard + ".routing.node"); + String historyUUID = objectPath.evaluate("indices." + index + ".shards.0." + shard + ".commit.user_data.history_uuid"); assertThat("no history uuid found for shard on " + nodeID, historyUUID, notNullValue()); if (expectHistoryUUID == null) { expectHistoryUUID = historyUUID; From f5cfbdcc119a9f289b6a5a5214ef451f1425bc9b Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Tue, 19 Sep 2017 11:40:20 +0200 Subject: [PATCH 6/8] history uuid is never null. --- .../java/org/elasticsearch/index/engine/InternalEngine.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 665f73fed5d09..3655a2096ddd4 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -1928,9 +1928,7 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl } commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(seqNoService().getMaxSeqNo())); commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get())); - if (historyUUID != null) { - commitData.put(HISTORY_UUID_KEY, historyUUID); - } + commitData.put(HISTORY_UUID_KEY, historyUUID); logger.trace("committing writer with commit data [{}]", commitData); return commitData.entrySet().iterator(); }); From 3c363f0331f825e671f600339e2fe19cb0a77997 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Tue, 19 Sep 2017 11:49:54 +0200 Subject: [PATCH 7/8] EMPTY_STORE to forceGenerating a new history uuid --- .../src/main/java/org/elasticsearch/index/shard/IndexShard.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 2f8262c656e25..77c343d32341b 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2082,11 +2082,11 @@ private EngineConfig newEngineConfig(EngineConfig.OpenMode openMode) { Sort indexSort = indexSortSupplier.get(); final boolean forceNewHistoryUUID; switch (shardRouting.recoverySource().getType()) { - case EMPTY_STORE: case EXISTING_STORE: case PEER: forceNewHistoryUUID = false; break; + case EMPTY_STORE: case SNAPSHOT: case LOCAL_SHARDS: forceNewHistoryUUID = true; From 431ceeff2c6ee2eaf9be2de8098410553ffffeac Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Tue, 19 Sep 2017 14:09:44 +0200 Subject: [PATCH 8/8] RecoveryIT should preserveReposUponCompletion (to be a good citizen and not delete people's stuff) --- .../src/test/java/org/elasticsearch/upgrades/RecoveryIT.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java index ee52069ac4aaf..26a192cce9e38 100644 --- a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java +++ b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java @@ -45,6 +45,11 @@ protected boolean preserveIndicesUponCompletion() { return true; } + @Override + protected boolean preserveReposUponCompletion() { + return true; + } + private enum CLUSTER_TYPE { OLD, MIXED,