Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Config options #110

Merged
merged 72 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
72 commits
Select commit Hold shift + click to select a range
a83f2cc
chore(configOptions): add ConfigOptions command to ServiceProvider
hcdias Jan 31, 2022
7b0061f
chore(configOptions): call new configOptions artisan command
hcdias Jan 31, 2022
574f219
chore(configOptions): add ConfigOptionsCommand
hcdias Jan 31, 2022
a0e26ea
chore: make config manager with config options
djonasm Jan 31, 2022
1407d9f
chore: replace config manager to config options
djonasm Jan 31, 2022
a57b4de
chore: migrate Avro to use ConfigOptions\AvroSchema
djonasm Feb 1, 2022
14046ed
chore: migrate Aith factory to use config options
djonasm Feb 1, 2022
9b3c028
chore: remove configManager and add Sasl config class
hcdias Feb 1, 2022
2e937f6
chore: remove configManager and add SSL config class
hcdias Feb 1, 2022
96bfd52
chore: fix code standard with phpcbf
hcdias Feb 1, 2022
865e543
chore: debug config options
hcdias Mar 7, 2022
a89f497
chore: debug config options
hcdias Mar 9, 2022
de86b27
chore: create interfaces for classes
hcdias Mar 10, 2022
7294e95
chore: update calls from configManager to configOptions
hcdias Mar 10, 2022
3af4711
chore: add interfaces
hcdias Mar 10, 2022
a9596cd
chore: add AvroSchemaMixedEncoderTest
hcdias Mar 10, 2022
61dfbab
chore: remove inutilized interfaces
hcdias Mar 21, 2022
ca175af
chore: add method to get consumerGroupId
hcdias Mar 21, 2022
8bddb6b
chore: replace configManager by configOptions
hcdias Mar 21, 2022
d33816e
fix: update consumer middlewares
djonasm Mar 21, 2022
7b7407b
fix: get and set options from consumer command
hcdias Mar 23, 2022
b8059b3
chore: remove inutilized code
hcdias Mar 23, 2022
a8291f4
chore: create public variable for message
hcdias Mar 23, 2022
1d677b0
chore: create public variable for message
hcdias Mar 23, 2022
8819606
chore: create public variable for message
hcdias Mar 23, 2022
5b718ec
fix: change parameter order
hcdias Mar 23, 2022
62a1938
docs: add CHANGELOG file
hcdias Mar 23, 2022
d8bf597
docs: add CHANGELOG file
hcdias Mar 23, 2022
8ddde21
chore: rename parameter from 'producer' to 'configOptions'
hcdias Mar 24, 2022
f61dd2d
chore: remove annotation
hcdias Mar 24, 2022
4b83d91
chore: update middleware resolution on consumerCommand
hcdias Mar 24, 2022
2a8149c
chore: remove ManageTest
hcdias Mar 25, 2022
16118fd
chore: remove occurrences of ConsumerConfigManager
hcdias Mar 25, 2022
3df6426
chore: remove TODO comment
hcdias Mar 25, 2022
c0d2378
chore: remove ProducerConfigManager occurences
hcdias Mar 25, 2022
353a228
chore: remove unecessary interface
hcdias Apr 19, 2022
ff69908
chore: remove unecessary code block
hcdias Apr 19, 2022
9342da4
chore: define constant visibility
hcdias Apr 19, 2022
9fa3113
chore: change config file format
hcdias Apr 1, 2022
ddc454c
chore: adjust tests to match new config file format
hcdias Apr 1, 2022
b060e90
chore: update docs
hcdias Apr 4, 2022
6dc74ee
chore: update console parameters
hcdias Apr 4, 2022
ab85f7e
chore: remove unused method
hcdias Apr 4, 2022
f25959d
chore: add how to use data objects
hcdias Apr 4, 2022
eb023d5
chore: remove blank spaces
hcdias Apr 4, 2022
708e14e
chore: fix codacy warnings
hcdias Apr 4, 2022
1b26ba4
chore: fix codacy warnings
hcdias Apr 4, 2022
84cda5b
docs: add commens explaining parameters
hcdias Apr 4, 2022
4b618a5
docs: document finished method
hcdias Apr 4, 2022
9822cf8
docs: add anchor to section
hcdias Apr 4, 2022
2ba1050
chore: remove blank spaces on docs
hcdias Apr 5, 2022
58e1c51
chore: fix codacy
hcdias Apr 5, 2022
c6e3f20
Update docs/quick-usage.md
hcdias Apr 7, 2022
e51273f
fix(codacy): fix codacy warnings
hcdias Apr 25, 2022
5a3fcda
docs: add upgrade guide
hcdias May 3, 2022
066df6d
fix: add handleMiddlewares method
hcdias Apr 5, 2023
402a59d
style: fix coding standard violations
hcdias Oct 9, 2023
d886eb4
style: fix coding standard violations
hcdias Oct 9, 2023
f6b6af4
style: supress phpcs typehing warning
hcdias Oct 9, 2023
5afadef
style: supress phpcs typehing warning
hcdias Oct 9, 2023
f9a312a
chore: remove call to inexistent method
hcdias Oct 9, 2023
c7ffc00
chore: supress psalm alert
hcdias Oct 9, 2023
6d2fedb
tests: remove configManager related tests
hcdias Oct 9, 2023
42ad7dc
gha: update kafka broker connection
hcdias Oct 9, 2023
96e072c
gha: undo update kafka broker connection
hcdias Oct 9, 2023
33556c5
gha: remove kafka broker variable
hcdias Oct 10, 2023
a827976
gha: get back kafka broker variable
hcdias Oct 10, 2023
be49acf
fix(tests): get connection by env
GetulioMR Oct 10, 2023
0e52815
test(producer): add expect not to perform assertion
GetulioMR Oct 10, 2023
f44e2f9
fix(tests): enable to get broker connection from env
GetulioMR Oct 10, 2023
a3551b5
fix(tests): enable to get broker connection from env
GetulioMR Oct 10, 2023
40173c0
fix(tests): adjust mock expectations
GetulioMR Oct 10, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Changelog

