
RPC retry entrypoints can't retry other RPC retry entrypoints #29

Open · gchai87 opened this issue Mar 28, 2019 · 5 comments

Comments


gchai87 commented Mar 28, 2019

Describe the bug
Retrying an RPC entrypoint with the decorator imported from nameko-amqp-retry will not work if it then makes a call to another RPC entrypoint that also uses a decorator imported from nameko-amqp-retry.

To Reproduce
Set up two services, both with nameko_amqp_retry RPC entrypoints:

service1

import logging

from nameko.rpc import RpcProxy
from nameko_amqp_retry import entrypoint_retry
from nameko_amqp_retry.rpc import rpc

logger = logging.getLogger(__name__)


class Service1:

    name = "service1"

    service2_rpc = RpcProxy("service2")

    @rpc
    @entrypoint_retry(retry_for=KeyError, limit=2, schedule=[1000])
    def call_service2(self):
        logger.info(self.service2_rpc.do_bad_thing())
        a = {"bad": "thing"}
        return a["good"]

service2

import logging

from nameko_amqp_retry.rpc import rpc

logger = logging.getLogger(__name__)


class Service2:

    name = "service2"

    @rpc
    def do_bad_thing(self):
        return "Works the first time only."

Call service2 via service1 from the nameko shell:
n.rpc.service1.call_service2()

This results in a KeyError the first time, as expected, but on retry it hits a MethodNotFound error instead of a second KeyError:

nameko run --config deploy/core/config.yaml service1.services.core.service:Service1
2019-03-28 16:42:15,484 [INFO] [studapart-service] [nameko.runners] starting services: service1
2019-03-28 16:42:15,545 [INFO] [studapart-service] [kombu.mixins] Connected to amqp://guest:**@127.0.0.1:5672//
2019-03-28 16:42:16,349 [INFO] [studapart-service] [service1.services.core.service] Works the first time only.
2019-03-28 16:42:16,349 [WARNING] [studapart-service] [nameko.containers] (expected) error handling worker <WorkerContext [service1.call_service2] at 0x1117c1b70>: Backoff(uninitialised)
Traceback (most recent call last):
  File "/Users/gchai/.virtualenvs/svc1/lib/python3.6/site-packages/nameko_amqp_retry/decorators.py", line 64, in wrapper
    return wrapped(*args, **kwargs)
  File "/Users/gchai/project/service1/src/service1/services/core/service.py", line 21, in call_service2
    return a["good"]
KeyError: 'good'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/gchai/.virtualenvs/svc1/lib/python3.6/site-packages/nameko/containers.py", line 392, in _run_worker
    result = method(*worker_ctx.args, **worker_ctx.kwargs)
  File "/Users/gchai/.virtualenvs/svc1/lib/python3.6/site-packages/nameko_amqp_retry/decorators.py", line 66, in wrapper
    six.raise_from(backoff_cls(), exc)
  File "<string>", line 3, in raise_from
nameko_amqp_retry.decorators.backoff_factory.<locals>.CustomBackoff: Backoff(uninitialised)
2019-03-28 16:42:17,288 [ERROR] [studapart-service] [nameko.containers] error handling worker <WorkerContext [service1.call_service2] at 0x111807710>: call_service2
Traceback (most recent call last):
  File "/Users/gchai/.virtualenvs/svc1/lib/python3.6/site-packages/nameko/containers.py", line 392, in _run_worker
    result = method(*worker_ctx.args, **worker_ctx.kwargs)
  File "/Users/gchai/.virtualenvs/svc1/lib/python3.6/site-packages/nameko_amqp_retry/decorators.py", line 64, in wrapper
    return wrapped(*args, **kwargs)
  File "/Users/gchai/project/service1/src/service1/services/core/service.py", line 19, in call_service2
    logger.info(self.service2_rpc.do_bad_thing())
  File "/Users/gchai/.virtualenvs/svc1/lib/python3.6/site-packages/nameko/rpc.py", line 373, in __call__
    return reply.result()
  File "/Users/gchai/.virtualenvs/svc1/lib/python3.6/site-packages/nameko/rpc.py", line 331, in result
    raise deserialize(error)
nameko.exceptions.MethodNotFound: call_service2

Expected behavior
No MethodNotFound exceptions. The example above should retry the KeyError twice and then give up.

Environment (please complete the following information):

  • Nameko version: 2.12.0
  • Nameko-amqp-retry version: 0.7.1
  • Python version: 3.6
  • OS: macOS

ketgo commented Sep 9, 2019

Is this issue because the dead letter exchange is the same for all services?


ketgo commented Sep 9, 2019

Never mind, that does not seem to be the problem here. I just tested the following code and still see the same error.

Code:

import logging

from nameko.rpc import RpcProxy
from nameko_amqp_retry import Backoff, BackoffPublisher
from nameko_amqp_retry.rpc import Rpc
from kombu.messaging import Exchange

logger = logging.getLogger(__name__)


class CustomBackoffPublisherService1(BackoffPublisher):
    @property
    def exchange(self):
        backoff_exchange = Exchange(
            type="headers",
            name="service1-backoff"
        )
        return backoff_exchange


class CustomRpcService1(Rpc):
    backoff_publisher = CustomBackoffPublisherService1()


rpc1 = CustomRpcService1.decorator


class Service1:
    name = "service1"
    service2_rpc = RpcProxy("service2")

    @rpc1
    def call_service2(self):
        try:
            logger.info('service2: {}'.format(self.service2_rpc.do_bad_thing()))
            a = {"bad": "thing"}
            return a["good"]
        except KeyError:
            raise Backoff()


class CustomBackoffPublisherService2(BackoffPublisher):
    @property
    def exchange(self):
        backoff_exchange = Exchange(
            type="headers",
            name="service2-backoff"
        )
        return backoff_exchange


class CustomRpcService2(Rpc):
    backoff_publisher = CustomBackoffPublisherService2()


rpc2 = CustomRpcService2.decorator


class Service2:
    name = "service2"

    @rpc2
    def do_bad_thing(self):
        return "Works the first time only."

Logs:

(venv) ketan@Other:~/Projects/pyMSC/scheduler/profile/service$ nameko run --config config.yaml tests.integration.test_retry:Service1
starting <QueueConsumer at 0x7f45755585c0>
waiting for consumer ready <QueueConsumer at 0x7f45755585c0>
Connected to amqp://guest:**@127.0.0.1:5672//
setting up consumers <QueueConsumer at 0x7f45755585c0>
consumer started <QueueConsumer at 0x7f45755585c0>
started <QueueConsumer at 0x7f45755585c0>
invoking <proxy method: service2.do_bad_thing>
Waiting for RPC reply event <nameko.rpc.RpcReply object at 0x7f4575bcdc50>
RPC reply event complete <nameko.rpc.RpcReply object at 0x7f4575bcdc50> {'result': 'Works the first time only.', 'error': None}
service2: Works the first time only.
invoking <proxy method: service2.do_bad_thing>
Waiting for RPC reply event <nameko.rpc.RpcReply object at 0x7f4575c84dd8>
RPC reply event complete <nameko.rpc.RpcReply object at 0x7f4575c84dd8> {'result': None, 'error': {'exc_type': 'MethodNotFound', 'exc_path': 'nameko.exceptions.MethodNotFound', 'exc_args': ['call_service2'], 'value': 'call_service2'}}


ketgo commented Sep 9, 2019

Using the nameko_tracer extension and adding a print statement in the MethodProxy class after line 433, I was finally able to debug the issue. The MethodProxy class is used internally by RpcProxy.

Problem

The dead letter queue headers from service1 are being forwarded to service2, which results in the observed exception. See the extra_headers printed for nameko.rpc.MethodProxy in the logs below for details.

starting <QueueConsumer at 0x7f3466c713c8>
waiting for consumer ready <QueueConsumer at 0x7f3466c713c8>
Connected to amqp://guest:**@127.0.0.1:5672//
setting up consumers <QueueConsumer at 0x7f3466c713c8>
consumer started <QueueConsumer at 0x7f3466c713c8>
started <QueueConsumer at 0x7f3466c713c8>
[nameko_tracer] {'timestamp': datetime.datetime(2019, 9, 9, 6, 28, 12, 994320), 'hostname': 'Other', 'service': 'service1', 'entrypoint_type': 'Rpc', 'entrypoint_name': 'call_service2', 'context_data': {}, 'call_id': 'service1.call_service2.4e61ac89-c169-4318-8e20-85efba4de792', 'call_id_stack': ['standalone_rpc_proxy.call.811db76d-a9c4-417b-94a0-99894f6985de', 'service1.call_service2.4e61ac89-c169-4318-8e20-85efba4de792'], 'origin_call_id': 'standalone_rpc_proxy.call.811db76d-a9c4-417b-94a0-99894f6985de', 'stage': 'request', 'call_args': {}, 'call_args_redacted': False}
[service1.call_service2.4e61ac89-c169-4318-8e20-85efba4de792] entrypoint call trace
invoking <proxy method: service2.do_bad_thing>
<class 'nameko.rpc.MethodProxy'>:
         exchange: Exchange nameko-rpc(topic)
         routing_key: service2.do_bad_thing
         reply_to: c006b4e3-5125-46bb-a84a-0fa5c97f3c38
         correlation_id: 9451ad22-1956-42a6-8963-0cbc91ca4e35
         extra_headers: {'nameko.call_id_stack': ['standalone_rpc_proxy.call.811db76d-a9c4-417b-94a0-99894f6985de', 'service1.call_service2.4e61ac89-c169-4318-8e20-85efba4de792']}
Waiting for RPC reply event <nameko.rpc.RpcReply object at 0x7f3466df4eb8>
RPC reply event complete <nameko.rpc.RpcReply object at 0x7f3466df4eb8> {'result': 'Works the first time only.', 'error': None}
service2: Works the first time only.
[nameko_tracer] {'timestamp': datetime.datetime(2019, 9, 9, 6, 28, 13, 72546), 'hostname': 'Other', 'service': 'service1', 'entrypoint_type': 'Rpc', 'entrypoint_name': 'call_service2', 'context_data': {}, 'call_id': 'service1.call_service2.4e61ac89-c169-4318-8e20-85efba4de792', 'call_id_stack': ['standalone_rpc_proxy.call.811db76d-a9c4-417b-94a0-99894f6985de', 'service1.call_service2.4e61ac89-c169-4318-8e20-85efba4de792'], 'origin_call_id': 'standalone_rpc_proxy.call.811db76d-a9c4-417b-94a0-99894f6985de', 'stage': 'response', 'call_args': {}, 'call_args_redacted': False, 'response_status': 'error', 'exception_type': 'CustomBackOff', 'exception_path': 'tests.integration.test_retry.CustomBackOff', 'exception_args': [], 'exception_value': 'Backoff(retry #1 in 1000ms)', 'exception_traceback': 'Traceback (most recent call last):\n  File "./tests/integration/test_retry.py", line 45, in call_service2\n    return a["good"]\nKeyError: \'good\'\n\nDuring handling of the above exception, another exception occurred:\n\nTraceback (most recent call last):\n  File "/home/ketan/Projects/pyMSC/scheduler/profile/venv/lib/python3.6/site-packages/nameko/containers.py", line 392, in _run_worker\n    result = method(*worker_ctx.args, **worker_ctx.kwargs)\n  File "./tests/integration/test_retry.py", line 47, in call_service2\n    raise CustomBackOff()\ntests.integration.test_retry.CustomBackOff: Backoff(retry #1 in 1000ms)\n', 'exception_expected': True, 'response_time': 0.078226}
[service1.call_service2.4e61ac89-c169-4318-8e20-85efba4de792] entrypoint result trace
[nameko_tracer] {'timestamp': datetime.datetime(2019, 9, 9, 6, 28, 14, 68293), 'hostname': 'Other', 'service': 'service1', 'entrypoint_type': 'Rpc', 'entrypoint_name': 'call_service2', 'context_data': {'backoff': 1000, 'rpc_method_id': 'service1.call_service2', 'x-death': [{'count': 1, 'reason': 'expired', 'queue': 'backoff--1000ms', 'time': '2019-09-09 06:28:14', 'exchange': 'backoff', 'routing-keys': ['rpc-service1'], 'original-expiration': '1000'}], 'x-first-death-exchange': 'backoff', 'x-first-death-queue': 'backoff--1000ms', 'x-first-death-reason': 'expired'}, 'call_id': 'service1.call_service2.2564aef6-89c0-45d9-bcef-abf2bfbe2945', 'call_id_stack': ['standalone_rpc_proxy.call.811db76d-a9c4-417b-94a0-99894f6985de', 'service1.call_service2.4e61ac89-c169-4318-8e20-85efba4de792.backoff', 'service1.call_service2.2564aef6-89c0-45d9-bcef-abf2bfbe2945'], 'origin_call_id': 'standalone_rpc_proxy.call.811db76d-a9c4-417b-94a0-99894f6985de', 'stage': 'request', 'call_args': {}, 'call_args_redacted': False}
[service1.call_service2.2564aef6-89c0-45d9-bcef-abf2bfbe2945] entrypoint call trace
invoking <proxy method: service2.do_bad_thing>
<class 'nameko.rpc.MethodProxy'>:
         exchange: Exchange nameko-rpc(topic)
         routing_key: service2.do_bad_thing
         reply_to: c006b4e3-5125-46bb-a84a-0fa5c97f3c38
         correlation_id: 09d6f40e-bb68-4f8e-a98b-87d99373c56d
         extra_headers: {'nameko.backoff': 1000, 'nameko.rpc_method_id': 'service1.call_service2', 'nameko.x-death': [{'count': 1, 'reason': 'expired', 'queue': 'backoff--1000ms', 'time': datetime.datetime(2019, 9, 9, 6, 28, 14), 'exchange': 'backoff', 'routing-keys': ['rpc-service1'], 'original-expiration': '1000'}], 'nameko.x-first-death-exchange': 'backoff', 'nameko.x-first-death-queue': 'backoff--1000ms', 'nameko.x-first-death-reason': 'expired', 'nameko.call_id_stack': ['standalone_rpc_proxy.call.811db76d-a9c4-417b-94a0-99894f6985de', 'service1.call_service2.4e61ac89-c169-4318-8e20-85efba4de792.backoff', 'service1.call_service2.2564aef6-89c0-45d9-bcef-abf2bfbe2945']}
Waiting for RPC reply event <nameko.rpc.RpcReply object at 0x7f3466d65160>
RPC reply event complete <nameko.rpc.RpcReply object at 0x7f3466d65160> {'result': None, 'error': {'exc_type': 'MethodNotFound', 'exc_path': 'nameko.exceptions.MethodNotFound', 'exc_args': ['call_service2'], 'value': 'call_service2'}}
[nameko_tracer] {'timestamp': datetime.datetime(2019, 9, 9, 6, 28, 14, 96395), 'hostname': 'Other', 'service': 'service1', 'entrypoint_type': 'Rpc', 'entrypoint_name': 'call_service2', 'context_data': {'backoff': 1000, 'rpc_method_id': 'service1.call_service2', 'x-death': [{'count': 1, 'reason': 'expired', 'queue': 'backoff--1000ms', 'time': '2019-09-09 06:28:14', 'exchange': 'backoff', 'routing-keys': ['rpc-service1'], 'original-expiration': '1000'}], 'x-first-death-exchange': 'backoff', 'x-first-death-queue': 'backoff--1000ms', 'x-first-death-reason': 'expired'}, 'call_id': 'service1.call_service2.2564aef6-89c0-45d9-bcef-abf2bfbe2945', 'call_id_stack': ['standalone_rpc_proxy.call.811db76d-a9c4-417b-94a0-99894f6985de', 'service1.call_service2.4e61ac89-c169-4318-8e20-85efba4de792.backoff', 'service1.call_service2.2564aef6-89c0-45d9-bcef-abf2bfbe2945'], 'origin_call_id': 'standalone_rpc_proxy.call.811db76d-a9c4-417b-94a0-99894f6985de', 'stage': 'response', 'call_args': {}, 'call_args_redacted': False, 'response_status': 'error', 'exception_type': 'MethodNotFound', 'exception_path': 'nameko.exceptions.MethodNotFound', 'exception_args': ['call_service2'], 'exception_value': 'call_service2', 'exception_traceback': 'Traceback (most recent call last):\n  File "/home/ketan/Projects/pyMSC/scheduler/profile/venv/lib/python3.6/site-packages/nameko/containers.py", line 392, in _run_worker\n    result = method(*worker_ctx.args, **worker_ctx.kwargs)\n  File "./tests/integration/test_retry.py", line 43, in call_service2\n    logger.info(\'service2: {}\'.format(self.service2_rpc.do_bad_thing()))\n  File "/home/ketan/Projects/pyMSC/scheduler/profile/venv/lib/python3.6/site-packages/nameko/rpc.py", line 373, in __call__\n    return reply.result()\n  File "/home/ketan/Projects/pyMSC/scheduler/profile/venv/lib/python3.6/site-packages/nameko/rpc.py", line 331, in result\n    raise deserialize(error)\nnameko.exceptions.MethodNotFound: call_service2\n', 'exception_expected': False, 'response_time': 0.028102}
[service1.call_service2.2564aef6-89c0-45d9-bcef-abf2bfbe2945] entrypoint result trace

Solution

Remove the extra headers added for the dead letter queue and backoff. This can be achieved with the following custom RPC proxy:

# utils.py module

from nameko.rpc import RpcProxy as NamekoRpcProxy, ServiceProxy

class CustomRpcProxy(NamekoRpcProxy):

    dead_letter_properties = ['x-death', 'x-first-death-exchange', 'x-first-death-queue', 'x-first-death-reason']
    backoff_properties = ['backoff', 'rpc_method_id']

    def get_dependency(self, worker_ctx):
        # Removing dead letter queue and backoff headers
        for key in self.dead_letter_properties + self.backoff_properties:
            if key in worker_ctx.data:
                worker_ctx.data.pop(key)
        return ServiceProxy(
            worker_ctx,
            self.target_service,
            self.rpc_reply_listener,
            **self.options
        )

The following code worked for me as desired:

# service.py module

import logging

from .utils import CustomRpcProxy

from nameko_amqp_retry import Backoff
from nameko_amqp_retry.rpc import rpc
from nameko_tracer import Tracer

logger = logging.getLogger(__name__)


class CustomBackOff(Backoff):
    limit = 2
    schedule = [1000]


class Service1:
    name = "service1"
    service2_rpc = CustomRpcProxy("service2")
    tracer = Tracer()

    @rpc
    def call_service2(self):
        try:
            logger.info('service2: {}'.format(self.service2_rpc.do_bad_thing()))
            a = {"bad": "thing"}
            return a["good"]
        except KeyError:
            raise CustomBackOff()


class Service2:
    name = "service2"
    tracer = Tracer()

    @rpc
    def do_bad_thing(self):
        return "Works the first time only."

Hope that helps!
If there is a more elegant solution, please do let me know. Thanks!

mattbennett (Member) commented:

Thanks for doing this investigation @ketgo.

I think the only header that's causing a problem is rpc_method_id. It's caused by this conditional:

# use the rpc_method_id if set, otherwise fall back to the routing key
method_id = message.headers.get(RPC_METHOD_ID_HEADER_KEY)
if method_id is None:
    method_id = message.delivery_info['routing_key']

The conditional is required because the message republisher has to use the routing_key property with a different value.

If you unset just that header in a custom RpcProxy I think it'll work. I'd welcome a pull request with a fix for this if you're able to contribute one.
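
A minimal sketch of that narrower workaround, adapting the CustomRpcProxy shown earlier so that only the rpc_method_id context key is dropped. The class name is invented for illustration, and the 'rpc_method_id' key is taken from the traced worker context above; treat this as an untested assumption rather than a verified fix:

# utils.py module (sketch, untested): drop only the backoff routing header so
# that, on retry, the nested RPC call is dispatched by routing key again

from nameko.rpc import RpcProxy as NamekoRpcProxy, ServiceProxy


class RpcMethodIdStrippingProxy(NamekoRpcProxy):

    # context key assumed from the 'rpc_method_id' entry seen in the traced
    # worker context above
    RPC_METHOD_ID_KEY = 'rpc_method_id'

    def get_dependency(self, worker_ctx):
        # remove only the header identified as problematic; everything else
        # (x-death, backoff, call id stack) is forwarded unchanged
        worker_ctx.data.pop(self.RPC_METHOD_ID_KEY, None)
        return ServiceProxy(
            worker_ctx,
            self.target_service,
            self.rpc_reply_listener,
            **self.options
        )

It would be declared in place of the standard proxy, e.g. service2_rpc = RpcMethodIdStrippingProxy("service2").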


ketgo commented Sep 15, 2019

@mattbennett Sure, will send one.
