Skip to content

Commit

Permalink
Generating and committing a history_uuid on existing old indices dest…
Browse files Browse the repository at this point in the history
…roys translog recovery info (#26734)

This is a bug introduced in #26694 . The issue comes from the attempt to share code that commits the new history uuid and/or a new translog uuid. This goes wrong an existing 5.6 index that is recovered from store:

1) A new history uuid is generated as it doesn't exist in the index
2) The translog is opened and it's uuid doesn't change.
3) The committing the new history uuid used the standard commitIndexWriter method.
4) The latter asks the translog for the oldest file generation that is required to recover from the local checkpoint + 1
5) The local checkpoint on old indices is -1 (until something is indexed) and the translog is asked to recover from position 0. That excludes any operations the translog that do not have a seq no assigned, causing the FullClusterRestart bwc tests to fail.

To bypass this commit moves away from the attempt to share the committing code between a new translog uuid and a new history uuid. Instead we do what we did before and open the translog with a potential commit. Afterwards we commit the history uuid if needed. This comes with the expense of opening up the method to commit an index writer in the engine.
  • Loading branch information
bleskes committed Sep 21, 2017
1 parent 8c5e06c commit 07b0c26
Showing 1 changed file with 48 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,9 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException {
indexWriter = writer;
translog = openTranslog(engineConfig, writer, translogDeletionPolicy, () -> seqNoService().getGlobalCheckpoint());
assert translog.getGeneration() != null;
this.translog = translog;
updateWriterOnOpen();
// we can only do this after we generated and committed a translog uuid. other wise the combined
// retention policy, which listens to commits, gets all confused.
persistHistoryUUIDIfNeeded();
} catch (IOException | TranslogCorruptedException e) {
throw new EngineCreationFailureException(shardId, "failed to create engine", e);
} catch (AssertionError e) {
Expand All @@ -215,6 +216,8 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException {
throw e;
}
}

this.translog = translog;
manager = createSearcherManager();
this.searcherManager = manager;
this.versionMap.setManager(searcherManager);
Expand Down Expand Up @@ -369,28 +372,38 @@ private Translog openTranslog(EngineConfig engineConfig, IndexWriter writer, Tra
throw new IndexFormatTooOldException("translog", "translog has no generation nor a UUID - this might be an index from a previous version consider upgrading to N-1 first");
}
}
return new Translog(translogConfig, translogUUID, translogDeletionPolicy, globalCheckpointSupplier);
final Translog translog = new Translog(translogConfig, translogUUID, translogDeletionPolicy, globalCheckpointSupplier);
if (translogUUID == null) {
assert openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG : "OpenMode must not be "
+ EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG;
boolean success = false;
try {
commitIndexWriter(writer, translog, openMode == EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG
? commitDataAsMap(writer).get(SYNC_COMMIT_ID) : null);
success = true;
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(translog);
}
}
}
return translog;
}

/** If needed, updates the metadata in the index writer to match the potentially new translog and history uuid */
private void updateWriterOnOpen() throws IOException {
private void persistHistoryUUIDIfNeeded() throws IOException {
Objects.requireNonNull(historyUUID);
final Map<String, String> commitUserData = commitDataAsMap(indexWriter);
boolean needsCommit = false;
if (historyUUID.equals(commitUserData.get(HISTORY_UUID_KEY)) == false) {
needsCommit = true;
assert openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG :
"if the translog was created, history should have been committed";
assert commitUserData.containsKey(HISTORY_UUID_KEY) == false || config().getForceNewHistoryUUID();
Map<String, String> newData = new HashMap<>(commitUserData);
newData.put(HISTORY_UUID_KEY, historyUUID);
commitIndexWriter(indexWriter, newData.entrySet());
} else {
assert config().getForceNewHistoryUUID() == false : "config forced a new history uuid but it didn't change";
assert openMode != EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG : "new index but it already has an existing history uuid";
}
if (translog.getTranslogUUID().equals(commitUserData.get(Translog.TRANSLOG_UUID_KEY)) == false) {
needsCommit = true;
} else {
assert openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG : "translog uuid didn't change but open mode is " + openMode;
}
if (needsCommit) {
commitIndexWriter(indexWriter, translog, openMode == EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG
? commitUserData.get(SYNC_COMMIT_ID) : null);
assert openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG || config().getForceNewHistoryUUID() == false :
"history uuid is already committed, but the translog uuid isn't committed and a new history id was generated";
}
}

Expand Down Expand Up @@ -1901,15 +1914,13 @@ protected void doRun() throws Exception {
* @throws IOException if an I/O exception occurs committing the specfied writer
*/
protected void commitIndexWriter(final IndexWriter writer, final Translog translog, @Nullable final String syncId) throws IOException {
ensureCanFlush();
try {
final long localCheckpoint = seqNoService().getLocalCheckpoint();
final Translog.TranslogGeneration translogGeneration = translog.getMinGenerationForSeqNo(localCheckpoint + 1);
final String translogFileGeneration = Long.toString(translogGeneration.translogFileGeneration);
final String translogUUID = translogGeneration.translogUUID;
final String localCheckpointValue = Long.toString(localCheckpoint);

writer.setLiveCommitData(() -> {
final Iterable<Map.Entry<String, String>> commitIterable = () -> {
/*
* The user data captured above (e.g. local checkpoint) contains data that must be evaluated *before* Lucene flushes
* segments, including the local checkpoint amongst other values. The maximum sequence number is different, we never want
Expand All @@ -1931,9 +1942,25 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl
commitData.put(HISTORY_UUID_KEY, historyUUID);
logger.trace("committing writer with commit data [{}]", commitData);
return commitData.entrySet().iterator();
});
};
commitIndexWriter(writer, commitIterable);
}

private void commitIndexWriter(IndexWriter writer, Iterable<Map.Entry<String, String>> userData) throws IOException {
try {
ensureCanFlush();
writer.setLiveCommitData(userData);
writer.commit();
// assert we don't loose key entries
assert commitDataAsMap(writer).containsKey(Translog.TRANSLOG_UUID_KEY) : "commit misses translog uuid";
assert commitDataAsMap(writer).containsKey(Translog.TRANSLOG_GENERATION_KEY) : "commit misses translog generation";
assert commitDataAsMap(writer).containsKey(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID) : "commit misses max unsafe timestamp";
assert commitDataAsMap(writer).containsKey(HISTORY_UUID_KEY) : "commit misses a history uuid";
assert commitDataAsMap(writer).containsKey(SequenceNumbers.LOCAL_CHECKPOINT_KEY) ||
config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_0_0_alpha1): "commit misses local checkpoint";
assert commitDataAsMap(writer).containsKey(SequenceNumbers.MAX_SEQ_NO) ||
config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_0_0_alpha1) : "commit misses max seq no";

} catch (final Exception ex) {
try {
failEngine("lucene commit failed", ex);
Expand Down

0 comments on commit 07b0c26

Please sign in to comment.