Skip to content

Commit

Permalink
Migrate translog generation rolling to index shard
Browse files Browse the repository at this point in the history
  • Loading branch information
jasontedor committed Mar 22, 2017
1 parent 7480faf commit dd14ba8
Show file tree
Hide file tree
Showing 5 changed files with 214 additions and 145 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,12 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkShardRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.WriteResponse;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
Expand All @@ -46,7 +42,6 @@
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import org.apache.logging.log4j.core.pattern.ConverterKeys;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -302,15 +297,21 @@ private void maybeFinish() {
}

void run() {
// we either respond immediately ie. if we we don't fsync per request or wait for refresh
// OR we got an pass async operations on and wait for them to return to respond.
indexShard.maybeFlush();
maybeFinish(); // decrement the pendingOpts by one, if there is nothing else to do we just respond with success.
/*
* We either respond immediately (i.e., if we do not fsync per request or wait for
* refresh), or we there are past async operations and we wait for them to return to
* respond.
*/
indexShard.maybeFlushOrRollTranslogGeneration();
// decrement pending by one, if there is nothing else to do we just respond with success
maybeFinish();
if (waitUntilRefresh) {
assert pendingOps.get() > 0;
indexShard.addRefreshListener(location, forcedRefresh -> {
if (forcedRefresh) {
logger.warn("block_until_refresh request ran out of slots and forced a refresh: [{}]", request);
logger.warn(
"block until refresh ran out of slots and forced a refresh: [{}]",
request);
}
refreshed.set(forcedRefresh);
maybeFinish();
Expand Down
110 changes: 81 additions & 29 deletions core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -794,6 +794,11 @@ public Engine.CommitId flush(FlushRequest request) throws ElasticsearchException

}

private void rollTranslogGeneration() throws IOException {
final Engine engine = getEngine();
engine.getTranslog().rollGeneration();
}

public void forceMerge(ForceMergeRequest forceMerge) throws IOException {
verifyActive();
if (logger.isTraceEnabled()) {
Expand Down Expand Up @@ -1256,17 +1261,39 @@ public boolean restoreFromRepository(Repository repository) {
}

/**
* Returns <code>true</code> iff this shard needs to be flushed due to too many translog operation or a too large transaction log.
* Otherwise <code>false</code>.
* Tests whether or not the translog should be flushed. This test is based on the current size
* of the translog comparted to the configured flush threshold size.
*
* @return {@code true} if the translog should be flushed
*/
boolean shouldFlush() {
Engine engine = getEngineOrNull();
final Engine engine = getEngineOrNull();
if (engine != null) {
try {
Translog translog = engine.getTranslog();
return translog.sizeInBytes() > indexSettings.getFlushThresholdSize().getBytes();
} catch (AlreadyClosedException ex) {
// that's fine we are already close - no need to flush
final Translog translog = engine.getTranslog();
return translog.shouldFlush();
} catch (final AlreadyClosedException e) {
// we are already closed, no need to flush or roll
}
}
return false;
}

/**
* Tests whether or not the translog generation should be rolled to a new generation. This test
* is based on the size of the current generation compared to the configured generation
* threshold size.
*
* @return {@code true} if the current generation should be rolled to a new generation
*/
boolean shouldRollTranslogGeneration() {
final Engine engine = getEngineOrNull();
if (engine != null) {
try {
final Translog translog = engine.getTranslog();
return translog.shouldRollGeneration();
} catch (final AlreadyClosedException e) {
// we are already closed, no need to flush or roll
}
}
return false;
Expand Down Expand Up @@ -1810,28 +1837,31 @@ public Translog.Durability getTranslogDurability() {
return indexSettings.getTranslogDurability();
}

private final AtomicBoolean asyncFlushRunning = new AtomicBoolean();
// we can not protect with a lock since we "release" on a different thread
private final AtomicBoolean flushOrRollRunning = new AtomicBoolean();

/**
* Schedules a flush if needed but won't schedule more than one flush concurrently. The flush will be executed on the
* Flush thread-pool asynchronously.
*
* @return <code>true</code> if a new flush is scheduled otherwise <code>false</code>.
* Schedules a flush or translog generation roll if needed but will not schedule more than one
* concurrently. The operation will be executed asynchronously on the flush thread pool.
*/
public boolean maybeFlush() {
if (shouldFlush()) {
if (asyncFlushRunning.compareAndSet(false, true)) { // we can't use a lock here since we "release" in a different thread
if (shouldFlush() == false) {
// we have to check again since otherwise there is a race when a thread passes
// the first shouldFlush() check next to another thread which flushes fast enough
// to finish before the current thread could flip the asyncFlushRunning flag.
// in that situation we have an extra unexpected flush.
asyncFlushRunning.compareAndSet(true, false);
} else {
public void maybeFlushOrRollTranslogGeneration() {
if (shouldFlush() || shouldRollTranslogGeneration()) {
if (flushOrRollRunning.compareAndSet(false, true)) {
/*
* We have to check again since otherwise there is a race when a thread passes the
* first check next to another thread which performs the operation quickly enough to
* finish before the current thread could flip the flag. In that situation, we have
* an extra operation.
*
* Additionally, a flush implicitly executes a translog generation roll so if we
* execute a flush then we do not need to check if we should roll the translog
* generation.
*/
if (shouldFlush()) {
logger.debug("submitting async flush request");
final AbstractRunnable abstractRunnable = new AbstractRunnable() {
final AbstractRunnable flush = new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
public void onFailure(final Exception e) {
if (state != IndexShardState.CLOSED) {
logger.warn("failed to flush index", e);
}
Expand All @@ -1844,16 +1874,38 @@ protected void doRun() throws Exception {

@Override
public void onAfter() {
asyncFlushRunning.compareAndSet(true, false);
maybeFlush(); // fire a flush up again if we have filled up the limits such that shouldFlush() returns true
flushOrRollRunning.compareAndSet(true, false);
maybeFlushOrRollTranslogGeneration();
}
};
threadPool.executor(ThreadPool.Names.FLUSH).execute(flush);
} else if (shouldRollTranslogGeneration()) {
logger.debug("submitting async roll translog generation request");
final AbstractRunnable roll = new AbstractRunnable() {
@Override
public void onFailure(final Exception e) {
if (state != IndexShardState.CLOSED) {
logger.warn("failed to roll translog generation", e);
}
}

@Override
protected void doRun() throws Exception {
rollTranslogGeneration();
}

@Override
public void onAfter() {
flushOrRollRunning.compareAndSet(true, false);
maybeFlushOrRollTranslogGeneration();
}
};
threadPool.executor(ThreadPool.Names.FLUSH).execute(abstractRunnable);
return true;
threadPool.executor(ThreadPool.Names.FETCH_SHARD_STARTED).execute(roll);
} else {
flushOrRollRunning.compareAndSet(true, false);
}
}
}
return false;
}

/**
Expand Down
86 changes: 39 additions & 47 deletions core/src/main/java/org/elasticsearch/index/translog/Translog.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.util.BigArrays;
Expand Down Expand Up @@ -420,31 +421,10 @@ public Location add(final Operation operation) throws IOException {
out.writeInt(operationSize);
out.seek(end);
final ReleasablePagedBytesReference bytes = out.bytes();
final Location location;
final boolean shouldRollGeneration;
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
location = current.add(bytes, operation.seqNo());
// check if we should roll under the read lock
shouldRollGeneration =
shouldRollGeneration() && rollingGeneration.compareAndSet(false, true);
return current.add(bytes, operation.seqNo());
}
if (shouldRollGeneration) {
try (ReleasableLock ignored = writeLock.acquire()) {
/*
* We have to check the condition again lest we could roll twice if another
* thread committed the translog (which rolls the generation) between us
* releasing the read lock and acquiring the write lock.
*/
if (shouldRollGeneration()) {
this.rollGeneration();
}
} finally {
final boolean wasRolling = rollingGeneration.getAndSet(false);
assert wasRolling;
}
}
return location;
} catch (final AlreadyClosedException | IOException ex) {
try {
closeOnTragicEvent(ex);
Expand All @@ -465,13 +445,24 @@ public Location add(final Operation operation) throws IOException {
}

/**
* Tests whether or not the current generation of the translog should be rolled into a new
* generation. This test is based on the size of the current generation compared to the
* configured generation threshold size.
* Tests whether or not the translog should be flushed. This test is based on the current size
* of the translog comparted to the configured flush threshold size.
*
* @return {@code true} if the translog should be flushed
*/
public boolean shouldFlush() {
final long size = this.sizeInBytes();
return size > this.indexSettings.getFlushThresholdSize().getBytes();
}

/**
* Tests whether or not the translog generation should be rolled to a new generation. This test
* is based on the size of the current generation compared to the configured generation
* threshold size.
*
* @return {@code true} if the current generation should be rolled into a new generation
* @return {@code true} if the current generation should be rolled to a new generation
*/
private boolean shouldRollGeneration() {
public boolean shouldRollGeneration() {
final long size = this.current.sizeInBytes();
final long threshold = this.indexSettings.getGenerationThresholdSize().getBytes();
return size > threshold;
Expand Down Expand Up @@ -1359,28 +1350,29 @@ public static void writeOperationNoSize(BufferedChecksumStreamOutput out, Transl

/**
* Roll the current translog generation into a new generation. This does not commit the
* translog. The translog write lock must be held by the current thread.
* translog.
*
* @throws IOException if an I/O exception occurred during any file operations
*/
void rollGeneration() throws IOException {
assert writeLock.isHeldByCurrentThread() : "translog write lock not held by current thread";
try {
final TranslogReader reader = current.closeIntoReader();
readers.add(reader);
final Path checkpoint = location.resolve(CHECKPOINT_FILE_NAME);
assert Checkpoint.read(checkpoint).generation == current.getGeneration();
final Path generationCheckpoint =
location.resolve(getCommitCheckpointFileName(current.getGeneration()));
Files.copy(checkpoint, generationCheckpoint);
IOUtils.fsync(generationCheckpoint, false);
IOUtils.fsync(generationCheckpoint.getParent(), true);
// create a new translog file; this will sync it and update the checkpoint data;
current = createWriter(current.getGeneration() + 1);
logger.trace("current translog set to [{}]", current.getGeneration());
} catch (final Exception e) {
IOUtils.closeWhileHandlingException(this); // tragic event
throw e;
public void rollGeneration() throws IOException {
try (Releasable ignored = writeLock.acquire()) {
try {
final TranslogReader reader = current.closeIntoReader();
readers.add(reader);
final Path checkpoint = location.resolve(CHECKPOINT_FILE_NAME);
assert Checkpoint.read(checkpoint).generation == current.getGeneration();
final Path generationCheckpoint =
location.resolve(getCommitCheckpointFileName(current.getGeneration()));
Files.copy(checkpoint, generationCheckpoint);
IOUtils.fsync(generationCheckpoint, false);
IOUtils.fsync(generationCheckpoint.getParent(), true);
// create a new translog file; this will sync it and update the checkpoint data;
current = createWriter(current.getGeneration() + 1);
logger.trace("current translog set to [{}]", current.getGeneration());
} catch (final Exception e) {
IOUtils.closeWhileHandlingException(this); // tragic event
throw e;
}
}
}

Expand Down Expand Up @@ -1428,7 +1420,7 @@ void trimUnreferencedReaders() {
long minReferencedGen = outstandingViews.stream().mapToLong(View::minTranslogGeneration).min().orElse(Long.MAX_VALUE);
minReferencedGen = Math.min(lastCommittedTranslogFileGeneration, minReferencedGen);
final long finalMinReferencedGen = minReferencedGen;
List<TranslogReader> unreferenced = readers.stream().filter(r -> r.getGeneration() < finalMinReferencedGen).collect(Collectors.toList());
List<TranslogReader> unreferenced = readers.stream().filter(r -> r.getGeneration() <= finalMinReferencedGen).collect(Collectors.toList());
for (final TranslogReader unreferencedReader : unreferenced) {
Path translogPath = unreferencedReader.path();
logger.trace("delete translog file - not referenced and not current anymore {}", translogPath);
Expand Down
Loading

0 comments on commit dd14ba8

Please sign in to comment.