Skip to content

Commit

Permalink
Addressing comments
Browse files Browse the repository at this point in the history
Signed-off-by: Shourya Dutta Biswas <114977491+shourya035@users.noreply.github.com>
  • Loading branch information
shourya035 committed Aug 24, 2023
1 parent 3dec2c0 commit 2e83bb0
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,13 @@ public void testNodesStatsParityWithOnlyPrimaryShards() {
.setIndices(new CommonStatsFlags().set(CommonStatsFlags.Flag.Segments, true))
.get();
RemoteSegmentStats remoteSegmentStats = nodesStatsResponse.getNodes().get(0).getIndices().getSegments().getRemoteSegmentStats();
assertEquals(cumulativeUploadsSucceeded, remoteSegmentStats.getUploadBytesSucceeded());
assertEquals(cumulativeUploadsStarted, remoteSegmentStats.getUploadBytesStarted());
assertTrue(cumulativeUploadsSucceeded > 0 && cumulativeUploadsSucceeded == remoteSegmentStats.getUploadBytesSucceeded());
assertTrue(cumulativeUploadsStarted > 0 && cumulativeUploadsStarted == remoteSegmentStats.getUploadBytesStarted());
assertEquals(cumulativeUploadsFailed, remoteSegmentStats.getUploadBytesFailed());
assertEquals(totalBytesLag, remoteSegmentStats.getTotalRefreshBytesLag());
assertEquals(maxBytesLag, remoteSegmentStats.getMaxRefreshBytesLag());
assertEquals(maxTimeLag, remoteSegmentStats.getMaxRefreshTimeLag());
assertEquals(totalUploadTime, remoteSegmentStats.getTotalUploadTime());
assertTrue(totalUploadTime > 0 && totalUploadTime == remoteSegmentStats.getTotalUploadTime());
}

