Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add per model metrics #86

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 74 additions & 28 deletions src/main/java/com/ibm/watson/modelmesh/Metrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.stream.Stream;

import static com.ibm.watson.modelmesh.Metric.*;
import static com.ibm.watson.modelmesh.Metric.MetricType.*;
import static com.ibm.watson.modelmesh.ModelMesh.M;
import static com.ibm.watson.modelmesh.ModelMeshEnvVars.MMESH_CUSTOM_ENV_VAR;
import static com.ibm.watson.modelmesh.ModelMeshEnvVars.MMESH_METRICS_ENV_VAR;
Expand All @@ -56,14 +57,14 @@
*
*/
interface Metrics extends AutoCloseable {
boolean isPerModelMetricsEnabled();

boolean isEnabled();

void logTimingMetricSince(Metric metric, long prevTime, boolean isNano);

void logTimingMetricDuration(Metric metric, long elapsed, boolean isNano);
void logTimingMetricDuration(Metric metric, long elapsed, boolean isNano, String modelId, String vModelId);

void logSizeEventMetric(Metric metric, long value);
void logSizeEventMetric(Metric metric, long value, String modelId, String vModelId);

void logGaugeMetric(Metric metric, long value);

Expand Down Expand Up @@ -101,7 +102,7 @@ default void logInstanceStats(final InstanceRecord ir) {
* @param respPayloadSize response payload size in bytes (or -1 if not applicable)
*/
void logRequestMetrics(boolean external, String name, long elapsedNanos, Code code,
int reqPayloadSize, int respPayloadSize);
int reqPayloadSize, int respPayloadSize, String modelId, String vModelId);

default void registerGlobals() {}

Expand All @@ -111,6 +112,11 @@ default void unregisterGlobals() {}
default void close() {}

Metrics NO_OP_METRICS = new Metrics() {
/**
 * No-op implementation: per-model metrics are never reported as enabled.
 *
 * @return always {@code false}
 */
@Override
public boolean isPerModelMetricsEnabled() {
return false;
}

@Override
public boolean isEnabled() {
return false;
Expand All @@ -120,10 +126,10 @@ public boolean isEnabled() {
public void logTimingMetricSince(Metric metric, long prevTime, boolean isNano) {}

@Override
public void logTimingMetricDuration(Metric metric, long elapsed, boolean isNano) {}
public void logTimingMetricDuration(Metric metric, long elapsed, boolean isNano, String modelId, String vModelId){}

@Override
public void logSizeEventMetric(Metric metric, long value) {}
public void logSizeEventMetric(Metric metric, long value, String modelId, String vModelId){}

@Override
public void logGaugeMetric(Metric metric, long value) {}
Expand All @@ -136,7 +142,7 @@ public void logInstanceStats(InstanceRecord ir) {}

@Override
public void logRequestMetrics(boolean external, String name, long elapsedNanos, Code code,
int reqPayloadSize, int respPayloadSize) {}
int reqPayloadSize, int respPayloadSize, String modelId, String vModelId) {}
};

final class PrometheusMetrics implements Metrics {
Expand All @@ -154,12 +160,14 @@ final class PrometheusMetrics implements Metrics {
private final CollectorRegistry registry;
private final NettyServer metricServer;
private final boolean shortNames;
private final boolean enablePerModelMetrics;
private final EnumMap<Metric, Collector> metricsMap = new EnumMap<>(Metric.class);

public PrometheusMetrics(Map<String, String> params, Map<String, String> infoMetricParams) throws Exception {
int port = 2112;
boolean shortNames = true;
boolean https = true;
boolean enablePerModelMetrics = false;
String memMetrics = "all"; // default to all
for (Entry<String, String> ent : params.entrySet()) {
switch (ent.getKey()) {
Expand All @@ -170,6 +178,9 @@ public PrometheusMetrics(Map<String, String> params, Map<String, String> infoMet
throw new Exception("Invalid metrics port: " + ent.getValue());
}
break;
case "per_model_metrics":
enablePerModelMetrics = "true".equalsIgnoreCase(ent.getValue());
break;
case "fq_names":
shortNames = !"true".equalsIgnoreCase(ent.getValue());
break;
Expand All @@ -188,6 +199,7 @@ public PrometheusMetrics(Map<String, String> params, Map<String, String> infoMet
throw new Exception("Unrecognized metrics config parameter: " + ent.getKey());
}
}
this.enablePerModelMetrics = enablePerModelMetrics;

registry = new CollectorRegistry();
for (Metric m : Metric.values()) {
Expand Down Expand Up @@ -220,10 +232,15 @@ public PrometheusMetrics(Map<String, String> params, Map<String, String> infoMet
}

if (m == API_REQUEST_TIME || m == API_REQUEST_COUNT || m == INVOKE_MODEL_TIME
|| m == INVOKE_MODEL_COUNT || m == REQUEST_PAYLOAD_SIZE || m == RESPONSE_PAYLOAD_SIZE) {
builder.labelNames("method", "code");
|| m == INVOKE_MODEL_COUNT || m == REQUEST_PAYLOAD_SIZE || m == RESPONSE_PAYLOAD_SIZE) {
if (this.enablePerModelMetrics && m.type != COUNTER_WITH_HISTO) {
builder.labelNames("method", "code", "modelId", "vModelId");
} else {
builder.labelNames("method", "code");
}
} else if (this.enablePerModelMetrics && m.type != GAUGE && m.type != COUNTER && m.type != COUNTER_WITH_HISTO) {
builder.labelNames("modelId", "vModelId");
}

Collector collector = builder.name(m.promName).help(m.description).create();
metricsMap.put(m, collector);
if (!m.global) {
Expand Down Expand Up @@ -251,7 +268,6 @@ public PrometheusMetrics(Map<String, String> params, Map<String, String> infoMet

this.metricServer = new NettyServer(registry, port, https);
this.shortNames = shortNames;

logger.info("Will expose " + (https ? "https" : "http") + " Prometheus metrics on port " + port
+ " using " + (shortNames ? "short" : "fully-qualified") + " method names");

Expand Down Expand Up @@ -330,6 +346,11 @@ public void close() {
this.metricServer.close();
}

/**
 * Reports whether per-model labelling is active for this instance. The flag is
 * fixed at construction time from the "per_model_metrics" config parameter
 * (defaults to {@code false} when the parameter is absent).
 *
 * @return the value of {@code enablePerModelMetrics}
 */
@Override
public boolean isPerModelMetricsEnabled() {
return enablePerModelMetrics;
}

@Override
public boolean isEnabled() {
return true;
Expand All @@ -342,13 +363,21 @@ public void logTimingMetricSince(Metric metric, long prevTime, boolean isNano) {
}

@Override
public void logTimingMetricDuration(Metric metric, long elapsed, boolean isNano) {
((Histogram) metricsMap.get(metric)).observe(isNano ? elapsed / M : elapsed);
public void logTimingMetricDuration(Metric metric, long elapsed, boolean isNano, String modelId, String vModelId) {
if (enablePerModelMetrics) {
((Histogram) metricsMap.get(metric)).labels(modelId, vModelId).observe(isNano ? elapsed / M : elapsed);
} else {
((Histogram) metricsMap.get(metric)).observe(isNano ? elapsed / M : elapsed);
}
}

@Override
public void logSizeEventMetric(Metric metric, long value) {
((Histogram) metricsMap.get(metric)).observe(value * metric.newMultiplier);
public void logSizeEventMetric(Metric metric, long value, String modelId, String vModelId) {
if (enablePerModelMetrics) {
((Histogram) metricsMap.get(metric)).labels(modelId, vModelId).observe(value * metric.newMultiplier);
} else {
((Histogram) metricsMap.get(metric)).observe(value * metric.newMultiplier);
}
}

@Override
Expand All @@ -365,23 +394,35 @@ public void logCounterMetric(Metric metric) {

@Override
public void logRequestMetrics(boolean external, String name, long elapsedNanos, Code code,
int reqPayloadSize, int respPayloadSize) {
int reqPayloadSize, int respPayloadSize, String modelId, String vModelId) {
final long elapsedMillis = elapsedNanos / M;
final Histogram timingHisto = (Histogram) metricsMap
.get(external ? API_REQUEST_TIME : INVOKE_MODEL_TIME);

String mId = vModelId == null ? modelId : vModelId;
int idx = shortNames ? name.indexOf('/') : -1;
final String methodName = idx == -1 ? name : name.substring(idx + 1);

timingHisto.labels(methodName, code.name()).observe(elapsedMillis);

String methodName = idx == -1 ? name : name.substring(idx + 1);
if (enablePerModelMetrics) {
timingHisto.labels(methodName, code.name(), mId, mId).observe(elapsedMillis);
} else {
timingHisto.labels(methodName, code.name()).observe(elapsedMillis);
}
if (reqPayloadSize != -1) {
((Histogram) metricsMap.get(REQUEST_PAYLOAD_SIZE))
.labels(methodName, code.name()).observe(reqPayloadSize);
if (enablePerModelMetrics) {
((Histogram) metricsMap.get(REQUEST_PAYLOAD_SIZE))
.labels(methodName, code.name(), mId, mId).observe(reqPayloadSize);
} else {
((Histogram) metricsMap.get(REQUEST_PAYLOAD_SIZE))
.labels(methodName, code.name()).observe(reqPayloadSize);
}
}
if (respPayloadSize != -1) {
((Histogram) metricsMap.get(RESPONSE_PAYLOAD_SIZE))
.labels(methodName, code.name()).observe(respPayloadSize);
if (enablePerModelMetrics) {
((Histogram) metricsMap.get(RESPONSE_PAYLOAD_SIZE))
.labels(methodName, code.name(), mId, mId).observe(respPayloadSize);
} else {
((Histogram) metricsMap.get(RESPONSE_PAYLOAD_SIZE))
.labels(methodName, code.name()).observe(respPayloadSize);
}
}
}

Expand Down Expand Up @@ -437,6 +478,11 @@ protected StatsDSender createSender(Callable<SocketAddress> addressLookup, int q
+ (shortNames ? "short" : "fully-qualified") + " method names");
}

/**
 * This implementation always reports per-model metrics as disabled.
 *
 * @return always {@code false}
 */
@Override
public boolean isPerModelMetricsEnabled() {
return false;
}

@Override
public boolean isEnabled() {
return true;
Expand All @@ -454,12 +500,12 @@ public void logTimingMetricSince(Metric metric, long prevTime, boolean isNano) {
}

@Override
public void logTimingMetricDuration(Metric metric, long elapsed, boolean isNano) {
public void logTimingMetricDuration(Metric metric, long elapsed, boolean isNano, String modelId, String vModelId) {
client.recordExecutionTime(name(metric), isNano ? elapsed / M : elapsed);
}

@Override
public void logSizeEventMetric(Metric metric, long value) {
public void logSizeEventMetric(Metric metric, long value, String modelId, String vModelId) {
if (!legacy) {
value *= metric.newMultiplier;
}
Expand Down Expand Up @@ -497,7 +543,7 @@ static String[] getOkTags(String method, boolean shortName) {

@Override
public void logRequestMetrics(boolean external, String name, long elapsedNanos, Code code,
int reqPayloadSize, int respPayloadSize) {
int reqPayloadSize, int respPayloadSize, String modelId, String vModelId) {
final StatsDClient client = this.client;
final long elapsedMillis = elapsedNanos / M;
final String countName = name(external ? API_REQUEST_COUNT : INVOKE_MODEL_COUNT);
Expand Down
25 changes: 13 additions & 12 deletions src/main/java/com/ibm/watson/modelmesh/ModelMesh.java
Original file line number Diff line number Diff line change
Expand Up @@ -1966,7 +1966,7 @@ final synchronized boolean doRemove(final boolean evicted,
// "unload" event if explicit unloading isn't enabled.
// Otherwise, this gets recorded in a callback set in the
// CacheEntry.unload(int) method
metrics.logTimingMetricDuration(Metric.UNLOAD_MODEL_TIME, 0L, false);
metrics.logTimingMetricDuration(Metric.UNLOAD_MODEL_TIME, 0L, false, modelId, "");
metrics.logCounterMetric(Metric.UNLOAD_MODEL);
}
}
Expand Down Expand Up @@ -2037,7 +2037,7 @@ public void onSuccess(Boolean reallyHappened) {
//TODO probably only log if took longer than a certain time
long tookMillis = msSince(beforeNanos);
logger.info("Unload of " + modelId + " completed in " + tookMillis + "ms");
metrics.logTimingMetricDuration(Metric.UNLOAD_MODEL_TIME, tookMillis, false);
metrics.logTimingMetricDuration(Metric.UNLOAD_MODEL_TIME, tookMillis, false, modelId, "");
metrics.logCounterMetric(Metric.UNLOAD_MODEL);
}
// else considered trivially succeeded because the corresponding
Expand Down Expand Up @@ -2158,7 +2158,7 @@ public final void run() {
long queueStartTimeNanos = getAndResetLoadingQueueStartTimeNanos();
if (queueStartTimeNanos > 0) {
long queueDelayMillis = (nanoTime() - queueStartTimeNanos) / M;
metrics.logSizeEventMetric(Metric.LOAD_MODEL_QUEUE_DELAY, queueDelayMillis);
metrics.logSizeEventMetric(Metric.LOAD_MODEL_QUEUE_DELAY, queueDelayMillis, modelId, "");
// Only log if the priority value is "in the future" which indicates
// that there is or were runtime requests waiting for this load.
// Otherwise we don't care about arbitrary delays here
Expand Down Expand Up @@ -2228,7 +2228,7 @@ public final void run() {
loadingTimeStats(modelType).recordTime(tookMillis);
logger.info("Load of model " + modelId + " type=" + modelType + " completed in " + tookMillis
+ "ms");
metrics.logTimingMetricDuration(Metric.LOAD_MODEL_TIME, tookMillis, false);
metrics.logTimingMetricDuration(Metric.LOAD_MODEL_TIME, tookMillis, false, modelId, "");
metrics.logCounterMetric(Metric.LOAD_MODEL);
} catch (Throwable t) {
loadFuture = null;
Expand Down Expand Up @@ -2388,7 +2388,7 @@ protected final void complete(LoadedRuntime<T> result, Throwable error) {
if (size > 0) {
long sizeBytes = size * UNIT_SIZE;
logger.info("Model " + modelId + " size = " + size + " units" + ", ~" + mb(sizeBytes));
metrics.logSizeEventMetric(Metric.LOADED_MODEL_SIZE, sizeBytes);
metrics.logSizeEventMetric(Metric.LOADED_MODEL_SIZE, sizeBytes, modelId, "");
} else {
try {
long before = nanoTime();
Expand All @@ -2397,9 +2397,9 @@ protected final void complete(LoadedRuntime<T> result, Throwable error) {
long took = msSince(before), sizeBytes = size * UNIT_SIZE;
logger.info("Model " + modelId + " size = " + size + " units" + ", ~" + mb(sizeBytes)
+ " sizing took " + took + "ms");
metrics.logTimingMetricDuration(Metric.MODEL_SIZING_TIME, took, false);
metrics.logTimingMetricDuration(Metric.MODEL_SIZING_TIME, took, false, modelId, "");
// this is actually a size (bytes), not a "time"
metrics.logSizeEventMetric(Metric.LOADED_MODEL_SIZE, sizeBytes);
metrics.logSizeEventMetric(Metric.LOADED_MODEL_SIZE, sizeBytes, modelId, "");
}
} catch (Exception e) {
if (!isInterruption(e) && state == SIZING) {
Expand Down Expand Up @@ -2722,7 +2722,7 @@ protected void beforeInvoke(int requestWeight)
//noinspection ThrowFromFinallyBlock
throw new ModelNotHereException(instanceId, modelId);
}
metrics.logTimingMetricDuration(Metric.QUEUE_DELAY, tookMillis, false);
metrics.logTimingMetricDuration(Metric.QUEUE_DELAY, tookMillis, false, modelId, "");
}
}
}
Expand Down Expand Up @@ -2901,7 +2901,7 @@ public void onEviction(String key, CacheEntry<?> ce, long lastUsed) {
logger.info("Evicted " + (failed ? "failed model record" : "model") + " " + key
+ " from local cache, last used " + readableTime(millisSinceLastUsed) + " ago (" + lastUsed
+ "ms), invoked " + ce.getTotalInvocationCount() + " times");
metrics.logTimingMetricDuration(Metric.AGE_AT_EVICTION, millisSinceLastUsed, false);
metrics.logTimingMetricDuration(Metric.AGE_AT_EVICTION, millisSinceLastUsed, false, ce.modelId, "");
metrics.logCounterMetric(Metric.EVICT_MODEL);
}

Expand Down Expand Up @@ -3989,9 +3989,10 @@ else if (mr.getInstanceIds().containsKey(instanceId)) {
throw t;
} finally {
if (methodStartNanos > 0L && metrics.isEnabled()) {
String[] extraLabels = new String[]{modelId};
// only logged here in non-grpc (legacy) mode
metrics.logRequestMetrics(true, getRequestMethodName(method, args),
nanoTime() - methodStartNanos, metricStatusCode, -1, -1);
nanoTime() - methodStartNanos, metricStatusCode, -1, -1, modelId, "");
}
curThread.setName(threadNameBefore);
}
Expand Down Expand Up @@ -4450,7 +4451,7 @@ private Object invokeLocalModel(CacheEntry<?> ce, Method method, Object[] args)
long delayMillis = msSince(beforeNanos);
logger.info("Cache miss for model invocation, held up " + delayMillis + "ms");
metrics.logCounterMetric(Metric.CACHE_MISS);
metrics.logTimingMetricDuration(Metric.CACHE_MISS_DELAY, delayMillis, false);
metrics.logTimingMetricDuration(Metric.CACHE_MISS_DELAY, delayMillis, false, ce.modelId, "");
}
}
} else {
Expand Down Expand Up @@ -4528,7 +4529,7 @@ private Object invokeLocalModel(CacheEntry<?> ce, Method method, Object[] args)
ce.afterInvoke(weight, tookNanos);
if (code != null && metrics.isEnabled()) {
metrics.logRequestMetrics(false, getRequestMethodName(method, args),
tookNanos, code, -1, -1);
tookNanos, code, -1, -1, ce.modelId, "");
}
}
}
Expand Down
Loading