From 9959cc5eb32445839e0d79db0d32707b77638ab7 Mon Sep 17 00:00:00 2001 From: Bukhtawar Khan Date: Tue, 21 Jun 2022 12:30:48 +0530 Subject: [PATCH 1/5] Introduce decoupled translog manager interfaces Signed-off-by: Bukhtawar Khan --- .../translog/InternalTranslogManager.java | 344 ++++++++++++ .../index/translog/NoOpTranslogManager.java | 110 ++++ .../index/translog/TranslogManager.java | 108 ++++ .../translog/TranslogRecoveryRunner.java | 28 + .../translog/WriteOnlyTranslogManager.java | 69 +++ .../CompositeTranslogEventListener.java | 71 +++ .../listener/TranslogEventListener.java | 35 ++ .../index/translog/listener/package-info.java | 11 + .../InternalTranslogManagerTests.java | 276 ++++++++++ .../translog/TranslogManagerTestCase.java | 488 ++++++++++++++++++ 10 files changed, 1540 insertions(+) create mode 100644 server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java create mode 100644 server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java create mode 100644 server/src/main/java/org/opensearch/index/translog/TranslogManager.java create mode 100644 server/src/main/java/org/opensearch/index/translog/TranslogRecoveryRunner.java create mode 100644 server/src/main/java/org/opensearch/index/translog/WriteOnlyTranslogManager.java create mode 100644 server/src/main/java/org/opensearch/index/translog/listener/CompositeTranslogEventListener.java create mode 100644 server/src/main/java/org/opensearch/index/translog/listener/TranslogEventListener.java create mode 100644 server/src/main/java/org/opensearch/index/translog/listener/package-info.java create mode 100644 server/src/test/java/org/opensearch/index/translog/InternalTranslogManagerTests.java create mode 100644 server/src/test/java/org/opensearch/index/translog/TranslogManagerTestCase.java diff --git a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java new file 
mode 100644 index 0000000000000..37e04b51564a6 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java @@ -0,0 +1,344 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.store.AlreadyClosedException; +import org.opensearch.common.util.concurrent.ReleasableLock; +import org.opensearch.index.engine.EngineConfig; +import org.opensearch.index.seqno.LocalCheckpointTracker; +import org.opensearch.index.shard.ShardId; +import org.opensearch.index.translog.listener.TranslogEventListener; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiConsumer; +import java.util.function.Function; +import java.util.function.LongConsumer; +import java.util.function.LongSupplier; +import java.util.function.Supplier; +import java.util.stream.Stream; + +/** + * The {@link TranslogManager} implementation capable of orchestrating all read/write {@link Translog} operations while + * interfacing with the {@link org.opensearch.index.engine.InternalEngine} + * + * @opensearch.internal + */ +public class InternalTranslogManager implements TranslogManager { + + private final ReleasableLock readLock; + private final Runnable ensureOpen; + private final ShardId shardId; + private final Translog translog; + private final BiConsumer failEngine; + private final Function failOnTragicEvent; + private final AtomicBoolean pendingTranslogRecovery = new AtomicBoolean(false); + private final TranslogEventListener translogEventListener; + private static final Logger logger = LogManager.getLogger(InternalTranslogManager.class); + + 
public InternalTranslogManager( + EngineConfig engineConfig, + ShardId shardId, + ReleasableLock readLock, + Supplier localCheckpointTrackerSupplier, + String translogUUID, + TranslogEventListener translogEventListener, + Runnable ensureOpen, + BiConsumer failEngine, + Function failOnTragicEvent + ) throws IOException { + this.shardId = shardId; + this.readLock = readLock; + this.ensureOpen = ensureOpen; + this.failEngine = failEngine; + this.failOnTragicEvent = failOnTragicEvent; + this.translogEventListener = translogEventListener; + final TranslogDeletionPolicy translogDeletionPolicy; + TranslogDeletionPolicy customTranslogDeletionPolicy = null; + Translog translog; + if (engineConfig.getCustomTranslogDeletionPolicyFactory() != null) { + customTranslogDeletionPolicy = engineConfig.getCustomTranslogDeletionPolicyFactory() + .create(engineConfig.getIndexSettings(), engineConfig.retentionLeasesSupplier()); + } + if (customTranslogDeletionPolicy != null) { + translogDeletionPolicy = customTranslogDeletionPolicy; + } else { + translogDeletionPolicy = new DefaultTranslogDeletionPolicy( + engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(), + engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis(), + engineConfig.getIndexSettings().getTranslogRetentionTotalFiles() + ); + } + translog = openTranslog(engineConfig, translogDeletionPolicy, engineConfig.getGlobalCheckpointSupplier(), seqNo -> { + final LocalCheckpointTracker tracker = localCheckpointTrackerSupplier.get(); + assert tracker != null || getTranslog(true).isOpen() == false; + if (tracker != null) { + tracker.markSeqNoAsPersisted(seqNo); + } + }, translogUUID); + assert translog.getGeneration() != null; + this.translog = translog; + assert pendingTranslogRecovery.get() == false : "translog recovery can't be pending before we set it"; + // don't allow commits until we are done with recovering + pendingTranslogRecovery.set(true); + } + + /** + * Rolls the translog generation and 
cleans unneeded. + */ + @Override + public void rollTranslogGeneration() throws TranslogException { + try (ReleasableLock ignored = readLock.acquire()) { + ensureOpen.run(); + translog.rollGeneration(); + translog.trimUnreferencedReaders(); + } catch (AlreadyClosedException e) { + failOnTragicEvent.apply(e); + throw e; + } catch (Exception e) { + try { + failEngine.accept("translog trimming failed", e); + } catch (Exception inner) { + e.addSuppressed(inner); + } + throw new TranslogException(shardId, "failed to roll translog", e); + } + } + + /** + * Performs recovery from the transaction log up to {@code recoverUpToSeqNo} (inclusive). + * This operation will close the engine if the recovery fails. + * @param translogRecoveryRunner the translog recovery runner + * @param recoverUpToSeqNo the upper bound, inclusive, of sequence number to be recovered + * @return the total number of operations recovered + */ + @Override + public int recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long localCheckpoint, long recoverUpToSeqNo) + throws IOException { + int opsRecovered = 0; + translogEventListener.onBeginTranslogRecovery(); + try (ReleasableLock ignored = readLock.acquire()) { + ensureOpen.run(); + if (pendingTranslogRecovery.get() == false) { + throw new IllegalStateException("Engine has already been recovered"); + } + try { + opsRecovered = recoverFromTranslogInternal(translogRecoveryRunner, localCheckpoint, recoverUpToSeqNo); + } catch (Exception e) { + try { + pendingTranslogRecovery.set(true); // just play safe and never allow commits on this see #ensureCanFlush + failEngine.accept("failed to recover from translog", e); + } catch (Exception inner) { + e.addSuppressed(inner); + } + throw e; + } + } + return opsRecovered; + } + + private int recoverFromTranslogInternal(TranslogRecoveryRunner translogRecoveryRunner, long localCheckpoint, long recoverUpToSeqNo) { + final int opsRecovered; + if (localCheckpoint < recoverUpToSeqNo) { + try 
(Translog.Snapshot snapshot = translog.newSnapshot(localCheckpoint + 1, recoverUpToSeqNo)) { + opsRecovered = translogRecoveryRunner.run(snapshot); + } catch (Exception e) { + throw new TranslogException(shardId, "failed to recover from translog", e); + } + } else { + opsRecovered = 0; + } + // flush if we recovered something or if we have references to older translogs + // note: if opsRecovered == 0 and we have older translogs it means they are corrupted or 0 length. + assert pendingTranslogRecovery.get() : "translogRecovery is not pending but should be"; + pendingTranslogRecovery.set(false); // we are good - now we can commit + logger.trace( + () -> new ParameterizedMessage( + "flushing post recovery from translog: ops recovered [{}], current translog generation [{}]", + opsRecovered, + translog.currentFileGeneration() + ) + ); + translogEventListener.onTranslogRecovery(); + return opsRecovered; + } + + /** + * Checks if the underlying storage sync is required. + */ + @Override + public boolean isTranslogSyncNeeded() { + return getTranslog(true).syncNeeded(); + } + + /** + * Ensures that all locations in the given stream have been written to the underlying storage. + */ + @Override + public boolean ensureTranslogSynced(Stream locations) throws IOException { + final boolean synced = translog.ensureSynced(locations); + if (synced) { + translogEventListener.onTranslogSync(); + } + return synced; + } + + /** + * Syncs the translog and invokes the listener + * @throws IOException the exception on sync failure + */ + @Override + public void syncTranslog() throws IOException { + translog.sync(); + translogEventListener.onTranslogSync(); + } + + @Override + public TranslogStats getTranslogStats() { + return getTranslog(true).stats(); + } + + /** + * Returns the last location that the translog of this engine has written into. 
+ */ + @Override + public Translog.Location getTranslogLastWriteLocation() { + return getTranslog(true).getLastWriteLocation(); + } + + /** + * checks and removes translog files that no longer need to be retained. See + * {@link org.opensearch.index.translog.TranslogDeletionPolicy} for details + */ + @Override + public void trimUnreferencedTranslogFiles() throws TranslogException { + try (ReleasableLock ignored = readLock.acquire()) { + ensureOpen.run(); + translog.trimUnreferencedReaders(); + } catch (AlreadyClosedException e) { + failOnTragicEvent.apply(e); + throw e; + } catch (Exception e) { + try { + failEngine.accept("translog trimming failed", e); + } catch (Exception inner) { + e.addSuppressed(inner); + } + throw new TranslogException(shardId, "failed to trim translog", e); + } + } + + /** + * Tests whether or not the translog generation should be rolled to a new generation. + * This test is based on the size of the current generation compared to the configured generation threshold size. 
+ * + * @return {@code true} if the current generation should be rolled to a new generation + */ + @Override + public boolean shouldRollTranslogGeneration() { + return getTranslog(true).shouldRollGeneration(); + } + + /** + * Trims translog for terms below belowTerm and seq# above aboveSeqNo + * @see Translog#trimOperations(long, long) + */ + @Override + public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws TranslogException { + try (ReleasableLock ignored = readLock.acquire()) { + ensureOpen.run(); + translog.trimOperations(belowTerm, aboveSeqNo); + } catch (AlreadyClosedException e) { + failOnTragicEvent.apply(e); + throw e; + } catch (Exception e) { + try { + failEngine.accept("translog operations trimming failed", e); + } catch (Exception inner) { + e.addSuppressed(inner); + } + throw new TranslogException(shardId, "failed to trim translog operations", e); + } + } + + /** + * This method replays translog to restore the Lucene index which might be reverted previously. + * This ensures that all acknowledged writes are restored correctly when this engine is promoted. + * + * @return the number of translog operations have been recovered + */ + @Override + public int restoreLocalHistoryFromTranslog(long processedCheckpoint, TranslogRecoveryRunner translogRecoveryRunner) throws IOException { + try (ReleasableLock ignored = readLock.acquire()) { + ensureOpen.run(); + try (Translog.Snapshot snapshot = getTranslog(true).newSnapshot(processedCheckpoint + 1, Long.MAX_VALUE)) { + return translogRecoveryRunner.run(snapshot); + } + } + } + + /** + * Ensures that the flushes can succeed if there are no pending translog recovery + */ + @Override + public void ensureCanFlush() { + // translog recovery happens after the engine is fully constructed. 
+ // If we are in this stage we have to prevent flushes from this + // engine otherwise we might loose documents if the flush succeeds + // and the translog recovery fails when we "commit" the translog on flush. + if (pendingTranslogRecovery.get()) { + throw new IllegalStateException(shardId.toString() + " flushes are disabled - pending translog recovery"); + } + } + + /** + * Do not replay translog operations, but make the engine be ready. + */ + @Override + public void skipTranslogRecovery() { + assert pendingTranslogRecovery.get() : "translogRecovery is not pending but should be"; + pendingTranslogRecovery.set(false); // we are good - now we can commit + } + + private Translog openTranslog( + EngineConfig engineConfig, + TranslogDeletionPolicy translogDeletionPolicy, + LongSupplier globalCheckpointSupplier, + LongConsumer persistedSequenceNumberConsumer, + String translogUUID + ) throws IOException { + + final TranslogConfig translogConfig = engineConfig.getTranslogConfig(); + // We expect that this shard already exists, so it must already have an existing translog else something is badly wrong! 
+ return new Translog( + translogConfig, + translogUUID, + translogDeletionPolicy, + globalCheckpointSupplier, + engineConfig.getPrimaryTermSupplier(), + persistedSequenceNumberConsumer + ); + } + + /** + * Returns the the translog instance + * @param ensureOpen check if the engine is open + * @return the {@link Translog} instance + */ + @Override + public Translog getTranslog(boolean ensureOpen) { + if (ensureOpen) { + this.ensureOpen.run(); + } + return translog; + } +} diff --git a/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java new file mode 100644 index 0000000000000..07cae808ce071 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java @@ -0,0 +1,110 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.index.translog; + +import org.opensearch.common.util.concurrent.ReleasableLock; +import org.opensearch.index.shard.ShardId; + +import java.io.IOException; +import java.util.stream.Stream; + +/** + * The no-op implementation of {@link TranslogManager} that doesn't perform any operation + * + * @opensearch.internal + */ +public class NoOpTranslogManager implements TranslogManager { + + private final Translog.Snapshot emptyTranslogSnapshot; + private final ReleasableLock readLock; + private final Runnable ensureOpen; + private final ShardId shardId; + private final TranslogStats translogStats; + + public NoOpTranslogManager( + ShardId shardId, + ReleasableLock readLock, + Runnable ensureOpen, + TranslogStats translogStats, + Translog.Snapshot emptyTranslogSnapshot + ) throws IOException { + this.emptyTranslogSnapshot = emptyTranslogSnapshot; + this.readLock = readLock; + this.shardId = shardId; + this.ensureOpen = ensureOpen; + this.translogStats = translogStats; + } + + @Override + public void rollTranslogGeneration() throws TranslogException {} + + @Override + public int recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long localCheckpoint, long recoverUpToSeqNo) + throws IOException { + try (ReleasableLock lock = readLock.acquire()) { + ensureOpen.run(); + try (Translog.Snapshot snapshot = emptyTranslogSnapshot) { + translogRecoveryRunner.run(snapshot); + } catch (final Exception e) { + throw new TranslogException(shardId, "failed to recover from empty translog snapshot", e); + } + } + return emptyTranslogSnapshot.totalOperations(); + } + + @Override + public boolean isTranslogSyncNeeded() { + return false; + } + + @Override + public boolean ensureTranslogSynced(Stream locations) throws IOException { + return false; + } + + @Override + public void syncTranslog() throws IOException {} + + @Override + public TranslogStats getTranslogStats() { + return translogStats; + } + + @Override + public Translog.Location 
getTranslogLastWriteLocation() { + return new Translog.Location(0, 0, 0); + } + + @Override + public void trimUnreferencedTranslogFiles() throws TranslogException {} + + @Override + public boolean shouldRollTranslogGeneration() { + return false; + } + + @Override + public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws TranslogException {} + + @Override + public Translog getTranslog(boolean ensureOpen) { + return null; + } + + @Override + public void ensureCanFlush() {} + + @Override + public int restoreLocalHistoryFromTranslog(long processedCheckpoint, TranslogRecoveryRunner translogRecoveryRunner) throws IOException { + return 0; + } + + @Override + public void skipTranslogRecovery() {} +} diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogManager.java b/server/src/main/java/org/opensearch/index/translog/TranslogManager.java new file mode 100644 index 0000000000000..08207a8237c75 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/TranslogManager.java @@ -0,0 +1,108 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog; + +import java.io.IOException; +import java.util.stream.Stream; + +/** + * The interface that orchestrates Translog operations and manages the {@link Translog} and interfaces with the Engine + * + * @opensearch.internal + */ +public interface TranslogManager { + + /** + * Rolls the translog generation and cleans unneeded. + */ + void rollTranslogGeneration() throws TranslogException; + + /** + * Performs recovery from the transaction log up to {@code recoverUpToSeqNo} (inclusive). + * This operation will close the engine if the recovery fails. 
+ * + * @param translogRecoveryRunner the translog recovery runner + * @param recoverUpToSeqNo the upper bound, inclusive, of sequence number to be recovered + * @return ops recovered + */ + int recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long localCheckpoint, long recoverUpToSeqNo) throws IOException; + + /** + * Checks if the underlying storage sync is required. + */ + boolean isTranslogSyncNeeded(); + + /** + * Ensures that all locations in the given stream have been written to the underlying storage. + */ + boolean ensureTranslogSynced(Stream locations) throws IOException; + + /** + * Syncs translog to disk + * @throws IOException the exception while performing the sync operation + */ + void syncTranslog() throws IOException; + + /** + * Translog operation stats + * @return the translog stats + */ + TranslogStats getTranslogStats(); + + /** + * Returns the last location that the translog of this engine has written into. + */ + Translog.Location getTranslogLastWriteLocation(); + + /** + * checks and removes translog files that no longer need to be retained. See + * {@link org.opensearch.index.translog.TranslogDeletionPolicy} for details + */ + void trimUnreferencedTranslogFiles() throws TranslogException; + + /** + * Tests whether or not the translog generation should be rolled to a new generation. + * This test is based on the size of the current generation compared to the configured generation threshold size. + * + * @return {@code true} if the current generation should be rolled to a new generation + */ + boolean shouldRollTranslogGeneration(); + + /** + * Trims translog for terms below belowTerm and seq# above aboveSeqNo + * + * @see Translog#trimOperations(long, long) + */ + void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws TranslogException; + + /** + * This method replays translog to restore the Lucene index which might be reverted previously. 
+ * This ensures that all acknowledged writes are restored correctly when this engine is promoted. + * + * @return the number of translog operations have been recovered + */ + int restoreLocalHistoryFromTranslog(long processedCheckpoint, TranslogRecoveryRunner translogRecoveryRunner) throws IOException; + + /** + * Do not replay translog operations, but make the engine be ready. + */ + void skipTranslogRecovery(); + + /** + * Returns the instance of the translog with a precondition + * @param ensureOpen check if the engine is open + * @return the translog instance + */ + Translog getTranslog(boolean ensureOpen); + + /** + * Checks if the translog has a pending recovery + */ + void ensureCanFlush(); +} diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogRecoveryRunner.java b/server/src/main/java/org/opensearch/index/translog/TranslogRecoveryRunner.java new file mode 100644 index 0000000000000..b1d121db397ce --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/TranslogRecoveryRunner.java @@ -0,0 +1,28 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.index.translog; + +import java.io.IOException; + +/** + * The interface that defines how {@link Translog.Snapshot} will get replayed into the Engine + * + * @opensearch.internal + */ +@FunctionalInterface +public interface TranslogRecoveryRunner { + + /** + * Recovers a translog snapshot + * @param snapshot + * @return recoveredOps + * @throws IOException exception while recovering operations + */ + int run(Translog.Snapshot snapshot) throws IOException; +} diff --git a/server/src/main/java/org/opensearch/index/translog/WriteOnlyTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/WriteOnlyTranslogManager.java new file mode 100644 index 0000000000000..f4f5e752f69d1 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/WriteOnlyTranslogManager.java @@ -0,0 +1,69 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.index.translog; + +import org.apache.lucene.store.AlreadyClosedException; +import org.opensearch.common.util.concurrent.ReleasableLock; +import org.opensearch.index.engine.EngineConfig; +import org.opensearch.index.seqno.LocalCheckpointTracker; +import org.opensearch.index.shard.ShardId; +import org.opensearch.index.translog.listener.TranslogEventListener; + +import java.io.IOException; +import java.util.function.BiConsumer; +import java.util.function.Function; +import java.util.function.Supplier; + +/*** + * The implementation of {@link TranslogManager} that only orchestrates writes to the underlying {@link Translog} + * + * @opensearch.internal + */ +public class WriteOnlyTranslogManager extends InternalTranslogManager { + + public WriteOnlyTranslogManager( + EngineConfig engineConfig, + ShardId shardId, + ReleasableLock readLock, + Supplier localCheckpointTrackerSupplier, + String translogUUID, + TranslogEventListener translogEventListener, + Runnable ensureOpen, + BiConsumer failEngine, + Function failOnTragicEvent + ) throws IOException { + super( + engineConfig, + shardId, + readLock, + localCheckpointTrackerSupplier, + translogUUID, + translogEventListener, + ensureOpen, + failEngine, + failOnTragicEvent + ); + } + + @Override + public int restoreLocalHistoryFromTranslog(long processedCheckpoint, TranslogRecoveryRunner translogRecoveryRunner) throws IOException { + return 0; + } + + @Override + public int recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long localCheckpoint, long recoverUpToSeqNo) + throws IOException { + throw new UnsupportedOperationException("Read only replicas do not have an IndexWriter and cannot recover from a translog."); + } + + @Override + public void skipTranslogRecovery() { + // Do nothing. 
+ } +} diff --git a/server/src/main/java/org/opensearch/index/translog/listener/CompositeTranslogEventListener.java b/server/src/main/java/org/opensearch/index/translog/listener/CompositeTranslogEventListener.java new file mode 100644 index 0000000000000..761c46021d980 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/listener/CompositeTranslogEventListener.java @@ -0,0 +1,71 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog.listener; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +/** + * The listener that multiplexes other {@link TranslogEventListener} + * + * @opensearch.internal + */ +public final class CompositeTranslogEventListener implements TranslogEventListener { + + private final List listeners; + private final Logger logger = LogManager.getLogger(CompositeTranslogEventListener.class); + + public CompositeTranslogEventListener(Collection listeners) { + for (TranslogEventListener listener : listeners) { + if (listener == null) { + throw new IllegalArgumentException("listeners must be non-null"); + } + } + this.listeners = Collections.unmodifiableList(new ArrayList<>(listeners)); + } + + @Override + public void onTranslogSync() { + for (TranslogEventListener listener : listeners) { + try { + listener.onTranslogSync(); + } catch (Exception ex) { + logger.warn(() -> new ParameterizedMessage("failed to invoke onTranslogSync listener"), ex); + } + } + } + + @Override + public void onTranslogRecovery() { + for (TranslogEventListener listener : listeners) { + try { + listener.onTranslogRecovery(); + } catch (Exception ex) { 
+ logger.warn(() -> new ParameterizedMessage("failed to invoke onTranslogRecovery listener"), ex); + } + } + } + + @Override + public void onBeginTranslogRecovery() { + for (TranslogEventListener listener : listeners) { + try { + listener.onBeginTranslogRecovery(); + } catch (Exception ex) { + logger.warn(() -> new ParameterizedMessage("failed to invoke onBeginTranslogRecovery listener"), ex); + } + } + } +} diff --git a/server/src/main/java/org/opensearch/index/translog/listener/TranslogEventListener.java b/server/src/main/java/org/opensearch/index/translog/listener/TranslogEventListener.java new file mode 100644 index 0000000000000..be89895c23a51 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/listener/TranslogEventListener.java @@ -0,0 +1,35 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog.listener; + +/** + * The listener that gets fired on events related to {@link org.opensearch.index.translog.TranslogManager} + * + * @opensearch.internal + */ +public interface TranslogEventListener { + + TranslogEventListener NOOP_TRANSLOG_EVENT_LISTENER = new TranslogEventListener() { + }; + + /** + * Invoked after translog sync operations + */ + default void onTranslogSync() {} + + /** + * Invoked after recovering operations from translog + */ + default void onTranslogRecovery() {} + + /** + * Invoked before recovering operations from translog + */ + default void onBeginTranslogRecovery() {} +} diff --git a/server/src/main/java/org/opensearch/index/translog/listener/package-info.java b/server/src/main/java/org/opensearch/index/translog/listener/package-info.java new file mode 100644 index 0000000000000..bfb2415881c10 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/listener/package-info.java @@ -0,0 +1,11 @@ +/* + * 
SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +/** + * Provides mechanism to listen into translog operations + */ +package org.opensearch.index.translog.listener; diff --git a/server/src/test/java/org/opensearch/index/translog/InternalTranslogManagerTests.java b/server/src/test/java/org/opensearch/index/translog/InternalTranslogManagerTests.java new file mode 100644 index 0000000000000..8892d3795d2d8 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/translog/InternalTranslogManagerTests.java @@ -0,0 +1,276 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog; + +import org.opensearch.common.util.concurrent.ReleasableLock; +import org.opensearch.index.engine.Engine; +import org.opensearch.index.engine.EngineConfig; +import org.opensearch.index.mapper.ParsedDocument; +import org.opensearch.index.seqno.LocalCheckpointTracker; +import org.opensearch.index.seqno.SequenceNumbers; +import org.opensearch.index.store.Store; +import org.opensearch.index.translog.listener.TranslogEventListener; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import static org.hamcrest.Matchers.equalTo; +import static org.opensearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED; + +public class InternalTranslogManagerTests extends TranslogManagerTestCase { + + public void testRecoveryFromTranslog() throws IOException { + final AtomicLong globalCheckpoint = new 
AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + final AtomicBoolean beginTranslogRecoveryInvoked = new AtomicBoolean(false); + final AtomicBoolean onTranslogRecoveryInvoked = new AtomicBoolean(false); + TranslogManager translogManager = null; + + LocalCheckpointTracker tracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED); + try (Store store = createStore()) { + EngineConfig config = config(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null, null, globalCheckpoint::get); + translogManager = new InternalTranslogManager( + config, + shardId, + new ReleasableLock(new ReentrantReadWriteLock().readLock()), + () -> tracker, + translogUUID, + TranslogEventListener.NOOP_TRANSLOG_EVENT_LISTENER, + () -> {}, + null, + null + ); + final int docs = randomIntBetween(1, 100); + for (int i = 0; i < docs; i++) { + final String id = Integer.toString(i); + final ParsedDocument doc = testParsedDocument(id, null, testDocumentWithTextField(), SOURCE, null); + Engine.Index index = indexForDoc(doc); + Engine.IndexResult indexResult = new Engine.IndexResult(index.version(), index.primaryTerm(), i, true); + tracker.markSeqNoAsProcessed(i); + translogManager.getTranslog(false).add(new Translog.Index(index, indexResult)); + translogManager.rollTranslogGeneration(); + } + long maxSeqNo = tracker.getMaxSeqNo(); + assertEquals(maxSeqNo + 1, translogManager.getTranslogStats().getUncommittedOperations()); + assertEquals(maxSeqNo + 1, translogManager.getTranslogStats().estimatedNumberOfOperations()); + + translogManager.syncTranslog(); + translogManager.getTranslog(false).close(); + translogManager = new InternalTranslogManager( + config, + shardId, + new ReleasableLock(new ReentrantReadWriteLock().readLock()), + () -> new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED), + translogUUID, + new TranslogEventListener() { + @Override + public void onTranslogRecovery() { + onTranslogRecoveryInvoked.set(true); + } + + @Override + public void 
onBeginTranslogRecovery() { + beginTranslogRecoveryInvoked.set(true); + } + }, + () -> {}, + null, + null + ); + AtomicInteger opsRecovered = new AtomicInteger(); + int opsRecoveredFromTranslog = translogManager.recoverFromTranslog((snapshot) -> { + Translog.Operation operation; + while ((operation = snapshot.next()) != null) { + opsRecovered.incrementAndGet(); + } + return opsRecovered.get(); + }, NO_OPS_PERFORMED, Long.MAX_VALUE); + + assertEquals(maxSeqNo + 1, opsRecovered.get()); + assertEquals(maxSeqNo + 1, opsRecoveredFromTranslog); + + assertTrue(beginTranslogRecoveryInvoked.get()); + assertTrue(onTranslogRecoveryInvoked.get()); + + } finally { + translogManager.getTranslog(false).close(); + } + } + + public void testTranslogRollsGeneration() throws IOException { + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + TranslogManager translogManager = null; + LocalCheckpointTracker tracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED); + try (Store store = createStore()) { + EngineConfig config = config(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null, null, globalCheckpoint::get); + translogManager = new InternalTranslogManager( + config, + shardId, + new ReleasableLock(new ReentrantReadWriteLock().readLock()), + () -> tracker, + translogUUID, + TranslogEventListener.NOOP_TRANSLOG_EVENT_LISTENER, + () -> {}, + null, + null + ); + final int docs = randomIntBetween(1, 100); + for (int i = 0; i < docs; i++) { + final String id = Integer.toString(i); + final ParsedDocument doc = testParsedDocument(id, null, testDocumentWithTextField(), SOURCE, null); + Engine.Index index = indexForDoc(doc); + Engine.IndexResult indexResult = new Engine.IndexResult(index.version(), index.primaryTerm(), i, true); + tracker.markSeqNoAsProcessed(i); + translogManager.getTranslog(false).add(new Translog.Index(index, indexResult)); + translogManager.rollTranslogGeneration(); + } + long maxSeqNo = 
tracker.getMaxSeqNo(); + assertEquals(maxSeqNo + 1, translogManager.getTranslogStats().getUncommittedOperations()); + assertEquals(maxSeqNo + 1, translogManager.getTranslogStats().estimatedNumberOfOperations()); + + translogManager.syncTranslog(); + translogManager.getTranslog(false).close(); + translogManager = new InternalTranslogManager( + config, + shardId, + new ReleasableLock(new ReentrantReadWriteLock().readLock()), + () -> new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED), + translogUUID, + TranslogEventListener.NOOP_TRANSLOG_EVENT_LISTENER, + () -> {}, + null, + null + ); + AtomicInteger opsRecovered = new AtomicInteger(); + int opsRecoveredFromTranslog = translogManager.recoverFromTranslog((snapshot) -> { + Translog.Operation operation; + while ((operation = snapshot.next()) != null) { + opsRecovered.incrementAndGet(); + } + return opsRecovered.get(); + }, NO_OPS_PERFORMED, Long.MAX_VALUE); + + assertEquals(maxSeqNo + 1, opsRecovered.get()); + assertEquals(maxSeqNo + 1, opsRecoveredFromTranslog); + } finally { + translogManager.getTranslog(false).close(); + } + } + + public void testTrimOperationsFromTranslog() throws IOException { + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + TranslogManager translogManager = null; + LocalCheckpointTracker tracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED); + try (Store store = createStore()) { + EngineConfig config = config(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null, null, globalCheckpoint::get); + translogManager = new InternalTranslogManager( + config, + shardId, + new ReleasableLock(new ReentrantReadWriteLock().readLock()), + () -> tracker, + translogUUID, + TranslogEventListener.NOOP_TRANSLOG_EVENT_LISTENER, + () -> {}, + null, + null + ); + final int docs = randomIntBetween(1, 100); + for (int i = 0; i < docs; i++) { + final String id = Integer.toString(i); + final ParsedDocument doc = testParsedDocument(id, null, 
testDocumentWithTextField(), SOURCE, null); + Engine.Index index = indexForDoc(doc); + Engine.IndexResult indexResult = new Engine.IndexResult(index.version(), index.primaryTerm(), i, true); + tracker.markSeqNoAsProcessed(i); + translogManager.getTranslog(false).add(new Translog.Index(index, indexResult)); + } + long maxSeqNo = tracker.getMaxSeqNo(); + assertEquals(maxSeqNo + 1, translogManager.getTranslogStats().getUncommittedOperations()); + assertEquals(maxSeqNo + 1, translogManager.getTranslogStats().estimatedNumberOfOperations()); + + primaryTerm.set(randomLongBetween(primaryTerm.get(), Long.MAX_VALUE)); + translogManager.rollTranslogGeneration(); + translogManager.trimOperationsFromTranslog(primaryTerm.get(), NO_OPS_PERFORMED); // trim everything in translog + + translogManager.getTranslog(false).close(); + translogManager = new InternalTranslogManager( + config, + shardId, + new ReleasableLock(new ReentrantReadWriteLock().readLock()), + () -> new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED), + translogUUID, + TranslogEventListener.NOOP_TRANSLOG_EVENT_LISTENER, + () -> {}, + null, + null + ); + AtomicInteger opsRecovered = new AtomicInteger(); + int opsRecoveredFromTranslog = translogManager.recoverFromTranslog((snapshot) -> { + Translog.Operation operation; + while ((operation = snapshot.next()) != null) { + opsRecovered.incrementAndGet(); + } + return opsRecovered.get(); + }, NO_OPS_PERFORMED, Long.MAX_VALUE); + + assertEquals(0, opsRecovered.get()); + assertEquals(0, opsRecoveredFromTranslog); + } finally { + translogManager.getTranslog(false).close(); + } + } + + public void testTranslogSync() throws IOException { + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + AtomicBoolean syncListenerInvoked = new AtomicBoolean(); + TranslogManager translogManager = null; + final AtomicInteger maxSeqNo = new AtomicInteger(randomIntBetween(0, 128)); + final AtomicInteger localCheckpoint = new 
AtomicInteger(randomIntBetween(0, maxSeqNo.get())); + try (Store store = createStore()) { + ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), B_1, null); + EngineConfig config = config(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null, null, globalCheckpoint::get); + AtomicReference translogManagerAtomicReference = new AtomicReference<>(); + translogManager = new InternalTranslogManager( + config, + shardId, + new ReleasableLock(new ReentrantReadWriteLock().readLock()), + () -> new LocalCheckpointTracker(maxSeqNo.get(), localCheckpoint.get()), + translogUUID, + new TranslogEventListener() { + @Override + public void onTranslogSync() { + try { + translogManagerAtomicReference.get().getTranslog(false).trimUnreferencedReaders(); + syncListenerInvoked.set(true); + } catch (IOException ex) { + fail("Failed due to " + ex); + } + } + }, + () -> {}, + null, + null + ); + translogManagerAtomicReference.set(translogManager); + Engine.Index index = indexForDoc(doc); + Engine.IndexResult indexResult = new Engine.IndexResult(index.version(), index.primaryTerm(), 1, false); + translogManager.getTranslog(false).add(new Translog.Index(index, indexResult)); + + translogManager.syncTranslog(); + + assertThat(translogManager.getTranslog(true).currentFileGeneration(), equalTo(2L)); + assertThat(translogManager.getTranslog(true).getMinFileGeneration(), equalTo(2L)); + assertTrue(syncListenerInvoked.get()); + } finally { + translogManager.getTranslog(false).close(); + } + } +} diff --git a/server/src/test/java/org/opensearch/index/translog/TranslogManagerTestCase.java b/server/src/test/java/org/opensearch/index/translog/TranslogManagerTestCase.java new file mode 100644 index 0000000000000..d3c5918245007 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/translog/TranslogManagerTestCase.java @@ -0,0 +1,488 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file 
be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog; + +import org.apache.lucene.document.Field; +import org.apache.lucene.document.NumericDocValuesField; +import org.apache.lucene.document.StoredField; +import org.apache.lucene.document.TextField; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.MergePolicy; +import org.apache.lucene.index.Term; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.ReferenceManager; +import org.apache.lucene.search.Sort; +import org.apache.lucene.store.Directory; +import org.apache.lucene.util.BytesRef; +import org.junit.After; +import org.junit.Before; +import org.opensearch.Version; +import org.opensearch.action.support.replication.ReplicationResponse; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.routing.AllocationId; +import org.opensearch.common.Nullable; +import org.opensearch.common.bytes.BytesArray; +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.BigArrays; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.core.internal.io.IOUtils; +import org.opensearch.index.Index; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.codec.CodecService; +import org.opensearch.index.engine.Engine; +import org.opensearch.index.engine.EngineConfig; +import org.opensearch.index.engine.SafeCommitInfo; +import org.opensearch.index.mapper.ParseContext; +import org.opensearch.index.mapper.Mapping; +import org.opensearch.index.mapper.ParsedDocument; +import org.opensearch.index.mapper.SourceFieldMapper; +import org.opensearch.index.mapper.SeqNoFieldMapper; +import org.opensearch.index.mapper.VersionFieldMapper; +import org.opensearch.index.mapper.Uid; +import 
org.opensearch.index.mapper.IdFieldMapper; +import org.opensearch.index.seqno.ReplicationTracker; +import org.opensearch.index.seqno.RetentionLeases; +import org.opensearch.index.seqno.SequenceNumbers; +import org.opensearch.index.shard.ShardId; +import org.opensearch.index.store.Store; +import org.opensearch.indices.breaker.CircuitBreakerService; +import org.opensearch.indices.breaker.NoneCircuitBreakerService; +import org.opensearch.test.DummyShardLock; +import org.opensearch.test.IndexSettingsModule; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.file.Path; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.LongSupplier; +import java.util.function.Supplier; + +import static java.util.Collections.emptyList; +import static org.opensearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy; + +public abstract class TranslogManagerTestCase extends OpenSearchTestCase { + + protected final ShardId shardId = new ShardId(new Index("index", "_na_"), 0); + protected final AllocationId allocationId = AllocationId.newInitializing(); + protected static final IndexSettings INDEX_SETTINGS = IndexSettingsModule.newIndexSettings("index", Settings.EMPTY); + private AtomicLong globalCheckpoint; + protected ThreadPool threadPool; + protected final PrimaryTermSupplier primaryTerm = new PrimaryTermSupplier(1L); + + protected IndexSettings defaultSettings; + protected String codecName; + protected Path primaryTranslogDir; + protected String translogUUID; + + protected static final BytesArray SOURCE = bytesArray("{}"); + protected static final BytesReference B_1 = new BytesArray(new byte[] { 1 }); + + protected Translog createTranslog(LongSupplier primaryTermSupplier) throws IOException { + return 
createTranslog(primaryTranslogDir, primaryTermSupplier); + } + + protected Translog createTranslog(Path translogPath, LongSupplier primaryTermSupplier) throws IOException { + TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE); + String translogUUID = Translog.createEmptyTranslog( + translogPath, + SequenceNumbers.NO_OPS_PERFORMED, + shardId, + primaryTermSupplier.getAsLong() + ); + return new Translog( + translogConfig, + translogUUID, + createTranslogDeletionPolicy(INDEX_SETTINGS), + () -> SequenceNumbers.NO_OPS_PERFORMED, + primaryTermSupplier, + seqNo -> {} + ); + } + + protected Store createStore() throws IOException { + return createStore(newDirectory()); + } + + protected Store createStore(final Directory directory) throws IOException { + return createStore(INDEX_SETTINGS, directory); + } + + protected Store createStore(final IndexSettings indexSettings, final Directory directory) throws IOException { + return new Store(shardId, indexSettings, directory, new DummyShardLock(shardId)); + } + + private String create(Path path) throws IOException { + globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + return Translog.createEmptyTranslog(path, SequenceNumbers.NO_OPS_PERFORMED, shardId, primaryTerm.get()); + } + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + primaryTerm.set(randomIntBetween(1, 100)); + defaultSettings = IndexSettingsModule.newIndexSettings("test", indexSettings()); + threadPool = new TestThreadPool(getClass().getName()); + primaryTranslogDir = createTempDir("translog-primary"); + translogUUID = create(primaryTranslogDir); + } + + @Override + @After + public void tearDown() throws Exception { + super.tearDown(); + IOUtils.close(() -> terminate(threadPool)); + } + + protected Settings indexSettings() { + // TODO randomize more settings + return Settings.builder() + .put(IndexSettings.INDEX_GC_DELETES_SETTING.getKey(), "1h") 
// make sure this doesn't kick in on us + .put(EngineConfig.INDEX_CODEC_SETTING.getKey(), codecName) + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put( + IndexSettings.MAX_REFRESH_LISTENERS_PER_SHARD.getKey(), + between(10, 10 * IndexSettings.MAX_REFRESH_LISTENERS_PER_SHARD.get(Settings.EMPTY)) + ) + .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), between(0, 1000)) + .build(); + } + + public static final class PrimaryTermSupplier implements LongSupplier { + private final AtomicLong term; + + PrimaryTermSupplier(long initialTerm) { + this.term = new AtomicLong(initialTerm); + } + + public long get() { + return term.get(); + } + + public void set(long newTerm) { + this.term.set(newTerm); + } + + @Override + public long getAsLong() { + return get(); + } + } + + protected static ParsedDocument testParsedDocument( + String id, + String routing, + ParseContext.Document document, + BytesReference source, + Mapping mappingUpdate + ) { + return testParsedDocument(id, routing, document, source, mappingUpdate, false); + } + + protected static ParsedDocument testParsedDocument( + String id, + String routing, + ParseContext.Document document, + BytesReference source, + Mapping mappingUpdate, + boolean recoverySource + ) { + Field uidField = new Field("_id", Uid.encodeId(id), IdFieldMapper.Defaults.FIELD_TYPE); + Field versionField = new NumericDocValuesField("_version", 0); + SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID(); + document.add(uidField); + document.add(versionField); + document.add(seqID.seqNo); + document.add(seqID.seqNoDocValue); + document.add(seqID.primaryTerm); + BytesRef ref = source.toBytesRef(); + if (recoverySource) { + document.add(new StoredField(SourceFieldMapper.RECOVERY_SOURCE_NAME, ref.bytes, ref.offset, ref.length)); + document.add(new NumericDocValuesField(SourceFieldMapper.RECOVERY_SOURCE_NAME, 1)); + } else { + document.add(new 
StoredField(SourceFieldMapper.NAME, ref.bytes, ref.offset, ref.length)); + } + return new ParsedDocument(versionField, seqID, id, routing, List.of(document), source, XContentType.JSON, mappingUpdate); + } + + protected static ParseContext.Document testDocumentWithTextField() { + return testDocumentWithTextField("test"); + } + + protected static ParseContext.Document testDocumentWithTextField(String value) { + ParseContext.Document document = testDocument(); + document.add(new TextField("value", value, Field.Store.YES)); + return document; + } + + protected static ParseContext.Document testDocument() { + return new ParseContext.Document(); + } + + protected Engine.Index indexForDoc(ParsedDocument doc) { + return new Engine.Index(newUid(doc), primaryTerm.get(), doc); + } + + public static Term newUid(String id) { + return new Term("_id", Uid.encodeId(id)); + } + + public static Term newUid(ParsedDocument doc) { + return newUid(doc.id()); + } + + protected static BytesArray bytesArray(String string) { + return new BytesArray(string.getBytes(Charset.defaultCharset())); + } + + public EngineConfig config( + IndexSettings indexSettings, + Store store, + Path translogPath, + MergePolicy mergePolicy, + ReferenceManager.RefreshListener refreshListener + ) { + return config(indexSettings, store, translogPath, mergePolicy, refreshListener, null, () -> SequenceNumbers.NO_OPS_PERFORMED); + } + + public EngineConfig config( + IndexSettings indexSettings, + Store store, + Path translogPath, + MergePolicy mergePolicy, + ReferenceManager.RefreshListener refreshListener, + Sort indexSort, + LongSupplier globalCheckpointSupplier + ) { + return config( + indexSettings, + store, + translogPath, + mergePolicy, + refreshListener, + indexSort, + globalCheckpointSupplier, + globalCheckpointSupplier == null ? 
null : () -> RetentionLeases.EMPTY + ); + } + + public EngineConfig config( + final IndexSettings indexSettings, + final Store store, + final Path translogPath, + final MergePolicy mergePolicy, + final ReferenceManager.RefreshListener refreshListener, + final Sort indexSort, + final LongSupplier globalCheckpointSupplier, + final Supplier retentionLeasesSupplier + ) { + return config( + indexSettings, + store, + translogPath, + mergePolicy, + refreshListener, + null, + indexSort, + globalCheckpointSupplier, + retentionLeasesSupplier, + new NoneCircuitBreakerService() + ); + } + + public EngineConfig config( + IndexSettings indexSettings, + Store store, + Path translogPath, + MergePolicy mergePolicy, + ReferenceManager.RefreshListener externalRefreshListener, + ReferenceManager.RefreshListener internalRefreshListener, + Sort indexSort, + @Nullable LongSupplier maybeGlobalCheckpointSupplier, + CircuitBreakerService breakerService + ) { + return config( + indexSettings, + store, + translogPath, + mergePolicy, + externalRefreshListener, + internalRefreshListener, + indexSort, + maybeGlobalCheckpointSupplier, + maybeGlobalCheckpointSupplier == null ? 
null : () -> RetentionLeases.EMPTY, + breakerService + ); + } + + public EngineConfig config( + final IndexSettings indexSettings, + final Store store, + final Path translogPath, + final MergePolicy mergePolicy, + final ReferenceManager.RefreshListener externalRefreshListener, + final ReferenceManager.RefreshListener internalRefreshListener, + final Sort indexSort, + final @Nullable LongSupplier maybeGlobalCheckpointSupplier, + final @Nullable Supplier maybeRetentionLeasesSupplier, + final CircuitBreakerService breakerService + ) { + final IndexWriterConfig iwc = newIndexWriterConfig(); + final TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE); + final Engine.EventListener eventListener = new Engine.EventListener() { + }; // we don't need to notify anybody in this test + final List extRefreshListenerList = externalRefreshListener == null + ? emptyList() + : Collections.singletonList(externalRefreshListener); + final List intRefreshListenerList = internalRefreshListener == null + ? 
emptyList() + : Collections.singletonList(internalRefreshListener); + final LongSupplier globalCheckpointSupplier; + final Supplier retentionLeasesSupplier; + if (maybeGlobalCheckpointSupplier == null) { + assert maybeRetentionLeasesSupplier == null; + final ReplicationTracker replicationTracker = new ReplicationTracker( + shardId, + allocationId.getId(), + indexSettings, + randomNonNegativeLong(), + SequenceNumbers.NO_OPS_PERFORMED, + update -> {}, + () -> 0L, + (leases, listener) -> listener.onResponse(new ReplicationResponse()), + () -> SafeCommitInfo.EMPTY + ); + globalCheckpointSupplier = replicationTracker; + retentionLeasesSupplier = replicationTracker::getRetentionLeases; + } else { + assert maybeRetentionLeasesSupplier != null; + globalCheckpointSupplier = maybeGlobalCheckpointSupplier; + retentionLeasesSupplier = maybeRetentionLeasesSupplier; + } + return new EngineConfig( + shardId, + threadPool, + indexSettings, + null, + store, + mergePolicy, + iwc.getAnalyzer(), + iwc.getSimilarity(), + new CodecService(null, logger), + eventListener, + IndexSearcher.getDefaultQueryCache(), + IndexSearcher.getDefaultQueryCachingPolicy(), + translogConfig, + TimeValue.timeValueMinutes(5), + extRefreshListenerList, + intRefreshListenerList, + indexSort, + breakerService, + globalCheckpointSupplier, + retentionLeasesSupplier, + primaryTerm, + tombstoneDocSupplier() + ); + } + + protected EngineConfig config( + EngineConfig config, + Store store, + Path translogPath, + EngineConfig.TombstoneDocSupplier tombstoneDocSupplier + ) { + IndexSettings indexSettings = IndexSettingsModule.newIndexSettings( + "test", + Settings.builder() + .put(config.getIndexSettings().getSettings()) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .build() + ); + TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE); + return new EngineConfig( + config.getShardId(), + config.getThreadPool(), + indexSettings, + 
config.getWarmer(), + store, + config.getMergePolicy(), + config.getAnalyzer(), + config.getSimilarity(), + new CodecService(null, logger), + config.getEventListener(), + config.getQueryCache(), + config.getQueryCachingPolicy(), + translogConfig, + config.getFlushMergesAfter(), + config.getExternalRefreshListener(), + config.getInternalRefreshListener(), + config.getIndexSort(), + config.getCircuitBreakerService(), + config.getGlobalCheckpointSupplier(), + config.retentionLeasesSupplier(), + config.getPrimaryTermSupplier(), + tombstoneDocSupplier + ); + } + + /** + * Creates a tombstone document that only includes uid, seq#, term and version fields. + */ + public static EngineConfig.TombstoneDocSupplier tombstoneDocSupplier() { + return new EngineConfig.TombstoneDocSupplier() { + @Override + public ParsedDocument newDeleteTombstoneDoc(String id) { + final ParseContext.Document doc = new ParseContext.Document(); + Field uidField = new Field(IdFieldMapper.NAME, Uid.encodeId(id), IdFieldMapper.Defaults.FIELD_TYPE); + doc.add(uidField); + Field versionField = new NumericDocValuesField(VersionFieldMapper.NAME, 0); + doc.add(versionField); + SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID(); + doc.add(seqID.seqNo); + doc.add(seqID.seqNoDocValue); + doc.add(seqID.primaryTerm); + seqID.tombstoneField.setLongValue(1); + doc.add(seqID.tombstoneField); + return new ParsedDocument( + versionField, + seqID, + id, + null, + Collections.singletonList(doc), + new BytesArray("{}"), + XContentType.JSON, + null + ); + } + + @Override + public ParsedDocument newNoopTombstoneDoc(String reason) { + final ParseContext.Document doc = new ParseContext.Document(); + SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID(); + doc.add(seqID.seqNo); + doc.add(seqID.seqNoDocValue); + doc.add(seqID.primaryTerm); + seqID.tombstoneField.setLongValue(1); + doc.add(seqID.tombstoneField); + Field versionField = new 
NumericDocValuesField(VersionFieldMapper.NAME, 0); + doc.add(versionField); + BytesRef byteRef = new BytesRef(reason); + doc.add(new StoredField(SourceFieldMapper.NAME, byteRef.bytes, byteRef.offset, byteRef.length)); + return new ParsedDocument(versionField, seqID, null, null, Collections.singletonList(doc), null, XContentType.JSON, null); + } + }; + } +} From 3d66a2d4429791fb9504f3d9e7cdae3f8d430af2 Mon Sep 17 00:00:00 2001 From: Bukhtawar Khan Date: Tue, 21 Jun 2022 15:24:11 +0530 Subject: [PATCH 2/5] Spotless checks Signed-off-by: Bukhtawar Khan --- .../index/translog/TranslogManager.java | 2 +- .../translog/TranslogRecoveryRunner.java | 2 +- .../listener/TranslogListenerTests.java | 66 +++++++++++++++++++ 3 files changed, 68 insertions(+), 2 deletions(-) create mode 100644 server/src/test/java/org/opensearch/index/translog/listener/TranslogListenerTests.java diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogManager.java b/server/src/main/java/org/opensearch/index/translog/TranslogManager.java index 08207a8237c75..988a88c5d2ae5 100644 --- a/server/src/main/java/org/opensearch/index/translog/TranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/TranslogManager.java @@ -13,7 +13,7 @@ /** * The interface that orchestrates Translog operations and manages the {@link Translog} and interfaces with the Engine - * + * * @opensearch.internal */ public interface TranslogManager { diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogRecoveryRunner.java b/server/src/main/java/org/opensearch/index/translog/TranslogRecoveryRunner.java index b1d121db397ce..91c9a95b07d58 100644 --- a/server/src/main/java/org/opensearch/index/translog/TranslogRecoveryRunner.java +++ b/server/src/main/java/org/opensearch/index/translog/TranslogRecoveryRunner.java @@ -20,7 +20,7 @@ public interface TranslogRecoveryRunner { /** * Recovers a translog snapshot - * @param snapshot + * @param snapshot the snapshot of translog operations * 
@return recoveredOps * @throws IOException exception while recovering operations */ diff --git a/server/src/test/java/org/opensearch/index/translog/listener/TranslogListenerTests.java b/server/src/test/java/org/opensearch/index/translog/listener/TranslogListenerTests.java new file mode 100644 index 0000000000000..4d37e5ad4989e --- /dev/null +++ b/server/src/test/java/org/opensearch/index/translog/listener/TranslogListenerTests.java @@ -0,0 +1,66 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog.listener; + +import org.opensearch.test.OpenSearchTestCase; + +import java.lang.reflect.Proxy; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +public class TranslogListenerTests extends OpenSearchTestCase { + + public void testCompositeTranslogEventListener() { + AtomicInteger onTranslogSyncInvoked = new AtomicInteger(); + AtomicInteger onTranslogRecoveryInvoked = new AtomicInteger(); + AtomicInteger onBeginTranslogRecoveryInvoked = new AtomicInteger(); + + TranslogEventListener listener = new TranslogEventListener() { + @Override + public void onTranslogSync() { + onTranslogSyncInvoked.incrementAndGet(); + } + + @Override + public void onTranslogRecovery() { + onTranslogRecoveryInvoked.incrementAndGet(); + } + + @Override + public void onBeginTranslogRecovery() { + onBeginTranslogRecoveryInvoked.incrementAndGet(); + } + }; + TranslogEventListener throwingListener = (TranslogEventListener) Proxy.newProxyInstance( + TranslogEventListener.class.getClassLoader(), + new Class[] { TranslogEventListener.class }, + (a, b, c) -> { throw new RuntimeException(); } + ); + + final List translogEventListeners = new ArrayList<>(Arrays.asList(listener, listener)); + if 
(randomBoolean()) { + translogEventListeners.add(throwingListener); + if (randomBoolean()) { + translogEventListeners.add(throwingListener); + } + } + Collections.shuffle(translogEventListeners, random()); + TranslogEventListener compositeListener = new CompositeTranslogEventListener(translogEventListeners); + compositeListener.onTranslogRecovery(); + compositeListener.onTranslogSync(); + compositeListener.onBeginTranslogRecovery(); + + assertEquals(2, onBeginTranslogRecoveryInvoked.get()); + assertEquals(2, onTranslogRecoveryInvoked.get()); + assertEquals(2, onTranslogSyncInvoked.get()); + } +} From 5888ef49893eefa977f6f74233c9ca3d9de11958 Mon Sep 17 00:00:00 2001 From: Bukhtawar Khan Date: Wed, 22 Jun 2022 12:29:46 +0530 Subject: [PATCH 3/5] Addressing review comments Signed-off-by: Bukhtawar Khan --- .../translog/InternalTranslogManager.java | 77 ++--- .../translog/WriteOnlyTranslogManager.java | 23 +- .../CompositeTranslogEventListener.java | 31 +- .../listener/TranslogEventListener.java | 19 +- .../InternalTranslogManagerTests.java | 83 +++--- .../translog/TranslogManagerTestCase.java | 271 ------------------ .../listener/TranslogListenerTests.java | 25 +- 7 files changed, 146 insertions(+), 383 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java index 37e04b51564a6..7c4ddf389b803 100644 --- a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java @@ -13,15 +13,12 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.store.AlreadyClosedException; import org.opensearch.common.util.concurrent.ReleasableLock; -import org.opensearch.index.engine.EngineConfig; import org.opensearch.index.seqno.LocalCheckpointTracker; import org.opensearch.index.shard.ShardId; import 
org.opensearch.index.translog.listener.TranslogEventListener; import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.BiConsumer; -import java.util.function.Function; import java.util.function.LongConsumer; import java.util.function.LongSupplier; import java.util.function.Supplier; @@ -36,49 +33,30 @@ public class InternalTranslogManager implements TranslogManager { private final ReleasableLock readLock; - private final Runnable ensureOpen; + private final Runnable ensureEngineOpen; private final ShardId shardId; private final Translog translog; - private final BiConsumer failEngine; - private final Function failOnTragicEvent; private final AtomicBoolean pendingTranslogRecovery = new AtomicBoolean(false); private final TranslogEventListener translogEventListener; private static final Logger logger = LogManager.getLogger(InternalTranslogManager.class); public InternalTranslogManager( - EngineConfig engineConfig, + TranslogConfig translogConfig, + LongSupplier primaryTermSupplier, + LongSupplier globalCheckpointSupplier, + TranslogDeletionPolicy translogDeletionPolicy, ShardId shardId, ReleasableLock readLock, Supplier localCheckpointTrackerSupplier, String translogUUID, TranslogEventListener translogEventListener, - Runnable ensureOpen, - BiConsumer failEngine, - Function failOnTragicEvent + Runnable ensureEngineOpen ) throws IOException { this.shardId = shardId; this.readLock = readLock; - this.ensureOpen = ensureOpen; - this.failEngine = failEngine; - this.failOnTragicEvent = failOnTragicEvent; + this.ensureEngineOpen = ensureEngineOpen; this.translogEventListener = translogEventListener; - final TranslogDeletionPolicy translogDeletionPolicy; - TranslogDeletionPolicy customTranslogDeletionPolicy = null; - Translog translog; - if (engineConfig.getCustomTranslogDeletionPolicyFactory() != null) { - customTranslogDeletionPolicy = engineConfig.getCustomTranslogDeletionPolicyFactory() - 
.create(engineConfig.getIndexSettings(), engineConfig.retentionLeasesSupplier()); - } - if (customTranslogDeletionPolicy != null) { - translogDeletionPolicy = customTranslogDeletionPolicy; - } else { - translogDeletionPolicy = new DefaultTranslogDeletionPolicy( - engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(), - engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis(), - engineConfig.getIndexSettings().getTranslogRetentionTotalFiles() - ); - } - translog = openTranslog(engineConfig, translogDeletionPolicy, engineConfig.getGlobalCheckpointSupplier(), seqNo -> { + Translog translog = openTranslog(translogConfig, primaryTermSupplier, translogDeletionPolicy, globalCheckpointSupplier, seqNo -> { final LocalCheckpointTracker tracker = localCheckpointTrackerSupplier.get(); assert tracker != null || getTranslog(true).isOpen() == false; if (tracker != null) { @@ -98,15 +76,15 @@ public InternalTranslogManager( @Override public void rollTranslogGeneration() throws TranslogException { try (ReleasableLock ignored = readLock.acquire()) { - ensureOpen.run(); + ensureEngineOpen.run(); translog.rollGeneration(); translog.trimUnreferencedReaders(); } catch (AlreadyClosedException e) { - failOnTragicEvent.apply(e); + translogEventListener.onTragicFailure(e); throw e; } catch (Exception e) { try { - failEngine.accept("translog trimming failed", e); + translogEventListener.onFailure("translog trimming failed", e); } catch (Exception inner) { e.addSuppressed(inner); } @@ -127,7 +105,7 @@ public int recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, lo int opsRecovered = 0; translogEventListener.onBeginTranslogRecovery(); try (ReleasableLock ignored = readLock.acquire()) { - ensureOpen.run(); + ensureEngineOpen.run(); if (pendingTranslogRecovery.get() == false) { throw new IllegalStateException("Engine has already been recovered"); } @@ -136,7 +114,7 @@ public int recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, lo } 
catch (Exception e) { try { pendingTranslogRecovery.set(true); // just play safe and never allow commits on this see #ensureCanFlush - failEngine.accept("failed to recover from translog", e); + translogEventListener.onFailure("failed to recover from translog", e); } catch (Exception inner) { e.addSuppressed(inner); } @@ -168,7 +146,7 @@ private int recoverFromTranslogInternal(TranslogRecoveryRunner translogRecoveryR translog.currentFileGeneration() ) ); - translogEventListener.onTranslogRecovery(); + translogEventListener.onAfterTranslogRecovery(); return opsRecovered; } @@ -187,7 +165,7 @@ public boolean isTranslogSyncNeeded() { public boolean ensureTranslogSynced(Stream locations) throws IOException { final boolean synced = translog.ensureSynced(locations); if (synced) { - translogEventListener.onTranslogSync(); + translogEventListener.onAfterTranslogSync(); } return synced; } @@ -199,7 +177,7 @@ public boolean ensureTranslogSynced(Stream locations) throws @Override public void syncTranslog() throws IOException { translog.sync(); - translogEventListener.onTranslogSync(); + translogEventListener.onAfterTranslogSync(); } @Override @@ -222,14 +200,14 @@ public Translog.Location getTranslogLastWriteLocation() { @Override public void trimUnreferencedTranslogFiles() throws TranslogException { try (ReleasableLock ignored = readLock.acquire()) { - ensureOpen.run(); + ensureEngineOpen.run(); translog.trimUnreferencedReaders(); } catch (AlreadyClosedException e) { - failOnTragicEvent.apply(e); + translogEventListener.onTragicFailure(e); throw e; } catch (Exception e) { try { - failEngine.accept("translog trimming failed", e); + translogEventListener.onFailure("translog trimming failed", e); } catch (Exception inner) { e.addSuppressed(inner); } @@ -255,14 +233,14 @@ public boolean shouldRollTranslogGeneration() { @Override public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws TranslogException { try (ReleasableLock ignored = readLock.acquire()) { - 
ensureOpen.run(); + ensureEngineOpen.run(); translog.trimOperations(belowTerm, aboveSeqNo); } catch (AlreadyClosedException e) { - failOnTragicEvent.apply(e); + translogEventListener.onTragicFailure(e); throw e; } catch (Exception e) { try { - failEngine.accept("translog operations trimming failed", e); + translogEventListener.onFailure("translog operations trimming failed", e); } catch (Exception inner) { e.addSuppressed(inner); } @@ -279,7 +257,7 @@ public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws T @Override public int restoreLocalHistoryFromTranslog(long processedCheckpoint, TranslogRecoveryRunner translogRecoveryRunner) throws IOException { try (ReleasableLock ignored = readLock.acquire()) { - ensureOpen.run(); + ensureEngineOpen.run(); try (Translog.Snapshot snapshot = getTranslog(true).newSnapshot(processedCheckpoint + 1, Long.MAX_VALUE)) { return translogRecoveryRunner.run(snapshot); } @@ -310,21 +288,20 @@ public void skipTranslogRecovery() { } private Translog openTranslog( - EngineConfig engineConfig, + TranslogConfig translogConfig, + LongSupplier primaryTermSupplier, TranslogDeletionPolicy translogDeletionPolicy, LongSupplier globalCheckpointSupplier, LongConsumer persistedSequenceNumberConsumer, String translogUUID ) throws IOException { - final TranslogConfig translogConfig = engineConfig.getTranslogConfig(); - // We expect that this shard already exists, so it must already have an existing translog else something is badly wrong! 
return new Translog( translogConfig, translogUUID, translogDeletionPolicy, globalCheckpointSupplier, - engineConfig.getPrimaryTermSupplier(), + primaryTermSupplier, persistedSequenceNumberConsumer ); } @@ -337,7 +314,7 @@ private Translog openTranslog( @Override public Translog getTranslog(boolean ensureOpen) { if (ensureOpen) { - this.ensureOpen.run(); + this.ensureEngineOpen.run(); } return translog; } diff --git a/server/src/main/java/org/opensearch/index/translog/WriteOnlyTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/WriteOnlyTranslogManager.java index f4f5e752f69d1..366f70ac50ca2 100644 --- a/server/src/main/java/org/opensearch/index/translog/WriteOnlyTranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/WriteOnlyTranslogManager.java @@ -8,16 +8,13 @@ package org.opensearch.index.translog; -import org.apache.lucene.store.AlreadyClosedException; import org.opensearch.common.util.concurrent.ReleasableLock; -import org.opensearch.index.engine.EngineConfig; import org.opensearch.index.seqno.LocalCheckpointTracker; import org.opensearch.index.shard.ShardId; import org.opensearch.index.translog.listener.TranslogEventListener; import java.io.IOException; -import java.util.function.BiConsumer; -import java.util.function.Function; +import java.util.function.LongSupplier; import java.util.function.Supplier; /*** @@ -28,26 +25,28 @@ public class WriteOnlyTranslogManager extends InternalTranslogManager { public WriteOnlyTranslogManager( - EngineConfig engineConfig, + TranslogConfig translogConfig, + LongSupplier primaryTermSupplier, + LongSupplier globalCheckpointSupplier, + TranslogDeletionPolicy translogDeletionPolicy, ShardId shardId, ReleasableLock readLock, Supplier localCheckpointTrackerSupplier, String translogUUID, TranslogEventListener translogEventListener, - Runnable ensureOpen, - BiConsumer failEngine, - Function failOnTragicEvent + Runnable ensureEngineOpen ) throws IOException { super( - engineConfig, + 
translogConfig, + primaryTermSupplier, + globalCheckpointSupplier, + translogDeletionPolicy, shardId, readLock, localCheckpointTrackerSupplier, translogUUID, translogEventListener, - ensureOpen, - failEngine, - failOnTragicEvent + ensureEngineOpen ); } diff --git a/server/src/main/java/org/opensearch/index/translog/listener/CompositeTranslogEventListener.java b/server/src/main/java/org/opensearch/index/translog/listener/CompositeTranslogEventListener.java index 761c46021d980..4ca7069140811 100644 --- a/server/src/main/java/org/opensearch/index/translog/listener/CompositeTranslogEventListener.java +++ b/server/src/main/java/org/opensearch/index/translog/listener/CompositeTranslogEventListener.java @@ -11,6 +11,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.store.AlreadyClosedException; import java.util.ArrayList; import java.util.Collection; @@ -37,10 +38,10 @@ public CompositeTranslogEventListener(Collection listener } @Override - public void onTranslogSync() { + public void onAfterTranslogSync() { for (TranslogEventListener listener : listeners) { try { - listener.onTranslogSync(); + listener.onAfterTranslogSync(); } catch (Exception ex) { logger.warn(() -> new ParameterizedMessage("failed to invoke onTranslogSync listener"), ex); } @@ -48,10 +49,10 @@ public void onTranslogSync() { } @Override - public void onTranslogRecovery() { + public void onAfterTranslogRecovery() { for (TranslogEventListener listener : listeners) { try { - listener.onTranslogRecovery(); + listener.onAfterTranslogRecovery(); } catch (Exception ex) { logger.warn(() -> new ParameterizedMessage("failed to invoke onTranslogRecovery listener"), ex); } @@ -68,4 +69,26 @@ public void onBeginTranslogRecovery() { } } } + + @Override + public void onFailure(String reason, Exception e) { + for (TranslogEventListener listener : listeners) { + try { + 
listener.onFailure(reason, e); + } catch (Exception ex) { + logger.warn(() -> new ParameterizedMessage("failed to invoke onFailure listener"), ex); + } + } + } + + @Override + public void onTragicFailure(AlreadyClosedException e) { + for (TranslogEventListener listener : listeners) { + try { + listener.onTragicFailure(e); + } catch (Exception ex) { + logger.warn(() -> new ParameterizedMessage("failed to invoke onTragicFailure listener"), ex); + } + } + } } diff --git a/server/src/main/java/org/opensearch/index/translog/listener/TranslogEventListener.java b/server/src/main/java/org/opensearch/index/translog/listener/TranslogEventListener.java index be89895c23a51..1862b4b9a62b7 100644 --- a/server/src/main/java/org/opensearch/index/translog/listener/TranslogEventListener.java +++ b/server/src/main/java/org/opensearch/index/translog/listener/TranslogEventListener.java @@ -8,6 +8,8 @@ package org.opensearch.index.translog.listener; +import org.apache.lucene.store.AlreadyClosedException; + /** * The listener that gets fired on events related to {@link org.opensearch.index.translog.TranslogManager} * @@ -21,15 +23,28 @@ public interface TranslogEventListener { /** * Invoked after translog sync operations */ - default void onTranslogSync() {} + default void onAfterTranslogSync() {} /** * Invoked after recovering operations from translog */ - default void onTranslogRecovery() {} + default void onAfterTranslogRecovery() {} /** * Invoked before recovering operations from translog */ default void onBeginTranslogRecovery() {} + + /** + * Invoked when translog operations run into accessing an already closed resource + * @param ex the exception thrown when accessing a closed resource + */ + default void onTragicFailure(AlreadyClosedException ex) {} + + /** + * Invoked when translog operations run into any other failure + * @param reason the failure reason + * @param ex the failure exception + */ + default void onFailure(String reason, Exception ex) {} } diff --git 
a/server/src/test/java/org/opensearch/index/translog/InternalTranslogManagerTests.java b/server/src/test/java/org/opensearch/index/translog/InternalTranslogManagerTests.java index 8892d3795d2d8..4db792b4a3fc2 100644 --- a/server/src/test/java/org/opensearch/index/translog/InternalTranslogManagerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/InternalTranslogManagerTests.java @@ -8,13 +8,12 @@ package org.opensearch.index.translog; +import org.opensearch.common.util.BigArrays; import org.opensearch.common.util.concurrent.ReleasableLock; import org.opensearch.index.engine.Engine; -import org.opensearch.index.engine.EngineConfig; import org.opensearch.index.mapper.ParsedDocument; import org.opensearch.index.seqno.LocalCheckpointTracker; import org.opensearch.index.seqno.SequenceNumbers; -import org.opensearch.index.store.Store; import org.opensearch.index.translog.listener.TranslogEventListener; import java.io.IOException; @@ -26,6 +25,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.opensearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED; +import static org.opensearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy; public class InternalTranslogManagerTests extends TranslogManagerTestCase { @@ -36,18 +36,18 @@ public void testRecoveryFromTranslog() throws IOException { TranslogManager translogManager = null; LocalCheckpointTracker tracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED); - try (Store store = createStore()) { - EngineConfig config = config(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null, null, globalCheckpoint::get); + try { translogManager = new InternalTranslogManager( - config, + new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE), + primaryTerm, + globalCheckpoint::get, + createTranslogDeletionPolicy(INDEX_SETTINGS), shardId, new ReleasableLock(new ReentrantReadWriteLock().readLock()), () -> tracker, 
translogUUID, TranslogEventListener.NOOP_TRANSLOG_EVENT_LISTENER, - () -> {}, - null, - null + () -> {} ); final int docs = randomIntBetween(1, 100); for (int i = 0; i < docs; i++) { @@ -66,14 +66,17 @@ public void testRecoveryFromTranslog() throws IOException { translogManager.syncTranslog(); translogManager.getTranslog(false).close(); translogManager = new InternalTranslogManager( - config, + new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE), + primaryTerm, + globalCheckpoint::get, + createTranslogDeletionPolicy(INDEX_SETTINGS), shardId, new ReleasableLock(new ReentrantReadWriteLock().readLock()), () -> new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED), translogUUID, new TranslogEventListener() { @Override - public void onTranslogRecovery() { + public void onAfterTranslogRecovery() { onTranslogRecoveryInvoked.set(true); } @@ -82,9 +85,7 @@ public void onBeginTranslogRecovery() { beginTranslogRecoveryInvoked.set(true); } }, - () -> {}, - null, - null + () -> {} ); AtomicInteger opsRecovered = new AtomicInteger(); int opsRecoveredFromTranslog = translogManager.recoverFromTranslog((snapshot) -> { @@ -110,18 +111,18 @@ public void testTranslogRollsGeneration() throws IOException { final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); TranslogManager translogManager = null; LocalCheckpointTracker tracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED); - try (Store store = createStore()) { - EngineConfig config = config(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null, null, globalCheckpoint::get); + try { translogManager = new InternalTranslogManager( - config, + new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE), + primaryTerm, + globalCheckpoint::get, + createTranslogDeletionPolicy(INDEX_SETTINGS), shardId, new ReleasableLock(new ReentrantReadWriteLock().readLock()), () -> tracker, 
translogUUID, TranslogEventListener.NOOP_TRANSLOG_EVENT_LISTENER, - () -> {}, - null, - null + () -> {} ); final int docs = randomIntBetween(1, 100); for (int i = 0; i < docs; i++) { @@ -140,15 +141,16 @@ public void testTranslogRollsGeneration() throws IOException { translogManager.syncTranslog(); translogManager.getTranslog(false).close(); translogManager = new InternalTranslogManager( - config, + new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE), + primaryTerm, + globalCheckpoint::get, + createTranslogDeletionPolicy(INDEX_SETTINGS), shardId, new ReleasableLock(new ReentrantReadWriteLock().readLock()), () -> new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED), translogUUID, TranslogEventListener.NOOP_TRANSLOG_EVENT_LISTENER, - () -> {}, - null, - null + () -> {} ); AtomicInteger opsRecovered = new AtomicInteger(); int opsRecoveredFromTranslog = translogManager.recoverFromTranslog((snapshot) -> { @@ -170,18 +172,18 @@ public void testTrimOperationsFromTranslog() throws IOException { final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); TranslogManager translogManager = null; LocalCheckpointTracker tracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED); - try (Store store = createStore()) { - EngineConfig config = config(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null, null, globalCheckpoint::get); + try { translogManager = new InternalTranslogManager( - config, + new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE), + primaryTerm, + globalCheckpoint::get, + createTranslogDeletionPolicy(INDEX_SETTINGS), shardId, new ReleasableLock(new ReentrantReadWriteLock().readLock()), () -> tracker, translogUUID, TranslogEventListener.NOOP_TRANSLOG_EVENT_LISTENER, - () -> {}, - null, - null + () -> {} ); final int docs = randomIntBetween(1, 100); for (int i = 0; i < docs; i++) { @@ -202,15 +204,16 @@ public 
void testTrimOperationsFromTranslog() throws IOException { translogManager.getTranslog(false).close(); translogManager = new InternalTranslogManager( - config, + new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE), + primaryTerm, + globalCheckpoint::get, + createTranslogDeletionPolicy(INDEX_SETTINGS), shardId, new ReleasableLock(new ReentrantReadWriteLock().readLock()), () -> new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED), translogUUID, TranslogEventListener.NOOP_TRANSLOG_EVENT_LISTENER, - () -> {}, - null, - null + () -> {} ); AtomicInteger opsRecovered = new AtomicInteger(); int opsRecoveredFromTranslog = translogManager.recoverFromTranslog((snapshot) -> { @@ -234,19 +237,21 @@ public void testTranslogSync() throws IOException { TranslogManager translogManager = null; final AtomicInteger maxSeqNo = new AtomicInteger(randomIntBetween(0, 128)); final AtomicInteger localCheckpoint = new AtomicInteger(randomIntBetween(0, maxSeqNo.get())); - try (Store store = createStore()) { + try { ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), B_1, null); - EngineConfig config = config(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null, null, globalCheckpoint::get); AtomicReference translogManagerAtomicReference = new AtomicReference<>(); translogManager = new InternalTranslogManager( - config, + new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE), + primaryTerm, + globalCheckpoint::get, + createTranslogDeletionPolicy(INDEX_SETTINGS), shardId, new ReleasableLock(new ReentrantReadWriteLock().readLock()), () -> new LocalCheckpointTracker(maxSeqNo.get(), localCheckpoint.get()), translogUUID, new TranslogEventListener() { @Override - public void onTranslogSync() { + public void onAfterTranslogSync() { try { translogManagerAtomicReference.get().getTranslog(false).trimUnreferencedReaders(); syncListenerInvoked.set(true); @@ -255,9 
+260,7 @@ public void onTranslogSync() { } } }, - () -> {}, - null, - null + () -> {} ); translogManagerAtomicReference.set(translogManager); Engine.Index index = indexForDoc(doc); diff --git a/server/src/test/java/org/opensearch/index/translog/TranslogManagerTestCase.java b/server/src/test/java/org/opensearch/index/translog/TranslogManagerTestCase.java index d3c5918245007..25867cdb666ad 100644 --- a/server/src/test/java/org/opensearch/index/translog/TranslogManagerTestCase.java +++ b/server/src/test/java/org/opensearch/index/translog/TranslogManagerTestCase.java @@ -12,50 +12,32 @@ import org.apache.lucene.document.NumericDocValuesField; import org.apache.lucene.document.StoredField; import org.apache.lucene.document.TextField; -import org.apache.lucene.index.IndexWriterConfig; -import org.apache.lucene.index.MergePolicy; import org.apache.lucene.index.Term; -import org.apache.lucene.search.IndexSearcher; -import org.apache.lucene.search.ReferenceManager; -import org.apache.lucene.search.Sort; -import org.apache.lucene.store.Directory; import org.apache.lucene.util.BytesRef; import org.junit.After; import org.junit.Before; import org.opensearch.Version; -import org.opensearch.action.support.replication.ReplicationResponse; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.routing.AllocationId; -import org.opensearch.common.Nullable; import org.opensearch.common.bytes.BytesArray; import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.settings.Settings; -import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.BigArrays; import org.opensearch.common.xcontent.XContentType; import org.opensearch.core.internal.io.IOUtils; import org.opensearch.index.Index; import org.opensearch.index.IndexSettings; -import org.opensearch.index.codec.CodecService; import org.opensearch.index.engine.Engine; import org.opensearch.index.engine.EngineConfig; -import org.opensearch.index.engine.SafeCommitInfo; 
import org.opensearch.index.mapper.ParseContext; import org.opensearch.index.mapper.Mapping; import org.opensearch.index.mapper.ParsedDocument; import org.opensearch.index.mapper.SourceFieldMapper; import org.opensearch.index.mapper.SeqNoFieldMapper; -import org.opensearch.index.mapper.VersionFieldMapper; import org.opensearch.index.mapper.Uid; import org.opensearch.index.mapper.IdFieldMapper; -import org.opensearch.index.seqno.ReplicationTracker; -import org.opensearch.index.seqno.RetentionLeases; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.ShardId; -import org.opensearch.index.store.Store; -import org.opensearch.indices.breaker.CircuitBreakerService; -import org.opensearch.indices.breaker.NoneCircuitBreakerService; -import org.opensearch.test.DummyShardLock; import org.opensearch.test.IndexSettingsModule; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; @@ -64,13 +46,10 @@ import java.io.IOException; import java.nio.charset.Charset; import java.nio.file.Path; -import java.util.Collections; import java.util.List; import java.util.concurrent.atomic.AtomicLong; import java.util.function.LongSupplier; -import java.util.function.Supplier; -import static java.util.Collections.emptyList; import static org.opensearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy; public abstract class TranslogManagerTestCase extends OpenSearchTestCase { @@ -112,18 +91,6 @@ protected Translog createTranslog(Path translogPath, LongSupplier primaryTermSup ); } - protected Store createStore() throws IOException { - return createStore(newDirectory()); - } - - protected Store createStore(final Directory directory) throws IOException { - return createStore(INDEX_SETTINGS, directory); - } - - protected Store createStore(final IndexSettings indexSettings, final Directory directory) throws IOException { - return new Store(shardId, indexSettings, directory, new DummyShardLock(shardId)); - 
} - private String create(Path path) throws IOException { globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); return Translog.createEmptyTranslog(path, SequenceNumbers.NO_OPS_PERFORMED, shardId, primaryTerm.get()); @@ -247,242 +214,4 @@ public static Term newUid(ParsedDocument doc) { protected static BytesArray bytesArray(String string) { return new BytesArray(string.getBytes(Charset.defaultCharset())); } - - public EngineConfig config( - IndexSettings indexSettings, - Store store, - Path translogPath, - MergePolicy mergePolicy, - ReferenceManager.RefreshListener refreshListener - ) { - return config(indexSettings, store, translogPath, mergePolicy, refreshListener, null, () -> SequenceNumbers.NO_OPS_PERFORMED); - } - - public EngineConfig config( - IndexSettings indexSettings, - Store store, - Path translogPath, - MergePolicy mergePolicy, - ReferenceManager.RefreshListener refreshListener, - Sort indexSort, - LongSupplier globalCheckpointSupplier - ) { - return config( - indexSettings, - store, - translogPath, - mergePolicy, - refreshListener, - indexSort, - globalCheckpointSupplier, - globalCheckpointSupplier == null ? 
null : () -> RetentionLeases.EMPTY - ); - } - - public EngineConfig config( - final IndexSettings indexSettings, - final Store store, - final Path translogPath, - final MergePolicy mergePolicy, - final ReferenceManager.RefreshListener refreshListener, - final Sort indexSort, - final LongSupplier globalCheckpointSupplier, - final Supplier retentionLeasesSupplier - ) { - return config( - indexSettings, - store, - translogPath, - mergePolicy, - refreshListener, - null, - indexSort, - globalCheckpointSupplier, - retentionLeasesSupplier, - new NoneCircuitBreakerService() - ); - } - - public EngineConfig config( - IndexSettings indexSettings, - Store store, - Path translogPath, - MergePolicy mergePolicy, - ReferenceManager.RefreshListener externalRefreshListener, - ReferenceManager.RefreshListener internalRefreshListener, - Sort indexSort, - @Nullable LongSupplier maybeGlobalCheckpointSupplier, - CircuitBreakerService breakerService - ) { - return config( - indexSettings, - store, - translogPath, - mergePolicy, - externalRefreshListener, - internalRefreshListener, - indexSort, - maybeGlobalCheckpointSupplier, - maybeGlobalCheckpointSupplier == null ? 
null : () -> RetentionLeases.EMPTY, - breakerService - ); - } - - public EngineConfig config( - final IndexSettings indexSettings, - final Store store, - final Path translogPath, - final MergePolicy mergePolicy, - final ReferenceManager.RefreshListener externalRefreshListener, - final ReferenceManager.RefreshListener internalRefreshListener, - final Sort indexSort, - final @Nullable LongSupplier maybeGlobalCheckpointSupplier, - final @Nullable Supplier maybeRetentionLeasesSupplier, - final CircuitBreakerService breakerService - ) { - final IndexWriterConfig iwc = newIndexWriterConfig(); - final TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE); - final Engine.EventListener eventListener = new Engine.EventListener() { - }; // we don't need to notify anybody in this test - final List extRefreshListenerList = externalRefreshListener == null - ? emptyList() - : Collections.singletonList(externalRefreshListener); - final List intRefreshListenerList = internalRefreshListener == null - ? 
emptyList() - : Collections.singletonList(internalRefreshListener); - final LongSupplier globalCheckpointSupplier; - final Supplier retentionLeasesSupplier; - if (maybeGlobalCheckpointSupplier == null) { - assert maybeRetentionLeasesSupplier == null; - final ReplicationTracker replicationTracker = new ReplicationTracker( - shardId, - allocationId.getId(), - indexSettings, - randomNonNegativeLong(), - SequenceNumbers.NO_OPS_PERFORMED, - update -> {}, - () -> 0L, - (leases, listener) -> listener.onResponse(new ReplicationResponse()), - () -> SafeCommitInfo.EMPTY - ); - globalCheckpointSupplier = replicationTracker; - retentionLeasesSupplier = replicationTracker::getRetentionLeases; - } else { - assert maybeRetentionLeasesSupplier != null; - globalCheckpointSupplier = maybeGlobalCheckpointSupplier; - retentionLeasesSupplier = maybeRetentionLeasesSupplier; - } - return new EngineConfig( - shardId, - threadPool, - indexSettings, - null, - store, - mergePolicy, - iwc.getAnalyzer(), - iwc.getSimilarity(), - new CodecService(null, logger), - eventListener, - IndexSearcher.getDefaultQueryCache(), - IndexSearcher.getDefaultQueryCachingPolicy(), - translogConfig, - TimeValue.timeValueMinutes(5), - extRefreshListenerList, - intRefreshListenerList, - indexSort, - breakerService, - globalCheckpointSupplier, - retentionLeasesSupplier, - primaryTerm, - tombstoneDocSupplier() - ); - } - - protected EngineConfig config( - EngineConfig config, - Store store, - Path translogPath, - EngineConfig.TombstoneDocSupplier tombstoneDocSupplier - ) { - IndexSettings indexSettings = IndexSettingsModule.newIndexSettings( - "test", - Settings.builder() - .put(config.getIndexSettings().getSettings()) - .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) - .build() - ); - TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE); - return new EngineConfig( - config.getShardId(), - config.getThreadPool(), - indexSettings, - 
config.getWarmer(), - store, - config.getMergePolicy(), - config.getAnalyzer(), - config.getSimilarity(), - new CodecService(null, logger), - config.getEventListener(), - config.getQueryCache(), - config.getQueryCachingPolicy(), - translogConfig, - config.getFlushMergesAfter(), - config.getExternalRefreshListener(), - config.getInternalRefreshListener(), - config.getIndexSort(), - config.getCircuitBreakerService(), - config.getGlobalCheckpointSupplier(), - config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), - tombstoneDocSupplier - ); - } - - /** - * Creates a tombstone document that only includes uid, seq#, term and version fields. - */ - public static EngineConfig.TombstoneDocSupplier tombstoneDocSupplier() { - return new EngineConfig.TombstoneDocSupplier() { - @Override - public ParsedDocument newDeleteTombstoneDoc(String id) { - final ParseContext.Document doc = new ParseContext.Document(); - Field uidField = new Field(IdFieldMapper.NAME, Uid.encodeId(id), IdFieldMapper.Defaults.FIELD_TYPE); - doc.add(uidField); - Field versionField = new NumericDocValuesField(VersionFieldMapper.NAME, 0); - doc.add(versionField); - SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID(); - doc.add(seqID.seqNo); - doc.add(seqID.seqNoDocValue); - doc.add(seqID.primaryTerm); - seqID.tombstoneField.setLongValue(1); - doc.add(seqID.tombstoneField); - return new ParsedDocument( - versionField, - seqID, - id, - null, - Collections.singletonList(doc), - new BytesArray("{}"), - XContentType.JSON, - null - ); - } - - @Override - public ParsedDocument newNoopTombstoneDoc(String reason) { - final ParseContext.Document doc = new ParseContext.Document(); - SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID(); - doc.add(seqID.seqNo); - doc.add(seqID.seqNoDocValue); - doc.add(seqID.primaryTerm); - seqID.tombstoneField.setLongValue(1); - doc.add(seqID.tombstoneField); - Field versionField = new 
NumericDocValuesField(VersionFieldMapper.NAME, 0); - doc.add(versionField); - BytesRef byteRef = new BytesRef(reason); - doc.add(new StoredField(SourceFieldMapper.NAME, byteRef.bytes, byteRef.offset, byteRef.length)); - return new ParsedDocument(versionField, seqID, null, null, Collections.singletonList(doc), null, XContentType.JSON, null); - } - }; - } } diff --git a/server/src/test/java/org/opensearch/index/translog/listener/TranslogListenerTests.java b/server/src/test/java/org/opensearch/index/translog/listener/TranslogListenerTests.java index 4d37e5ad4989e..713dabcbd16b0 100644 --- a/server/src/test/java/org/opensearch/index/translog/listener/TranslogListenerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/listener/TranslogListenerTests.java @@ -8,6 +8,7 @@ package org.opensearch.index.translog.listener; +import org.apache.lucene.store.AlreadyClosedException; import org.opensearch.test.OpenSearchTestCase; import java.lang.reflect.Proxy; @@ -23,15 +24,17 @@ public void testCompositeTranslogEventListener() { AtomicInteger onTranslogSyncInvoked = new AtomicInteger(); AtomicInteger onTranslogRecoveryInvoked = new AtomicInteger(); AtomicInteger onBeginTranslogRecoveryInvoked = new AtomicInteger(); + AtomicInteger onFailureInvoked = new AtomicInteger(); + AtomicInteger onTragicFailureInvoked = new AtomicInteger(); TranslogEventListener listener = new TranslogEventListener() { @Override - public void onTranslogSync() { + public void onAfterTranslogSync() { onTranslogSyncInvoked.incrementAndGet(); } @Override - public void onTranslogRecovery() { + public void onAfterTranslogRecovery() { onTranslogRecoveryInvoked.incrementAndGet(); } @@ -39,6 +42,16 @@ public void onTranslogRecovery() { public void onBeginTranslogRecovery() { onBeginTranslogRecoveryInvoked.incrementAndGet(); } + + @Override + public void onFailure(String reason, Exception ex) { + onFailureInvoked.incrementAndGet(); + } + + @Override + public void onTragicFailure(AlreadyClosedException 
ex) { + onTragicFailureInvoked.incrementAndGet(); + } }; TranslogEventListener throwingListener = (TranslogEventListener) Proxy.newProxyInstance( TranslogEventListener.class.getClassLoader(), @@ -55,12 +68,16 @@ public void onBeginTranslogRecovery() { } Collections.shuffle(translogEventListeners, random()); TranslogEventListener compositeListener = new CompositeTranslogEventListener(translogEventListeners); - compositeListener.onTranslogRecovery(); - compositeListener.onTranslogSync(); + compositeListener.onAfterTranslogRecovery(); + compositeListener.onAfterTranslogSync(); compositeListener.onBeginTranslogRecovery(); + compositeListener.onFailure("reason", new RuntimeException("reason")); + compositeListener.onTragicFailure(new AlreadyClosedException("reason")); assertEquals(2, onBeginTranslogRecoveryInvoked.get()); assertEquals(2, onTranslogRecoveryInvoked.get()); assertEquals(2, onTranslogSyncInvoked.get()); + assertEquals(2, onFailureInvoked.get()); + assertEquals(2, onTragicFailureInvoked.get()); } } From 831c3818b083e085f8d755d3d7fb1bc9ec0d4f58 Mon Sep 17 00:00:00 2001 From: Bukhtawar Khan Date: Wed, 22 Jun 2022 21:04:07 +0530 Subject: [PATCH 4/5] Introduce LifecycleAware Signed-off-by: Bukhtawar Khan --- .../org/opensearch/index/engine/Engine.java | 4 +- .../index/engine/LifecycleAware.java | 20 ++++++ .../translog/InternalTranslogManager.java | 19 ++--- .../translog/WriteOnlyTranslogManager.java | 5 +- .../CompositeTranslogEventListener.java | 5 ++ .../listener/TranslogListenerTests.java | 72 +++++++++++++++---- 6 files changed, 97 insertions(+), 28 deletions(-) create mode 100644 server/src/main/java/org/opensearch/index/engine/LifecycleAware.java diff --git a/server/src/main/java/org/opensearch/index/engine/Engine.java b/server/src/main/java/org/opensearch/index/engine/Engine.java index 4829148322b31..5e9bc3926d7c2 100644 --- a/server/src/main/java/org/opensearch/index/engine/Engine.java +++ b/server/src/main/java/org/opensearch/index/engine/Engine.java 
@@ -117,7 +117,7 @@ * * @opensearch.internal */ -public abstract class Engine implements Closeable { +public abstract class Engine implements LifecycleAware, Closeable { public static final String SYNC_COMMIT_ID = "sync_id"; // TODO: remove sync_id in 3.0 public static final String HISTORY_UUID_KEY = "history_uuid"; @@ -847,7 +847,7 @@ protected final void ensureOpen(Exception suppressed) { } } - protected final void ensureOpen() { + public final void ensureOpen() { ensureOpen(null); } diff --git a/server/src/main/java/org/opensearch/index/engine/LifecycleAware.java b/server/src/main/java/org/opensearch/index/engine/LifecycleAware.java new file mode 100644 index 0000000000000..06cfb8e7e73a5 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/engine/LifecycleAware.java @@ -0,0 +1,20 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.engine; + +/** + * Interface that is aware of a component lifecycle. 
+ */ +public interface LifecycleAware { + + /** + * Checks to ensure that the component is in an open state + */ + void ensureOpen(); +} diff --git a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java index 7c4ddf389b803..22f72cc3d9acd 100644 --- a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java @@ -13,6 +13,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.store.AlreadyClosedException; import org.opensearch.common.util.concurrent.ReleasableLock; +import org.opensearch.index.engine.LifecycleAware; import org.opensearch.index.seqno.LocalCheckpointTracker; import org.opensearch.index.shard.ShardId; import org.opensearch.index.translog.listener.TranslogEventListener; @@ -33,7 +34,7 @@ public class InternalTranslogManager implements TranslogManager { private final ReleasableLock readLock; - private final Runnable ensureEngineOpen; + private final LifecycleAware engineLifeCycleAware; private final ShardId shardId; private final Translog translog; private final AtomicBoolean pendingTranslogRecovery = new AtomicBoolean(false); @@ -50,11 +51,11 @@ public InternalTranslogManager( Supplier localCheckpointTrackerSupplier, String translogUUID, TranslogEventListener translogEventListener, - Runnable ensureEngineOpen + LifecycleAware engineLifeCycleAware ) throws IOException { this.shardId = shardId; this.readLock = readLock; - this.ensureEngineOpen = ensureEngineOpen; + this.engineLifeCycleAware = engineLifeCycleAware; this.translogEventListener = translogEventListener; Translog translog = openTranslog(translogConfig, primaryTermSupplier, translogDeletionPolicy, globalCheckpointSupplier, seqNo -> { final LocalCheckpointTracker tracker = localCheckpointTrackerSupplier.get(); @@ -76,7 +77,7 @@ public 
InternalTranslogManager( @Override public void rollTranslogGeneration() throws TranslogException { try (ReleasableLock ignored = readLock.acquire()) { - ensureEngineOpen.run(); + engineLifeCycleAware.ensureOpen(); translog.rollGeneration(); translog.trimUnreferencedReaders(); } catch (AlreadyClosedException e) { @@ -105,7 +106,7 @@ public int recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, lo int opsRecovered = 0; translogEventListener.onBeginTranslogRecovery(); try (ReleasableLock ignored = readLock.acquire()) { - ensureEngineOpen.run(); + engineLifeCycleAware.ensureOpen(); if (pendingTranslogRecovery.get() == false) { throw new IllegalStateException("Engine has already been recovered"); } @@ -200,7 +201,7 @@ public Translog.Location getTranslogLastWriteLocation() { @Override public void trimUnreferencedTranslogFiles() throws TranslogException { try (ReleasableLock ignored = readLock.acquire()) { - ensureEngineOpen.run(); + engineLifeCycleAware.ensureOpen(); translog.trimUnreferencedReaders(); } catch (AlreadyClosedException e) { translogEventListener.onTragicFailure(e); @@ -233,7 +234,7 @@ public boolean shouldRollTranslogGeneration() { @Override public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws TranslogException { try (ReleasableLock ignored = readLock.acquire()) { - ensureEngineOpen.run(); + engineLifeCycleAware.ensureOpen(); translog.trimOperations(belowTerm, aboveSeqNo); } catch (AlreadyClosedException e) { translogEventListener.onTragicFailure(e); @@ -257,7 +258,7 @@ public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws T @Override public int restoreLocalHistoryFromTranslog(long processedCheckpoint, TranslogRecoveryRunner translogRecoveryRunner) throws IOException { try (ReleasableLock ignored = readLock.acquire()) { - ensureEngineOpen.run(); + engineLifeCycleAware.ensureOpen(); try (Translog.Snapshot snapshot = getTranslog(true).newSnapshot(processedCheckpoint + 1, Long.MAX_VALUE)) { 
return translogRecoveryRunner.run(snapshot); } @@ -314,7 +315,7 @@ private Translog openTranslog( @Override public Translog getTranslog(boolean ensureOpen) { if (ensureOpen) { - this.ensureEngineOpen.run(); + this.engineLifeCycleAware.ensureOpen(); } return translog; } diff --git a/server/src/main/java/org/opensearch/index/translog/WriteOnlyTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/WriteOnlyTranslogManager.java index 366f70ac50ca2..09f5f38a9f6a9 100644 --- a/server/src/main/java/org/opensearch/index/translog/WriteOnlyTranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/WriteOnlyTranslogManager.java @@ -9,6 +9,7 @@ package org.opensearch.index.translog; import org.opensearch.common.util.concurrent.ReleasableLock; +import org.opensearch.index.engine.LifecycleAware; import org.opensearch.index.seqno.LocalCheckpointTracker; import org.opensearch.index.shard.ShardId; import org.opensearch.index.translog.listener.TranslogEventListener; @@ -34,7 +35,7 @@ public WriteOnlyTranslogManager( Supplier localCheckpointTrackerSupplier, String translogUUID, TranslogEventListener translogEventListener, - Runnable ensureEngineOpen + LifecycleAware engineLifecycleAware ) throws IOException { super( translogConfig, @@ -46,7 +47,7 @@ public WriteOnlyTranslogManager( localCheckpointTrackerSupplier, translogUUID, translogEventListener, - ensureEngineOpen + engineLifecycleAware ); } diff --git a/server/src/main/java/org/opensearch/index/translog/listener/CompositeTranslogEventListener.java b/server/src/main/java/org/opensearch/index/translog/listener/CompositeTranslogEventListener.java index 4ca7069140811..4f9ca9d282caa 100644 --- a/server/src/main/java/org/opensearch/index/translog/listener/CompositeTranslogEventListener.java +++ b/server/src/main/java/org/opensearch/index/translog/listener/CompositeTranslogEventListener.java @@ -44,6 +44,7 @@ public void onAfterTranslogSync() { listener.onAfterTranslogSync(); } catch (Exception ex) 
{ logger.warn(() -> new ParameterizedMessage("failed to invoke onTranslogSync listener"), ex); + throw ex; } } } @@ -55,6 +56,7 @@ public void onAfterTranslogRecovery() { listener.onAfterTranslogRecovery(); } catch (Exception ex) { logger.warn(() -> new ParameterizedMessage("failed to invoke onTranslogRecovery listener"), ex); + throw ex; } } } @@ -66,6 +68,7 @@ public void onBeginTranslogRecovery() { listener.onBeginTranslogRecovery(); } catch (Exception ex) { logger.warn(() -> new ParameterizedMessage("failed to invoke onBeginTranslogRecovery listener"), ex); + throw ex; } } } @@ -77,6 +80,7 @@ public void onFailure(String reason, Exception e) { listener.onFailure(reason, e); } catch (Exception ex) { logger.warn(() -> new ParameterizedMessage("failed to invoke onFailure listener"), ex); + throw ex; } } } @@ -88,6 +92,7 @@ public void onTragicFailure(AlreadyClosedException e) { listener.onTragicFailure(e); } catch (Exception ex) { logger.warn(() -> new ParameterizedMessage("failed to invoke onTragicFailure listener"), ex); + throw ex; } } } diff --git a/server/src/test/java/org/opensearch/index/translog/listener/TranslogListenerTests.java b/server/src/test/java/org/opensearch/index/translog/listener/TranslogListenerTests.java index 713dabcbd16b0..3d78667a1ad26 100644 --- a/server/src/test/java/org/opensearch/index/translog/listener/TranslogListenerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/listener/TranslogListenerTests.java @@ -12,10 +12,7 @@ import org.opensearch.test.OpenSearchTestCase; import java.lang.reflect.Proxy; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; +import java.util.*; import java.util.concurrent.atomic.AtomicInteger; public class TranslogListenerTests extends OpenSearchTestCase { @@ -53,19 +50,8 @@ public void onTragicFailure(AlreadyClosedException ex) { onTragicFailureInvoked.incrementAndGet(); } }; - TranslogEventListener throwingListener = 
(TranslogEventListener) Proxy.newProxyInstance( - TranslogEventListener.class.getClassLoader(), - new Class[] { TranslogEventListener.class }, - (a, b, c) -> { throw new RuntimeException(); } - ); final List translogEventListeners = new ArrayList<>(Arrays.asList(listener, listener)); - if (randomBoolean()) { - translogEventListeners.add(throwingListener); - if (randomBoolean()) { - translogEventListeners.add(throwingListener); - } - } Collections.shuffle(translogEventListeners, random()); TranslogEventListener compositeListener = new CompositeTranslogEventListener(translogEventListeners); compositeListener.onAfterTranslogRecovery(); @@ -80,4 +66,60 @@ public void onTragicFailure(AlreadyClosedException ex) { assertEquals(2, onFailureInvoked.get()); assertEquals(2, onTragicFailureInvoked.get()); } + + public void testCompositeTranslogEventListenerOnExceptions() { + AtomicInteger onTranslogSyncInvoked = new AtomicInteger(); + AtomicInteger onTranslogRecoveryInvoked = new AtomicInteger(); + AtomicInteger onBeginTranslogRecoveryInvoked = new AtomicInteger(); + AtomicInteger onFailureInvoked = new AtomicInteger(); + AtomicInteger onTragicFailureInvoked = new AtomicInteger(); + + TranslogEventListener listener = new TranslogEventListener() { + @Override + public void onAfterTranslogSync() { + onTranslogSyncInvoked.incrementAndGet(); + } + + @Override + public void onAfterTranslogRecovery() { + onTranslogRecoveryInvoked.incrementAndGet(); + } + + @Override + public void onBeginTranslogRecovery() { + onBeginTranslogRecoveryInvoked.incrementAndGet(); + } + + @Override + public void onFailure(String reason, Exception ex) { + onFailureInvoked.incrementAndGet(); + } + + @Override + public void onTragicFailure(AlreadyClosedException ex) { + onTragicFailureInvoked.incrementAndGet(); + } + }; + + TranslogEventListener throwingListener = (TranslogEventListener) Proxy.newProxyInstance( + TranslogEventListener.class.getClassLoader(), + new Class[] { TranslogEventListener.class }, + 
(a, b, c) -> { throw new RuntimeException(); } + ); + + final List translogEventListeners = new LinkedList<>(Arrays.asList(listener, throwingListener, listener)); + TranslogEventListener compositeListener = new CompositeTranslogEventListener(translogEventListeners); + expectThrows(RuntimeException.class, () -> compositeListener.onAfterTranslogRecovery()); + expectThrows(RuntimeException.class, () -> compositeListener.onAfterTranslogSync()); + expectThrows(RuntimeException.class, () -> compositeListener.onBeginTranslogRecovery()); + expectThrows(RuntimeException.class, () -> compositeListener.onFailure("reason", new RuntimeException("reason"))); + expectThrows(RuntimeException.class, () -> compositeListener.onTragicFailure(new AlreadyClosedException("reason"))); + + assertEquals(1, onBeginTranslogRecoveryInvoked.get()); + assertEquals(1, onTranslogRecoveryInvoked.get()); + assertEquals(1, onTranslogSyncInvoked.get()); + assertEquals(1, onFailureInvoked.get()); + assertEquals(1, onTragicFailureInvoked.get()); + + } } From 849397663eb7a10e3fdfb29921391d208e6db656 Mon Sep 17 00:00:00 2001 From: Bukhtawar Khan Date: Thu, 23 Jun 2022 01:39:24 +0530 Subject: [PATCH 5/5] Listener fix up Signed-off-by: Bukhtawar Khan --- .../CompositeTranslogEventListener.java | 21 ++++++++++++++----- .../listener/TranslogListenerTests.java | 11 +++++----- 2 files changed, 22 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/translog/listener/CompositeTranslogEventListener.java b/server/src/main/java/org/opensearch/index/translog/listener/CompositeTranslogEventListener.java index 4f9ca9d282caa..731b069ab0c74 100644 --- a/server/src/main/java/org/opensearch/index/translog/listener/CompositeTranslogEventListener.java +++ b/server/src/main/java/org/opensearch/index/translog/listener/CompositeTranslogEventListener.java @@ -12,6 +12,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import 
org.apache.lucene.store.AlreadyClosedException; +import org.opensearch.ExceptionsHelper; import java.util.ArrayList; import java.util.Collection; @@ -39,61 +40,71 @@ public CompositeTranslogEventListener(Collection listener @Override public void onAfterTranslogSync() { + List exceptionList = new ArrayList<>(listeners.size()); for (TranslogEventListener listener : listeners) { try { listener.onAfterTranslogSync(); } catch (Exception ex) { logger.warn(() -> new ParameterizedMessage("failed to invoke onTranslogSync listener"), ex); - throw ex; + exceptionList.add(ex); } } + ExceptionsHelper.maybeThrowRuntimeAndSuppress(exceptionList); } @Override public void onAfterTranslogRecovery() { + List exceptionList = new ArrayList<>(listeners.size()); for (TranslogEventListener listener : listeners) { try { listener.onAfterTranslogRecovery(); } catch (Exception ex) { logger.warn(() -> new ParameterizedMessage("failed to invoke onTranslogRecovery listener"), ex); - throw ex; + exceptionList.add(ex); } } + ExceptionsHelper.maybeThrowRuntimeAndSuppress(exceptionList); } @Override public void onBeginTranslogRecovery() { + List exceptionList = new ArrayList<>(listeners.size()); for (TranslogEventListener listener : listeners) { try { listener.onBeginTranslogRecovery(); } catch (Exception ex) { logger.warn(() -> new ParameterizedMessage("failed to invoke onBeginTranslogRecovery listener"), ex); - throw ex; + exceptionList.add(ex); } } + ExceptionsHelper.maybeThrowRuntimeAndSuppress(exceptionList); } @Override public void onFailure(String reason, Exception e) { + List exceptionList = new ArrayList<>(listeners.size()); for (TranslogEventListener listener : listeners) { try { listener.onFailure(reason, e); } catch (Exception ex) { logger.warn(() -> new ParameterizedMessage("failed to invoke onFailure listener"), ex); - throw ex; + exceptionList.add(ex); } } + ExceptionsHelper.maybeThrowRuntimeAndSuppress(exceptionList); } @Override public void onTragicFailure(AlreadyClosedException e) 
{ + List exceptionList = new ArrayList<>(listeners.size()); for (TranslogEventListener listener : listeners) { try { listener.onTragicFailure(e); } catch (Exception ex) { logger.warn(() -> new ParameterizedMessage("failed to invoke onTragicFailure listener"), ex); - throw ex; + exceptionList.add(ex); } } + ExceptionsHelper.maybeThrowRuntimeAndSuppress(exceptionList); } } diff --git a/server/src/test/java/org/opensearch/index/translog/listener/TranslogListenerTests.java b/server/src/test/java/org/opensearch/index/translog/listener/TranslogListenerTests.java index 3d78667a1ad26..79c243772b252 100644 --- a/server/src/test/java/org/opensearch/index/translog/listener/TranslogListenerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/listener/TranslogListenerTests.java @@ -108,6 +108,7 @@ public void onTragicFailure(AlreadyClosedException ex) { ); final List translogEventListeners = new LinkedList<>(Arrays.asList(listener, throwingListener, listener)); + Collections.shuffle(translogEventListeners, random()); TranslogEventListener compositeListener = new CompositeTranslogEventListener(translogEventListeners); expectThrows(RuntimeException.class, () -> compositeListener.onAfterTranslogRecovery()); expectThrows(RuntimeException.class, () -> compositeListener.onAfterTranslogSync()); @@ -115,11 +116,11 @@ public void onTragicFailure(AlreadyClosedException ex) { expectThrows(RuntimeException.class, () -> compositeListener.onFailure("reason", new RuntimeException("reason"))); expectThrows(RuntimeException.class, () -> compositeListener.onTragicFailure(new AlreadyClosedException("reason"))); - assertEquals(1, onBeginTranslogRecoveryInvoked.get()); - assertEquals(1, onTranslogRecoveryInvoked.get()); - assertEquals(1, onTranslogSyncInvoked.get()); - assertEquals(1, onFailureInvoked.get()); - assertEquals(1, onTragicFailureInvoked.get()); + assertEquals(2, onBeginTranslogRecoveryInvoked.get()); + assertEquals(2, onTranslogRecoveryInvoked.get()); + assertEquals(2, 
onTranslogSyncInvoked.get()); + assertEquals(2, onFailureInvoked.get()); + assertEquals(2, onTragicFailureInvoked.get()); } }