Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: when headers() returns None in AsyncConfluentParser, replace it with an empty tuple #1460

Merged
merged 1 commit into from
May 21, 2024

Conversation

andreaimprovised
Copy link
Contributor

@andreaimprovised andreaimprovised commented May 21, 2024

Description

Fixes #1459

Also, happens to fix a batch=False flow when headers() returns None now that I think about it.

Type of change

Please delete options that are not relevant.

  • Documentation (typos, code examples, or any documentation updates)
  • Bug fix (a non-breaking change that resolves an issue)
  • New feature (a non-breaking change that adds functionality)
  • Breaking change (a fix or feature that would disrupt existing functionality)
  • This change requires a documentation update

Checklist

  • My code adheres to the style guidelines of this project (scripts/lint.sh shows no errors)
  • I have conducted a self-review of my own code
  • I have made the necessary changes to the documentation
  • My changes do not generate any new warnings
  • I have added tests to validate the effectiveness of my fix or the functionality of my new feature
  • Both new and existing unit tests pass successfully on my local environment by running scripts/test-cov.sh
    • I only ran the test that was added to cover this use case.
  • I have ensured that static analysis tests are passing by running scripts/static-analysis.sh
    • I ensured that there were no new errors. This fixes two existing ones.
  • I have included code examples to illustrate the modifications

Test

Just ran this test:

(venv) ➜  faststream git:(fix/1459) ✗ pytest -vv tests/brokers/confluent/test_consume.py -k test_consume_batch_headers
======================================================================================================== test session starts =========================================================================================================
platform darwin -- Python 3.12.3, pytest-8.2.0, pluggy-1.5.0 -- /Users/andreaimprovised/workspace/faststream/venv/bin/python
cachedir: .pytest_cache
rootdir: /Users/andreaimprovised/workspace/faststream
configfile: pyproject.toml
plugins: anyio-4.3.0, asyncio-0.23.6
asyncio: mode=Mode.STRICT
collected 16 items / 15 deselected / 1 selected

tests/brokers/confluent/test_consume.py::TestConsume::test_consume_batch_headers PASSED                                                                                                                                        [100%]

================================================================================================== 1 passed, 15 deselected in 3.62s ==================================================================================================

lint

(venv) ➜  faststream git:(fix/1459) ✗ scripts/lint.sh
Running ruff linter (isort, flake, pyupgrade, etc. replacement)...
All checks passed!
Running ruff formatter (black replacement)...
637 files left unchanged
Running codespell to find typos...
./docs/docs/en/release.md:42: aboout ==> about

static analysis

