Skip to content

Commit

Permalink
Add total upload and download time from remote to nodes stats
Browse files Browse the repository at this point in the history
Signed-off-by: Shourya Dutta Biswas <114977491+shourya035@users.noreply.github.com>
  • Loading branch information
shourya035 committed Aug 21, 2023
1 parent 784a473 commit bb1c3cd
Show file tree
Hide file tree
Showing 8 changed files with 134 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1457,6 +1457,8 @@ private void assertZeroRemoteSegmentStats(RemoteSegmentStats remoteSegmentStats)
assertEquals(0, remoteSegmentStats.getTotalRefreshBytesLag());
assertEquals(0, remoteSegmentStats.getMaxRefreshBytesLag());
assertEquals(0, remoteSegmentStats.getMaxRefreshTimeLag());
assertEquals(0, remoteSegmentStats.getTotalUploadTime());
assertEquals(0, remoteSegmentStats.getTotalDownloadTime());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ public void testNodesStatsParityWithOnlyPrimaryShards() {
indexSingleDoc(secondIndex, true);

long cumulativeUploadsSucceeded = 0, cumulativeUploadsStarted = 0, cumulativeUploadsFailed = 0;
long total_bytes_lag = 0, max_bytes_lag = 0, max_time_lag = 0;
long totalBytesLag = 0, maxBytesLag = 0, maxTimeLag = 0;
long totalUploadTime = 0;
// Fetch upload stats
RemoteStoreStatsResponse remoteStoreStatsFirstIndex = client(randomDataNode).admin()
.cluster()
Expand All @@ -77,9 +78,10 @@ public void testNodesStatsParityWithOnlyPrimaryShards() {
cumulativeUploadsSucceeded += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesSucceeded;
cumulativeUploadsStarted += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesStarted;
cumulativeUploadsFailed += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesFailed;
total_bytes_lag += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag;
max_bytes_lag = Math.max(max_bytes_lag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag);
max_time_lag = Math.max(max_time_lag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs);
totalBytesLag += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag;
maxBytesLag = Math.max(maxBytesLag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag);
maxTimeLag = Math.max(maxTimeLag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs);
totalUploadTime += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().totalUploadTimeInMs;

RemoteStoreStatsResponse remoteStoreStatsSecondIndex = client(randomDataNode).admin()
.cluster()
Expand All @@ -90,9 +92,10 @@ public void testNodesStatsParityWithOnlyPrimaryShards() {
cumulativeUploadsSucceeded += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesSucceeded;
cumulativeUploadsStarted += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesStarted;
cumulativeUploadsFailed += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesFailed;
total_bytes_lag += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag;
max_bytes_lag = Math.max(max_bytes_lag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag);
max_time_lag = Math.max(max_time_lag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs);
totalBytesLag += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag;
maxBytesLag = Math.max(maxBytesLag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag);
maxTimeLag = Math.max(maxTimeLag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs);
totalUploadTime += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().totalUploadTimeInMs;

// Fetch nodes stats
NodesStatsResponse nodesStatsResponse = client().admin()
Expand All @@ -104,9 +107,10 @@ public void testNodesStatsParityWithOnlyPrimaryShards() {
assertEquals(cumulativeUploadsSucceeded, remoteSegmentStats.getUploadBytesSucceeded());
assertEquals(cumulativeUploadsStarted, remoteSegmentStats.getUploadBytesStarted());
assertEquals(cumulativeUploadsFailed, remoteSegmentStats.getUploadBytesFailed());
assertEquals(total_bytes_lag, remoteSegmentStats.getTotalRefreshBytesLag());
assertEquals(max_bytes_lag, remoteSegmentStats.getMaxRefreshBytesLag());
assertEquals(max_time_lag, remoteSegmentStats.getMaxRefreshTimeLag());
assertEquals(totalBytesLag, remoteSegmentStats.getTotalRefreshBytesLag());
assertEquals(maxBytesLag, remoteSegmentStats.getMaxRefreshBytesLag());
assertEquals(maxTimeLag, remoteSegmentStats.getMaxRefreshTimeLag());
assertEquals(totalUploadTime, remoteSegmentStats.getTotalUploadTime());
}

/**
Expand Down Expand Up @@ -180,13 +184,16 @@ private void assertZeroRemoteSegmentStats(RemoteSegmentStats remoteSegmentStats)
assertEquals(0, remoteSegmentStats.getTotalRefreshBytesLag());
assertEquals(0, remoteSegmentStats.getMaxRefreshBytesLag());
assertEquals(0, remoteSegmentStats.getMaxRefreshTimeLag());
assertEquals(0, remoteSegmentStats.getTotalUploadTime());
assertEquals(0, remoteSegmentStats.getTotalDownloadTime());
}

private static void assertNodeStatsParityAcrossNodes(String firstIndex, String secondIndex) {
for (String dataNode : internalCluster().getDataNodeNames()) {
long cumulativeUploadsSucceeded = 0, cumulativeUploadsStarted = 0, cumulativeUploadsFailed = 0;
long cumulativeDownloadsSucceeded = 0, cumulativeDownloadsStarted = 0, cumulativeDownloadsFailed = 0;
long total_bytes_lag = 0, max_bytes_lag = 0, max_time_lag = 0;
long totalBytesLag = 0, maxBytesLag = 0, maxTimeLag = 0;
long totalUploadTime = 0, totalDownloadTime = 0;
// Fetch upload stats
RemoteStoreStatsResponse remoteStoreStatsFirstIndex = client(dataNode).admin()
.cluster()
Expand All @@ -202,9 +209,12 @@ private static void assertNodeStatsParityAcrossNodes(String firstIndex, String s
.getSegmentStats().directoryFileTransferTrackerStats.transferredBytesStarted;
cumulativeDownloadsFailed += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0]
.getSegmentStats().directoryFileTransferTrackerStats.transferredBytesFailed;
total_bytes_lag += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag;
max_bytes_lag = Math.max(max_bytes_lag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag);
max_time_lag = Math.max(max_time_lag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs);
totalBytesLag += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag;
maxBytesLag = Math.max(maxBytesLag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag);
maxTimeLag = Math.max(maxTimeLag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs);
totalUploadTime += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().totalUploadTimeInMs;
totalDownloadTime += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0]
.getSegmentStats().directoryFileTransferTrackerStats.totalTransferTimeInMs;

RemoteStoreStatsResponse remoteStoreStatsSecondIndex = client(dataNode).admin()
.cluster()
Expand All @@ -220,9 +230,12 @@ private static void assertNodeStatsParityAcrossNodes(String firstIndex, String s
.getSegmentStats().directoryFileTransferTrackerStats.transferredBytesStarted;
cumulativeDownloadsFailed += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0]
.getSegmentStats().directoryFileTransferTrackerStats.transferredBytesFailed;
total_bytes_lag += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag;
max_bytes_lag = Math.max(max_bytes_lag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag);
max_time_lag = Math.max(max_time_lag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs);
totalBytesLag += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag;
maxBytesLag = Math.max(maxBytesLag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag);
maxTimeLag = Math.max(maxTimeLag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs);
totalUploadTime += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().totalUploadTimeInMs;
totalDownloadTime += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0]
.getSegmentStats().directoryFileTransferTrackerStats.totalTransferTimeInMs;

// Fetch nodes stats
NodesStatsResponse nodesStatsResponse = client().admin()
Expand All @@ -237,9 +250,11 @@ private static void assertNodeStatsParityAcrossNodes(String firstIndex, String s
assertEquals(cumulativeDownloadsSucceeded, remoteSegmentStats.getDownloadBytesSucceeded());
assertEquals(cumulativeDownloadsStarted, remoteSegmentStats.getDownloadBytesStarted());
assertEquals(cumulativeDownloadsFailed, remoteSegmentStats.getDownloadBytesFailed());
assertEquals(total_bytes_lag, remoteSegmentStats.getTotalRefreshBytesLag());
assertEquals(max_bytes_lag, remoteSegmentStats.getMaxRefreshBytesLag());
assertEquals(max_time_lag, remoteSegmentStats.getMaxRefreshTimeLag());
assertEquals(totalBytesLag, remoteSegmentStats.getTotalRefreshBytesLag());
assertEquals(maxBytesLag, remoteSegmentStats.getMaxRefreshBytesLag());
assertEquals(maxTimeLag, remoteSegmentStats.getMaxRefreshTimeLag());
assertEquals(totalUploadTime, remoteSegmentStats.getTotalUploadTime());
assertEquals(totalDownloadTime, remoteSegmentStats.getTotalDownloadTime());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,14 @@ public class RemoteSegmentStats implements Writeable, ToXContentFragment {
* Used to check for data freshness in the remote store
*/
private long totalRefreshBytesLag;
/**
* Total time spent, in milliseconds, uploading segments to the remote store
*/
private long totalUploadTime;
/**
* Total time spent, in milliseconds, downloading segments from the remote store
*/
private long totalDownloadTime;

public RemoteSegmentStats() {}

Expand All @@ -91,6 +99,8 @@ public RemoteSegmentStats(StreamInput in) throws IOException {
*/
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
totalRefreshBytesLag = in.readLong();
totalUploadTime = in.readLong();
totalDownloadTime = in.readLong();
}
}

Expand All @@ -115,6 +125,8 @@ public RemoteSegmentStats(RemoteSegmentTransferTracker.Stats trackerStats) {
// Aggregations would be performed on the add method
this.maxRefreshBytesLag = trackerStats.bytesLag;
this.totalRefreshBytesLag = trackerStats.bytesLag;
this.totalUploadTime = trackerStats.totalUploadTimeInMs;
this.totalDownloadTime = trackerStats.directoryFileTransferTrackerStats.totalTransferTimeInMs;
}

// Getter and setters. All are visible for testing
Expand Down Expand Up @@ -190,6 +202,22 @@ public void addTotalRefreshBytesLag(long totalRefreshBytesLag) {
this.totalRefreshBytesLag += totalRefreshBytesLag;
}

/**
 * Returns the accumulated upload time held by this stats object.
 *
 * @return the current value of {@code totalUploadTime}
 */
public long getTotalUploadTime() {
    return this.totalUploadTime;
}

/**
 * Increases the accumulated upload time by the supplied amount.
 *
 * @param totalUploadTime the amount to add to the running total
 */
public void addTotalUploadTime(long totalUploadTime) {
    // The parameter shadows the field, so the field is always qualified with "this".
    this.totalUploadTime = this.totalUploadTime + totalUploadTime;
}

/**
 * Returns the accumulated download time held by this stats object.
 *
 * @return the current value of {@code totalDownloadTime}
 */
public long getTotalDownloadTime() {
    return this.totalDownloadTime;
}

/**
 * Increases the accumulated download time by the supplied amount.
 *
 * @param totalDownloadTime the amount to add to the running total
 */
public void addTotalDownloadTime(long totalDownloadTime) {
    // The parameter shadows the field, so the field is always qualified with "this".
    this.totalDownloadTime = this.totalDownloadTime + totalDownloadTime;
}

/**
* Adds existing stats. Used for stats roll-ups at index or node level
*
Expand All @@ -206,6 +234,8 @@ public void add(RemoteSegmentStats existingStats) {
this.maxRefreshTimeLag = Math.max(this.maxRefreshTimeLag, existingStats.getMaxRefreshTimeLag());
this.maxRefreshBytesLag = Math.max(this.maxRefreshBytesLag, existingStats.getMaxRefreshBytesLag());
this.totalRefreshBytesLag += existingStats.getTotalRefreshBytesLag();
this.totalUploadTime += existingStats.totalUploadTime;
this.totalDownloadTime += existingStats.totalDownloadTime;
}
}

Expand All @@ -231,6 +261,8 @@ public void writeTo(StreamOutput out) throws IOException {
*/
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeLong(totalRefreshBytesLag);
out.writeLong(totalUploadTime);
out.writeLong(totalDownloadTime);
}
}