All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

### Added

- Added AvroSchemaMixedEncoderTest
- Added AvroSchemaDecoderTest
- Added ProducerWithConfigOptionsTest
- Added ConfigOptionsCommand to run commands with ConfigOptions class
- Added pt_BR contributing section
- Added setup-dev script on composer
- Added grumphp commit validation

### Fixed

- Fixed parameters and options override on Consumer\Config class
- Update instructions on contribute section
- Update project install section

### Changed

- Updated class from ConfigManager to ConfigOptions on unit tests
- Updated class from ConfigManager to ConfigOptions where any config request was made
- Consumer and Producer middlewares resolution
164 changes: 34 additions & 130 deletions config/kafka.php
Original file line number Diff line number Diff line change
@@ -1,65 +1,6 @@
<?php

return [
/*
|--------------------------------------------------------------------------
| AVRO Schemas
|--------------------------------------------------------------------------
|
| Here you may specify the schema details configured on topic's broker key.
| Schema are kind of "contract" between the producer and consumer.
| For now, we are just decoding AVRO, not encoding.
|
*/

'avro_schemas' => [
'default' => [
'url' => '',
// Disable SSL verification on schema request.
'ssl_verify' => true,
// This option will be put directly into a Guzzle http request
// Use this to do authorizations or send any headers you want.
// Here is a example of basic authentication on AVRO schema.
'request_options' => [
'headers' => [
'Authorization' => [
'Basic ' . base64_encode(
env('AVRO_SCHEMA_USERNAME')
. ':'
. env('AVRO_SCHEMA_PASSWORD')
),
],
],
],
],
],

/*
|--------------------------------------------------------------------------
| Brokers
|--------------------------------------------------------------------------
|
| Here you may specify the connections details for each broker configured
| on topic's broker key.
|
*/

'brokers' => [
'default' => [
'connections' => env('KAFKA_BROKER_CONNECTIONS', 'kafka:9092'),

// If your broker doest not have authentication, you can
// remove this configuration, or set as empty.
// The Authentication types may be "ssl" or "none"
'auth' => [
'type' => 'ssl', // ssl and none
'ca' => storage_path('ca.pem'),
'certificate' => storage_path('kafka.cert'),
'key' => storage_path('kafka.key'),
],
],
],

'topics' => [
// This is your topic "keyword" where you will put all configurations needed
// on this specific topic.
Expand All @@ -68,57 +9,44 @@
// your messages from kafka.
'topic_id' => 'kafka-test',

// Here you may point the key of the broker configured above.
'broker' => 'default',

// Configurations specific for consumer
//your consumer configurations
'consumer' => [
// You may define more than one consumer group per topic.
// If there is just one defined, it will be used by default,
// otherwise, you may pass which consumer group should be used
// when using the consumer command.
'consumer_groups' => [
'test-consumer-group' => [

// Action to take when there is no initial
// offset in offset store or the desired offset is out of range.
// This config will be passed to 'auto.offset.reset'.
// The valid options are: smallest, earliest, beginning, largest, latest, end, error.
'offset_reset' => 'earliest',

// The offset at which to start consumption. This only applies if partition is set.
// You can use a positive integer or any of the constants: RD_KAFKA_OFFSET_BEGINNING,
// RD_KAFKA_OFFSET_END, RD_KAFKA_OFFSET_STORED.
'offset' => 0,

// The partition to consume. It can be null,
// if you don't wish do specify one.
'partition' => 0,

// A consumer class that implements ConsumerTopicHandler
'handler' => '\App\Kafka\Consumers\ConsumerExample',

// A Timeout to listen to a message. That means: how much
// time we need to wait until receiving a message?
'timeout' => 20000,

// Once you've enabled this, the Kafka consumer will commit the
// offset of the last message received in response to its poll() call
'auto_commit' => true,

// If commit_async is false process block until offsets are committed or the commit fails.
// Only works when auto_commit is false
'commit_async' => false,

// An array of middlewares applied only for this consumer_group
'middlewares' => [],
],
],
'consumer_group' => 'test-consumer-group',
// Action to take when there is no initial
// offset in offset store or the desired offset is out of range.
// This config will be passed to 'auto.offset.reset'.
// The valid options are: smallest, earliest, beginning, largest, latest, end, error.
'offset_reset' => 'earliest',

// The offset at which to start consumption. This only applies if partition is set.
// You can use a positive integer or any of the constants: RD_KAFKA_OFFSET_BEGINNING,
// RD_KAFKA_OFFSET_END, RD_KAFKA_OFFSET_STORED.
'offset' => 0,

// The partition to consume. It can be null,
// if you don't wish do specify one.
'partition' => 0,

// A consumer class that implements ConsumerTopicHandler
'handler' => '\App\Kafka\Consumers\ConsumerExample',

// A Timeout to listen to a message. That means: how much
// time we need to wait until receiving a message?
'timeout' => 20000,

// Once you've enabled this, the Kafka consumer will commit the
// offset of the last message received in response to its poll() call
'auto_commit' => true,

// If commit_async is false process block until offsets are committed or the commit fails.
// Only works when auto_commit is false
'commit_async' => false,

// An array of middlewares applied only for this consumer_group
'middlewares' => [],
],

// Configurations specific for producer
'producer' => [

// Sets to true if you want to know if a message was successfully posted.
'required_acknowledgment' => true,

Expand Down Expand Up @@ -149,28 +77,4 @@
],
],
],

/*
|--------------------------------------------------------------------------
| Global Middlewares
|--------------------------------------------------------------------------
|
| Here you may specify the global middlewares that will be applied for every
| consumed topic. Middlewares work between the received data from broker and
| before being passed into consumers.
| Available middlewares: log, avro-decode
|
*/

'middlewares' => [
'consumer' => [
\Metamorphosis\Middlewares\Log::class,
],
'producer' => [
\Metamorphosis\Middlewares\Log::class,
],
'global' => [
\Metamorphosis\Middlewares\Log::class,
],
],
];
30 changes: 30 additions & 0 deletions config/service.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<?php

return [
'avro_schema' => [
'url' => '',
'request_options' => [
'headers' => [
'Authorization' => [
'Basic' . base64_encode(
env('AVRO_SCHEMA_USERNAME') . ':' . env(
'AVRO_SCHEMA_PASSWORD'
)
),
],
],
],
'ssl_verify' => true,
'username' => 'USERNAME',
'password' => 'PASSWORD',
],
'broker' => [
'connections' => env('KAFKA_BROKER_CONNECTIONS', 'kafka:9092'),
'auth' => [
'type' => 'ssl', // ssl and none
'ca' => storage_path('ca.pem'),
'certificate' => storage_path('kafka.cert'),
'key' => storage_path('kafka.key'),
],
],
];
57 changes: 40 additions & 17 deletions docs/advanced.md
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,7 @@ If you wish, you may set a middleware to run of a topic level or a consumer grou
],
```

The order matters here, they'll be execute as queue, from the most global scope to the most specific (global scope > topic scope > group_consumers scope).

The order matters here, they'll execute as queue, from the most global scope to the most specific (global scope > topic scope > group_consumers scope).

<a name="schemas"></a>
### Schemas
Expand Down Expand Up @@ -181,10 +180,17 @@ $ php artisan make:kafka-consumer PriceUpdateHandler
This will create a KafkaConsumer class inside the application, on the `app/Kafka/Consumers/` directory.

There, you'll have a `handler` method, which will send all records from the topic to the Consumer.
Methods will be available for handling exceptions:

Available methods:

- `warning` method will be call whenever something not critical is received from the topic.
Like a message informing that there's no more records to consume.
- `failure` method will be call whenever something critical happens, like an error to decode the record.


- `failure` will be call whenever something critical happens, like an error to decode the record.


- `finished` will be call when queue finishes

```php
use App\Repository;
Expand Down Expand Up @@ -230,10 +236,14 @@ class PriceUpdateHandler extends AbstractHandler
{
// handle failure exception
}

