-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
No-op replication for primary term validation with NRTSegRep #4127
Closed
Closed
Changes from all commits
Commits
Show all changes
8 commits
Select commit
Hold shift + click to select a range
6a81186
Introduced abstraction layer in NRTReplicationEngine for extension
ashking94 3a243cc
[Remote Store] Added NRTReplicationNoOpEngine with NoOpTranslogManager
ashking94 ef95d1c
Removes unwanted formatting changes from prev commit
ashking94 3012063
Adds rationale behind new Engine as comments
ashking94 ac2ebc0
Adds NoOp code behind feature flag
ashking94 974ba60
Adds UTs for NRTReplicationNoOpEngine
ashking94 65fe29c
Revert "Adds NoOp code behind feature flag"
ashking94 5f93b80
NoOp engine enablement on remote translog store enable status
ashking94 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
370 changes: 370 additions & 0 deletions
370
server/src/main/java/org/opensearch/index/engine/AbstractNRTReplicationEngine.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,370 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.index.engine; | ||
|
||
import org.apache.lucene.index.DirectoryReader; | ||
import org.apache.lucene.index.IndexCommit; | ||
import org.apache.lucene.index.SegmentInfos; | ||
import org.apache.lucene.index.SoftDeletesDirectoryReaderWrapper; | ||
import org.apache.lucene.search.ReferenceManager; | ||
import org.apache.lucene.util.SetOnce; | ||
import org.opensearch.common.CheckedConsumer; | ||
import org.opensearch.common.concurrent.GatedCloseable; | ||
import org.opensearch.common.lucene.Lucene; | ||
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; | ||
import org.opensearch.core.internal.io.IOUtils; | ||
import org.opensearch.index.seqno.LocalCheckpointTracker; | ||
import org.opensearch.index.seqno.SeqNoStats; | ||
import org.opensearch.index.seqno.SequenceNumbers; | ||
import org.opensearch.index.translog.Translog; | ||
import org.opensearch.index.translog.TranslogManager; | ||
import org.opensearch.search.suggest.completion.CompletionStats; | ||
|
||
import java.io.Closeable; | ||
import java.io.IOException; | ||
import java.util.Arrays; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Objects; | ||
import java.util.concurrent.CountDownLatch; | ||
import java.util.function.BiFunction; | ||
import java.util.function.Consumer; | ||
|
||
/** | ||
* This is an {@link Engine} abstraction intended for replica shards when Segment Replication | ||
* is enabled. This Engine which implements this abstract engine does not create an IndexWriter, | ||
* rather it refreshes a {@link NRTReplicationReaderManager} with new Segments when received from an external source. | ||
* The abstraction also helps to provide a different translog manager instance depending on the usecase like no-op | ||
* replication while having remote store enabled on top of segment replication. | ||
* | ||
* @opensearch.internal | ||
*/ | ||
public abstract class AbstractNRTReplicationEngine extends Engine {

    // Infos of the most recent commit point this engine has observed. Only replaced in
    // updateSegments() when incoming SegmentInfos carry a strictly higher generation.
    private volatile SegmentInfos lastCommittedSegmentInfos;
    private final NRTReplicationReaderManager readerManager;
    private final CompletionStatsCache completionStatsCache;
    private final LocalCheckpointTracker localCheckpointTracker;
    // Populated exactly once in the constructor with the subclass-provided manager;
    // all later access goes through translogManager().
    private final SetOnce<TranslogManager> translogManager = new SetOnce<>();

    /**
     * Opens the engine over the store's last commit point without creating an IndexWriter:
     * builds an {@link NRTReplicationReaderManager} from an on-disk {@link DirectoryReader},
     * seeds the {@link LocalCheckpointTracker} from the commit's sequence-number user data,
     * registers external and internal refresh listeners, and asks the subclass (via
     * {@link #createTranslogManager}) for a {@link TranslogManager} bound to the commit's
     * translog UUID.
     *
     * <p>On any {@link IOException} during setup, resources acquired so far — the store
     * reference, the reader manager and, if already created, the translog manager — are
     * released before rethrowing as {@link EngineCreationFailureException}.
     *
     * @param engineConfig engine configuration supplying the store and refresh listeners
     */
    public AbstractNRTReplicationEngine(EngineConfig engineConfig) {
        super(engineConfig);
        store.incRef();
        // Locals are used so the catch block can clean up partially-constructed state
        // before the final fields are assigned.
        NRTReplicationReaderManager readerManager = null;
        TranslogManager translogManagerRef = null;
        try {
            lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
            readerManager = new NRTReplicationReaderManager(OpenSearchDirectoryReader.wrap(getDirectoryReader(), shardId));
            final SequenceNumbers.CommitInfo commitInfo = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(
                this.lastCommittedSegmentInfos.getUserData().entrySet()
            );
            this.localCheckpointTracker = new LocalCheckpointTracker(commitInfo.maxSeqNo, commitInfo.localCheckpoint);
            this.completionStatsCache = new CompletionStatsCache(() -> acquireSearcher("completion_stats"));
            this.readerManager = readerManager;
            this.readerManager.addListener(completionStatsCache);
            for (ReferenceManager.RefreshListener listener : engineConfig.getExternalRefreshListener()) {
                this.readerManager.addListener(listener);
            }
            for (ReferenceManager.RefreshListener listener : engineConfig.getInternalRefreshListener()) {
                this.readerManager.addListener(listener);
            }
            // NOTE(review): this re-reads the commit user data already loaded into
            // lastCommittedSegmentInfos above — presumably equivalent; confirm before reusing.
            final Map<String, String> userData = store.readLastCommittedSegmentsInfo().getUserData();
            final String translogUUID = Objects.requireNonNull(userData.get(Translog.TRANSLOG_UUID_KEY));
            translogManagerRef = createTranslogManager(translogUUID, translogManager);
            translogManager.set(translogManagerRef);
        } catch (IOException e) {
            IOUtils.closeWhileHandlingException(store::decRef, readerManager);
            // TranslogManager is not declared Closeable here, so close it only if it happens to be.
            closeIfCloseable(IOUtils::closeWhileHandlingException, translogManagerRef);
            throw new EngineCreationFailureException(shardId, "failed to create engine", e);
        }
    }

    /**
     * Records an index operation in the translog only — no Lucene write happens here;
     * segment data arrives separately through {@link #updateSegments}. Advances the
     * tracker's max seq no to cover the operation.
     */
    @Override
    public IndexResult index(Index index) throws IOException {
        ensureOpen();
        // assumes the trailing 'false' is the created flag on IndexResult — TODO confirm
        IndexResult indexResult = new IndexResult(index.version(), index.primaryTerm(), index.seqNo(), false);
        final Translog.Location location = translogManager().add(new Translog.Index(index, indexResult));
        indexResult.setTranslogLocation(location);
        indexResult.setTook(System.nanoTime() - index.startTime());
        indexResult.freeze();
        getLocalCheckpointTracker().advanceMaxSeqNo(index.seqNo());
        return indexResult;
    }

    /**
     * Records a delete operation in the translog only (mirrors {@link #index}); no Lucene
     * write occurs on this engine. Advances the tracker's max seq no.
     */
    @Override
    public DeleteResult delete(Delete delete) throws IOException {
        ensureOpen();
        // assumes the trailing 'true' is the found flag on DeleteResult — TODO confirm
        DeleteResult deleteResult = new DeleteResult(delete.version(), delete.primaryTerm(), delete.seqNo(), true);
        final Translog.Location location = translogManager().add(new Translog.Delete(delete, deleteResult));
        deleteResult.setTranslogLocation(location);
        deleteResult.setTook(System.nanoTime() - delete.startTime());
        deleteResult.freeze();
        getLocalCheckpointTracker().advanceMaxSeqNo(delete.seqNo());
        return deleteResult;
    }

