diff --git a/server/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java b/server/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java index 07dcdf7db116a..7be3406659e62 100644 --- a/server/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java +++ b/server/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java @@ -22,6 +22,7 @@ import org.elasticsearch.action.support.replication.ReplicatedWriteRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; @@ -33,17 +34,24 @@ */ public final class ResyncReplicationRequest extends ReplicatedWriteRequest { + private long trimAboveSeqNo; private Translog.Operation[] operations; ResyncReplicationRequest() { super(); } - public ResyncReplicationRequest(final ShardId shardId, final Translog.Operation[] operations) { + public ResyncReplicationRequest(final ShardId shardId, final long trimAboveSeqNo, + final Translog.Operation[] operations) { super(shardId); + this.trimAboveSeqNo = trimAboveSeqNo; this.operations = operations; } + public long getTrimAboveSeqNo() { + return trimAboveSeqNo; + } + public Translog.Operation[] getOperations() { return operations; } @@ -60,12 +68,20 @@ public void readFrom(final StreamInput in) throws IOException { throw new IllegalStateException("resync replication request serialization is broken in 6.0.0"); } super.readFrom(in); + if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + trimAboveSeqNo = in.readZLong(); + } else { + trimAboveSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO; + } operations = in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new); } @Override public void writeTo(final StreamOutput out) throws IOException { super.writeTo(out); + if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + out.writeZLong(trimAboveSeqNo); + } out.writeArray(Translog.Operation::writeOperation, operations); } @@ -74,12 +90,13 @@ public boolean equals(final Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; final ResyncReplicationRequest that = (ResyncReplicationRequest) o; - return Arrays.equals(operations, that.operations); + return trimAboveSeqNo == that.trimAboveSeqNo + && Arrays.equals(operations, that.operations); } @Override public int hashCode() { - return Arrays.hashCode(operations); + return Long.hashCode(trimAboveSeqNo) + 31 * Arrays.hashCode(operations); } @Override @@ -88,6 +105,7 @@ public String toString() { "shardId=" + shardId + ", timeout=" + timeout + ", index='" + index + '\'' + + ", trimAboveSeqNo=" + trimAboveSeqNo + ", ops=" + operations.length + "}"; } diff --git a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java index 3dd2bd4df580f..78c1e835d4087 100644 --- a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java @@ -135,6 +135,9 @@ public static Translog.Location performOnReplica(ResyncReplicationRequest reques } } } + if (request.getTrimAboveSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) { + replica.trimOperationOfPreviousPrimaryTerms(request.getTrimAboveSeqNo()); + } 
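+ // at this point ops of older primary terms above trimAboveSeqNo are gone from the replica's translog; requests from pre-7.0 primaries carry trimAboveSeqNo == UNASSIGNED_SEQ_NO (see ResyncReplicationRequest#readFrom) and skip the trim entirely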
return location; } diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 314eeffd7aa6a..5283975be7b12 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -236,6 +236,12 @@ boolean isThrottled() { */ public abstract boolean isThrottled(); + /** + * Trims translog for terms below belowTerm and seq# above aboveSeqNo + * @see Translog#trimOperations(long, long) + */ + public abstract void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws EngineException; + /** A Lock implementation that always allows the lock to be acquired */ protected static final class NoOpLock implements Lock { @@ -904,7 +910,7 @@ public final boolean refreshNeeded() { * checks and removes translog files that no longer need to be retained. See * {@link org.elasticsearch.index.translog.TranslogDeletionPolicy} for details */ - public abstract void trimTranslog() throws EngineException; + public abstract void trimUnreferencedTranslogFiles() throws EngineException; /** * Tests whether or not the translog generation should be rolled to a new generation. diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index bca84f81a29c4..88e7160845266 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -1552,7 +1552,7 @@ public void rollTranslogGeneration() throws EngineException { } @Override - public void trimTranslog() throws EngineException { + public void trimUnreferencedTranslogFiles() throws EngineException { try (ReleasableLock lock = readLock.acquire()) { ensureOpen(); translog.trimUnreferencedReaders(); @@ -1569,6 +1569,24 @@ public void trimTranslog() throws EngineException { } } + @Override + public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws EngineException { + try (ReleasableLock lock = readLock.acquire()) { + ensureOpen(); + translog.trimOperations(belowTerm, aboveSeqNo); + } catch (AlreadyClosedException e) { + failOnTragicEvent(e); + throw e; + } catch (Exception e) { + try { + failEngine("translog operations trimming failed", e); + } catch (Exception inner) { + e.addSuppressed(inner); + } + throw new EngineException(shardId, "failed to trim translog operations", e); + } + } + private void pruneDeletedTombstones() { /* * We need to deploy two different trimming strategies for GC deletes on primary and replicas. Delete operations on primary diff --git a/server/src/main/java/org/elasticsearch/index/seqno/SequenceNumbers.java b/server/src/main/java/org/elasticsearch/index/seqno/SequenceNumbers.java index 0c071f4b2d422..7cffc8c1ac911 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/SequenceNumbers.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/SequenceNumbers.java @@ -37,7 +37,7 @@ public class SequenceNumbers { */ public static final long UNASSIGNED_SEQ_NO = -2L; /** - * Represents no operations have been performed on the shard. + * Represents no operations have been performed on the shard. Initial value of a sequence number. 
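+ * For example, an empty translog writer starts with {@code maxSeqNo == NO_OPS_PERFORMED} until its first operation is assigned seq# 0.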
*/ public static final long NO_OPS_PERFORMED = -1L; diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 60392ab7990df..1716f57032809 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -993,7 +993,7 @@ public Engine.CommitId flush(FlushRequest request) { public void trimTranslog() { verifyNotClosed(); final Engine engine = getEngine(); - engine.trimTranslog(); + engine.trimUnreferencedTranslogFiles(); } /** @@ -1195,6 +1195,10 @@ public void prepareForIndexRecovery() { assert currentEngineReference.get() == null; } + public void trimOperationOfPreviousPrimaryTerms(long aboveSeqNo) { + getEngine().trimOperationsFromTranslog(primaryTerm, aboveSeqNo); + } + public Engine.Result applyTranslogOperation(Translog.Operation operation, Engine.Operation.Origin origin) throws IOException { final Engine.Result result; switch (operation.opType()) { diff --git a/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java b/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java index af8c9bdd0272f..8e05e7bf08efa 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java +++ b/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java @@ -35,6 +35,7 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.tasks.Task; @@ -84,6 +85,7 @@ public void resync(final IndexShard indexShard, final ActionListener try { final long startingSeqNo = indexShard.getGlobalCheckpoint() + 1; Translog.Snapshot snapshot = indexShard.newTranslogSnapshotFromMinSeqNo(startingSeqNo); + final long maxSeqNo = indexShard.seqNoStats().getMaxSeqNo(); resyncListener = new ActionListener() { @Override public void onResponse(final ResyncTask resyncTask) { @@ -135,7 +137,7 @@ public synchronized Translog.Operation next() throws IOException { } }; resync(shardId, indexShard.routingEntry().allocationId().getId(), indexShard.getPrimaryTerm(), wrappedSnapshot, - startingSeqNo, resyncListener); + startingSeqNo, maxSeqNo, resyncListener); } catch (Exception e) { if (resyncListener != null) { resyncListener.onFailure(e); @@ -146,7 +148,7 @@ public synchronized Translog.Operation next() throws IOException { } private void resync(final ShardId shardId, final String primaryAllocationId, final long primaryTerm, final Translog.Snapshot snapshot, - long startingSeqNo, ActionListener listener) { + long startingSeqNo, long maxSeqNo, ActionListener listener) { ResyncRequest request = new ResyncRequest(shardId, primaryAllocationId); ResyncTask resyncTask = (ResyncTask) taskManager.register("transport", "resync", request); // it's not transport :-) ActionListener wrappedListener = new ActionListener() { @@ -166,7 +168,7 @@ public void onFailure(Exception e) { }; try { new SnapshotSender(logger, syncAction, resyncTask, shardId, primaryAllocationId, primaryTerm, snapshot, chunkSize.bytesAsInt(), - startingSeqNo, wrappedListener).run(); + startingSeqNo, maxSeqNo, wrappedListener).run(); } catch (Exception e) { wrappedListener.onFailure(e); } @@ -186,14 +188,16 @@ static class 
SnapshotSender extends AbstractRunnable implements ActionListener listener; + private final AtomicBoolean firstMessage = new AtomicBoolean(true); private final AtomicInteger totalSentOps = new AtomicInteger(); private final AtomicInteger totalSkippedOps = new AtomicInteger(); private AtomicBoolean closed = new AtomicBoolean(); SnapshotSender(Logger logger, SyncAction syncAction, ResyncTask task, ShardId shardId, String primaryAllocationId, long primaryTerm, - Translog.Snapshot snapshot, int chunkSizeInBytes, long startingSeqNo, ActionListener listener) { + Translog.Snapshot snapshot, int chunkSizeInBytes, long startingSeqNo, long maxSeqNo, ActionListener listener) { this.logger = logger; this.syncAction = syncAction; this.task = task; @@ -203,6 +207,7 @@ static class SnapshotSender extends AbstractRunnable implements ActionListener diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java + /** + * Trims translog for terms below belowTerm and seq# above aboveSeqNo. + * Effectively it lowers the max visible seq# to {@link Checkpoint#trimmedAboveSeqNo} so that {@link TranslogSnapshot} skips the trimmed operations. + */ + public void trimOperations(long belowTerm, long aboveSeqNo) throws IOException { + assert aboveSeqNo >= SequenceNumbers.NO_OPS_PERFORMED : "aboveSeqNo has to be a valid sequence number"; + + try (ReleasableLock lock = writeLock.acquire()) { + ensureOpen(); + if (current.getPrimaryTerm() < belowTerm) { + throw new IllegalArgumentException("Trimming the translog can only be done for terms lower than the current one. " + + "Trim requested for term [ " + belowTerm + " ] , current is [ " + current.getPrimaryTerm() + " ]"); + } + // we assume that the current translog generation doesn't have trimmable ops. Verify that. + assert current.assertNoSeqAbove(belowTerm, aboveSeqNo); + // update all existing readers (where necessary) as checkpoint and reader are immutable + final List<TranslogReader> newReaders = new ArrayList<>(readers.size()); + try { + for (TranslogReader reader : readers) { + final TranslogReader newReader = + reader.getPrimaryTerm() < belowTerm + ? reader.closeIntoTrimmedReader(aboveSeqNo, getChannelFactory()) + : reader; + newReaders.add(newReader); + } + } catch (IOException e) { + IOUtils.closeWhileHandlingException(newReaders); + close(); + throw e; + } + + this.readers.clear(); + this.readers.addAll(newReaders); + } + } /** * Ensures that the given location has been synced / written to the underlying storage. @@ -845,6 +880,13 @@ public interface Snapshot extends Closeable { */ int totalOperations(); + /** + * The number of operations that have been skipped (overridden or trimmed) in the snapshot so far. + */ + default int skippedOperations() { + return 0; + } + /** * The number of operations that have been overridden (e.g. superseded) in the snapshot so far.
* If two operations have the same sequence number, the operation with a lower term will be overridden by the operation with a newer term. diff --git a/server/src/main/java/org/elasticsearch/index/translog/TranslogReader.java b/server/src/main/java/org/elasticsearch/index/translog/TranslogReader.java index 29e30bd25dd37..4091fa45762e1 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/TranslogReader.java +++ b/server/src/main/java/org/elasticsearch/index/translog/TranslogReader.java @@ -21,6 +21,8 @@ import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.common.io.Channels; +import org.elasticsearch.core.internal.io.IOUtils; +import org.elasticsearch.index.seqno.SequenceNumbers; import java.io.Closeable; import java.io.EOFException; @@ -28,8 +30,11 @@ import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.file.Path; +import java.nio.file.StandardOpenOption; import java.util.concurrent.atomic.AtomicBoolean; +import static org.elasticsearch.index.translog.Translog.getCommitCheckpointFileName; + /** * an immutable translog file reader */ @@ -70,6 +75,39 @@ public static TranslogReader open( return new TranslogReader(checkpoint, channel, path, header); } + /** + * Closes the current reader and creates a new one with a new checkpoint and the same file channel + */ + TranslogReader closeIntoTrimmedReader(long aboveSeqNo, ChannelFactory channelFactory) throws IOException { + if (closed.compareAndSet(false, true)) { + Closeable toCloseOnFailure = channel; + final TranslogReader newReader; + try { + if (aboveSeqNo < checkpoint.trimmedAboveSeqNo + || aboveSeqNo < checkpoint.maxSeqNo && checkpoint.trimmedAboveSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO) { + final Path checkpointFile = path.getParent().resolve(getCommitCheckpointFileName(checkpoint.generation)); + final Checkpoint newCheckpoint = new Checkpoint(checkpoint.offset, checkpoint.numOps, + checkpoint.generation, checkpoint.minSeqNo, checkpoint.maxSeqNo, + checkpoint.globalCheckpoint, checkpoint.minTranslogGeneration, aboveSeqNo); + Checkpoint.write(channelFactory, checkpointFile, newCheckpoint, StandardOpenOption.WRITE); + + IOUtils.fsync(checkpointFile, false); + IOUtils.fsync(checkpointFile.getParent(), true); + + newReader = new TranslogReader(newCheckpoint, channel, path, header); + } else { + newReader = new TranslogReader(checkpoint, channel, path, header); + } + toCloseOnFailure = null; + return newReader; + } finally { + IOUtils.close(toCloseOnFailure); + } + } else { + throw new AlreadyClosedException(toString() + " is already closed"); + } + } + public long sizeInBytes() { return length; } diff --git a/server/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java b/server/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java index a966720353297..8fe92bba0097c 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java +++ b/server/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java @@ -19,6 +19,7 @@ package org.elasticsearch.index.translog; import org.elasticsearch.common.io.Channels; +import org.elasticsearch.index.seqno.SequenceNumbers; import java.io.EOFException; import java.io.IOException; @@ -32,6 +33,7 @@ final class TranslogSnapshot extends BaseTranslogReader { private final ByteBuffer reusableBuffer; private long position; + private int skippedOperations; private int readOperations; private BufferedChecksumStreamInput reuse; @@ -54,17 +56,24 @@ public int totalOperations() { return totalOperations; } + int
skippedOperations(){ + return skippedOperations; + } + @Override Checkpoint getCheckpoint() { return checkpoint; } public Translog.Operation next() throws IOException { - if (readOperations < totalOperations) { - return readOperation(); - } else { - return null; + while (readOperations < totalOperations) { + final Translog.Operation operation = readOperation(); + if (operation.seqNo() <= checkpoint.trimmedAboveSeqNo || checkpoint.trimmedAboveSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO) { + return operation; + } + skippedOperations++; } + return null; } protected Translog.Operation readOperation() throws IOException { diff --git a/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java b/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java index cae6578886534..b89b21c52588a 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java +++ b/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java @@ -20,7 +20,6 @@ package org.elasticsearch.index.translog; import org.apache.lucene.store.AlreadyClosedException; -import org.apache.lucene.store.OutputStreamDataOutput; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.Assertions; import org.elasticsearch.common.bytes.BytesArray; @@ -92,6 +91,7 @@ private TranslogWriter( this.minSeqNo = initialCheckpoint.minSeqNo; assert initialCheckpoint.maxSeqNo == SequenceNumbers.NO_OPS_PERFORMED : initialCheckpoint.maxSeqNo; this.maxSeqNo = initialCheckpoint.maxSeqNo; + assert initialCheckpoint.trimmedAboveSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO : initialCheckpoint.trimmedAboveSeqNo; this.globalCheckpointSupplier = globalCheckpointSupplier; this.seenSequenceNumbers = Assertions.ENABLED ? new HashMap<>() : null; } @@ -213,6 +213,25 @@ private synchronized boolean assertNoSeqNumberConflict(long seqNo, BytesReferenc return true; } + synchronized boolean assertNoSeqAbove(long belowTerm, long aboveSeqNo) { + seenSequenceNumbers.entrySet().stream().filter(e -> e.getKey().longValue() > aboveSeqNo) + .forEach(e -> { + final Translog.Operation op; + try { + op = Translog.readOperation(new BufferedChecksumStreamInput(e.getValue().v1().streamInput())); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + long seqNo = op.seqNo(); + long primaryTerm = op.primaryTerm(); + if (primaryTerm < belowTerm) { + throw new AssertionError("current should not have any operations with seq#:primaryTerm [" + + seqNo + ":" + primaryTerm + "] > " + aboveSeqNo + ":" + belowTerm); + } + }); + return true; + } + /** * write all buffered ops to disk and fsync file. 
* @@ -241,7 +260,8 @@ public int totalOperations() { @Override synchronized Checkpoint getCheckpoint() { return new Checkpoint(totalOffset, operationCounter, generation, minSeqNo, maxSeqNo, - globalCheckpointSupplier.getAsLong(), minTranslogGenerationSupplier.getAsLong()); + globalCheckpointSupplier.getAsLong(), minTranslogGenerationSupplier.getAsLong(), + SequenceNumbers.UNASSIGNED_SEQ_NO); } @Override diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 4c543aeeb22d4..72a6fcb6ba329 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -615,9 +615,9 @@ protected SendSnapshotResult sendSnapshot(final long startingSeqNo, long require cancellableThreads.executeIO(sendBatch); } - assert expectedTotalOps == snapshot.overriddenOperations() + skippedOps + totalSentOps + assert expectedTotalOps == snapshot.skippedOperations() + skippedOps + totalSentOps - : String.format(Locale.ROOT, "expected total [%d], overridden [%d], skipped [%d], total sent [%d]", - expectedTotalOps, snapshot.overriddenOperations(), skippedOps, totalSentOps); + : String.format(Locale.ROOT, "expected total [%d], skipped in snapshot [%d], skipped [%d], total sent [%d]", + expectedTotalOps, snapshot.skippedOperations(), skippedOps, totalSentOps); if (requiredOpsTracker.getCheckpoint() < endingSeqNo) { throw new IllegalStateException("translog replay failed to cover required sequence numbers" + diff --git a/server/src/test/java/org/elasticsearch/action/resync/ResyncReplicationRequestTests.java b/server/src/test/java/org/elasticsearch/action/resync/ResyncReplicationRequestTests.java index d5ad3941a5e8f..914c2b87422db 100644 --- a/server/src/test/java/org/elasticsearch/action/resync/ResyncReplicationRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/resync/ResyncReplicationRequestTests.java @@ -40,7 +40,7 @@ public void testSerialization() throws IOException { final Translog.Index index = new Translog.Index("type", "id", 0, randomNonNegativeLong(), Versions.MATCH_ANY, VersionType.INTERNAL, bytes, null, -1); final ShardId shardId = new ShardId(new Index("index", "uuid"), 0); - final ResyncReplicationRequest before = new ResyncReplicationRequest(shardId, new Translog.Operation[]{index}); + final ResyncReplicationRequest before = new ResyncReplicationRequest(shardId, 42L, new Translog.Operation[]{index}); final BytesStreamOutput out = new BytesStreamOutput(); before.writeTo(out); diff --git a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java index 736dc40e6867d..018548be9629f 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java @@ -340,9 +340,10 @@ public void testSeqNoCollision() throws Exception { op1 = snapshot.next(); assertThat(op1, notNullValue()); assertThat(snapshot.next(), nullValue()); - assertThat(snapshot.overriddenOperations(), equalTo(0)); + assertThat(snapshot.skippedOperations(), equalTo(0)); } - // Make sure that replica2 receives translog ops (e.g. op2) from replica1 and overwrites its stale operation (op1). + // Make sure that replica2 receives translog ops (e.g. op2) from replica1 + // and does not overwrite its stale operation (op1) as it is trimmed.
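+ // (op1 remains visible in Lucene on replica2, since replica rollback is not implemented yet; the trim only hides it from translog snapshots, as the skippedOperations assertions below verify)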
logger.info("--> Promote replica1 as the primary"); shards.promoteReplicaToPrimary(replica1).get(); // wait until resync completed. shards.index(new IndexRequest(index.getName(), "type", "d2").source("{}", XContentType.JSON)); @@ -353,7 +354,8 @@ public void testSeqNoCollision() throws Exception { assertThat(op2.seqNo(), equalTo(op1.seqNo())); assertThat(op2.primaryTerm(), greaterThan(op1.primaryTerm())); assertThat("Remaining of snapshot should contain init operations", snapshot, containsOperationsInAnyOrder(initOperations)); - assertThat(snapshot.overriddenOperations(), equalTo(1)); + assertThat(snapshot.overriddenOperations(), equalTo(0)); + assertThat(snapshot.skippedOperations(), equalTo(1)); } // Make sure that peer-recovery transfers all but non-overridden operations. @@ -366,7 +368,7 @@ public void testSeqNoCollision() throws Exception { assertThat(snapshot.totalOperations(), equalTo(initDocs + 1)); assertThat(snapshot.next(), equalTo(op2)); assertThat("Remaining of snapshot should contain init operations", snapshot, containsOperationsInAnyOrder(initOperations)); - assertThat("Peer-recovery should not send overridden operations", snapshot.overriddenOperations(), equalTo(0)); + assertThat("Peer-recovery should not send overridden operations", snapshot.skippedOperations(), equalTo(0)); } // TODO: We should assert the content of shards in the ReplicationGroup. // Without rollback replicas(current implementation), we don't have the same content across shards: diff --git a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index a34963a475155..ae9488227e409 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -52,8 +52,12 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.EnumSet; +import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; @@ -64,6 +68,7 @@ import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.not; @@ -352,10 +357,19 @@ public void testReplicaRollbackStaleDocumentsInPeerRecovery() throws Exception { @TestLogging("org.elasticsearch.index.shard:TRACE,org.elasticsearch.action.resync:TRACE") public void testResyncAfterPrimaryPromotion() throws Exception { - // TODO: check translog trimming functionality once it's implemented - try (ReplicationGroup shards = createGroup(2)) { + // TODO: check translog trimming functionality once rollback is implemented in Lucene (ES trimming is done) + Map mappings = + Collections.singletonMap("type", "{ \"type\": { \"properties\": { \"f\": { \"type\": \"keyword\"} }}}"); + try (ReplicationGroup shards = new ReplicationGroup(buildIndexMetaData(2, mappings))) { shards.startAll(); - int initialDocs = shards.indexDocs(randomInt(10)); + int initialDocs = randomInt(10); + + for (int i = 0; i < initialDocs; i++) { + final IndexRequest indexRequest = new 
IndexRequest(index.getName(), "type", "initial_doc_" + i) + .source("{ \"f\": \"normal\"}", XContentType.JSON); + shards.index(indexRequest); + } + boolean syncedGlobalCheckPoint = randomBoolean(); if (syncedGlobalCheckPoint) { shards.syncGlobalCheckpoint(); @@ -363,16 +377,30 @@ public void testResyncAfterPrimaryPromotion() throws Exception { final IndexShard oldPrimary = shards.getPrimary(); final IndexShard newPrimary = shards.getReplicas().get(0); + final IndexShard justReplica = shards.getReplicas().get(1); // simulate docs that were inflight when primary failed - final int extraDocs = randomIntBetween(0, 5); + final int extraDocs = randomInt(5); logger.info("--> indexing {} extra docs", extraDocs); for (int i = 0; i < extraDocs; i++) { - final IndexRequest indexRequest = new IndexRequest(index.getName(), "type", "extra_" + i) - .source("{}", XContentType.JSON); + final IndexRequest indexRequest = new IndexRequest(index.getName(), "type", "extra_doc_" + i) + .source("{ \"f\": \"normal\"}", XContentType.JSON); final BulkShardRequest bulkShardRequest = indexOnPrimary(indexRequest, oldPrimary); indexOnReplica(bulkShardRequest, shards, newPrimary); } + + final int extraDocsToBeTrimmed = randomIntBetween(0, 10); + logger.info("--> indexing {} extra docs to be trimmed", extraDocsToBeTrimmed); + for (int i = 0; i < extraDocsToBeTrimmed; i++) { + final IndexRequest indexRequest = new IndexRequest(index.getName(), "type", "extra_trimmed_" + i) + .source("{ \"f\": \"trimmed\"}", XContentType.JSON); + final BulkShardRequest bulkShardRequest = indexOnPrimary(indexRequest, oldPrimary); + // have to replicate to another replica != newPrimary one - the subject to trim + indexOnReplica(bulkShardRequest, shards, justReplica); + } + + logger.info("--> seqNo primary {} replica {}", oldPrimary.seqNoStats(), newPrimary.seqNoStats()); + logger.info("--> resyncing replicas"); PrimaryReplicaSyncer.ResyncTask task = shards.promoteReplicaToPrimary(newPrimary).get(); if (syncedGlobalCheckPoint) { @@ -380,7 +408,36 @@ public void testResyncAfterPrimaryPromotion() throws Exception { } else { assertThat(task.getResyncedOperations(), greaterThanOrEqualTo(extraDocs)); } - shards.assertAllEqual(initialDocs + extraDocs); + List replicas = shards.getReplicas(); + + // check all docs on primary are available on replica + Set primaryIds = getShardDocUIDs(newPrimary); + assertThat(primaryIds.size(), equalTo(initialDocs + extraDocs)); + for (IndexShard replica : replicas) { + Set replicaIds = getShardDocUIDs(replica); + Set temp = new HashSet<>(primaryIds); + temp.removeAll(replicaIds); + assertThat(replica.routingEntry() + " is missing docs", temp, empty()); + temp = new HashSet<>(replicaIds); + temp.removeAll(primaryIds); + // yeah, replica has more docs as there is no Lucene roll back on it + assertThat(replica.routingEntry() + " has to have extra docs", temp, + extraDocsToBeTrimmed > 0 ? 
not(empty()) : empty()); + } + + // check translog on replica is trimmed + int translogOperations = 0; + try(Translog.Snapshot snapshot = getTranslog(justReplica).newSnapshot()) { + Translog.Operation next; + while ((next = snapshot.next()) != null) { + translogOperations++; + assertThat("unexpected op: " + next, (int)next.seqNo(), lessThan(initialDocs + extraDocs)); + assertThat("unexpected primaryTerm: " + next.primaryTerm(), next.primaryTerm(), is(oldPrimary.getPrimaryTerm())); + final Translog.Source source = next.getSource(); + assertThat(source.source.utf8ToString(), is("{ \"f\": \"normal\"}")); + } + } + assertThat(translogOperations, is(initialDocs + extraDocs)); } } diff --git a/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java b/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java index 1257aea3d14fa..b290f4d45597b 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java @@ -20,6 +20,7 @@ import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.resync.ResyncReplicationRequest; import org.elasticsearch.action.resync.ResyncReplicationResponse; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; @@ -36,15 +37,20 @@ import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.mapper.SourceToParse; +import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.tasks.TaskManager; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; import static org.hamcrest.core.IsInstanceOf.instanceOf; public class PrimaryReplicaSyncerTests extends IndexShardTestCase { @@ -53,15 +59,17 @@ public void testSyncerSendsOffCorrectDocuments() throws Exception { IndexShard shard = newStartedShard(true); TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet()); AtomicBoolean syncActionCalled = new AtomicBoolean(); + List resyncRequests = new ArrayList<>(); PrimaryReplicaSyncer.SyncAction syncAction = (request, parentTask, allocationId, primaryTerm, listener) -> { logger.info("Sending off {} operations", request.getOperations().length); syncActionCalled.set(true); + resyncRequests.add(request); assertThat(parentTask, instanceOf(PrimaryReplicaSyncer.ResyncTask.class)); listener.onResponse(new ResyncReplicationResponse()); }; PrimaryReplicaSyncer syncer = new PrimaryReplicaSyncer(Settings.EMPTY, taskManager, syncAction); - syncer.setChunkSize(new ByteSizeValue(randomIntBetween(1, 100))); + syncer.setChunkSize(new ByteSizeValue(randomIntBetween(1, 10))); int numDocs = randomInt(10); for (int i = 0; i < numDocs; i++) { @@ -72,7 +80,7 @@ public void testSyncerSendsOffCorrectDocuments() throws Exception { } long globalCheckPoint = numDocs > 0 ? 
randomIntBetween(0, numDocs - 1) : 0; - boolean syncNeeded = numDocs > 0 && globalCheckPoint < numDocs - 1; + boolean syncNeeded = numDocs > 0; String allocationId = shard.routingEntry().allocationId().getId(); shard.updateShardState(shard.routingEntry(), shard.getPrimaryTerm(), null, 1000L, Collections.singleton(allocationId), @@ -84,19 +92,29 @@ public void testSyncerSendsOffCorrectDocuments() throws Exception { PlainActionFuture fut = new PlainActionFuture<>(); syncer.resync(shard, fut); - fut.get(); + PrimaryReplicaSyncer.ResyncTask resyncTask = fut.get(); if (syncNeeded) { assertTrue("Sync action was not called", syncActionCalled.get()); + ResyncReplicationRequest resyncRequest = resyncRequests.remove(0); + assertThat(resyncRequest.getTrimAboveSeqNo(), equalTo(numDocs - 1L)); + + assertThat("trimAboveSeqNo has to be specified in request #0 only", resyncRequests.stream() + .mapToLong(ResyncReplicationRequest::getTrimAboveSeqNo) + .filter(seqNo -> seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) + .findFirst() + .isPresent(), + is(false)); } - assertEquals(globalCheckPoint == numDocs - 1 ? 0 : numDocs, fut.get().getTotalOperations()); - if (syncNeeded) { + + assertEquals(globalCheckPoint == numDocs - 1 ? 0 : numDocs, resyncTask.getTotalOperations()); + if (syncNeeded && globalCheckPoint < numDocs - 1) { long skippedOps = globalCheckPoint + 1; // everything up to global checkpoint included - assertEquals(skippedOps, fut.get().getSkippedOperations()); - assertEquals(numDocs - skippedOps, fut.get().getResyncedOperations()); + assertEquals(skippedOps, resyncTask.getSkippedOperations()); + assertEquals(numDocs - skippedOps, resyncTask.getResyncedOperations()); } else { - assertEquals(0, fut.get().getSkippedOperations()); - assertEquals(0, fut.get().getResyncedOperations()); + assertEquals(0, resyncTask.getSkippedOperations()); + assertEquals(0, resyncTask.getResyncedOperations()); } closeShards(shard); diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogHeaderTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogHeaderTests.java index 0dc404767de3c..99e21d4760463 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogHeaderTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogHeaderTests.java @@ -118,7 +118,8 @@ private void checkFailsToOpen(String file, Class expect assertThat("test file [" + translogFile + "] should exist", Files.exists(translogFile), equalTo(true)); final E error = expectThrows(expectedErrorType, () -> { final Checkpoint checkpoint = new Checkpoint(Files.size(translogFile), 1, 1, - SequenceNumbers.NO_OPS_PERFORMED, SequenceNumbers.NO_OPS_PERFORMED, SequenceNumbers.NO_OPS_PERFORMED, 1); + SequenceNumbers.NO_OPS_PERFORMED, SequenceNumbers.NO_OPS_PERFORMED, + SequenceNumbers.NO_OPS_PERFORMED, 1, SequenceNumbers.NO_OPS_PERFORMED); try (FileChannel channel = FileChannel.open(translogFile, StandardOpenOption.READ)) { TranslogReader.open(channel, translogFile, checkpoint, null); } diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index b3b9fca886e17..cf6e753684676 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -107,7 +107,9 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import 
java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; +import java.util.stream.IntStream; import java.util.stream.LongStream; import java.util.stream.Stream; @@ -120,8 +122,11 @@ import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasToString; +import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.isIn; import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.nullValue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.stub; @@ -1474,8 +1479,8 @@ public void testRecoveryUncommittedCorruptedCheckpoint() throws IOException { fail("corrupted"); } catch (IllegalStateException ex) { assertEquals("Checkpoint file translog-3.ckp already exists but has corrupted content expected: Checkpoint{offset=3080, " + - "numOps=55, generation=3, minSeqNo=45, maxSeqNo=99, globalCheckpoint=-1, minTranslogGeneration=1} but got: Checkpoint{offset=0, numOps=0, " + - "generation=0, minSeqNo=-1, maxSeqNo=-1, globalCheckpoint=-1, minTranslogGeneration=0}", ex.getMessage()); + "numOps=55, generation=3, minSeqNo=45, maxSeqNo=99, globalCheckpoint=-1, minTranslogGeneration=1, trimmedAboveSeqNo=-2} but got: Checkpoint{offset=0, numOps=0, " + + "generation=0, minSeqNo=-1, maxSeqNo=-1, globalCheckpoint=-1, minTranslogGeneration=0, trimmedAboveSeqNo=-2}", ex.getMessage()); } Checkpoint.write(FileChannel::open, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)), read, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING); try (Translog translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get)) { @@ -1507,6 +1512,191 @@ public void testSnapshotFromStreamInput() throws IOException { assertEquals(ops, readOperations); } + public void testSnapshotCurrentHasUnexpectedOperationsForTrimmedOperations() throws Exception { + int extraDocs = randomIntBetween(10, 15); + + // increment primaryTerm to avoid potential negative numbers + primaryTerm.addAndGet(extraDocs); + translog.rollGeneration(); + + for (int op = 0; op < extraDocs; op++) { + String ascii = randomAlphaOfLengthBetween(1, 50); + Translog.Index operation = new Translog.Index("test", "" + op, op, primaryTerm.get() - op, + ascii.getBytes("UTF-8")); + translog.add(operation); + } + + AssertionError error = expectThrows(AssertionError.class, () -> translog.trimOperations(primaryTerm.get(), 0)); + assertThat(error.getMessage(), is("current should not have any operations with seq#:primaryTerm " + + "[1:" + (primaryTerm.get() - 1) + "] > 0:" + primaryTerm.get())); + + primaryTerm.incrementAndGet(); + translog.rollGeneration(); + + // add a single operation to current with seq# > trimmed seq# but higher primary term + Translog.Index operation = new Translog.Index("test", "" + 1, 1L, primaryTerm.get(), + randomAlphaOfLengthBetween(1, 50).getBytes("UTF-8")); + translog.add(operation); + + // it is possible to trim after generation rollover + translog.trimOperations(primaryTerm.get(), 0); + } + + public void testSnapshotTrimmedOperations() throws Exception { + final InMemoryTranslog inMemoryTranslog = new InMemoryTranslog(); + final List allOperations = new ArrayList<>(); + + for(int attempt = 0, maxAttempts = randomIntBetween(3, 10); attempt < maxAttempts; attempt++) { + List ops = 
LongStream.range(0, allOperations.size() + randomIntBetween(10, 15)) + .boxed().collect(Collectors.toList()); + Randomness.shuffle(ops); + + AtomicReference<String> source = new AtomicReference<>(); + for (final long op : ops) { + source.set(randomAlphaOfLengthBetween(1, 50)); + + // have to use exactly the same source for the same seq# if the primaryTerm has not changed + if (primaryTerm.get() == translog.getCurrent().getPrimaryTerm()) { + // use the latest source of the op with the same seq# - therefore no break + allOperations + .stream() + .filter(allOp -> allOp instanceof Translog.Index && allOp.seqNo() == op) + .map(allOp -> ((Translog.Index) allOp).source().utf8ToString()) + .reduce((a, b) -> b) + .ifPresent(source::set); + } + + // use the current primaryTerm - or the same one as before + Translog.Index operation = new Translog.Index("test", "" + op, op, primaryTerm.get(), + source.get().getBytes("UTF-8")); + translog.add(operation); + inMemoryTranslog.add(operation); + allOperations.add(operation); + } + + if (randomBoolean()) { + primaryTerm.incrementAndGet(); + translog.rollGeneration(); + } + + long maxTrimmedSeqNo = randomInt(allOperations.size()); + + translog.trimOperations(primaryTerm.get(), maxTrimmedSeqNo); + inMemoryTranslog.trimOperations(primaryTerm.get(), maxTrimmedSeqNo); + translog.sync(); + + Collection<Translog.Operation> effectiveOperations = inMemoryTranslog.operations(); + + try (Translog.Snapshot snapshot = translog.newSnapshot()) { + assertThat(snapshot, containsOperationsInAnyOrder(effectiveOperations)); + assertThat(snapshot.totalOperations(), is(allOperations.size())); + assertThat(snapshot.skippedOperations(), is(allOperations.size() - effectiveOperations.size())); + } + } + } + + /** + * This class mimics the trimming behaviour of the original {@link Translog} + */ + static class InMemoryTranslog { + private final Map<Long, Translog.Operation> operations = new HashMap<>(); + + void add(Translog.Operation operation) { + final Translog.Operation old = operations.put(operation.seqNo(), operation); + assert old == null || old.primaryTerm() <= operation.primaryTerm(); + } + + void trimOperations(long belowTerm, long aboveSeqNo) { + for (final Iterator<Map.Entry<Long, Translog.Operation>> it = operations.entrySet().iterator(); it.hasNext(); ) { + final Map.Entry<Long, Translog.Operation> next = it.next(); + Translog.Operation op = next.getValue(); + boolean drop = op.primaryTerm() < belowTerm && op.seqNo() > aboveSeqNo; + if (drop) { + it.remove(); + } + } + } + + Collection<Translog.Operation> operations() { + return operations.values(); + } + } + + public void testRandomExceptionsOnTrimOperations() throws Exception { + Path tempDir = createTempDir(); + final FailSwitch fail = new FailSwitch(); + fail.failNever(); + TranslogConfig config = getTranslogConfig(tempDir); + List<FileChannel> fileChannels = new ArrayList<>(); + final Translog failableTLog = + getFailableTranslog(fail, config, randomBoolean(), false, null, createTranslogDeletionPolicy(), fileChannels); + + IOException expectedException = null; + int translogOperations = 0; + final int maxAttempts = 10; + for (int attempt = 0; attempt < maxAttempts; attempt++) { + int maxTrimmedSeqNo; + fail.failNever(); + int extraTranslogOperations = randomIntBetween(10, 100); + + List<Integer> ops = IntStream.range(translogOperations, translogOperations + extraTranslogOperations) + .boxed().collect(Collectors.toList()); + Randomness.shuffle(ops); + for (int op : ops) { + String ascii = randomAlphaOfLengthBetween(1, 50); + Translog.Index operation = new Translog.Index("test", "" + op, op, + primaryTerm.get(), ascii.getBytes("UTF-8")); + + failableTLog.add(operation); + } + + translogOperations +=
extraTranslogOperations; + + // at least one generation roll + primary term increment has to happen, otherwise the trim would not take place at all + // on the last attempt we have to roll as well, otherwise trimming could be skipped as everything has been trimmed already + boolean rollover = attempt == 0 || attempt == maxAttempts - 1 || randomBoolean(); + if (rollover) { + primaryTerm.incrementAndGet(); + failableTLog.rollGeneration(); + } + + maxTrimmedSeqNo = rollover ? translogOperations - randomIntBetween(4, 8) : translogOperations + 1; + + // once we reach the max attempts - always fail + fail.failRate(attempt < maxAttempts - 1 ? 25 : 100); + try { + failableTLog.trimOperations(primaryTerm.get(), maxTrimmedSeqNo); + } catch (IOException e) { + expectedException = e; + break; + } + } + + assertThat(expectedException, is(not(nullValue()))); + + assertThat(fileChannels, is(not(empty()))); + assertThat("all file channels have to be closed", + fileChannels.stream().filter(f -> f.isOpen()).findFirst().isPresent(), is(false)); + + assertThat(failableTLog.isOpen(), is(false)); + final AlreadyClosedException alreadyClosedException = expectThrows(AlreadyClosedException.class, () -> failableTLog.newSnapshot()); + assertThat(alreadyClosedException.getMessage(), + is("translog is already closed")); + + fail.failNever(); + + // check that despite the IO exception the translog is not corrupted + try (Translog reopenedTranslog = openTranslog(config, failableTLog.getTranslogUUID())) { + try (Translog.Snapshot snapshot = reopenedTranslog.newSnapshot()) { + assertThat(snapshot.totalOperations(), greaterThan(0)); + Translog.Operation operation; + for (int i = 0; (operation = snapshot.next()) != null; i++) { + assertNotNull("operation " + i + " must be non-null", operation); + } + } + } + } + public void testLocationHashCodeEquals() throws IOException { List<Translog.Location> locations = new ArrayList<>(); List<Translog.Location> locations2 = new ArrayList<>(); @@ -2007,7 +2197,8 @@ private static class FailSwitch { private volatile boolean onceFailedFailAlways = false; public boolean fail() { - boolean fail = randomIntBetween(1, 100) <= failRate; + final int rnd = randomIntBetween(1, 100); + boolean fail = rnd <= failRate; if (fail && onceFailedFailAlways) { failAlways(); } @@ -2026,17 +2217,30 @@ public void failRandomly() { failRate = randomIntBetween(1, 100); } + public void failRate(int rate) { + failRate = rate; + } + public void onceFailedFailAlways() { onceFailedFailAlways = true; } } - private Translog getFailableTranslog(final FailSwitch fail, final TranslogConfig config, final boolean partialWrites, final boolean throwUnknownException, String translogUUID, final TranslogDeletionPolicy deletionPolicy) throws IOException { + return getFailableTranslog(fail, config, partialWrites, throwUnknownException, translogUUID, deletionPolicy, null); + } + + private Translog getFailableTranslog(final FailSwitch fail, final TranslogConfig config, final boolean partialWrites, + final boolean throwUnknownException, String translogUUID, + final TranslogDeletionPolicy deletionPolicy, + final List<FileChannel> fileChannels) throws IOException { final ChannelFactory channelFactory = (file, openOption) -> { FileChannel channel = FileChannel.open(file, openOption); + if (fileChannels != null) { + fileChannels.add(channel); + } boolean success = false; try { final boolean isCkpFile = file.getFileName().toString().endsWith(".ckp"); // don't do partial writes for checkpoints we rely on the fact that the bytes are written as an atomic operation @@ -2393,7 +2597,7 @@ private Checkpoint
randomCheckpoint() { } final long generation = randomNonNegativeLong(); return new Checkpoint(randomLong(), randomInt(), generation, minSeqNo, maxSeqNo, randomNonNegativeLong(), - randomLongBetween(1, generation)); + randomLongBetween(1, generation), maxSeqNo); } public void testCheckpointOnDiskFull() throws IOException { @@ -2617,7 +2821,7 @@ public void testMinSeqNoBasedAPI() throws IOException { assertThat(Tuple.tuple(op.seqNo(), op.primaryTerm()), isIn(seenSeqNos)); readFromSnapshot++; } - readFromSnapshot += snapshot.overriddenOperations(); + readFromSnapshot += snapshot.skippedOperations(); } assertThat(readFromSnapshot, equalTo(expectedSnapshotOps)); final long seqNoLowerBound = seqNo;
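Editorial note on the trimming contract: the InMemoryTranslog helper in TranslogTests above is the most compact statement of what trimOperations(belowTerm, aboveSeqNo) guarantees: an operation is dropped if and only if it belongs to a primary term older than belowTerm AND has a seq# above aboveSeqNo. The following minimal, self-contained sketch is illustrative only; the class TranslogTrimSketch and its shape are assumptions for the example, not part of this patch:

import java.util.HashMap;
import java.util.Map;

// Illustrative only: mirrors the predicate used by Translog#trimOperations and
// the InMemoryTranslog test helper above; not part of the Elasticsearch patch.
public class TranslogTrimSketch {
    static final class Op {
        final long seqNo;
        final long primaryTerm;
        Op(long seqNo, long primaryTerm) {
            this.seqNo = seqNo;
            this.primaryTerm = primaryTerm;
        }
    }

    private final Map<Long, Op> operations = new HashMap<>();

    void add(Op op) {
        final Op old = operations.put(op.seqNo, op);
        // like the real translog, a seq# may only be overwritten by an equal or newer term
        assert old == null || old.primaryTerm <= op.primaryTerm;
    }

    // an op is dropped iff it belongs to an older term AND sits above the trim point
    void trimOperations(long belowTerm, long aboveSeqNo) {
        operations.values().removeIf(op -> op.primaryTerm < belowTerm && op.seqNo > aboveSeqNo);
    }

    public static void main(String[] args) {
        TranslogTrimSketch translog = new TranslogTrimSketch();
        translog.add(new Op(0, 1)); // acknowledged on all copies
        translog.add(new Op(1, 1)); // in-flight op that only reached this replica
        // After promotion the new primary (term 2, maxSeqNo 0) resyncs with
        // trimAboveSeqNo == 0: the stale term-1 op at seq# 1 is trimmed away.
        translog.trimOperations(2, 0);
        System.out.println(translog.operations.keySet()); // prints [0]
    }
}

In the patch itself the trim point is chosen by PrimaryReplicaSyncer: it captures the new primary's maxSeqNo before taking the translog snapshot and ships it as trimAboveSeqNo on the first ResyncReplicationRequest only (see the firstMessage flag, and the PrimaryReplicaSyncerTests assertion that requests after #0 carry UNASSIGNED_SEQ_NO), so an operation the new primary itself knows about is never trimmed.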