public function finished(): void
{
//handle queue end
}
}
```


<a name="commands-middleware"></a>
#### Creating Middleware
You can create a middleware class, that works between the received data from broker and before being passed into consumers, using the follow command:
Expand Down Expand Up @@ -275,29 +285,42 @@ stdout_logfile=/var/log/default/kafka-consumer-price-update.log

Although you can run this simple command, it provides some options you can pass to make it more flexible to your needs.

- `--broker=`

Sometimes, you may want to change which broker the consumer should connect to (maybe for testing/debug purposes).
For that, you just nedd to call the `--broker` option with another broker connection key already set in the `config/kafka.php` file.

`$ php artisan kafka:consume price-update --broker='some-other-broker'`

- `--offset=`

And if you need to start the consumption of a topic in a specific offset (it can be useful for debug purposes)
you can pass the `--offset=` option, but for this, it will be required to specify the partition too.

`$ php artisan kafka:consume price-update --partition=2 --offset=34`

$ php artisan kafka:consume price-update --partition=2 --offset=34


- `--partition=`

If you wish do specify in which partition the consumer must be attached, you can set the option `--partition=`.
Set in which partition the consumer must be attached.


$ php artisan kafka:consume price-update --partition=2 --offset=34

`$ php artisan kafka:consume price-update --partition=2 --offset=34`

- `--timeout=`

You can specify what would be the timeout for the consumer, by using the `--timeout=` option, the time is in milliseconds.
Set the timeout for the consumer in milliseconds.


$ php artisan kafka:consume price-update --timeout=23000


- `--config_name=`

Specify from what file topics configuration should be read.


$ php artisan kafka:consume topic-name --config_name=config.file


- `--service_name=`

Specify from what file services configurations should be read.

`$ php artisan kafka:consume price-update --timeout=23000`

$ php artisan kafka:consume price-update --service_name=config.file
Loading