diff --git a/pyproject.toml b/pyproject.toml
index 4a2951f65326..9c6d05cf4ceb 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -31,6 +31,10 @@ module = [
     "libvirt_qemu",
     "pika",
 
+    # run without pcp module types
+    "pcp",
+    "cpmapi",
+
     # run without gobject-introspection (used from cockpit-client for Gtk)
     "gi.*",
 
@@ -193,6 +197,7 @@ wheel_build_env = pkg
 # All other environments (names like py311-lint, py36-pytest, etc) are isolated
 # from the system and get their packages from PyPI, according to the specific
 # test environment being requested.  We build the wheel in a common environment.
+# Skip PCP: the PyPI package provides no wheel, requires compilation, and is outdated (5.0).
 [testenv]
 package = wheel
 wheel_build_env = venv-pkg
@@ -208,6 +213,6 @@ deps =
   pytest: pytest-xdist
 allowlist_externals = test/static-code
 commands =
-  pytest: python3 -m pytest -opythonpath= {posargs}
+  pytest: python3 -m pytest -k 'not pcp' -opythonpath= {posargs}
   lint: test/static-code --tap
 """
diff --git a/src/cockpit/channels/__init__.py b/src/cockpit/channels/__init__.py
index cf255aa54305..14ff6442cf64 100644
--- a/src/cockpit/channels/__init__.py
+++ b/src/cockpit/channels/__init__.py
@@ -20,6 +20,7 @@
 from .http import HttpChannel
 from .metrics import InternalMetricsChannel
 from .packages import PackagesChannel
+from .pcp import PcpMetricsChannel
 from .stream import SocketStreamChannel, SubprocessStreamChannel
 from .trivial import EchoChannel, NullChannel
 
@@ -35,6 +36,7 @@
     InternalMetricsChannel,
     NullChannel,
     PackagesChannel,
+    PcpMetricsChannel,
     SubprocessStreamChannel,
     SocketStreamChannel,
 ]
diff --git a/src/cockpit/channels/pcp.py b/src/cockpit/channels/pcp.py
new file mode 100644
index 000000000000..bb8471fbd915
--- /dev/null
+++ b/src/cockpit/channels/pcp.py
@@ -0,0 +1,543 @@
+# This file is part of Cockpit.
+#
+# Copyright (C) 2024 Red Hat, Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <https://www.gnu.org/licenses/>.
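+#
+# Message flow of the "metrics1" payload, as exercised by
+# test/pytest/test_pcp.py (a sketch, not normative): after the channel is
+# opened, the first message is a JSON "meta" description, e.g.
+#
+#     {"source": "...", "interval": 1000, "timestamp": 0, "now": ...,
+#      "metrics": [{"name": "mock.value", "units": "", "semantic": "instant"}]}
+#
+# followed by data frames, each a JSON array of sample rows with one entry
+# per requested metric (a float, or a list of floats for multi-instance
+# metrics), e.g. [[10], [11], [12]].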
+
+import glob
+import json
+import logging
+import platform
+import sys
+import time
+from collections import defaultdict
+from typing import TYPE_CHECKING, Any, Iterable, Mapping, NamedTuple, Sequence
+
+from ..channel import AsyncChannel, ChannelError
+from ..jsonutil import JsonObject, JsonValue, get_int, get_objv, get_str, get_strv
+
+if TYPE_CHECKING:
+    import cpmapi as c_api
+    from pcp import pmapi
+else:
+    pmapi = None
+    c_api = None
+
+logger = logging.getLogger(__name__)
+
+
+Sample = Mapping[str, float | list[float] | None]
+
+
+class PcpMetricInfo(dict[str, JsonValue]):
+    def __init__(self, value: JsonObject) -> None:
+        self.name = get_str(value, 'name')
+        self.derive = get_str(value, 'derive', '')
+        super().__init__(name=self.name, derive=self.derive)
+
+
+class MetricInfo(NamedTuple):
+    id: int
+    derive: str
+    desc: str
+    name: str
+    factor: float
+    units: str
+    units_bf: str
+
+
+def try_import_pcp() -> None:
+    global c_api, pmapi
+    if c_api is None or pmapi is None:
+        try:
+            import cpmapi as c_api
+            from pcp import pmapi
+        except ImportError as exc:
+            raise ChannelError('not-supported', message='python3-pcp not installed') from exc
+
+
+class ArchiveInfo:
+    def __init__(self, context: 'pmapi.pmContext', start: float, path: str) -> None:
+        self.context = context
+        self.start = start
+        self.path = path
+        self.metric_descriptions: list[MetricInfo] = []
+
+    def sort_key(self) -> float:
+        return self.start
+
+    def __repr__(self):
+        return f"ArchiveInfo({self.path})"
+
+
+class PcpMetricsChannel(AsyncChannel):
+    payload = 'metrics1'
+
+    pcp_dir: str
+    archive_batch: int = 60
+
+    context: 'pmapi.pmContext'
+    source: str
+    interval: int
+    need_meta: bool = True
+    start_timestamp: int
+    last_timestamp: float = 0
+    next_timestamp: float = 0
+    limit: int = 0
+    last_samples: Sample | None = None
+
+    @staticmethod
+    def float_to_timeval(timestamp: float) -> 'pmapi.timeval':
+        sec = int(timestamp / 1000)
+        usec = int((timestamp % 1000) * 1000)
+        return pmapi.timeval(sec, usec)
+
+    @staticmethod
+    def get_context_and_name(source: str):
+        if source == "":
+            raise ChannelError('protocol-error', message='no "source" option specified for metrics channel')
+        elif source.startswith('/'):
+            name = source
+            context_type = c_api.PM_CONTEXT_ARCHIVE
+        elif source == 'pcp-archive':
+            hostname = platform.node()
+            # pmlogger writes the archives to $PCP_LOG_DIR/pmlogger/<hostname>
+            name = f'{pmapi.pmContext.pmGetConfig("PCP_LOG_DIR")}/pmlogger/{hostname}'
+            context_type = c_api.PM_CONTEXT_ARCHIVE
+        elif source == 'direct':
+            name = None
+            context_type = c_api.PM_CONTEXT_LOCAL
+        elif source == 'pmcd':
+            name = 'local:'
+            context_type = c_api.PM_CONTEXT_HOST
+        else:
+            raise ChannelError('not-supported',
+                               message=f'unsupported "source" option specified for metrics: {source}')
+
+        return (name, context_type)
+
+    @staticmethod
+    def convert_metric_description(context: 'pmapi.pmContext', metric: JsonObject):
+        name = get_str(metric, 'name', '')
+        if not name:
+            raise ChannelError('protocol-error',
+                               message='invalid "metrics" option was specified (no name for metric)')
+        units = get_str(metric, 'units', '')
+        derive = get_str(metric, 'derive', '')
+        logger.debug('convert_metric_description: %r', metric)
+
+        try:
+            pm_ids = context.pmLookupName(name)
+        except pmapi.pmErr as exc:
+            if exc.errno() == c_api.PM_ERR_NAME:
+                logger.debug('pmLookupName error: %r', exc)
+                raise ChannelError('not-found', message=f'no such metric: {name}') from None
+            else:
+                raise ChannelError('internal-error', message=str(exc)) from None
+
+        # TODO: optimise by using pmLookupDescs?
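+        # (a sketch: with the python3-pcp binding, something like
+        #     pm_ids = context.pmLookupName([m.name for m in metrics])
+        #     pm_descs = context.pmLookupDescs(pm_ids)
+        # would resolve every requested metric in two calls instead of two
+        # calls per metric, as sample() below already does for fetching)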
+        try:
+            pm_desc = context.pmLookupDesc(pm_ids[0])
+        except pmapi.pmErr as exc:
+            if exc.errno() == c_api.PM_ERR_NAME:
+                raise ChannelError('not-found', message=f'no such metric: {name}') from None
+            else:
+                raise ChannelError('internal-error', message=str(exc)) from None
+
+        # TODO: take care of this later...
+        if units:
+            try:
+                [units_buf, factor] = context.pmParseUnitsStr(units)
+                logger.debug('parsed units: %r, factor: %r', units_buf, factor)
+            except pmapi.pmErr as exc:
+                if exc.errno() == c_api.PM_ERR_NAME:
+                    raise ChannelError('not-found', message=f'no such metric: {name}') from None
+                else:
+                    raise ChannelError('internal-error', message=str(exc)) from None
+        else:
+            factor = 1.0
+            units = pm_desc.units
+
+        return MetricInfo(id=pm_ids[0],
+                          name=name,
+                          derive=derive,
+                          desc=pm_desc,
+                          factor=factor,
+                          units=units,
+                          units_bf="")
+
+    @staticmethod
+    def prepare_archives(archive_dir: str) -> Iterable[ArchiveInfo]:
+        indexes = glob.glob(glob.escape(archive_dir) + '/*.index')
+        for archive_path in indexes:
+            logger.debug('opening archive: %r', archive_path)
+            try:
+                context = pmapi.pmContext(c_api.PM_CONTEXT_ARCHIVE, archive_path)
+                log_label = context.pmGetArchiveLabel()
+                archive_start = float(log_label.start) * 1000
+                yield ArchiveInfo(context, archive_start, archive_path)
+            except pmapi.pmErr as exc:
+                if exc.errno() != c_api.PM_ERR_LOGFILE:
+                    raise ChannelError('not-found', message=f'could not read archive {archive_path}') from None
+
+    @staticmethod
+    def semantic_val(sem_id: int):
+        if sem_id == c_api.PM_SEM_COUNTER:
+            return "counter"
+        elif sem_id == c_api.PM_SEM_INSTANT:
+            return "instant"
+        elif sem_id == c_api.PM_SEM_DISCRETE:
+            return "discrete"
+
+    def send_meta(self, archive) -> None:
+        # see build_meta in the C bridge's cockpitpcpmetrics.c
+        metrics = []
+        for metric_desc in archive.metric_descriptions:
+            desc = {"name": metric_desc.name}
+
+            if metric_desc.derive:
+                desc['derive'] = metric_desc.derive
+
+            if metric_desc.factor == 1.0:
+                desc['units'] = str(metric_desc.units)  # XXX: verify
+            else:
+                # TODO: port the scaled-units case from the C bridge:
+                # gchar *name = g_strdup_printf
+                #     ("%s*%g", pmUnitsStr(self->metrics[i].units), 1.0/self->metrics[i].factor);
+                raise NotImplementedError('scaled units are not implemented yet')
+
+            # if metric_desc.indom != c_api.PM_INDOM_NULL:
+            #     pass
+            #     # TODO: instances
+
+            desc['semantic'] = self.semantic_val(metric_desc.desc.sem)
+
+            metrics.append(desc)
+
+        now = int(time.time())
+        self.send_json(source=self.source, interval=self.interval,
+                       timestamp=self.start_timestamp, metrics=metrics,
+                       now=now * 1000)
+
+        self.need_meta = False
+
+    def parse_options(self, options: JsonObject):
+        max_size = sys.maxsize
+        min_size = -sys.maxsize - 1
+
+        self.interval = get_int(options, 'interval', 1000)
+        if self.interval <= 0 or self.interval > max_size:
+            raise ChannelError('protocol-error', message=f'invalid "interval" value: {self.interval}')
+
+        self.start_timestamp = get_int(options, 'timestamp', 0)
+        if self.start_timestamp / 1000 < min_size or self.start_timestamp / 1000 > max_size:
+            raise ChannelError('protocol-error', message=f'invalid "timestamp" value: {self.start_timestamp}')
+
+        # A negative timestamp is relative to the current time; convert it to epoch
+        if self.start_timestamp < 0:
+            self.start_timestamp = int((time.time() * 1000) + self.start_timestamp)
+
+        self.metrics = get_objv(options, 'metrics', PcpMetricInfo)
+        self.limit = get_int(options, 'limit', max_size)
+        if self.limit <= 0 or self.limit > max_size:
+            raise ChannelError('protocol-error', message=f'invalid "limit" value: {self.limit}')
+
+        self.instances = get_strv(options, 'instances', [])
+        self.omit_instances = get_strv(options, 'omit-instances', [])
+        self.source = get_str(options, 'source', '')
+        if self.source == '':
+            raise ChannelError('protocol-error', message='no "source" option specified for metrics channel')
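+
+    # Sketch of the accepted options (shapes as used in test/pytest/test_pcp.py;
+    # the values here are illustrative):
+    #
+    #     {
+    #         "payload": "metrics1",
+    #         "source": "/var/log/pcp/pmlogger/myhost",  # or "pcp-archive", "pmcd", "direct"
+    #         "metrics": [{"name": "mock.value", "derive": "rate"}],
+    #         "interval": 1000,   # milliseconds between samples
+    #         "timestamp": 0,     # start time in ms; negative values are relative to now
+    #         "limit": 30         # maximum number of samples to send
+    #     }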
+
+    def sample(self, archive, total_fetched):
+        context = archive.context
+
+        # HACK: this is some utter silliness, maybe we can construct our own pcp.pmapi.c_uint_Array_1
+        # pmids = [metric.id for metric in metric_descriptions]
+        pmids = context.pmLookupName([metric.name for metric in self.metrics])
+        descs = context.pmLookupDescs(pmids)
+
+        while True:
+            fetched = []
+            try:
+                for _ in range(self.archive_batch):
+                    if total_fetched == self.limit:
+                        self.send_updates(archive, fetched)
+                        logger.debug('Reached limit "%s", stopping', self.limit)
+                        return total_fetched
+                    # Consider using the fetchGroup API:
+                    # https://pcp.readthedocs.io/en/latest/PG/PMAPI.html#fetchgroup-operation
+                    # HACK: pmFetch only accepts a PCP pmID array, not a Python list
+                    results = context.pmFetch(pmids)
+                    fetched.append(self.parse_fetched_results(context, results, descs))
+                    total_fetched += 1
+
+                self.send_updates(archive, fetched)
+                fetched.clear()
+            except pmapi.pmErr as exc:
+                logger.debug('Fetching error: %r, fetched %r', exc, fetched)
+                if exc.errno() != c_api.PM_ERR_EOL:
+                    raise ChannelError('internal-error', message=str(exc)) from None
+
+                if len(fetched) > 0:
+                    self.send_updates(archive, fetched)
+
+                break
+
+        return total_fetched
+
+    def parse_fetched_results(self, context: 'pmapi.pmContext', results: Any, descs: Any) -> Sample:
+        metrics = list(self.metrics)
+        samples: dict[str, float | list[float]] = {}
+
+        samples['timestamp'] = float(results.contents.timestamp)
+        for i in range(results.contents.numpmid):
+            values: dict[str, float] | float | list[float] | None = defaultdict()
+            numval: int = results.contents.get_numval(i)
+
+            if numval < 0:
+                # negative numval is an error code we ignore
+                pass  # TODO: continue instead?
+            # TODO: don't pass descs, look up via archive.metric_descriptions?
+            elif descs[i].indom == c_api.PM_INDOM_NULL:  # Single instance
+                values = self.build_sample(context, results, descs, i, 0)
+            else:  # Multi instance
+                values = [self.build_sample(context, results, descs, i, j)
+                          for j in range(numval)]
+
+            samples[metrics[i].name] = values
+
+            # Reference for instance-name handling, ported from the C bridge,
+            # kept for the instances TODO above:
+            # values: dict[str, float] | float = defaultdict()
+            # instances: list[str] | None = None
+            # value_count = results.contents.get_numval(i)
+            #
+            # if value_count > 1:
+            #     _, instances = context.pmGetInDom(indom=descs[i].contents.indom)
+            #
+            # content_type = descs[i].contents.type
+            # for j in range(value_count):
+            #     atom = context.pmExtractValue(results.contents.get_valfmt(i),
+            #                                   results.contents.get_vlist(i, j),
+            #                                   content_type,
+            #                                   content_type)
+            #
+            #     if value_count > 1:
+            #         assert isinstance(instances, list)
+            #         assert isinstance(values, dict)
+            #         values[instances[j]] = atom.dref(content_type)
+            #     else:
+            #         # TODO does float() need to be here?
+            #         values = float(atom.dref(content_type))
+            #
+            # samples[metrics[i].name] = values
+
+        return samples
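+
+    # Shape of a parsed Sample, for illustration (values inferred from the
+    # fixtures in test/pytest/test_pcp.py):
+    #
+    #     {'timestamp': 0.0, 'mock.value': 10.0, 'kernel.all.load': [15.0, 1.0, 5.0]}
+    #
+    # single-instance metrics map to one float, multi-instance metrics to a
+    # list with one value per instance.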
+
+    def build_sample(self, context, results, descs, metric, instance):
+        try:
+            desc = descs[metric]
+        except IndexError:
+            logger.debug("no description found for metric=%s", metric)
+            return None
+
+        # Unsupported types
+        # TODO: PM_TYPE_AGGREGATE_FULL? or PM_TYPE_STRING?
+        content_type = desc.type
+        if content_type == c_api.PM_TYPE_AGGREGATE or content_type == c_api.PM_TYPE_EVENT:
+            return None
+
+        # No value for this instance; the C code passes `j` along for
+        # multi-instance metrics:
+        #     if (result->vset[metric]->numval <= instance)
+        #         return;
+        valueset = results.contents.get_vset(metric)
+        if valueset.numval <= instance:
+            return None
+
+        valfmt = results.contents.get_valfmt(metric)
+        value = results.contents.get_vlist(metric, instance)
+        sample_value = None
+        if content_type == c_api.PM_TYPE_64:
+            atom = context.pmExtractValue(valfmt, value,
+                                          c_api.PM_TYPE_64,
+                                          c_api.PM_TYPE_64)
+            # ported from the C bridge, which clips 64-bit values to 48 bits
+            # so they fit in a double (on Python ints this shift is a no-op)
+            sample_value = (atom.ll << 16) >> 16
+        elif content_type == c_api.PM_TYPE_U64:
+            atom = context.pmExtractValue(valfmt, value,
+                                          c_api.PM_TYPE_U64,
+                                          c_api.PM_TYPE_U64)
+            sample_value = (atom.ull << 16) >> 16
+        else:
+            try:
+                atom = context.pmExtractValue(valfmt, value,
+                                              content_type,
+                                              c_api.PM_TYPE_DOUBLE)
+            except pmapi.pmErr as exc:
+                logger.debug('could not extract value: %r', exc)
+                return None
+
+            sample_value = atom.d
+
+        # TODO: handle the case where the requested units differ from the PCP
+        # units and scale them using pmConvScale
+        return sample_value
+
+    def calculate_sample_rate(self, value: float, old_value: float | None) -> float | bool:
+        if old_value is not None and self.last_timestamp:
+            return (value - old_value) / (self.next_timestamp - self.last_timestamp)
+        else:
+            return False
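+
+    # Worked example (illustrative): for a 'rate'-derived metric sampled as 10
+    # at t=1000ms and 13 at t=2000ms, the sent value is
+    # (13 - 10) / (2000 - 1000) = 0.003 (per millisecond); False is sent
+    # instead when there is no previous sample to derive from.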
+
+    def send_updates(self, archive, samples: Sequence[Sample]) -> None:
+        data: list[list[float | list[float]]] = []
+        last_samples = self.last_samples or {}
+
+        for sample in samples:
+            assert isinstance(sample['timestamp'], float)
+            self.next_timestamp = sample['timestamp']
+            sampled_values: list[float | list[float]] = []
+            for metricinfo in self.metrics:
+                value = sample[metricinfo.name]
+                old_value = last_samples.get(metricinfo.name, None)
+
+                logger.debug('old %r new %r', old_value, value)
+
+                if isinstance(value, list):
+                    # TODO: multi-instance values?
+                    pass
+                elif isinstance(value, Mapping):
+                    # If the old value wasn't an equivalent mapping, we need a meta
+                    if not isinstance(old_value, Mapping) or value.keys() != old_value.keys():
+                        self.need_meta = True
+                        old_value = {}
+
+                    if metricinfo.derive == 'rate':
+                        instances = tuple(self.calculate_sample_rate(value[key], old_value.get(key))
+                                          for key in value)
+                        sampled_values.append(instances)
+                    else:
+                        sampled_values.append(tuple(value.values()))
+                else:
+                    assert isinstance(value, float)
+
+                    # If the old value was a mapping, we need a meta
+                    if isinstance(old_value, Mapping):
+                        self.need_meta = True
+                        old_value = None
+
+                    if metricinfo.derive == 'rate':
+                        sampled_values.append(self.calculate_sample_rate(value, old_value))
+                    else:
+                        sampled_values.append(value)
+
+            data.append(sampled_values)
+            self.last_timestamp = self.next_timestamp
+            last_samples = sample
+
+        if self.need_meta:
+            self.send_meta(archive)
+
+        self.last_samples = last_samples
+        self.send_data(json.dumps(data).encode())
+
+    def sample_archives(self, archives):
+        total_fetched = 0
+        for i, archive in enumerate(archives):
+            timestamp = self.start_timestamp
+
+            # TODO can this be smarter?
+            # skip archives that aren't the last one and whose successor starts before the timestamp
+            if i != len(archives) - 1 and archives[i + 1].start < timestamp:
+                continue
+
+            if timestamp < archive.start:
+                timestamp = int(archive.start)
+
+            context = archive.context
+            logger.debug('timestamp: %r', timestamp)
+            logger.debug('archive_start: %r', archive.start)
+            logger.debug('archive_end: %r', context.pmGetArchiveEnd())
+            try:
+                context.pmSetMode(c_api.PM_MODE_INTERP | c_api.PM_XTB_SET(c_api.PM_TIME_MSEC),
+                                  self.float_to_timeval(timestamp), self.interval)
+            except pmapi.pmErr as exc:
+                raise ChannelError('internal-error', message=str(exc)) from None
+
+            total_fetched = self.sample(archive, total_fetched)
+            if total_fetched == self.limit:
+                # the limit was reached, stop before the remaining archives
+                return True
+
+        return True
+
+    async def run(self, options: JsonObject) -> None:
+        logger.debug('metrics pcp-archive open: %r, channel: %r', options, self.channel)
+
+        self.parse_options(options)
+
+        try_import_pcp()
+
+        name, context_type = self.get_context_and_name(self.source)
+        archives = []
+
+        if context_type == c_api.PM_CONTEXT_ARCHIVE:
+            archives = sorted(self.prepare_archives(name), key=ArchiveInfo.sort_key)
+        else:  # host/local
+            ...
+
+        if len(archives) == 0:
+            raise ChannelError('not-found')
+
+        # Verify all metrics
+        for archive in archives:
+            archive.metric_descriptions = []
+            for metric in self.metrics:
+                metric_desc = self.convert_metric_description(archive.context, metric)
+                archive.metric_descriptions.append(metric_desc)
+
+                # TODO: port from prepare_current_context
+                # Basically this filters the given instances/omitted instances
+                if metric_desc.desc.indom != c_api.PM_INDOM_NULL:
+                    if self.instances:
+                        ...
+                    elif self.omit_instances:
+                        ...
+
+        self.ready()
+
+        self.sample_archives(archives)
+
+        self.done()
+
+        # TODO (live sources): read a batch of samples, then wait one interval
+        # and repeat until EOL, roughly:
+        #
+        # while True:
+        #     if all_read:
+        #         return
+        #
+        #     try:
+        #         await asyncio.wait_for(self.read(), self.interval / 1000)
+        #     except asyncio.TimeoutError:
+        #         # Continue the while loop, we use wait_for as an interval timer.
+        #         continue
+        #
+        #     # self.send_meta()
+        #     # construct a meta message
diff --git a/test/pytest/test_bridge.py b/test/pytest/test_bridge.py
index 8773c3c11f5c..1b18e9933186 100644
--- a/test/pytest/test_bridge.py
+++ b/test/pytest/test_bridge.py
@@ -743,8 +743,10 @@ async def serve_page(reader, writer):
             args = {'spawn': ['cat']}
         else:
             args = {'unix': srv}
-    elif payload == 'metrics1':
+    elif payload == 'metrics1' and channeltype.restrictions:
         args['metrics'] = [{'name': 'memory.free'}]
+    elif payload == 'metrics1':
+        pytest.skip('no PCP metric data')
     elif payload == 'dbus-json3':
         if not os.path.exists('/run/dbus/system_bus_socket'):
             pytest.skip('no dbus')
diff --git a/test/pytest/test_pcp.py b/test/pytest/test_pcp.py
new file mode 100644
index 000000000000..cf93f706c3d9
--- /dev/null
+++ b/test/pytest/test_pcp.py
@@ -0,0 +1,364 @@
+import argparse
+import asyncio
+import datetime
+import json
+import time
+from typing import Iterable
+
+import pytest
+
+# Skip these tests when PCP is not available (for example in our tox env)
+try:
+    from cpmapi import PM_ID_NULL, PM_INDOM_NULL, PM_SEM_INSTANT, PM_TYPE_U32
+    from pcp import pmi
+except ImportError:
+    pytest.skip("PCP is not available", allow_module_level=True)
+
+from cockpit.bridge import Bridge
+
+from .mocktransport import MockTransport
+
+
+# HACK: copied from test_bridge.py
+@pytest.fixture
+def bridge() -> Bridge:
+    bridge = Bridge(argparse.Namespace(privileged=False, beipack=False))
+    bridge.superuser_bridges = list(bridge.superuser_rule.bridges)  # type: ignore[attr-defined]
+    return bridge
+
+
+# HACK: copied from test_bridge.py
+@pytest.fixture
+def no_init_transport(event_loop: asyncio.AbstractEventLoop, bridge: Bridge) -> Iterable[MockTransport]:
+    transport = MockTransport(bridge)
+    try:
+        yield transport
+    finally:
+        transport.stop(event_loop)
+
+
+# HACK: copied from test_bridge.py
+@pytest.fixture
+def transport(no_init_transport: MockTransport) -> MockTransport:
+    no_init_transport.init()
+    return no_init_transport
+
+
+@pytest.fixture
+def broken_archive(tmpdir_factory):
+    pcp_dir = tmpdir_factory.mktemp('mock-archives')
+
+    with open(pcp_dir / '0.index', 'w') as f:
+        f.write("not a pcp index file")
+    with open(pcp_dir / '0.meta', 'w') as f:
+        f.write("not a pcp meta file")
+    with open(pcp_dir / '0.0', 'w') as f:
+        f.write("not a pcp sample file")
+
+    return pcp_dir
+
+
+@pytest.fixture
+def big_archive(tmpdir_factory):
+    pcp_dir = tmpdir_factory.mktemp('big-archive')
+    archive_1 = pmi.pmiLogImport(f"{pcp_dir}/0")
+    archive_1.pmiAddMetric("mock.value", PM_ID_NULL, PM_TYPE_U32, PM_INDOM_NULL,
+                           PM_SEM_INSTANT, archive_1.pmiUnits(0, 0, 0, 0, 0, 0))
+    for i in range(1000):
+        archive_1.pmiPutValue("mock.value", None, str(i))
+        archive_1.pmiWrite(i, 0)
+
+    archive_1.pmiEnd()
+
+    return pcp_dir
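+
+
+# The pmi.pmiLogImport fixtures above and below write real PCP archives:
+# pmiEnd() flushes the volume/metadata/index triple (0.0, 0.meta, 0.index)
+# that the metrics1 channel opens; broken_archive fakes exactly those three
+# files to exercise the error path.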
+
+
+@pytest.fixture
+def archive(tmpdir_factory):
+    pcp_dir = tmpdir_factory.mktemp('mock-archives')
+    archive_1 = pmi.pmiLogImport(f"{pcp_dir}/0")
+    archive_1.pmiAddMetric("mock.value", PM_ID_NULL, PM_TYPE_U32, PM_INDOM_NULL,
+                           PM_SEM_INSTANT, archive_1.pmiUnits(0, 0, 0, 0, 0, 0))
+    archive_1.pmiPutValue("mock.value", None, "10")
+    archive_1.pmiWrite(0, 0)
+    archive_1.pmiPutValue("mock.value", None, "11")
+    archive_1.pmiWrite(1, 0)
+    archive_1.pmiPutValue("mock.value", None, "12")
+    archive_1.pmiWrite(2, 0)
+    archive_1.pmiEnd()
+
+    archive_2 = pmi.pmiLogImport(f"{pcp_dir}/1")
+    archive_2.pmiAddMetric("mock.value", PM_ID_NULL, PM_TYPE_U32,
+                           PM_INDOM_NULL, PM_SEM_INSTANT,
+                           archive_2.pmiUnits(0, 0, 0, 0, 0, 0))
+    archive_2.pmiAddMetric("mock.late", PM_ID_NULL, PM_TYPE_U32, PM_INDOM_NULL,
+                           PM_SEM_INSTANT, archive_2.pmiUnits(0, 0, 0, 0, 0, 0))
+    archive_2.pmiPutValue("mock.value", None, "13")
+    archive_2.pmiPutValue("mock.late", None, "30")
+    archive_2.pmiWrite(3, 0)
+    archive_2.pmiPutValue("mock.value", None, "14")
+    archive_2.pmiPutValue("mock.late", None, "31")
+    archive_2.pmiWrite(4, 0)
+    archive_2.pmiPutValue("mock.value", None, "15")
+    archive_2.pmiPutValue("mock.late", None, "32")
+    archive_2.pmiWrite(5, 0)
+    archive_2.pmiEnd()
+
+    return pcp_dir
+
+
+@pytest.fixture
+def timestamps_archive(tmpdir_factory):
+    pcp_dir = tmpdir_factory.mktemp('timestamps-archives')
+    archive_1 = pmi.pmiLogImport(f"{pcp_dir}/0")
+
+    archive_1.pmiAddMetric("mock.value", PM_ID_NULL, PM_TYPE_U32, PM_INDOM_NULL,
+                           PM_SEM_INSTANT, archive_1.pmiUnits(0, 0, 0, 0, 0, 0))
+
+    timestamp = int(datetime.datetime.fromisoformat('2023-01-01').timestamp())
+    archive_1.pmiPutValue("mock.value", None, "10")
+    archive_1.pmiWrite(timestamp, 0)
+
+    timestamp = int(datetime.datetime.fromisoformat('2023-06-01').timestamp())
+    archive_1.pmiPutValue("mock.value", None, "11")
+    archive_1.pmiWrite(timestamp, 0)
+
+    timestamp = int(datetime.datetime.fromisoformat('2023-12-01').timestamp())
+    archive_1.pmiPutValue("mock.value", None, "12")
+    archive_1.pmiWrite(timestamp, 0)
+
+    archive_1.pmiEnd()
+
+    return pcp_dir
+
+
+@pytest.fixture
+def instances_archive(tmpdir_factory):
+    pcp_dir = tmpdir_factory.mktemp('instances-archives')
+    archive_1 = pmi.pmiLogImport(f"{pcp_dir}/0")
+
+    domain = 60  # Linux kernel
+    pmid = archive_1.pmiID(domain, 2, 0)
+    indom = archive_1.pmiInDom(domain, 2)
+    units = archive_1.pmiUnits(0, 0, 0, 0, 0, 0)
+
+    archive_1.pmiAddMetric("kernel.all.load", pmid, PM_TYPE_U32, indom,
+                           PM_SEM_INSTANT, units)
+    archive_1.pmiAddInstance(indom, "1 minute", 1)
+    archive_1.pmiAddInstance(indom, "5 minute", 5)
+    archive_1.pmiAddInstance(indom, "15 minute", 15)
+
+    # create a record
+    archive_1.pmiPutValue("kernel.all.load", "1 minute", "1")
+    archive_1.pmiPutValue("kernel.all.load", "5 minute", "5")
+    archive_1.pmiPutValue("kernel.all.load", "15 minute", "15")
+    archive_1.pmiWrite(0, 0)
+
+    archive_1.pmiEnd()
+
+    return pcp_dir
+
+
+def assert_metrics_meta(meta, source, timestamp=0, interval=1000):
+    assert meta['timestamp'] == timestamp
+    assert meta['interval'] == interval
+    assert meta['source'] == source
+
+
+@pytest.mark.asyncio
+async def test_pcp_open_error(transport, archive):
+    await transport.check_open('metrics1', source=str(archive), interval=-10, problem='protocol-error',
+                               reply_keys={'message': 'invalid "interval" value: -10'})
+    await transport.check_open('metrics1', problem='protocol-error',
+                               reply_keys={'message': 'no "source" option specified for metrics channel'})
+    await transport.check_open('metrics1', source="bazinga", problem='not-supported',
+                               reply_keys={'message': 'unsupported "source" option specified for metrics: bazinga'})
+    await transport.check_open('metrics1', source="/non-existent", problem='not-found')
+    await transport.check_open('metrics1', source=str(archive),
+                               metrics=[{"name": "mock.blah", "derive": "rate"}],
+                               problem='not-found',
+                               reply_keys={'message': 'no such metric: mock.blah'})
"metrics":[{"name":"mock.value","units":"","semantics":"instant"}]} + + assert_metrics_meta(meta, str(archive)) + + metrics = meta['metrics'] + assert len(metrics) == 1 + + metric = metrics[0] + assert metric['name'] == 'mock.value' + assert 'derive' not in metric + assert metric['semantic'] == 'instant' + + # assert_sample (tc, "[[10],[11],[12]]"); + _, data = await transport.next_frame() + data = json.loads(data) + assert data == [[10], [11], [12]] + + # C bridge sends a META message per archive + + # assert_sample (tc, "[[13],[14],[15]]"); + _, data = await transport.next_frame() + data = json.loads(data) + assert data == [[13], [14], [15]] + + transport.check_close(channel=ch) + + +@pytest.mark.asyncio +async def test_pcp_big_archive(transport, big_archive): + _ = await transport.check_open('metrics1', source=str(big_archive), + metrics=[{"name": "mock.value"}]) + + _, data = await transport.next_frame() + # first message is always the meta message + meta = json.loads(data) + + assert_metrics_meta(meta, str(big_archive)) + metrics = meta['metrics'] + assert len(metrics) == 1 + + metric = metrics[0] + assert metric['name'] == 'mock.value' + assert 'derive' not in metric + assert metric['semantic'] == 'instant' + + _, data = await transport.next_frame() + data = json.loads(data) + # archives batch size is hardcoded to 60 + # TODO import batch size? + assert data == [[i] for i in range(60)] + + +@pytest.mark.asyncio +async def xtest_pcp_instances(transport, instances_archive): + # {"timestamp":0,"now":1708527691229,"interval":1000,"metrics":[{"name":"kernel.all.load", + # "instances":["15 minute","1 minute","5 minute"],"units":"","semantics":"instant"}]} + # [[[15,1,5]]]36 + + # ch1 + # {"timestamp":0,"now":1713451658090,"interval":1000, + # "metrics":[{"name":"kernel.all.load","instances":["15 minute","1 minute","5 minute"], + # "units":"","semantics":"instant"}]}16 + # ch1 + # [[[15,1,5]]]36 + _ = await transport.check_open('metrics1', source=str(instances_archive), + metrics=[{"name": "kernel.all.load"}]) + + _, data = await transport.next_frame() + # first message is always the meta message + meta = json.loads(data) + print(meta) + + assert_metrics_meta(meta, str(instances_archive)) + + metrics = meta['metrics'] + assert len(metrics) == 1 + + metric = metrics[0] + assert metric['name'] == 'kernel.all.load' + assert 'derive' not in metric + assert metric['semantic'] == 'instant' + assert metric['instances'] == ['15 minute', '1 minute', '5 minute'] + + _, data = await transport.next_frame() + data = json.loads(data) + print("data", data) + + +@pytest.mark.asyncio +async def test_pcp_timestamp(transport, timestamps_archive): + timestamp = int(datetime.datetime.fromisoformat('2023-07-01').timestamp()) * 1000 + ch = await transport.check_open('metrics1', source=str(timestamps_archive), + metrics=[{"name": "mock.value"}], limit=1, + timestamp=timestamp) + + _, data = await transport.next_frame() + # first message is always the meta message + meta = json.loads(data) + + assert_metrics_meta(meta, str(timestamps_archive), timestamp=timestamp) + + metrics = meta['metrics'] + assert len(metrics) == 1 + + metric = metrics[0] + assert metric['name'] == 'mock.value' + + # One exact sample at start timestamp + _, data = await transport.next_frame() + data = json.loads(data) + assert data == [[11.0]] + + transport.check_close(channel=ch) + + +@pytest.mark.asyncio +async def test_pcp_negative_timestamp(transport, timestamps_archive): + """ Given a negative timestamp the current time is taken and 
+
+
+@pytest.mark.asyncio
+async def test_pcp_negative_timestamp(transport, timestamps_archive):
+    """ A negative timestamp is interpreted relative to the current time:
+    the offset is subtracted from now to compute the start timestamp """
+
+    timestamp = int(datetime.datetime.fromisoformat('2023-07-01').timestamp()) * 1000
+    relative_timestamp = int(time.time() * 1000) - timestamp
+    ch = await transport.check_open('metrics1', source=str(timestamps_archive),
+                                    metrics=[{"name": "mock.value"}], limit=1,
+                                    timestamp=-relative_timestamp)
+
+    _, data = await transport.next_frame()
+    # first message is always the meta message
+    meta = json.loads(data)
+    # time.time() is not exact
+    assert (meta['timestamp'] - timestamp) < 10
+
+    metrics = meta['metrics']
+    assert len(metrics) == 1
+
+    metric = metrics[0]
+    assert metric['name'] == 'mock.value'
+
+    # one exact sample at the start timestamp
+    _, data = await transport.next_frame()
+    data = json.loads(data)
+    assert data == [[11.0]]
+
+    transport.check_close(channel=ch)
+
+
+@pytest.mark.asyncio
+async def test_pcp_limit_archive(transport, big_archive):
+    ch = await transport.check_open('metrics1', source=str(big_archive),
+                                    limit=30,
+                                    metrics=[{"name": "mock.value"}])
+
+    _, data = await transport.next_frame()
+    # first message is always the meta message
+    meta = json.loads(data)
+    assert_metrics_meta(meta, str(big_archive))
+
+    _, data = await transport.next_frame()
+    data = json.loads(data)
+    assert data == [[i] for i in range(30)]
+
+    transport.check_close(channel=ch)
+
+
+@pytest.mark.asyncio
+async def test_pcp_broken_archive(transport, broken_archive):
+    await transport.check_open('metrics1', source=str(broken_archive),
+                               metrics=[{"name": "mock.value", "derive": "rate"}],
+                               problem='not-found',
+                               reply_keys={'message': f'could not read archive {broken_archive}/0.index'})