    /**
     * Records a no-op in the translog and advances the tracker's max seq no; same
     * translog-only pattern as {@link #index} and {@link #delete}.
     */
    @Override
    public NoOpResult noOp(NoOp noOp) throws IOException {
        ensureOpen();
        NoOpResult noOpResult = new NoOpResult(noOp.primaryTerm(), noOp.seqNo());
        final Translog.Location location = translogManager().add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason()));
        noOpResult.setTranslogLocation(location);
        noOpResult.setTook(System.nanoTime() - noOp.startTime());
        noOpResult.freeze();
        getLocalCheckpointTracker().advanceMaxSeqNo(noOp.seqNo());
        return noOpResult;
    }

    /**
     * Subclass hook that decides which TranslogManager this engine uses — this is the
     * extension point that lets the no-op remote-translog variant share everything else.
     *
     * @param translogUUID     translog UUID read from the last commit's user data
     * @param translogManager  the SetOnce holder the constructor will populate with the result
     * @return the translog manager to install for this engine
     */
    protected abstract TranslogManager createTranslogManager(String translogUUID, SetOnce<TranslogManager> translogManager)
        throws IOException;

    /** Returns the translog manager installed by the constructor. */
    @Override
    public TranslogManager translogManager() {
        return translogManager.get();
    }

    /**
     * Installs a new set of segments received from an external source (the primary) and
     * fast-forwards the processed checkpoint to {@code seqNo}. Synchronized so concurrent
     * segment deliveries are applied one at a time.
     */
    public synchronized void updateSegments(final SegmentInfos infos, long seqNo) throws IOException {
        // Update the current infos reference on the Engine's reader.
        readerManager.updateSegments(infos);

        // only update the persistedSeqNo and "lastCommitted" infos reference if the incoming segments have a higher
        // generation. We can still refresh with incoming SegmentInfos that are not part of a commit point.
        if (infos.getGeneration() > lastCommittedSegmentInfos.getGeneration()) {
            this.lastCommittedSegmentInfos = infos;
            translogManager().rollTranslogGeneration();
        }
        localCheckpointTracker.fastForwardProcessedSeqNo(seqNo);
    }

    /** History UUID as recorded in the last committed segment infos' user data. */
    @Override
    public String getHistoryUUID() {
        return loadHistoryUUID(lastCommittedSegmentInfos.userData);
    }

    /** Always 0 — this engine never writes segments itself. */
    @Override
    public long getWritingBytes() {
        return 0;
    }

    @Override
    public CompletionStats completionStats(String... fieldNamePatterns) {
        return completionStatsCache.get(fieldNamePatterns);
    }

    /** Always 0 — indexing throttling does not apply to a replication-only engine. */
    @Override
    public long getIndexThrottleTimeInMillis() {
        return 0;
    }

    @Override
    public boolean isThrottled() {
        return false;
    }

    /** Real-time get is served from the external searcher; there is no IndexWriter to consult. */
    @Override
    public GetResult get(Get get, BiFunction<String, SearcherScope, Searcher> searcherFactory) throws EngineException {
        return getFromSearcher(get, searcherFactory, SearcherScope.EXTERNAL);
    }

    /** The same reader manager backs both internal and external scopes. */
    @Override
    protected ReferenceManager<OpenSearchDirectoryReader> getReferenceManager(SearcherScope scope) {
        return readerManager;
    }

    /**
     * Refreshing of this engine will only happen internally when a new set of segments is received. The engine will ignore external
     * refresh attempts so we can return false here. Further Engine's existing implementation reads DirectoryReader.isCurrent after acquiring a searcher.
     * With this Engine's NRTReplicationReaderManager, This will use StandardDirectoryReader's implementation which determines if the reader is current by
     * comparing the on-disk SegmentInfos version against the one in the reader, which at refresh points will always return isCurrent false and then refreshNeeded true.
     * Even if this method returns refresh as needed, we ignore it and only ever refresh with incoming SegmentInfos.
     */
    @Override
    public boolean refreshNeeded() {
        return false;
    }

    /** History retention is not supported on this engine. */
    @Override
    public Closeable acquireHistoryRetentionLock() {
        throw new UnsupportedOperationException("Not implemented");
    }

    /** Change snapshots are not supported on this engine. */
    @Override
    public Translog.Snapshot newChangesSnapshot(
        String source,
        long fromSeqNo,
        long toSeqNo,
        boolean requiredFullRange,
        boolean accurateCount
    ) throws IOException {
        throw new UnsupportedOperationException("Not implemented");
    }

    /** Always 0 — no operation history is tracked here. */
    @Override
    public int countNumberOfHistoryOperations(String source, long fromSeqNo, long toSeqNumber) throws IOException {
        return 0;
    }

    /** Always false — this engine keeps no complete operation history. */
    @Override
    public boolean hasCompleteOperationHistory(String reason, long startingSeqNo) {
        return false;
    }

    @Override
    public long getMinRetainedSeqNo() {
        return localCheckpointTracker.getProcessedCheckpoint();
    }

    @Override
    public long getPersistedLocalCheckpoint() {
        return localCheckpointTracker.getPersistedCheckpoint();
    }

    @Override
    public long getProcessedLocalCheckpoint() {
        return localCheckpointTracker.getProcessedCheckpoint();
    }

    @Override
    public SeqNoStats getSeqNoStats(long globalCheckpoint) {
        return localCheckpointTracker.getStats(globalCheckpoint);
    }

    /** Always 0 — there is no indexing buffer without an IndexWriter. */
    @Override
    public long getIndexBufferRAMBytesUsed() {
        return 0;
    }

    @Override
    public List<Segment> segments(boolean verbose) {
        return Arrays.asList(getSegmentInfo(getLatestSegmentInfos(), verbose));
    }

    /** No-op: refresh only happens via {@link #updateSegments} (see {@link #refreshNeeded}). */
    @Override
    public void refresh(String source) throws EngineException {}

    @Override
    public boolean maybeRefresh(String source) throws EngineException {
        return false;
    }

    /** No-op: nothing to flush without a writer. */
    @Override
    public void writeIndexingBuffer() throws EngineException {}

    @Override
    public boolean shouldPeriodicallyFlush() {
        return false;
    }

    /** No-op: commits are driven by the primary, not this engine. */
    @Override
    public void flush(boolean force, boolean waitIfOngoing) throws EngineException {}

    /** No-op: merges happen on the primary; replicas receive the resulting segments. */
    @Override
    public void forceMerge(
        boolean flush,
        int maxNumSegments,
        boolean onlyExpungeDeletes,
        boolean upgrade,
        boolean upgradeOnlyAncientSegments,
        String forceMergeUUID
    ) throws EngineException, IOException {}

    /**
     * Returns the last committed IndexCommit. The returned GatedCloseable releases nothing
     * on close — no deletion policy holds the commit here. {@code flushFirst} is ignored
     * since this engine cannot flush.
     */
    @Override
    public GatedCloseable<IndexCommit> acquireLastIndexCommit(boolean flushFirst) throws EngineException {
        try {
            final IndexCommit indexCommit = Lucene.getIndexCommit(lastCommittedSegmentInfos, store.directory());
            return new GatedCloseable<>(indexCommit, () -> {});
        } catch (IOException e) {
            throw new EngineException(shardId, "Unable to build latest IndexCommit", e);
        }
    }

    /** The last commit is the only commit, so it is also the safe commit. */
    @Override
    public GatedCloseable<IndexCommit> acquireSafeIndexCommit() throws EngineException {
        return acquireLastIndexCommit(false);
    }

    @Override
    public SafeCommitInfo getSafeCommitInfo() {
        return new SafeCommitInfo(localCheckpointTracker.getProcessedCheckpoint(), lastCommittedSegmentInfos.totalMaxDoc());
    }

    /**
     * Closes the reader manager, releases the store reference, and closes the translog
     * manager (if it is Closeable). Guarded by compareAndSet so close runs at most once;
     * the assert documents that callers must hold the engine write lock or the fail lock.
     */
    @Override
    protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
        if (isClosed.compareAndSet(false, true)) {
            assert rwl.isWriteLockedByCurrentThread() || failEngineLock.isHeldByCurrentThread()
                : "Either the write lock must be held or the engine must be currently be failing itself";
            try {
                IOUtils.close(readerManager, store::decRef);
                closeIfCloseableOrThrowsException(IOUtils::close, translogManager.get());
            } catch (Exception e) {
                logger.warn("failed to close engine", e);
            } finally {
                logger.debug("engine closed [{}]", reason);
                closedLatch.countDown();
            }
        }
    }

    /** Applies the (non-throwing) consumer to {@code object} only if it is Closeable. */
    private void closeIfCloseable(Consumer<Closeable> closeableConsumer, Object object) {
        if (object instanceof Closeable) {
            closeableConsumer.accept((Closeable) object);
        }
    }

    /** Variant of {@link #closeIfCloseable} for consumers that may throw IOException. */
    private void closeIfCloseableOrThrowsException(CheckedConsumer<Closeable, IOException> closeableConsumer, Object object)
        throws IOException {
        if (object instanceof Closeable) {
            closeableConsumer.accept((Closeable) object);
        }
    }

    /** No-op: throttling does not apply (see {@link #isThrottled}). */
    @Override
    public void activateThrottling() {}

    @Override
    public void deactivateThrottling() {}

    /** Always 0 — seq-no gaps are filled on the primary, never on this engine. */
    @Override
    public int fillSeqNoGaps(long primaryTerm) throws IOException {
        return 0;
    }

    @Override
    public void maybePruneDeletes() {}

    @Override
    public void updateMaxUnsafeAutoIdTimestamp(long newTimestamp) {}

    @Override
    public long getMaxSeqNoOfUpdatesOrDeletes() {
        return localCheckpointTracker.getMaxSeqNo();
    }

    /** No-op: the tracker is advanced directly by index/delete/noOp instead. */
    @Override
    public void advanceMaxSeqNoOfUpdatesOrDeletes(long maxSeqNoOfUpdatesOnPrimary) {}

    @Override
    protected SegmentInfos getLastCommittedSegmentInfos() {
        return lastCommittedSegmentInfos;
    }

    /** Latest (possibly uncommitted) infos currently held by the reader manager. */
    @Override
    protected SegmentInfos getLatestSegmentInfos() {
        return readerManager.getSegmentInfos();
    }

    /** Exposes the checkpoint tracker to subclasses. */
    protected LocalCheckpointTracker getLocalCheckpointTracker() {
        return localCheckpointTracker;
    }

    private DirectoryReader getDirectoryReader() throws IOException {
        // for segment replication: replicas should create the reader from store, we don't want an open IW on replicas.
        return new SoftDeletesDirectoryReaderWrapper(DirectoryReader.open(store.directory()), Lucene.SOFT_DELETES_FIELD);
    }
}
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we delegate the creation of the TranslogManager to a factory and inject that rather than these 3 NRT engine types?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So we had to create an abstract engine so that segment replication still works for the newer engine. And since several other engine methods had to be overridden as well, we needed a new engine class altogether.
Around the creation of TranslogManager, this should be doable.