Skip to content

Commit

Permalink
[Remote Store] Add total upload and download time from remote store t…
Browse files Browse the repository at this point in the history
…o nodes stats (opensearch-project#9454)

---------

Signed-off-by: Shourya Dutta Biswas <114977491+shourya035@users.noreply.github.com>
  • Loading branch information
shourya035 authored and Gagan Juneja committed Aug 28, 2023
1 parent feba270 commit f915212
Show file tree
Hide file tree
Showing 12 changed files with 285 additions and 121 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Change http code on create index API with bad input raising NotXContentException from 500 to 400 ([#4773](https://github.com/opensearch-project/OpenSearch/pull/4773))
- Improve summary error message for invalid setting updates ([#4792](https://github.com/opensearch-project/OpenSearch/pull/4792))
- [Remote Store] Add Segment download stats to remotestore stats API ([#8718](https://github.com/opensearch-project/OpenSearch/pull/8718))
- [Remote Store] Add remote segment transfer stats on NodesStats API ([#9168](https://github.com/opensearch-project/OpenSearch/pull/9168) [#9393](https://github.com/opensearch-project/OpenSearch/pull/9393))
- [Remote Store] Add remote segment transfer stats on NodesStats API ([#9168](https://github.com/opensearch-project/OpenSearch/pull/9168) [#9393](https://github.com/opensearch-project/OpenSearch/pull/9393) [#9454](https://github.com/opensearch-project/OpenSearch/pull/9454))
- Return 409 Conflict HTTP status instead of 503 on failure to concurrently execute snapshots ([#8986](https://github.com/opensearch-project/OpenSearch/pull/5855))

### Deprecated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1457,6 +1457,8 @@ private void assertZeroRemoteSegmentStats(RemoteSegmentStats remoteSegmentStats)
assertEquals(0, remoteSegmentStats.getTotalRefreshBytesLag());
assertEquals(0, remoteSegmentStats.getMaxRefreshBytesLag());
assertEquals(0, remoteSegmentStats.getMaxRefreshTimeLag());
assertEquals(0, remoteSegmentStats.getTotalUploadTime());
assertEquals(0, remoteSegmentStats.getTotalDownloadTime());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ public void testNodesStatsParityWithOnlyPrimaryShards() {
indexSingleDoc(secondIndex, true);

long cumulativeUploadsSucceeded = 0, cumulativeUploadsStarted = 0, cumulativeUploadsFailed = 0;
long total_bytes_lag = 0, max_bytes_lag = 0, max_time_lag = 0;
long totalBytesLag = 0, maxBytesLag = 0, maxTimeLag = 0;
long totalUploadTime = 0;
// Fetch upload stats
RemoteStoreStatsResponse remoteStoreStatsFirstIndex = client(randomDataNode).admin()
.cluster()
Expand All @@ -77,9 +78,10 @@ public void testNodesStatsParityWithOnlyPrimaryShards() {
cumulativeUploadsSucceeded += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesSucceeded;
cumulativeUploadsStarted += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesStarted;
cumulativeUploadsFailed += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesFailed;
total_bytes_lag += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag;
max_bytes_lag = Math.max(max_bytes_lag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag);
max_time_lag = Math.max(max_time_lag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs);
totalBytesLag += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag;
maxBytesLag = Math.max(maxBytesLag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag);
maxTimeLag = Math.max(maxTimeLag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs);
totalUploadTime += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().totalUploadTimeInMs;

RemoteStoreStatsResponse remoteStoreStatsSecondIndex = client(randomDataNode).admin()
.cluster()
Expand All @@ -90,9 +92,10 @@ public void testNodesStatsParityWithOnlyPrimaryShards() {
cumulativeUploadsSucceeded += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesSucceeded;
cumulativeUploadsStarted += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesStarted;
cumulativeUploadsFailed += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesFailed;
total_bytes_lag += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag;
max_bytes_lag = Math.max(max_bytes_lag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag);
max_time_lag = Math.max(max_time_lag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs);
totalBytesLag += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag;
maxBytesLag = Math.max(maxBytesLag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag);
maxTimeLag = Math.max(maxTimeLag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs);
totalUploadTime += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().totalUploadTimeInMs;

// Fetch nodes stats
NodesStatsResponse nodesStatsResponse = client().admin()
Expand All @@ -101,12 +104,13 @@ public void testNodesStatsParityWithOnlyPrimaryShards() {
.setIndices(new CommonStatsFlags().set(CommonStatsFlags.Flag.Segments, true))
.get();
RemoteSegmentStats remoteSegmentStats = nodesStatsResponse.getNodes().get(0).getIndices().getSegments().getRemoteSegmentStats();
assertEquals(cumulativeUploadsSucceeded, remoteSegmentStats.getUploadBytesSucceeded());
assertEquals(cumulativeUploadsStarted, remoteSegmentStats.getUploadBytesStarted());
assertTrue(cumulativeUploadsSucceeded > 0 && cumulativeUploadsSucceeded == remoteSegmentStats.getUploadBytesSucceeded());
assertTrue(cumulativeUploadsStarted > 0 && cumulativeUploadsStarted == remoteSegmentStats.getUploadBytesStarted());
assertEquals(cumulativeUploadsFailed, remoteSegmentStats.getUploadBytesFailed());
assertEquals(total_bytes_lag, remoteSegmentStats.getTotalRefreshBytesLag());
assertEquals(max_bytes_lag, remoteSegmentStats.getMaxRefreshBytesLag());
assertEquals(max_time_lag, remoteSegmentStats.getMaxRefreshTimeLag());
assertEquals(totalBytesLag, remoteSegmentStats.getTotalRefreshBytesLag());
assertEquals(maxBytesLag, remoteSegmentStats.getMaxRefreshBytesLag());
assertEquals(maxTimeLag, remoteSegmentStats.getMaxRefreshTimeLag());
assertTrue(totalUploadTime > 0 && totalUploadTime == remoteSegmentStats.getTotalUploadTime());
}

/**
Expand Down Expand Up @@ -180,13 +184,16 @@ private void assertZeroRemoteSegmentStats(RemoteSegmentStats remoteSegmentStats)
assertEquals(0, remoteSegmentStats.getTotalRefreshBytesLag());
assertEquals(0, remoteSegmentStats.getMaxRefreshBytesLag());
assertEquals(0, remoteSegmentStats.getMaxRefreshTimeLag());
assertEquals(0, remoteSegmentStats.getTotalUploadTime());
assertEquals(0, remoteSegmentStats.getTotalDownloadTime());
}

private static void assertNodeStatsParityAcrossNodes(String firstIndex, String secondIndex) {
for (String dataNode : internalCluster().getDataNodeNames()) {
long cumulativeUploadsSucceeded = 0, cumulativeUploadsStarted = 0, cumulativeUploadsFailed = 0;
long cumulativeDownloadsSucceeded = 0, cumulativeDownloadsStarted = 0, cumulativeDownloadsFailed = 0;
long total_bytes_lag = 0, max_bytes_lag = 0, max_time_lag = 0;
long totalBytesLag = 0, maxBytesLag = 0, maxTimeLag = 0;
long totalUploadTime = 0, totalDownloadTime = 0;
// Fetch upload stats
RemoteStoreStatsResponse remoteStoreStatsFirstIndex = client(dataNode).admin()
.cluster()
Expand All @@ -202,9 +209,12 @@ private static void assertNodeStatsParityAcrossNodes(String firstIndex, String s
.getSegmentStats().directoryFileTransferTrackerStats.transferredBytesStarted;
cumulativeDownloadsFailed += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0]
.getSegmentStats().directoryFileTransferTrackerStats.transferredBytesFailed;
total_bytes_lag += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag;
max_bytes_lag = Math.max(max_bytes_lag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag);
max_time_lag = Math.max(max_time_lag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs);
totalBytesLag += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag;
maxBytesLag = Math.max(maxBytesLag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag);
maxTimeLag = Math.max(maxTimeLag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs);
totalUploadTime += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().totalUploadTimeInMs;
totalDownloadTime += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0]
.getSegmentStats().directoryFileTransferTrackerStats.totalTransferTimeInMs;

RemoteStoreStatsResponse remoteStoreStatsSecondIndex = client(dataNode).admin()
.cluster()
Expand All @@ -220,9 +230,12 @@ private static void assertNodeStatsParityAcrossNodes(String firstIndex, String s
.getSegmentStats().directoryFileTransferTrackerStats.transferredBytesStarted;
cumulativeDownloadsFailed += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0]
.getSegmentStats().directoryFileTransferTrackerStats.transferredBytesFailed;
total_bytes_lag += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag;
max_bytes_lag = Math.max(max_bytes_lag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag);
max_time_lag = Math.max(max_time_lag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs);
totalBytesLag += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag;
maxBytesLag = Math.max(maxBytesLag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag);
maxTimeLag = Math.max(maxTimeLag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs);
totalUploadTime += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().totalUploadTimeInMs;
totalDownloadTime += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0]
.getSegmentStats().directoryFileTransferTrackerStats.totalTransferTimeInMs;

// Fetch nodes stats
NodesStatsResponse nodesStatsResponse = client().admin()
Expand All @@ -237,9 +250,19 @@ private static void assertNodeStatsParityAcrossNodes(String firstIndex, String s
assertEquals(cumulativeDownloadsSucceeded, remoteSegmentStats.getDownloadBytesSucceeded());
assertEquals(cumulativeDownloadsStarted, remoteSegmentStats.getDownloadBytesStarted());
assertEquals(cumulativeDownloadsFailed, remoteSegmentStats.getDownloadBytesFailed());
assertEquals(total_bytes_lag, remoteSegmentStats.getTotalRefreshBytesLag());
assertEquals(max_bytes_lag, remoteSegmentStats.getMaxRefreshBytesLag());
assertEquals(max_time_lag, remoteSegmentStats.getMaxRefreshTimeLag());
assertEquals(totalBytesLag, remoteSegmentStats.getTotalRefreshBytesLag());
assertEquals(maxBytesLag, remoteSegmentStats.getMaxRefreshBytesLag());
assertEquals(maxTimeLag, remoteSegmentStats.getMaxRefreshTimeLag());
// Ensure that total upload time has non-zero value if there has been segments uploaded from the node
if (cumulativeUploadsStarted > 0) {
assertTrue(totalUploadTime > 0);
}
assertEquals(totalUploadTime, remoteSegmentStats.getTotalUploadTime());
// Ensure that total download time has non-zero value if there has been segments downloaded to the node
if (cumulativeDownloadsStarted > 0) {
assertTrue(totalDownloadTime > 0);
}
assertEquals(totalDownloadTime, remoteSegmentStats.getTotalDownloadTime());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,14 @@ public class RemoteSegmentStats implements Writeable, ToXContentFragment {
* Used to check for data freshness in the remote store
*/
private long totalRefreshBytesLag;
/**
* Total time spent in uploading segments to remote store
*/
private long totalUploadTime;
/**
* Total time spent in downloading segments from remote store
*/
private long totalDownloadTime;

public RemoteSegmentStats() {}

Expand All @@ -89,8 +97,10 @@ public RemoteSegmentStats(StreamInput in) throws IOException {
This would have to be removed after the new field addition PRs are also backported to 2.x.
If possible we would need to ensure that all field addition PRs are backported at once
*/
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
if (in.getVersion().onOrAfter(Version.CURRENT)) {
totalRefreshBytesLag = in.readLong();
totalUploadTime = in.readLong();
totalDownloadTime = in.readLong();
}
}

Expand All @@ -115,9 +125,12 @@ public RemoteSegmentStats(RemoteSegmentTransferTracker.Stats trackerStats) {
// Aggregations would be performed on the add method
this.maxRefreshBytesLag = trackerStats.bytesLag;
this.totalRefreshBytesLag = trackerStats.bytesLag;
this.totalUploadTime = trackerStats.totalUploadTimeInMs;
this.totalDownloadTime = trackerStats.directoryFileTransferTrackerStats.totalTransferTimeInMs;
}

// Getter and setters. All are visible for testing
// Setters are only used for testing
public long getUploadBytesStarted() {
return uploadBytesStarted;
}
Expand Down Expand Up @@ -190,6 +203,22 @@ public void addTotalRefreshBytesLag(long totalRefreshBytesLag) {
this.totalRefreshBytesLag += totalRefreshBytesLag;
}

public long getTotalUploadTime() {
return totalUploadTime;
}

public void addTotalUploadTime(long totalUploadTime) {
this.totalUploadTime += totalUploadTime;
}

public long getTotalDownloadTime() {
return totalDownloadTime;
}

public void addTotalDownloadTime(long totalDownloadTime) {
this.totalDownloadTime += totalDownloadTime;
}

/**
* Adds existing stats. Used for stats roll-ups at index or node level
*
Expand All @@ -206,6 +235,8 @@ public void add(RemoteSegmentStats existingStats) {
this.maxRefreshTimeLag = Math.max(this.maxRefreshTimeLag, existingStats.getMaxRefreshTimeLag());
this.maxRefreshBytesLag = Math.max(this.maxRefreshBytesLag, existingStats.getMaxRefreshBytesLag());
this.totalRefreshBytesLag += existingStats.getTotalRefreshBytesLag();
this.totalUploadTime += existingStats.getTotalUploadTime();
this.totalDownloadTime += existingStats.getTotalDownloadTime();
}
}

Expand All @@ -229,8 +260,10 @@ public void writeTo(StreamOutput out) throws IOException {
This would have to be removed after the new field addition PRs are also backported to 2.x.
If possible we would need to ensure that all field addition PRs are backported at once
*/
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
if (out.getVersion().onOrAfter(Version.CURRENT)) {
out.writeLong(totalRefreshBytesLag);
out.writeLong(totalUploadTime);
out.writeLong(totalDownloadTime);
}
}

Expand Down Expand Up @@ -258,6 +291,7 @@ private void buildUploadStats(XContentBuilder builder) throws IOException {
builder.humanReadableField(Fields.MAX_BYTES, Fields.MAX, new ByteSizeValue(maxRefreshBytesLag));
builder.endObject();
builder.humanReadableField(Fields.MAX_REFRESH_TIME_LAG_IN_MILLIS, Fields.MAX_REFRESH_TIME_LAG, new TimeValue(maxRefreshTimeLag));
builder.humanReadableField(Fields.TOTAL_TIME_SPENT_IN_MILLIS, Fields.TOTAL_TIME_SPENT, new TimeValue(totalUploadTime));
}

private void buildDownloadStats(XContentBuilder builder) throws IOException {
Expand All @@ -266,6 +300,7 @@ private void buildDownloadStats(XContentBuilder builder) throws IOException {
builder.humanReadableField(Fields.SUCCEEDED_BYTES, Fields.SUCCEEDED, new ByteSizeValue(downloadBytesSucceeded));
builder.humanReadableField(Fields.FAILED_BYTES, Fields.FAILED, new ByteSizeValue(downloadBytesFailed));
builder.endObject();
builder.humanReadableField(Fields.TOTAL_TIME_SPENT_IN_MILLIS, Fields.TOTAL_TIME_SPENT, new TimeValue(totalDownloadTime));
}

static final class Fields {
Expand All @@ -287,5 +322,7 @@ static final class Fields {
static final String TOTAL_BYTES = "total_bytes";
static final String MAX = "max";
static final String MAX_BYTES = "max_bytes";
static final String TOTAL_TIME_SPENT = "total_time_spent";
static final String TOTAL_TIME_SPENT_IN_MILLIS = "total_time_spent_in_millis";
}
}
Loading

0 comments on commit f915212

Please sign in to comment.