Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Changes to encapsulate Translog into TranslogManager #3751

Merged
merged 6 commits into from
Jul 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 30 additions & 78 deletions server/src/main/java/org/opensearch/index/engine/InternalEngine.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogManager;
import org.opensearch.index.translog.WriteOnlyTranslogManager;
import org.opensearch.index.translog.TranslogConfig;
import org.opensearch.index.translog.TranslogDeletionPolicy;
import org.opensearch.index.translog.TranslogException;
import org.opensearch.index.translog.listener.TranslogEventListener;
Expand All @@ -39,8 +38,6 @@
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.function.BiFunction;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

/**
* This is an {@link Engine} implementation intended for replica shards when Segment Replication
Expand All @@ -55,13 +52,13 @@ public class NRTReplicationEngine extends Engine {
private final NRTReplicationReaderManager readerManager;
private final CompletionStatsCache completionStatsCache;
private final LocalCheckpointTracker localCheckpointTracker;
private final TranslogManager translogManager;
private final WriteOnlyTranslogManager translogManager;

public NRTReplicationEngine(EngineConfig engineConfig) {
super(engineConfig);
store.incRef();
NRTReplicationReaderManager readerManager = null;
TranslogManager translogManagerRef = null;
WriteOnlyTranslogManager translogManagerRef = null;
try {
lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
readerManager = new NRTReplicationReaderManager(OpenSearchDirectoryReader.wrap(getDirectoryReader(), shardId));
Expand Down Expand Up @@ -92,7 +89,7 @@ public void onFailure(String reason, Exception ex) {
@Override
public void onAfterTranslogSync() {
try {
translogManager.getTranslog().trimUnreferencedReaders();
translogManager.trimUnreferencedReaders();
} catch (IOException ex) {
throw new TranslogException(shardId, "failed to trim unreferenced translog readers", ex);
}
Expand All @@ -102,11 +99,7 @@ public void onAfterTranslogSync() {
);
this.translogManager = translogManagerRef;
} catch (IOException e) {
Translog translog = null;
if (translogManagerRef != null) {
translog = translogManagerRef.getTranslog();
}
IOUtils.closeWhileHandlingException(store::decRef, readerManager, translog);
IOUtils.closeWhileHandlingException(store::decRef, readerManager, translogManagerRef);
throw new EngineCreationFailureException(shardId, "failed to create engine", e);
}
}
Expand Down Expand Up @@ -158,7 +151,7 @@ public boolean isThrottled() {
public IndexResult index(Index index) throws IOException {
ensureOpen();
IndexResult indexResult = new IndexResult(index.version(), index.primaryTerm(), index.seqNo(), false);
final Translog.Location location = translogManager.getTranslog().add(new Translog.Index(index, indexResult));
final Translog.Location location = translogManager.add(new Translog.Index(index, indexResult));
indexResult.setTranslogLocation(location);
indexResult.setTook(System.nanoTime() - index.startTime());
indexResult.freeze();
Expand All @@ -170,7 +163,7 @@ public IndexResult index(Index index) throws IOException {
public DeleteResult delete(Delete delete) throws IOException {
ensureOpen();
DeleteResult deleteResult = new DeleteResult(delete.version(), delete.primaryTerm(), delete.seqNo(), true);
final Translog.Location location = translogManager.getTranslog().add(new Translog.Delete(delete, deleteResult));
final Translog.Location location = translogManager.add(new Translog.Delete(delete, deleteResult));
deleteResult.setTranslogLocation(location);
deleteResult.setTook(System.nanoTime() - delete.startTime());
deleteResult.freeze();
Expand All @@ -182,8 +175,7 @@ public DeleteResult delete(Delete delete) throws IOException {
public NoOpResult noOp(NoOp noOp) throws IOException {
ensureOpen();
NoOpResult noOpResult = new NoOpResult(noOp.primaryTerm(), noOp.seqNo());
final Translog.Location location = translogManager.getTranslog()
.add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason()));
final Translog.Location location = translogManager.add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason()));
noOpResult.setTranslogLocation(location);
noOpResult.setTook(System.nanoTime() - noOp.startTime());
noOpResult.freeze();
Expand Down Expand Up @@ -249,7 +241,7 @@ public SeqNoStats getSeqNoStats(long globalCheckpoint) {

@Override
public long getLastSyncedGlobalCheckpoint() {
return translogManager.getTranslog().getLastSyncedGlobalCheckpoint();
return translogManager.getLastSyncedGlobalCheckpoint();
}

@Override
Expand Down Expand Up @@ -317,7 +309,7 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
assert rwl.isWriteLockedByCurrentThread() || failEngineLock.isHeldByCurrentThread()
: "Either the write lock must be held or the engine must be currently be failing itself";
try {
IOUtils.close(readerManager, translogManager().getTranslog(), store::decRef);
IOUtils.close(readerManager, translogManager, store::decRef);
} catch (Exception e) {
logger.warn("failed to close engine", e);
} finally {
Expand Down Expand Up @@ -354,7 +346,7 @@ public void advanceMaxSeqNoOfUpdatesOrDeletes(long maxSeqNoOfUpdatesOnPrimary) {

@Override
public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize, long softDeletesRetentionOps) {
final TranslogDeletionPolicy translogDeletionPolicy = translogManager.getTranslog().getDeletionPolicy();
final TranslogDeletionPolicy translogDeletionPolicy = translogManager.getDeletionPolicy();
translogDeletionPolicy.setRetentionAgeInMillis(translogRetentionAge.millis());
translogDeletionPolicy.setRetentionSizeInBytes(translogRetentionSize.getBytes());
}
Expand All @@ -377,24 +369,4 @@ private DirectoryReader getDirectoryReader() throws IOException {
// for segment replication: replicas should create the reader from store, we don't want an open IW on replicas.
return new SoftDeletesDirectoryReaderWrapper(DirectoryReader.open(store.directory()), Lucene.SOFT_DELETES_FIELD);
}

/**
 * Opens the translog identified by the translog UUID stored in the user data of
 * {@code lastCommittedSegmentInfos}.
 *
 * @param engineConfig source of the translog config and the primary term supplier
 * @param translogDeletionPolicy deletion policy passed through to the translog
 * @param globalCheckpointSupplier global checkpoint supplier passed through to the translog
 * @param persistedSequenceNumberConsumer persisted sequence number consumer passed through to the translog
 * @return the newly constructed {@link Translog}
 * @throws IOException declared by this method; presumably raised when the translog cannot be opened
 * @throws NullPointerException if the commit user data holds no translog UUID entry
 */
