diff --git a/pkg/pcp/manifest.json b/pkg/pcp/manifest.json
index eb12c1af4fd..0f390b8e646 100644
--- a/pkg/pcp/manifest.json
+++ b/pkg/pcp/manifest.json
@@ -1,11 +1,5 @@
 {
     "requires": {
         "cockpit": "239"
-    },
-    "bridges": [
-        {
-            "match": { "payload": "metrics1" },
-            "spawn": [ "${libexecdir}/cockpit-pcp" ]
-        }
-    ]
+    }
 }
diff --git a/pyproject.toml b/pyproject.toml
index 1f27970458c..ccc59c85df4 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -31,6 +31,10 @@
 module = [
     "libvirt_qemu",
     "pika",
+    # run without pcp module types
+    "pcp",
+    "cpmapi",
+
     # run without gobject-introspection (used from cockpit-client for Gtk)
     "gi.*",
 
@@ -158,6 +162,7 @@ ignore_names = [
     "do_*",
     "pytest_*",
     "test[A-Z0-9]*",
+    "pytestmark",
 ]
 ignore_decorators = [
     "@*.getter",
@@ -200,6 +205,8 @@ wheel_build_env = pkg
 # All other environments (names like py311-lint, py36-pytest, etc) are isolated
 # from the system and get their packages from PyPI, according to the specific
 # test environment being requested. We build the wheel in a common environment.
+# These environments will not run the PCP tests, as no PCP wheel is available.
+# https://github.com/performancecopilot/pcp/issues/2076
 [testenv]
 package = wheel
 wheel_build_env = venv-pkg
diff --git a/src/cockpit/channels/__init__.py b/src/cockpit/channels/__init__.py
index cf255aa5430..14ff6442cf6 100644
--- a/src/cockpit/channels/__init__.py
+++ b/src/cockpit/channels/__init__.py
@@ -20,6 +20,7 @@
 from .http import HttpChannel
 from .metrics import InternalMetricsChannel
 from .packages import PackagesChannel
+from .pcp import PcpMetricsChannel
 from .stream import SocketStreamChannel, SubprocessStreamChannel
 from .trivial import EchoChannel, NullChannel
 
@@ -35,6 +36,7 @@
     InternalMetricsChannel,
     NullChannel,
     PackagesChannel,
+    PcpMetricsChannel,
     SubprocessStreamChannel,
     SocketStreamChannel,
 ]
diff --git a/src/cockpit/channels/pcp.py b/src/cockpit/channels/pcp.py
new file mode 100644
index 00000000000..4bfa6689696
--- /dev/null
+++ b/src/cockpit/channels/pcp.py
@@ -0,0 +1,675 @@
+# This file is part of Cockpit.
+#
+# Copyright (C) 2024 Red Hat, Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <https://www.gnu.org/licenses/>.
+
+# PCP Channel for Performance Co-Pilot Metrics
+#
+# This is the replacement for the C implementation of cockpit-pcp.  It uses
+# the Python pcp module (a ctypes wrapper) to read PCP archives and to
+# connect directly to PCP daemons.
+#
+# Cockpit supports two different types of sources:
+# * archive - reads from PCP archive(s), either from the default configured
+#   location `pcp-archive` or from a given path
+# * pmcd / direct - connects to the pmcd daemon and reads live metrics
+#
+# The channel delivers data differently depending on the source: an archive
+# is read in its entirety, while the other modes deliver metrics once per
+# given interval.
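+#
+# For illustration, an "open" control message for this channel could look
+# roughly like this (the metric name and option values here are examples
+# only, not defaults):
+#
+#     {
+#         "command": "open", "channel": "4", "payload": "metrics1",
+#         "source": "pcp-archive", "interval": 1000, "limit": 100,
+#         "metrics": [{"name": "kernel.all.load", "derive": "rate"}]
+#     }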
+#
+# Global channel options:
+# * interval - the interval at which to deliver metrics
+# * timestamp - timestamp of the first sample (archive-only option)
+# * limit - the number of samples to return (archive-only option)
+# * omit-instances - instances of multi-instance metrics to hide (for
+#   example the `lo` interface in network metrics)
+# * instances - instances of multi-instance metrics to show (for example
+#   only `/dev/sda`)
+#
+# Metrics
+#
+# When opening the metrics channel you can specify the metrics you are
+# interested in.  A PCP metric is described in pmapi as pmDesc:
+#
+# class pmDesc:
+#     pmid  - unique ID of the metric
+#     type  - data type (PM_TYPE_*)
+#     indom - the instance domain
+#     sem   - semantics of the value
+#     units - dimension and units
+#
+# The type is important here: Cockpit only supports PM_TYPE_DOUBLE, and
+# integers as PM_TYPE_U64.
+#
+# The instance domain denotes whether the metric has multiple instances;
+# for example, a disk metric can represent data for multiple disks.
+#
+# See `pminfo -f "kernel.all.load"` for example:
+#
+# kernel.all.load
+#     inst [1 or "1 minute"] value 0.5
+#     inst [5 or "5 minute"] value 0.68000001
+#     inst [15 or "15 minute"] value 0.75999999
+#
+# These metrics are delivered to the JavaScript client as a list of values;
+# the meta message contains the list of instance names for the UI to show.
+
+import asyncio
+import ctypes
+import glob
+import json
+import logging
+import platform
+import sys
+import time
+from typing import TYPE_CHECKING, Any, Dict, Iterable, List, NamedTuple, Optional, Sequence, Union
+
+from cockpit.protocol import CockpitProblem
+
+from ..channel import AsyncChannel, ChannelError
+from ..jsonutil import JsonObject, JsonValue, get_int, get_objv, get_str, get_strv
+
+if TYPE_CHECKING:  # pragma: no cover
+    import cpmapi as c_api
+    from pcp import pmapi
+else:
+    pmapi = None
+    c_api = None
+
+logger = logging.getLogger(__name__)
+
+
+class MetricNotFoundError(CockpitProblem):
+    pass
+
+
+class PcpMetricInfo(Dict[str, JsonValue]):
+    def __init__(self, value: JsonObject) -> None:
+        self.name = get_str(value, 'name')
+        self.derive = get_str(value, 'derive', '')
+        self.units = get_str(value, 'units', '')
+        super().__init__(name=self.name, derive=self.derive, units=self.units)
+
+
+class MetricInfo(NamedTuple):
+    pmid: int
+    derive: str
+    desc: Any
+    name: str
+    factor: float
+    units: Any
+    instanced: bool  # Multi-instance metric
+
+
+def try_import_pcp() -> None:
+    global c_api, pmapi
+    if c_api is None or pmapi is None:
+        try:
+            import cpmapi as c_api
+            from pcp import pmapi
+        except ImportError as exc:  # pragma: no cover
+            raise ChannelError('not-supported', message='python3-pcp not installed') from exc
+
+
+class ArchiveInfo:
+    metric_descriptions: List[MetricInfo]
+
+    def __init__(self, context: 'pmapi.pmContext', start: float, path: str) -> None:
+        self.context = context
+        self.start = start
+        self.path = path
+        self.metric_descriptions = []
+
+    def sort_key(self) -> float:
+        return self.start
+
+
+class PcpMetricsChannel(AsyncChannel):
+    payload = 'metrics1'
+
+    pcp_dir: str
+    archive_batch: int
+
+    context: 'pmapi.pmContext'
+    source: str
+    interval: int
+    start_timestamp: int
+    last_timestamp: float
+    next_timestamp: float
+    limit: int
+    last_samples: Any = None
+    last_results: 'pmapi.pmResult | None' = None
+    metric_descriptions: List[MetricInfo]
+
+    def parse_options(self, options: JsonObject) -> None:
+        self.archive_batch = 60
+        self.last_timestamp = 0
+        self.next_timestamp = 0
+
+        max_size = sys.maxsize
+        min_size = -sys.maxsize - 1
+
+        self.interval = get_int(options, 'interval', 1000)
+        if self.interval <= 0 or self.interval > max_size:
+            raise ChannelError('protocol-error', message=f'invalid "interval" value: {self.interval}')
+
+        self.start_timestamp = get_int(options, 'timestamp', 0)
+        if self.start_timestamp / 1000 < min_size or self.start_timestamp / 1000 > max_size:
+            raise ChannelError('protocol-error', message=f'invalid "timestamp" value: {self.start_timestamp}')
+
+        if self.start_timestamp < 0:
+            self.start_timestamp = int((time.time() * 1000) + self.start_timestamp)
+
+        self.metrics = get_objv(options, 'metrics', PcpMetricInfo)
+        self.limit = get_int(options, 'limit', max_size)
+        if self.limit <= 0 or self.limit > max_size:
+            raise ChannelError('protocol-error', message=f'invalid "limit" value: {self.limit}')
+
+        self.instances = get_strv(options, 'instances', '')
+        self.omit_instances = get_strv(options, 'omit-instances', [])
+        self.source = get_str(options, 'source', '')
+        if self.source == '':
+            raise ChannelError('protocol-error', message='no "source" option specified for metrics channel')
+
+    @staticmethod
+    def float_to_timeval(timestamp: float) -> 'pmapi.timeval':
+        sec = int(timestamp / 1000)
+        usec = int((timestamp % 1000) * 1000)
+        return pmapi.timeval(sec, usec)
+
+    @staticmethod
+    def get_context_and_name(source: str) -> 'tuple[str, int]':
+        if source == "":
+            raise ChannelError('protocol-error', message='no "source" option specified for metrics channel')
+        elif source.startswith('/'):
+            name = source
+            context_type = c_api.PM_CONTEXT_ARCHIVE
+        elif source == 'pcp-archive':
+            hostname = platform.node()
+            archive_dir = pmapi.pmContext.pmGetConfig("PCP_LOG_DIR")
+            name = f'{archive_dir}/pmlogger/{hostname}'
+            context_type = c_api.PM_CONTEXT_ARCHIVE
+        elif source == 'direct':
+            name = source
+            context_type = c_api.PM_CONTEXT_LOCAL
+        elif source == 'pmcd':
+            name = 'local:'
+            context_type = c_api.PM_CONTEXT_HOST
+        else:
+            raise ChannelError('not-supported',
+                               message=f'unsupported "source" option specified for metrics: {source}')
+
+        return (name, context_type)
+
+    def get_archives(self, name: str) -> Iterable[ArchiveInfo]:
+        archives = sorted(self.prepare_archives(name), key=ArchiveInfo.sort_key)
+
+        if len(archives) == 0:
+            raise ChannelError('not-found')
+
+        # Verify that the given metrics exist in the archive
+        for archive in archives:
+            for metric in self.metrics:
+                metric_desc = None
+                # HACK: Replicate the C bridge behaviour: if a metric is not
+                # found, just return an empty error.  If we report anything
+                # with a message, the metrics page won't re-try opening the
+                # metric archive.
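+                # (ChannelError('') results in a close message with an empty
+                # problem code and no message text.)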
+                try:
+                    metric_desc = self.convert_metric_description(archive.context, metric)
+                except MetricNotFoundError:
+                    raise ChannelError('') from None
+
+                assert metric_desc is not None
+                archive.metric_descriptions.append(metric_desc)
+
+        return archives
+
+    def convert_metric_description(self, context: 'pmapi.pmContext', metric: JsonObject) -> MetricInfo:
+        name = get_str(metric, 'name', '')
+        if name == '':
+            raise ChannelError('protocol-error',
+                               message='invalid "metrics" option was specified (no name for metric)')
+        units = get_str(metric, 'units', '')
+        derive = get_str(metric, 'derive', '')
+
+        try:
+            pm_ids = context.pmLookupName(name)
+        except pmapi.pmErr as exc:
+            if exc.errno() == c_api.PM_ERR_NAME:
+                logger.error("no such metric: %s", name)
+                raise MetricNotFoundError('error', message=f'no such metric: {name}') from None
+            else:
+                raise ChannelError('internal-error', message=str(exc)) from None
+
+        try:
+            pm_desc = context.pmLookupDesc(pm_ids[0])
+        except pmapi.pmErr as exc:
+            if exc.errno() == c_api.PM_ERR_NAME:
+                raise ChannelError('not-found', message=f'no such metric: {name}') from None
+            else:
+                raise ChannelError('internal-error', message=str(exc)) from None
+
+        # Multi-instance metrics have an instance domain defined
+        instanced = pm_desc.indom != c_api.PM_INDOM_NULL
+
+        if instanced:
+            if len(self.instances) > 0:
+                context.pmDelProfile(pm_desc, None)
+                for instance in self.instances:
+                    try:
+                        instid = context.pmLookupInDom(pm_desc, instance)
+                        context.pmAddProfile(pm_desc, instid)
+                    except pmapi.pmErr as exc:
+                        logger.debug("Unable to add profile: instance=%s err=%s", instance, exc)
+
+            if len(self.omit_instances) > 0:
+                context.pmAddProfile(pm_desc, None)
+                for omit_instance in self.omit_instances:
+                    try:
+                        instid = context.pmLookupInDom(pm_desc, omit_instance)
+                        context.pmDelProfile(pm_desc, [instid])
+                    except pmapi.pmErr as exc:
+                        logger.debug("Unable to remove profile: instance=%s err=%s", omit_instance, exc)
+
+        factor = 1.0
+        pm_units = pm_desc.units
+        if units:
+            try:
+                [parsed_units, factor] = context.pmParseUnitsStr(units)
+            except pmapi.pmErr as exc:
+                if exc.errno() == c_api.PM_ERR_NAME:
+                    raise ChannelError('not-found', message=f'no such metric: {name}') from None
+                else:
+                    raise ChannelError('internal-error', message=str(exc)) from None
+
+            self.try_convert_unit(context, pm_desc, pm_units)
+
+            if units != parsed_units or factor != 1.0:
+                pm_units = parsed_units
+
+        return MetricInfo(pmid=pm_ids[0],
+                          name=name,
+                          derive=derive,
+                          desc=pm_desc,
+                          factor=factor,
+                          units=pm_units,
+                          instanced=instanced)
+
+    @staticmethod
+    def try_convert_unit(context, pm_desc, pm_units) -> None:
+        """Try to convert a dummy value to validate that the metric is convertible"""
+        dummy = pmapi.pmAtomValue()
+        dummy.d = 0.0
+        try:
+            context.pmConvScale(c_api.PM_TYPE_DOUBLE, dummy, [pm_desc], 0, pm_units)
+        except pmapi.pmErr as exc:
+            raise ChannelError('internal-error', message=str(exc)) from None
+
+    @staticmethod
+    def prepare_archives(archive_dir: str) -> Iterable[ArchiveInfo]:
+        indexes = glob.glob(glob.escape(archive_dir) + '/*.index')
+        for archive_path in indexes:
+            logger.debug('opening archive: %r', archive_path)
+            try:
+                context = pmapi.pmContext(c_api.PM_CONTEXT_ARCHIVE, archive_path)
+                log_label = context.pmGetArchiveLabel()
+                archive_start = float(log_label.start) * 1000
+                yield ArchiveInfo(context, archive_start, archive_path)
+            except pmapi.pmErr as exc:
+                if exc.errno() != c_api.PM_ERR_LOGFILE:
+                    raise ChannelError('not-found', message=f'could not read archive {archive_path}') from None
+
+    @staticmethod
+    def semantic_val(sem_id: int) -> str:
+        if sem_id == c_api.PM_SEM_COUNTER:
+            return "counter"
+        elif sem_id == c_api.PM_SEM_INSTANT:
+            return "instant"
+        elif sem_id == c_api.PM_SEM_DISCRETE:
+            return "discrete"
+
+        return ""
+
+    def send_meta(self, results: 'pmapi.pmResult', context: 'pmapi.pmContext') -> None:
+        metrics = []
+
+        for metric_desc in self.metric_descriptions:
+            desc: Dict[str, Union[str, List[str]]] = {"name": metric_desc.name}
+
+            if metric_desc.derive:
+                desc['derive'] = metric_desc.derive
+
+            if metric_desc.factor == 1.0:
+                desc['units'] = str(metric_desc.units)
+            else:
+                desc['units'] = f"{context.pmUnitsStr(metric_desc.units)}*{1.0 / metric_desc.factor}"
+
+            semantics = self.semantic_val(metric_desc.desc.sem)
+            if semantics != "":
+                desc['semantics'] = semantics
+
+            # We would like to use `context.pmGetInDom(indom=metric_desc.desc.indom)`,
+            # which returns a set of ([instids], [names]).
+            #
+            # But we can't use it, as it gives no guarantee about the order
+            # of the instance ids, which we need.
+            if metric_desc.instanced:
+                insts: List[str] = []
+                for i in range(results.contents.numpmid):
+                    pmid = results.contents.get_pmid(i)
+                    if metric_desc.pmid != pmid:
+                        continue
+
+                    for j in range(results.contents.get_numval(i)):
+                        value = results.contents.get_vlist(i, j)
+                        instance_desc = context.pmNameInDom(metric_desc.desc, value.inst)
+                        insts.append(instance_desc)
+
+                desc['instances'] = insts
+
+            metrics.append(desc)
+
+        now = int(time.time()) * 1000
+        timestamp = int(results.contents.timestamp.tv_sec * 1000 +
+                        results.contents.timestamp.tv_usec / 1000)
+        self.send_json(source=self.source,
+                       interval=self.interval,
+                       timestamp=timestamp,
+                       metrics=metrics,
+                       now=now)
+
+    def needs_meta_update(self, results: 'pmapi.pmResult') -> bool:
+        """
+        If a multi-instance metric changes its instances, we need to send a
+        new meta message.  For example, when a drive or an ethernet card is
+        removed.
+        """
+
+        last_results = self.last_results
+        if last_results is None:
+            return True
+
+        # PCP guarantees the order of pmids between results
+        for i in range(results.contents.numpmid):
+            if not self.metric_descriptions[i].instanced:
+                continue
+
+            numval1 = results.contents.get_numval(i)
+            numval2 = last_results.contents.get_numval(i)
+
+            if numval1 != numval2:
+                return True
+
+            for j in range(numval1):
+                if results.contents.get_inst(i, j) != last_results.contents.get_inst(i, j):
+                    return True
+
+        return False
+
+    def sample(self, context: 'pmapi.pmContext', archive_batch: int, limit: int, total_fetched: int) -> int:
+        # HACK: pmFetch only takes an array of ctypes.c_uint, not a native
+        # type; the alternative would be to keep the pmids around.
+        pmids = (ctypes.c_uint * len(self.metric_descriptions))()
+        for i, metric in enumerate(self.metric_descriptions):
+            pmids[i] = metric.pmid
+
+        while True:
+            fetched: List[Any] = []
+            try:
+                for _ in range(archive_batch):
+                    if total_fetched == limit:
+                        # Direct sampling: simply return once the limit is reached
+                        if context.type != c_api.PM_CONTEXT_ARCHIVE:
+                            return total_fetched
+                        self.send_updates(fetched)
+                        logger.debug('Reached limit "%s", stopping', self.limit)
+                        return total_fetched
+                    # Consider using the fetchGroup API:
+                    # https://pcp.readthedocs.io/en/latest/PG/PMAPI.html#fetchgroup-operation
+                    results = context.pmFetch(pmids)
+
+                    # The metrics channel sends a meta message the first time
+                    # we open the channel and whenever instanced metrics change.
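+                    # For example, a meta message as the C bridge sent it
+                    # (values illustrative):
+                    #   {"timestamp": 0, "now": 1708092219642, "interval": 1000,
+                    #    "metrics": [{"name": "mock.value", "units": "",
+                    #                 "semantics": "instant"}]}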
+                    if self.needs_meta_update(results):
+                        # Flush all metrics and send a new meta, but only if there is data
+                        if fetched:
+                            self.send_updates(fetched)
+                            fetched.clear()
+                        self.send_meta(results, context)
+
+                    fetched.append(self.parse_fetched_results(context, results))
+                    self.last_results = results
+                    total_fetched += 1
+
+                self.send_updates(fetched)
+                fetched.clear()
+            except pmapi.pmErr as exc:
+                logger.debug('Fetching error: %r, fetched %r', exc, fetched)
+                if exc.errno() != c_api.PM_ERR_EOL:
+                    raise ChannelError('internal-error', message=str(exc)) from None
+
+                if len(fetched) > 0:
+                    self.send_updates(fetched)
+
+                break
+
+        return total_fetched
+
+    def parse_fetched_results(self, context: 'pmapi.pmContext', results: Any) -> Any:
+        metrics = list(self.metrics)
+        samples: Dict[str, Optional[float] | list[Optional[float]]] = {}
+
+        samples['timestamp'] = float(results.contents.timestamp)
+        for i in range(results.contents.numpmid):
+            values: Optional[List[Optional[float]] | Optional[float]] = None
+            numval: int = results.contents.get_numval(i)
+            metric_desc = self.metric_descriptions[i]
+            content_type = metric_desc.desc.type
+
+            # A negative numval is an error code, which we ignore
+            if numval < 0:
+                samples[metrics[i].name] = None
+                continue
+
+            # Unsupported types
+            if content_type == c_api.PM_TYPE_AGGREGATE or \
+                    content_type == c_api.PM_TYPE_EVENT or \
+                    content_type == c_api.PM_TYPE_STRING:
+                samples[metrics[i].name] = None
+                continue
+
+            if not metric_desc.instanced:
+                values = self.build_sample(context, results, metric_desc, i, 0)
+            else:
+                vals: List[Optional[float]] = []
+                for j in range(numval):
+                    vals.append(self.build_sample(context, results, metric_desc, i, j))
+                values = vals
+
+            samples[metrics[i].name] = values
+
+        return samples
+
+    def build_sample(self, context, results, metric_desc: MetricInfo, metric: int, instance: int) -> Optional[float]:
+        pmid = results.contents.get_pmid(metric)
+        logger.debug("build_sample pmid=%d, metric=%d, instance=%d", pmid, metric, instance)
+
+        # PCP throws an error when we try to convert a metric with
+        # numval <= instanceid; the C bridge returns NaN, we return the
+        # Python equivalent, None.
+        valueset = results.contents.get_vset(metric)
+        if valueset.numval <= instance:
+            return None
+
+        valfmt = results.contents.get_valfmt(metric)
+        value = results.contents.get_vlist(metric, instance)
+        content_type = metric_desc.desc.type
+
+        # Make sure we keep the 48 least significant bits of 64-bit numbers,
+        # since "delta" and "rate" derivation work on those, and the whole
+        # 64 bits don't fit into a double.
+        sample_value = None
+        atom = None
+        if content_type == c_api.PM_TYPE_64:
+            try:
+                atom = context.pmExtractValue(valfmt,
+                                              value,
+                                              c_api.PM_TYPE_64,
+                                              c_api.PM_TYPE_64)
+                sample_value = atom.ll & ((1 << 48) - 1)
+            except Exception as exc:
+                logger.exception("Unable to extract PCP TYPE_64 value %s", exc)
+        elif content_type == c_api.PM_TYPE_U64:
+            try:
+                atom = context.pmExtractValue(valfmt,
+                                              value,
+                                              c_api.PM_TYPE_U64,
+                                              c_api.PM_TYPE_U64)
+                sample_value = (atom.ull << 16) >> 16
+            except Exception as exc:
+                logger.exception("Unable to extract PCP TYPE_U64 value %s", exc)
+        else:
+            try:
+                atom = context.pmExtractValue(valfmt,
+                                              value,
+                                              content_type,
+                                              c_api.PM_TYPE_DOUBLE)
+                sample_value = atom.d
+            except Exception as exc:
+                logger.exception("Unable to extract PCP value %s", exc)
+
+        # If the requested units don't match the metric's units, convert them.
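+        # For example, `mem.util.available` may be recorded in kilobytes;
+        # when the client requests "bytes", pmConvScale() scales the value
+        # using the factor and units parsed by pmParseUnitsStr() earlier.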
+        if metric_desc.desc.units != metric_desc.units:
+            try:
+                dummy = pmapi.pmAtomValue()
+                dummy.d = sample_value
+                converted_atom = context.pmConvScale(c_api.PM_TYPE_DOUBLE,
+                                                     dummy,
+                                                     [metric_desc.desc],
+                                                     0,
+                                                     metric_desc.units)
+                sample_value = converted_atom.d * metric_desc.factor
+            except pmapi.pmErr as exc:
+                raise ChannelError('internal-error', message=str(exc)) from None
+
+        return sample_value
+
+    # HACK: copied from internalmetrics
+    def calculate_sample_rate(self, value: float, old_value: Optional[float]) -> Any:
+        if old_value is not None:
+            return (value - old_value) / (self.next_timestamp - self.last_timestamp)
+        else:
+            return False
+
+    def send_updates(self, samples: Sequence[Any]) -> None:
+        data: list[list[float | list[float]]] = []
+        last_samples = self.last_samples or {}
+
+        for sample in samples:
+            assert isinstance(sample['timestamp'], float)
+            self.next_timestamp = sample['timestamp']
+            sampled_values: list[float | list[float]] = []
+
+            for metricinfo in self.metrics:
+                value = sample[metricinfo.name]
+                old_value = last_samples.get(metricinfo.name, None)
+
+                if isinstance(value, list):
+                    if metricinfo.derive == 'rate':
+                        tmp = []
+                        for index, val in enumerate(value):
+                            old_val = None
+                            if old_value is not None:
+                                try:
+                                    old_val = old_value[index]
+                                except IndexError:
+                                    pass
+                            tmp.append(self.calculate_sample_rate(val, old_val))
+                        sampled_values.append(tmp)
+                    else:
+                        sampled_values.append(value)
+                else:
+                    if metricinfo.derive == 'rate' and value is not None:
+                        sampled_values.append(self.calculate_sample_rate(value, old_value))
+                    else:
+                        if value is None:
+                            sampled_values.append(False)
+                        else:
+                            sampled_values.append(value)
+
+            data.append(sampled_values)
+            self.last_timestamp = self.next_timestamp
+            last_samples = sample
+
+        self.last_samples = last_samples
+
+        self.send_data(json.dumps(data).encode())
+
+    def sample_archives(self, archives):
+        total_fetched = 0
+        for i, archive in enumerate(archives):
+            # Set metric_descriptions to the current archive
+            self.metric_descriptions = archive.metric_descriptions
+
+            # Reset last samples and results
+            self.last_results = None
+            self.last_samples = None
+            timestamp = self.start_timestamp
+
+            # TODO can this be smarter?
+            # Continue when the current archive isn't the last one and the
+            # next archive starts before the requested timestamp
+            if i != len(archives) - 1 and archives[i + 1].start < timestamp:
+                continue
+
+            if timestamp < archive.start:
+                timestamp = int(archive.start)
+
+            context = archive.context
+            try:
+                context.pmSetMode(c_api.PM_MODE_INTERP | c_api.PM_XTB_SET(c_api.PM_TIME_MSEC),
+                                  self.float_to_timeval(timestamp), self.interval)
+            except pmapi.pmErr as exc:
+                raise ChannelError('internal-error', message=str(exc)) from None
+
+            total_fetched = self.sample(archive.context, self.archive_batch, self.limit, total_fetched)
+
+    def prepare_direct_context(self, name: str, context_type: int) -> 'pmapi.pmContext':
+        try:
+            direct_context = pmapi.pmContext(context_type, name)
+        except pmapi.pmErr as exc:
+            raise ChannelError('internal-error', message=str(exc)) from None
+
+        for metric in self.metrics:
+            metric_desc = None
+            try:
+                metric_desc = self.convert_metric_description(direct_context, metric)
+            except MetricNotFoundError:
+                raise ChannelError('') from None
+            assert metric_desc is not None
+            self.metric_descriptions.append(metric_desc)
+
+        return direct_context
+
+    async def run(self, options: JsonObject) -> None:
+        self.metric_descriptions = []
+        logger.debug('metrics pcp-archive open: %r, channel: %r', options, self.channel)
+
+        self.parse_options(options)
+        try_import_pcp()
+        name, context_type = self.get_context_and_name(self.source)
+
+        if context_type == c_api.PM_CONTEXT_ARCHIVE:
+            archives = self.get_archives(name)
+            self.ready()
+            self.sample_archives(archives)
+        else:
+            direct_context = self.prepare_direct_context(name, context_type)
+            self.ready()
+
+            while True:
+                self.sample(direct_context, 1, 1, 0)
+                await asyncio.sleep(self.interval / 1000)
diff --git a/test/browser/main.fmf b/test/browser/main.fmf
index 0bd7e701a3e..c59da58a015 100644
--- a/test/browser/main.fmf
+++ b/test/browser/main.fmf
@@ -36,7 +36,7 @@
     - cockpit
    - cockpit-storaged
     # for at least swap metrics on storage page
-    - cockpit-pcp
+    - python3-pcp
 
     # build/test infra dependencies
     - podman  # required by tests
@@ -61,7 +61,7 @@
     - cockpit
     - cockpit-storaged
     # for at least swap metrics on storage page
-    - cockpit-pcp
+    - python3-pcp
 
     # build/test infra dependencies
     - podman  # required by tests
diff --git a/test/common/testlib.py b/test/common/testlib.py
index 106b9ba3d29..73b084ba6fc 100644
--- a/test/common/testlib.py
+++ b/test/common/testlib.py
@@ -2016,6 +2016,9 @@ def login_and_go(
     "(direct|pcp-archive): instance name lookup failed:.*",
     "(direct|pcp-archive): couldn't create pcp archive context for.*",
 
+    # PCP Python bridge
+    "cockpit.channels.pcp-ERROR: no such metric: .*",
+
     # timedatex.service shuts down after timeout, runs into race condition with property watching
     ".*org.freedesktop.timedate1: couldn't get all properties.*Error:org.freedesktop.DBus.Error.NoReply.*",
 ]
diff --git a/test/pytest/test_bridge.py b/test/pytest/test_bridge.py
index b285fb02232..da1907d734f 100644
--- a/test/pytest/test_bridge.py
+++ b/test/pytest/test_bridge.py
@@ -743,8 +743,10 @@ async def serve_page(reader, writer):
             args = {'spawn': ['cat']}
         else:
             args = {'unix': srv}
-    elif payload == 'metrics1':
+    elif payload == 'metrics1' and channeltype.restrictions:
         args['metrics'] = [{'name': 'memory.free'}]
+    elif payload == 'metrics1':
+        pytest.skip('no PCP metric data')
     elif payload == 'dbus-json3':
         if not os.path.exists('/run/dbus/system_bus_socket'):
             pytest.skip('no dbus')
diff --git a/test/pytest/test_pcp.py b/test/pytest/test_pcp.py
new file mode 100644
index 00000000000..5c1f0ff7d5c 100644
--- /dev/null
+++ b/test/pytest/test_pcp.py
@@ -0,0 +1,670 @@
+import argparse
+import asyncio
+import datetime
+import json
+import sys
+import time
+from typing import Iterable
+
+import pytest
+
+# Skip import when PCP is not available (for example in our tox env without system packages)
+try:
+    from cpmapi import (
+        PM_ID_NULL,
+        PM_INDOM_NULL,
+        PM_SEM_COUNTER,
+        PM_SEM_DISCRETE,
+        PM_SEM_INSTANT,
+        PM_TYPE_64,
+        PM_TYPE_DOUBLE,
+        PM_TYPE_STRING,
+        PM_TYPE_U32,
+        PM_TYPE_U64,
+    )
+    from pcp import pmi
+except ImportError:
+    pytestmark = pytest.mark.skip("PCP not available")
+
+from cockpit.bridge import Bridge
+
+from .mocktransport import MockTransport
+
+
+@pytest.fixture
+def bridge() -> Bridge:
+    bridge = Bridge(argparse.Namespace(privileged=False, beipack=False))
+    bridge.superuser_bridges = list(bridge.superuser_rule.bridges)  # type: ignore[attr-defined]
+    return bridge
+
+
+@pytest.fixture
+def no_init_transport(event_loop: asyncio.AbstractEventLoop, bridge: Bridge) -> Iterable[MockTransport]:
+    transport = MockTransport(bridge)
+    try:
+        yield transport
+    finally:
+        transport.stop(event_loop)
+
+
+@pytest.fixture
+def transport(no_init_transport: MockTransport) -> MockTransport:
+    no_init_transport.init()
+    return no_init_transport
+
+
+@pytest.fixture
+def broken_archive(tmpdir_factory):
+    pcp_dir = tmpdir_factory.mktemp('mock-archives')
+
+    with open(pcp_dir / '0.index', 'w') as f:
+        f.write("not a pcp index file")
+    with open(pcp_dir / '0.meta', 'w') as f:
+        f.write("not a pcp meta file")
+    with open(pcp_dir / '0.0', 'w') as f:
+        f.write("not a pcp sample file")
+
+    return pcp_dir
+
+
+@pytest.fixture
+def discrete_metric_archive(tmpdir_factory):
+    pcp_dir = tmpdir_factory.mktemp('discrete-archive')
+    archive_1 = pmi.pmiLogImport(f"{pcp_dir}/0")
+    archive_1.pmiAddMetric("mock.value", PM_ID_NULL, PM_TYPE_64, PM_INDOM_NULL,
+                           PM_SEM_DISCRETE, archive_1.pmiUnits(0, 0, 0, 0, 0, 0))
+
+    archive_1.pmiPutValue("mock.value", None, "4")
+    archive_1.pmiWrite(0, 0)
+
+    archive_1.pmiEnd()
+
+    return pcp_dir
+
+
+@pytest.fixture
+def big_archive(tmpdir_factory):
+    pcp_dir = tmpdir_factory.mktemp('big-archive')
+    archive_1 = pmi.pmiLogImport(f"{pcp_dir}/0")
+    archive_1.pmiAddMetric("mock.value", PM_ID_NULL, PM_TYPE_64, PM_INDOM_NULL,
+                           PM_SEM_INSTANT, archive_1.pmiUnits(0, 0, 0, 0, 0, 0))
+    for i in range(1000):
+        archive_1.pmiPutValue("mock.value", None, str(i))
+        archive_1.pmiWrite(i, 0)
+
+    archive_1.pmiEnd()
+
+    return pcp_dir
+
+
+@pytest.fixture
+def archive(tmpdir_factory):
+    pcp_dir = tmpdir_factory.mktemp('mock-archives')
+    archive_1 = pmi.pmiLogImport(f"{pcp_dir}/0")
+    archive_1.pmiAddMetric("mock.value", PM_ID_NULL, PM_TYPE_U32, PM_INDOM_NULL,
+                           PM_SEM_INSTANT, archive_1.pmiUnits(0, 0, 0, 0, 0, 0))
+    archive_1.pmiPutValue("mock.value", None, "10")
+    archive_1.pmiWrite(0, 0)
+    archive_1.pmiPutValue("mock.value", None, "11")
+    archive_1.pmiWrite(1, 0)
+    archive_1.pmiPutValue("mock.value", None, "12")
+    archive_1.pmiWrite(2, 0)
+    archive_1.pmiEnd()
+
+    archive_2 = pmi.pmiLogImport(f"{pcp_dir}/1")
+    archive_2.pmiAddMetric("mock.value", PM_ID_NULL, PM_TYPE_U32,
+                           PM_INDOM_NULL, PM_SEM_INSTANT,
+                           archive_2.pmiUnits(0, 0, 0, 0, 0, 0))
+    archive_2.pmiAddMetric("mock.late", PM_ID_NULL, PM_TYPE_U32, PM_INDOM_NULL,
+                           PM_SEM_INSTANT, archive_2.pmiUnits(0, 0, 0, 0, 0, 0))
+    archive_2.pmiPutValue("mock.value", None, "13")
+    archive_2.pmiPutValue("mock.late", None, "30")
+    archive_2.pmiWrite(3, 0)
+    archive_2.pmiPutValue("mock.value", None, "14")
+    archive_2.pmiPutValue("mock.late", None, "31")
+    archive_2.pmiWrite(4, 0)
+    archive_2.pmiPutValue("mock.value", None, "15")
+    archive_2.pmiPutValue("mock.late", None, "32")
+    archive_2.pmiWrite(5, 0)
+    archive_2.pmiEnd()
+
+    return pcp_dir
+
+
+@pytest.fixture
+def timestamps_archive(tmpdir_factory):
+    pcp_dir = tmpdir_factory.mktemp('timestamps-archives')
+    archive_1 = pmi.pmiLogImport(f"{pcp_dir}/0")
+
+    archive_1.pmiAddMetric("mock.value", PM_ID_NULL, PM_TYPE_U32, PM_INDOM_NULL,
+                           PM_SEM_INSTANT, archive_1.pmiUnits(0, 0, 0, 0, 0, 0))
+
+    timestamp = int(datetime.datetime.fromisoformat('2023-01-01').timestamp())
+    archive_1.pmiPutValue("mock.value", None, "10")
+    archive_1.pmiWrite(timestamp, 0)
+
+    timestamp = int(datetime.datetime.fromisoformat('2023-06-01').timestamp())
+    archive_1.pmiPutValue("mock.value", None, "11")
+    archive_1.pmiWrite(timestamp, 0)
+
+    timestamp = int(datetime.datetime.fromisoformat('2023-12-01').timestamp())
+    archive_1.pmiPutValue("mock.value", None, "12")
+    archive_1.pmiWrite(timestamp, 0)
+
+    archive_1.pmiEnd()
+
+    return pcp_dir
+
+
+@pytest.fixture
+def instances_archive(tmpdir_factory):
+    pcp_dir = tmpdir_factory.mktemp('instances-archives')
+    archive_1 = pmi.pmiLogImport(f"{pcp_dir}/0")
+
+    domain = 60  # Linux kernel
+    pmid = archive_1.pmiID(domain, 2, 0)
+    indom = archive_1.pmiInDom(domain, 2)
+    units = archive_1.pmiUnits(0, 0, 0, 0, 0, 0)
+
+    archive_1.pmiAddMetric("kernel.all.load", pmid, PM_TYPE_DOUBLE, indom,
+                           PM_SEM_INSTANT, units)
+    archive_1.pmiAddInstance(indom, "1 minute", 1)
+    archive_1.pmiAddInstance(indom, "5 minute", 5)
+    archive_1.pmiAddInstance(indom, "15 minute", 15)
+
+    archive_1.pmiPutValue("kernel.all.load", "1 minute", "1.0")
+    archive_1.pmiPutValue("kernel.all.load", "5 minute", "5.0")
+    archive_1.pmiPutValue("kernel.all.load", "15 minute", "15.0")
+    archive_1.pmiWrite(0, 0)
+
+    archive_1.pmiEnd()
+
+    return pcp_dir
+
+
+@pytest.fixture
+def instances_change_archive(tmpdir_factory):
+    pcp_dir = tmpdir_factory.mktemp('instances-change-archives')
+    archive_1 = pmi.pmiLogImport(f"{pcp_dir}/0")
+
+    domain = 60  # Linux kernel
+    pmid = archive_1.pmiID(domain, 2, 0)
+    indom = archive_1.pmiInDom(domain, 2)
+    units = archive_1.pmiUnits(0, 0, 0, 0, 0, 0)
+
+    archive_1.pmiAddMetric("network.interface.total.bytes", pmid, PM_TYPE_U32, indom,
+                           PM_SEM_COUNTER, units)
+    archive_1.pmiAddInstance(indom, "lo", 1)
+    archive_1.pmiAddInstance(indom, "eth0", 2)
+    archive_1.pmiAddInstance(indom, "eth1", 3)
+
+    archive_1.pmiPutValue("network.interface.total.bytes", "lo", "1")
+    archive_1.pmiPutValue("network.interface.total.bytes", "eth0", "0")
+    archive_1.pmiPutValue("network.interface.total.bytes", "eth1", "1")
+    archive_1.pmiWrite(1597663539413, 0)
+
+    archive_1.pmiPutValue("network.interface.total.bytes", "lo", "2")
+    archive_1.pmiPutValue("network.interface.total.bytes", "eth0", "1")
+    archive_1.pmiWrite(1597663539483, 0)
+
+    archive_1.pmiEnd()
+
+    return pcp_dir
+
+
+@pytest.fixture
+def instances_rate_archive(tmpdir_factory):
+    pcp_dir = tmpdir_factory.mktemp('instances-rate-archives')
+    archive_1 = pmi.pmiLogImport(f"{pcp_dir}/0")
+
+    domain = 60  # Linux kernel
+    pmid = archive_1.pmiID(domain, 2, 0)
+    indom = archive_1.pmiInDom(domain, 2)
+    units = archive_1.pmiUnits(0, 0, 0, 0, 0, 0)
+
+    archive_1.pmiAddMetric("disk.value", pmid, PM_TYPE_U32, indom,
+                           PM_SEM_INSTANT, units)
+    archive_1.pmiAddInstance(indom, "sda", 1)
+    archive_1.pmiAddInstance(indom, "sdb", 2)
+    archive_1.pmiAddInstance(indom, "sdc", 3)
+
+    archive_1.pmiPutValue("disk.value", "sda", "1")
+    archive_1.pmiPutValue("disk.value", "sdb", "1")
+    archive_1.pmiPutValue("disk.value", "sdc", "1")
+    archive_1.pmiWrite(0, 0)
+
+    archive_1.pmiPutValue("disk.value", "sda", "1")
+    archive_1.pmiPutValue("disk.value", "sdb", "1")
+    archive_1.pmiPutValue("disk.value", "sdc", "2")
+    archive_1.pmiWrite(1, 0)
+
+    archive_1.pmiPutValue("disk.value", "sda", "1")
+    archive_1.pmiPutValue("disk.value", "sdb", "10")
+    archive_1.pmiPutValue("disk.value", "sdc", "3")
+    archive_1.pmiWrite(2, 0)
+
+    archive_1.pmiEnd()
+
+    return pcp_dir
+
+
+@pytest.fixture
+def unsupported_metric_archive(tmpdir_factory):
+    pcp_dir = tmpdir_factory.mktemp('unsupported-metric-archives')
+    archive_1 = pmi.pmiLogImport(f"{pcp_dir}/0")
+    archive_1.pmiAddMetric("mock.value", PM_ID_NULL, PM_TYPE_STRING, PM_INDOM_NULL,
+                           PM_SEM_INSTANT, archive_1.pmiUnits(0, 0, 0, 0, 0, 0))
+
+    archive_1.pmiPutValue("mock.value", None, b"10")
+    archive_1.pmiWrite(0, 0)
+    archive_1.pmiEnd()
+
+    return pcp_dir
+
+
+@pytest.fixture
+def mem_avail_archive(tmpdir_factory):
+    pcp_dir = tmpdir_factory.mktemp('mem-avail-archives')
+    archive_1 = pmi.pmiLogImport(f"{pcp_dir}/0")
+
+    # https://github.com/performancecopilot/pcp/blob/766a78e631998e97196eeed9cc36631f30add74b/src/collectl2pcp/metrics.c#L339
+    # pminfo -m -f "mem.util.available"
+    domain = 60  # Linux kernel
+    pmid = archive_1.pmiID(domain, 1, 58)
+    units = archive_1.pmiUnits(1, 0, 0, 1, 0, 0)
+
+    archive_1.pmiAddMetric("mem.util.available", pmid, PM_TYPE_U64, PM_INDOM_NULL,
+                           PM_SEM_INSTANT, units)
+
+    archive_1.pmiPutValue("mem.util.available", None, "19362828")
+    archive_1.pmiWrite(0, 0)
+
+    # archive_1.pmiPutValue("mem.util.available", None, "19186852")
+    # archive_1.pmiWrite(0, 0)
+
+    archive_1.pmiEnd()
+
+    return pcp_dir
+
+
+def assert_metrics_meta(meta, source, timestamp=0, interval=1000):
+    assert meta['timestamp'] == timestamp
+    assert meta['interval'] == interval
+    assert meta['source'] == source
+
+
+@pytest.mark.asyncio
+async def test_pcp_open_error(transport, archive):
+    await transport.check_open('metrics1', source=str(archive), interval=-10, problem='protocol-error',
+                               reply_keys={'message': 'invalid "interval" value: -10'})
+    await transport.check_open('metrics1', problem='protocol-error',
+                               reply_keys={'message': 'no "source" option specified for metrics channel'})
+    await transport.check_open('metrics1', source="bazinga", problem='not-supported',
+                               reply_keys={'message': 'unsupported "source" option specified for metrics: bazinga'})
+    await transport.check_open('metrics1', source="/non-existant", problem='not-found')
+    await transport.check_open('metrics1', source=str(archive),
+                               metrics=[{"name": ""}],
+                               problem='protocol-error',
+                               reply_keys={'message': 'invalid "metrics" option was specified (no name for metric)'})
+    await transport.check_open('metrics1', source=str(archive), limit=-10, problem='protocol-error',
+                               reply_keys={'message': 'invalid "limit" value: -10'})
+    invalid_timestamp_size = sys.maxsize * 1000 + 3
+    await transport.check_open('metrics1', source=str(archive), timestamp=invalid_timestamp_size,
+                               problem='protocol-error',
+                               reply_keys={'message': f'invalid "timestamp" value: {invalid_timestamp_size}'})
+    await transport.check_open('metrics1', problem='protocol-error',
+                               reply_keys={'message': 'no "source" option specified for metrics channel'})
+
+
+@pytest.mark.asyncio
+async def test_pcp_open(transport, archive):
+    _ = await transport.check_open('metrics1', source=str(archive),
+                                   metrics=[{"name": "mock.value"}])
+
+    _, data = await transport.next_frame()
+    # first message is always the meta message
+    meta = json.loads(data)
+    # C bridge:
+    # {"timestamp":0,"now":1708092219642,"interval":1000,
+    #  "metrics":[{"name":"mock.value","units":"","semantics":"instant"}]}
+
+    assert_metrics_meta(meta, str(archive))
+
+    metrics = meta['metrics']
+    assert len(metrics) == 1
+
+    metric = metrics[0]
+    assert metric['name'] == 'mock.value'
+    assert 'derive' not in metric
+    assert metric['semantics'] == 'instant'
+
+    # assert_sample (tc, "[[10],[11],[12]]");
+    _, data = await transport.next_frame()
+    data = json.loads(data)
+    assert data == [[10], [11], [12]]
+
+    _, data = await transport.next_frame()
+    # the next message is the meta message for the second archive
+    meta = json.loads(data)
+
+    metrics = meta['metrics']
+    assert len(metrics) == 1
+
+    metric = metrics[0]
+    assert metric['name'] == 'mock.value'
+    assert 'derive' not in metric
+    assert metric['semantics'] == 'instant'
+
+    # The C bridge sends a META message per archive
+
+    # assert_sample (tc, "[[13],[14],[15]]");
+    _, data = await transport.next_frame()
+    data = json.loads(data)
+    assert data == [[13], [14], [15]]
+
+
+@pytest.mark.asyncio
+async def test_pcp_big_archive(transport, big_archive):
+    _ = await transport.check_open('metrics1', source=str(big_archive),
+                                   metrics=[{"name": "mock.value"}])
+
+    _, data = await transport.next_frame()
+    # first message is always the meta message
+    meta = json.loads(data)
+
+    assert_metrics_meta(meta, str(big_archive))
+    metrics = meta['metrics']
+    assert len(metrics) == 1
+
+    metric = metrics[0]
+    assert metric['name'] == 'mock.value'
+    assert 'derive' not in metric
+    assert metric['semantics'] == 'instant'
+
+    _, data = await transport.next_frame()
+    data = json.loads(data)
+    # the archive batch size is hardcoded to 60
+    assert data == [[i] for i in range(60)]
+
+
+@pytest.mark.asyncio
+async def test_pcp_instances_option(transport, instances_archive):
+    _ = await transport.check_open('metrics1', source=str(instances_archive),
+                                   metrics=[{"name": "kernel.all.load"}], instances=["1 minute"])
+
+    _, data = await transport.next_frame()
+    # first message is always the meta message
+    meta = json.loads(data)
+
+    assert_metrics_meta(meta, str(instances_archive))
+
+    metrics = meta['metrics']
+    assert len(metrics) == 1
+
+    metric = metrics[0]
+    assert metric['name'] == 'kernel.all.load'
+    assert 'derive' not in metric
+    assert metric['semantics'] == 'instant'
+    assert metric['instances'] == ['1 minute']
+
+    _, data = await transport.next_frame()
+    data = json.loads(data)
+    assert data == [[[1.0]]]
+
+
+@pytest.mark.asyncio
+async def test_pcp_omit_instances_option(transport, instances_archive):
+    _ = await transport.check_open('metrics1', source=str(instances_archive),
+                                   metrics=[{"name": "kernel.all.load"}], omit_instances=["1 minute"])
+
+    _, data = await transport.next_frame()
+    # first message is always the meta message
+    meta = json.loads(data)
+
+    assert_metrics_meta(meta, str(instances_archive))
+
+    metrics = meta['metrics']
+    assert len(metrics) == 1
+
+    metric = metrics[0]
+    assert metric['name'] == 'kernel.all.load'
+    assert 'derive' not in metric
+    assert metric['semantics'] == 'instant'
+    assert metric['instances'] == ['15 minute', '5 minute']
+
+    _, data = await transport.next_frame()
+    data = json.loads(data)
+    assert data == [[[15.0, 5.0]]]
+
+
+@pytest.mark.asyncio
+async def test_pcp_instances(transport, instances_archive):
+    _ = await transport.check_open('metrics1', source=str(instances_archive),
+                                   metrics=[{"name": "kernel.all.load"}])
+
+    _, data = await transport.next_frame()
+    # first message is always the meta message
+    meta = json.loads(data)
+
+    assert_metrics_meta(meta, str(instances_archive))
+
+    metrics = meta['metrics']
+    assert len(metrics) == 1
+
+    metric = metrics[0]
+    assert metric['name'] == 'kernel.all.load'
+    assert 'derive' not in metric
+    assert metric['semantics'] == 'instant'
+    assert metric['instances'] == ['15 minute', '1 minute', '5 minute']
+
+    _, data = await transport.next_frame()
+    data = json.loads(data)
+    assert data == [[[15.0, 1.0, 5.0]]]
+
+
+@pytest.mark.asyncio
+async def test_pcp_timestamp(transport, timestamps_archive):
+    timestamp = int(datetime.datetime.fromisoformat('2023-07-01').timestamp()) * 1000
+    _ = await transport.check_open('metrics1', source=str(timestamps_archive),
+                                   metrics=[{"name": "mock.value"}], limit=1,
+                                   timestamp=timestamp)
+
+    _, data = await transport.next_frame()
+    # first message is always the meta message
+    meta = json.loads(data)
+
+    assert_metrics_meta(meta, str(timestamps_archive), timestamp=timestamp)
+
+    metrics = meta['metrics']
+    assert len(metrics) == 1
+
+    metric = metrics[0]
+    assert metric['name'] == 'mock.value'
+
+    # One exact sample at the start timestamp
+    _, data = await transport.next_frame()
+    data = json.loads(data)
+    assert data == [[11.0]]
+
+
+@pytest.mark.asyncio
+async def test_pcp_negative_timestamp(transport, timestamps_archive):
+    """ Given a negative timestamp, the current time is taken and the given
+    timestamp is subtracted from it """
+
+    timestamp = int(datetime.datetime.fromisoformat('2023-07-01').timestamp()) * 1000
+    relative_timestamp = int(time.time() * 1000) - timestamp
+    _ = await transport.check_open('metrics1', source=str(timestamps_archive),
+                                   metrics=[{"name": "mock.value"}], limit=1,
+                                   timestamp=-relative_timestamp)
+
+    _, data = await transport.next_frame()
+    # first message is always the meta message
+    meta = json.loads(data)
+    # time.time() is not exact
+    assert (meta['timestamp'] - timestamp) < 10
+
+    metrics = meta['metrics']
+    assert len(metrics) == 1
+
+    metric = metrics[0]
+    assert metric['name'] == 'mock.value'
+
+    # One exact sample at the start timestamp
+    _, data = await transport.next_frame()
+    data = json.loads(data)
+    assert data == [[11.0]]
+
+
+@pytest.mark.asyncio
+async def test_pcp_limit_archive(transport, big_archive):
+    _ = await transport.check_open('metrics1', source=str(big_archive),
+                                   limit=30,
+                                   metrics=[{"name": "mock.value"}])
+
+    _, data = await transport.next_frame()
+    # first message is always the meta message
+    meta = json.loads(data)
+    assert_metrics_meta(meta, str(big_archive))
+
+    _, data = await transport.next_frame()
+    data = json.loads(data)
+    assert data == [[i] for i in range(30)]
+
+    _, data = await transport.next_frame()
+    data = json.loads(data)
+    assert data['command'] == 'close'
+
+
+@pytest.mark.asyncio
+async def test_pcp_broken_archive(transport, broken_archive):
+    await transport.check_open('metrics1', source=str(broken_archive),
+                               metrics=[{"name": "mock.value", "derive": "rate"}],
+                               problem='not-found',
+                               reply_keys={'message': f'could not read archive {broken_archive}/0.index'})
+
+
+@pytest.mark.asyncio
+async def test_pcp_instances_change(transport, instances_change_archive):
+    _ = await transport.check_open('metrics1', source=str(instances_change_archive),
+                                   metrics=[{"name": "network.interface.total.bytes"}], limit=2)
+
+    _, data = await transport.next_frame()
+    # first message is always the meta message
+    meta = json.loads(data)
+
+    assert_metrics_meta(meta, str(instances_change_archive), 1597663539413000, 1000)
+    metrics = meta['metrics'][0]
+    assert metrics['instances'] == ['eth1', 'lo', 'eth0']
+
+    _, data = await transport.next_frame()
+    data = json.loads(data)
+    assert data == [[[1, 1, 0]]]
+
+    # eth1 is unplugged, new meta message
+    _, data = await transport.next_frame()
+    meta = json.loads(data)
+    assert_metrics_meta(meta, str(instances_change_archive), 1597663539414000, 1000)
+    metrics = meta['metrics'][0]
+    assert metrics['instances'] == ['lo', 'eth0']
+
+    _, data = await transport.next_frame()
+    data = json.loads(data)
+    assert data == [[[1, 0]]]
+
+
+@pytest.mark.asyncio
+async def test_pcp_scale_memory_unit(transport, mem_avail_archive):
+    _ = await transport.check_open('metrics1', source=str(mem_avail_archive),
+                                   metrics=[{"name": "mem.util.available", "units": "bytes"}], limit=1)
+
+    _, data = await transport.next_frame()
+    meta = json.loads(data)
+
+    assert_metrics_meta(meta, str(mem_avail_archive), 0, 1000)
+    metric = meta['metrics'][0]
+    assert metric['name'] == "mem.util.available"
+    assert metric['units'] == "byte"
+    assert metric['semantics'] == "instant"
+
+    _, data = await transport.next_frame()
+    data = json.loads(data)
+    assert data == [[19827535872]]
+
+
+@pytest.mark.asyncio
+async def test_pcp_discrete_metric(transport, discrete_metric_archive):
+    _ = await transport.check_open('metrics1', source=str(discrete_metric_archive),
+                                   metrics=[{"name": "mock.value"}])
+    _, data = await transport.next_frame()
+
+    meta = json.loads(data)
+    assert_metrics_meta(meta, str(discrete_metric_archive), 0, 1000)
+    metric = meta['metrics'][0]
+    assert metric['name'] == "mock.value"
+    assert metric['units'] == ""
+    assert metric['semantics'] == "discrete"
+
+    _, data = await transport.next_frame()
+    data = json.loads(data)
+    assert data == [[4]]
+
+
+@pytest.mark.asyncio
+async def test_pcp_rate_metric(transport, archive):
+    _ = await transport.check_open('metrics1', source=str(archive),
+                                   metrics=[{"name": "mock.value", "derive": "rate"}])
+    _, data = await transport.next_frame()
+
+    meta = json.loads(data)
+    assert_metrics_meta(meta, str(archive))
+    metric = meta['metrics'][0]
+    assert metric['name'] == "mock.value"
+    assert metric['units'] == ""
+    assert metric['semantics'] == "instant"
+
+    _, data = await transport.next_frame()
+    data = json.loads(data)
+    assert data == [[False], [1.0], [1.0]]
+
+    # new archive, new meta
+    _, data = await transport.next_frame()
+    meta = json.loads(data)
+    assert_metrics_meta(meta, str(archive), timestamp=3000)
+
+    _, data = await transport.next_frame()
+    data = json.loads(data)
+    assert data == [[False], [1.0], [1.0]]
+
+
+@pytest.mark.asyncio
+async def test_pcp_rate_instances(transport, instances_rate_archive):
+    _ = await transport.check_open('metrics1', source=str(instances_rate_archive),
+                                   metrics=[{"name": "disk.value", "derive": "rate"}])
+    _, data = await transport.next_frame()
+
+    meta = json.loads(data)
+    assert_metrics_meta(meta, str(instances_rate_archive))
+    metric = meta['metrics'][0]
+    assert metric['name'] == "disk.value"
+    assert metric['derive'] == "rate"
+    assert metric['semantics'] == "instant"
+
+    _, data = await transport.next_frame()
+    data = json.loads(data)
+    assert data == [[[False, False, False]], [[1.0, 0.0, 0.0]], [[1.0, 0.0, 9.0]]]
+
+    _, data = await transport.next_frame()
+    data = json.loads(data)
+    print(data)
+
+
+@pytest.mark.asyncio
+async def test_pcp_unsupported_metric(transport, unsupported_metric_archive):
+    """Verify that the PCP channel does not crash on an unknown metric type"""
+    _ = await transport.check_open('metrics1', source=str(unsupported_metric_archive),
+                                   metrics=[{"name": "mock.value"}])
+    _, data = await transport.next_frame()
+
+    meta = json.loads(data)
+    assert_metrics_meta(meta, str(unsupported_metric_archive), 0, 1000)
diff --git a/test/verify/check-metrics b/test/verify/check-metrics
index 4985f923a29..8e6e32786cf 100755
--- a/test/verify/check-metrics
+++ b/test/verify/check-metrics
@@ -1282,7 +1282,14 @@
             return
 
         if m.image.startswith("debian") or m.image.startswith("ubuntu"):
-            m.execute("dpkg --purge cockpit-pcp-dbgsym || true; dpkg --purge cockpit-pcp pcp redis redis-server")
+            # TODO: remove conditional when all images have python3-pcp and a Python PCP bridge
+            m.execute("""
+                if dpkg -l python3-pcp; then
+                    dpkg --purge cockpit-pcp-dbgsym || true; dpkg --purge python3-pcp cockpit-pcp pcp redis redis-server
+                else
+                    dpkg --purge cockpit-pcp-dbgsym || true; dpkg --purge cockpit-pcp pcp redis redis-server
+                fi
+            """)
             # HACK: pcp does not clean up correctly on Debian https://bugs.debian.org/cgi-bin/bugreport.cgi?bug=986074
             m.execute("rm -f /etc/systemd/system/pmlogger.service.requires/pmlogger_farm.service")
         else: