Sequence numbers commit data for Lucene uses Iterable interface #20793

Merged (10 commits) on Oct 12, 2016
core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java (108 changes: 63 additions & 45 deletions)
@@ -43,7 +43,6 @@
import org.apache.lucene.util.InfoStream;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.fieldstats.FieldStats;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.lease.Releasable;
@@ -59,7 +58,6 @@
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.internal.SeqNoFieldMapper;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.merge.OnGoingMerge;
import org.elasticsearch.index.seqno.SeqNoStats;
@@ -86,8 +84,6 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

import static org.elasticsearch.index.seqno.SequenceNumbersService.NO_OPS_PERFORMED;

public class InternalEngine extends Engine {

/**
@@ -121,6 +117,7 @@ public class InternalEngine extends Engine {
private final SequenceNumbersService seqNoService;
static final String LOCAL_CHECKPOINT_KEY = "local_checkpoint";
static final String GLOBAL_CHECKPOINT_KEY = "global_checkpoint";
static final String MAX_SEQ_NO = "max_seq_no";

// How many callers are currently requesting index throttling. Currently there are only two situations where we do this: when merges
// are falling behind and when writing indexing buffer to disk is too slow. When this is 0, there is no throttling, else we throttle
@@ -285,7 +282,7 @@ private Translog openTranslog(EngineConfig engineConfig, IndexWriter writer) thr
boolean success = false;
try {
commitIndexWriter(writer, translog, openMode == EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG
? writer.getCommitData().get(SYNC_COMMIT_ID) : null);
? commitDataAsMap(writer).get(SYNC_COMMIT_ID) : null);
success = true;
} finally {
if (success == false) {
@@ -310,7 +307,7 @@ public Translog getTranslog() {
private Translog.TranslogGeneration loadTranslogIdFromCommit(IndexWriter writer) throws IOException {
// commit on a just opened writer will commit even if there are no changes done to it
// we rely on that for the commit data translog id key
final Map<String, String> commitUserData = writer.getCommitData();
final Map<String, String> commitUserData = commitDataAsMap(writer);
if (commitUserData.containsKey("translog_id")) {
assert commitUserData.containsKey(Translog.TRANSLOG_UUID_KEY) == false : "legacy commit contains translog UUID";
return new Translog.TranslogGeneration(null, Long.parseLong(commitUserData.get("translog_id")));
@@ -325,33 +322,26 @@ private Translog.TranslogGeneration loadTranslogIdFromCommit(IndexWriter writer)
return null;
}

// package private for testing
SeqNoStats loadSeqNoStatsFromCommit() throws IOException {
return loadSeqNoStatsFromCommit(indexWriter);
}

private SeqNoStats loadSeqNoStatsFromCommit(IndexWriter writer) throws IOException {
final long maxSeqNo;
try (IndexReader reader = DirectoryReader.open(writer)) {
final FieldStats stats = SeqNoFieldMapper.Defaults.FIELD_TYPE.stats(reader);
if (stats != null) {
maxSeqNo = (long) stats.getMaxValue();
} else {
maxSeqNo = NO_OPS_PERFORMED;
long maxSeqNo = SequenceNumbersService.NO_OPS_PERFORMED;
long localCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED;
long globalCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO;
for (Map.Entry<String, String> entry : writer.getLiveCommitData()) {
final String key = entry.getKey();
if (key.equals(LOCAL_CHECKPOINT_KEY)) {
localCheckpoint = Long.parseLong(entry.getValue());
} else if (key.equals(GLOBAL_CHECKPOINT_KEY)) {
globalCheckpoint = Long.parseLong(entry.getValue());
} else if (key.equals(MAX_SEQ_NO)) {
maxSeqNo = Long.parseLong(entry.getValue());
}
}

final Map<String, String> commitUserData = writer.getCommitData();

final long localCheckpoint;
if (commitUserData.containsKey(LOCAL_CHECKPOINT_KEY)) {
localCheckpoint = Long.parseLong(commitUserData.get(LOCAL_CHECKPOINT_KEY));
} else {
localCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED;
}

final long globalCheckpoint;
if (commitUserData.containsKey(GLOBAL_CHECKPOINT_KEY)) {
globalCheckpoint = Long.parseLong(commitUserData.get(GLOBAL_CHECKPOINT_KEY));
} else {
globalCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO;
}

return new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint);
}

@@ -1323,23 +1313,39 @@ private void commitIndexWriter(IndexWriter writer, Translog translog, String syn
ensureCanFlush();
try {
Translog.TranslogGeneration translogGeneration = translog.getGeneration();
final Map<String, String> commitData = new HashMap<>(5);

commitData.put(Translog.TRANSLOG_GENERATION_KEY, Long.toString(translogGeneration.translogFileGeneration));
commitData.put(Translog.TRANSLOG_UUID_KEY, translogGeneration.translogUUID);

commitData.put(LOCAL_CHECKPOINT_KEY, Long.toString(seqNoService().getLocalCheckpoint()));
commitData.put(GLOBAL_CHECKPOINT_KEY, Long.toString(seqNoService().getGlobalCheckpoint()));

if (syncId != null) {
commitData.put(Engine.SYNC_COMMIT_ID, syncId);
}

if (logger.isTraceEnabled()) {
logger.trace("committing writer with commit data [{}]", commitData);
}
final String translogFileGen = Long.toString(translogGeneration.translogFileGeneration);
final String translogUUID = translogGeneration.translogUUID;
final String localCheckpoint = Long.toString(seqNoService().getLocalCheckpoint());
final String globalCheckpoint = Long.toString(seqNoService().getGlobalCheckpoint());

writer.setLiveCommitData(() -> {
/*
* The user data captured above (e.g. local and global checkpoints) contains values that must be
* evaluated *before* Lucene flushes segments. The maximum sequence number is different: we never
* want the maximum sequence number to be less than the last sequence number to go into a Lucene
* commit, otherwise we run the risk of re-using a sequence number for two different documents
* when restoring from this commit point and subsequently writing new documents to the index.
* Since we only know which Lucene documents made it into the final commit after the
* {@link IndexWriter#commit()} call flushes all documents, we defer computation of the max_seq_no
* to the time of invocation of the commit data iterator (which occurs after all documents have
* been flushed to Lucene).
*/
final Map<String, String> commitData = new HashMap<>(6);
commitData.put(Translog.TRANSLOG_GENERATION_KEY, translogFileGen);
commitData.put(Translog.TRANSLOG_UUID_KEY, translogUUID);
commitData.put(LOCAL_CHECKPOINT_KEY, localCheckpoint);
commitData.put(GLOBAL_CHECKPOINT_KEY, globalCheckpoint);
if (syncId != null) {
commitData.put(Engine.SYNC_COMMIT_ID, syncId);
}
commitData.put(MAX_SEQ_NO, Long.toString(seqNoService().getMaxSeqNo()));
if (logger.isTraceEnabled()) {
logger.trace("committed writer with commit data [{}]", commitData);
Contributor: nit: should be "committing writer with commit data"

Author: done
}
return commitData.entrySet().iterator();
});

indexWriter.setCommitData(commitData);
writer.commit();
} catch (Exception ex) {
try {
@@ -1395,7 +1401,8 @@ public MergeStats getMergeStats() {
public SequenceNumbersService seqNoService() {
return seqNoService;
}
@Override

@Override
public DocsStats getDocStats() {
final int numDocs = indexWriter.numDocs();
final int maxDoc = indexWriter.maxDoc();
@@ -1441,4 +1448,15 @@ boolean indexWriterHasDeletions() {
public boolean isRecovering() {
return pendingTranslogRecovery.get();
}

/**
* Gets the commit data from {@link IndexWriter} as a map.
*/
private static Map<String, String> commitDataAsMap(final IndexWriter indexWriter) {
Map<String, String> commitData = new HashMap<>(6);
for (Map.Entry<String, String> entry : indexWriter.getLiveCommitData()) {
commitData.put(entry.getKey(), entry.getValue());
}
return commitData;
}
}
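
The comment inside commitIndexWriter above is the heart of this change: with the old setCommitData(Map) API the commit user data was frozen at the moment the map was built, whereas setLiveCommitData(Iterable) lets Lucene pull the entries only when IndexWriter#commit() actually writes the commit, so max_seq_no can be computed after every document in the commit has been flushed. The following is a minimal, standalone sketch of that behavior, not Elasticsearch code; it assumes a Lucene version that has setLiveCommitData/getLiveCommitData (6.2+), and the maxSeqNo counter is a hypothetical stand-in for SequenceNumbersService#getMaxSeqNo().

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

public class LiveCommitDataSketch {

    // Hypothetical stand-in for SequenceNumbersService#getMaxSeqNo().
    private static final AtomicLong maxSeqNo = new AtomicLong(-1);

    public static void main(String[] args) throws IOException {
        try (Directory dir = new RAMDirectory();
             IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(new StandardAnalyzer()))) {

            writer.addDocument(new Document());
            maxSeqNo.incrementAndGet(); // sequence number 0

            // The iterable is only stored here; nothing is evaluated yet.
            writer.setLiveCommitData(() -> {
                final Map<String, String> commitData = new HashMap<>();
                commitData.put("max_seq_no", Long.toString(maxSeqNo.get()));
                return commitData.entrySet().iterator();
            });

            // Documents (and sequence numbers) that arrive before the commit flushes are still covered.
            writer.addDocument(new Document());
            maxSeqNo.incrementAndGet(); // sequence number 1

            writer.commit(); // the iterator above is invoked here, so max_seq_no is written as 1, not 0

            // Read side, as in loadSeqNoStatsFromCommit and commitDataAsMap: iterate the live commit data.
            for (Map.Entry<String, String> entry : writer.getLiveCommitData()) {
                System.out.println(entry.getKey() + " = " + entry.getValue());
            }
        }
    }
}

When restoring from such a commit point, loadSeqNoStatsFromCommit above reads the same entries back through getLiveCommitData(), and commitDataAsMap is the small helper this change adds for call sites that still want a Map view of the entries.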
core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java
@@ -81,6 +81,13 @@ public long generateSeqNo() {
return localCheckpointService.generateSeqNo();
}

/**
* Gets the maximum sequence number seen so far. See {@link LocalCheckpointService#getMaxSeqNo()} for details.
*/
public long getMaxSeqNo() {
return localCheckpointService.getMaxSeqNo();
}

/**
* marks the given seqNo as completed. See {@link LocalCheckpointService#markSeqNoAsCompleted(long)}
* more details
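
For context on what the new SequenceNumbersService#getMaxSeqNo() accessor is expected to return, here is a toy tracker, a hypothetical illustration rather than the actual LocalCheckpointService: every generated sequence number is recorded, and getMaxSeqNo() returns the highest one issued so far, or NO_OPS_PERFORMED (-1) when no operations have been performed.

import java.util.concurrent.atomic.AtomicLong;

final class MaxSeqNoTracker {

    // Stand-in for SequenceNumbersService.NO_OPS_PERFORMED.
    private static final long NO_OPS_PERFORMED = -1;

    private final AtomicLong nextSeqNo = new AtomicLong(0);
    private final AtomicLong maxSeqNo = new AtomicLong(NO_OPS_PERFORMED);

    // Issues the next sequence number and records it as the maximum seen so far.
    long generateSeqNo() {
        final long seqNo = nextSeqNo.getAndIncrement();
        maxSeqNo.accumulateAndGet(seqNo, Math::max);
        return seqNo;
    }

    // The highest sequence number issued so far; NO_OPS_PERFORMED if none have been issued.
    long getMaxSeqNo() {
        return maxSeqNo.get();
    }
}

This is the value that commitIndexWriter stamps into the Lucene commit as max_seq_no, which is why it must never lag behind the last sequence number that actually made it into the commit.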