diff --git a/.gitignore b/.gitignore index 02b7604d..4584a538 100644 --- a/.gitignore +++ b/.gitignore @@ -32,3 +32,6 @@ env # VSCode .vscode + +# Idea / PyCharm +.idea diff --git a/kopf/_cogs/clients/watching.py b/kopf/_cogs/clients/watching.py index e4d9884f..11bd7617 100644 --- a/kopf/_cogs/clients/watching.py +++ b/kopf/_cogs/clients/watching.py @@ -26,12 +26,15 @@ import aiohttp from kopf._cogs.aiokits import aiotasks, aiotoggles -from kopf._cogs.clients import api, fetching +from kopf._cogs.clients import api, errors, fetching from kopf._cogs.configs import configuration from kopf._cogs.structs import bodies, references logger = logging.getLogger(__name__) +HTTP_TOO_MANY_REQUESTS_CODE = 429 +DEFAULT_RETRY_DELAY_SECONDS = 1 + class WatchingError(Exception): """ @@ -79,8 +82,19 @@ async def infinite_watch( namespace=namespace, operator_pause_waiter=operator_pause_waiter, ) - async for raw_event in stream: - yield raw_event + try: + async for raw_event in stream: + yield raw_event + except errors.APIClientError as ex: + if ex.code != HTTP_TOO_MANY_REQUESTS_CODE: + raise + + retry_wait = ex.details.get("retryAfterSeconds") or DEFAULT_RETRY_DELAY_SECONDS + logger.warning( + f"Receiving `too many requests` error from server, will retry after " + f"{retry_wait} seconds. Error details: {ex}" + ) + await asyncio.sleep(retry_wait) await asyncio.sleep(settings.watching.reconnect_backoff) finally: logger.debug(f"Stopping the watch-stream for {resource} {where}.") diff --git a/tests/k8s/test_watching_infinitely.py b/tests/k8s/test_watching_infinitely.py index 8ca83860..ef95ecfa 100644 --- a/tests/k8s/test_watching_infinitely.py +++ b/tests/k8s/test_watching_infinitely.py @@ -1,6 +1,6 @@ import pytest -from kopf._cogs.clients.errors import APIError +from kopf._cogs.clients.errors import APIClientError, APIError from kopf._cogs.clients.watching import Bookmark, infinite_watch STREAM_WITH_UNKNOWN_EVENT = [ @@ -63,3 +63,28 @@ async def test_infinite_watch_never_exits_normally( assert events[2] == Bookmark.LISTED assert events[3]['object']['spec'] == 'a' assert events[4]['object']['spec'] == 'b' + + +async def test_too_many_requests_exception( + settings, resource, stream, namespace, enforced_session, mocker): + + exc = APIClientError({ + "apiVersion": "v1", + "code": 429, + "status": "Failure", + "details": { + "retryAfterSeconds": 1, + } + }, status=429) + enforced_session.request = mocker.Mock(side_effect=exc) + stream.feed([], namespace=namespace) + stream.close(namespace=namespace) + + events = [] + async for event in infinite_watch(settings=settings, + resource=resource, + namespace=namespace, + _iterations=1): + events.append(event) + + assert len(events) == 0