Skip to content

Commit

Permalink
Merge branch 'feat/asgi' of github.com:airtai/faststream into feat/asgi
Browse files Browse the repository at this point in the history
  • Loading branch information
kumaranvpl committed Aug 4, 2024
2 parents 8e64d6f + 8f23bba commit 636cd1b
Show file tree
Hide file tree
Showing 6 changed files with 14 additions and 18 deletions.
2 changes: 1 addition & 1 deletion docs/docs/en/confluent/security.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ This chapter discusses the security options available in **FastStream** and how

### 4. SASLOAuthBearer Object with SSL/TLS

**Purpose:** The `SASLOAuthBearer` is used for authentication using the Oauth sasl.mechanism. While using it you additionaly need to provide necessary `sasl.oauthbearer.*` values in config and provide it to `KafkaBroker`, eg. `sasl.oauthbearer.client.id`, `sasl.oauthbearer.client.secret`. Full list is available in the [confluent doc](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md){.external-link target="_blank"}
**Purpose:** The `SASLOAuthBearer` is used for authentication using the Oauth sasl.mechanism. While using it you additionally need to provide necessary `sasl.oauthbearer.*` values in config and provide it to `KafkaBroker`, eg. `sasl.oauthbearer.client.id`, `sasl.oauthbearer.client.secret`. Full list is available in the [confluent doc](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md){.external-link target="_blank"}

**Usage:**

Expand Down
8 changes: 5 additions & 3 deletions faststream/kafka/parser.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Type
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Type, cast

from faststream.broker.message import decode_message, gen_cor_id
from faststream.kafka.message import FAKE_CONSUMER, KafkaMessage
Expand Down Expand Up @@ -97,7 +97,9 @@ async def decode_message(
msg: "StreamMessage[Tuple[ConsumerRecord, ...]]",
) -> "DecodedMessage":
"""Decode a batch of messages."""
# super() should be here due python can't find it in comprehension
super_obj = cast(AioKafkaParser, super())

return [
decode_message(await super(AioKafkaBatchParser, self).parse_message(m))
for m in msg.raw_message
decode_message(await super_obj.parse_message(m)) for m in msg.raw_message
]
4 changes: 1 addition & 3 deletions faststream/redis/subscriber/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,7 @@ async def start( # type: ignore[override]
await super().start()

start_signal = anyio.Event()
self.task = asyncio.create_task(
self._consume(*args, start_signal=start_signal)
)
self.task = asyncio.create_task(self._consume(*args, start_signal=start_signal))

with anyio.fail_after(3.0):
await start_signal.wait()
Expand Down
10 changes: 2 additions & 8 deletions faststream/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,7 @@ class SASLOAuthBearer(BaseSecurity):
This class defines basic security configuration for SASL/OAUTHBEARER authentication.
"""

__slots__ = (
"use_ssl",
"ssl_context"
)
__slots__ = ("use_ssl", "ssl_context")

def get_requirement(self) -> List["AnyDict"]:
"""Get the security requirements for SASL/OAUTHBEARER authentication."""
Expand All @@ -179,10 +176,7 @@ class SASLGSSAPI(BaseSecurity):
This class defines security configuration for SASL/GSSAPI authentication.
"""

__slots__ = (
"use_ssl",
"ssl_context"
)
__slots__ = ("use_ssl", "ssl_context")

def get_requirement(self) -> List["AnyDict"]:
"""Get the security requirements for SASL/GSSAPI authentication."""
Expand Down
6 changes: 4 additions & 2 deletions tests/asyncapi/confluent/test_security.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
from faststream.confluent import KafkaBroker
from faststream.security import (
BaseSecurity,
SASLOAuthBearer,
SASLPlaintext,
SASLScram256,
SASLScram512,
SASLOAuthBearer,
)

basic_schema = {
Expand Down Expand Up @@ -187,7 +187,9 @@ async def test_topic(msg: str) -> str:
schema = get_app_schema(app).to_jsonable()

sasl_oauthbearer_security_schema = deepcopy(basic_schema)
sasl_oauthbearer_security_schema["servers"]["development"]["security"] = [{"oauthbearer": []}]
sasl_oauthbearer_security_schema["servers"]["development"]["security"] = [
{"oauthbearer": []}
]
sasl_oauthbearer_security_schema["components"]["securitySchemes"] = {
"oauthbearer": {"type": "oauthBearer"}
}
Expand Down
2 changes: 1 addition & 1 deletion tests/brokers/base/testclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ async def test_broker_with_real_patches_subscribers_and_subscribers(

publisher = test_broker.publisher(f"{queue}1")

@test_broker.subscriber(queue)
@test_broker.subscriber(queue, **self.subscriber_kwargs)
async def m(msg):
await publisher.publish(f"response: {msg}")

Expand Down

0 comments on commit 636cd1b

Please sign in to comment.