From b202fd1eba88c8cb6a1bccf736d4e8abc3569207 Mon Sep 17 00:00:00 2001 From: "opensearch-trigger-bot[bot]" <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com> Date: Thu, 1 Sep 2022 10:44:35 -0700 Subject: [PATCH] [Backport 2.x] Added timing data and more granular stages to SegmentReplicationState (#4367) * Added timing data and more granular stages to SegmentReplicationState (#4222) * Added timing data and more granular stages to SegmentReplicationState This change introduces instrumentation logging that measures the latency of the various stages of segment replication as seen by each replica. Logs have also been added to the source node for checkpoint publishing and checkpoint metadata responses. All logging is currently at the TRACE level. Signed-off-by: Kartik Ganesh * Fixing SegmentReplicationTarget tests Signed-off-by: Kartik Ganesh * Incorporated PR feedback Signed-off-by: Kartik Ganesh * Fixing SegmentReplicationTargetService tests Signed-off-by: Kartik Ganesh Signed-off-by: Kartik Ganesh (cherry picked from commit a2ba3a8c6662b0bac5fc3d73c5029fe323f1192b) * Update changelog for backport pr. Signed-off-by: Marc Handalian Signed-off-by: Marc Handalian Co-authored-by: Kartik Ganesh Co-authored-by: Marc Handalian --- CHANGELOG.md | 1 + .../SegmentReplicationSourceHandler.java | 18 ++++- .../SegmentReplicationSourceService.java | 14 ++++ .../replication/SegmentReplicationState.java | 71 +++++++++++++++---- .../replication/SegmentReplicationTarget.java | 19 +++-- .../SegmentReplicationTargetService.java | 22 +++++- .../checkpoint/PublishCheckpointAction.java | 24 ++++++- .../SegmentReplicationTargetServiceTests.java | 21 ++++-- .../SegmentReplicationTargetTests.java | 20 ++++-- 9 files changed, 176 insertions(+), 34 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b73918fc6cb9a..fa9875d4ff6bb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) ## [2.x] ### Added - Github workflow for changelog verification ([#4085](https://github.com/opensearch-project/OpenSearch/pull/4085)) +- Add timing data and more granular stages to SegmentReplicationState ([#4367](https://github.com/opensearch-project/OpenSearch/pull/4367)) ### Changed diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java index ce764900e433f..2d21653c1924c 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java @@ -27,6 +27,7 @@ import org.opensearch.indices.recovery.FileChunkWriter; import org.opensearch.indices.recovery.MultiChunkTransfer; import org.opensearch.indices.replication.common.CopyState; +import org.opensearch.indices.replication.common.ReplicationTimer; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.Transports; @@ -104,16 +105,24 @@ class SegmentReplicationSourceHandler { * @param listener {@link ActionListener} that completes with the list of files sent. */ public synchronized void sendFiles(GetSegmentFilesRequest request, ActionListener listener) { + final ReplicationTimer timer = new ReplicationTimer(); if (isReplicating.compareAndSet(false, true) == false) { throw new OpenSearchException("Replication to {} is already running.", shard.shardId()); } future.addListener(listener, OpenSearchExecutors.newDirectExecutorService()); final Closeable releaseResources = () -> IOUtils.close(resources); try { - + timer.start(); final Consumer onFailure = e -> { assert Transports.assertNotTransportThread(SegmentReplicationSourceHandler.this + "[onFailure]"); IOUtils.closeWhileHandlingException(releaseResources, () -> future.onFailure(e)); + timer.stop(); + logger.trace( + "[replication id {}] Source node failed to send files to target node [{}], timing: {}", + request.getReplicationId(), + request.getTargetNode().getId(), + timer.time() + ); }; RunUnderPrimaryPermit.run(() -> { @@ -151,6 +160,13 @@ public synchronized void sendFiles(GetSegmentFilesRequest request, ActionListene future.onResponse(new GetSegmentFilesResponse(List.of(storeFileMetadata))); } finally { IOUtils.close(resources); + timer.stop(); + logger.trace( + "[replication id {}] Source node completed sending files to target node [{}], timing: {}", + request.getReplicationId(), + request.getTargetNode().getId(), + timer.time() + ); } }, onFailure); } catch (Exception e) { diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java index d428459884f97..0cee731fde2cb 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.action.support.ChannelActionListener; import org.opensearch.cluster.ClusterChangedEvent; import org.opensearch.cluster.ClusterStateListener; @@ -25,6 +26,7 @@ import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.recovery.RetryableTransportClient; import org.opensearch.indices.replication.common.CopyState; +import org.opensearch.indices.replication.common.ReplicationTimer; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportChannel; @@ -86,6 +88,8 @@ public SegmentReplicationSourceService( private class CheckpointInfoRequestHandler implements TransportRequestHandler { @Override public void messageReceived(CheckpointInfoRequest request, TransportChannel channel, Task task) throws Exception { + final ReplicationTimer timer = new ReplicationTimer(); + timer.start(); final RemoteSegmentFileChunkWriter segmentSegmentFileChunkWriter = new RemoteSegmentFileChunkWriter( request.getReplicationId(), recoverySettings, @@ -109,6 +113,16 @@ public void messageReceived(CheckpointInfoRequest request, TransportChannel chan copyState.getPendingDeleteFiles() ) ); + timer.stop(); + logger.trace( + new ParameterizedMessage( + "[replication id {}] Source node sent checkpoint info [{}] to target node [{}], timing: {}", + request.getReplicationId(), + copyState.getCheckpoint(), + request.getTargetNode().getId(), + timer.time() + ) + ); } } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationState.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationState.java index 838c06a4785ef..f865ba1332186 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationState.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationState.java @@ -8,10 +8,14 @@ package org.opensearch.indices.replication; +import org.opensearch.common.collect.Tuple; import org.opensearch.indices.replication.common.ReplicationLuceneIndex; import org.opensearch.indices.replication.common.ReplicationState; import org.opensearch.indices.replication.common.ReplicationTimer; +import java.util.ArrayList; +import java.util.List; + /** * ReplicationState implementation to track Segment Replication events. * @@ -26,10 +30,12 @@ public class SegmentReplicationState implements ReplicationState { */ public enum Stage { DONE((byte) 0), - INIT((byte) 1), - - REPLICATING((byte) 2); + REPLICATING((byte) 2), + GET_CHECKPOINT_INFO((byte) 3), + FILE_DIFF((byte) 4), + GET_FILES((byte) 5), + FINALIZE_REPLICATION((byte) 6); private static final Stage[] STAGES = new Stage[Stage.values().length]; @@ -60,13 +66,27 @@ public static Stage fromId(byte id) { private Stage stage; private final ReplicationLuceneIndex index; - private final ReplicationTimer timer; + private final ReplicationTimer overallTimer; + private final ReplicationTimer stageTimer; + private final List> timingData; + private long replicationId; public SegmentReplicationState(ReplicationLuceneIndex index) { stage = Stage.INIT; this.index = index; - timer = new ReplicationTimer(); - timer.start(); + // Timing data will have as many entries as stages, plus one + // additional entry for the overall timer + timingData = new ArrayList<>(Stage.values().length + 1); + overallTimer = new ReplicationTimer(); + stageTimer = new ReplicationTimer(); + stageTimer.start(); + // set an invalid value by default + this.replicationId = -1L; + } + + public SegmentReplicationState(ReplicationLuceneIndex index, long replicationId) { + this(index); + this.replicationId = replicationId; } @Override @@ -74,9 +94,17 @@ public ReplicationLuceneIndex getIndex() { return index; } + public long getReplicationId() { + return replicationId; + } + @Override public ReplicationTimer getTimer() { - return timer; + return overallTimer; + } + + public List> getTimingData() { + return timingData; } public Stage getStage() { @@ -90,6 +118,12 @@ protected void validateAndSetStage(Stage expected, Stage next) { "can't move replication to stage [" + next + "]. current stage: [" + stage + "] (expected [" + expected + "])" ); } + // save the timing data for the current step + stageTimer.stop(); + timingData.add(new Tuple<>(stage.name(), stageTimer.time())); + // restart the step timer + stageTimer.reset(); + stageTimer.start(); stage = next; } @@ -97,16 +131,29 @@ public void setStage(Stage stage) { switch (stage) { case INIT: this.stage = Stage.INIT; - getIndex().reset(); break; case REPLICATING: validateAndSetStage(Stage.INIT, stage); - getIndex().start(); + // only start the overall timer once we've started replication + overallTimer.start(); break; - case DONE: + case GET_CHECKPOINT_INFO: validateAndSetStage(Stage.REPLICATING, stage); - getIndex().stop(); - getTimer().stop(); + break; + case FILE_DIFF: + validateAndSetStage(Stage.GET_CHECKPOINT_INFO, stage); + break; + case GET_FILES: + validateAndSetStage(Stage.FILE_DIFF, stage); + break; + case FINALIZE_REPLICATION: + validateAndSetStage(Stage.GET_FILES, stage); + break; + case DONE: + validateAndSetStage(Stage.FINALIZE_REPLICATION, stage); + // add the overall timing data + overallTimer.stop(); + timingData.add(new Tuple<>("OVERALL", overallTimer.time())); break; default: throw new IllegalArgumentException("unknown SegmentReplicationState.Stage [" + stage + "]"); diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index 73d9a2f805d75..a658ffc09d590 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -64,7 +64,7 @@ public SegmentReplicationTarget( super("replication_target", indexShard, new ReplicationLuceneIndex(), listener); this.checkpoint = checkpoint; this.source = source; - this.state = new SegmentReplicationState(stateIndex); + this.state = new SegmentReplicationState(stateIndex, getId()); this.multiFileWriter = new MultiFileWriter(indexShard.store(), stateIndex, getPrefix(), logger, this::ensureRefCount); } @@ -139,7 +139,9 @@ public void startReplication(ActionListener listener) { final StepListener getFilesListener = new StepListener<>(); final StepListener finalizeListener = new StepListener<>(); + logger.trace("[shardId {}] Replica starting replication [id {}]", shardId().getId(), getId()); // Get list of files to copy from this checkpoint. + state.setStage(SegmentReplicationState.Stage.GET_CHECKPOINT_INFO); source.getCheckpointMetadata(getId(), checkpoint, checkpointInfoListener); checkpointInfoListener.whenComplete(checkpointInfo -> getFiles(checkpointInfo, getFilesListener), listener::onFailure); @@ -152,14 +154,16 @@ public void startReplication(ActionListener listener) { private void getFiles(CheckpointInfoResponse checkpointInfo, StepListener getFilesListener) throws IOException { + state.setStage(SegmentReplicationState.Stage.FILE_DIFF); final Store.MetadataSnapshot snapshot = checkpointInfo.getSnapshot(); Store.MetadataSnapshot localMetadata = getMetadataSnapshot(); final Store.RecoveryDiff diff = snapshot.segmentReplicationDiff(localMetadata); - logger.debug("Replication diff {}", diff); - // Segments are immutable. So if the replica has any segments with the same name that differ from the one in the incoming snapshot - // from - // source that means the local copy of the segment has been corrupted/changed in some way and we throw an IllegalStateException to - // fail the shard + logger.trace("Replication diff {}", diff); + /* + * Segments are immutable. So if the replica has any segments with the same name that differ from the one in the incoming + * snapshot from source that means the local copy of the segment has been corrupted/changed in some way and we throw an + * IllegalStateException to fail the shard + */ if (diff.different.isEmpty() == false) { getFilesListener.onFailure( new IllegalStateException( @@ -177,15 +181,18 @@ private void getFiles(CheckpointInfoResponse checkpointInfo, StepListener listener) { + state.setStage(SegmentReplicationState.Stage.FINALIZE_REPLICATION); ActionListener.completeWith(listener, () -> { multiFileWriter.renameAllTempFiles(); final Store store = store(); diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index f699f0edba842..a79ce195ad83b 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -116,7 +116,7 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh * @param replicaShard replica shard on which checkpoint is received */ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedCheckpoint, final IndexShard replicaShard) { - + logger.trace(() -> new ParameterizedMessage("Replica received new replication checkpoint from primary [{}]", receivedCheckpoint)); // Checks if received checkpoint is already present and ahead then it replaces old received checkpoint if (latestReceivedCheckpoint.get(replicaShard.shardId()) != null) { if (receivedCheckpoint.isAheadOf(latestReceivedCheckpoint.get(replicaShard.shardId()))) { @@ -139,6 +139,14 @@ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedChe startReplication(receivedCheckpoint, replicaShard, new SegmentReplicationListener() { @Override public void onReplicationDone(SegmentReplicationState state) { + logger.trace( + () -> new ParameterizedMessage( + "[shardId {}] [replication id {}] Replication complete, timing data: {}", + replicaShard.shardId().getId(), + state.getReplicationId(), + state.getTimingData() + ) + ); // if we received a checkpoint during the copy event that is ahead of this // try and process it. if (latestReceivedCheckpoint.get(replicaShard.shardId()).isAheadOf(replicaShard.getLatestReplicationCheckpoint())) { @@ -154,6 +162,14 @@ public void onReplicationDone(SegmentReplicationState state) { @Override public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) { + logger.trace( + () -> new ParameterizedMessage( + "[shardId {}] [replication id {}] Replication failed, timing data: {}", + replicaShard.shardId().getId(), + state.getReplicationId(), + state.getTimingData() + ) + ); if (sendShardFailure == true) { logger.error("replication failure", e); replicaShard.failShard("replication failure", e); @@ -172,9 +188,9 @@ public void startReplication( startReplication(new SegmentReplicationTarget(checkpoint, indexShard, sourceFactory.get(indexShard), listener)); } - public void startReplication(final SegmentReplicationTarget target) { + // pkg-private for integration tests + void startReplication(final SegmentReplicationTarget target) { final long replicationId = onGoingReplications.start(target, recoverySettings.activityTimeout()); - logger.trace(() -> new ParameterizedMessage("Starting replication {}", replicationId)); threadPool.generic().execute(new ReplicationRunner(replicationId)); } diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java index 8093b6aee88f9..cc51082639cdb 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java @@ -29,6 +29,7 @@ import org.opensearch.index.shard.IndexShardClosedException; import org.opensearch.indices.IndicesService; import org.opensearch.indices.replication.SegmentReplicationTargetService; +import org.opensearch.indices.replication.common.ReplicationTimer; import org.opensearch.node.NodeClosedException; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; @@ -103,7 +104,10 @@ final void publish(IndexShard indexShard) { // we have to execute under the system context so that if security is enabled the sync is authorized threadContext.markAsSystemContext(); PublishCheckpointRequest request = new PublishCheckpointRequest(indexShard.getLatestReplicationCheckpoint()); + final ReplicationCheckpoint checkpoint = request.getCheckpoint(); final ReplicationTask task = (ReplicationTask) taskManager.register("transport", "segrep_publish_checkpoint", request); + final ReplicationTimer timer = new ReplicationTimer(); + timer.start(); transportService.sendChildRequest( clusterService.localNode(), transportPrimaryAction, @@ -123,12 +127,23 @@ public String executor() { @Override public void handleResponse(ReplicationResponse response) { + timer.stop(); + logger.trace( + () -> new ParameterizedMessage( + "[shardId {}] Completed publishing checkpoint [{}], timing: {}", + indexShard.shardId().getId(), + checkpoint, + timer.time() + ) + ); task.setPhase("finished"); taskManager.unregister(task); } @Override public void handleException(TransportException e) { + timer.stop(); + logger.trace("[shardId {}] Failed to publish checkpoint, timing: {}", indexShard.shardId().getId(), timer.time()); task.setPhase("finished"); taskManager.unregister(task); if (ExceptionsHelper.unwrap(e, NodeClosedException.class) != null) { @@ -151,6 +166,13 @@ public void handleException(TransportException e) { } } ); + logger.trace( + () -> new ParameterizedMessage( + "[shardId {}] Publishing replication checkpoint [{}]", + checkpoint.getShardId().getId(), + checkpoint + ) + ); } } @@ -168,7 +190,7 @@ protected void shardOperationOnReplica(PublishCheckpointRequest request, IndexSh Objects.requireNonNull(request); Objects.requireNonNull(replica); ActionListener.completeWith(listener, () -> { - logger.trace("Checkpoint received on replica {}", request); + logger.trace(() -> new ParameterizedMessage("Checkpoint {} received on replica {}", request, replica.shardId())); if (request.getCheckpoint().getShardId().equals(replica.shardId())) { replicationService.onNewCheckpoint(request.getCheckpoint(), replica); } diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java index 2916f4c8152a2..e218f09aad575 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java @@ -100,8 +100,8 @@ public void onReplicationFailure(SegmentReplicationState state, OpenSearchExcept ); final SegmentReplicationTarget spy = Mockito.spy(target); doAnswer(invocation -> { - // setting stage to REPLICATING so transition in markAsDone succeeds on listener completion - target.state().setStage(SegmentReplicationState.Stage.REPLICATING); + // set up stage correctly so the transition in markAsDone succeeds on listener completion + moveTargetToFinalStage(target); final ActionListener listener = invocation.getArgument(0); listener.onResponse(null); return null; @@ -123,7 +123,7 @@ public void onReplicationDone(SegmentReplicationState state) { @Override public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) { - assertEquals(SegmentReplicationState.Stage.REPLICATING, state.getStage()); + assertEquals(SegmentReplicationState.Stage.INIT, state.getStage()); assertEquals(expectedError, e.getCause()); assertTrue(sendShardFailure); } @@ -131,8 +131,6 @@ public void onReplicationFailure(SegmentReplicationState state, OpenSearchExcept ); final SegmentReplicationTarget spy = Mockito.spy(target); doAnswer(invocation -> { - // setting stage to REPLICATING so transition in markAsDone succeeds on listener completion - target.state().setStage(SegmentReplicationState.Stage.REPLICATING); final ActionListener listener = invocation.getArgument(0); listener.onFailure(expectedError); return null; @@ -271,4 +269,17 @@ public void testBeforeIndexShardClosed_CancelsOngoingReplications() { sut.beforeIndexShardClosed(indexShard.shardId(), indexShard, Settings.EMPTY); verify(spy, times(1)).cancel(any()); } + + /** + * Move the {@link SegmentReplicationTarget} object through its {@link SegmentReplicationState.Stage} values in order + * until the final, non-terminal stage. + */ + private void moveTargetToFinalStage(SegmentReplicationTarget target) { + SegmentReplicationState.Stage[] stageValues = SegmentReplicationState.Stage.values(); + assertEquals(target.state().getStage(), SegmentReplicationState.Stage.INIT); + // Skip the first two stages (DONE and INIT) and iterate until the last value + for (int i = 2; i < stageValues.length; i++) { + target.state().setStage(stageValues[i]); + } + } } diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java index 1157c463785ac..11217a46b3c69 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java @@ -28,6 +28,7 @@ import org.junit.Assert; import org.mockito.Mockito; import org.opensearch.ExceptionsHelper; +import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; @@ -96,7 +97,7 @@ public class SegmentReplicationTargetTests extends IndexShardTestCase { Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT).build() ); - SegmentInfos testSegmentInfos; + private SegmentInfos testSegmentInfos; @Override public void setUp() throws Exception { @@ -162,6 +163,7 @@ public void getSegmentFiles( public void onResponse(Void replicationResponse) { try { verify(spyIndexShard, times(1)).finalizeReplication(any(), anyLong()); + segrepTarget.markAsDone(); } catch (IOException ex) { Assert.fail(); } @@ -169,7 +171,7 @@ public void onResponse(Void replicationResponse) { @Override public void onFailure(Exception e) { - logger.error("Unexpected test error", e); + logger.error("Unexpected onFailure", e); Assert.fail(); } }); @@ -213,6 +215,7 @@ public void onResponse(Void replicationResponse) { @Override public void onFailure(Exception e) { assertEquals(exception, e.getCause().getCause()); + segrepTarget.fail(new OpenSearchException(e), false); } }); } @@ -255,6 +258,7 @@ public void onResponse(Void replicationResponse) { @Override public void onFailure(Exception e) { assertEquals(exception, e.getCause().getCause()); + segrepTarget.fail(new OpenSearchException(e), false); } }); } @@ -299,6 +303,7 @@ public void onResponse(Void replicationResponse) { @Override public void onFailure(Exception e) { assertEquals(exception, e.getCause()); + segrepTarget.fail(new OpenSearchException(e), false); } }); } @@ -343,6 +348,7 @@ public void onResponse(Void replicationResponse) { @Override public void onFailure(Exception e) { assertEquals(exception, e.getCause()); + segrepTarget.fail(new OpenSearchException(e), false); } }); } @@ -384,6 +390,7 @@ public void onResponse(Void replicationResponse) { @Override public void onFailure(Exception e) { assert (e instanceof IllegalStateException); + segrepTarget.fail(new OpenSearchException(e), false); } }); } @@ -432,11 +439,13 @@ public void getSegmentFiles( @Override public void onResponse(Void replicationResponse) { logger.info("No error processing checkpoint info"); + segrepTarget.markAsDone(); } @Override public void onFailure(Exception e) { - assert (e instanceof IllegalStateException); + logger.error("Unexpected onFailure", e); + Assert.fail(); } }); } @@ -448,7 +457,7 @@ public void onFailure(Exception e) { * @return * @throws IOException */ - List generateStoreMetadataSnapshot(int docCount) throws IOException { + private List generateStoreMetadataSnapshot(int docCount) throws IOException { List docList = new ArrayList<>(); for (int i = 0; i < docCount; i++) { Document document = new Document(); @@ -480,7 +489,7 @@ List generateStoreMetadataSnapshot(int docCount) throws return Arrays.asList(storeMetadata, storeMetadataWithDeletes); } - public static void deleteContent(Directory directory) throws IOException { + private static void deleteContent(Directory directory) throws IOException { final String[] files = directory.listAll(); final List exceptions = new ArrayList<>(); for (String file : files) { @@ -498,7 +507,6 @@ public static void deleteContent(Directory directory) throws IOException { @Override public void tearDown() throws Exception { super.tearDown(); - segrepTarget.markAsDone(); closeShards(spyIndexShard, indexShard); } }