diff --git a/newrelic/hooks/datastore_aioredis.py b/newrelic/hooks/datastore_aioredis.py index 276f116e4..daced369a 100644 --- a/newrelic/hooks/datastore_aioredis.py +++ b/newrelic/hooks/datastore_aioredis.py @@ -60,7 +60,12 @@ def _nr_wrapper_AioRedis_method_(wrapped, instance, args, kwargs): # Method will return synchronously without executing, # it will be added to the command stack and run later. aioredis_version = get_package_version_tuple("aioredis") - if aioredis_version and aioredis_version < (2,): + # This conditional is for versions of aioredis that are outside + # New Relic's supportability window but will still work. New + # Relic does not provide testing/support for this. In order to + # keep functionality without affecting coverage metrics, this + # segment is excluded from coverage analysis. + if aioredis_version and aioredis_version < (2,): # pragma: no cover # AioRedis v1 uses a RedisBuffer instead of a real connection for queueing up pipeline commands from aioredis.commands.transaction import _RedisBuffer @@ -135,7 +140,12 @@ async def wrap_Connection_send_command(wrapped, instance, args, kwargs): return await wrapped(*args, **kwargs) -def wrap_RedisConnection_execute(wrapped, instance, args, kwargs): +# This wrapper is for versions of aioredis that are outside +# New Relic's supportability window but will still work. New +# Relic does not provide testing/support for this. In order to +# keep functionality without affecting coverage metrics, this +# segment is excluded from coverage analysis. +def wrap_RedisConnection_execute(wrapped, instance, args, kwargs): # pragma: no cover # RedisConnection in aioredis v1 returns a future instead of using coroutines transaction = current_transaction() if not transaction: @@ -203,6 +213,11 @@ def instrument_aioredis_connection(module): if hasattr(module.Connection, "send_command"): wrap_function_wrapper(module, "Connection.send_command", wrap_Connection_send_command) - if hasattr(module, "RedisConnection"): + # This conditional is for versions of aioredis that are outside + # New Relic's supportability window but will still work. New + # Relic does not provide testing/support for this. In order to + # keep functionality without affecting coverage metrics, this + # segment is excluded from coverage analysis. + if hasattr(module, "RedisConnection"): # pragma: no cover if hasattr(module.RedisConnection, "execute"): wrap_function_wrapper(module, "RedisConnection.execute", wrap_RedisConnection_execute) diff --git a/newrelic/hooks/datastore_redis.py b/newrelic/hooks/datastore_redis.py index 15adace70..bd6897230 100644 --- a/newrelic/hooks/datastore_redis.py +++ b/newrelic/hooks/datastore_redis.py @@ -18,11 +18,65 @@ from newrelic.api.transaction import current_transaction from newrelic.common.object_wrapper import function_wrapper, wrap_function_wrapper +_redis_client_sync_methods = { + "acl_dryrun", + "auth", + "bgrewriteaof", + "bitfield", + "blmpop", + "bzmpop", + "client", + "command", + "command_docs", + "command_getkeysandflags", + "command_info", + "debug_segfault", + "expiretime", + "failover", + "hello", + "latency_doctor", + "latency_graph", + "latency_histogram", + "lcs", + "lpop", + "lpos", + "memory_doctor", + "memory_help", + "monitor", + "pexpiretime", + "psetex", + "psync", + "pubsub", + "renamenx", + "rpop", + "script_debug", + "sentinel_ckquorum", + "sentinel_failover", + "sentinel_flushconfig", + "sentinel_get_master_addr_by_name", + "sentinel_master", + "sentinel_masters", + "sentinel_monitor", + "sentinel_remove", + "sentinel_reset", + "sentinel_sentinels", + "sentinel_set", + "sentinel_slaves", + "shutdown", + "sort", + "sort_ro", + "spop", + "srandmember", + "unwatch", + "watch", + "zlexcount", + "zrevrangebyscore", +} + -_redis_client_methods = { +_redis_client_async_methods = { "acl_cat", "acl_deluser", - "acl_dryrun", "acl_genpass", "acl_getuser", "acl_help", @@ -51,11 +105,8 @@ "arrlen", "arrpop", "arrtrim", - "auth", - "bgrewriteaof", "bgsave", "bitcount", - "bitfield", "bitfield_ro", "bitop_and", "bitop_not", @@ -64,13 +115,11 @@ "bitop", "bitpos", "blmove", - "blmpop", "blpop", "brpop", "brpoplpush", "byrank", "byrevrank", - "bzmpop", "bzpopmax", "bzpopmin", "card", @@ -91,7 +140,6 @@ "client_trackinginfo", "client_unblock", "client_unpause", - "client", "cluster_add_slots", "cluster_addslots", "cluster_count_failure_report", @@ -118,10 +166,7 @@ "cluster_slots", "cluster", "command_count", - "command_docs", "command_getkeys", - "command_getkeysandflags", - "command_info", "command_list", "command", "commit", @@ -137,7 +182,6 @@ "createrule", "dbsize", "debug_object", - "debug_segfault", "debug_sleep", "debug", "decr", @@ -160,10 +204,8 @@ "exists", "expire", "expireat", - "expiretime", "explain_cli", "explain", - "failover", "fcall_ro", "fcall", "flushall", @@ -192,7 +234,6 @@ "getrange", "getset", "hdel", - "hello", "hexists", "hget", "hgetall", @@ -220,13 +261,9 @@ "insertnx", "keys", "lastsave", - "latency_doctor", - "latency_graph", - "latency_histogram", "latency_history", "latency_latest", "latency_reset", - "lcs", "lindex", "linsert", "list", @@ -235,8 +272,6 @@ "lmpop", "loadchunk", "lolwut", - "lpop", - "lpos", "lpush", "lpushx", "lrange", @@ -245,8 +280,6 @@ "ltrim", "madd", "max", - "memory_doctor", - "memory_help", "memory_malloc_stats", "memory_purge", "memory_stats", @@ -261,7 +294,6 @@ "module_load", "module_loadex", "module_unload", - "monitor", "move", "mrange", "mrevrange", @@ -277,21 +309,17 @@ "persist", "pexpire", "pexpireat", - "pexpiretime", "pfadd", "pfcount", "pfmerge", "ping", "profile", - "psetex", "psubscribe", - "psync", "pttl", "publish", "pubsub_channels", "pubsub_numpat", "pubsub_numsub", - "pubsub", "punsubscribe", "quantile", "query", @@ -303,7 +331,6 @@ "readonly", "readwrite", "rename", - "renamenx", "replicaof", "reserve", "reset", @@ -312,7 +339,6 @@ "revrange", "revrank", "role", - "rpop", "rpoplpush", "rpush", "rpushx", @@ -322,7 +348,6 @@ "scan", "scandump", "scard", - "script_debug", "script_exists", "script_flush", "script_kill", @@ -331,24 +356,11 @@ "sdiffstore", "search", "select", - "sentinel_ckquorum", - "sentinel_failover", - "sentinel_flushconfig", - "sentinel_get_master_addr_by_name", - "sentinel_master", - "sentinel_masters", - "sentinel_monitor", - "sentinel_remove", - "sentinel_reset", - "sentinel_sentinels", - "sentinel_set", - "sentinel_slaves", "set", "setbit", "setex", "setnx", "setrange", - "shutdown", "sinter", "sintercard", "sinterstore", @@ -361,11 +373,7 @@ "smembers", "smismember", "smove", - "sort_ro", - "sort", "spellcheck", - "spop", - "srandmember", "srem", "sscan_iter", "sscan", @@ -393,10 +401,8 @@ "type", "unlink", "unsubscribe", - "unwatch", "wait", "waitaof", - "watch", "xack", "xadd", "xautoclaim", @@ -432,7 +438,6 @@ "zinter", "zintercard", "zinterstore", - "zlexcount", "zmpop", "zmscore", "zpopmax", @@ -449,7 +454,6 @@ "zremrangebyscore", "zrevrange", "zrevrangebylex", - "zrevrangebyscore", "zrevrank", "zscan_iter", "zscan", @@ -458,6 +462,8 @@ "zunionstore", } +_redis_client_methods = _redis_client_sync_methods.union(_redis_client_async_methods) + _redis_multipart_commands = set(["client", "cluster", "command", "config", "debug", "sentinel", "slowlog", "script"]) _redis_operation_re = re.compile(r"[-\s]+") @@ -506,7 +512,6 @@ def _nr_wrapper_Redis_method_(wrapped, instance, args, kwargs): def _wrap_asyncio_Redis_method_wrapper(module, instance_class_name, operation): - @function_wrapper async def _nr_wrapper_asyncio_Redis_async_method_(wrapped, instance, args, kwargs): transaction = current_transaction() @@ -518,6 +523,7 @@ async def _nr_wrapper_asyncio_Redis_async_method_(wrapped, instance, args, kwarg def _nr_wrapper_asyncio_Redis_method_(wrapped, instance, args, kwargs): from redis.asyncio.client import Pipeline + if isinstance(instance, Pipeline): return wrapped(*args, **kwargs) @@ -592,10 +598,11 @@ def instrument_redis_client(module): def instrument_asyncio_redis_client(module): if hasattr(module, "Redis"): class_ = getattr(module, "Redis") - for operation in _redis_client_methods: + for operation in _redis_client_async_methods: if hasattr(class_, operation): _wrap_asyncio_Redis_method_wrapper(module, "Redis", operation) + def instrument_redis_commands_core(module): _instrument_redis_commands_module(module, "CoreCommands") diff --git a/tests/datastore_redis/test_asyncio.py b/tests/datastore_redis/test_asyncio.py new file mode 100644 index 000000000..97c1b7853 --- /dev/null +++ b/tests/datastore_redis/test_asyncio.py @@ -0,0 +1,100 @@ +# Copyright 2010 New Relic, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio + +import pytest +from testing_support.db_settings import redis_settings +from testing_support.fixture.event_loop import event_loop as loop # noqa: F401 +from testing_support.util import instance_hostname +from testing_support.validators.validate_transaction_metrics import ( + validate_transaction_metrics, +) + +from newrelic.api.background_task import background_task +from newrelic.common.package_version_utils import get_package_version_tuple + +# Settings + +DB_SETTINGS = redis_settings()[0] +REDIS_VERSION = get_package_version_tuple("redis") + +# Metrics + +_enable_scoped_metrics = [("Datastore/operation/Redis/publish", 3)] + +_enable_rollup_metrics = [ + ("Datastore/all", 3), + ("Datastore/allOther", 3), + ("Datastore/Redis/all", 3), + ("Datastore/Redis/allOther", 3), + ("Datastore/operation/Redis/publish", 3), + ("Datastore/instance/Redis/%s/%s" % (instance_hostname(DB_SETTINGS["host"]), DB_SETTINGS["port"]), 3), +] + +# Tests + + +@pytest.fixture() +def client(loop): # noqa + import redis.asyncio + + return loop.run_until_complete(redis.asyncio.Redis(host=DB_SETTINGS["host"], port=DB_SETTINGS["port"], db=0)) + + +@pytest.mark.skipif(REDIS_VERSION < (4, 2), reason="This functionality exists in Redis 4.2+") +@validate_transaction_metrics("test_asyncio:test_async_pipeline", background_task=True) +@background_task() +def test_async_pipeline(client, loop): # noqa + async def _test_pipeline(client): + async with client.pipeline(transaction=True) as pipe: + await pipe.set("key1", "value1") + await pipe.execute() + + loop.run_until_complete(_test_pipeline(client)) + + +@pytest.mark.skipif(REDIS_VERSION < (4, 2), reason="This functionality exists in Redis 4.2+") +@validate_transaction_metrics( + "test_asyncio:test_async_pubsub", + scoped_metrics=_enable_scoped_metrics, + rollup_metrics=_enable_rollup_metrics, + background_task=True, +) +@background_task() +def test_async_pubsub(client, loop): # noqa + messages_received = [] + + async def reader(pubsub): + while True: + message = await pubsub.get_message(ignore_subscribe_messages=True) + if message: + messages_received.append(message["data"].decode()) + if message["data"].decode() == "NOPE": + break + + async def _test_pubsub(): + async with client.pubsub() as pubsub: + await pubsub.psubscribe("channel:*") + + future = asyncio.create_task(reader(pubsub)) + + await client.publish("channel:1", "Hello") + await client.publish("channel:2", "World") + await client.publish("channel:1", "NOPE") + + await future + + loop.run_until_complete(_test_pubsub()) + assert messages_received == ["Hello", "World", "NOPE"]