Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add last_updated_timestamp for observers #522

Merged
merged 16 commits into from
Apr 2, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,7 @@ def get_collector_metric_type(metric: Metric) -> metrics_pb2.MetricDescriptor:
def get_collector_point(metric_record: MetricRecord) -> metrics_pb2.Point:
point = metrics_pb2.Point(
timestamp=utils.proto_timestamp_from_time_ns(
metric_record.metric.bind(
metric_record.label_set
).last_update_timestamp
metric_record.aggregator.last_update_timestamp
lzchen marked this conversation as resolved.
Show resolved Hide resolved
)
)
if metric_record.metric.value_type == int:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,13 +175,11 @@ def test_translate_to_collector(self):
self.assertEqual(len(output_metrics[0].timeseries[0].points), 1)
self.assertEqual(
output_metrics[0].timeseries[0].points[0].timestamp.seconds,
record.metric.bind(record.label_set).last_update_timestamp
// 1000000000,
record.aggregator.last_update_timestamp // 1000000000,
)
self.assertEqual(
output_metrics[0].timeseries[0].points[0].timestamp.nanos,
record.metric.bind(record.label_set).last_update_timestamp
% 1000000000,
record.aggregator.last_update_timestamp % 1000000000,
)
self.assertEqual(
output_metrics[0].timeseries[0].points[0].int64_value, 123
Expand Down
14 changes: 3 additions & 11 deletions opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
from opentelemetry.sdk.metrics.export.batcher import UngroupedBatcher
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.util.instrumentation import InstrumentationInfo
from opentelemetry.util import time_ns

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -71,7 +70,6 @@ def __init__(
self.value_type = value_type
self.enabled = enabled
self.aggregator = aggregator
self.last_update_timestamp = time_ns()
self._ref_count = 0
self._ref_count_lock = threading.Lock()

Expand All @@ -86,7 +84,6 @@ def _validate_update(self, value: metrics_api.ValueT) -> bool:
return True

def update(self, value: metrics_api.ValueT):
self.last_update_timestamp = time_ns()
self.aggregator.update(value)

def release(self):
Expand All @@ -105,10 +102,8 @@ def ref_count(self):
return self._ref_count

def __repr__(self):
return '{}(data="{}", last_update_timestamp={})'.format(
type(self).__name__,
self.aggregator.current,
self.last_update_timestamp,
return '{}(data="{}")'.format(
type(self).__name__, self.aggregator.current
)


Expand Down Expand Up @@ -346,7 +341,6 @@ def _collect_observers(self) -> None:
if not observer.enabled:
continue

# TODO: capture timestamp?
if not observer.run():
continue

Expand Down Expand Up @@ -429,9 +423,7 @@ def get_label_set(self, labels: Dict[str, str]):


class MeterProvider(metrics_api.MeterProvider):
def __init__(
self, resource: Resource = Resource.create_empty(),
):
def __init__(self, resource: Resource = Resource.create_empty()):
lzchen marked this conversation as resolved.
Show resolved Hide resolved
self.resource = resource

def get_meter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import threading
from collections import namedtuple

from opentelemetry.util import time_ns


class Aggregator(abc.ABC):
"""Base class for aggregators.
Expand Down Expand Up @@ -49,10 +51,12 @@ def __init__(self):
self.current = 0
self.checkpoint = 0
self._lock = threading.Lock()
self.last_update_timestamp = None

def update(self, value):
with self._lock:
self.current += value
self.last_update_timestamp = time_ns()

def take_checkpoint(self):
with self._lock:
Expand All @@ -62,6 +66,9 @@ def take_checkpoint(self):
def merge(self, other):
with self._lock:
self.checkpoint += other.checkpoint
self.last_update_timestamp = (
other.last_update_timestamp or self.last_update_timestamp
lzchen marked this conversation as resolved.
Show resolved Hide resolved
)


class MinMaxSumCountAggregator(Aggregator):
Expand All @@ -88,6 +95,7 @@ def __init__(self):
self.current = self._EMPTY
self.checkpoint = self._EMPTY
self._lock = threading.Lock()
self.last_update_timestamp = None

def update(self, value):
with self._lock:
Expand All @@ -100,6 +108,7 @@ def update(self, value):
self.current.sum + value,
self.current.count + 1,
)
self.last_update_timestamp = time_ns()

def take_checkpoint(self):
with self._lock:
Expand All @@ -111,6 +120,9 @@ def merge(self, other):
self.checkpoint = self._merge_checkpoint(
self.checkpoint, other.checkpoint
)
self.last_update_timestamp = (
other.last_update_timestamp or self.last_update_timestamp
)


class ObserverAggregator(Aggregator):
Expand All @@ -123,10 +135,12 @@ def __init__(self):
self.mmsc = MinMaxSumCountAggregator()
self.current = None
self.checkpoint = self._TYPE(None, None, None, 0, None)
self.last_update_timestamp = None

def update(self, value):
self.mmsc.update(value)
self.current = value
self.last_update_timestamp = time_ns()

def take_checkpoint(self):
self.mmsc.take_checkpoint()
Expand All @@ -137,6 +151,9 @@ def merge(self, other):
self.checkpoint = self._TYPE(
*(
self.mmsc.checkpoint
+ (other.checkpoint.last or self.checkpoint.last,)
+ (other.checkpoint.last or self.checkpoint.last or 0,)
mauriciovasquezbernal marked this conversation as resolved.
Show resolved Hide resolved
)
)
self.last_update_timestamp = (
other.last_update_timestamp or self.last_update_timestamp
)
23 changes: 20 additions & 3 deletions opentelemetry-sdk/tests/metrics/export/test_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,11 +234,14 @@ def call_update(counter):
update_total += val
return update_total

def test_update(self):
@mock.patch("opentelemetry.sdk.metrics.export.aggregate.time_ns")
def test_update(self, time_mock):
time_mock.return_value = 123
counter = CounterAggregator()
counter.update(1.0)
counter.update(2.0)
self.assertEqual(counter.current, 3.0)
self.assertEqual(counter.last_update_timestamp, 123)

def test_checkpoint(self):
counter = CounterAggregator()
Expand All @@ -252,8 +255,10 @@ def test_merge(self):
counter2 = CounterAggregator()
counter.checkpoint = 1.0
counter2.checkpoint = 3.0
counter2.last_update_timestamp = 123
counter.merge(counter2)
self.assertEqual(counter.checkpoint, 4.0)
self.assertEqual(counter.last_update_timestamp, 123)

def test_concurrent_update(self):
counter = CounterAggregator()
Expand Down Expand Up @@ -302,7 +307,9 @@ def call_update(mmsc):
count_ += 1
return MinMaxSumCountAggregator._TYPE(min_, max_, sum_, count_)

def test_update(self):
@mock.patch("opentelemetry.sdk.metrics.export.aggregate.time_ns")
def test_update(self, time_mock):
time_mock.return_value = 123
mmsc = MinMaxSumCountAggregator()
# test current values without any update
self.assertEqual(mmsc.current, MinMaxSumCountAggregator._EMPTY)
Expand All @@ -315,6 +322,7 @@ def test_update(self):
self.assertEqual(
mmsc.current, (min(values), max(values), sum(values), len(values))
)
self.assertEqual(mmsc.last_update_timestamp, 123)

def test_checkpoint(self):
mmsc = MinMaxSumCountAggregator()
Expand Down Expand Up @@ -346,6 +354,8 @@ def test_merge(self):
mmsc1.checkpoint = checkpoint1
mmsc2.checkpoint = checkpoint2

mmsc2.last_update_timestamp = 123

mmsc1.merge(mmsc2)

self.assertEqual(
Expand All @@ -354,6 +364,7 @@ def test_merge(self):
checkpoint1, checkpoint2
),
)
self.assertEqual(mmsc1.last_update_timestamp, 123)

def test_merge_checkpoint(self):
func = MinMaxSumCountAggregator._merge_checkpoint
Expand Down Expand Up @@ -427,7 +438,9 @@ def test_concurrent_update_and_checkpoint(self):


class TestObserverAggregator(unittest.TestCase):
def test_update(self):
@mock.patch("opentelemetry.sdk.metrics.export.aggregate.time_ns")
def test_update(self, time_mock):
time_mock.return_value = 123
observer = ObserverAggregator()
# test current values without any update
self.assertEqual(observer.mmsc.current, (None, None, None, 0))
Expand All @@ -442,6 +455,7 @@ def test_update(self):
observer.mmsc.current,
(min(values), max(values), sum(values), len(values)),
)
self.assertEqual(observer.last_update_timestamp, 123)

self.assertEqual(observer.current, values[-1])

Expand Down Expand Up @@ -477,6 +491,8 @@ def test_merge(self):
observer1.mmsc.checkpoint = mmsc_checkpoint1
observer2.mmsc.checkpoint = mmsc_checkpoint2

observer2.last_update_timestamp = 123

observer1.checkpoint = checkpoint1
observer2.checkpoint = checkpoint2

Expand All @@ -492,6 +508,7 @@ def test_merge(self):
checkpoint2.last,
),
)
self.assertEqual(observer1.last_update_timestamp, 123)

def test_merge_with_empty(self):
observer1 = ObserverAggregator()
Expand Down
10 changes: 2 additions & 8 deletions opentelemetry-sdk/tests/metrics/test_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -393,13 +393,10 @@ def test_add_incorrect_type(self, logger_mock):
self.assertEqual(bound_counter.aggregator.current, 0)
self.assertTrue(logger_mock.warning.called)

@mock.patch("opentelemetry.sdk.metrics.time_ns")
def test_update(self, time_mock):
def test_update(self):
aggregator = export.aggregate.CounterAggregator()
bound_counter = metrics.BoundCounter(int, True, aggregator)
time_mock.return_value = 123
bound_counter.update(4.0)
self.assertEqual(bound_counter.last_update_timestamp, 123)
self.assertEqual(bound_counter.aggregator.current, 4.0)


Expand Down Expand Up @@ -428,11 +425,8 @@ def test_record_incorrect_type(self, logger_mock):
)
self.assertTrue(logger_mock.warning.called)

@mock.patch("opentelemetry.sdk.metrics.time_ns")
def test_update(self, time_mock):
def test_update(self):
aggregator = export.aggregate.MinMaxSumCountAggregator()
bound_measure = metrics.BoundMeasure(int, True, aggregator)
time_mock.return_value = 123
bound_measure.update(4.0)
self.assertEqual(bound_measure.last_update_timestamp, 123)
self.assertEqual(bound_measure.aggregator.current, (4.0, 4.0, 4.0, 1))