Merge pull request #110 from leroy-merlin-br/chore/configOptions
Config options
hcdias committed Oct 10, 2023
2 parents 4018582 + 40173c0 commit 1de2aaf
Showing 60 changed files with 1,343 additions and 1,284 deletions.
28 changes: 28 additions & 0 deletions CHANGELOG.md
@@ -0,0 +1,28 @@
# Changelog

All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

### Added

- Added AvroSchemaMixedEncoderTest
- Added AvroSchemaDecoderTest
- Added ProducerWithConfigOptionsTest
- Added ConfigOptionsCommand to run commands with ConfigOptions class
- Added pt_BR contributing section
- Added setup-dev script on composer
- Added grumphp commit validation

### Fixed

- Fixed parameters and options override on Consumer\Config class
- Updated instructions on the contributing section
- Updated the project install section

### Changed

- Updated class from ConfigManager to ConfigOptions on unit tests
- Updated class from ConfigManager to ConfigOptions where any config request was made
- Consumer and Producer middlewares resolution
164 changes: 34 additions & 130 deletions config/kafka.php
@@ -1,65 +1,6 @@
<?php

return [
/*
|--------------------------------------------------------------------------
| AVRO Schemas
|--------------------------------------------------------------------------
|
| Here you may specify the schema details configured on the topic's broker key.
| Schemas are a kind of "contract" between the producer and the consumer.
| For now, we are just decoding AVRO, not encoding.
|
*/

'avro_schemas' => [
'default' => [
'url' => '',
// Set to false to disable SSL verification on schema requests.
'ssl_verify' => true,
// This option will be put directly into a Guzzle HTTP request.
// Use this to handle authorization or send any headers you want.
// Here is an example of basic authentication on an AVRO schema.
'request_options' => [
'headers' => [
'Authorization' => [
'Basic ' . base64_encode(
env('AVRO_SCHEMA_USERNAME')
. ':'
. env('AVRO_SCHEMA_PASSWORD')
),
],
],
],
],
],

/*
|--------------------------------------------------------------------------
| Brokers
|--------------------------------------------------------------------------
|
| Here you may specify the connections details for each broker configured
| on topic's broker key.
|
*/

'brokers' => [
'default' => [
'connections' => env('KAFKA_BROKER_CONNECTIONS', 'kafka:9092'),

// If your broker does not have authentication, you can
// remove this configuration or set it as empty.
// The authentication type may be "ssl" or "none".
'auth' => [
'type' => 'ssl', // ssl and none
'ca' => storage_path('ca.pem'),
'certificate' => storage_path('kafka.cert'),
'key' => storage_path('kafka.key'),
],
],
],

'topics' => [
// This is your topic "keyword", where you will put all the configuration
// needed for this specific topic.
@@ -68,57 +9,44 @@
// your messages from kafka.
'topic_id' => 'kafka-test',

// Here you may point the key of the broker configured above.
'broker' => 'default',

// Configurations specific to the consumer
'consumer' => [
// You may define more than one consumer group per topic.
// If there is just one defined, it will be used by default,
// otherwise, you may pass which consumer group should be used
// when using the consumer command.
'consumer_groups' => [
'test-consumer-group' => [

// Action to take when there is no initial
// offset in offset store or the desired offset is out of range.
// This config will be passed to 'auto.offset.reset'.
// The valid options are: smallest, earliest, beginning, largest, latest, end, error.
'offset_reset' => 'earliest',

// The offset at which to start consumption. This only applies if partition is set.
// You can use a positive integer or any of the constants: RD_KAFKA_OFFSET_BEGINNING,
// RD_KAFKA_OFFSET_END, RD_KAFKA_OFFSET_STORED.
'offset' => 0,

// The partition to consume. It can be null
// if you don't wish to specify one.
'partition' => 0,

// A consumer class that implements ConsumerTopicHandler
'handler' => '\App\Kafka\Consumers\ConsumerExample',

// The timeout for listening to a message, in milliseconds;
// that is, how long to wait for a message to arrive.
'timeout' => 20000,

// Once you've enabled this, the Kafka consumer will commit the
// offset of the last message received in response to its poll() call
'auto_commit' => true,

// If commit_async is false, the process blocks until offsets are committed or the commit fails.
// Only applies when auto_commit is false.
'commit_async' => false,

// An array of middlewares applied only for this consumer_group
'middlewares' => [],
],
],
'consumer_group' => 'test-consumer-group',
// Action to take when there is no initial
// offset in offset store or the desired offset is out of range.
// This config will be passed to 'auto.offset.reset'.
// The valid options are: smallest, earliest, beginning, largest, latest, end, error.
'offset_reset' => 'earliest',

// The offset at which to start consumption. This only applies if partition is set.
// You can use a positive integer or any of the constants: RD_KAFKA_OFFSET_BEGINNING,
// RD_KAFKA_OFFSET_END, RD_KAFKA_OFFSET_STORED.
'offset' => 0,

// The partition to consume. It can be null
// if you don't wish to specify one.
'partition' => 0,

// A consumer class that implements ConsumerTopicHandler
'handler' => '\App\Kafka\Consumers\ConsumerExample',

// The timeout for listening to a message, in milliseconds;
// that is, how long to wait for a message to arrive.
'timeout' => 20000,

// Once you've enabled this, the Kafka consumer will commit the
// offset of the last message received in response to its poll() call
'auto_commit' => true,

// If commit_async is false, the process blocks until offsets are committed or the commit fails.
// Only applies when auto_commit is false.
'commit_async' => false,

// An array of middlewares applied only for this consumer_group
'middlewares' => [],
],

// Configurations specific for producer
'producer' => [

// Set to true if you want to know whether a message was successfully posted.
'required_acknowledgment' => true,

@@ -149,28 +77,4 @@
],
],
],

/*
|--------------------------------------------------------------------------
| Global Middlewares
|--------------------------------------------------------------------------
|
| Here you may specify the global middlewares that will be applied for every
| consumed topic. Middlewares work between the received data from broker and
| before being passed into consumers.
| Available middlewares: log, avro-decode
|
*/

'middlewares' => [
'consumer' => [
\Metamorphosis\Middlewares\Log::class,
],
'producer' => [
\Metamorphosis\Middlewares\Log::class,
],
'global' => [
\Metamorphosis\Middlewares\Log::class,
],
],
];
30 changes: 30 additions & 0 deletions config/service.php
@@ -0,0 +1,30 @@
<?php

return [
'avro_schema' => [
'url' => '',
'request_options' => [
'headers' => [
'Authorization' => [
'Basic ' . base64_encode(
env('AVRO_SCHEMA_USERNAME') . ':' . env(
'AVRO_SCHEMA_PASSWORD'
)
),
],
],
],
'ssl_verify' => true,
'username' => 'USERNAME',
'password' => 'PASSWORD',
],
'broker' => [
'connections' => env('KAFKA_BROKER_CONNECTIONS', 'kafka:9092'),
'auth' => [
'type' => 'ssl', // ssl and none
'ca' => storage_path('ca.pem'),
'certificate' => storage_path('kafka.cert'),
'key' => storage_path('kafka.key'),
],
],
];
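Note that HTTP Basic authentication requires a space between the scheme name and the encoded credentials. As a standalone sketch of the header value this config is meant to build (the credentials here are invented placeholders for the `AVRO_SCHEMA_*` environment variables):

```php
<?php

$username = 'avro-user'; // stands in for env('AVRO_SCHEMA_USERNAME')
$password = 'secret';    // stands in for env('AVRO_SCHEMA_PASSWORD')

// The trailing space in 'Basic ' is required; concatenating 'Basic'
// directly to the encoded credentials produces an invalid header.
$header = 'Basic ' . base64_encode($username . ':' . $password);

echo $header; // Basic YXZyby11c2VyOnNlY3JldA==
```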
57 changes: 40 additions & 17 deletions docs/advanced.md
@@ -136,8 +136,7 @@ If you wish, you may set a middleware to run at a topic level or a consumer group level:
],
```

The order matters here: middlewares are executed as a queue, from the most global scope to the most specific (global scope > topic scope > consumer_groups scope).
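As an illustrative sketch of how those scopes nest (following the `config/kafka.php` layout shown elsewhere in this changeset; `TopicMetrics` is a hypothetical class name):

```php
<?php

return [
    // Global scope: these middlewares run for every consumed topic.
    'middlewares' => [
        'consumer' => [
            \Metamorphosis\Middlewares\Log::class,
        ],
    ],

    'topics' => [
        'price-update' => [
            'consumer' => [
                // Topic scope: runs after the global middlewares,
                // only for this topic's consumers.
                'middlewares' => [
                    \App\Kafka\Middlewares\TopicMetrics::class, // hypothetical
                ],
            ],
        ],
    ],
];
```

Under this sketch, a record consumed from `price-update` would pass through `Log` first, then `TopicMetrics`.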

<a name="schemas"></a>
### Schemas
@@ -181,10 +180,17 @@ $ php artisan make:kafka-consumer PriceUpdateHandler
This will create a KafkaConsumer class inside the application, on the `app/Kafka/Consumers/` directory.

There, you'll have a `handler` method, which will receive all records from the topic.

The following methods are available for handling exceptions and lifecycle events:

- `warning` will be called whenever something non-critical is received from the topic,
  such as a message informing that there are no more records to consume.
- `failure` will be called whenever something critical happens, such as an error while decoding a record.
- `finished` will be called when the queue finishes.

```php
use App\Repository;
@@ -230,10 +236,14 @@ class PriceUpdateHandler extends AbstractHandler
{
// handle failure exception
}

public function finished(): void
{
// handle queue end
}
}
```


<a name="commands-middleware"></a>
#### Creating Middleware
You can create a middleware class that sits between the data received from the broker and the consumer, using the following command:
@@ -275,29 +285,42 @@ stdout_logfile=/var/log/default/kafka-consumer-price-update.log

Although you can run this simple command as-is, it provides some options you can pass to make it more flexible to your needs.

- `--broker=`

Sometimes, you may want to change which broker the consumer should connect to (maybe for testing/debug purposes).
For that, you just need to call the `--broker` option with another broker connection key already set in the `config/kafka.php` file.

`$ php artisan kafka:consume price-update --broker='some-other-broker'`

- `--offset=`

If you need to start consuming a topic at a specific offset (useful for debugging),
you can pass the `--offset=` option; note that this also requires specifying the partition.

`$ php artisan kafka:consume price-update --partition=2 --offset=34`

- `--partition=`

If you wish to specify which partition the consumer must attach to, set the `--partition=` option.

`$ php artisan kafka:consume price-update --partition=2 --offset=34`

- `--timeout=`

You can set the timeout for the consumer, in milliseconds, using the `--timeout=` option.

`$ php artisan kafka:consume price-update --timeout=23000`

- `--config_name=`

Specify the file from which the topic configuration should be read.

`$ php artisan kafka:consume topic-name --config_name=config.file`

- `--service_name=`

Specify the file from which the service configuration should be read.

`$ php artisan kafka:consume price-update --service_name=config.file`
