Skip to content

Commit

Permalink
Add config param to pass additional parameters to confluent-kafka-pyt…
Browse files Browse the repository at this point in the history
…hon (#1505)

* Add config param to pass additional parameters to confluent-kafka-python

* Raise error if ssl_context is passed

* Add ConfluentConfig typed dict

* Add docs for passing config

* Fix mypy failure

* Remove unused parameters

* Bump version
  • Loading branch information
kumaranvpl committed Jun 11, 2024
1 parent c20629c commit df2a0a1
Show file tree
Hide file tree
Showing 37 changed files with 562 additions and 648 deletions.
1 change: 1 addition & 0 deletions .codespell-whitelist.txt
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
dependant
unsecure
4 changes: 2 additions & 2 deletions .secrets.baseline
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@
"filename": "docs/docs/en/release.md",
"hashed_secret": "35675e68f4b5af7b995d9205ad0fc43842f16450",
"is_verified": false,
"line_number": 1308,
"line_number": 1325,
"is_secret": false
}
],
Expand Down Expand Up @@ -163,5 +163,5 @@
}
]
},
"generated_at": "2024-06-06T04:30:54Z"
"generated_at": "2024-06-10T09:56:52Z"
}
14 changes: 14 additions & 0 deletions docs/docs/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ search:
- [Acknowledgement](confluent/ack.md)
- [Message Information](confluent/message.md)
- [Security Configuration](confluent/security.md)
- [Additional Configuration](confluent/additional-configuration.md)
- [RabbitMQ](rabbit/index.md)
- [Subscription](rabbit/examples/index.md)
- [Direct](rabbit/examples/direct.md)
Expand Down Expand Up @@ -443,6 +444,19 @@ search:
- [TopicPartition](api/faststream/confluent/client/TopicPartition.md)
- [check_msg_error](api/faststream/confluent/client/check_msg_error.md)
- [create_topics](api/faststream/confluent/client/create_topics.md)
- config
- [BrokerAddressFamily](api/faststream/confluent/config/BrokerAddressFamily.md)
- [BuiltinFeatures](api/faststream/confluent/config/BuiltinFeatures.md)
- [ClientDNSLookup](api/faststream/confluent/config/ClientDNSLookup.md)
- [CompressionCodec](api/faststream/confluent/config/CompressionCodec.md)
- [CompressionType](api/faststream/confluent/config/CompressionType.md)
- [ConfluentConfig](api/faststream/confluent/config/ConfluentConfig.md)
- [Debug](api/faststream/confluent/config/Debug.md)
- [GroupProtocol](api/faststream/confluent/config/GroupProtocol.md)
- [IsolationLevel](api/faststream/confluent/config/IsolationLevel.md)
- [OffsetStoreMethod](api/faststream/confluent/config/OffsetStoreMethod.md)
- [SASLOAUTHBearerMethod](api/faststream/confluent/config/SASLOAUTHBearerMethod.md)
- [SecurityProtocol](api/faststream/confluent/config/SecurityProtocol.md)
- fastapi
- [Context](api/faststream/confluent/fastapi/Context.md)
- [KafkaRouter](api/faststream/confluent/fastapi/KafkaRouter.md)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.confluent.config.BrokerAddressFamily
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/confluent/config/BuiltinFeatures.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.confluent.config.BuiltinFeatures
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/confluent/config/ClientDNSLookup.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.confluent.config.ClientDNSLookup
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/confluent/config/CompressionCodec.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.confluent.config.CompressionCodec
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/confluent/config/CompressionType.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.confluent.config.CompressionType
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/confluent/config/ConfluentConfig.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.confluent.config.ConfluentConfig
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/confluent/config/Debug.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.confluent.config.Debug
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/confluent/config/GroupProtocol.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.confluent.config.GroupProtocol
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/confluent/config/IsolationLevel.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.confluent.config.IsolationLevel
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/confluent/config/OffsetStoreMethod.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.confluent.config.OffsetStoreMethod
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.confluent.config.SASLOAUTHBearerMethod
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/confluent/config/SecurityProtocol.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.confluent.config.SecurityProtocol
27 changes: 27 additions & 0 deletions docs/docs/en/confluent/additional-configuration.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 10
---

# Passing Additional Configuration to confluent-kafka-python

