-
Notifications
You must be signed in to change notification settings - Fork 136
/
logging.py
74 lines (59 loc) · 2.2 KB
/
logging.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import logging
from typing import TYPE_CHECKING, Any, Optional, Type
from typing_extensions import Self
from faststream.broker.middlewares.base import BaseMiddleware
from faststream.exceptions import IgnoredException
from faststream.utils.context.repository import context
if TYPE_CHECKING:
from types import TracebackType
from faststream.broker.message import StreamMessage
from faststream.types import LoggerProto
class CriticalLogMiddleware(BaseMiddleware):
    """Middleware that reports message handling through an optional logger.

    When a logger is configured, a "Received" record is emitted on consume
    and a "Processed" record after processing, both at ``log_level``. An
    exception seen after processing is logged at INFO when it derives from
    ``IgnoredException`` and at ERROR (with traceback info) otherwise.
    When ``logger`` is ``None`` nothing is logged.
    """

    def __init__(
        self,
        logger: Optional["LoggerProto"],
        log_level: int,
    ) -> None:
        """Store the logger (may be ``None``) and the level for status records."""
        self.log_level = log_level
        self.logger = logger

    def __call__(self, msg: Optional[Any]) -> Self:
        """Remember ``msg`` on this instance and return the same instance."""
        self.msg = msg
        return self

    async def on_consume(
        self,
        msg: "StreamMessage[Any]",
    ) -> "StreamMessage[Any]":
        """Log "Received" (if a logger is set), then delegate to the parent.

        The record's ``extra`` is the local ``"log_context"`` value from the
        context repository, defaulting to an empty dict.
        """
        active_logger = self.logger
        if active_logger is not None:
            log_extra = context.get_local("log_context", {})
            active_logger.log(self.log_level, "Received", extra=log_extra)

        return await super().on_consume(msg)

    async def after_processed(
        self,
        exc_type: Optional[Type[BaseException]] = None,
        exc_val: Optional[BaseException] = None,
        exc_tb: Optional["TracebackType"] = None,
    ) -> bool:
        """Log the processing outcome, then delegate to the parent hook.

        Args:
            exc_type: Class of the exception raised during processing, if any.
            exc_val: The exception instance, if any.
            exc_tb: The traceback, if any; only forwarded to the parent.

        Returns:
            Always ``False``: the exception is reported here, not processed.
        """
        active_logger = self.logger
        if active_logger is not None:
            log_extra = context.get_local("log_context", {})

            if exc_type:
                if not issubclass(exc_type, IgnoredException):
                    # Unexpected failure: error level, traceback attached.
                    active_logger.log(
                        logging.ERROR,
                        f"{exc_type.__name__}: {exc_val}",
                        exc_info=exc_val,
                        extra=log_extra,
                    )
                else:
                    # Ignored exceptions are reported as plain information.
                    active_logger.log(logging.INFO, exc_val, extra=log_extra)

            # Emitted whether or not an exception occurred.
            active_logger.log(self.log_level, "Processed", extra=log_extra)

        await super().after_processed(exc_type, exc_val, exc_tb)

        # Exception was not processed
        return False