Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add retry after receiving too many requests error from kubernetes #963

Merged
merged 1 commit into from
Feb 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,6 @@ env

# VSCode
.vscode

# Idea / PyCharm
.idea
20 changes: 17 additions & 3 deletions kopf/_cogs/clients/watching.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,15 @@
import aiohttp

from kopf._cogs.aiokits import aiotasks, aiotoggles
from kopf._cogs.clients import api, fetching
from kopf._cogs.clients import api, errors, fetching
from kopf._cogs.configs import configuration
from kopf._cogs.structs import bodies, references

logger = logging.getLogger(__name__)

HTTP_TOO_MANY_REQUESTS_CODE = 429
DEFAULT_RETRY_DELAY_SECONDS = 1


class WatchingError(Exception):
"""
Expand Down Expand Up @@ -79,8 +82,19 @@ async def infinite_watch(
namespace=namespace,
operator_pause_waiter=operator_pause_waiter,
)
async for raw_event in stream:
yield raw_event
try:
async for raw_event in stream:
yield raw_event
except errors.APIClientError as ex:
if ex.code != HTTP_TOO_MANY_REQUESTS_CODE:
raise

retry_wait = ex.details.get("retryAfterSeconds") or DEFAULT_RETRY_DELAY_SECONDS

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wjsi quick question - is this key correct? Does APIClientError do some parsing/formatting of the response headers? When we saw this issue our response looked like:

<ClientResponse(https://kubernetes.default.svc/apis/monitoring.coreos.com/v1/namespaces/tenant-sta-cwdev04-added01/servicemonitors/knative-proxy) [429 Too Many Requests]> <CIMultiDictProxy('Cache-Control': 'no-cache, private', 'Content-Type': 'text/plain; charset=utf-8', 'Retry-After': '1', 'X-Content-Type-Options': 'nosniff', 'Date': 'Tue, 14 Mar 2023 16:47:45 GMT', 'Content-Length': '43')>

So the response header is just Retry-After in our example.

logger.warning(
f"Receiving `too many requests` error from server, will retry after "
f"{retry_wait} seconds. Error details: {ex}"
)
await asyncio.sleep(retry_wait)
await asyncio.sleep(settings.watching.reconnect_backoff)
finally:
logger.debug(f"Stopping the watch-stream for {resource} {where}.")
Expand Down
27 changes: 26 additions & 1 deletion tests/k8s/test_watching_infinitely.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import pytest

from kopf._cogs.clients.errors import APIError
from kopf._cogs.clients.errors import APIClientError, APIError
from kopf._cogs.clients.watching import Bookmark, infinite_watch

STREAM_WITH_UNKNOWN_EVENT = [
Expand Down Expand Up @@ -63,3 +63,28 @@ async def test_infinite_watch_never_exits_normally(
assert events[2] == Bookmark.LISTED
assert events[3]['object']['spec'] == 'a'
assert events[4]['object']['spec'] == 'b'


async def test_too_many_requests_exception(
settings, resource, stream, namespace, enforced_session, mocker):

exc = APIClientError({
"apiVersion": "v1",
"code": 429,
"status": "Failure",
"details": {
"retryAfterSeconds": 1,
}
}, status=429)
enforced_session.request = mocker.Mock(side_effect=exc)
stream.feed([], namespace=namespace)
stream.close(namespace=namespace)

events = []
async for event in infinite_watch(settings=settings,
resource=resource,
namespace=namespace,
_iterations=1):
events.append(event)

assert len(events) == 0