The `confluent-kafka-python` package is a Python wrapper around [librdkakfa](https://github.com/confluentinc/librdkafka), which is a C/C++ client library for Apache Kafka.

`confluent-kafka-python` accepts a `config` dictionary that is then passed on to `librdkafka`. `librdkafka` provides plenty of [configuration properties](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md) to configure the Kafka client.

**FastStream** also provides users with the ability to pass the config dictionary to `librdkafka` to provide greater customizability.

## Example

In the following example, we are setting the parameter `topic.metadata.refresh.fast.interval.ms`'s value to `300` instead of the default value `100` via the `config` parameter.

```python linenums="1" hl_lines="15 16"
{! docs_src/confluent/additional_config/app.py !}
```

Similarly, you could use the `config` parameter to pass any [configuration properties](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md) to `librdkafka`.
28 changes: 24 additions & 4 deletions docs/docs/en/confluent/security.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ This chapter discusses the security options available in **FastStream** and how

**Usage:**

```python linenums="1" hl_lines="4 7 9"
```python linenums="1" hl_lines="2 4 6"
{! docs_src/confluent/security/basic.py !}
```

Expand All @@ -33,7 +33,7 @@ This chapter discusses the security options available in **FastStream** and how
**Usage:**

```python linenums="1"
{! docs_src/confluent/security/plaintext.py [ln:1-10.25,11-] !}
{! docs_src/confluent/security/plaintext.py !}
```

**Using any SASL authentication without SSL:**
Expand All @@ -58,10 +58,30 @@ If the user does not want to use SSL encryption without the warning getting logg

=== "SCRAM256"
```python linenums="1"
{!> docs_src/confluent/security/sasl_scram256.py [ln:1-10.25,11-] !}
{!> docs_src/confluent/security/sasl_scram256.py [ln:1-6.25,7-] !}
```

=== "SCRAM512"
```python linenums="1"
{!> docs_src/confluent/security/sasl_scram512.py [ln:1-10.25,11-] !}
{!> docs_src/confluent/security/sasl_scram512.py [ln:1-6.25,7-] !}
```

### 4. Other security related usecases

**Purpose**: If you want to pass additional values to `confluent-kafka-python`, you can pass a dictionary called `config` to `KafkaBroker`. For example, to pass your own certificate file:

**Usage:**

```python
from faststream.confluent import KafkaBroker
from faststream.security import SASLPlaintext

security = SASLPlaintext(
username="admin",
password="password", # pragma: allowlist secret
)

config = {"ssl.ca.location": "~/my_certs/CRT_cacerts.pem"}

broker = KafkaBroker("localhost:9092", security=security, config=config)
```
1 change: 1 addition & 0 deletions docs/docs/navigation_template.txt
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ search:
- [Acknowledgement](confluent/ack.md)
- [Message Information](confluent/message.md)
- [Security Configuration](confluent/security.md)
- [Additional Configuration](confluent/additional-configuration.md)
- [RabbitMQ](rabbit/index.md)
- [Subscription](rabbit/examples/index.md)
- [Direct](rabbit/examples/direct.md)
Expand Down
Empty file.
22 changes: 22 additions & 0 deletions docs/docs_src/confluent/additional_config/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from pydantic import BaseModel, Field

from faststream import FastStream, Logger
from faststream.confluent import KafkaBroker


class HelloWorld(BaseModel):
msg: str = Field(
...,
examples=["Hello"],
description="Demo hello world message",
)


config = {"topic.metadata.refresh.fast.interval.ms": 300}
broker = KafkaBroker("localhost:9092", config=config)
app = FastStream(broker)


@broker.subscriber("hello_world")
async def on_hello_world(msg: HelloWorld, logger: Logger):
logger.info(msg)
5 changes: 1 addition & 4 deletions docs/docs_src/confluent/security/basic.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
import ssl

from faststream.confluent import KafkaBroker
from faststream.security import BaseSecurity

ssl_context = ssl.create_default_context()
security = BaseSecurity(ssl_context=ssl_context)
security = BaseSecurity(use_ssl=True)

broker = KafkaBroker("localhost:9092", security=security)
5 changes: 1 addition & 4 deletions docs/docs_src/confluent/security/plaintext.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
import ssl

from faststream.confluent import KafkaBroker
from faststream.security import SASLPlaintext

ssl_context = ssl.create_default_context()
security = SASLPlaintext(
ssl_context=ssl_context,
username="admin",
password="password", # pragma: allowlist secret
use_ssl=True,
)

broker = KafkaBroker("localhost:9092", security=security)
5 changes: 1 addition & 4 deletions docs/docs_src/confluent/security/sasl_scram256.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
import ssl

from faststream.confluent import KafkaBroker
from faststream.security import SASLScram256

ssl_context = ssl.create_default_context()
security = SASLScram256(
ssl_context=ssl_context,
username="admin",
password="password", # pragma: allowlist secret
use_ssl=True,
)

broker = KafkaBroker("localhost:9092", security=security)
5 changes: 1 addition & 4 deletions docs/docs_src/confluent/security/sasl_scram512.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
import ssl

from faststream.confluent import KafkaBroker
from faststream.security import SASLScram512

ssl_context = ssl.create_default_context()
security = SASLScram512(
ssl_context=ssl_context,
username="admin",
password="password", # pragma: allowlist secret
use_ssl=True,
)

broker = KafkaBroker("localhost:9092", security=security)
2 changes: 1 addition & 1 deletion faststream/__about__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Simple and fast framework to create message brokers based microservices."""

__version__ = "0.5.11"
__version__ = "0.5.12"

SERVICE_NAME = f"faststream-{__version__}"

Expand Down
Loading

0 comments on commit df2a0a1

Please sign in to comment.