diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 3655a2096ddd4..ae1de7e3663be 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -202,8 +202,9 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException { indexWriter = writer; translog = openTranslog(engineConfig, writer, translogDeletionPolicy, () -> seqNoService().getGlobalCheckpoint()); assert translog.getGeneration() != null; - this.translog = translog; - updateWriterOnOpen(); + // we can only do this after we generated and committed a translog uuid. otherwise the combined + // retention policy, which listens to commits, gets all confused. + persistHistoryUUIDIfNeeded(); } catch (IOException | TranslogCorruptedException e) { throw new EngineCreationFailureException(shardId, "failed to create engine", e); } catch (AssertionError e) { @@ -215,6 +216,8 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException { throw e; } } + + this.translog = translog; manager = createSearcherManager(); this.searcherManager = manager; this.versionMap.setManager(searcherManager); @@ -369,28 +372,38 @@ private Translog openTranslog(EngineConfig engineConfig, IndexWriter writer, Tra throw new IndexFormatTooOldException("translog", "translog has no generation nor a UUID - this might be an index from a previous version consider upgrading to N-1 first"); } } - return new Translog(translogConfig, translogUUID, translogDeletionPolicy, globalCheckpointSupplier); + final Translog translog = new Translog(translogConfig, translogUUID, translogDeletionPolicy, globalCheckpointSupplier); + if (translogUUID == null) { + assert openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG : "OpenMode must not be " + + EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG; + boolean success = false; + try { 
+ commitIndexWriter(writer, translog, openMode == EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG + ? commitDataAsMap(writer).get(SYNC_COMMIT_ID) : null); + success = true; + } finally { + if (success == false) { + IOUtils.closeWhileHandlingException(translog); + } + } + } + return translog; } /** If needed, updates the metadata in the index writer to match the potentially new translog and history uuid */ - private void updateWriterOnOpen() throws IOException { + private void persistHistoryUUIDIfNeeded() throws IOException { Objects.requireNonNull(historyUUID); final Map commitUserData = commitDataAsMap(indexWriter); - boolean needsCommit = false; if (historyUUID.equals(commitUserData.get(HISTORY_UUID_KEY)) == false) { - needsCommit = true; + assert openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG : + "if the translog was created, history should have been committed"; + assert commitUserData.containsKey(HISTORY_UUID_KEY) == false || config().getForceNewHistoryUUID(); + Map newData = new HashMap<>(commitUserData); + newData.put(HISTORY_UUID_KEY, historyUUID); + commitIndexWriter(indexWriter, newData.entrySet()); } else { - assert config().getForceNewHistoryUUID() == false : "config forced a new history uuid but it didn't change"; - assert openMode != EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG : "new index but it already has an existing history uuid"; - } - if (translog.getTranslogUUID().equals(commitUserData.get(Translog.TRANSLOG_UUID_KEY)) == false) { - needsCommit = true; - } else { - assert openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG : "translog uuid didn't change but open mode is " + openMode; - } - if (needsCommit) { - commitIndexWriter(indexWriter, translog, openMode == EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG - ? 
commitUserData.get(SYNC_COMMIT_ID) : null); + assert openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG || config().getForceNewHistoryUUID() == false : + "history uuid is already committed, but the translog uuid isn't committed and a new history id was generated"; } } @@ -1901,15 +1914,13 @@ protected void doRun() throws Exception { * @throws IOException if an I/O exception occurs committing the specfied writer */ protected void commitIndexWriter(final IndexWriter writer, final Translog translog, @Nullable final String syncId) throws IOException { - ensureCanFlush(); - try { final long localCheckpoint = seqNoService().getLocalCheckpoint(); final Translog.TranslogGeneration translogGeneration = translog.getMinGenerationForSeqNo(localCheckpoint + 1); final String translogFileGeneration = Long.toString(translogGeneration.translogFileGeneration); final String translogUUID = translogGeneration.translogUUID; final String localCheckpointValue = Long.toString(localCheckpoint); - writer.setLiveCommitData(() -> { + final Iterable> commitIterable = () -> { /* * The user data captured above (e.g. local checkpoint) contains data that must be evaluated *before* Lucene flushes * segments, including the local checkpoint amongst other values. 
The maximum sequence number is different, we never want @@ -1931,9 +1942,25 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl commitData.put(HISTORY_UUID_KEY, historyUUID); logger.trace("committing writer with commit data [{}]", commitData); return commitData.entrySet().iterator(); - }); + }; + commitIndexWriter(writer, commitIterable); + } + private void commitIndexWriter(IndexWriter writer, Iterable> userData) throws IOException { + try { + ensureCanFlush(); + writer.setLiveCommitData(userData); writer.commit(); + // assert we don't lose key entries + assert commitDataAsMap(writer).containsKey(Translog.TRANSLOG_UUID_KEY) : "commit misses translog uuid"; + assert commitDataAsMap(writer).containsKey(Translog.TRANSLOG_GENERATION_KEY) : "commit misses translog generation"; + assert commitDataAsMap(writer).containsKey(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID) : "commit misses max unsafe timestamp"; + assert commitDataAsMap(writer).containsKey(HISTORY_UUID_KEY) : "commit misses a history uuid"; + assert commitDataAsMap(writer).containsKey(SequenceNumbers.LOCAL_CHECKPOINT_KEY) || + config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_0_0_alpha1): "commit misses local checkpoint"; + assert commitDataAsMap(writer).containsKey(SequenceNumbers.MAX_SEQ_NO) || + config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_0_0_alpha1) : "commit misses max seq no"; + } catch (final Exception ex) { try { failEngine("lucene commit failed", ex);