Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Searchbackpressure Service Reader Added #426

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions null/data/batch_metrics_enabled.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
true
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ public class MetricAttributes {
public HashSet<String> dimensionNames;

MetricAttributes(String unit, MetricDimension[] dimensions) {

this.unit = unit;
this.dimensionNames = new HashSet<String>();
for (MetricDimension dimension : dimensions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,11 @@ public class MetricsModel {
MetricUnits.MILLISECOND.toString(),
AllMetrics.ShardIndexingPressureDimension.values()));

// Search Back Pressure Metrics
allMetricsInitializer.put(
AllMetrics.SearchBackPressureStatsValue.SEARCHBP_SHARD_STATS_CANCELLATIONCOUNT
.toString(),
new MetricAttributes(MetricUnits.COUNT.toString(), EmptyDimension.values()));
ALL_METRICS = Collections.unmodifiableMap(allMetricsInitializer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@ public final class Version {
* Note: The RCA version is agnostic of OpenSearch version.
*/
static final class Major {
// Bumping this post the Commons Lib(https://github.com/opensearch-project/performance-analyzer-commons/issues/2)
// and Service Metrics(https://github.com/opensearch-project/performance-analyzer-commons/issues/8) change
// Bumping this post the Commons
// Lib(https://github.com/opensearch-project/performance-analyzer-commons/issues/2)
// and Service
// Metrics(https://github.com/opensearch-project/performance-analyzer-commons/issues/8)
// change
static final int RCA_MAJ_VERSION = 1;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.performanceanalyzer.rca.framework.api.metrics;


import org.opensearch.performanceanalyzer.rca.framework.api.Metric;

public class SearchBackPressureStats extends Metric {
public SearchBackPressureStats(long evaluationIntervalSeconds) {
super("searchbp_shard_stats_cancellationCount", evaluationIntervalSeconds);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,13 @@ public enum ReaderMetrics implements MeasurementSet {
"FaultDetectionMetricsEmitterExecutionTime",
"millis",
StatsType.LATENCIES,
Statistics.SUM);
Statistics.SUM),
SEARCH_BACK_PRESSURE_METRICS_EMITTER_EXECUTION_TIME(
"SearchBackPressureMetricsEmitterExecutionTime",
"millis",
StatsType.LATENCIES,
Statistics.SUM),
;

/** What we want to appear as the metric name. */
private String name;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,8 @@ public void construct() {
// Use EVALUATION_INTERVAL_SECONDS instead of RCA_PERIOD which resolved to 12 seconds.
// This is resulting in this RCA not getting executed in every 5 seconds.
Rca<ResourceFlowUnit<HotNodeSummary>> threadMetricsRca =
new ThreadMetricsRca(threadBlockedTime, threadWaitedTime, EVALUATION_INTERVAL_SECONDS);
new ThreadMetricsRca(
threadBlockedTime, threadWaitedTime, EVALUATION_INTERVAL_SECONDS);
threadMetricsRca.addTag(
RcaConsts.RcaTagConstants.TAG_LOCUS,
RcaConsts.RcaTagConstants.LOCUS_DATA_CLUSTER_MANAGER_NODE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -749,6 +749,120 @@ public static void emitGarbageCollectionInfo(
ReaderMetrics.GC_INFO_EMITTER_EXECUTION_TIME, mFinalT - mCurrT);
}

public static void emitSearchBackPressureMetrics(
MetricsDB metricsDB,
SearchBackPressureMetricsSnapShot searchBackPressureMetricsSnapShot) {
long mCurrT = System.currentTimeMillis();
Result<Record> searchbp_records = searchBackPressureMetricsSnapShot.fetchAll();

// String SEARCHBP_MODE_DIM = "searchbp_mode";
String SEARCHBP_TYPE_DIM = "SearchBackPressureStats";
String SEARCHBP_TABLE_NAME = "searchbp_stats";

List<String> dims =
new ArrayList<String>() {
{
this.add(SEARCHBP_TYPE_DIM);
}
};

List<String> stats_types =
new ArrayList<String>() {
{
// Shard/Task Stats Cancellation Count
this.add(
AllMetrics.SearchBackPressureStatsValue
.SEARCHBP_SHARD_STATS_CANCELLATIONCOUNT
.toString());
this.add(
AllMetrics.SearchBackPressureStatsValue
.SEARCHBP_TASK_STATS_CANCELLATIONCOUNT
.toString());
// Shard Stats Resource Heap / CPU Usage
this.add(
AllMetrics.SearchBackPressureStatsValue
.SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_CANCELLATIONCOUNT
.toString());
this.add(
AllMetrics.SearchBackPressureStatsValue
.SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_CURRENTMAX
.toString());
this.add(
AllMetrics.SearchBackPressureStatsValue
.SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_ROLLINGAVG
.toString());
this.add(
AllMetrics.SearchBackPressureStatsValue
.SEARCHBP_SHARD_STATS_RESOURCE_CPU_USAGE_CANCELLATIONCOUNT
.toString());
this.add(
AllMetrics.SearchBackPressureStatsValue
.SEARCHBP_SHARD_STATS_RESOURCE_CPU_USAGE_CURRENTMAX
.toString());
this.add(
AllMetrics.SearchBackPressureStatsValue
.SEARCHBP_SHARD_STATS_RESOURCE_CPU_USAGE_CURRENTAVG
.toString());
// Task Stats Resource Heap / CPU Usage
this.add(
AllMetrics.SearchBackPressureStatsValue
.SEARCHBP_TASK_STATS_RESOURCE_HEAP_USAGE_CANCELLATIONCOUNT
.toString());
this.add(
AllMetrics.SearchBackPressureStatsValue
.SEARCHBP_TASK_STATS_RESOURCE_HEAP_USAGE_CURRENTMAX
.toString());
this.add(
AllMetrics.SearchBackPressureStatsValue
.SEARCHBP_TASK_STATS_RESOURCE_HEAP_USAGE_ROLLINGAVG
.toString());
this.add(
AllMetrics.SearchBackPressureStatsValue
.SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CANCELLATIONCOUNT
.toString());
this.add(
AllMetrics.SearchBackPressureStatsValue
.SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CURRENTMAX
.toString());
this.add(
AllMetrics.SearchBackPressureStatsValue
.SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CURRENTAVG
.toString());
}
};

metricsDB.createMetric(new Metric<>(SEARCHBP_TABLE_NAME, 0d), dims);

BatchBindStep handle = metricsDB.startBatchPut(new Metric<>(SEARCHBP_TABLE_NAME, 0d), dims);

for (Record record : searchbp_records) {
for (String stats_type : stats_types) {
Optional<Object> tmpStatsObj = Optional.ofNullable(record.get(stats_type));
// LOG.info(stats_type + " is: " + tmpStatsObj.map(o ->
// Long.parseLong(o.toString())).toString());

handle.bind(
stats_type,
// the rest are agg fields: sum, avg, min, max which don't make sense for
// searchbackpressure
tmpStatsObj.map(o -> Long.parseLong(o.toString())).orElse(0L),
tmpStatsObj.map(o -> Long.parseLong(o.toString())).orElse(0L),
tmpStatsObj.map(o -> Long.parseLong(o.toString())).orElse(0L),
tmpStatsObj.map(o -> Long.parseLong(o.toString())).orElse(0L));
}
}

handle.execute();

long mFinalT = System.currentTimeMillis();
LOG.debug(
"Total time taken for writing Search Back Pressure info into metricsDB: {}",
mFinalT - mCurrT);
ServiceMetrics.READER_METRICS_AGGREGATOR.updateStat(
ReaderMetrics.SEARCH_BACK_PRESSURE_METRICS_EMITTER_EXECUTION_TIME,
mFinalT - mCurrT);
}

public static void emitAdmissionControlMetrics(
MetricsDB metricsDB, AdmissionControlSnapshot snapshot) {
long mCurrT = System.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public class ReaderMetricsProcessor implements Runnable {
clusterManagerThrottlingMetricsMap;
private NavigableMap<Long, ShardStateMetricsSnapshot> shardStateMetricsMap;
private NavigableMap<Long, AdmissionControlSnapshot> admissionControlMetricsMap;
private NavigableMap<Long, SearchBackPressureMetricsSnapShot> searchBackPressureMetricsMap;

private static final int MAX_DATABASES = 2;
private static final int OS_SNAPSHOTS = 4;
Expand All @@ -81,6 +82,7 @@ public class ReaderMetricsProcessor implements Runnable {
private static final int GC_INFO_SNAPSHOTS = 4;
private static final int CLUSTER_MANAGER_THROTTLING_SNAPSHOTS = 2;
private static final int AC_SNAPSHOTS = 2;
private static final int SEARCH_BP_SNAPSHOTS = 4;
private final String rootLocation;

private final AppContext appContext;
Expand Down Expand Up @@ -125,6 +127,8 @@ public ReaderMetricsProcessor(
gcInfoMap = new TreeMap<>();
clusterManagerThrottlingMetricsMap = new TreeMap<>();
admissionControlMetricsMap = new TreeMap<>();
searchBackPressureMetricsMap = new TreeMap<>();

this.rootLocation = rootLocation;
this.configOverridesApplier = new ConfigOverridesApplier();

Expand Down Expand Up @@ -268,6 +272,7 @@ public void trimOldSnapshots() throws Exception {
trimMap(gcInfoMap, GC_INFO_SNAPSHOTS);
trimMap(clusterManagerThrottlingMetricsMap, CLUSTER_MANAGER_THROTTLING_SNAPSHOTS);
trimMap(admissionControlMetricsMap, AC_SNAPSHOTS);
trimMap(searchBackPressureMetricsMap, SEARCH_BP_SNAPSHOTS);

for (NavigableMap<Long, MemoryDBSnapshot> snap : nodeMetricsMap.values()) {
// do the same thing as OS_SNAPSHOTS. Eventually MemoryDBSnapshot
Expand Down Expand Up @@ -397,6 +402,7 @@ private void emitMetrics(long currWindowStartTime) throws Exception {
emitAdmissionControlMetrics(prevWindowStartTime, metricsDB);
emitClusterManagerMetrics(prevWindowStartTime, metricsDB);
emitClusterManagerThrottlingMetrics(prevWindowStartTime, metricsDB);
emitSearchBackPressureMetrics(prevWindowStartTime, metricsDB);

metricsDB.commit();
metricsDBMap.put(prevWindowStartTime, metricsDB);
Expand Down Expand Up @@ -594,6 +600,19 @@ private void emitClusterManagerThrottlingMetrics(
}
}

private void emitSearchBackPressureMetrics(long prevWindowStartTime, MetricsDB metricsDB)
throws Exception {
if (searchBackPressureMetricsMap.containsKey(prevWindowStartTime)) {
SearchBackPressureMetricsSnapShot prevSearchBPSnapShot =
searchBackPressureMetricsMap.get(prevWindowStartTime);
MetricsEmitter.emitSearchBackPressureMetrics(metricsDB, prevSearchBPSnapShot);
} else {
LOG.debug(
"Search Back Pressure snapshot does not exist for the previous window. "
+ "Not emitting metrics.");
}
}

/**
* OS, Request, Http and cluster_manager first aligns the currentTimeStamp with a 5 second
* interval. In the current format, a file (previously a directory) is written every 5 seconds.
Expand Down Expand Up @@ -679,6 +698,9 @@ is ready so it starts to read that file (go back two windows and
EventProcessor admissionControlProcessor =
AdmissionControlProcessor.build(
currWindowStartTime, conn, admissionControlMetricsMap);
EventProcessor searchBackPressureMetricsProcessor =
SearchBackPressureMetricsProcessor.buildSearchBackPressureMetricsProcessor(
currWindowStartTime, conn, searchBackPressureMetricsMap);

// The event dispatcher dispatches events to each of the registered event processors.
// In addition to event processing each processor has an initialize/finalize function that
Expand All @@ -702,6 +724,7 @@ is ready so it starts to read that file (go back two windows and
eventDispatcher.registerEventProcessor(faultDetectionProcessor);
eventDispatcher.registerEventProcessor(garbageCollectorInfoProcessor);
eventDispatcher.registerEventProcessor(admissionControlProcessor);
eventDispatcher.registerEventProcessor(searchBackPressureMetricsProcessor);

eventDispatcher.initializeProcessing(
currWindowStartTime, currWindowStartTime + MetricsConfiguration.SAMPLING_INTERVAL);
Expand Down
Loading
Loading