Commit b17530e: WIP

ocelotl committed Oct 16, 2020
1 parent 284072c, commit b17530e
Showing 4 changed files with 62 additions and 42 deletions.
@@ -73,33 +73,21 @@ def _get_data_points(

    if isinstance(sdk_metric_record.aggregator, SumAggregator):
        value = sdk_metric_record.aggregator.checkpoint

    elif isinstance(sdk_metric_record.aggregator, MinMaxSumCountAggregator):
        # FIXME: How are values to be interpreted from this aggregator?
        raise Exception("MinMaxSumCount aggregator data not supported")

    elif isinstance(sdk_metric_record.aggregator, HistogramAggregator):
        # FIXME: How are values to be interpreted from this aggregator?
        raise Exception("Histogram aggregator data not supported")

    elif isinstance(sdk_metric_record.aggregator, LastValueAggregator):
        value = sdk_metric_record.aggregator.checkpoint

    elif isinstance(sdk_metric_record.aggregator, ValueObserverAggregator):
        value = sdk_metric_record.aggregator.checkpoint.last

-    print(
-        "sdk_metric_record.aggregator.last_checkpoint_timestamp:\t{}".format(
-            sdk_metric_record.aggregator.last_checkpoint_timestamp
-        )
-    )
-    print(
-        "sdk_metric_record.aggregator.last_update_timestamp:\t{}".format(
-            sdk_metric_record.aggregator.last_update_timestamp
-        )
-    )
-    if sdk_metric_record.aggregator.last_checkpoint_timestamp > sdk_metric_record.aggregator.last_update_timestamp:
-        print("larger: sdk_metric_record.aggregator.last_checkpoint_timestamp")
-    else:
-        print("larger: sdk_metric_record.aggregator.last_update_timestamp")
-    print()

    return [
        data_point_class(
            labels=[
@@ -108,19 +96,11 @@ def _get_data_points(
                ) for label_key, label_value in sdk_metric_record.labels
            ],
            value=value,
-            # start_time_unix_nano=1,
-            # time_unix_nano=2,
-            # start_time_unix_nano=(
-            #     sdk_metric_record.aggregator.last_checkpoint_timestamp
-            # ),
-            # time_unix_nano=(
-            #     sdk_metric_record.aggregator.last_update_timestamp
-            # ),
            start_time_unix_nano=(
-                sdk_metric_record.aggregator.last_update_timestamp
+                sdk_metric_record.aggregator.initial_checkpoint_timestamp
            ),
            time_unix_nano=(
-                sdk_metric_record.aggregator.last_checkpoint_timestamp
+                sdk_metric_record.aggregator.last_update_timestamp
            ),
        )
    ]
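This swap is the heart of the WIP: start_time_unix_nano now comes from initial_checkpoint_timestamp, the moment of the first update after the previous checkpoint, and time_unix_nano from last_update_timestamp, the most recent update. The deleted debug prints showed why the old mapping failed: last_checkpoint_timestamp was stamped at collection time, so it could exceed last_update_timestamp and invert the reporting window. A minimal sketch of the intended invariant, assuming nothing beyond the two attribute names this commit introduces:

def reporting_window(aggregator):
    # Sketch, not exporter code: map the two aggregator timestamps used
    # above onto an OTLP-style (start, end) pair.
    start = aggregator.initial_checkpoint_timestamp  # first update after the last checkpoint
    end = aggregator.last_update_timestamp           # most recent update
    # Under the old mapping this assertion could fail, which is exactly
    # what the removed print statements were demonstrating.
    assert start <= end, "reporting window must not be inverted"
    return start, end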
@@ -165,6 +165,8 @@ def __init__(
        self._runtime_gc_count_labels = self._labels.copy()

        # ERROR: received cumulative expected gauge
+        # Error: timestamps produce an error
+        """
        self.meter.register_observer(
            callback=self._get_system_cpu_time,
            name="system.cpu.time",
@@ -173,6 +175,7 @@
            value_type=float,
            observer_type=SumObserver,
        )
+        """

        self.meter.register_observer(
            callback=self._get_system_cpu_utilization,
@@ -237,7 +240,6 @@ def __init__(
        #     observer_type=SumObserver,
        # )

-        # Works fine
        self.meter.register_observer(
            callback=self._get_system_disk_io,
            name="system.disk.io",
@@ -247,7 +249,6 @@
            observer_type=SumObserver,
        )

-        # Works fine
        self.meter.register_observer(
            callback=self._get_system_disk_operations,
            name="system.disk.operations",
@@ -257,7 +258,6 @@
            observer_type=SumObserver,
        )

-        # Works fine
        self.meter.register_observer(
            callback=self._get_system_disk_time,
            name="system.disk.time",
@@ -267,7 +267,6 @@
            observer_type=SumObserver,
        )

-        # Seems to work fine, may cause an error when cancelled
        self.meter.register_observer(
            callback=self._get_system_disk_merged,
            name="system.disk.merged",
@@ -295,7 +294,6 @@ def __init__(
        #     observer_type=ValueObserver,
        # )

-        # Works fine
        self.meter.register_observer(
            callback=self._get_system_network_dropped_packets,
            name="system.network.dropped_packets",
@@ -305,7 +303,6 @@
            observer_type=SumObserver,
        )

-        # Seems to work fine, may cause an error when cancelled
        self.meter.register_observer(
            callback=self._get_system_network_packets,
            name="system.network.packets",
@@ -315,7 +312,6 @@
            observer_type=SumObserver,
        )

-        # Works fine
        self.meter.register_observer(
            callback=self._get_system_network_errors,
            name="system.network.errors",
@@ -325,7 +321,6 @@
            observer_type=SumObserver,
        )

-        # Works fine
        self.meter.register_observer(
            callback=self._get_system_network_io,
            name="system.network.io",
@@ -335,7 +330,6 @@
            observer_type=SumObserver,
        )

-        # Seems to work fine, may cause an error when cancelled
        self.meter.register_observer(
            callback=self._get_system_network_connections,
            name="system.network.connections",
@@ -345,7 +339,6 @@
            observer_type=UpDownSumObserver,
        )

-        # Works fine
        self.meter.register_observer(
            callback=self._get_runtime_memory,
            name="runtime.{}.memory".format(self._python_implementation),
@@ -357,7 +350,6 @@
            observer_type=SumObserver,
        )

-        # Works fine
        self.meter.register_observer(
            callback=self._get_runtime_cpu_time,
            name="runtime.{}.cpu_time".format(self._python_implementation),
@@ -369,7 +361,6 @@
            observer_type=SumObserver,
        )

-        # Works fine
        self.meter.register_observer(
            callback=self._get_runtime_gc_count,
            name="runtime.{}.gc_count".format(self._python_implementation),
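All of these observers are registered once, when the instrumentation object is constructed. A hypothetical wiring, assuming the package's SystemMetrics entry point and the SDK's console exporter (neither appears in this diff):

from opentelemetry import metrics
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import ConsoleMetricsExporter
from opentelemetry.instrumentation.system_metrics import SystemMetrics

metrics.set_meter_provider(MeterProvider())
# Constructing the instrumentation registers every observer in __init__.
SystemMetrics(ConsoleMetricsExporter())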
@@ -33,7 +33,8 @@ class Aggregator(abc.ABC):
    def __init__(self, config=None):
        self._lock = threading.Lock()
        self.last_update_timestamp = 0
-        self.last_checkpoint_timestamp = 0
+        self.initial_checkpoint_timestamp = 0
+        self.checkpointed = True
        if config is not None:
            self.config = config
        else:
@@ -42,21 +43,25 @@ def __init__(self, config=None):
    @abc.abstractmethod
    def update(self, value):
        """Updates the current with the new value."""
+        if self.checkpointed:
+            self.initial_checkpoint_timestamp = time_ns()
+            self.checkpointed = False
        self.last_update_timestamp = time_ns()

    @abc.abstractmethod
    def take_checkpoint(self):
        """Stores a snapshot of the current value."""
-        self.last_checkpoint_timestamp = time_ns()
+        self.checkpointed = True

    @abc.abstractmethod
    def merge(self, other):
        """Combines two aggregator values."""
        self.last_update_timestamp = max(
            self.last_update_timestamp, other.last_update_timestamp
        )
-        self.last_checkpoint_timestamp = max(
-            self.last_checkpoint_timestamp, other.last_checkpoint_timestamp
+        self.initial_checkpoint_timestamp = max(
+            self.initial_checkpoint_timestamp,
+            other.initial_checkpoint_timestamp
        )

    def _verify_type(self, other):
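The net effect of these three changes: the first update after a checkpoint opens a new reporting window, take_checkpoint closes it, and merge keeps the later of two window starts. A self-contained toy (not the SDK's aggregator) that reproduces the lifecycle:

from time import time_ns

class ToyCounterAggregator:
    # Toy counter reproducing the timestamp lifecycle in the diff above.
    def __init__(self):
        self.current = 0
        self.checkpoint = 0
        self.last_update_timestamp = 0
        self.initial_checkpoint_timestamp = 0
        self.checkpointed = True

    def update(self, value):
        if self.checkpointed:
            # First update since the last checkpoint: open a new window.
            self.initial_checkpoint_timestamp = time_ns()
            self.checkpointed = False
        self.current += value
        self.last_update_timestamp = time_ns()

    def take_checkpoint(self):
        # Snapshot the value and arm the flag for the next window.
        self.checkpoint, self.current = self.current, 0
        self.checkpointed = True

agg = ToyCounterAggregator()
agg.update(3)
agg.update(4)
agg.take_checkpoint()
# The export window for this checkpoint is
# [agg.initial_checkpoint_timestamp, agg.last_update_timestamp].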
@@ -13,6 +13,7 @@
# limitations under the License.

import threading
+from time import sleep

from opentelemetry.context import attach, detach, set_value
from opentelemetry.metrics import Meter
@@ -61,3 +62,46 @@ def tick(self):
        detach(token)
        # Perform post-exporting logic
        self.meter.processor.finished_collection()


+class DebugController:
+    """A debug controller, used to replace Push controller when debugging
+
+    Push controller uses a thread which makes it hard to use the IPython
+    debugger. This controller does not use a thread, but relies on the user
+    manually calling its ``run`` method to start the controller.
+
+    Args:
+        meter: The meter used to collect metrics.
+        exporter: The exporter used to export metrics.
+        interval: The collect/export interval in seconds.
+    """
+
+    daemon = True
+
+    def __init__(
+        self, meter: Meter, exporter: MetricsExporter, interval: float
+    ):
+        super().__init__()
+        self.meter = meter
+        self.exporter = exporter
+        self.interval = interval
+
+    def run(self):
+        while True:
+            self.tick()
+            sleep(self.interval)
+
+    def shutdown(self):
+        # Run one more collection pass to flush metrics batched in the meter
+        self.tick()
+
+    def tick(self):
+        # Collect all of the meter's metrics to be exported
+        self.meter.collect()
+        # Export the collected metrics
+        token = attach(set_value("suppress_instrumentation", True))
+        self.exporter.export(self.meter.processor.checkpoint_set())
+        detach(token)
+        # Perform post-exporting logic
+        self.meter.processor.finished_collection()
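Unlike PushController, which ticks on a daemon thread, DebugController.run blocks the calling thread, so debugger breakpoints land in user code. A hypothetical session (the console exporter and five-second interval are illustrative, not part of this commit):

from opentelemetry import metrics
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import ConsoleMetricsExporter

metrics.set_meter_provider(MeterProvider())
meter = metrics.get_meter(__name__)

controller = DebugController(meter, ConsoleMetricsExporter(), 5.0)
# controller.run() would block, ticking every five seconds; from a
# debugger it is often easier to drive collection by hand:
controller.tick()
controller.shutdown()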
