From bb1c3cde93487b5a315c52e989cde97c5cc0903d Mon Sep 17 00:00:00 2001 From: Shourya Dutta Biswas <114977491+shourya035@users.noreply.github.com> Date: Mon, 21 Aug 2023 11:36:57 +0530 Subject: [PATCH] Add total upload and download time from remote to nodes stats Signed-off-by: Shourya Dutta Biswas <114977491+shourya035@users.noreply.github.com> --- .../indices/stats/IndexStatsIT.java | 2 + .../RemoteSegmentStatsFromNodesStatsIT.java | 55 ++++++++++++------- .../index/remote/RemoteSegmentStats.java | 36 ++++++++++++ .../remote/RemoteSegmentTransferTracker.java | 18 ++++++ .../store/DirectoryFileTransferTracker.java | 38 ++++++++++--- .../transfer/TranslogTransferManager.java | 6 +- .../cluster/node/stats/NodeStatsTests.java | 4 ++ .../stats/RemoteStoreStatsTestHelper.java | 7 ++- 8 files changed, 134 insertions(+), 32 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java index af5191d7d2039..1a131a2a7eb3d 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java @@ -1457,6 +1457,8 @@ private void assertZeroRemoteSegmentStats(RemoteSegmentStats remoteSegmentStats) assertEquals(0, remoteSegmentStats.getTotalRefreshBytesLag()); assertEquals(0, remoteSegmentStats.getMaxRefreshBytesLag()); assertEquals(0, remoteSegmentStats.getMaxRefreshTimeLag()); + assertEquals(0, remoteSegmentStats.getTotalUploadTime()); + assertEquals(0, remoteSegmentStats.getTotalDownloadTime()); } /** diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteSegmentStatsFromNodesStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteSegmentStatsFromNodesStatsIT.java index 19ad43b503ab7..e0befb0faa0e8 100644 --- 
a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteSegmentStatsFromNodesStatsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteSegmentStatsFromNodesStatsIT.java @@ -67,7 +67,8 @@ public void testNodesStatsParityWithOnlyPrimaryShards() { indexSingleDoc(secondIndex, true); long cumulativeUploadsSucceeded = 0, cumulativeUploadsStarted = 0, cumulativeUploadsFailed = 0; - long total_bytes_lag = 0, max_bytes_lag = 0, max_time_lag = 0; + long totalBytesLag = 0, maxBytesLag = 0, maxTimeLag = 0; + long totalUploadTime = 0; // Fetch upload stats RemoteStoreStatsResponse remoteStoreStatsFirstIndex = client(randomDataNode).admin() .cluster() @@ -77,9 +78,10 @@ public void testNodesStatsParityWithOnlyPrimaryShards() { cumulativeUploadsSucceeded += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesSucceeded; cumulativeUploadsStarted += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesStarted; cumulativeUploadsFailed += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesFailed; - total_bytes_lag += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag; - max_bytes_lag = Math.max(max_bytes_lag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag); - max_time_lag = Math.max(max_time_lag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs); + totalBytesLag += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag; + maxBytesLag = Math.max(maxBytesLag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag); + maxTimeLag = Math.max(maxTimeLag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs); + totalUploadTime += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().totalUploadTimeInMs; RemoteStoreStatsResponse remoteStoreStatsSecondIndex = 
client(randomDataNode).admin() .cluster() @@ -90,9 +92,10 @@ public void testNodesStatsParityWithOnlyPrimaryShards() { cumulativeUploadsSucceeded += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesSucceeded; cumulativeUploadsStarted += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesStarted; cumulativeUploadsFailed += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesFailed; - total_bytes_lag += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag; - max_bytes_lag = Math.max(max_bytes_lag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag); - max_time_lag = Math.max(max_time_lag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs); + totalBytesLag += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag; + maxBytesLag = Math.max(maxBytesLag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag); + maxTimeLag = Math.max(maxTimeLag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs); + totalUploadTime += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().totalUploadTimeInMs; // Fetch nodes stats NodesStatsResponse nodesStatsResponse = client().admin() @@ -104,9 +107,10 @@ public void testNodesStatsParityWithOnlyPrimaryShards() { assertEquals(cumulativeUploadsSucceeded, remoteSegmentStats.getUploadBytesSucceeded()); assertEquals(cumulativeUploadsStarted, remoteSegmentStats.getUploadBytesStarted()); assertEquals(cumulativeUploadsFailed, remoteSegmentStats.getUploadBytesFailed()); - assertEquals(total_bytes_lag, remoteSegmentStats.getTotalRefreshBytesLag()); - assertEquals(max_bytes_lag, remoteSegmentStats.getMaxRefreshBytesLag()); - assertEquals(max_time_lag, remoteSegmentStats.getMaxRefreshTimeLag()); + assertEquals(totalBytesLag, 
remoteSegmentStats.getTotalRefreshBytesLag()); + assertEquals(maxBytesLag, remoteSegmentStats.getMaxRefreshBytesLag()); + assertEquals(maxTimeLag, remoteSegmentStats.getMaxRefreshTimeLag()); + assertEquals(totalUploadTime, remoteSegmentStats.getTotalUploadTime()); } /** @@ -180,13 +184,16 @@ private void assertZeroRemoteSegmentStats(RemoteSegmentStats remoteSegmentStats) assertEquals(0, remoteSegmentStats.getTotalRefreshBytesLag()); assertEquals(0, remoteSegmentStats.getMaxRefreshBytesLag()); assertEquals(0, remoteSegmentStats.getMaxRefreshTimeLag()); + assertEquals(0, remoteSegmentStats.getTotalUploadTime()); + assertEquals(0, remoteSegmentStats.getTotalDownloadTime()); } private static void assertNodeStatsParityAcrossNodes(String firstIndex, String secondIndex) { for (String dataNode : internalCluster().getDataNodeNames()) { long cumulativeUploadsSucceeded = 0, cumulativeUploadsStarted = 0, cumulativeUploadsFailed = 0; long cumulativeDownloadsSucceeded = 0, cumulativeDownloadsStarted = 0, cumulativeDownloadsFailed = 0; - long total_bytes_lag = 0, max_bytes_lag = 0, max_time_lag = 0; + long totalBytesLag = 0, maxBytesLag = 0, maxTimeLag = 0; + long totalUploadTime = 0, totalDownloadTime = 0; // Fetch upload stats RemoteStoreStatsResponse remoteStoreStatsFirstIndex = client(dataNode).admin() .cluster() @@ -202,9 +209,12 @@ private static void assertNodeStatsParityAcrossNodes(String firstIndex, String s .getSegmentStats().directoryFileTransferTrackerStats.transferredBytesStarted; cumulativeDownloadsFailed += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0] .getSegmentStats().directoryFileTransferTrackerStats.transferredBytesFailed; - total_bytes_lag += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag; - max_bytes_lag = Math.max(max_bytes_lag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag); - max_time_lag = Math.max(max_time_lag, 
remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs); + totalBytesLag += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag; + maxBytesLag = Math.max(maxBytesLag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag); + maxTimeLag = Math.max(maxTimeLag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs); + totalUploadTime += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().totalUploadTimeInMs; + totalDownloadTime += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0] + .getSegmentStats().directoryFileTransferTrackerStats.totalTransferTimeInMs; RemoteStoreStatsResponse remoteStoreStatsSecondIndex = client(dataNode).admin() .cluster() @@ -220,9 +230,12 @@ private static void assertNodeStatsParityAcrossNodes(String firstIndex, String s .getSegmentStats().directoryFileTransferTrackerStats.transferredBytesStarted; cumulativeDownloadsFailed += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0] .getSegmentStats().directoryFileTransferTrackerStats.transferredBytesFailed; - total_bytes_lag += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag; - max_bytes_lag = Math.max(max_bytes_lag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag); - max_time_lag = Math.max(max_time_lag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs); + totalBytesLag += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag; + maxBytesLag = Math.max(maxBytesLag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag); + maxTimeLag = Math.max(maxTimeLag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs); + totalUploadTime += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().totalUploadTimeInMs; + totalDownloadTime += 
remoteStoreStatsSecondIndex.getRemoteStoreStats()[0] + .getSegmentStats().directoryFileTransferTrackerStats.totalTransferTimeInMs; // Fetch nodes stats NodesStatsResponse nodesStatsResponse = client().admin() @@ -237,9 +250,11 @@ private static void assertNodeStatsParityAcrossNodes(String firstIndex, String s assertEquals(cumulativeDownloadsSucceeded, remoteSegmentStats.getDownloadBytesSucceeded()); assertEquals(cumulativeDownloadsStarted, remoteSegmentStats.getDownloadBytesStarted()); assertEquals(cumulativeDownloadsFailed, remoteSegmentStats.getDownloadBytesFailed()); - assertEquals(total_bytes_lag, remoteSegmentStats.getTotalRefreshBytesLag()); - assertEquals(max_bytes_lag, remoteSegmentStats.getMaxRefreshBytesLag()); - assertEquals(max_time_lag, remoteSegmentStats.getMaxRefreshTimeLag()); + assertEquals(totalBytesLag, remoteSegmentStats.getTotalRefreshBytesLag()); + assertEquals(maxBytesLag, remoteSegmentStats.getMaxRefreshBytesLag()); + assertEquals(maxTimeLag, remoteSegmentStats.getMaxRefreshTimeLag()); + assertEquals(totalUploadTime, remoteSegmentStats.getTotalUploadTime()); + assertEquals(totalDownloadTime, remoteSegmentStats.getTotalDownloadTime()); } } } diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteSegmentStats.java b/server/src/main/java/org/opensearch/index/remote/RemoteSegmentStats.java index 0ff61d49c00f8..1cb1273933973 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteSegmentStats.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteSegmentStats.java @@ -67,6 +67,14 @@ public class RemoteSegmentStats implements Writeable, ToXContentFragment { * Used to check for data freshness in the remote store */ private long totalRefreshBytesLag; + /** + * Total time spent in uploading segments to remote store + */ + private long totalUploadTime; + /** + * Total time spent in downloading segments from remote store + */ + private long totalDownloadTime; public RemoteSegmentStats() {} @@ -91,6 +99,8 @@ public 
RemoteSegmentStats(StreamInput in) throws IOException { */ if (in.getVersion().onOrAfter(Version.V_3_0_0)) { totalRefreshBytesLag = in.readLong(); + totalUploadTime = in.readLong(); + totalDownloadTime = in.readLong(); } } @@ -115,6 +125,8 @@ public RemoteSegmentStats(RemoteSegmentTransferTracker.Stats trackerStats) { // Aggregations would be performed on the add method this.maxRefreshBytesLag = trackerStats.bytesLag; this.totalRefreshBytesLag = trackerStats.bytesLag; + this.totalUploadTime = trackerStats.totalUploadTimeInMs; + this.totalDownloadTime = trackerStats.directoryFileTransferTrackerStats.totalTransferTimeInMs; } // Getter and setters. All are visible for testing @@ -190,6 +202,22 @@ public void addTotalRefreshBytesLag(long totalRefreshBytesLag) { this.totalRefreshBytesLag += totalRefreshBytesLag; } + public long getTotalUploadTime() { + return totalUploadTime; + } + + public void addTotalUploadTime(long totalUploadTime) { + this.totalUploadTime += totalUploadTime; + } + + public long getTotalDownloadTime() { + return totalDownloadTime; + } + + public void addTotalDownloadTime(long totalDownloadTime) { + this.totalDownloadTime += totalDownloadTime; + } + /** * Adds existing stats. 
Used for stats roll-ups at index or node level * @@ -206,6 +234,8 @@ public void add(RemoteSegmentStats existingStats) { this.maxRefreshTimeLag = Math.max(this.maxRefreshTimeLag, existingStats.getMaxRefreshTimeLag()); this.maxRefreshBytesLag = Math.max(this.maxRefreshBytesLag, existingStats.getMaxRefreshBytesLag()); this.totalRefreshBytesLag += existingStats.getTotalRefreshBytesLag(); + this.totalUploadTime += existingStats.totalUploadTime; + this.totalDownloadTime += existingStats.totalDownloadTime; } } @@ -231,6 +261,8 @@ public void writeTo(StreamOutput out) throws IOException { */ if (out.getVersion().onOrAfter(Version.V_3_0_0)) { out.writeLong(totalRefreshBytesLag); + out.writeLong(totalUploadTime); + out.writeLong(totalDownloadTime); } } @@ -258,6 +290,7 @@ private void buildUploadStats(XContentBuilder builder) throws IOException { builder.humanReadableField(Fields.MAX_BYTES, Fields.MAX, new ByteSizeValue(maxRefreshBytesLag)); builder.endObject(); builder.humanReadableField(Fields.MAX_REFRESH_TIME_LAG_IN_MILLIS, Fields.MAX_REFRESH_TIME_LAG, new TimeValue(maxRefreshTimeLag)); + builder.humanReadableField(Fields.TOTAL_TIME_SPENT_IN_MILLIS, Fields.TOTAL_TIME_SPENT, new TimeValue(totalUploadTime)); } private void buildDownloadStats(XContentBuilder builder) throws IOException { @@ -266,6 +299,7 @@ private void buildDownloadStats(XContentBuilder builder) throws IOException { builder.humanReadableField(Fields.SUCCEEDED_BYTES, Fields.SUCCEEDED, new ByteSizeValue(downloadBytesSucceeded)); builder.humanReadableField(Fields.FAILED_BYTES, Fields.FAILED, new ByteSizeValue(downloadBytesFailed)); builder.endObject(); + builder.humanReadableField(Fields.TOTAL_TIME_SPENT_IN_MILLIS, Fields.TOTAL_TIME_SPENT, new TimeValue(totalDownloadTime)); } static final class Fields { @@ -287,5 +321,7 @@ static final class Fields { static final String TOTAL_BYTES = "total_bytes"; static final String MAX = "max"; static final String MAX_BYTES = "max_bytes"; + static final String 
TOTAL_TIME_SPENT = "total_time_spent"; + static final String TOTAL_TIME_SPENT_IN_MILLIS = "total_time_spent_in_millis"; } } diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java b/server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java index 1531f74597a03..2036cda3103f5 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java @@ -123,6 +123,11 @@ public class RemoteSegmentTransferTracker { */ private volatile long totalUploadsSucceeded; + /** + * Cumulative sum of time taken in remote refresh (in milliseconds) + */ + private volatile long totalUploadTimeInMs; + /** * Cumulative sum of rejection counts for this shard. */ @@ -509,6 +514,7 @@ boolean isUploadTimeMsAverageReady() { } public void addUploadTimeMs(long timeMs) { + totalUploadTimeInMs += timeMs; synchronized (uploadTimeMsMutex) { this.uploadTimeMsMovingAverageReference.get().record(timeMs); } @@ -525,6 +531,10 @@ void updateUploadTimeMsMovingAverageWindowSize(int updatedSize) { } } + public long getTotalUploadTimeInMs() { + return totalUploadTimeInMs; + } + public DirectoryFileTransferTracker getDirectoryFileTransferTracker() { return directoryFileTransferTracker; } @@ -550,6 +560,7 @@ public RemoteSegmentTransferTracker.Stats stats() { uploadBytesPerSecMovingAverageReference.get().getAverage(), uploadTimeMsMovingAverageReference.get().getAverage(), getBytesLag(), + totalUploadTimeInMs, directoryFileTransferTracker.stats() ); } @@ -578,6 +589,7 @@ public static class Stats implements Writeable { public final long lastSuccessfulRemoteRefreshBytes; public final double uploadBytesMovingAverage; public final double uploadBytesPerSecMovingAverage; + public final long totalUploadTimeInMs; public final double uploadTimeMovingAverage; public final long bytesLag; public final DirectoryFileTransferTracker.Stats 
directoryFileTransferTrackerStats; @@ -602,6 +614,7 @@ public Stats( double uploadBytesPerSecMovingAverage, double uploadTimeMovingAverage, long bytesLag, + long totalUploadTimeInMs, DirectoryFileTransferTracker.Stats directoryFileTransferTrackerStats ) { this.shardId = shardId; @@ -623,6 +636,7 @@ public Stats( this.uploadBytesPerSecMovingAverage = uploadBytesPerSecMovingAverage; this.uploadTimeMovingAverage = uploadTimeMovingAverage; this.bytesLag = bytesLag; + this.totalUploadTimeInMs = totalUploadTimeInMs; this.directoryFileTransferTrackerStats = directoryFileTransferTrackerStats; } @@ -647,6 +661,7 @@ public Stats(StreamInput in) throws IOException { this.uploadBytesPerSecMovingAverage = in.readDouble(); this.uploadTimeMovingAverage = in.readDouble(); this.bytesLag = in.readLong(); + this.totalUploadTimeInMs = in.readLong(); this.directoryFileTransferTrackerStats = in.readOptionalWriteable(DirectoryFileTransferTracker.Stats::new); } catch (IOException e) { throw e; @@ -674,6 +689,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeDouble(uploadBytesPerSecMovingAverage); out.writeDouble(uploadTimeMovingAverage); out.writeLong(bytesLag); + out.writeLong(totalUploadTimeInMs); out.writeOptionalWriteable(directoryFileTransferTrackerStats); } @@ -702,6 +718,7 @@ public boolean equals(Object obj) { && Double.compare(this.uploadBytesPerSecMovingAverage, other.uploadBytesPerSecMovingAverage) == 0 && Double.compare(this.uploadTimeMovingAverage, other.uploadTimeMovingAverage) == 0 && this.bytesLag == other.bytesLag + && this.totalUploadTimeInMs == other.totalUploadTimeInMs && this.directoryFileTransferTrackerStats.equals(other.directoryFileTransferTrackerStats); } @@ -727,6 +744,7 @@ public int hashCode() { uploadBytesPerSecMovingAverage, uploadTimeMovingAverage, bytesLag, + totalUploadTimeInMs, directoryFileTransferTrackerStats ); } diff --git a/server/src/main/java/org/opensearch/index/store/DirectoryFileTransferTracker.java 
b/server/src/main/java/org/opensearch/index/store/DirectoryFileTransferTracker.java index 5e12517becaf2..b6a8c28edae14 100644 --- a/server/src/main/java/org/opensearch/index/store/DirectoryFileTransferTracker.java +++ b/server/src/main/java/org/opensearch/index/store/DirectoryFileTransferTracker.java @@ -8,6 +8,7 @@ package org.opensearch.index.store; +import org.apache.lucene.store.Directory; import org.opensearch.common.util.MovingAverage; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; @@ -17,33 +18,38 @@ import java.util.Objects; /** - * Tracks the amount of bytes transferred between two {@link org.apache.lucene.store.Directory} instances + * Tracks the amount of bytes transferred between two {@link Directory} instances * * @opensearch.internal */ public class DirectoryFileTransferTracker { /** - * Cumulative size of files (in bytes) attempted to be transferred over from the source {@link org.apache.lucene.store.Directory} + * Cumulative size of files (in bytes) attempted to be transferred over from the source {@link Directory} */ private volatile long transferredBytesStarted; /** - * Cumulative size of files (in bytes) successfully transferred over from the source {@link org.apache.lucene.store.Directory} + * Cumulative size of files (in bytes) failed in transfer over from the source {@link Directory} */ private volatile long transferredBytesFailed; /** - * Cumulative size of files (in bytes) failed in transfer over from the source {@link org.apache.lucene.store.Directory} + * Cumulative size of files (in bytes) successfully transferred over from the source {@link Directory} */ private volatile long transferredBytesSucceeded; /** - * Time in milliseconds for the last successful transfer from the source {@link org.apache.lucene.store.Directory} + * Time in milliseconds for the last successful transfer from the source {@link Directory} */ private volatile long lastTransferTimestampMs; /** - * 
Provides moving average over the last N total size in bytes of files transferred from the source {@link org.apache.lucene.store.Directory}. + * Cumulative time in milliseconds spent in successful transfers from the source {@link Directory} + */ + private volatile long totalTransferTimeInMs; + + /** + * Provides moving average over the last N total size in bytes of files transferred from the source {@link Directory}. * N is window size */ private volatile MovingAverage transferredBytesMovingAverageReference; @@ -51,7 +57,7 @@ public class DirectoryFileTransferTracker { private volatile long lastSuccessfulTransferInBytes; /** - * Provides moving average over the last N transfer speed (in bytes/s) of segment files transferred from the source {@link org.apache.lucene.store.Directory}. + * Provides moving average over the last N transfer speed (in bytes/s) of segment files transferred from the source {@link Directory}. * N is window size */ private volatile MovingAverage transferredBytesPerSecMovingAverageReference; @@ -84,6 +90,7 @@ public void addTransferredBytesSucceeded(long size, long startTimeInMs) { long currentTimeInMs = System.currentTimeMillis(); updateLastTransferTimestampMs(currentTimeInMs); long timeTakenInMS = Math.max(1, currentTimeInMs - startTimeInMs); + addTotalTransferTimeInMs(timeTakenInMS); addTransferredBytesPerSec((size * 1_000L) / timeTakenInMS); } @@ -123,6 +130,15 @@ public void updateLastTransferTimestampMs(long downloadTimestampInMs) { this.lastTransferTimestampMs = downloadTimestampInMs; } + public void addTotalTransferTimeInMs(long totalTransferTimeInMs) { + this.totalTransferTimeInMs += totalTransferTimeInMs; + } + + // Visible for testing + public long getTotalTransferTimeInMs() { + return totalTransferTimeInMs; + } + public DirectoryFileTransferTracker() { transferredBytesMovingAverageReference = new MovingAverage(DIRECTORY_FILES_TRANSFER_DEFAULT_WINDOW_SIZE); transferredBytesPerSecMovingAverageReference = new 
MovingAverage(DIRECTORY_FILES_TRANSFER_DEFAULT_WINDOW_SIZE); @@ -134,6 +150,7 @@ public DirectoryFileTransferTracker.Stats stats() { transferredBytesFailed, transferredBytesSucceeded, lastTransferTimestampMs, + totalTransferTimeInMs, transferredBytesMovingAverageReference.getAverage(), lastSuccessfulTransferInBytes, transferredBytesPerSecMovingAverageReference.getAverage() @@ -150,6 +167,7 @@ public static class Stats implements Writeable { public final long transferredBytesFailed; public final long transferredBytesSucceeded; public final long lastTransferTimestampMs; + public final long totalTransferTimeInMs; public final double transferredBytesMovingAverage; public final long lastSuccessfulTransferInBytes; public final double transferredBytesPerSecMovingAverage; @@ -159,6 +177,7 @@ public Stats( long transferredBytesFailed, long downloadBytesSucceeded, long lastTransferTimestampMs, + long totalTransferTimeInMs, double transferredBytesMovingAverage, long lastSuccessfulTransferInBytes, double transferredBytesPerSecMovingAverage @@ -167,6 +186,7 @@ public Stats( this.transferredBytesFailed = transferredBytesFailed; this.transferredBytesSucceeded = downloadBytesSucceeded; this.lastTransferTimestampMs = lastTransferTimestampMs; + this.totalTransferTimeInMs = totalTransferTimeInMs; this.transferredBytesMovingAverage = transferredBytesMovingAverage; this.lastSuccessfulTransferInBytes = lastSuccessfulTransferInBytes; this.transferredBytesPerSecMovingAverage = transferredBytesPerSecMovingAverage; @@ -177,6 +197,7 @@ public Stats(StreamInput in) throws IOException { this.transferredBytesFailed = in.readLong(); this.transferredBytesSucceeded = in.readLong(); this.lastTransferTimestampMs = in.readLong(); + this.totalTransferTimeInMs = in.readLong(); this.transferredBytesMovingAverage = in.readDouble(); this.lastSuccessfulTransferInBytes = in.readLong(); this.transferredBytesPerSecMovingAverage = in.readDouble(); @@ -188,6 +209,7 @@ public void writeTo(StreamOutput out) 
throws IOException { out.writeLong(transferredBytesFailed); out.writeLong(transferredBytesSucceeded); out.writeLong(lastTransferTimestampMs); + out.writeLong(totalTransferTimeInMs); out.writeDouble(transferredBytesMovingAverage); out.writeLong(lastSuccessfulTransferInBytes); out.writeDouble(transferredBytesPerSecMovingAverage); @@ -203,6 +225,7 @@ public boolean equals(Object obj) { && transferredBytesFailed == stats.transferredBytesFailed && transferredBytesSucceeded == stats.transferredBytesSucceeded && lastTransferTimestampMs == stats.lastTransferTimestampMs + && totalTransferTimeInMs == stats.totalTransferTimeInMs && Double.compare(stats.transferredBytesMovingAverage, transferredBytesMovingAverage) == 0 && lastSuccessfulTransferInBytes == stats.lastSuccessfulTransferInBytes && Double.compare(stats.transferredBytesPerSecMovingAverage, transferredBytesPerSecMovingAverage) == 0; @@ -215,6 +238,7 @@ public int hashCode() { transferredBytesFailed, transferredBytesSucceeded, lastTransferTimestampMs, + totalTransferTimeInMs, transferredBytesMovingAverage, lastSuccessfulTransferInBytes, transferredBytesPerSecMovingAverage diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index 1d42e8a546858..f229274280ec9 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -158,7 +158,7 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans } public boolean downloadTranslog(String primaryTerm, String generation, Path location) throws IOException { - logger.info( + logger.debug( "Downloading translog files with: Primary Term = {}, Generation = {}, Location = {}", primaryTerm, generation, @@ -303,7 +303,7 @@ public void deleteGenerationAsync(long primaryTerm, Set 
generations, Runna * @param minPrimaryTermToKeep all primary terms below this primary term are deleted. */ public void deletePrimaryTermsAsync(long minPrimaryTermToKeep) { - logger.info("Deleting primary terms from remote store lesser than {}", minPrimaryTermToKeep); + logger.debug("Deleting primary terms from remote store lesser than {}", minPrimaryTermToKeep); transferService.listFoldersAsync(ThreadPool.Names.REMOTE_PURGE, remoteDataTransferPath, new ActionListener<>() { @Override public void onResponse(Set folders) { @@ -341,7 +341,7 @@ private void deletePrimaryTermAsync(long primaryTerm) { new ActionListener<>() { @Override public void onResponse(Void unused) { - logger.info("Deleted primary term {}", primaryTerm); + logger.debug("Deleted primary term {}", primaryTerm); } @Override diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java index fbe70748adf2d..8a450b99904cf 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java @@ -460,6 +460,8 @@ public void testSerialization() throws IOException { assertEquals(remoteSegmentStats.getMaxRefreshTimeLag(), deserializedRemoteSegmentStats.getMaxRefreshTimeLag()); assertEquals(remoteSegmentStats.getMaxRefreshBytesLag(), deserializedRemoteSegmentStats.getMaxRefreshBytesLag()); assertEquals(remoteSegmentStats.getTotalRefreshBytesLag(), deserializedRemoteSegmentStats.getTotalRefreshBytesLag()); + assertEquals(remoteSegmentStats.getTotalUploadTime(), deserializedRemoteSegmentStats.getTotalUploadTime()); + assertEquals(remoteSegmentStats.getTotalDownloadTime(), deserializedRemoteSegmentStats.getTotalDownloadTime()); } } } @@ -793,6 +795,8 @@ private static NodeIndicesStats getNodeIndicesStats(boolean remoteStoreStats) { 
remoteSegmentStats.addTotalRefreshBytesLag(5L); remoteSegmentStats.addMaxRefreshBytesLag(2L); remoteSegmentStats.setMaxRefreshTimeLag(2L); + remoteSegmentStats.addTotalUploadTime(20L); + remoteSegmentStats.addTotalDownloadTime(20L); } return indicesStats; } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsTestHelper.java b/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsTestHelper.java index 7430ccaed725b..e2a0209503976 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsTestHelper.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsTestHelper.java @@ -46,6 +46,7 @@ static RemoteSegmentTransferTracker.Stats createStatsForNewPrimary(ShardId shard 0, 0, 0, + 10, createZeroDirectoryFileTransferStats() ); } @@ -71,6 +72,7 @@ static RemoteSegmentTransferTracker.Stats createStatsForNewReplica(ShardId shard 0, 0, 0, + 0, createSampleDirectoryFileTransferStats() ); } @@ -96,16 +98,17 @@ static RemoteSegmentTransferTracker.Stats createStatsForRemoteStoreRestoredPrima 0, 0, 100, + 10, createSampleDirectoryFileTransferStats() ); } static DirectoryFileTransferTracker.Stats createSampleDirectoryFileTransferStats() { - return new DirectoryFileTransferTracker.Stats(10, 0, 10, 12345, 5, 5, 5); + return new DirectoryFileTransferTracker.Stats(10, 0, 10, 12345, 5, 5, 5, 10); } static DirectoryFileTransferTracker.Stats createZeroDirectoryFileTransferStats() { - return new DirectoryFileTransferTracker.Stats(0, 0, 0, 0, 0, 0, 0); + return new DirectoryFileTransferTracker.Stats(0, 0, 0, 0, 0, 0, 0, 0); } static ShardRouting createShardRouting(ShardId shardId, boolean isPrimary) {