From c9dbd90219ca03f09aec1880e011027ba125674f Mon Sep 17 00:00:00 2001 From: Shourya Dutta Biswas <114977491+shourya035@users.noreply.github.com> Date: Mon, 14 Aug 2023 15:56:15 +0530 Subject: [PATCH] [Remote Store] Add remote segment transfer stats on NodesStats API (#9168) --------- Signed-off-by: Shourya Dutta Biswas <114977491+shourya035@users.noreply.github.com> --- CHANGELOG.md | 2 + .../indices/stats/IndexStatsIT.java | 40 +++ .../RemoteSegmentStatsFromNodesStatsIT.java | 237 ++++++++++++++++++ .../RemoteStoreBaseIntegTestCase.java | 23 +- .../index/engine/SegmentsStats.java | 25 +- .../index/remote/RemoteSegmentStats.java | 236 +++++++++++++++++ .../opensearch/index/shard/IndexShard.java | 13 + .../cluster/node/stats/NodeStatsTests.java | 54 +++- .../index/shard/IndexShardTests.java | 33 +++ 9 files changed, 655 insertions(+), 8 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteSegmentStatsFromNodesStatsIT.java create mode 100644 server/src/main/java/org/opensearch/index/remote/RemoteSegmentStats.java diff --git a/CHANGELOG.md b/CHANGELOG.md index b26f5b26c5f73..77ed030ecafd8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -53,6 +53,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Replace the deprecated IndexReader APIs with new storedFields() & termVectors() ([#7792](https://github.com/opensearch-project/OpenSearch/pull/7792)) - [Remote Store] Restrict user override for remote store index level settings ([#8812](https://github.com/opensearch-project/OpenSearch/pull/8812)) - Removed blocking wait in TransportGetSnapshotsAction which was exhausting generic threadpool ([#8377](https://github.com/opensearch-project/OpenSearch/pull/8377)) +- [Remote Store] Add Segment download stats to remotestore stats API ([#8718](https://github.com/opensearch-project/OpenSearch/pull/8718)) +- [Remote Store] Add remote segment transfer stats on NodesStats API ([#9168](https://github.com/opensearch-project/OpenSearch/pull/9168)) ### Deprecated diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java index 233c8811ca6f4..54bf3306f5514 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java @@ -49,6 +49,8 @@ import org.opensearch.action.index.IndexRequestBuilder; import org.opensearch.action.index.IndexResponse; import org.opensearch.action.search.SearchType; +import org.opensearch.action.support.WriteRequest; +import org.opensearch.common.UUIDs; import org.opensearch.core.action.support.DefaultShardOperationFailedException; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.core.common.bytes.BytesReference; @@ -56,6 +58,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.rest.RestStatus; import org.opensearch.core.xcontent.MediaTypeRegistry; import org.opensearch.index.IndexModule; import org.opensearch.index.IndexService; @@ -66,6 +69,7 @@ import org.opensearch.index.cache.query.QueryCacheStats; import org.opensearch.index.engine.VersionConflictEngineException; import org.opensearch.index.query.QueryBuilders; +import org.opensearch.index.remote.RemoteSegmentStats; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.translog.Translog; import org.opensearch.indices.IndicesQueryCache; @@ -1418,6 +1422,42 @@ public void testConcurrentIndexingAndStatsRequests() throws BrokenBarrierExcepti assertThat(executionFailures.get(), emptyCollectionOf(Exception.class)); } + public void testZeroRemoteStoreStatsOnNonRemoteStoreIndex() { + String indexName = "test-index"; + createIndex(indexName, Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0).build()); + ensureGreen(indexName); + assertEquals( + RestStatus.CREATED, + client().prepareIndex(indexName) + .setId(UUIDs.randomBase64UUID()) + .setSource("field", "value1", "field2", "value1") + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .get() + .status() + ); + ShardStats shard = client().admin().indices().prepareStats(indexName).setSegments(true).get().getShards()[0]; + RemoteSegmentStats remoteSegmentStatsFromIndexStats = shard.getStats().getSegments().getRemoteSegmentStats(); + assertZeroRemoteSegmentStats(remoteSegmentStatsFromIndexStats); + NodesStatsResponse nodesStatsResponse = client().admin().cluster().prepareNodesStats(primaryNodeName(indexName)).get(); + RemoteSegmentStats remoteSegmentStatsFromNodesStats = nodesStatsResponse.getNodes() + .get(0) + .getIndices() + .getSegments() + .getRemoteSegmentStats(); + assertZeroRemoteSegmentStats(remoteSegmentStatsFromNodesStats); + } + + private void assertZeroRemoteSegmentStats(RemoteSegmentStats remoteSegmentStats) { + assertEquals(0, remoteSegmentStats.getUploadBytesStarted()); + assertEquals(0, remoteSegmentStats.getUploadBytesSucceeded()); + assertEquals(0, remoteSegmentStats.getUploadBytesFailed()); + assertEquals(0, remoteSegmentStats.getDownloadBytesStarted()); + assertEquals(0, remoteSegmentStats.getDownloadBytesSucceeded()); + assertEquals(0, remoteSegmentStats.getDownloadBytesFailed()); + assertEquals(0, remoteSegmentStats.getMaxRefreshBytesLag()); + assertEquals(0, remoteSegmentStats.getMaxRefreshTimeLag()); + } + /** * Persist the global checkpoint on all shards of the given index into disk. * This makes sure that the persisted global checkpoint on those shards will equal to the in-memory value. diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteSegmentStatsFromNodesStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteSegmentStatsFromNodesStatsIT.java new file mode 100644 index 0000000000000..a92c80b9cf840 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteSegmentStatsFromNodesStatsIT.java @@ -0,0 +1,237 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotestore; + +import org.junit.Before; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsResponse; +import org.opensearch.action.admin.indices.stats.CommonStatsFlags; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.remote.RemoteSegmentStats; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.util.concurrent.TimeUnit; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class RemoteSegmentStatsFromNodesStatsIT extends RemoteStoreBaseIntegTestCase { + private static final String INDEX_NAME = "remote-index-1"; + private static final int DATA_NODE_COUNT = 2; + private static final int CLUSTER_MANAGER_NODE_COUNT = 3; + + @Before + public void setup() { + setupCustomCluster(); + setupRepo(false); + } + + private void setupCustomCluster() { + internalCluster().startClusterManagerOnlyNodes(CLUSTER_MANAGER_NODE_COUNT); + internalCluster().startDataOnlyNodes(DATA_NODE_COUNT); + ensureStableCluster(DATA_NODE_COUNT + CLUSTER_MANAGER_NODE_COUNT); + } + + /** + * - Creates two indices with single primary shard, pinned to a single node. + * - Index documents in both of them and forces a fresh for both + * - Polls the _remotestore/stats API for individual index level stats + * - Adds up requisite fields from the API output, repeats this for the 2nd index + * - Polls _nodes/stats and verifies that the total values at node level adds up + * to the values capture in the previous step + */ + public void testNodesStatsParityWithOnlyPrimaryShards() { + String[] dataNodes = internalCluster().getDataNodeNames().toArray(String[]::new); + String randomDataNode = dataNodes[randomIntBetween(0, dataNodes.length - 1)]; + String firstIndex = INDEX_NAME + "1"; + String secondIndex = INDEX_NAME + "2"; + + // Create first index + createIndex( + firstIndex, + Settings.builder().put(remoteStoreIndexSettings(0, 1)).put("index.routing.allocation.require._name", randomDataNode).build() + ); + ensureGreen(firstIndex); + indexSingleDoc(firstIndex, true); + + // Create second index + createIndex( + secondIndex, + Settings.builder().put(remoteStoreIndexSettings(0, 1)).put("index.routing.allocation.require._name", randomDataNode).build() + ); + ensureGreen(secondIndex); + indexSingleDoc(secondIndex, true); + + long cumulativeUploadsSucceeded = 0, cumulativeUploadsStarted = 0, cumulativeUploadsFailed = 0; + long max_bytes_lag = 0, max_time_lag = 0; + // Fetch upload stats + RemoteStoreStatsResponse remoteStoreStatsFirstIndex = client(randomDataNode).admin() + .cluster() + .prepareRemoteStoreStats(firstIndex, "0") + .setLocal(true) + .get(); + cumulativeUploadsSucceeded += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getStats().uploadBytesSucceeded; + cumulativeUploadsStarted += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getStats().uploadBytesStarted; + cumulativeUploadsFailed += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getStats().uploadBytesFailed; + max_bytes_lag = Math.max(max_bytes_lag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getStats().bytesLag); + max_time_lag = Math.max(max_time_lag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getStats().refreshTimeLagMs); + + RemoteStoreStatsResponse remoteStoreStatsSecondIndex = client(randomDataNode).admin() + .cluster() + .prepareRemoteStoreStats(secondIndex, "0") + .setLocal(true) + .get(); + cumulativeUploadsSucceeded += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getStats().uploadBytesSucceeded; + cumulativeUploadsStarted += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getStats().uploadBytesStarted; + cumulativeUploadsFailed += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getStats().uploadBytesFailed; + max_bytes_lag = Math.max(max_bytes_lag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getStats().bytesLag); + max_time_lag = Math.max(max_time_lag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getStats().refreshTimeLagMs); + + // Fetch nodes stats + NodesStatsResponse nodesStatsResponse = client().admin() + .cluster() + .prepareNodesStats(randomDataNode) + .setIndices(new CommonStatsFlags().set(CommonStatsFlags.Flag.Segments, true)) + .get(); + RemoteSegmentStats remoteSegmentStats = nodesStatsResponse.getNodes().get(0).getIndices().getSegments().getRemoteSegmentStats(); + assertEquals(cumulativeUploadsSucceeded, remoteSegmentStats.getUploadBytesSucceeded()); + assertEquals(cumulativeUploadsStarted, remoteSegmentStats.getUploadBytesStarted()); + assertEquals(cumulativeUploadsFailed, remoteSegmentStats.getUploadBytesFailed()); + assertEquals(max_bytes_lag, remoteSegmentStats.getMaxRefreshBytesLag()); + assertEquals(max_time_lag, remoteSegmentStats.getMaxRefreshTimeLag()); + } + + /** + * - Creates two indices with single primary shard and single replica + * - Index documents in both of them and forces a fresh for both + * - Polls the _remotestore/stats API for individual index level stats + * - Adds up requisite fields from the API output for both indices + * - Polls _nodes/stats and verifies that the total values at node level adds up + * to the values capture in the previous step + * - Repeats the above 3 steps for the second node + */ + public void testNodesStatsParityWithReplicaShards() throws Exception { + String firstIndex = INDEX_NAME + "1"; + String secondIndex = INDEX_NAME + "2"; + + createIndex(firstIndex, Settings.builder().put(remoteStoreIndexSettings(1, 1)).build()); + ensureGreen(firstIndex); + indexSingleDoc(firstIndex, true); + + // Create second index + createIndex(secondIndex, Settings.builder().put(remoteStoreIndexSettings(1, 1)).build()); + ensureGreen(secondIndex); + indexSingleDoc(secondIndex, true); + + assertBusy(() -> assertNodeStatsParityAcrossNodes(firstIndex, secondIndex), 15, TimeUnit.SECONDS); + } + + /** + * Ensures that node stats shows 0 values for dedicated cluster manager nodes + * since cluster manager nodes does not participate in indexing + */ + public void testZeroRemoteStatsOnNodesStatsForClusterManager() { + createIndex(INDEX_NAME, remoteStoreIndexSettings(0)); + ensureGreen(INDEX_NAME); + indexSingleDoc(INDEX_NAME); + refresh(INDEX_NAME); + NodesStatsResponse nodesStatsResponseForClusterManager = client().admin() + .cluster() + .prepareNodesStats(internalCluster().getClusterManagerName()) + .setIndices(new CommonStatsFlags().set(CommonStatsFlags.Flag.Segments, true)) + .get(); + assertTrue( + nodesStatsResponseForClusterManager.getNodes().get(0).getNode().isClusterManagerNode() + && !nodesStatsResponseForClusterManager.getNodes().get(0).getNode().isDataNode() + ); + assertZeroRemoteSegmentStats( + nodesStatsResponseForClusterManager.getNodes().get(0).getIndices().getSegments().getRemoteSegmentStats() + ); + NodesStatsResponse nodesStatsResponseForDataNode = client().admin() + .cluster() + .prepareNodesStats(primaryNodeName(INDEX_NAME)) + .setIndices(new CommonStatsFlags().set(CommonStatsFlags.Flag.Segments, true)) + .get(); + assertTrue(nodesStatsResponseForDataNode.getNodes().get(0).getNode().isDataNode()); + RemoteSegmentStats remoteSegmentStats = nodesStatsResponseForDataNode.getNodes() + .get(0) + .getIndices() + .getSegments() + .getRemoteSegmentStats(); + assertTrue(remoteSegmentStats.getUploadBytesStarted() > 0); + assertTrue(remoteSegmentStats.getUploadBytesSucceeded() > 0); + } + + private void assertZeroRemoteSegmentStats(RemoteSegmentStats remoteSegmentStats) { + assertEquals(0, remoteSegmentStats.getUploadBytesStarted()); + assertEquals(0, remoteSegmentStats.getUploadBytesSucceeded()); + assertEquals(0, remoteSegmentStats.getUploadBytesFailed()); + assertEquals(0, remoteSegmentStats.getDownloadBytesStarted()); + assertEquals(0, remoteSegmentStats.getDownloadBytesSucceeded()); + assertEquals(0, remoteSegmentStats.getDownloadBytesFailed()); + assertEquals(0, remoteSegmentStats.getMaxRefreshBytesLag()); + assertEquals(0, remoteSegmentStats.getMaxRefreshTimeLag()); + } + + private static void assertNodeStatsParityAcrossNodes(String firstIndex, String secondIndex) { + for (String dataNode : internalCluster().getDataNodeNames()) { + long cumulativeUploadsSucceeded = 0, cumulativeUploadsStarted = 0, cumulativeUploadsFailed = 0; + long cumulativeDownloadsSucceeded = 0, cumulativeDownloadsStarted = 0, cumulativeDownloadsFailed = 0; + long max_bytes_lag = 0, max_time_lag = 0; + // Fetch upload stats + RemoteStoreStatsResponse remoteStoreStatsFirstIndex = client(dataNode).admin() + .cluster() + .prepareRemoteStoreStats(firstIndex, "0") + .setLocal(true) + .get(); + cumulativeUploadsSucceeded += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getStats().uploadBytesSucceeded; + cumulativeUploadsStarted += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getStats().uploadBytesStarted; + cumulativeUploadsFailed += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getStats().uploadBytesFailed; + cumulativeDownloadsSucceeded += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0] + .getStats().directoryFileTransferTrackerStats.transferredBytesSucceeded; + cumulativeDownloadsStarted += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0] + .getStats().directoryFileTransferTrackerStats.transferredBytesStarted; + cumulativeDownloadsFailed += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0] + .getStats().directoryFileTransferTrackerStats.transferredBytesFailed; + max_bytes_lag = Math.max(max_bytes_lag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getStats().bytesLag); + max_time_lag = Math.max(max_time_lag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getStats().refreshTimeLagMs); + + RemoteStoreStatsResponse remoteStoreStatsSecondIndex = client(dataNode).admin() + .cluster() + .prepareRemoteStoreStats(secondIndex, "0") + .setLocal(true) + .get(); + cumulativeUploadsSucceeded += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getStats().uploadBytesSucceeded; + cumulativeUploadsStarted += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getStats().uploadBytesStarted; + cumulativeUploadsFailed += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getStats().uploadBytesFailed; + cumulativeDownloadsSucceeded += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0] + .getStats().directoryFileTransferTrackerStats.transferredBytesSucceeded; + cumulativeDownloadsStarted += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0] + .getStats().directoryFileTransferTrackerStats.transferredBytesStarted; + cumulativeDownloadsFailed += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0] + .getStats().directoryFileTransferTrackerStats.transferredBytesFailed; + max_bytes_lag = Math.max(max_bytes_lag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getStats().bytesLag); + max_time_lag = Math.max(max_time_lag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getStats().refreshTimeLagMs); + + // Fetch nodes stats + NodesStatsResponse nodesStatsResponse = client().admin() + .cluster() + .prepareNodesStats(dataNode) + .setIndices(new CommonStatsFlags().set(CommonStatsFlags.Flag.Segments, true)) + .get(); + RemoteSegmentStats remoteSegmentStats = nodesStatsResponse.getNodes().get(0).getIndices().getSegments().getRemoteSegmentStats(); + assertEquals(cumulativeUploadsSucceeded, remoteSegmentStats.getUploadBytesSucceeded()); + assertEquals(cumulativeUploadsStarted, remoteSegmentStats.getUploadBytesStarted()); + assertEquals(cumulativeUploadsFailed, remoteSegmentStats.getUploadBytesFailed()); + assertEquals(cumulativeDownloadsSucceeded, remoteSegmentStats.getDownloadBytesSucceeded()); + assertEquals(cumulativeDownloadsStarted, remoteSegmentStats.getDownloadBytesStarted()); + assertEquals(cumulativeDownloadsFailed, remoteSegmentStats.getDownloadBytesFailed()); + assertEquals(max_bytes_lag, remoteSegmentStats.getMaxRefreshBytesLag()); + assertEquals(max_time_lag, remoteSegmentStats.getMaxRefreshTimeLag()); + } + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java index 617510bf8222c..e6834a4a667c9 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java @@ -13,7 +13,9 @@ import org.opensearch.action.bulk.BulkRequest; import org.opensearch.action.bulk.BulkResponse; import org.opensearch.action.index.IndexRequest; +import org.opensearch.action.index.IndexRequestBuilder; import org.opensearch.action.index.IndexResponse; +import org.opensearch.action.support.WriteRequest; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.UUIDs; import org.opensearch.common.settings.Settings; @@ -126,10 +128,17 @@ public Settings indexSettings() { } protected IndexResponse indexSingleDoc(String indexName) { - return client().prepareIndex(indexName) + return indexSingleDoc(indexName, false); + } + + protected IndexResponse indexSingleDoc(String indexName, boolean forceRefresh) { + IndexRequestBuilder indexRequestBuilder = client().prepareIndex(indexName) .setId(UUIDs.randomBase64UUID()) - .setSource(documentKeys.get(randomIntBetween(0, documentKeys.size() - 1)), randomAlphaOfLength(5)) - .get(); + .setSource(documentKeys.get(randomIntBetween(0, documentKeys.size() - 1)), randomAlphaOfLength(5)); + if (forceRefresh) { + indexRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + } + return indexRequestBuilder.get(); } protected BulkResponse indexBulk(String indexName, int numDocs) { @@ -208,7 +217,13 @@ protected void putRepository(Path path, String repoName) { } protected void setupRepo() { - internalCluster().startClusterManagerOnlyNode(); + setupRepo(true); + } + + protected void setupRepo(boolean startDedicatedClusterManager) { + if (startDedicatedClusterManager) { + internalCluster().startClusterManagerOnlyNode(); + } absolutePath = randomRepoPath().toAbsolutePath(); putRepository(absolutePath); absolutePath2 = randomRepoPath().toAbsolutePath(); diff --git a/server/src/main/java/org/opensearch/index/engine/SegmentsStats.java b/server/src/main/java/org/opensearch/index/engine/SegmentsStats.java index 1fd7f30237d0b..9b977db43c3f2 100644 --- a/server/src/main/java/org/opensearch/index/engine/SegmentsStats.java +++ b/server/src/main/java/org/opensearch/index/engine/SegmentsStats.java @@ -39,6 +39,7 @@ import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.core.xcontent.ToXContentFragment; import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.index.remote.RemoteSegmentStats; import java.io.IOException; import java.util.Collections; @@ -58,7 +59,7 @@ public class SegmentsStats implements Writeable, ToXContentFragment { private long maxUnsafeAutoIdTimestamp = Long.MIN_VALUE; private long bitsetMemoryInBytes; private final Map fileSizes; - + private final RemoteSegmentStats remoteSegmentStats; private static final ByteSizeValue ZERO_BYTE_SIZE_VALUE = new ByteSizeValue(0L); /* @@ -91,6 +92,7 @@ public class SegmentsStats implements Writeable, ToXContentFragment { public SegmentsStats() { fileSizes = new HashMap<>(); + remoteSegmentStats = new RemoteSegmentStats(); } public SegmentsStats(StreamInput in) throws IOException { @@ -111,6 +113,12 @@ public SegmentsStats(StreamInput in) throws IOException { bitsetMemoryInBytes = in.readLong(); maxUnsafeAutoIdTimestamp = in.readLong(); fileSizes = in.readMap(StreamInput::readString, StreamInput::readLong); + // TODO Update to 2_10_0 when we backport to 2.x + if (in.getVersion().onOrAfter(Version.V_3_0_0)) { + remoteSegmentStats = in.readOptionalWriteable(RemoteSegmentStats::new); + } else { + remoteSegmentStats = new RemoteSegmentStats(); + } } public void add(long count) { @@ -133,6 +141,10 @@ public void addBitsetMemoryInBytes(long bitsetMemoryInBytes) { this.bitsetMemoryInBytes += bitsetMemoryInBytes; } + public void addRemoteSegmentStats(RemoteSegmentStats remoteSegmentStats) { + this.remoteSegmentStats.add(remoteSegmentStats); + } + public void addFileSizes(final Map newFileSizes) { newFileSizes.forEach((k, v) -> this.fileSizes.merge(k, v, (a, b) -> { assert a != null; @@ -151,6 +163,7 @@ public void add(SegmentsStats mergeStats) { addVersionMapMemoryInBytes(mergeStats.versionMapMemoryInBytes); addBitsetMemoryInBytes(mergeStats.bitsetMemoryInBytes); addFileSizes(mergeStats.fileSizes); + addRemoteSegmentStats(mergeStats.remoteSegmentStats); } /** @@ -198,6 +211,11 @@ public Map getFileSizes() { return Collections.unmodifiableMap(this.fileSizes); } + /** Returns remote_store based stats **/ + public RemoteSegmentStats getRemoteSegmentStats() { + return remoteSegmentStats; + } + /** * Returns the max timestamp that is used to de-optimize documents with auto-generated IDs in the engine. * This is used to ensure we don't add duplicate documents when we assume an append only case based on auto-generated IDs @@ -221,6 +239,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.humanReadableField(Fields.VERSION_MAP_MEMORY_IN_BYTES, Fields.VERSION_MAP_MEMORY, getVersionMapMemory()); builder.humanReadableField(Fields.FIXED_BIT_SET_MEMORY_IN_BYTES, Fields.FIXED_BIT_SET, getBitsetMemory()); builder.field(Fields.MAX_UNSAFE_AUTO_ID_TIMESTAMP, maxUnsafeAutoIdTimestamp); + remoteSegmentStats.toXContent(builder, params); builder.startObject(Fields.FILE_SIZES); for (Map.Entry entry : fileSizes.entrySet()) { builder.startObject(entry.getKey()); @@ -287,6 +306,10 @@ public void writeTo(StreamOutput out) throws IOException { out.writeLong(bitsetMemoryInBytes); out.writeLong(maxUnsafeAutoIdTimestamp); out.writeMap(this.fileSizes, StreamOutput::writeString, StreamOutput::writeLong); + // TODO Update to 2_10_0 when we backport to 2.x + if (out.getVersion().onOrAfter(Version.V_3_0_0)) { + out.writeOptionalWriteable(remoteSegmentStats); + } } public void clearFileSizes() { diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteSegmentStats.java b/server/src/main/java/org/opensearch/index/remote/RemoteSegmentStats.java new file mode 100644 index 0000000000000..f834f4ad9583d --- /dev/null +++ b/server/src/main/java/org/opensearch/index/remote/RemoteSegmentStats.java @@ -0,0 +1,236 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.remote; + +import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStats; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.common.unit.ByteSizeValue; +import org.opensearch.core.xcontent.ToXContentFragment; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.index.shard.IndexShard; + +import java.io.IOException; + +/** + * Tracks remote store segment download and upload stats + * Used for displaying remote store stats in IndicesStats/NodeStats API + * + * @opensearch.internal + */ +public class RemoteSegmentStats implements Writeable, ToXContentFragment { + /** + * Cumulative bytes attempted to be uploaded to remote store + */ + private long uploadBytesStarted; + /** + * Cumulative bytes failed to be uploaded to the remote store + */ + private long uploadBytesFailed; + /** + * Cumulative bytes successfully uploaded to the remote store + */ + private long uploadBytesSucceeded; + /** + * Cumulative bytes attempted to be downloaded from the remote store + */ + private long downloadBytesStarted; + /** + * Cumulative bytes failed to be downloaded from the remote store + */ + private long downloadBytesFailed; + /** + * Cumulative bytes successfully downloaded from the remote store + */ + private long downloadBytesSucceeded; + /** + * Maximum refresh lag (in milliseconds) between local and the remote store + * Used to check for data freshness in the remote store + */ + private long maxRefreshTimeLag; + /** + * Maximum refresh lag (in bytes) between local and the remote store + * Used to check for data freshness in the remote store + */ + private long maxRefreshBytesLag; + + public RemoteSegmentStats() {} + + public RemoteSegmentStats(StreamInput in) throws IOException { + uploadBytesStarted = in.readLong(); + uploadBytesFailed = in.readLong(); + uploadBytesSucceeded = in.readLong(); + downloadBytesStarted = in.readLong(); + downloadBytesFailed = in.readLong(); + downloadBytesSucceeded = in.readLong(); + maxRefreshTimeLag = in.readLong(); + maxRefreshBytesLag = in.readLong(); + } + + /** + * Constructor to retrieve metrics from {@link RemoteSegmentTransferTracker.Stats} which is used in {@link RemoteStoreStats} and + * provides verbose index level stats of segments transferred to the remote store. + *

+ * This method is used in {@link IndexShard} to port over a subset of metrics to be displayed in IndexStats and subsequently rolled up to NodesStats + * + * @param trackerStats: Source {@link RemoteSegmentTransferTracker.Stats} object from which metrics would be retrieved + */ + public RemoteSegmentStats(RemoteSegmentTransferTracker.Stats trackerStats) { + this.uploadBytesStarted = trackerStats.uploadBytesStarted; + this.uploadBytesFailed = trackerStats.uploadBytesFailed; + this.uploadBytesSucceeded = trackerStats.uploadBytesSucceeded; + this.downloadBytesSucceeded = trackerStats.directoryFileTransferTrackerStats.transferredBytesSucceeded; + this.downloadBytesStarted = trackerStats.directoryFileTransferTrackerStats.transferredBytesStarted; + this.downloadBytesFailed = trackerStats.directoryFileTransferTrackerStats.transferredBytesFailed; + this.maxRefreshTimeLag = trackerStats.refreshTimeLagMs; + this.maxRefreshBytesLag = trackerStats.bytesLag; + } + + // Getter and setters. All are visible for testing + public long getUploadBytesStarted() { + return uploadBytesStarted; + } + + public void addUploadBytesStarted(long uploadsStarted) { + this.uploadBytesStarted += uploadsStarted; + } + + public long getUploadBytesFailed() { + return uploadBytesFailed; + } + + public void addUploadBytesFailed(long uploadsFailed) { + this.uploadBytesFailed += uploadsFailed; + } + + public long getUploadBytesSucceeded() { + return uploadBytesSucceeded; + } + + public void addUploadBytesSucceeded(long uploadsSucceeded) { + this.uploadBytesSucceeded += uploadsSucceeded; + } + + public long getDownloadBytesStarted() { + return downloadBytesStarted; + } + + public void addDownloadBytesStarted(long downloadsStarted) { + this.downloadBytesStarted += downloadsStarted; + } + + public long getDownloadBytesFailed() { + return downloadBytesFailed; + } + + public void addDownloadBytesFailed(long downloadsFailed) { + this.downloadBytesFailed += downloadsFailed; + } + + public long getDownloadBytesSucceeded() { + return downloadBytesSucceeded; + } + + public void addDownloadBytesSucceeded(long downloadsSucceeded) { + this.downloadBytesSucceeded += downloadsSucceeded; + } + + public long getMaxRefreshTimeLag() { + return maxRefreshTimeLag; + } + + public void setMaxRefreshTimeLag(long maxRefreshTimeLag) { + this.maxRefreshTimeLag = Math.max(this.maxRefreshTimeLag, maxRefreshTimeLag); + } + + public long getMaxRefreshBytesLag() { + return maxRefreshBytesLag; + } + + public void setMaxRefreshBytesLag(long maxRefreshBytesLag) { + this.maxRefreshBytesLag = maxRefreshBytesLag; + } + + /** + * Adds existing stats. Used for stats roll-ups at index or node level + * + * @param existingStats: Existing {@link RemoteSegmentStats} to add + */ + public void add(RemoteSegmentStats existingStats) { + if (existingStats != null) { + this.uploadBytesStarted += existingStats.getUploadBytesStarted(); + this.uploadBytesSucceeded += existingStats.getUploadBytesSucceeded(); + this.uploadBytesFailed += existingStats.getUploadBytesFailed(); + this.downloadBytesStarted += existingStats.getDownloadBytesStarted(); + this.downloadBytesFailed += existingStats.getDownloadBytesFailed(); + this.downloadBytesSucceeded += existingStats.getDownloadBytesSucceeded(); + this.maxRefreshTimeLag = Math.max(this.maxRefreshTimeLag, existingStats.getMaxRefreshTimeLag()); + this.maxRefreshBytesLag = Math.max(this.maxRefreshBytesLag, existingStats.getMaxRefreshBytesLag()); + } + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeLong(uploadBytesStarted); + out.writeLong(uploadBytesFailed); + out.writeLong(uploadBytesSucceeded); + out.writeLong(downloadBytesStarted); + out.writeLong(downloadBytesFailed); + out.writeLong(downloadBytesSucceeded); + out.writeLong(maxRefreshTimeLag); + out.writeLong(maxRefreshBytesLag); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(Fields.REMOTE_STORE); + builder.startObject(Fields.UPLOAD); + builder.startObject(Fields.TOTAL_UPLOADS); + builder.humanReadableField(Fields.STARTED_BYTES, Fields.STARTED, new ByteSizeValue(uploadBytesStarted)); + builder.humanReadableField(Fields.SUCCEEDED_BYTES, Fields.SUCCEEDED, new ByteSizeValue(uploadBytesSucceeded)); + builder.humanReadableField(Fields.FAILED_BYTES, Fields.FAILED, new ByteSizeValue(uploadBytesFailed)); + builder.endObject(); + builder.humanReadableField(Fields.MAX_REFRESH_TIME_LAG_IN_MILLIS, Fields.MAX_REFRESH_TIME_LAG, new TimeValue(maxRefreshTimeLag)); + builder.humanReadableField( + Fields.MAX_REFRESH_SIZE_LAG_IN_MILLIS, + Fields.MAX_REFRESH_SIZE_LAG, + new ByteSizeValue(maxRefreshBytesLag) + ); + builder.endObject(); + builder.startObject(Fields.DOWNLOAD); + builder.startObject(Fields.TOTAL_DOWNLOADS); + builder.humanReadableField(Fields.STARTED_BYTES, Fields.STARTED, new ByteSizeValue(downloadBytesStarted)); + builder.humanReadableField(Fields.SUCCEEDED_BYTES, Fields.SUCCEEDED, new ByteSizeValue(downloadBytesSucceeded)); + builder.humanReadableField(Fields.FAILED_BYTES, Fields.FAILED, new ByteSizeValue(downloadBytesFailed)); + builder.endObject(); + builder.endObject(); + builder.endObject(); + return builder; + } + + static final class Fields { + static final String REMOTE_STORE = "remote_store"; + static final String UPLOAD = "upload"; + static final String DOWNLOAD = "download"; + static final String TOTAL_UPLOADS = "total_uploads"; + static final String TOTAL_DOWNLOADS = "total_downloads"; + static final String STARTED = "started"; + static final String STARTED_BYTES = "started_bytes"; + static final String FAILED = "failed"; + static final String FAILED_BYTES = "failed_bytes"; + static final String SUCCEEDED = "succeeded"; + static final String SUCCEEDED_BYTES = "succeeded_bytes"; + static final String MAX_REFRESH_TIME_LAG = "max_refresh_time_lag"; + static final String MAX_REFRESH_TIME_LAG_IN_MILLIS = "max_refresh_time_lag_in_millis"; + static final String MAX_REFRESH_SIZE_LAG = "max_refresh_size_lag"; + static final String MAX_REFRESH_SIZE_LAG_IN_MILLIS = "max_refresh_size_lag_in_bytes"; + } +} diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 16559eeabfd9b..440f175aaf46d 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -145,6 +145,7 @@ import org.opensearch.index.recovery.RecoveryStats; import org.opensearch.index.refresh.RefreshStats; import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; +import org.opensearch.index.remote.RemoteSegmentStats; import org.opensearch.index.search.stats.SearchStats; import org.opensearch.index.search.stats.ShardSearchStats; import org.opensearch.index.seqno.ReplicationTracker; @@ -332,6 +333,7 @@ Runnable getGlobalCheckpointSyncer() { private final Store remoteStore; private final BiFunction translogFactorySupplier; private final boolean isTimeSeriesIndex; + private final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService; private final List internalRefreshListener = new ArrayList<>(); @@ -544,6 +546,11 @@ public QueryCachingPolicy getQueryCachingPolicy() { return cachingPolicy; } + /** Only used for testing **/ + protected RemoteRefreshSegmentPressureService getRemoteRefreshSegmentPressureService() { + return remoteRefreshSegmentPressureService; + } + @Override public void updateShardState( final ShardRouting newRouting, @@ -1377,6 +1384,12 @@ public MergeStats mergeStats() { public SegmentsStats segmentStats(boolean includeSegmentFileSizes, boolean includeUnloadedSegments) { SegmentsStats segmentsStats = getEngine().segmentsStats(includeSegmentFileSizes, includeUnloadedSegments); segmentsStats.addBitsetMemoryInBytes(shardBitsetFilterCache.getMemorySizeInBytes()); + // Populate remote_store stats only if the index is remote store backed + if (indexSettings.isRemoteStoreEnabled()) { + segmentsStats.addRemoteSegmentStats( + new RemoteSegmentStats(remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker(shardId).stats()) + ); + } return segmentsStats; } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java index 32f27f3a60e42..9c685a948dd5d 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java @@ -32,6 +32,8 @@ package org.opensearch.action.admin.cluster.node.stats; +import org.opensearch.action.admin.indices.stats.CommonStats; +import org.opensearch.action.admin.indices.stats.CommonStatsFlags; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.WeightedRoutingStats; import org.opensearch.cluster.service.ClusterManagerThrottlingStats; @@ -44,6 +46,8 @@ import org.opensearch.http.HttpStats; import org.opensearch.core.indices.breaker.AllCircuitBreakerStats; import org.opensearch.core.indices.breaker.CircuitBreakerStats; +import org.opensearch.index.remote.RemoteSegmentStats; +import org.opensearch.indices.NodeIndicesStats; import org.opensearch.ingest.IngestStats; import org.opensearch.monitor.fs.FsInfo; import org.opensearch.monitor.jvm.JvmStats; @@ -76,7 +80,7 @@ public class NodeStatsTests extends OpenSearchTestCase { public void testSerialization() throws IOException { - NodeStats nodeStats = createNodeStats(); + NodeStats nodeStats = createNodeStats(true); try (BytesStreamOutput out = new BytesStreamOutput()) { nodeStats.writeTo(out); try (StreamInput in = out.bytes().streamInput()) { @@ -436,11 +440,35 @@ public void testSerialization() throws IOException { assertEquals(weightedRoutingStats.getFailOpenCount(), deserializedWeightedRoutingStats.getFailOpenCount()); } + + NodeIndicesStats nodeIndicesStats = nodeStats.getIndices(); + NodeIndicesStats deserializedNodeIndicesStats = deserializedNodeStats.getIndices(); + if (nodeIndicesStats == null) { + assertNull(deserializedNodeIndicesStats); + } else { + RemoteSegmentStats remoteSegmentStats = nodeIndicesStats.getSegments().getRemoteSegmentStats(); + RemoteSegmentStats deserializedRemoteSegmentStats = deserializedNodeIndicesStats.getSegments().getRemoteSegmentStats(); + assertEquals(remoteSegmentStats.getDownloadBytesStarted(), deserializedRemoteSegmentStats.getDownloadBytesStarted()); + assertEquals( + remoteSegmentStats.getDownloadBytesSucceeded(), + deserializedRemoteSegmentStats.getDownloadBytesSucceeded() + ); + assertEquals(remoteSegmentStats.getDownloadBytesFailed(), deserializedRemoteSegmentStats.getDownloadBytesFailed()); + assertEquals(remoteSegmentStats.getUploadBytesStarted(), deserializedRemoteSegmentStats.getUploadBytesStarted()); + assertEquals(remoteSegmentStats.getUploadBytesSucceeded(), deserializedRemoteSegmentStats.getUploadBytesSucceeded()); + assertEquals(remoteSegmentStats.getUploadBytesFailed(), deserializedRemoteSegmentStats.getUploadBytesFailed()); + assertEquals(remoteSegmentStats.getMaxRefreshTimeLag(), deserializedRemoteSegmentStats.getMaxRefreshTimeLag()); + assertEquals(remoteSegmentStats.getMaxRefreshBytesLag(), deserializedRemoteSegmentStats.getMaxRefreshBytesLag()); + } } } } public static NodeStats createNodeStats() { + return createNodeStats(false); + } + + public static NodeStats createNodeStats(boolean remoteStoreStats) { DiscoveryNode node = new DiscoveryNode( "test_node", buildNewFakeTransportAddress(), @@ -718,11 +746,14 @@ public static NodeStats createNodeStats() { weightedRoutingStats = WeightedRoutingStats.getInstance(); weightedRoutingStats.updateFailOpenCount(); - // TODO NodeIndicesStats are not tested here, way too complicated to create, also they need to be migrated to Writeable yet + NodeIndicesStats indicesStats = getNodeIndicesStats(remoteStoreStats); + + // TODO: Only remote_store based aspects of NodeIndicesStats are being tested here. + // It is possible to test other metrics in NodeIndicesStats as well since it extends Writeable now return new NodeStats( node, randomNonNegativeLong(), - null, + indicesStats, osStats, processStats, jvmStats, @@ -747,6 +778,23 @@ public static NodeStats createNodeStats() { ); } + private static NodeIndicesStats getNodeIndicesStats(boolean remoteStoreStats) { + NodeIndicesStats indicesStats = null; + if (remoteStoreStats) { + indicesStats = new NodeIndicesStats(new CommonStats(CommonStatsFlags.ALL), new HashMap<>()); + RemoteSegmentStats remoteSegmentStats = indicesStats.getSegments().getRemoteSegmentStats(); + remoteSegmentStats.addUploadBytesStarted(10L); + remoteSegmentStats.addUploadBytesSucceeded(10L); + remoteSegmentStats.addUploadBytesFailed(1L); + remoteSegmentStats.addDownloadBytesStarted(10L); + remoteSegmentStats.addDownloadBytesSucceeded(10L); + remoteSegmentStats.addDownloadBytesFailed(1L); + remoteSegmentStats.setMaxRefreshBytesLag(5L); + remoteSegmentStats.setMaxRefreshTimeLag(2L); + } + return indicesStats; + } + private OperationStats getPipelineStats(List pipelineStats, String id) { return pipelineStats.stream().filter(p1 -> p1.getPipelineId().equals(id)).findFirst().map(p2 -> p2.getStats()).orElse(null); } diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index 36805f275d1b4..73bfc12bffb8d 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -124,6 +124,8 @@ import org.opensearch.index.mapper.SourceToParse; import org.opensearch.index.mapper.Uid; import org.opensearch.index.mapper.VersionFieldMapper; +import org.opensearch.index.remote.RemoteSegmentStats; +import org.opensearch.index.remote.RemoteSegmentTransferTracker; import org.opensearch.index.seqno.ReplicationTracker; import org.opensearch.index.seqno.RetentionLease; import org.opensearch.index.seqno.RetentionLeaseSyncer; @@ -1813,6 +1815,31 @@ public Set getPendingDeletions() throws IOException { } } + public void testShardStatsWithRemoteStoreEnabled() throws IOException { + IndexShard shard = newStartedShard( + Settings.builder() + .put(IndexMetadata.SETTING_REPLICATION_TYPE, "SEGMENT") + .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) + .build() + ); + RemoteSegmentTransferTracker remoteRefreshSegmentTracker = shard.getRemoteRefreshSegmentPressureService() + .getRemoteRefreshSegmentTracker(shard.shardId); + populateSampleRemoteStoreStats(remoteRefreshSegmentTracker); + ShardStats shardStats = new ShardStats( + shard.routingEntry(), + shard.shardPath(), + new CommonStats(new IndicesQueryCache(Settings.EMPTY), shard, new CommonStatsFlags()), + shard.commitStats(), + shard.seqNoStats(), + shard.getRetentionLeaseStats() + ); + RemoteSegmentStats remoteSegmentStats = shardStats.getStats().getSegments().getRemoteSegmentStats(); + assertEquals(remoteRefreshSegmentTracker.getUploadBytesStarted(), remoteSegmentStats.getUploadBytesStarted()); + assertEquals(remoteRefreshSegmentTracker.getUploadBytesSucceeded(), remoteSegmentStats.getUploadBytesSucceeded()); + assertEquals(remoteRefreshSegmentTracker.getUploadBytesFailed(), remoteSegmentStats.getUploadBytesFailed()); + closeShards(shard); + } + public void testRefreshMetric() throws IOException { IndexShard shard = newStartedShard(); // refresh on: finalize and end of recovery @@ -4874,4 +4901,10 @@ public void testRecordsForceMerges() throws IOException { assertThat(thirdForceMergeUUID, equalTo(secondForceMergeRequest.forceMergeUUID())); closeShards(shard); } + + private void populateSampleRemoteStoreStats(RemoteSegmentTransferTracker tracker) { + tracker.addUploadBytesStarted(10L); + tracker.addUploadBytesSucceeded(10L); + tracker.addUploadBytesFailed(10L); + } }