/**
Expand Down Expand Up @@ -253,7 +253,15 @@ private static void assertNodeStatsParityAcrossNodes(String firstIndex, String s
assertEquals(totalBytesLag, remoteSegmentStats.getTotalRefreshBytesLag());
assertEquals(maxBytesLag, remoteSegmentStats.getMaxRefreshBytesLag());
assertEquals(maxTimeLag, remoteSegmentStats.getMaxRefreshTimeLag());
// Ensure that total upload time has non-zero value if there has been segments uploaded from the node
if (cumulativeUploadsStarted > 0) {
assertTrue(totalUploadTime > 0);
}
assertEquals(totalUploadTime, remoteSegmentStats.getTotalUploadTime());
// Ensure that total download time has non-zero value if there has been segments downloaded to the node
if (cumulativeDownloadsStarted > 0) {
assertTrue(totalDownloadTime > 0);
}
assertEquals(totalDownloadTime, remoteSegmentStats.getTotalDownloadTime());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public RemoteSegmentStats(StreamInput in) throws IOException {
This would have to be removed after the new field addition PRs are also backported to 2.x.
If possible we would need to ensure that all field addition PRs are backported at once
*/
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
if (in.getVersion().onOrAfter(Version.CURRENT)) {
totalRefreshBytesLag = in.readLong();
totalUploadTime = in.readLong();
totalDownloadTime = in.readLong();
Expand Down Expand Up @@ -260,7 +260,7 @@ public void writeTo(StreamOutput out) throws IOException {
This would have to be removed after the new field addition PRs are also backported to 2.x.
If possible we would need to ensure that all field addition PRs are backported at once
*/
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
if (out.getVersion().onOrAfter(Version.CURRENT)) {
out.writeLong(totalRefreshBytesLag);
out.writeLong(totalUploadTime);
out.writeLong(totalDownloadTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,17 +96,17 @@ public class RemoteSegmentTransferTracker {
/**
* Cumulative sum of size in bytes of segment files for which upload has started during remote refresh.
*/
private volatile long uploadBytesStarted;
private final AtomicLong uploadBytesStarted = new AtomicLong();

/**
* Cumulative sum of size in bytes of segment files for which upload has failed during remote refresh.
*/
private volatile long uploadBytesFailed;
private final AtomicLong uploadBytesFailed = new AtomicLong();

/**
* Cumulative sum of size in bytes of segment files for which upload has succeeded during remote refresh.
*/
private volatile long uploadBytesSucceeded;
private final AtomicLong uploadBytesSucceeded = new AtomicLong();

/**
* Cumulative sum of count of remote refreshes that have started.
Expand All @@ -126,7 +126,7 @@ public class RemoteSegmentTransferTracker {
/**
* Cumulative sum of time taken in remote refresh (in milliseconds) [Tracked per file]
*/
private volatile long totalUploadTimeInMs;
private AtomicLong totalUploadTimeInMs = new AtomicLong();

/**
* Cumulative sum of rejection counts for this shard.
Expand Down Expand Up @@ -321,31 +321,31 @@ public long getBytesLag() {
}

public long getUploadBytesStarted() {
return uploadBytesStarted;
return uploadBytesStarted.get();
}

public void addUploadBytesStarted(long size) {
uploadBytesStarted += size;
uploadBytesStarted.getAndAdd(size);
}

public long getUploadBytesFailed() {
return uploadBytesFailed;
return uploadBytesFailed.get();
}

public void addUploadBytesFailed(long size) {
uploadBytesFailed += size;
uploadBytesFailed.getAndAdd(size);
}

public long getUploadBytesSucceeded() {
return uploadBytesSucceeded;
return uploadBytesSucceeded.get();
}

public void addUploadBytesSucceeded(long size) {
uploadBytesSucceeded += size;
uploadBytesSucceeded.getAndAdd(size);
}

public long getInflightUploadBytes() {
return uploadBytesStarted - uploadBytesFailed - uploadBytesSucceeded;
return uploadBytesStarted.get() - uploadBytesFailed.get() - uploadBytesSucceeded.get();
}

public long getTotalUploadsStarted() {
Expand Down Expand Up @@ -531,11 +531,11 @@ void updateUploadTimeMsMovingAverageWindowSize(int updatedSize) {
}

public void addTotalUploadTimeInMs(long fileUploadTimeInMs) {
this.totalUploadTimeInMs += fileUploadTimeInMs;
this.totalUploadTimeInMs.addAndGet(fileUploadTimeInMs);
}

public long getTotalUploadTimeInMs() {
return totalUploadTimeInMs;
return totalUploadTimeInMs.get();
}

public DirectoryFileTransferTracker getDirectoryFileTransferTracker() {
Expand All @@ -550,9 +550,9 @@ public RemoteSegmentTransferTracker.Stats stats() {
timeMsLag,
localRefreshSeqNo,
remoteRefreshSeqNo,
uploadBytesStarted,
uploadBytesSucceeded,
uploadBytesFailed,
uploadBytesStarted.get(),
uploadBytesSucceeded.get(),
uploadBytesFailed.get(),
totalUploadsStarted,
totalUploadsSucceeded,
totalUploadsFailed,
Expand All @@ -563,7 +563,7 @@ public RemoteSegmentTransferTracker.Stats stats() {
uploadBytesPerSecMovingAverageReference.get().getAverage(),
uploadTimeMsMovingAverageReference.get().getAverage(),
getBytesLag(),
totalUploadTimeInMs,
totalUploadTimeInMs.get(),
directoryFileTransferTracker.stats()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ public final class RemoteStoreRefreshListener extends CloseableRetryableRefreshL
private long primaryTerm;
private volatile Iterator<TimeValue> backoffDelayIterator;
private final SegmentReplicationCheckpointPublisher checkpointPublisher;
private final UploadListener statsListener;

public RemoteStoreRefreshListener(
IndexShard indexShard,
Expand Down Expand Up @@ -117,33 +116,6 @@ public RemoteStoreRefreshListener(
this.segmentTracker = segmentTracker;
resetBackOffDelayIterator();
this.checkpointPublisher = checkpointPublisher;
this.statsListener = new UploadListener() {
private long uploadStartTime = 0;

@Override
public void beforeUpload(String file) {
// Start tracking the upload bytes started
segmentTracker.addUploadBytesStarted(segmentTracker.getLatestLocalFileNameLengthMap().get(file));
uploadStartTime = System.currentTimeMillis();
}

@Override
public void onSuccess(String file) {
// Track upload success
segmentTracker.addUploadBytesSucceeded(segmentTracker.getLatestLocalFileNameLengthMap().get(file));
segmentTracker.addToLatestUploadedFiles(file);
long fileUploadTime = System.currentTimeMillis() - uploadStartTime;
// Round off upload time to 1 millisecond
fileUploadTime = fileUploadTime > 0 ? fileUploadTime : 1;
segmentTracker.addTotalUploadTimeInMs(fileUploadTime);
}

@Override
public void onFailure(String file) {
// Track upload failure
segmentTracker.addUploadBytesFailed(segmentTracker.getLatestLocalFileNameLengthMap().get(file));
}
};
}

@Override
Expand Down Expand Up @@ -380,6 +352,32 @@ private void uploadNewSegments(Collection<String> localSegmentsPostRefresh, Acti
GroupedActionListener<Void> batchUploadListener = new GroupedActionListener<>(mappedListener, filteredFiles.size());

for (String src : filteredFiles) {
// Initializing listener here to ensure that the stats increment operations are thread-safe
UploadListener statsListener = new UploadListener() {
private long uploadStartTime = 0;

@Override
public void beforeUpload(String file) {
// Start tracking the upload bytes started
segmentTracker.addUploadBytesStarted(segmentTracker.getLatestLocalFileNameLengthMap().get(file));
uploadStartTime = System.currentTimeMillis();
}

@Override
public void onSuccess(String file) {
// Track upload success
segmentTracker.addUploadBytesSucceeded(segmentTracker.getLatestLocalFileNameLengthMap().get(file));
segmentTracker.addToLatestUploadedFiles(file);
segmentTracker.addTotalUploadTimeInMs(Math.max(1, System.currentTimeMillis() - uploadStartTime));
}

@Override
public void onFailure(String file) {
// Track upload failure
segmentTracker.addUploadBytesFailed(segmentTracker.getLatestLocalFileNameLengthMap().get(file));
segmentTracker.addTotalUploadTimeInMs(Math.max(1, System.currentTimeMillis() - uploadStartTime));
}

Check warning on line 379 in server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java#L377-L379

Added lines #L377 - L379 were not covered by tests
};
ActionListener<Void> aggregatedListener = ActionListener.wrap(resp -> {
statsListener.onSuccess(src);
batchUploadListener.onResponse(resp);
Expand Down
Loading

0 comments on commit 2e83bb0

Please sign in to comment.