Add usage tracking metrics for Kafka clients. (#658)
* Add usage tracking metrics for Kafka clients.

* Fix double import lint error

* [Mega-Linter] Apply linters fixes

* Create version util file and add metrics to consumer.

* Address linting errors.

* Add missing semi-colon.

* [Mega-Linter] Apply linters fixes

* Bump tests.

Co-authored-by: Hannah Stepanek <hstepanek@newrelic.com>
Co-authored-by: hmstepanek <hmstepanek@users.noreply.github.com>
Co-authored-by: umaannamalai <umaannamalai@users.noreply.github.com>
4 people committed Oct 17, 2022
1 parent 65246e7 commit b9bca3d
Showing 9 changed files with 60 additions and 18 deletions.
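
The commit works by recording one supportability metric per transaction for each Kafka client library the transaction touched. A minimal sketch of the metric naming follows; the library and version values are illustrative, not taken from the diff:

```python
# Illustrative only: shows the metric name format this commit records once per
# transaction, with a count of 1.
library, version = "Kafka-Python", "2.0.2"  # example values
metric_name = "Python/MessageBroker/%s/%s" % (library, version)
# -> "Python/MessageBroker/Kafka-Python/2.0.2"
```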
9 changes: 9 additions & 0 deletions newrelic/api/transaction.py
@@ -186,6 +186,7 @@ def __init__(self, application, enabled=None, source=None):
self._loop_time = 0.0

self._frameworks = set()
self._message_brokers = set()

self._frozen_path = None

@@ -545,6 +546,10 @@ def __exit__(self, exc, value, tb):
for framework, version in self._frameworks:
self.record_custom_metric("Python/Framework/%s/%s" % (framework, version), 1)

if self._message_brokers:
for message_broker, version in self._message_brokers:
self.record_custom_metric("Python/MessageBroker/%s/%s" % (message_broker, version), 1)

if self._settings.distributed_tracing.enabled:
# Sampled and priority need to be computed at the end of the
# transaction when distributed tracing or span events are enabled.
@@ -1676,6 +1681,10 @@ def add_framework_info(self, name, version=None):
if name:
self._frameworks.add((name, version))

def add_messagebroker_info(self, name, version=None):
if name:
self._message_brokers.add((name, version))

def dump(self, file):
"""Dumps details about the transaction to the file object."""

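For context, a minimal sketch of how an instrumentation hook is expected to use the new `add_messagebroker_info` API; the hook name and version string below are placeholders, not code from this commit:

```python
# Hypothetical hook body, assuming it runs inside an active transaction.
from newrelic.api.transaction import current_transaction

def hypothetical_kafka_hook():
    transaction = current_transaction()
    if transaction:
        # Deduplicated via the _message_brokers set and emitted on __exit__ as
        # "Python/MessageBroker/Confluent-Kafka/<version>".
        transaction.add_messagebroker_info("Confluent-Kafka", "1.9.2")
```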
23 changes: 23 additions & 0 deletions newrelic/common/package_version_utils.py
@@ -0,0 +1,23 @@
# Copyright 2010 New Relic, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import sys


def get_package_version(name):
# importlib was introduced into the standard library starting in Python3.8.
if "importlib" in sys.modules and hasattr(sys.modules["importlib"], "metadata"):
return sys.modules["importlib"].metadata.version(name) # pylint: disable=E1101
elif "pkg_resources" in sys.modules:
return sys.modules["pkg_resources"].get_distribution(name).version
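
A rough usage sketch of the new helper, assuming the package is installed and either `importlib.metadata` (Python 3.8+) or `pkg_resources` has already been imported; if neither is available, the function falls through and implicitly returns None:

```python
from newrelic.common.package_version_utils import get_package_version

version = get_package_version("kafka-python")  # e.g. "2.0.2", or None
```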
17 changes: 3 additions & 14 deletions newrelic/core/environment.py
@@ -23,6 +23,7 @@
import sysconfig

import newrelic
from newrelic.common.package_version_utils import get_package_version
from newrelic.common.system_info import (
logical_processor_count,
physical_processor_count,
@@ -37,18 +38,6 @@

def environment_settings():
"""Returns an array of arrays of environment settings"""

# Find version resolver.

get_version = None
# importlib was introduced into the standard library starting in Python3.8.
if "importlib" in sys.modules and hasattr(sys.modules["importlib"], "metadata"):
get_version = sys.modules["importlib"].metadata.version
elif "pkg_resources" in sys.modules:

def get_version(name): # pylint: disable=function-redefined
return sys.modules["pkg_resources"].get_distribution(name).version

env = []

# Agent information.
@@ -186,7 +175,7 @@ def get_version(name): # pylint: disable=function-redefined
dispatcher.append(("Dispatcher Version", hypercorn.__version__))
else:
try:
dispatcher.append(("Dispatcher Version", get_version("hypercorn")))
dispatcher.append(("Dispatcher Version", get_package_version("hypercorn")))
except Exception:
pass

@@ -237,7 +226,7 @@ def get_version(name): # pylint: disable=function-redefined
continue

try:
version = get_version(name)
version = get_package_version(name)
plugins.append("%s (%s)" % (name, version))
except Exception:
plugins.append(name)
4 changes: 4 additions & 0 deletions newrelic/hooks/messagebroker_confluentkafka.py
@@ -22,6 +22,7 @@
from newrelic.api.time_trace import notice_error
from newrelic.api.transaction import current_transaction
from newrelic.common.object_wrapper import function_wrapper, wrap_function_wrapper
from newrelic.common.package_version_utils import get_package_version

_logger = logging.getLogger(__name__)

@@ -56,6 +57,8 @@ def wrap_Producer_produce(wrapped, instance, args, kwargs):
else:
topic = kwargs.get("topic", None)

transaction.add_messagebroker_info("Confluent-Kafka", get_package_version("confluent-kafka"))

with MessageTrace(
library="Kafka",
operation="Produce",
@@ -161,6 +164,7 @@ def wrap_Consumer_poll(wrapped, instance, args, kwargs):
name = "Named/%s" % destination_name
transaction.record_custom_metric("%s/%s/Received/Bytes" % (group, name), received_bytes)
transaction.record_custom_metric("%s/%s/Received/Messages" % (group, name), message_count)
transaction.add_messagebroker_info("Confluent-Kafka", get_package_version("confluent-kafka"))

return record

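For reference, a hedged sketch of exercising the instrumented confluent-kafka producer path; the broker address and topic are placeholders, and an active transaction (e.g. via `@background_task()`) is assumed:

```python
# Sketch only: Producer.produce() is wrapped above, so this call starts a
# MessageTrace and tags the current transaction with the Confluent-Kafka
# usage metric.
from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "localhost:9092"})  # assumed broker
producer.produce("example-topic", value=b"hello")
producer.flush()
```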
13 changes: 9 additions & 4 deletions newrelic/hooks/messagebroker_kafkapython.py
@@ -26,6 +26,7 @@
function_wrapper,
wrap_function_wrapper,
)
from newrelic.common.package_version_utils import get_package_version

HEARTBEAT_POLL = "MessageBroker/Kafka/Heartbeat/Poll"
HEARTBEAT_SENT = "MessageBroker/Kafka/Heartbeat/Sent"
@@ -48,6 +49,8 @@ def wrap_KafkaProducer_send(wrapped, instance, args, kwargs):
topic, value, key, headers, partition, timestamp_ms = _bind_send(*args, **kwargs)
headers = list(headers) if headers else []

transaction.add_messagebroker_info("Kafka-Python", get_package_version("kafka-python"))

with MessageTrace(
library="Kafka",
operation="Produce",
@@ -112,6 +115,7 @@ def wrap_kafkaconsumer_next(wrapped, instance, args, kwargs):
message_count = 1

transaction = current_transaction(active_only=False)

if not transaction:
transaction = MessageTransaction(
application=application_instance(),
@@ -124,7 +128,7 @@ def wrap_kafkaconsumer_next(wrapped, instance, args, kwargs):
source=wrapped,
)
instance._nr_transaction = transaction
transaction.__enter__()
transaction.__enter__() # pylint: disable=C2801

# Obtain consumer client_id to send up as agent attribute
if hasattr(instance, "config") and "client_id" in instance.config:
@@ -143,12 +147,13 @@ def wrap_kafkaconsumer_next(wrapped, instance, args, kwargs):
name = "Named/%s" % destination_name
transaction.record_custom_metric("%s/%s/Received/Bytes" % (group, name), received_bytes)
transaction.record_custom_metric("%s/%s/Received/Messages" % (group, name), message_count)
transaction.add_messagebroker_info("Kafka-Python", get_package_version("kafka-python"))

return record


def wrap_KafkaProducer_init(wrapped, instance, args, kwargs):
get_config_key = lambda key: kwargs.get(key, instance.DEFAULT_CONFIG[key]) # noqa: E731
get_config_key = lambda key: kwargs.get(key, instance.DEFAULT_CONFIG[key]) # pylint: disable=C3001 # noqa: E731

kwargs["key_serializer"] = wrap_serializer(
instance, "Serialization/Key", "MessageBroker", get_config_key("key_serializer")
@@ -162,13 +167,13 @@

class NewRelicSerializerWrapper(ObjectProxy):
def __init__(self, wrapped, serializer_name, group_prefix):
ObjectProxy.__init__.__get__(self)(wrapped)
ObjectProxy.__init__.__get__(self)(wrapped) # pylint: disable=W0231

self._nr_serializer_name = serializer_name
self._nr_group_prefix = group_prefix

def serialize(self, topic, object):
wrapped = self.__wrapped__.serialize
wrapped = self.__wrapped__.serialize # pylint: disable=W0622
args = (topic, object)
kwargs = {}

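Similarly, a hedged sketch of the kafka-python producer path under the instrumentation above; the broker address and topic are placeholders and an active transaction is assumed:

```python
# Sketch only: KafkaProducer.send() is wrapped above, so this call tags the
# current transaction with the Kafka-Python usage metric.
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=["localhost:9092"])  # assumed broker
producer.send("example-topic", b"hello")
producer.flush()
```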
3 changes: 3 additions & 0 deletions tests/messagebroker_confluentkafka/test_consumer.py
@@ -63,6 +63,8 @@ def _test():


def test_custom_metrics_on_existing_transaction(get_consumer_record, topic):
from confluent_kafka import __version__ as version

transaction_name = (
"test_consumer:test_custom_metrics_on_existing_transaction.<locals>._test" if six.PY3 else "test_consumer:_test"
)
@@ -72,6 +74,7 @@ def test_custom_metrics_on_existing_transaction(get_consumer_record, topic):
custom_metrics=[
("Message/Kafka/Topic/Named/%s/Received/Bytes" % topic, 1),
("Message/Kafka/Topic/Named/%s/Received/Messages" % topic, 1),
("Python/MessageBroker/Confluent-Kafka/%s" % version, 1),
],
background_task=True,
)
3 changes: 3 additions & 0 deletions tests/messagebroker_confluentkafka/test_producer.py
@@ -65,6 +65,8 @@ def producer_callback(err, msg):


def test_trace_metrics(topic, send_producer_message):
from confluent_kafka import __version__ as version

scoped_metrics = [("MessageBroker/Kafka/Topic/Produce/Named/%s" % topic, 1)]
unscoped_metrics = scoped_metrics
txn_name = "test_producer:test_trace_metrics.<locals>.test" if six.PY3 else "test_producer:test"
@@ -73,6 +75,7 @@ def test_trace_metrics(topic, send_producer_message):
txn_name,
scoped_metrics=scoped_metrics,
rollup_metrics=unscoped_metrics,
custom_metrics=[("Python/MessageBroker/Confluent-Kafka/%s" % version, 1)],
background_task=True,
)
@background_task()
3 changes: 3 additions & 0 deletions tests/messagebroker_kafkapython/test_consumer.py
@@ -60,6 +60,8 @@ def _test():


def test_custom_metrics_on_existing_transaction(get_consumer_record, topic):
from kafka.version import __version__ as version

transaction_name = (
"test_consumer:test_custom_metrics_on_existing_transaction.<locals>._test" if six.PY3 else "test_consumer:_test"
)
@@ -69,6 +71,7 @@ def test_custom_metrics_on_existing_transaction(get_consumer_record, topic):
custom_metrics=[
("Message/Kafka/Topic/Named/%s/Received/Bytes" % topic, 1),
("Message/Kafka/Topic/Named/%s/Received/Messages" % topic, 1),
("Python/MessageBroker/Kafka-Python/%s" % version, 1),
],
background_task=True,
)
3 changes: 3 additions & 0 deletions tests/messagebroker_kafkapython/test_producer.py
@@ -28,6 +28,8 @@


def test_trace_metrics(topic, send_producer_message):
from kafka.version import __version__ as version

scoped_metrics = [("MessageBroker/Kafka/Topic/Produce/Named/%s" % topic, 1)]
unscoped_metrics = scoped_metrics
txn_name = "test_producer:test_trace_metrics.<locals>.test" if six.PY3 else "test_producer:test"
@@ -36,6 +38,7 @@ def test_trace_metrics(topic, send_producer_message):
txn_name,
scoped_metrics=scoped_metrics,
rollup_metrics=unscoped_metrics,
custom_metrics=[("Python/MessageBroker/Kafka-Python/%s" % version, 1)],
background_task=True,
)
@background_task()