(venv) ➜  faststream git:(fix/1459) ✗ scripts/static-analysis.sh
Running mypy...
faststream/confluent/client.py:181: error: Argument 2 to "produce" of "Producer" has incompatible type "**Dict[str, Union[str, bytes, int, List[Tuple[str, Union[str, bytes]]]]]"; expected "Union[str, bytes, None]"  [arg-type]
faststream/confluent/client.py:181: error: Argument 2 to "produce" of "Producer" has incompatible type "**Dict[str, Union[str, bytes, int, List[Tuple[str, Union[str, bytes]]]]]"; expected "Optional[int]"  [arg-type]
faststream/confluent/client.py:181: error: Argument 2 to "produce" of "Producer" has incompatible type "**Dict[str, Union[str, bytes, int, List[Tuple[str, Union[str, bytes]]]]]"; expected "Optional[Callable[..., Any]]"  [arg-type]
faststream/confluent/client.py:181: error: Argument 2 to "produce" of "Producer" has incompatible type "**Dict[str, Union[str, bytes, int, List[Tuple[str, Union[str, bytes]]]]]"; expected "int"  [arg-type]
faststream/confluent/client.py:181: error: Argument 2 to "produce" of "Producer" has incompatible type "**Dict[str, Union[str, bytes, int, List[Tuple[str, Union[str, bytes]]]]]"; expected "Union[Dict[str, Union[str, bytes]], List[Tuple[str, Union[str, bytes]]], None]"  [arg-type]
faststream/confluent/client.py:396: error: Argument 1 to "run_async" has incompatible type "Callable[[int, Union[float, int]], List[TopicPartition]]"; expected "Union[Callable[[int, float], List[Optional[Message]]], Callable[[int, float], Awaitable[List[Optional[Message]]]]]"  [arg-type]
faststream/confluent/parser.py:31: error: Incompatible return value type (got "KafkaMessage", expected "StreamMessage[Message]")  [return-value]
faststream/confluent/parser.py:64: error: Incompatible return value type (got "KafkaMessage", expected "StreamMessage[Tuple[Message, ...]]")  [return-value]
faststream/confluent/subscriber/asyncapi.py:141: error: Overloaded function implementation does not accept all possible arguments of signature 1  [misc]
faststream/confluent/subscriber/asyncapi.py:141: error: Overloaded function implementation does not accept all possible arguments of signature 2  [misc]
faststream/confluent/publisher/asyncapi.py:128: error: Overloaded function implementation does not accept all possible arguments of signature 1  [misc]
faststream/confluent/publisher/asyncapi.py:128: error: Overloaded function implementation does not accept all possible arguments of signature 2  [misc]
faststream/confluent/broker/registrator.py:55: error: Incompatible types in assignment (expression has type "Dict[int, Union[AsyncAPIBatchSubscriber, AsyncAPIDefaultSubscriber]]", base class "ABCBroker" defined the type as "Mapping[int, SubscriberProto[Union[Message, Tuple[Message, ...]]]]")  [assignment]
faststream/confluent/broker/registrator.py:58: error: Incompatible types in assignment (expression has type "Dict[int, Union[AsyncAPIBatchPublisher, AsyncAPIDefaultPublisher]]", base class "ABCBroker" defined the type as "Mapping[int, PublisherProto[Union[Message, Tuple[Message, ...]]]]")  [assignment]
faststream/confluent/broker/registrator.py:1237: error: Argument 1 to "subscriber" of "ABCBroker" has incompatible type "Union[AsyncAPIDefaultSubscriber, AsyncAPIBatchSubscriber]"; expected "SubscriberProto[Union[Message, Tuple[Message, ...]]]"  [arg-type]
faststream/confluent/broker/registrator.py:1607: error: Argument 1 to "publisher" of "ABCBroker" has incompatible type "Union[AsyncAPIBatchPublisher, AsyncAPIDefaultPublisher]"; expected "PublisherProto[Union[Message, Tuple[Message, ...]]]"  [arg-type]
faststream/confluent/broker/registrator.py:1609: error: Argument 1 to "publisher" of "ABCBroker" has incompatible type "Union[AsyncAPIBatchPublisher, AsyncAPIDefaultPublisher]"; expected "PublisherProto[Union[Message, Tuple[Message, ...]]]"  [arg-type]
faststream/confluent/router.py:529: error: Argument "middlewares" to "__init__" of "BrokerRouter" has incompatible type "Iterable[Union[Callable[[Optional[Message]], BaseMiddleware], Callable[[Optional[Tuple[Message, ...]]], BaseMiddleware]]]"; expected "Iterable[Callable[[Union[Message, Tuple[Message, ...], None]], BaseMiddleware]]"  [arg-type]
faststream/confluent/broker/broker.py:60: error: Definition of "publisher" in base class "KafkaRegistrator" is incompatible with definition in base class "BrokerUsecase"  [misc]
faststream/confluent/fastapi/fastapi.py:374: error: Argument "middlewares" to "__init__" of "StreamRouter" has incompatible type "Iterable[Union[Callable[[Optional[Message]], BaseMiddleware], Callable[[Optional[Tuple[Message, ...]]], BaseMiddleware]]]"; expected "Iterable[Callable[[Union[Message, Tuple[Message, ...], None]], BaseMiddleware]]"  [arg-type]
faststream/confluent/testing.py:50: error: Argument 1 to "setup_subscriber" of "BrokerUsecase" has incompatible type "Union[AsyncAPIDefaultSubscriber, AsyncAPIBatchSubscriber]"; expected "SubscriberProto[Union[Message, Tuple[Message, ...]]]"  [arg-type]
Found 21 errors in 9 files (checked 240 source files)

@Lancetnik Lancetnik enabled auto-merge May 21, 2024 04:57
@Lancetnik Lancetnik added this pull request to the merge queue May 21, 2024
Merged via the queue into airtai:main with commit 2d224ee May 21, 2024
31 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Bug: Unable to process messages when headers() returns null in batch confluent kafka consumers
2 participants