diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java
index 10f8741ecccb6..6cd3e5a7b3a03 100644
--- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java
+++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java
@@ -22,16 +22,12 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.BulkShardRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.WriteResponse;
-import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
-import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
@@ -46,7 +42,6 @@
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
-import org.apache.logging.log4j.core.pattern.ConverterKeys;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -302,15 +297,21 @@ private void maybeFinish() {
}
void run() {
- // we either respond immediately ie. if we we don't fsync per request or wait for refresh
- // OR we got an pass async operations on and wait for them to return to respond.
- indexShard.maybeFlush();
- maybeFinish(); // decrement the pendingOpts by one, if there is nothing else to do we just respond with success.
+ /*
+ * We either respond immediately (i.e., if we do not fsync per request or wait for
+ * refresh), or there are pending async operations and we wait for them to return before
+ * responding.
+ */
+ indexShard.maybeFlushOrRollTranslogGeneration();
+ // decrement pending by one, if there is nothing else to do we just respond with success
+ maybeFinish();
if (waitUntilRefresh) {
assert pendingOps.get() > 0;
indexShard.addRefreshListener(location, forcedRefresh -> {
if (forcedRefresh) {
- logger.warn("block_until_refresh request ran out of slots and forced a refresh: [{}]", request);
+ logger.warn(
+ "block until refresh ran out of slots and forced a refresh: [{}]",
+ request);
}
refreshed.set(forcedRefresh);
maybeFinish();
diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index 0e6054deccd0f..f6b7b3a73ac91 100644
--- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -794,6 +794,11 @@ public Engine.CommitId flush(FlushRequest request) throws ElasticsearchException
}
+ private void rollTranslogGeneration() throws IOException {
+ final Engine engine = getEngine();
+ engine.getTranslog().rollGeneration();
+ }
+
public void forceMerge(ForceMergeRequest forceMerge) throws IOException {
verifyActive();
if (logger.isTraceEnabled()) {
@@ -1256,17 +1261,39 @@ public boolean restoreFromRepository(Repository repository) {
}
/**
- * Returns <tt>true</tt> iff this shard needs to be flushed due to too many translog operation or a too large transaction log.
- * Otherwise <tt>false</tt>.
+ * Tests whether or not the translog should be flushed. This test is based on the current size
+ * of the translog compared to the configured flush threshold size.
+ *
+ * @return {@code true} if the translog should be flushed
*/
boolean shouldFlush() {
- Engine engine = getEngineOrNull();
+ final Engine engine = getEngineOrNull();
if (engine != null) {
try {
- Translog translog = engine.getTranslog();
- return translog.sizeInBytes() > indexSettings.getFlushThresholdSize().getBytes();
- } catch (AlreadyClosedException ex) {
- // that's fine we are already close - no need to flush
+ final Translog translog = engine.getTranslog();
+ return translog.shouldFlush();
+ } catch (final AlreadyClosedException e) {
+ // we are already closed, no need to flush or roll
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Tests whether or not the translog generation should be rolled to a new generation. This test
+ * is based on the size of the current generation compared to the configured generation
+ * threshold size.
+ *
+ * @return {@code true} if the current generation should be rolled to a new generation
+ */
+ boolean shouldRollTranslogGeneration() {
+ final Engine engine = getEngineOrNull();
+ if (engine != null) {
+ try {
+ final Translog translog = engine.getTranslog();
+ return translog.shouldRollGeneration();
+ } catch (final AlreadyClosedException e) {
+ // we are already closed, no need to flush or roll
}
}
return false;
@@ -1810,28 +1837,31 @@ public Translog.Durability getTranslogDurability() {
return indexSettings.getTranslogDurability();
}
- private final AtomicBoolean asyncFlushRunning = new AtomicBoolean();
+ // we can not protect with a lock since we "release" on a different thread
+ private final AtomicBoolean flushOrRollRunning = new AtomicBoolean();
/**
- * Schedules a flush if needed but won't schedule more than one flush concurrently. The flush will be executed on the
- * Flush thread-pool asynchronously.
- *
- * @return <tt>true</tt> if a new flush is scheduled otherwise <tt>false</tt>.
+ * Schedules a flush or translog generation roll if needed but will not schedule more than one
+ * concurrently. The operation will be executed asynchronously on the flush thread pool.
*/
- public boolean maybeFlush() {
- if (shouldFlush()) {
- if (asyncFlushRunning.compareAndSet(false, true)) { // we can't use a lock here since we "release" in a different thread
- if (shouldFlush() == false) {
- // we have to check again since otherwise there is a race when a thread passes
- // the first shouldFlush() check next to another thread which flushes fast enough
- // to finish before the current thread could flip the asyncFlushRunning flag.
- // in that situation we have an extra unexpected flush.
- asyncFlushRunning.compareAndSet(true, false);
- } else {
+ public void maybeFlushOrRollTranslogGeneration() {
+ if (shouldFlush() || shouldRollTranslogGeneration()) {
+ if (flushOrRollRunning.compareAndSet(false, true)) {
+ /*
+ * We have to check again since otherwise there is a race when a thread passes the
+ * first check next to another thread which performs the operation quickly enough to
+ * finish before the current thread could flip the flag. In that situation, we have
+ * an extra operation.
+ *
+ * Additionally, a flush implicitly executes a translog generation roll so if we
+ * execute a flush then we do not need to check if we should roll the translog
+ * generation.
+ */
+ if (shouldFlush()) {
logger.debug("submitting async flush request");
- final AbstractRunnable abstractRunnable = new AbstractRunnable() {
+ final AbstractRunnable flush = new AbstractRunnable() {
@Override
- public void onFailure(Exception e) {
+ public void onFailure(final Exception e) {
if (state != IndexShardState.CLOSED) {
logger.warn("failed to flush index", e);
}
@@ -1844,16 +1874,39 @@ protected void doRun() throws Exception {
@Override
public void onAfter() {
- asyncFlushRunning.compareAndSet(true, false);
- maybeFlush(); // fire a flush up again if we have filled up the limits such that shouldFlush() returns true
+ flushOrRollRunning.compareAndSet(true, false);
+ maybeFlushOrRollTranslogGeneration();
+ }
+ };
+ threadPool.executor(ThreadPool.Names.FLUSH).execute(flush);
+ } else if (shouldRollTranslogGeneration()) {
+ logger.debug("submitting async roll translog generation request");
+ final AbstractRunnable roll = new AbstractRunnable() {
+ @Override
+ public void onFailure(final Exception e) {
+ if (state != IndexShardState.CLOSED) {
+ logger.warn("failed to roll translog generation", e);
+ }
+ }
+
+ @Override
+ protected void doRun() throws Exception {
+ rollTranslogGeneration();
+ }
+
+ @Override
+ public void onAfter() {
+ flushOrRollRunning.compareAndSet(true, false);
+ maybeFlushOrRollTranslogGeneration();
}
};
- threadPool.executor(ThreadPool.Names.FLUSH).execute(abstractRunnable);
- return true;
+ // rolls run on the flush thread pool, the same pool the async flush is submitted to
+ threadPool.executor(ThreadPool.Names.FLUSH).execute(roll);
+ } else {
+ flushOrRollRunning.compareAndSet(true, false);
}
}
}
- return false;
}
/**
diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java
index 065ddc4740a9c..1a2e44d07eeab 100644
--- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java
+++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java
@@ -34,6 +34,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.util.BigArrays;
@@ -420,31 +421,10 @@ public Location add(final Operation operation) throws IOException {
out.writeInt(operationSize);
out.seek(end);
final ReleasablePagedBytesReference bytes = out.bytes();
- final Location location;
- final boolean shouldRollGeneration;
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
- location = current.add(bytes, operation.seqNo());
- // check if we should roll under the read lock
- shouldRollGeneration =
- shouldRollGeneration() && rollingGeneration.compareAndSet(false, true);
+ return current.add(bytes, operation.seqNo());
}
- if (shouldRollGeneration) {
- try (ReleasableLock ignored = writeLock.acquire()) {
- /*
- * We have to check the condition again lest we could roll twice if another
- * thread committed the translog (which rolls the generation) between us
- * releasing the read lock and acquiring the write lock.
- */
- if (shouldRollGeneration()) {
- this.rollGeneration();
- }
- } finally {
- final boolean wasRolling = rollingGeneration.getAndSet(false);
- assert wasRolling;
- }
- }
- return location;
} catch (final AlreadyClosedException | IOException ex) {
try {
closeOnTragicEvent(ex);
@@ -465,13 +445,24 @@ public Location add(final Operation operation) throws IOException {
}
/**
- * Tests whether or not the current generation of the translog should be rolled into a new
- * generation. This test is based on the size of the current generation compared to the
- * configured generation threshold size.
+ * Tests whether or not the translog should be flushed. This test is based on the current size
+ * of the translog compared to the configured flush threshold size.
+ *
+ * @return {@code true} if the translog should be flushed
+ */
+ public boolean shouldFlush() {
+ final long size = this.sizeInBytes();
+ return size > this.indexSettings.getFlushThresholdSize().getBytes();
+ }
+
+ /**
+ * Tests whether or not the translog generation should be rolled to a new generation. This test
+ * is based on the size of the current generation compared to the configured generation
+ * threshold size.
*
- * @return {@code true} if the current generation should be rolled into a new generation
+ * @return {@code true} if the current generation should be rolled to a new generation
*/
- private boolean shouldRollGeneration() {
+ public boolean shouldRollGeneration() {
final long size = this.current.sizeInBytes();
final long threshold = this.indexSettings.getGenerationThresholdSize().getBytes();
return size > threshold;
@@ -1359,28 +1350,29 @@ public static void writeOperationNoSize(BufferedChecksumStreamOutput out, Transl
/**
* Roll the current translog generation into a new generation. This does not commit the
- * translog. The translog write lock must be held by the current thread.
+ * translog.
*
* @throws IOException if an I/O exception occurred during any file operations
*/
- void rollGeneration() throws IOException {
- assert writeLock.isHeldByCurrentThread() : "translog write lock not held by current thread";
- try {
- final TranslogReader reader = current.closeIntoReader();
- readers.add(reader);
- final Path checkpoint = location.resolve(CHECKPOINT_FILE_NAME);
- assert Checkpoint.read(checkpoint).generation == current.getGeneration();
- final Path generationCheckpoint =
- location.resolve(getCommitCheckpointFileName(current.getGeneration()));
- Files.copy(checkpoint, generationCheckpoint);
- IOUtils.fsync(generationCheckpoint, false);
- IOUtils.fsync(generationCheckpoint.getParent(), true);
- // create a new translog file; this will sync it and update the checkpoint data;
- current = createWriter(current.getGeneration() + 1);
- logger.trace("current translog set to [{}]", current.getGeneration());
- } catch (final Exception e) {
- IOUtils.closeWhileHandlingException(this); // tragic event
- throw e;
+ public void rollGeneration() throws IOException {
+ try (Releasable ignored = writeLock.acquire()) {
+ try {
+ final TranslogReader reader = current.closeIntoReader();
+ readers.add(reader);
+ final Path checkpoint = location.resolve(CHECKPOINT_FILE_NAME);
+ assert Checkpoint.read(checkpoint).generation == current.getGeneration();
+ final Path generationCheckpoint =
+ location.resolve(getCommitCheckpointFileName(current.getGeneration()));
+ Files.copy(checkpoint, generationCheckpoint);
+ IOUtils.fsync(generationCheckpoint, false);
+ IOUtils.fsync(generationCheckpoint.getParent(), true);
+ // create a new translog file; this will sync it and update the checkpoint data;
+ current = createWriter(current.getGeneration() + 1);
+ logger.trace("current translog set to [{}]", current.getGeneration());
+ } catch (final Exception e) {
+ IOUtils.closeWhileHandlingException(this); // tragic event
+ throw e;
+ }
}
}
@@ -1428,7 +1420,8 @@ void trimUnreferencedReaders() {
long minReferencedGen = outstandingViews.stream().mapToLong(View::minTranslogGeneration).min().orElse(Long.MAX_VALUE);
minReferencedGen = Math.min(lastCommittedTranslogFileGeneration, minReferencedGen);
final long finalMinReferencedGen = minReferencedGen;
+ // strictly less than: a reader at the minimum referenced generation is still referenced and must be kept
List<TranslogReader> unreferenced = readers.stream().filter(r -> r.getGeneration() < finalMinReferencedGen).collect(Collectors.toList());
for (final TranslogReader unreferencedReader : unreferenced) {
Path translogPath = unreferencedReader.path();
logger.trace("delete translog file - not referenced and not current anymore {}", translogPath);
diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java
index 97c96c8af12f7..093a6d124bd44 100644
--- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java
+++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java
@@ -68,6 +68,7 @@
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.test.IndexSettingsModule;
import org.elasticsearch.test.InternalSettingsPlugin;
+import org.elasticsearch.test.junit.annotations.TestLogging;
import java.io.IOException;
import java.nio.file.Files;
@@ -363,49 +364,104 @@ public void testMaybeFlush() throws Exception {
assertEquals(0, shard.getEngine().getTranslog().totalOperations());
}
- public void testStressMaybeFlush() throws Exception {
+ public void testMaybeRollTranslogGeneration() throws Exception {
+ // every operation that pushes the current generation past the threshold must cause one roll
+ final int generationThreshold = randomIntBetween(1, 512);
+ final Settings settings = Settings.builder()
+ .put("index.number_of_shards", 1)
+ .put("index.translog.generation_threshold_size", generationThreshold + "b")
+ .build();
+ createIndex("test", settings);
+ ensureGreen("test");
+ final IndicesService indicesService = getInstanceFromNode(IndicesService.class);
+ final IndexService test = indicesService.indexService(resolveIndex("test"));
+ final IndexShard shard = test.getShardOrNull(0);
+ int rolls = 0;
+ final Translog translog = shard.getEngine().getTranslog();
+ final long generation = translog.currentFileGeneration();
+ // draw the number of operations once; a random bound in the loop condition changes every pass
+ final int numberOfOperations = randomIntBetween(32, 128);
+ for (int i = 0; i < numberOfOperations; i++) {
+ assertThat(translog.currentFileGeneration(), equalTo(generation + rolls));
+ final ParsedDocument doc = testParsedDocument(
+ "1",
+ "test",
+ null,
+ SequenceNumbersService.UNASSIGNED_SEQ_NO,
+ new ParseContext.Document(),
+ new BytesArray(new byte[]{1}), XContentType.JSON, null);
+ final Engine.Index index = new Engine.Index(new Term("_uid", doc.uid()), doc);
+ final Engine.IndexResult result = shard.index(index);
+ final Translog.Location location = result.getTranslogLocation();
+ shard.maybeFlushOrRollTranslogGeneration();
+ if (location.translogLocation + location.size > generationThreshold) {
+ // wait until the roll completes
+ assertBusy(() -> assertFalse(shard.shouldRollTranslogGeneration()));
+ rolls++;
+ assertThat(translog.currentFileGeneration(), equalTo(generation + rolls));
+ }
+ }
+ }
+
+ public void testStressMaybeFlushOrRollTranslogGeneration() throws Exception {
createIndex("test");
ensureGreen();
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexService test = indicesService.indexService(resolveIndex("test"));
final IndexShard shard = test.getShardOrNull(0);
assertFalse(shard.shouldFlush());
- client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put(
- IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(),
- new ByteSizeValue(117/* size of the operation + header&footer*/, ByteSizeUnit.BYTES)).build()).get();
- client().prepareIndex("test", "test", "0").setSource("{}", XContentType.JSON)
- .setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE).get();
+ final String key;
+ final boolean flush = randomBoolean();
+ if (flush) {
+ key = "index.translog.flush_threshold_size";
+ } else {
+ key = "index.translog.generation_threshold_size";
+ }
+ // size of the operation plus header and footer
+ final Settings settings = Settings.builder().put(key, "117b").build();
+ client().admin().indices().prepareUpdateSettings("test").setSettings(settings).get();
+ client().prepareIndex("test", "test", "0")
+ .setSource("{}", XContentType.JSON)
+ .setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE)
+ .get();
assertFalse(shard.shouldFlush());
final AtomicBoolean running = new AtomicBoolean(true);
final int numThreads = randomIntBetween(2, 4);
- Thread[] threads = new Thread[numThreads];
- CyclicBarrier barrier = new CyclicBarrier(numThreads + 1);
+ final Thread[] threads = new Thread[numThreads];
+ final CyclicBarrier barrier = new CyclicBarrier(numThreads + 1);
for (int i = 0; i < threads.length; i++) {
- threads[i] = new Thread() {
- @Override
- public void run() {
- try {
- barrier.await();
- } catch (InterruptedException | BrokenBarrierException e) {
- throw new RuntimeException(e);
- }
- while (running.get()) {
- shard.maybeFlush();
- }
+ threads[i] = new Thread(() -> {
+ try {
+ barrier.await();
+ } catch (final InterruptedException | BrokenBarrierException e) {
+ throw new RuntimeException(e);
+ }
+ while (running.get()) {
+ shard.maybeFlushOrRollTranslogGeneration();
}
- };
+ });
threads[i].start();
}
barrier.await();
- FlushStats flushStats = shard.flushStats();
- long total = flushStats.getTotal();
- client().prepareIndex("test", "test", "1").setSource("{}", XContentType.JSON).get();
- assertBusy(() -> assertEquals(total + 1, shard.flushStats().getTotal()));
+ final Runnable check;
+ if (flush) {
+ final FlushStats flushStats = shard.flushStats();
+ final long total = flushStats.getTotal();
+ client().prepareIndex("test", "test", "1").setSource("{}", XContentType.JSON).get();
+ check = () -> assertEquals(total + 1, shard.flushStats().getTotal());
+ } else {
+ final long generation = shard.getEngine().getTranslog().currentFileGeneration();
+ client().prepareIndex("test", "test", "1").setSource("{}", XContentType.JSON).get();
+ check = () -> assertEquals(
+ generation + 1,
+ shard.getEngine().getTranslog().currentFileGeneration());
+ }
+ assertBusy(check);
running.set(false);
for (int i = 0; i < threads.length; i++) {
threads[i].join();
}
- assertEquals(total + 1, shard.flushStats().getTotal());
+ check.run();
}
public void testShardHasMemoryBufferOnTranslogRecover() throws Throwable {
diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
index 87f8b6bc4c9cb..36401deed4bc2 100644
--- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
+++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
@@ -103,7 +103,6 @@
import java.util.stream.Collectors;
import static org.elasticsearch.common.util.BigArrays.NON_RECYCLING_INSTANCE;
-import static org.elasticsearch.common.util.BigArrays.PAGE_SIZE_IN_BYTES;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
@@ -2120,37 +2119,6 @@ public void testRollGeneration() throws IOException {
assertFileIsPresent(translog, generation + rolls + 1);
}
- public void testGenerationThreshold() throws IOException {
- translog.close();
- final int generationThreshold = randomIntBetween(1, 512);
- final Settings settings = Settings
- .builder()
- .put("index.translog.generation_threshold_size", generationThreshold + "b")
- .build();
- long seqNo = 0;
- long rolls = 0;
- final TranslogConfig config = getTranslogConfig(translogDir, settings);
- try (Translog translog =
- new Translog(config, null, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) {
- final long generation = translog.currentFileGeneration();
- for (int i = 0; i < randomIntBetween(32, 128); i++) {
- assertThat(translog.currentFileGeneration(), equalTo(generation + rolls));
- final Location location = translog.add(new Translog.NoOp(seqNo++, 0, "test"));
- if (location.translogLocation + location.size > generationThreshold) {
- rolls++;
- assertThat(translog.currentFileGeneration(), equalTo(generation + rolls));
- for (int j = 0; j < rolls; j++) {
- assertFileIsPresent(translog, generation + j);
- }
- }
- }
-
- for (int j = 0; j < rolls; j++) {
- assertFileIsPresent(translog, generation + j);
- }
- }
- }
-
public void testRollGenerationBetweenPrepareCommitAndCommit() throws IOException {
final long generation = translog.currentFileGeneration();
int seqNo = 0;
@@ -2201,10 +2169,10 @@ public void testRollGenerationBetweenPrepareCommitAndCommit() throws IOException
translog.commit();
- for (int i = 0; i < rollsBefore; i++) {
+ for (int i = 0; i <= rollsBefore; i++) {
assertFileDeleted(translog, generation + i);
}
- for (int i = rollsBefore; i <= rollsBefore + 1 + rollsBetween; i++) {
+ for (int i = rollsBefore + 1; i <= rollsBefore + 1 + rollsBetween; i++) {
assertFileIsPresent(translog, generation + i);
}