Commit

[Backport 2.x] [Segment Replication] Backport all PRs containing remaining segment replication changes (#4243)

* [Segment Replication] Checkpoint Replay on Replica Shard (#3658)

* Adding latest received checkpoint and checkpoint replay logic, along with tests

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>
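
A rough sketch of the replay idea, with illustrative names (latestReceived, startReplication, and the simplified Checkpoint are stand-ins, not the actual OpenSearch API): the replica records the newest checkpoint it has seen per shard, and when an in-flight replication finishes it starts another round if that recorded checkpoint is still ahead.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    class CheckpointReplaySketch {
        // Simplified checkpoint: the real one also carries the shard id, seqNo, etc.
        record Checkpoint(long primaryTerm, long segmentInfosVersion) {
            boolean isAheadOf(Checkpoint other) {
                return other == null
                    || primaryTerm > other.primaryTerm
                    || (primaryTerm == other.primaryTerm && segmentInfosVersion > other.segmentInfosVersion);
            }
        }

        private final Map<String, Checkpoint> latestReceived = new ConcurrentHashMap<>();

        // Invoked for every checkpoint the primary publishes.
        void onNewCheckpoint(String shardId, Checkpoint received, boolean replicationInProgress) {
            latestReceived.merge(shardId, received, (prev, next) -> next.isAheadOf(prev) ? next : prev);
            if (replicationInProgress == false) {
                startReplication(shardId, latestReceived.get(shardId));
            } // otherwise the checkpoint is only recorded and replayed below
        }

        // Invoked when an in-flight replication completes.
        void onReplicationDone(String shardId, Checkpoint current) {
            final Checkpoint latest = latestReceived.get(shardId);
            if (latest != null && latest.isAheadOf(current)) {
                startReplication(shardId, latest); // replay the stored checkpoint
            }
        }

        private void startReplication(String shardId, Checkpoint checkpoint) {
            System.out.println("replicating " + shardId + " up to " + checkpoint);
        }
    }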

* [Segment Replication] Wire up segment replication with peer recovery and add ITs. (#3743)

* Add null check when computing max segment version.

With segment replication enabled, it is possible that Lucene does not set the SegmentInfos min segment version, leaving the default value as null.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Update peer recovery to set the translogUUID of replicas to the UUID generated on the primary.

This change sets the translog UUID, at translog creation time, to the value stored in the passed segment user data.
This ensures that in failover scenarios a promoted replica will not have a UUID mismatch with the value stored in the user data.

Signed-off-by: Marc Handalian <handalm@amazon.com>
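
A minimal sketch of the intent, assuming only that the primary's segment user data (shipped during peer recovery) carries the translog UUID under the same key the NRT engine reads in this commit's diff; the helper and its caller are hypothetical:

    import java.util.Map;

    class TranslogSeedSketch {
        // Assumed to match Translog.TRANSLOG_UUID_KEY, the key read in the diff below.
        static final String TRANSLOG_UUID_KEY = "translog_uuid";

        // Pick the UUID for the replica's freshly created translog: reuse the
        // primary's UUID instead of generating a new one, so a promoted replica
        // has no mismatch with the UUID recorded in the segment user data.
        static String replicaTranslogUuid(Map<String, String> primarySegmentUserData) {
            final String primaryUuid = primarySegmentUserData.get(TRANSLOG_UUID_KEY);
            if (primaryUuid == null) {
                throw new IllegalStateException("primary did not ship a translog UUID");
            }
            return primaryUuid;
        }
    }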

* Wire up Segment Replication under the feature flag.

This PR wires up segment replication and adds some initial integration tests.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Add test to ensure replicas use primary translog uuid with segrep.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Update SegmentReplicationIT to assert previous commit points are valid and SegmentInfos can be built.
Fix nitpicks in PR feedback.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Fix test with Assert.fail to include a message.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Disable shard idle with segment replication. (#4118)

This change disables shard idle when segment replication is enabled.
Primary shards will only push out new segments on refresh; we do not want to block this based on search behavior.
Replicas will only refresh on an externally provided SegmentInfos, so we do not want search requests to hang waiting for a refresh.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Fix isAheadOf logic for ReplicationCheckpoint comparison (#4112)

Signed-off-by: Suraj Singh <surajrider@gmail.com>
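
The corrected comparison can be sketched like this (field names are illustrative; the real ReplicationCheckpoint also carries the shard id and sequence numbers): compare primary terms first, and only fall back to the SegmentInfos version when the terms match.

    class ReplicationCheckpointSketch {
        final long primaryTerm;
        final long segmentInfosVersion;

        ReplicationCheckpointSketch(long primaryTerm, long segmentInfosVersion) {
            this.primaryTerm = primaryTerm;
            this.segmentInfosVersion = segmentInfosVersion;
        }

        // Ahead == newer primary term, or same term with a newer SegmentInfos version.
        boolean isAheadOf(ReplicationCheckpointSketch other) {
            if (other == null) {
                return true; // anything is ahead of "no checkpoint yet"
            }
            return primaryTerm > other.primaryTerm
                || (primaryTerm == other.primaryTerm && segmentInfosVersion > other.segmentInfosVersion);
        }
    }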

* Fix waitUntil refresh policy for segrep enabled indices. (#4137)

Signed-off-by: Marc Handalian <handalm@amazon.com>
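
The behavior being fixed, as a test-method sketch in the style of WaitUntilRefreshIT (whose segrep settings change appears in the diff below); the index name and assertions are illustrative, and the method assumes an OpenSearchIntegTestCase subclass:

    import org.opensearch.action.index.IndexResponse;
    import org.opensearch.action.support.WriteRequest.RefreshPolicy;

    import static org.opensearch.index.query.QueryBuilders.matchQuery;
    import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

    public void testWaitUntilMakesDocSearchable() {
        // Must not return until a refresh has made the doc visible -- with segrep
        // enabled the primary still refreshes; replicas catch up on published segments.
        IndexResponse index = client().prepareIndex("test")
            .setId("1")
            .setSource("foo", "bar")
            .setRefreshPolicy(RefreshPolicy.WAIT_UNTIL)
            .get();
        assertFalse(index.forcedRefresh());
        assertHitCount(client().prepareSearch("test").setQuery(matchQuery("foo", "bar")).get(), 1);
    }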

* Add IndexShard#getLatestReplicationCheckpoint behind segrep enable feature flag (#4163)

* Add IndexShard#getLatestReplicationCheckpoint behind segrep enable feature flag

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Address review comment. Move tests to SegmentReplicationIndexShardTests

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Add segrep-enabled index settings to TargetServiceTests and SourceHandlerTests

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* [Segment Replication] Fix OngoingSegmentReplications to key by allocation ID instead of DiscoveryNode. (#4182)

* Fix OngoingSegmentReplications to key by allocation ID instead of DiscoveryNode.

This change fixes segrep to work with multiple shards per node by keying ongoing replications on
allocation ID.  This also updates cancel methods to ensure state is properly cleared on shard cancel.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Clean up cancel methods.

Signed-off-by: Marc Handalian <handalm@amazon.com>

Signed-off-by: Marc Handalian <handalm@amazon.com>
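
The keying change, reduced to a sketch (Handler stands in for the per-replication state object; method names are illustrative): a DiscoveryNode key collides once a target node hosts more than one replica of the index, while an allocation ID is unique per shard copy, and cancellation removes entries so no state leaks.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.function.Predicate;

    class OngoingReplicationsSketch {
        static final class Handler {
            final String allocationId;
            Handler(String allocationId) { this.allocationId = allocationId; }
            void cancel(String reason) { /* release resources, fail listeners */ }
        }

        private final Map<String, Handler> handlersByAllocationId = new ConcurrentHashMap<>();

        Handler prepareForReplication(String allocationId) {
            return handlersByAllocationId.computeIfAbsent(allocationId, Handler::new);
        }

        // Cancel and clear every matching handler, e.g. all replications for a shard
        // that was closed or failed, so the tracked state is fully removed.
        void cancel(Predicate<Handler> shouldCancel, String reason) {
            handlersByAllocationId.values().removeIf(handler -> {
                if (shouldCancel.test(handler)) {
                    handler.cancel(reason);
                    return true;
                }
                return false;
            });
        }
    }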

* [Bug] [Segment Replication] Update store metadata recovery diff logic to ignore missing files causing exception (#4185)

* Update Store for segment replication diff

Signed-off-by: Poojita Raj <poojiraj@amazon.com>

Signed-off-by: Poojita Raj <poojiraj@amazon.com>

* Update recoveryDiff logic to ignore missing files causing an exception on the replica during copy

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Address review comments

Signed-off-by: Suraj Singh <surajrider@gmail.com>

Signed-off-by: Poojita Raj <poojiraj@amazon.com>
Signed-off-by: Suraj Singh <surajrider@gmail.com>
Co-authored-by: Poojita Raj <poojiraj@amazon.com>
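
A hypothetical replica-side consumer of the new diff (the method below is not the actual call site): files missing or different on the replica are fetched, identical ones are reused, and files absent from the primary's snapshot are never treated as an error.

    import java.util.ArrayList;
    import java.util.List;

    import org.opensearch.index.store.Store;
    import org.opensearch.index.store.StoreFileMetadata;

    class SegRepDiffUsageSketch {
        // "primary" is the source snapshot and "replica" the target, matching the
        // javadoc of segmentReplicationDiff in this commit's diff.
        static List<StoreFileMetadata> filesToFetch(Store.MetadataSnapshot primary, Store.MetadataSnapshot replica) {
            final Store.RecoveryDiff diff = primary.segmentReplicationDiff(replica);
            final List<StoreFileMetadata> toFetch = new ArrayList<>(diff.missing);
            toFetch.addAll(diff.different); // e.g. changed per-commit files like segments_N
            return toFetch;                 // diff.identical is reused from the local store
        }
    }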

* [Segment Replication] Adding PrimaryMode check before publishing checkpoint and processing a received checkpoint. (#4157)

* Adding PrimaryMode check before publishing checkpoint.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Applying spotless check

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Moving segrep specific tests to SegmentReplicationIndexShardTests.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Adding logic and tests for rejecting checkpoints if shard is in PrimaryMode.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Applying ./gradlew :server:spotlessApply.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Applying ./gradlew :server:spotlessApply

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Changing log level to warn in shouldProcessCheckpoint() of IndexShard.java class.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Removing unnecessary lazy logging in shouldProcessCheckpoint().

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* [Segment Replication] Wait for documents to replicate to replica shards (#4236)

* [Segment Replication] Add thread sleep to account for replica lag in delete operations test

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Address review comments, assertBusy on doc count rather than sleep

Signed-off-by: Suraj Singh <surajrider@gmail.com>

Signed-off-by: Suraj Singh <surajrider@gmail.com>
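
The pattern replacing the sleep, sketched as a helper (index name, doc count, and timeout are illustrative; assumes an OpenSearchIntegTestCase subclass): assertBusy retries the assertion until it passes or the timeout elapses, so the test waits exactly as long as the replica actually lags.

    import java.util.concurrent.TimeUnit;

    import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

    private void waitForReplicaDocCount(long expectedDocs) throws Exception {
        assertBusy(() -> {
            // passes only once the replica's refreshed reader reflects the updates/deletes
            assertHitCount(client().prepareSearch("test-idx").setSize(0).get(), expectedDocs);
        }, 30, TimeUnit.SECONDS);
    }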

* Segment Replication - Remove unnecessary call to markAllocationIdAsInSync. (#4224)

This PR removes an unnecessary call to markAllocationIdAsInSync on the primary shard when replication events complete.
Recovery will manage this initial call.

Signed-off-by: Marc Handalian <handalm@amazon.com>

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Segment Replication - Add additional unit tests for update & delete ops. (#4237)

* Segment Replication - Add additional unit tests for update & delete operations.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Fix spotless.

Signed-off-by: Marc Handalian <handalm@amazon.com>

Signed-off-by: Marc Handalian <handalm@amazon.com>

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>
Signed-off-by: Marc Handalian <handalm@amazon.com>
Signed-off-by: Suraj Singh <surajrider@gmail.com>
Signed-off-by: Poojita Raj <poojiraj@amazon.com>
Co-authored-by: Marc Handalian <handalm@amazon.com>
Co-authored-by: Suraj Singh <surajrider@gmail.com>
Co-authored-by: Poojita Raj <poojiraj@amazon.com>
4 people committed Aug 17, 2022
1 parent 9b99c16 commit 8c2f50f
Showing 27 changed files with 1,365 additions and 95 deletions.
@@ -42,7 +42,10 @@
import org.opensearch.action.support.WriteRequest.RefreshPolicy;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.client.Requests;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.plugins.Plugin;
import org.opensearch.rest.RestStatus;
import org.opensearch.script.MockScriptPlugin;
@@ -74,7 +77,11 @@ public class WaitUntilRefreshIT extends OpenSearchIntegTestCase {
@Override
public Settings indexSettings() {
// Use a shorter refresh interval to speed up the tests. We'll be waiting on this interval several times.
return Settings.builder().put(super.indexSettings()).put("index.refresh_interval", "40ms").build();
final Settings.Builder builder = Settings.builder().put(super.indexSettings()).put("index.refresh_interval", "40ms");
if (FeatureFlags.isEnabled(FeatureFlags.REPLICATION_TYPE)) {
builder.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT);
}
return builder.build();
}

@Before

Large diffs are not rendered by default.

@@ -71,6 +71,12 @@ public NRTReplicationEngine(EngineConfig engineConfig) {
this.completionStatsCache = new CompletionStatsCache(() -> acquireSearcher("completion_stats"));
this.readerManager = readerManager;
this.readerManager.addListener(completionStatsCache);
for (ReferenceManager.RefreshListener listener : engineConfig.getExternalRefreshListener()) {
this.readerManager.addListener(listener);
}
for (ReferenceManager.RefreshListener listener : engineConfig.getInternalRefreshListener()) {
this.readerManager.addListener(listener);
}
final Map<String, String> userData = store.readLastCommittedSegmentsInfo().getUserData();
final String translogUUID = Objects.requireNonNull(userData.get(Translog.TRANSLOG_UUID_KEY));
translogManagerRef = new WriteOnlyTranslogManager(
@@ -199,6 +205,18 @@ protected ReferenceManager<OpenSearchDirectoryReader> getReferenceManager(Search
return readerManager;
}

/**
* This engine only refreshes internally, when a new set of segments is received, so external
* refresh attempts are ignored and we can return false here. Note that the base Engine implementation reads
* DirectoryReader.isCurrent after acquiring a searcher; with this engine's NRTReplicationReaderManager that
* delegates to StandardDirectoryReader, which determines whether the reader is current by comparing the
* on-disk SegmentInfos version against the reader's, so at refresh points it would always report isCurrent
* false and refreshNeeded true. Even then, we ignore it and only ever refresh with incoming SegmentInfos.
*/
@Override
public boolean refreshNeeded() {
return false;
}

@Override
public boolean isTranslogSyncNeeded() {
return translogManager.getTranslog().syncNeeded();
@@ -40,7 +40,7 @@ public void beforeRefresh() throws IOException {

@Override
public void afterRefresh(boolean didRefresh) throws IOException {
if (didRefresh) {
if (didRefresh && shard.getReplicationTracker().isPrimaryMode()) {
publisher.publish(shard);
}
}
27 changes: 17 additions & 10 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -1382,9 +1382,16 @@ public GatedCloseable<IndexCommit> acquireSafeIndexCommit() throws EngineExcepti
}

/**
* Returns the lastest Replication Checkpoint that shard received
* Returns the latest ReplicationCheckpoint that shard received.
* @return EMPTY checkpoint before the engine is opened and null for non-segrep enabled indices
*/
public ReplicationCheckpoint getLatestReplicationCheckpoint() {
if (indexSettings.isSegRepEnabled() == false) {
return null;
}
if (getEngineOrNull() == null) {
return ReplicationCheckpoint.empty(shardId);
}
try (final GatedCloseable<SegmentInfos> snapshot = getSegmentInfosSnapshot()) {
return Optional.ofNullable(snapshot.get())
.map(
@@ -1396,15 +1403,7 @@ public ReplicationCheckpoint getLatestReplicationCheckpoint() {
segmentInfos.getVersion()
)
)
.orElse(
new ReplicationCheckpoint(
shardId,
getOperationPrimaryTerm(),
SequenceNumbers.NO_OPS_PERFORMED,
getProcessedLocalCheckpoint(),
SequenceNumbers.NO_OPS_PERFORMED
)
);
.orElse(ReplicationCheckpoint.empty(shardId));
} catch (IOException ex) {
throw new OpenSearchException("Error Closing SegmentInfos Snapshot", ex);
}
@@ -1421,6 +1420,10 @@ public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckp
logger.trace(() -> new ParameterizedMessage("Ignoring new replication checkpoint - shard is not started {}", state()));
return false;
}
if (getReplicationTracker().isPrimaryMode()) {
logger.warn("Ignoring new replication checkpoint - shard is in primaryMode and cannot receive any checkpoints.");
return false;
}
ReplicationCheckpoint localCheckpoint = getLatestReplicationCheckpoint();
if (localCheckpoint.isAheadOf(requestCheckpoint)) {
logger.trace(
@@ -3774,6 +3777,10 @@ public boolean scheduledRefresh() {
if (listenerNeedsRefresh == false // if we have a listener that is waiting for a refresh we need to force it
&& isSearchIdle()
&& indexSettings.isExplicitRefresh() == false
&& indexSettings.isSegRepEnabled() == false
// Indices with segrep enabled will never wait on a refresh and ignore shard idle. Primary shards push out new segments only
// after a refresh, so we don't want to wait for a search to trigger that cycle. Replicas will only refresh after receiving
// a new set of segments.
&& active.get()) { // it must be active otherwise we might not free up segment memory once the shard became inactive
// lets skip this refresh since we are search idle and
// don't necessarily need to refresh. the next searcher access will register a refreshListener and that will
95 changes: 77 additions & 18 deletions server/src/main/java/org/opensearch/index/store/Store.java
@@ -110,8 +110,8 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -1003,7 +1003,12 @@ static LoadedMetadata loadMetadata(SegmentInfos segmentInfos, Directory director
// version is written since 3.1+: we should have already hit IndexFormatTooOld.
throw new IllegalArgumentException("expected valid version value: " + info.info.toString());
}
if (version.onOrAfter(maxVersion)) {
// With segment replication enabled, we compute metadata snapshots from the latest in memory infos.
// In this case we will have SegmentInfos objects fetched from the primary's reader
// where the minSegmentLuceneVersion can be null even though there are segments.
// This is because the SegmentInfos object is not read from a commit/IndexInput, which sets
// minSegmentLuceneVersion.
if (maxVersion == null || version.onOrAfter(maxVersion)) {
maxVersion = version;
}
for (String file : info.files()) {
@@ -1097,6 +1102,30 @@ public Map<String, StoreFileMetadata> asMap() {
private static final String LIV_FILE_EXTENSION = "liv"; // lucene 5 delete file
private static final String SEGMENT_INFO_EXTENSION = "si";

/**
* Helper method used to group store files according to segment and commit.
*
* @see MetadataSnapshot#recoveryDiff(MetadataSnapshot)
* @see MetadataSnapshot#segmentReplicationDiff(MetadataSnapshot)
*/
private Iterable<List<StoreFileMetadata>> getGroupedFilesIterable() {
final Map<String, List<StoreFileMetadata>> perSegment = new HashMap<>();
final List<StoreFileMetadata> perCommitStoreFiles = new ArrayList<>();
for (StoreFileMetadata meta : this) {
final String segmentId = IndexFileNames.parseSegmentName(meta.name());
final String extension = IndexFileNames.getExtension(meta.name());
if (IndexFileNames.SEGMENTS.equals(segmentId)
|| DEL_FILE_EXTENSION.equals(extension)
|| LIV_FILE_EXTENSION.equals(extension)) {
// only treat del files as per-commit files; fnm files are generational, but only for upgradable DV
perCommitStoreFiles.add(meta);
} else {
perSegment.computeIfAbsent(segmentId, k -> new ArrayList<>()).add(meta);
}
}
return Iterables.concat(perSegment.values(), Collections.singleton(perCommitStoreFiles));
}

/**
* Returns a diff between the two snapshots that can be used for recovery. The given snapshot is treated as the
* recovery target and this snapshot as the source. The returned diff will hold a list of files that are:
Expand Down Expand Up @@ -1134,23 +1163,8 @@ public RecoveryDiff recoveryDiff(MetadataSnapshot recoveryTargetSnapshot) {
final List<StoreFileMetadata> identical = new ArrayList<>();
final List<StoreFileMetadata> different = new ArrayList<>();
final List<StoreFileMetadata> missing = new ArrayList<>();
final Map<String, List<StoreFileMetadata>> perSegment = new HashMap<>();
final List<StoreFileMetadata> perCommitStoreFiles = new ArrayList<>();

for (StoreFileMetadata meta : this) {
final String segmentId = IndexFileNames.parseSegmentName(meta.name());
final String extension = IndexFileNames.getExtension(meta.name());
if (IndexFileNames.SEGMENTS.equals(segmentId)
|| DEL_FILE_EXTENSION.equals(extension)
|| LIV_FILE_EXTENSION.equals(extension)) {
// only treat del files as per-commit files; fnm files are generational, but only for upgradable DV
perCommitStoreFiles.add(meta);
} else {
perSegment.computeIfAbsent(segmentId, k -> new ArrayList<>()).add(meta);
}
}
final ArrayList<StoreFileMetadata> identicalFiles = new ArrayList<>();
for (List<StoreFileMetadata> segmentFiles : Iterables.concat(perSegment.values(), Collections.singleton(perCommitStoreFiles))) {
for (List<StoreFileMetadata> segmentFiles : getGroupedFilesIterable()) {
identicalFiles.clear();
boolean consistent = true;
for (StoreFileMetadata meta : segmentFiles) {
@@ -1185,6 +1199,51 @@ public RecoveryDiff recoveryDiff(MetadataSnapshot recoveryTargetSnapshot) {
return recoveryDiff;
}

/**
* Segment replication method.
* Returns a diff between the two snapshots that can be used to get the list of files to copy over to a replica for segment replication. The given snapshot is treated as the
* target and this snapshot as the source. The returned diff will hold a list of files that are:
* <ul>
* <li>identical: they exist in both snapshots and they can be considered the same, i.e. they don't need to be recovered</li>
* <li>different: they exist in both snapshots but they are not identical</li>
* <li>missing: files that exist in the source but not in the target</li>
* </ul>
*/
public RecoveryDiff segmentReplicationDiff(MetadataSnapshot recoveryTargetSnapshot) {
final List<StoreFileMetadata> identical = new ArrayList<>();
final List<StoreFileMetadata> different = new ArrayList<>();
final List<StoreFileMetadata> missing = new ArrayList<>();
final ArrayList<StoreFileMetadata> identicalFiles = new ArrayList<>();
for (List<StoreFileMetadata> segmentFiles : getGroupedFilesIterable()) {
identicalFiles.clear();
boolean consistent = true;
for (StoreFileMetadata meta : segmentFiles) {
StoreFileMetadata storeFileMetadata = recoveryTargetSnapshot.get(meta.name());
if (storeFileMetadata == null) {
// Do not consider missing files as inconsistent in SegRep as replicas may lag while primary updates
// documents and generate new files specific to a segment
missing.add(meta);
} else if (storeFileMetadata.isSame(meta) == false) {
consistent = false;
different.add(meta);
} else {
identicalFiles.add(meta);
}
}
if (consistent) {
identical.addAll(identicalFiles);
} else {
different.addAll(identicalFiles);
}
}
RecoveryDiff recoveryDiff = new RecoveryDiff(
Collections.unmodifiableList(identical),
Collections.unmodifiableList(different),
Collections.unmodifiableList(missing)
);
return recoveryDiff;
}

/**
* Returns the number of files in this snapshot
*/
@@ -56,6 +56,7 @@
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.util.concurrent.AbstractRunnable;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.env.ShardLockObtainFailedException;
@@ -80,6 +81,7 @@
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
import org.opensearch.indices.recovery.RecoveryListener;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.indices.replication.common.ReplicationState;
import org.opensearch.repositories.RepositoriesService;
@@ -90,6 +92,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -134,7 +137,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
private final FailedShardHandler failedShardHandler = new FailedShardHandler();

private final boolean sendRefreshMapping;
private final List<IndexEventListener> buildInIndexListener;
private final List<IndexEventListener> builtInIndexListener;
private final PrimaryReplicaSyncer primaryReplicaSyncer;
private final Consumer<ShardId> globalCheckpointSyncer;
private final RetentionLeaseSyncer retentionLeaseSyncer;
@@ -148,6 +151,7 @@ public IndicesClusterStateService(
final ClusterService clusterService,
final ThreadPool threadPool,
final PeerRecoveryTargetService recoveryTargetService,
final SegmentReplicationTargetService segmentReplicationTargetService,
final ShardStateAction shardStateAction,
final NodeMappingRefreshAction nodeMappingRefreshAction,
final RepositoriesService repositoriesService,
@@ -165,6 +169,7 @@
clusterService,
threadPool,
checkpointPublisher,
segmentReplicationTargetService,
recoveryTargetService,
shardStateAction,
nodeMappingRefreshAction,
@@ -185,6 +190,7 @@
final ClusterService clusterService,
final ThreadPool threadPool,
final SegmentReplicationCheckpointPublisher checkpointPublisher,
final SegmentReplicationTargetService segmentReplicationTargetService,
final PeerRecoveryTargetService recoveryTargetService,
final ShardStateAction shardStateAction,
final NodeMappingRefreshAction nodeMappingRefreshAction,
@@ -198,7 +204,15 @@
) {
this.settings = settings;
this.checkpointPublisher = checkpointPublisher;
this.buildInIndexListener = Arrays.asList(peerRecoverySourceService, recoveryTargetService, searchService, snapshotShardsService);

final List<IndexEventListener> indexEventListeners = new ArrayList<>(
Arrays.asList(peerRecoverySourceService, recoveryTargetService, searchService, snapshotShardsService)
);
// if the segrep feature flag is not enabled, don't wire the target service as an IndexEventListener.
if (FeatureFlags.isEnabled(FeatureFlags.REPLICATION_TYPE)) {
indexEventListeners.add(segmentReplicationTargetService);
}
this.builtInIndexListener = Collections.unmodifiableList(indexEventListeners);
this.indicesService = indicesService;
this.clusterService = clusterService;
this.threadPool = threadPool;
@@ -514,7 +528,7 @@ private void createIndices(final ClusterState state) {

AllocatedIndex<? extends Shard> indexService = null;
try {
indexService = indicesService.createIndex(indexMetadata, buildInIndexListener, true);
indexService = indicesService.createIndex(indexMetadata, builtInIndexListener, true);
if (indexService.updateMapping(null, indexMetadata) && sendRefreshMapping) {
nodeMappingRefreshAction.nodeMappingRefresh(
state.nodes().getClusterManagerNode(),
(Remaining changed-file diffs not loaded.)