private Translog openTranslog(
    EngineConfig engineConfig,
    TranslogDeletionPolicy translogDeletionPolicy,
    LongSupplier globalCheckpointSupplier,
    LongConsumer persistedSequenceNumberConsumer
) throws IOException {
    final TranslogConfig config = engineConfig.getTranslogConfig();
    // The shard is expected to exist already, so a missing translog UUID means something is badly wrong.
    final String uuid = Objects.requireNonNull(lastCommittedSegmentInfos.getUserData().get(Translog.TRANSLOG_UUID_KEY));
    return new Translog(
        config,
        uuid,
        translogDeletionPolicy,
        globalCheckpointSupplier,
        engineConfig.getPrimaryTermSupplier(),
        persistedSequenceNumberConsumer
    );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.opensearch.common.util.concurrent.ReleasableLock;
import org.opensearch.core.internal.io.IOUtils;
import org.opensearch.index.engine.LifecycleAware;
import org.opensearch.index.seqno.LocalCheckpointTracker;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.translog.listener.TranslogEventListener;

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongConsumer;
Expand All @@ -31,14 +33,15 @@
*
* @opensearch.internal
*/
public class InternalTranslogManager implements TranslogManager {
public class InternalTranslogManager implements TranslogManager, Closeable {

private final ReleasableLock readLock;
private final LifecycleAware engineLifeCycleAware;
private final ShardId shardId;
private final Translog translog;
private final AtomicBoolean pendingTranslogRecovery = new AtomicBoolean(false);
private final TranslogEventListener translogEventListener;
private final Supplier<LocalCheckpointTracker> localCheckpointTrackerSupplier;
private static final Logger logger = LogManager.getLogger(InternalTranslogManager.class);

public InternalTranslogManager(
Expand All @@ -57,6 +60,7 @@ public InternalTranslogManager(
this.readLock = readLock;
this.engineLifeCycleAware = engineLifeCycleAware;
this.translogEventListener = translogEventListener;
this.localCheckpointTrackerSupplier = localCheckpointTrackerSupplier;
Translog translog = openTranslog(translogConfig, primaryTermSupplier, translogDeletionPolicy, globalCheckpointSupplier, seqNo -> {
final LocalCheckpointTracker tracker = localCheckpointTrackerSupplier.get();
assert tracker != null || getTranslog(true).isOpen() == false;
Expand Down Expand Up @@ -279,6 +283,28 @@ public void ensureCanFlush() {
}
}

/**
* Reads the operation stored at the given location by delegating to the underlying translog
* @param location the location in the translog to read from
* @return the translog operation returned by {@code translog.readOperation(location)}
* @throws IOException propagated from the underlying translog read
*/
@Override
public Translog.Operation readOperation(Translog.Location location) throws IOException {
return translog.readOperation(location);
}

/**
* Adds an operation to the translog by delegating to the underlying translog
* @param operation the operation to add
* @return the location in the translog returned by {@code translog.add(operation)}
* @throws IOException propagated from the underlying translog
*/
@Override
public Translog.Location add(Translog.Operation operation) throws IOException {
Bukhtawar marked this conversation as resolved.
Show resolved Hide resolved
return translog.add(operation);
}

/**
* Do not replay translog operations, but make the engine be ready.
*/
Expand All @@ -288,7 +314,20 @@ public void skipTranslogRecovery() {
pendingTranslogRecovery.set(false); // we are good - now we can commit
}

private Translog openTranslog(
// Visible for testing: returns the underlying translog directly, without any open check.
// TODO refactor tests to remove public access to translog
public Translog getTranslog() {
Bukhtawar marked this conversation as resolved.
Show resolved Hide resolved
return translog;
}

/**
 * Returns the translog, optionally checking first that the engine is still open.
 *
 * @param ensureOpen when {@code true}, {@code engineLifeCycleAware.ensureOpen()} is invoked before returning
 * @return the translog held by this manager
 */
private Translog getTranslog(boolean ensureOpen) {
    if (ensureOpen == false) {
        return translog;
    }
    engineLifeCycleAware.ensureOpen();
    return translog;
}

protected Translog openTranslog(
TranslogConfig translogConfig,
LongSupplier primaryTermSupplier,
TranslogDeletionPolicy translogDeletionPolicy,
Expand All @@ -308,18 +347,90 @@ private Translog openTranslog(
}

/**
* Returns the translog instance
* @return the {@link Translog} instance
* Retrieves last synced global checkpoint
* @return last synced global checkpoint
*/
@Override
public Translog getTranslog() {
return translog;
public long getLastSyncedGlobalCheckpoint() {
return translog.getLastSyncedGlobalCheckpoint();
}

private Translog getTranslog(boolean ensureOpen) {
if (ensureOpen) {
this.engineLifeCycleAware.ensureOpen();
/**
* Retrieves the max seq no reported by the underlying translog
* @return the value of {@code translog.getMaxSeqNo()}
*/
public long getMaxSeqNo() {
return translog.getMaxSeqNo();
}

/**
* Trims unreferenced translog generations by asking {@link TranslogDeletionPolicy} for the minimum
* required generation. This method only delegates to {@code translog.trimUnreferencedReaders()}.
* @throws IOException propagated from the underlying translog
*/
public void trimUnreferencedReaders() throws IOException {
translog.trimUnreferencedReaders();
}

/**
* Retrieves the translog deletion policy from the underlying translog
* @return the {@link TranslogDeletionPolicy} returned by {@code translog.getDeletionPolicy()}
*/
public TranslogDeletionPolicy getDeletionPolicy() {
return translog.getDeletionPolicy();
}

/**
 * Retrieves the tragic exception of the underlying translog, but only once that translog is no longer open.
 *
 * @return {@code translog.getTragicException()} when the translog is not open, otherwise {@code null}
 */
public Exception getTragicExceptionIfClosed() {
    if (translog.isOpen()) {
        // An open translog never reports a tragic exception through this method.
        return null;
    }
    return translog.getTragicException();
}

/**
* Retrieves the translog unique identifier from the underlying translog
* @return the uuid of the translog, as returned by {@code translog.getTranslogUUID()}
*/
public String getTranslogUUID() {
return translog.getTranslogUUID();
}

/**
* Decides whether the translog should be flushed. Returns false when the translog size, measured from the
* generation that holds {@code localCheckpointOfLastCommit + 1}, is below {@code flushThreshold}. Otherwise
* returns true only if a new commit would point to a later generation, or if the processed checkpoint
* equals the max seq no of the local checkpoint tracker.
* @param localCheckpointOfLastCommit local checkpoint of the last commit; the generation holding the next seq no is the baseline
* @param flushThreshold size threshold compared against {@code translog.sizeInBytesByMinGen(...)}
* @return if the translog should be flushed
*/
public boolean shouldPeriodicallyFlush(long localCheckpointOfLastCommit, long flushThreshold) {
final long translogGenerationOfLastCommit = translog.getMinGenerationForSeqNo(
localCheckpointOfLastCommit + 1
).translogFileGeneration;
if (translog.sizeInBytesByMinGen(translogGenerationOfLastCommit) < flushThreshold) {
return false;
}
// NOTE(review): the following return does not match this method's boolean return type; it looks like a
// leftover line of the removed getTranslog(boolean) shown in this diff view, not part of this method — confirm.
return translog;
/*
* We flush to reduce the size of the uncommitted translog, but strictly speaking the uncommitted size won't always
* be below the flush-threshold after a flush. To avoid getting into an endless loop of flushing, we only enable the
* periodic flush condition if this condition is disabled after a flush. The condition will change if the new
* commit points to a later generation than the last commit's (eg. gen-of-last-commit < gen-of-new-commit)[1].
*
* When the local checkpoint equals max_seqno, and the translog-gen of the last commit equals the translog-gen of
* the new commit, we know that the last generation must contain operations because its size is above the flush
* threshold and the flush-threshold is guaranteed to be higher than an empty translog by the setting validation.
* This guarantees that the new commit will point to the newly rolled generation. In fact, this scenario only
* happens when the generation-threshold is close to or above the flush-threshold; otherwise we have rolled
* generations as the generation-threshold was reached, then the first condition (eg. [1]) is already satisfied.
*
* This method is to maintain translog only, thus IndexWriter#hasUncommittedChanges condition is not considered.
*/
final long translogGenerationOfNewCommit = translog.getMinGenerationForSeqNo(
localCheckpointTrackerSupplier.get().getProcessedCheckpoint() + 1
).translogFileGeneration;
return translogGenerationOfLastCommit < translogGenerationOfNewCommit
|| localCheckpointTrackerSupplier.get().getProcessedCheckpoint() == localCheckpointTrackerSupplier.get().getMaxSeqNo();
}

/**
* Closes the underlying translog via {@code IOUtils.closeWhileHandlingException}.
* NOTE(review): the helper's name suggests close failures are suppressed rather than rethrown, so the
* declared IOException may never surface from here — confirm against IOUtils.
*/
@Override
public void close() throws IOException {
IOUtils.closeWhileHandlingException(translog);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public void rollTranslogGeneration() throws TranslogException {}
@Override
public int recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long localCheckpoint, long recoverUpToSeqNo)
throws IOException {
try (ReleasableLock lock = readLock.acquire()) {
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen.run();
try (Translog.Snapshot snapshot = emptyTranslogSnapshot) {
translogRecoveryRunner.run(snapshot);
Expand All @@ -64,7 +64,7 @@ public boolean isTranslogSyncNeeded() {
}

@Override
public boolean ensureTranslogSynced(Stream<Translog.Location> locations) throws IOException {
public boolean ensureTranslogSynced(Stream<Translog.Location> locations) {
return false;
}

Expand Down Expand Up @@ -92,11 +92,6 @@ public boolean shouldRollTranslogGeneration() {
/**
* Intentionally empty: this implementation trims nothing and ignores both arguments.
*/
@Override
public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws TranslogException {}

/**
* Always returns {@code null} in this implementation.
*/
@Override
public Translog getTranslog() {
return null;
}

/**
* Intentionally empty: this implementation performs no check.
*/
@Override
public void ensureCanFlush() {}

Expand All @@ -107,4 +102,14 @@ public int restoreLocalHistoryFromTranslog(long processedCheckpoint, TranslogRec

/**
* Intentionally empty: this implementation does nothing when translog recovery is skipped.
*/
@Override
public void skipTranslogRecovery() {}

/**
* Always returns {@code null}; the given location is not used.
* @param location ignored by this implementation
* @return always {@code null}
* @throws IOException declared by the overridden method; nothing in this body throws it
*/
@Override
public Translog.Operation readOperation(Translog.Location location) throws IOException {
return null;
}

/**
* Returns a fresh {@code Translog.Location(0, 0, 0)}; the given operation is not used.
* @param operation ignored by this implementation
* @return a new location constructed with three zero arguments
* @throws IOException declared by the overridden method; nothing in this body throws it
*/
@Override
public Translog.Location add(Translog.Operation operation) throws IOException {
return new Translog.Location(0, 0, 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,20 @@ public interface TranslogManager {
void skipTranslogRecovery();

/**
* Returns the instance of the translog with a precondition
* @return the translog instance
* Reads an operation from the translog
* @param location the location in the translog
* @return the translog operation
* @throws IOException
*/
Translog getTranslog();
Translog.Operation readOperation(Translog.Location location) throws IOException;

/**
* Adds an operation to the translog
* @param operation the operation to add
* @return the location in the translog
* @throws IOException declared for implementations; presumably raised when the operation cannot be added
*/
Translog.Location add(Translog.Operation operation) throws IOException;

/**
* Checks if the translog has a pending recovery
Expand Down
Loading