Expand Down Expand Up @@ -258,6 +290,7 @@ private void buildUploadStats(XContentBuilder builder) throws IOException {
builder.humanReadableField(Fields.MAX_BYTES, Fields.MAX, new ByteSizeValue(maxRefreshBytesLag));
builder.endObject();
builder.humanReadableField(Fields.MAX_REFRESH_TIME_LAG_IN_MILLIS, Fields.MAX_REFRESH_TIME_LAG, new TimeValue(maxRefreshTimeLag));
builder.humanReadableField(Fields.TOTAL_TIME_SPENT_IN_MILLIS, Fields.TOTAL_TIME_SPENT, new TimeValue(totalUploadTime));
}

private void buildDownloadStats(XContentBuilder builder) throws IOException {
Expand All @@ -266,6 +299,7 @@ private void buildDownloadStats(XContentBuilder builder) throws IOException {
builder.humanReadableField(Fields.SUCCEEDED_BYTES, Fields.SUCCEEDED, new ByteSizeValue(downloadBytesSucceeded));
builder.humanReadableField(Fields.FAILED_BYTES, Fields.FAILED, new ByteSizeValue(downloadBytesFailed));
builder.endObject();
builder.humanReadableField(Fields.TOTAL_TIME_SPENT_IN_MILLIS, Fields.TOTAL_TIME_SPENT, new TimeValue(totalDownloadTime));
}

static final class Fields {
Expand All @@ -287,5 +321,7 @@ static final class Fields {
static final String TOTAL_BYTES = "total_bytes";
static final String MAX = "max";
static final String MAX_BYTES = "max_bytes";
static final String TOTAL_TIME_SPENT = "total_time_spent";
static final String TOTAL_TIME_SPENT_IN_MILLIS = "total_time_spent_in_millis";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@ public class RemoteSegmentTransferTracker {
*/
private volatile long totalUploadsSucceeded;

/**
* Cumulative sum of time taken in remote refresh (in milliseconds)
*/
private volatile long totalUploadTimeInMs;

/**
* Cumulative sum of rejection counts for this shard.
*/
Expand Down Expand Up @@ -509,6 +514,7 @@ boolean isUploadTimeMsAverageReady() {
}

public void addUploadTimeMs(long timeMs) {
totalUploadTimeInMs += timeMs;
synchronized (uploadTimeMsMutex) {
this.uploadTimeMsMovingAverageReference.get().record(timeMs);
}
Expand All @@ -525,6 +531,10 @@ void updateUploadTimeMsMovingAverageWindowSize(int updatedSize) {
}
}

public long getTotalUploadTimeInMs() {
return totalUploadTimeInMs;

Check warning on line 535 in server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java#L535

Added line #L535 was not covered by tests
}

/**
 * Returns the {@link DirectoryFileTransferTracker} held by this tracker.
 *
 * @return the {@code directoryFileTransferTracker} field, as-is (no copy is made)
 */
public DirectoryFileTransferTracker getDirectoryFileTransferTracker() {
    return this.directoryFileTransferTracker;
}
Expand All @@ -550,6 +560,7 @@ public RemoteSegmentTransferTracker.Stats stats() {
uploadBytesPerSecMovingAverageReference.get().getAverage(),
uploadTimeMsMovingAverageReference.get().getAverage(),
getBytesLag(),
totalUploadTimeInMs,
directoryFileTransferTracker.stats()
);
}
Expand Down Expand Up @@ -578,6 +589,7 @@ public static class Stats implements Writeable {
public final long lastSuccessfulRemoteRefreshBytes;
public final double uploadBytesMovingAverage;
public final double uploadBytesPerSecMovingAverage;
public final long totalUploadTimeInMs;
public final double uploadTimeMovingAverage;
public final long bytesLag;
public final DirectoryFileTransferTracker.Stats directoryFileTransferTrackerStats;
Expand All @@ -602,6 +614,7 @@ public Stats(
double uploadBytesPerSecMovingAverage,
double uploadTimeMovingAverage,
long bytesLag,
long totalUploadTimeInMs,
DirectoryFileTransferTracker.Stats directoryFileTransferTrackerStats
) {
this.shardId = shardId;
Expand All @@ -623,6 +636,7 @@ public Stats(
this.uploadBytesPerSecMovingAverage = uploadBytesPerSecMovingAverage;
this.uploadTimeMovingAverage = uploadTimeMovingAverage;
this.bytesLag = bytesLag;
this.totalUploadTimeInMs = totalUploadTimeInMs;
this.directoryFileTransferTrackerStats = directoryFileTransferTrackerStats;
}

Expand All @@ -647,6 +661,7 @@ public Stats(StreamInput in) throws IOException {
this.uploadBytesPerSecMovingAverage = in.readDouble();
this.uploadTimeMovingAverage = in.readDouble();
this.bytesLag = in.readLong();
this.totalUploadTimeInMs = in.readLong();
this.directoryFileTransferTrackerStats = in.readOptionalWriteable(DirectoryFileTransferTracker.Stats::new);
} catch (IOException e) {
throw e;
Expand Down Expand Up @@ -674,6 +689,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeDouble(uploadBytesPerSecMovingAverage);
out.writeDouble(uploadTimeMovingAverage);
out.writeLong(bytesLag);
out.writeLong(totalUploadTimeInMs);
out.writeOptionalWriteable(directoryFileTransferTrackerStats);
}

Expand Down Expand Up @@ -702,6 +718,7 @@ public boolean equals(Object obj) {
&& Double.compare(this.uploadBytesPerSecMovingAverage, other.uploadBytesPerSecMovingAverage) == 0
&& Double.compare(this.uploadTimeMovingAverage, other.uploadTimeMovingAverage) == 0
&& this.bytesLag == other.bytesLag
&& this.totalUploadTimeInMs == other.totalUploadTimeInMs
&& this.directoryFileTransferTrackerStats.equals(other.directoryFileTransferTrackerStats);
}

Expand All @@ -727,6 +744,7 @@ public int hashCode() {
uploadBytesPerSecMovingAverage,
uploadTimeMovingAverage,
bytesLag,
totalUploadTimeInMs,

Check warning on line 747 in server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java#L747

Added line #L747 was not covered by tests
directoryFileTransferTrackerStats
);
}
Expand Down
Loading

0 comments on commit bb1c3cd

Please sign in to comment.