Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add last_updated_timestamp for observers #522

Merged
merged 16 commits into from
Apr 2, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,7 @@ def get_collector_metric_type(metric: Metric) -> metrics_pb2.MetricDescriptor:
def get_collector_point(metric_record: MetricRecord) -> metrics_pb2.Point:
point = metrics_pb2.Point(
timestamp=utils.proto_timestamp_from_time_ns(
metric_record.metric.bind(
metric_record.label_set
).last_update_timestamp
metric_record.aggregator.last_update_timestamp
lzchen marked this conversation as resolved.
Show resolved Hide resolved
)
)
if metric_record.metric.value_type == int:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,13 +175,11 @@ def test_translate_to_collector(self):
self.assertEqual(len(output_metrics[0].timeseries[0].points), 1)
self.assertEqual(
output_metrics[0].timeseries[0].points[0].timestamp.seconds,
record.metric.bind(record.label_set).last_update_timestamp
// 1000000000,
record.aggregator.last_update_timestamp // 1000000000,
)
self.assertEqual(
output_metrics[0].timeseries[0].points[0].timestamp.nanos,
record.metric.bind(record.label_set).last_update_timestamp
% 1000000000,
record.aggregator.last_update_timestamp % 1000000000,
)
self.assertEqual(
output_metrics[0].timeseries[0].points[0].int64_value, 123
Expand Down
14 changes: 3 additions & 11 deletions opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
from opentelemetry.sdk.metrics.export.batcher import UngroupedBatcher
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.util.instrumentation import InstrumentationInfo
from opentelemetry.util import time_ns

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -71,7 +70,6 @@ def __init__(
self.value_type = value_type
self.enabled = enabled
self.aggregator = aggregator
self.last_update_timestamp = time_ns()
self._ref_count = 0
self._ref_count_lock = threading.Lock()

Expand All @@ -86,7 +84,6 @@ def _validate_update(self, value: metrics_api.ValueT) -> bool:
return True

def update(self, value: metrics_api.ValueT):
self.last_update_timestamp = time_ns()
self.aggregator.update(value)

def release(self):
Expand All @@ -105,10 +102,8 @@ def ref_count(self):
return self._ref_count

def __repr__(self):
return '{}(data="{}", last_update_timestamp={})'.format(
type(self).__name__,
self.aggregator.current,
self.last_update_timestamp,
return '{}(data="{}")'.format(
type(self).__name__, self.aggregator.current
)


Expand Down Expand Up @@ -346,7 +341,6 @@ def _collect_observers(self) -> None:
if not observer.enabled:
continue

# TODO: capture timestamp?
if not observer.run():
continue

Expand Down Expand Up @@ -429,9 +423,7 @@ def get_label_set(self, labels: Dict[str, str]):


class MeterProvider(metrics_api.MeterProvider):
def __init__(
self, resource: Resource = Resource.create_empty(),
):
def __init__(self, resource: Resource = Resource.create_empty()):
lzchen marked this conversation as resolved.
Show resolved Hide resolved
self.resource = resource

def get_meter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import threading
from collections import namedtuple

from opentelemetry.util import time_ns


class Aggregator(abc.ABC):
"""Base class for aggregators.
Expand Down Expand Up @@ -49,10 +51,12 @@ def __init__(self):
self.current = 0
self.checkpoint = 0
self._lock = threading.Lock()
self.last_update_timestamp = None

def update(self, value):
with self._lock:
self.current += value
self.last_update_timestamp = time_ns()

def take_checkpoint(self):
with self._lock:
Expand All @@ -62,6 +66,11 @@ def take_checkpoint(self):
def merge(self, other):
with self._lock:
self.checkpoint += other.checkpoint
if self.last_update_timestamp is None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like when this code shows up three times, it might be good to refactor it into a common utility method. All the timestamp comparisons seem to be doing the same logic.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with taking out repetitive code. Would passing an aggregator into a utility method look weird? The observer aggregator also modifies last, so it is not exactly the same; type checking would be needed. Is that logic worth it to simply reduce some duplicate code?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was considered passing in two Optional[DateTime] objects, and doing the none check there
you could pass the object in itself, but I agree that feels weird

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made a change that factors out the checking logic and assigns the values as well.

self.last_update_timestamp = other.last_update_timestamp
elif other.last_update_timestamp is not None:
if self.last_update_timestamp < other.last_update_timestamp:
self.last_update_timestamp = other.last_update_timestamp


class MinMaxSumCountAggregator(Aggregator):
Expand All @@ -88,6 +97,7 @@ def __init__(self):
self.current = self._EMPTY
self.checkpoint = self._EMPTY
self._lock = threading.Lock()
self.last_update_timestamp = None

def update(self, value):
with self._lock:
Expand All @@ -100,6 +110,7 @@ def update(self, value):
self.current.sum + value,
self.current.count + 1,
)
self.last_update_timestamp = time_ns()

def take_checkpoint(self):
with self._lock:
Expand All @@ -111,6 +122,11 @@ def merge(self, other):
self.checkpoint = self._merge_checkpoint(
self.checkpoint, other.checkpoint
)
if self.last_update_timestamp is None:
lzchen marked this conversation as resolved.
Show resolved Hide resolved
self.last_update_timestamp = other.last_update_timestamp
elif other.last_update_timestamp is not None:
if self.last_update_timestamp < other.last_update_timestamp:
self.last_update_timestamp = other.last_update_timestamp


class ObserverAggregator(Aggregator):
Expand All @@ -123,20 +139,25 @@ def __init__(self):
self.mmsc = MinMaxSumCountAggregator()
self.current = None
self.checkpoint = self._TYPE(None, None, None, 0, None)
self.last_update_timestamp = None

def update(self, value):
self.mmsc.update(value)
self.current = value
self.last_update_timestamp = time_ns()

def take_checkpoint(self):
self.mmsc.take_checkpoint()
self.checkpoint = self._TYPE(*(self.mmsc.checkpoint + (self.current,)))

def merge(self, other):
self.mmsc.merge(other.mmsc)
self.checkpoint = self._TYPE(
*(
self.mmsc.checkpoint
+ (other.checkpoint.last or self.checkpoint.last,)
)
)
last = self.checkpoint.last
if self.last_update_timestamp is None:
lzchen marked this conversation as resolved.
Show resolved Hide resolved
self.last_update_timestamp = other.last_update_timestamp
last = other.checkpoint.last
elif other.last_update_timestamp is not None:
if self.last_update_timestamp < other.last_update_timestamp:
self.last_update_timestamp = other.last_update_timestamp
last = other.checkpoint.last
self.checkpoint = self._TYPE(*(self.mmsc.checkpoint + (last,)))
94 changes: 91 additions & 3 deletions opentelemetry-sdk/tests/metrics/export/test_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,11 +234,14 @@ def call_update(counter):
update_total += val
return update_total

def test_update(self):
@mock.patch("opentelemetry.sdk.metrics.export.aggregate.time_ns")
def test_update(self, time_mock):
time_mock.return_value = 123
counter = CounterAggregator()
counter.update(1.0)
counter.update(2.0)
self.assertEqual(counter.current, 3.0)
self.assertEqual(counter.last_update_timestamp, 123)

def test_checkpoint(self):
counter = CounterAggregator()
Expand All @@ -252,8 +255,10 @@ def test_merge(self):
counter2 = CounterAggregator()
counter.checkpoint = 1.0
counter2.checkpoint = 3.0
counter2.last_update_timestamp = 123
counter.merge(counter2)
self.assertEqual(counter.checkpoint, 4.0)
self.assertEqual(counter.last_update_timestamp, 123)

def test_concurrent_update(self):
counter = CounterAggregator()
Expand Down Expand Up @@ -302,7 +307,9 @@ def call_update(mmsc):
count_ += 1
return MinMaxSumCountAggregator._TYPE(min_, max_, sum_, count_)

def test_update(self):
@mock.patch("opentelemetry.sdk.metrics.export.aggregate.time_ns")
def test_update(self, time_mock):
time_mock.return_value = 123
mmsc = MinMaxSumCountAggregator()
# test current values without any update
self.assertEqual(mmsc.current, MinMaxSumCountAggregator._EMPTY)
Expand All @@ -315,6 +322,7 @@ def test_update(self):
self.assertEqual(
mmsc.current, (min(values), max(values), sum(values), len(values))
)
self.assertEqual(mmsc.last_update_timestamp, 123)

def test_checkpoint(self):
mmsc = MinMaxSumCountAggregator()
Expand Down Expand Up @@ -346,6 +354,9 @@ def test_merge(self):
mmsc1.checkpoint = checkpoint1
mmsc2.checkpoint = checkpoint2

mmsc1.last_update_timestamp = 100
mmsc2.last_update_timestamp = 123

mmsc1.merge(mmsc2)

self.assertEqual(
Expand All @@ -354,6 +365,7 @@ def test_merge(self):
checkpoint1, checkpoint2
),
)
self.assertEqual(mmsc1.last_update_timestamp, 123)

def test_merge_checkpoint(self):
func = MinMaxSumCountAggregator._merge_checkpoint
Expand Down Expand Up @@ -427,7 +439,9 @@ def test_concurrent_update_and_checkpoint(self):


class TestObserverAggregator(unittest.TestCase):
def test_update(self):
@mock.patch("opentelemetry.sdk.metrics.export.aggregate.time_ns")
def test_update(self, time_mock):
time_mock.return_value = 123
observer = ObserverAggregator()
# test current values without any update
self.assertEqual(observer.mmsc.current, (None, None, None, 0))
Expand All @@ -442,6 +456,7 @@ def test_update(self):
observer.mmsc.current,
(min(values), max(values), sum(values), len(values)),
)
self.assertEqual(observer.last_update_timestamp, 123)

self.assertEqual(observer.current, values[-1])

Expand Down Expand Up @@ -477,6 +492,77 @@ def test_merge(self):
observer1.mmsc.checkpoint = mmsc_checkpoint1
observer2.mmsc.checkpoint = mmsc_checkpoint2

observer1.last_update_timestamp = 100
observer2.last_update_timestamp = 123

observer1.checkpoint = checkpoint1
observer2.checkpoint = checkpoint2

observer1.merge(observer2)

self.assertEqual(
observer1.checkpoint,
(
min(checkpoint1.min, checkpoint2.min),
max(checkpoint1.max, checkpoint2.max),
checkpoint1.sum + checkpoint2.sum,
checkpoint1.count + checkpoint2.count,
checkpoint2.last,
),
)
self.assertEqual(observer1.last_update_timestamp, 123)

def test_merge_last_updated(self):
observer1 = ObserverAggregator()
observer2 = ObserverAggregator()

mmsc_checkpoint1 = MinMaxSumCountAggregator._TYPE(3, 150, 101, 3)
mmsc_checkpoint2 = MinMaxSumCountAggregator._TYPE(1, 33, 44, 2)

checkpoint1 = ObserverAggregator._TYPE(*(mmsc_checkpoint1 + (23,)))

checkpoint2 = ObserverAggregator._TYPE(*(mmsc_checkpoint2 + (27,)))

observer1.mmsc.checkpoint = mmsc_checkpoint1
observer2.mmsc.checkpoint = mmsc_checkpoint2

observer1.last_update_timestamp = 123
observer2.last_update_timestamp = 100

observer1.checkpoint = checkpoint1
observer2.checkpoint = checkpoint2

observer1.merge(observer2)

self.assertEqual(
observer1.checkpoint,
(
min(checkpoint1.min, checkpoint2.min),
max(checkpoint1.max, checkpoint2.max),
checkpoint1.sum + checkpoint2.sum,
checkpoint1.count + checkpoint2.count,
checkpoint1.last,
),
)
self.assertEqual(observer1.last_update_timestamp, 123)

def test_merge_last_updated_none(self):
observer1 = ObserverAggregator()
observer2 = ObserverAggregator()

mmsc_checkpoint1 = MinMaxSumCountAggregator._TYPE(3, 150, 101, 3)
mmsc_checkpoint2 = MinMaxSumCountAggregator._TYPE(1, 33, 44, 2)

checkpoint1 = ObserverAggregator._TYPE(*(mmsc_checkpoint1 + (23,)))

checkpoint2 = ObserverAggregator._TYPE(*(mmsc_checkpoint2 + (27,)))

observer1.mmsc.checkpoint = mmsc_checkpoint1
observer2.mmsc.checkpoint = mmsc_checkpoint2

observer1.last_update_timestamp = None
observer2.last_update_timestamp = 100

observer1.checkpoint = checkpoint1
observer2.checkpoint = checkpoint2

Expand All @@ -492,6 +578,7 @@ def test_merge(self):
checkpoint2.last,
),
)
self.assertEqual(observer1.last_update_timestamp, 100)

def test_merge_with_empty(self):
observer1 = ObserverAggregator()
Expand All @@ -502,6 +589,7 @@ def test_merge_with_empty(self):

observer1.mmsc.checkpoint = mmsc_checkpoint1
observer1.checkpoint = checkpoint1
observer1.last_update_timestamp = 100

observer1.merge(observer2)

Expand Down
10 changes: 2 additions & 8 deletions opentelemetry-sdk/tests/metrics/test_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -393,13 +393,10 @@ def test_add_incorrect_type(self, logger_mock):
self.assertEqual(bound_counter.aggregator.current, 0)
self.assertTrue(logger_mock.warning.called)

@mock.patch("opentelemetry.sdk.metrics.time_ns")
def test_update(self, time_mock):
def test_update(self):
aggregator = export.aggregate.CounterAggregator()
bound_counter = metrics.BoundCounter(int, True, aggregator)
time_mock.return_value = 123
bound_counter.update(4.0)
self.assertEqual(bound_counter.last_update_timestamp, 123)
self.assertEqual(bound_counter.aggregator.current, 4.0)


Expand Down Expand Up @@ -428,11 +425,8 @@ def test_record_incorrect_type(self, logger_mock):
)
self.assertTrue(logger_mock.warning.called)

@mock.patch("opentelemetry.sdk.metrics.time_ns")
def test_update(self, time_mock):
def test_update(self):
aggregator = export.aggregate.MinMaxSumCountAggregator()
bound_measure = metrics.BoundMeasure(int, True, aggregator)
time_mock.return_value = 123
bound_measure.update(4.0)
self.assertEqual(bound_measure.last_update_timestamp, 123)
self.assertEqual(bound_measure.aggregator.current, (4.0, 4.0, 4.0, 1))