diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 00000000..342b1ef1 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,28 @@ +# Changelog + +All notable changes to this project will be documented in this file. + +The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), +and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). + +### Added + +- Added AvroSchemaMixedEncoderTest +- Added AvroSchemaDecoderTest +- Added ProducerWithConfigOptionsTest +- Added ConfigOptionsCommand to run commands with ConfigOptions class +- Added pt_BR contributing section +- Added setup-dev script on composer +- Added grumphp commit validation + +### Fixed + +- Fixed parameters and options override on Consumer\Config class +- Update instructions on contribute section +- Update project install section + +### Changed + +- Updated class from ConfigManager to ConfigOptions on unit tests +- Updated class from ConfigManager to ConfigOptions where any config request was made +- Consumer and Producer middlewares resolution diff --git a/config/kafka.php b/config/kafka.php index 9f9ae370..3e7f0764 100644 --- a/config/kafka.php +++ b/config/kafka.php @@ -1,65 +1,6 @@ [ - 'default' => [ - 'url' => '', - // Disable SSL verification on schema request. - 'ssl_verify' => true, - // This option will be put directly into a Guzzle http request - // Use this to do authorizations or send any headers you want. - // Here is a example of basic authentication on AVRO schema. - 'request_options' => [ - 'headers' => [ - 'Authorization' => [ - 'Basic ' . base64_encode( - env('AVRO_SCHEMA_USERNAME') - . ':' - . env('AVRO_SCHEMA_PASSWORD') - ), - ], - ], - ], - ], - ], - - /* - |-------------------------------------------------------------------------- - | Brokers - |-------------------------------------------------------------------------- - | - | Here you may specify the connections details for each broker configured - | on topic's broker key. - | - */ - - 'brokers' => [ - 'default' => [ - 'connections' => env('KAFKA_BROKER_CONNECTIONS', 'kafka:9092'), - - // If your broker doest not have authentication, you can - // remove this configuration, or set as empty. - // The Authentication types may be "ssl" or "none" - 'auth' => [ - 'type' => 'ssl', // ssl and none - 'ca' => storage_path('ca.pem'), - 'certificate' => storage_path('kafka.cert'), - 'key' => storage_path('kafka.key'), - ], - ], - ], - 'topics' => [ // This is your topic "keyword" where you will put all configurations needed // on this specific topic. @@ -68,57 +9,44 @@ // your messages from kafka. 'topic_id' => 'kafka-test', - // Here you may point the key of the broker configured above. - 'broker' => 'default', - - // Configurations specific for consumer + //your consumer configurations 'consumer' => [ - // You may define more than one consumer group per topic. - // If there is just one defined, it will be used by default, - // otherwise, you may pass which consumer group should be used - // when using the consumer command. - 'consumer_groups' => [ - 'test-consumer-group' => [ - - // Action to take when there is no initial - // offset in offset store or the desired offset is out of range. - // This config will be passed to 'auto.offset.reset'. - // The valid options are: smallest, earliest, beginning, largest, latest, end, error. - 'offset_reset' => 'earliest', - - // The offset at which to start consumption. This only applies if partition is set. - // You can use a positive integer or any of the constants: RD_KAFKA_OFFSET_BEGINNING, - // RD_KAFKA_OFFSET_END, RD_KAFKA_OFFSET_STORED. - 'offset' => 0, - - // The partition to consume. It can be null, - // if you don't wish do specify one. - 'partition' => 0, - - // A consumer class that implements ConsumerTopicHandler - 'handler' => '\App\Kafka\Consumers\ConsumerExample', - - // A Timeout to listen to a message. That means: how much - // time we need to wait until receiving a message? - 'timeout' => 20000, - - // Once you've enabled this, the Kafka consumer will commit the - // offset of the last message received in response to its poll() call - 'auto_commit' => true, - - // If commit_async is false process block until offsets are committed or the commit fails. - // Only works when auto_commit is false - 'commit_async' => false, - - // An array of middlewares applied only for this consumer_group - 'middlewares' => [], - ], - ], + 'consumer_group' => 'test-consumer-group', + // Action to take when there is no initial + // offset in offset store or the desired offset is out of range. + // This config will be passed to 'auto.offset.reset'. + // The valid options are: smallest, earliest, beginning, largest, latest, end, error. + 'offset_reset' => 'earliest', + + // The offset at which to start consumption. This only applies if partition is set. + // You can use a positive integer or any of the constants: RD_KAFKA_OFFSET_BEGINNING, + // RD_KAFKA_OFFSET_END, RD_KAFKA_OFFSET_STORED. + 'offset' => 0, + + // The partition to consume. It can be null, + // if you don't wish do specify one. + 'partition' => 0, + + // A consumer class that implements ConsumerTopicHandler + 'handler' => '\App\Kafka\Consumers\ConsumerExample', + + // A Timeout to listen to a message. That means: how much + // time we need to wait until receiving a message? + 'timeout' => 20000, + + // Once you've enabled this, the Kafka consumer will commit the + // offset of the last message received in response to its poll() call + 'auto_commit' => true, + + // If commit_async is false process block until offsets are committed or the commit fails. + // Only works when auto_commit is false + 'commit_async' => false, + + // An array of middlewares applied only for this consumer_group + 'middlewares' => [], ], - // Configurations specific for producer 'producer' => [ - // Sets to true if you want to know if a message was successfully posted. 'required_acknowledgment' => true, @@ -149,28 +77,4 @@ ], ], ], - - /* - |-------------------------------------------------------------------------- - | Global Middlewares - |-------------------------------------------------------------------------- - | - | Here you may specify the global middlewares that will be applied for every - | consumed topic. Middlewares work between the received data from broker and - | before being passed into consumers. - | Available middlewares: log, avro-decode - | - */ - - 'middlewares' => [ - 'consumer' => [ - \Metamorphosis\Middlewares\Log::class, - ], - 'producer' => [ - \Metamorphosis\Middlewares\Log::class, - ], - 'global' => [ - \Metamorphosis\Middlewares\Log::class, - ], - ], ]; diff --git a/config/service.php b/config/service.php new file mode 100644 index 00000000..7eacd41f --- /dev/null +++ b/config/service.php @@ -0,0 +1,30 @@ + [ + 'url' => '', + 'request_options' => [ + 'headers' => [ + 'Authorization' => [ + 'Basic' . base64_encode( + env('AVRO_SCHEMA_USERNAME') . ':' . env( + 'AVRO_SCHEMA_PASSWORD' + ) + ), + ], + ], + ], + 'ssl_verify' => true, + 'username' => 'USERNAME', + 'password' => 'PASSWORD', + ], + 'broker' => [ + 'connections' => env('KAFKA_BROKER_CONNECTIONS', 'kafka:9092'), + 'auth' => [ + 'type' => 'ssl', // ssl and none + 'ca' => storage_path('ca.pem'), + 'certificate' => storage_path('kafka.cert'), + 'key' => storage_path('kafka.key'), + ], + ], +]; diff --git a/docs/advanced.md b/docs/advanced.md index f290b06c..686d80ef 100644 --- a/docs/advanced.md +++ b/docs/advanced.md @@ -136,8 +136,7 @@ If you wish, you may set a middleware to run of a topic level or a consumer grou ], ``` -The order matters here, they'll be execute as queue, from the most global scope to the most specific (global scope > topic scope > group_consumers scope). - +The order matters here, they'll execute as queue, from the most global scope to the most specific (global scope > topic scope > group_consumers scope). ### Schemas @@ -181,10 +180,17 @@ $ php artisan make:kafka-consumer PriceUpdateHandler This will create a KafkaConsumer class inside the application, on the `app/Kafka/Consumers/` directory. There, you'll have a `handler` method, which will send all records from the topic to the Consumer. -Methods will be available for handling exceptions: + +Available methods: + - `warning` method will be call whenever something not critical is received from the topic. Like a message informing that there's no more records to consume. - - `failure` method will be call whenever something critical happens, like an error to decode the record. + + + - `failure` will be call whenever something critical happens, like an error to decode the record. + + + - `finished` will be call when queue finishes ```php use App\Repository; @@ -230,10 +236,14 @@ class PriceUpdateHandler extends AbstractHandler { // handle failure exception } + + public function finished(): void + { + //handle queue end + } } ``` - #### Creating Middleware You can create a middleware class, that works between the received data from broker and before being passed into consumers, using the follow command: @@ -275,29 +285,42 @@ stdout_logfile=/var/log/default/kafka-consumer-price-update.log Although you can run this simple command, it provides some options you can pass to make it more flexible to your needs. -- `--broker=` - - Sometimes, you may want to change which broker the consumer should connect to (maybe for testing/debug purposes). - For that, you just nedd to call the `--broker` option with another broker connection key already set in the `config/kafka.php` file. - - `$ php artisan kafka:consume price-update --broker='some-other-broker'` - - `--offset=` And if you need to start the consumption of a topic in a specific offset (it can be useful for debug purposes) you can pass the `--offset=` option, but for this, it will be required to specify the partition too. - `$ php artisan kafka:consume price-update --partition=2 --offset=34` + + $ php artisan kafka:consume price-update --partition=2 --offset=34 + - `--partition=` - If you wish do specify in which partition the consumer must be attached, you can set the option `--partition=`. + Set in which partition the consumer must be attached. + + + $ php artisan kafka:consume price-update --partition=2 --offset=34 - `$ php artisan kafka:consume price-update --partition=2 --offset=34` - `--timeout=` - You can specify what would be the timeout for the consumer, by using the `--timeout=` option, the time is in milliseconds. + Set the timeout for the consumer in milliseconds. + + + $ php artisan kafka:consume price-update --timeout=23000 + + +- `--config_name=` + + Specify from what file topics configuration should be read. + + + $ php artisan kafka:consume topic-name --config_name=config.file + + +- `--service_name=` + + Specify from what file services configurations should be read. - `$ php artisan kafka:consume price-update --timeout=23000` + $ php artisan kafka:consume price-update --service_name=config.file diff --git a/docs/quick-usage.md b/docs/quick-usage.md index 3d003da4..d6a143c8 100644 --- a/docs/quick-usage.md +++ b/docs/quick-usage.md @@ -1,6 +1,7 @@ ## Quick Usage Guide -- [Config file](#config) +- [Configure with files](#config) +- [Configure using data objects](#config-dto) - [Consumer](#consumer) - [Creating Consumer](#creating-consumer) - [Running](#running-consumer) @@ -8,76 +9,102 @@ - [Produce Message](#produce-message) -### Config file: `config/kafka.php` - -The config file holds all information about brokers, topics, consumer groups and middlewares. - -To quickly start using, we can focus in two sections: -- Brokers - - An array of brokers, with connection and authentication configurations: - - - `connections`: *required*. can be a `string` with multiple connections separated by comma or an `array` of connections (as `string`) - - - `auth`: *optional*. out of the box, the package can connect with SSL Authentication only or without any authentication - - ```php - 'brokers' => [ - 'price_brokers' => [ - 'connections' => 'localhost:8091,localhost:8092', - 'auth' => [ - 'type' => 'ssl', - 'ca' => storage_path('ca.pem'), - 'certificate' => storage_path('kafka.cert'), - 'key' => storage_path('kafka.key'), - ], - ], - 'stock_brokers' => [ - 'connections' => ['localhost:8091', 'localhost:8092'], - 'auth' => [], // can be an empty array or even don't have this key in the broker config - ], - ], - ``` - -- Topics - - An array of topics configuration, such as the topic name, which broker connection should use, consumer groups and middlewares. - - Here we can specify the group consumers, each topic can have multiple groups, - and each group holds the configuration for which consumer, offset_reset (for setting initial offset) and middleware it must use. - - ```php - 'topics' => [ - 'price_update' => [ - 'topic' => 'products.price.update', - 'broker' => 'price_brokers', - 'consumer_groups' => [ - 'default' => [ - 'offset_reset' => 'smallest', - 'handler' => '\App\Kafka\Consumers\PriceUpdateConsumer', - ], - ], - ], - ], - ``` +### Configure using files + +To get started using configuration files, at least two files are needed. A file to keep the topics +configuration and a file to keep the broker and schema configuration. In this example, we will use the files +`config/kafka.php` and `config/service.php`. + +### File `config/kafka.php`: + +This file keeps configurations about topics, consumers and producers. +It should return an array of topics containing the topic name, topic_id, consumer, producer and the settings for each one of them: + +```php + [ + 'this_is_your_topic_name' => [ + 'topic_id' => "this_is_your_topic_id", + 'consumer' => [ + 'consumer_group' => 'your-consumer-group', + 'offset_reset' => 'earliest', + 'offset' => 0, + 'partition' => 0, + 'handler' => '\App\Kafka\Consumers\ConsumerExample', + 'timeout' => 20000, + 'auto_commit' => true, + 'commit_async' => false, + 'middlewares' => [], + ], + + 'producer' => [ + 'required_acknowledgment' => true, + 'is_async' => true, + 'max_poll_records' => 500, + 'flush_attempts' => 10, + 'middlewares' => [], + 'timeout' => 10000, + 'partition' => constant('RD_KAFKA_PARTITION_UA') ?? -1, + ], + ] + ], +]; +``` + +### File `config/service.php` + +This file keeps configurations about **broker** and **schema** utilized. + +```php + [ + 'url' => '', + 'request_options' => [ + 'headers' => [ + 'Authorization' => [ + 'Basic ' . base64_encode( + env('AVRO_SCHEMA_USERNAME').':'.env('AVRO_SCHEMA_PASSWORD') + ), + ], + ], + ], + + 'ssl_verify' => true, + 'username' => 'USERNAME', + 'password' => 'PASSWORD', + ], + + 'broker' => [ + 'connections' => 'kafka:9092', + 'auth' => [ + 'type' => 'ssl', + 'ca' => storage_path('ca.pem'), + 'certificate' => storage_path('kafka.cert'), + 'key' => storage_path('kafka.key'), + ], + ], +]; +``` ### Consumer -After setting up the required configs, you need to create the consumer, which will handle all records received -from the topic specified in the config. +After setting up the required configuration, you must create a consumer to handle records received +from the specified topic in your configuration. -#### Creating Consumer +#### Creating a Consumer -Creating the consumer is easy as running the following command: +To create a consumer run the following command: ```bash $ php artisan make:kafka-consumer PriceUpdateConsumer ``` -This will create a KafkaConsumer class inside the application, on the app/Kafka/Consumers/ directory - -There, you'll have a handler method, which will send all records from the topic to the Consumer, -also, methods will be available for handling exceptions +This will create a KafkaConsumer class on the app/Kafka/Consumers/ directory with the following +content: ```php use App\Kafka\Consumers\PriceUpdateConsumer; @@ -109,20 +136,19 @@ class PriceUpdateConsumer extends AbstractHandler ``` -#### Running consumer +#### Running the consumer -Now you just need to start consuming the topic. +To start consuming the topic, the simplest way to see it working is by running the kafka:consume command along with the topic name, topic configuration file and service configuration file: -The simplest way to see it working is by running the kafka:consume command along with the topic name -declared in the topics config key: ```bash -$ php artisan kafka:consume price-update -``` +$ php artisan kafka:consume this_is_your_topic_name --config_name=config.file --service_name=service.file +``` This command will run in a `while true`, that means, it will never stop running. But, errors can happen, so we strongly advice you to run this command along with [supervisor](http://supervisord.org/running.html), like this example below: + ```bash [program:kafka-consumer-price-update] process_name=%(program_name)s_%(process_num)02d @@ -135,32 +161,31 @@ redirect_stderr=true stdout_logfile=/var/log/default/kafka-consumer-price-update.log ``` -That's it. For more information about usage, middlewares, broker authentication, consumer groups and other advanced topics, please have a look at our [Advanced Usage Guide](advanced.md). - - -### Producer + +#### Using data objects -Producer also required configs, which will produce all records using parameters specified in the config. +To configure and consume using classes: ```php - 'brokers' => [ - 'local-dev' => [ - 'connections' => 'kafka:9092', - ], - ], - 'topics' => [ - 'product-updated' => [ - 'topic_id' => 'product_updated', - 'broker' => 'local-dev', - ], - ], + use Metamorphosis\Consumer; + use Metamorphosis\TopicHandler\ConfigOptions\Factories\ConsumerFactory; + + $topic = config('yourConfig.topics.topic-id'); + $broker = config('yourService.broker'); + $avro = config('yourService.avro_schema'); + + $consumerConfiguration = ConsumerFactory::make($broker, $topic, $avro); + $consumer = app(Consumer::class, ['configOptions' => $consumerConfiguration]); + + $consumer->consume(); ``` + +That's it. For more information about usage, middlewares, broker authentication, consumer groups and other advanced topics, please have a look at our [Advanced Usage Guide](advanced.md). + ### Produce Message -Creating Producer handler. - -The Producer must extends AbstractHandler class and can be empty. +To create a producer handler, create a class that extends `Metamorphosis\TopicHandler\Producer\AbstractHandler` class: ```php -### Arquivo de configuração: `config/kafka.php` - -Esse arquivo contém todas as informações sobre *brokers*, tópicos, *consumer groups* e *middlewares*. - -Para começar a usar, podemos focar em duas seções: - -- Brokers - - Uma lista de *brokers*, com configurações de conexão e autenticação. - - - `connections`: *obrigatório*. Pode ser uma `string` com múltiplas conexões separadas por vírgula ou uma `array` de conexões. - - - `auth`: *opcional*. É possivel se conectar sem autenticação ou usando autenticação SSL. - - ```php - 'brokers' => [ - 'price_brokers' => [ - 'connections' => 'localhost:8091,localhost:8092', - 'auth' => [ - 'type' => 'ssl', - 'ca' => storage_path('ca.pem'), - 'certificate' => storage_path('kafka.cert'), - 'key' => storage_path('kafka.key'), - ], - ], - 'stock_brokers' => [ - 'connections' => ['localhost:8091', 'localhost:8092'], - 'auth' => [], // pode ser uma array vazia ou até mesmo não ter essa chave aqui. - ], - ], - ``` - -- Tópicos - - Uma lista de configuração de tópicos, como nome, qual *broker* usar, *consumer group* e *middlewares*. - - Aqui você pode especificar os *consumer groups*. Cada tópico pode ter vários grupos, - e cada grupo tem a sua configuração para cada *consumer*, *offset_reset* (para definir um *offset* inicial) e *middlewares* que devem ser usados. - - ```php - 'topics' => [ - 'price_update' => [ - 'topic' => 'products.price.update', - 'broker' => 'price_brokers', - 'consumer_groups' => [ - 'default' => [ - 'offset_reset' => 'smallest', - 'handler' => '\App\Kafka\Consumers\PriceUpdateConsumer', - ], - ], - ], - ], - ``` +### Configurar usando arquivos + +Para começar a usar arquivos de configuração, são necessários pelo menos dois arquivos. Um arquivo para manter os tópicos +configuração e um arquivo para manter a configuração do broker e do esquema. Neste exemplo, usaremos os arquivos +`config/kafka.php` e `config/service.php`. + + +### Arquivo `config/kafka.php`: + +Este arquivo mantém configurações sobre tópicos, consumidores e produtores. +Deve retornar um array de tópicos contendo o nome do tópico, topic_id, consumidor, produtor e as configurações de cada um deles: + + +```php + [ + 'this_is_your_topic_name' => [ + 'topic_id' => "this_is_your_topic_id", + 'consumer' => [ + 'consumer_group' => 'your-consumer-group', + 'offset_reset' => 'earliest', + 'offset' => 0, + 'partition' => 0, + 'handler' => '\App\Kafka\Consumers\ConsumerExample', + 'timeout' => 20000, + 'auto_commit' => true, + 'commit_async' => false, + 'middlewares' => [], + ], + + 'producer' => [ + 'required_acknowledgment' => true, + 'is_async' => true, + 'max_poll_records' => 500, + 'flush_attempts' => 10, + 'middlewares' => [], + 'timeout' => 10000, + 'partition' => constant('RD_KAFKA_PARTITION_UA') ?? -1, + ], + ] + ], +]; +``` + +### File `config/service.php` + +Esse arquivo possui as configurações de **broker** e **schema** utilizados. + +```php + [ + 'url' => '', + 'request_options' => [ + 'headers' => [ + 'Authorization' => [ + 'Basic ' . base64_encode( + env('AVRO_SCHEMA_USERNAME').':'.env('AVRO_SCHEMA_PASSWORD') + ), + ], + ], + ], + + 'ssl_verify' => true, + 'username' => 'USERNAME', + 'password' => 'PASSWORD', + ], + + 'broker' => [ + 'connections' => 'kafka:9092', + 'auth' => [ + 'type' => 'ssl', + 'ca' => storage_path('ca.pem'), + 'certificate' => storage_path('kafka.cert'), + 'key' => storage_path('kafka.key'), + ], + ], +]; +``` ### Consumer @@ -113,11 +145,11 @@ class PriceUpdateConsumer extends AbstractHandler Agora é só consumir o tópico. -A forma mais simples de ver tudo isso funcionando é rodando o comando `kafka:consume` com o nome do tópico que foi configurado: +Para começar a consumir o tópico, a maneira mais simples de vê-lo funcionando é executando o comando kafka:consume junto com o nome do tópico, arquivo de configuração do tópico e arquivo de configuração do serviço: ```bash -$ php artisan kafka:consume price-update -``` +$ php artisan kafka:consume this_is_your_topic_name --config_name=config.file --service_name=service.file +``` Esse comando rodará em um laço infinito (while true), isso significa que ele nunca irá parar de rodar por conta própria. Mas erros podem acontecer, então, recomendamos fortemente que você execute este comando com o auxílio de um [supervisor](http://supervisord.org/running.html), como no exemplo abaixo: diff --git a/docs/upgrade.md b/docs/upgrade.md new file mode 100644 index 00000000..5c4ef6fe --- /dev/null +++ b/docs/upgrade.md @@ -0,0 +1,76 @@ + +## Upgrade guide + +To upgrade from version X.x to version X.y: + +Move your `avroschema` and `broker` section from old `config/kafka.php` file into a new file: + + +```php + [ + 'this_is_your_topic_name' => [ + 'topic_id' => "this_is_your_topic_id", + 'consumer' => [ + 'consumer_group' => 'your-consumer-group', + 'offset_reset' => 'earliest', + 'offset' => 0, + 'partition' => 0, + 'handler' => '\App\Kafka\Consumers\ConsumerExample', + 'timeout' => 20000, + 'auto_commit' => true, + 'commit_async' => false, + 'middlewares' => [], + ], + + 'producer' => [ + 'required_acknowledgment' => true, + 'is_async' => true, + 'max_poll_records' => 500, + 'flush_attempts' => 10, + 'middlewares' => [], + 'timeout' => 10000, + 'partition' => constant('RD_KAFKA_PARTITION_UA') ?? -1, + ], + ] + ], +]; +``` + +Upgrade your topic configuration files: + +```php + [ + 'this_is_your_topic_name' => [ + 'topic_id' => "this_is_your_topic_id", + 'consumer' => [ + 'consumer_group' => 'your-consumer-group', + 'offset_reset' => 'earliest', + 'offset' => 0, + 'partition' => 0, + 'handler' => '\App\Kafka\Consumers\ConsumerExample', + 'timeout' => 20000, + 'auto_commit' => true, + 'commit_async' => false, + 'middlewares' => [], + ], + + 'producer' => [ + 'required_acknowledgment' => true, + 'is_async' => true, + 'max_poll_records' => 500, + 'flush_attempts' => 10, + 'middlewares' => [], + 'timeout' => 10000, + 'partition' => constant('RD_KAFKA_PARTITION_UA') ?? -1, + ], + ] + ], +]; +``` + diff --git a/readme.md b/readme.md index 76b81013..9206ab91 100644 --- a/readme.md +++ b/readme.md @@ -15,6 +15,7 @@ - [Installation](#installation) - [Quick Usage Guide](docs/quick-usage.md) - [Advanced Usage Guide](docs/advanced.md) +- [Upgrade Guide](docs/upgrade.md) - [Contributing](docs/CONTRIBUTING.md) - [License](#license) diff --git a/src/Authentication/Factory.php b/src/Authentication/Factory.php index 67e3e1fa..55fb7053 100644 --- a/src/Authentication/Factory.php +++ b/src/Authentication/Factory.php @@ -2,8 +2,8 @@ namespace Metamorphosis\Authentication; -use Metamorphosis\AbstractConfigManager; use Metamorphosis\Exceptions\AuthenticationException; +use Metamorphosis\TopicHandler\ConfigOptions\Auth\AuthInterface; use RdKafka\Conf; class Factory @@ -14,9 +14,9 @@ class Factory private const TYPE_NONE = 'none'; - public static function authenticate(Conf $conf, AbstractConfigManager $configManager): void + public static function authenticate(Conf $conf, AuthInterface $configOptions): void { - $type = $configManager->get('auth.type'); + $type = $configOptions->getType(); switch ($type) { case null: case self::TYPE_NONE: @@ -26,14 +26,14 @@ public static function authenticate(Conf $conf, AbstractConfigManager $configMan case self::TYPE_SSL: app( SSLAuthentication::class, - compact('conf', 'configManager') + compact('conf', 'configOptions') ); break; case self::TYPE_SASL_SSL: app( SASLAuthentication::class, - compact('conf', 'configManager') + compact('conf', 'configOptions') ); break; diff --git a/src/Authentication/SASLAuthentication.php b/src/Authentication/SASLAuthentication.php index c0ebb42e..f4a0e3b0 100644 --- a/src/Authentication/SASLAuthentication.php +++ b/src/Authentication/SASLAuthentication.php @@ -2,44 +2,35 @@ namespace Metamorphosis\Authentication; -use Metamorphosis\AbstractConfigManager; +use Metamorphosis\TopicHandler\ConfigOptions\Auth\SaslSsl; use RdKafka\Conf; class SASLAuthentication implements AuthenticationInterface { private Conf $conf; - private AbstractConfigManager $configManager; + private SaslSsl $configOptions; - public function __construct(Conf $conf, AbstractConfigManager $configManager) + public function __construct(Conf $conf, SaslSsl $configOptions) { $this->conf = $conf; - $this->configManager = $configManager; + $this->configOptions = $configOptions; $this->authenticate(); } private function authenticate(): void { - $this->conf->set( - 'security.protocol', - $this->configManager->get('auth.type') - ); + $this->conf->set('security.protocol', $this->configOptions->getType()); // The mechanisms key is optional when configuring this kind of authentication // If the user does not specify the mechanism, the default will be 'PLAIN'. // But, to make config more clear, we are asking the user every time. $this->conf->set( 'sasl.mechanisms', - $this->configManager->get('auth.mechanisms') - ); - $this->conf->set( - 'sasl.username', - $this->configManager->get('auth.username') - ); - $this->conf->set( - 'sasl.password', - $this->configManager->get('auth.password') + $this->configOptions->getMechanisms() ); + $this->conf->set('sasl.username', $this->configOptions->getUsername()); + $this->conf->set('sasl.password', $this->configOptions->getPassword()); } } diff --git a/src/Authentication/SSLAuthentication.php b/src/Authentication/SSLAuthentication.php index 246ab5e7..e698dfe5 100644 --- a/src/Authentication/SSLAuthentication.php +++ b/src/Authentication/SSLAuthentication.php @@ -2,40 +2,31 @@ namespace Metamorphosis\Authentication; -use Metamorphosis\AbstractConfigManager; +use Metamorphosis\TopicHandler\ConfigOptions\Auth\Ssl; use RdKafka\Conf; class SSLAuthentication implements AuthenticationInterface { private Conf $conf; - private AbstractConfigManager $configManager; + private Ssl $configOptions; - public function __construct(Conf $conf, AbstractConfigManager $configManager) + public function __construct(Conf $conf, Ssl $configOptions) { $this->conf = $conf; - $this->configManager = $configManager; + $this->configOptions = $configOptions; $this->authenticate(); } private function authenticate(): void { - $this->conf->set( - 'security.protocol', - $this->configManager->get('auth.type') - ); - $this->conf->set( - 'ssl.ca.location', - $this->configManager->get('auth.ca') - ); + $this->conf->set('security.protocol', $this->configOptions->getType()); + $this->conf->set('ssl.ca.location', $this->configOptions->getCa()); $this->conf->set( 'ssl.certificate.location', - $this->configManager->get('auth.certificate') - ); - $this->conf->set( - 'ssl.key.location', - $this->configManager->get('auth.key') + $this->configOptions->getCertificate() ); + $this->conf->set('ssl.key.location', $this->configOptions->getKey()); } } diff --git a/src/Avro/ClientFactory.php b/src/Avro/ClientFactory.php index 0e537e4c..388a8fed 100644 --- a/src/Avro/ClientFactory.php +++ b/src/Avro/ClientFactory.php @@ -3,29 +3,31 @@ namespace Metamorphosis\Avro; use GuzzleHttp\Client as GuzzleClient; -use Metamorphosis\AbstractConfigManager; +use Metamorphosis\TopicHandler\ConfigOptions\AvroSchema; class ClientFactory { - public function make(AbstractConfigManager $configManager): CachedSchemaRegistryClient + protected const REQUEST_TIMEOUT = 2000; + + public function make(AvroSchema $avroSchema): CachedSchemaRegistryClient { - $guzzleHttp = $this->getGuzzleHttpClient($configManager); + $guzzleHttp = $this->getGuzzleHttpClient($avroSchema); $client = app(Client::class, ['client' => $guzzleHttp]); return app(CachedSchemaRegistryClient::class, compact('client')); } - private function getGuzzleHttpClient(AbstractConfigManager $configManager): GuzzleClient + private function getGuzzleHttpClient(AvroSchema $avroSchema): GuzzleClient { - $config = $configManager->get('request_options') ?: []; - $config['timeout'] = $configManager->get('timeout'); - $config['base_uri'] = $configManager->get('url'); + $config = $avroSchema->getRequestOptions(); + $config['timeout'] = self::REQUEST_TIMEOUT; + $config['base_uri'] = $avroSchema->getUrl(); $config['headers'] = array_merge( $this->getDefaultHeaders(), $config['headers'] ?? [] ); - $config['verify'] = $configManager->get('ssl_verify') ?? false; + $config['verify'] = $avroSchema->getRequestOptions()['ssl_verify'] ?? false; return app(GuzzleClient::class, compact('config')); } diff --git a/src/Avro/Serializer/Encoders/SchemaId.php b/src/Avro/Serializer/Encoders/SchemaId.php index ea428472..75d08216 100644 --- a/src/Avro/Serializer/Encoders/SchemaId.php +++ b/src/Avro/Serializer/Encoders/SchemaId.php @@ -41,4 +41,9 @@ public function encode(Schema $schema, $message): string return $io->string(); } + + public function getRegistry() + { + return $this->registry; + } } diff --git a/src/Connectors/AbstractConfig.php b/src/Connectors/AbstractConfig.php index 9942dc2e..ef26cbf5 100644 --- a/src/Connectors/AbstractConfig.php +++ b/src/Connectors/AbstractConfig.php @@ -8,19 +8,27 @@ abstract class AbstractConfig { /** - * @var mixed[] + * @var string[] */ - protected array $rules = []; + protected array $rules; - protected function getBrokerConfig(string $configName, string $brokerId): array + /** + * @psalm-suppress InvalidReturnStatement + */ + protected function getBrokerConfig(string $servicesFile): array { - if (!$brokerConfig = config("{$configName}.brokers.{$brokerId}")) { + if (!$brokerConfig = config($servicesFile . '.broker')) { throw new ConfigurationException( - "Broker '{$brokerId}' configuration not found" + "Broker configuration not found on '{$servicesFile}'" ); } - return (array) $brokerConfig; + return $brokerConfig; + } + + protected function getSchemaConfig(string $servicesFile): array + { + return config($servicesFile . '.avro_schema', []); } protected function validate(array $config): void @@ -31,9 +39,4 @@ protected function validate(array $config): void throw new ConfigurationException($validator->errors()->toJson()); } } - - protected function getSchemaConfig(string $configName, string $topicId): array - { - return config($configName . '.avro_schemas.' . $topicId, []); - } } diff --git a/src/Connectors/Consumer/Config.php b/src/Connectors/Consumer/Config.php index 306600aa..1e870981 100644 --- a/src/Connectors/Consumer/Config.php +++ b/src/Connectors/Consumer/Config.php @@ -2,10 +2,11 @@ namespace Metamorphosis\Connectors\Consumer; -use Metamorphosis\AbstractConfigManager; +use InvalidArgumentException; use Metamorphosis\Connectors\AbstractConfig; -use Metamorphosis\ConsumerConfigManager; use Metamorphosis\Exceptions\ConfigurationException; +use Metamorphosis\TopicHandler\ConfigOptions\Consumer; +use Metamorphosis\TopicHandler\ConfigOptions\Factories\ConsumerFactory; /** * This class is responsible for handling all configuration made on the @@ -18,7 +19,7 @@ class Config extends AbstractConfig { /** - * @var array + * @var string[] */ protected array $rules = [ 'topic' => 'required', @@ -39,38 +40,45 @@ class Config extends AbstractConfig 'middlewares' => 'array', ]; - public function make(array $options, array $arguments): AbstractConfigManager + public function makeWithConfigOptions(string $handlerClass): ?Consumer + { + $handler = app($handlerClass); + $configOptions = $handler->getConfigOptions(); + if (is_null($configOptions)) { + throw new InvalidArgumentException('Handler class cannot be null'); + } + + return $configOptions; + } + + public function make(array $options, array $arguments): Consumer { $configName = $options['config_name'] ?? 'kafka'; + $service = $options['service_name'] ?? 'service'; + $topicConfig = $this->getTopicConfig($configName, $arguments['topic']); - $consumerConfig = $this->getConsumerConfig( - $topicConfig, - $arguments['consumer_group'] - ); - $brokerConfig = $this->getBrokerConfig( - $configName, - $topicConfig['broker'] - ); - $schemaConfig = $this->getSchemaConfig( - $configName, - $arguments['topic'] - ); - $override = array_merge( - $this->filterValues($options), - $this->filterValues($arguments) - ); - $config = array_merge( - $topicConfig, + $brokerConfig = $this->getBrokerConfig($service); + $schemaConfig = $this->getSchemaConfig($service); + + if (isset($topicConfig['consumer'])) { + if (isset($options['partition'])) { + $topicConfig['consumer']['partition'] = $options['partition']; + } + + if (isset($options['offset'])) { + $topicConfig['consumer']['offset'] = $options['offset']; + } + + if (isset($options['timeout'])) { + $topicConfig['consumer']['timeout'] = $options['timeout']; + } + } + + return ConsumerFactory::make( $brokerConfig, - $consumerConfig, + $topicConfig, $schemaConfig ); - - $this->validate(array_merge($config, $override)); - $configManager = app(ConsumerConfigManager::class); - $configManager->set($config, $override); - - return $configManager; } /** @@ -91,31 +99,6 @@ private function getTopicConfig(string $configName, string $topicId): array return $topicConfig; } - private function getConsumerConfig(array $topicConfig, ?string $consumerGroupId = null): array - { - if ( - !$consumerGroupId && 1 === count( - $topicConfig['consumer']['consumer_groups'] - ) - ) { - $consumerGroupId = current( - array_keys($topicConfig['consumer']['consumer_groups']) - ); - } - - $consumerGroupId = $consumerGroupId ?? 'default'; - $consumerConfig = $topicConfig['consumer']['consumer_groups'][$consumerGroupId] ?? null; - $consumerConfig['consumer_group'] = $consumerGroupId; - - if (!$consumerConfig) { - throw new ConfigurationException( - "Consumer group '{$consumerGroupId}' not found" - ); - } - - return $consumerConfig; - } - private function getMiddlewares(string $configName, array $topicConfig): array { return array_merge( @@ -123,18 +106,4 @@ private function getMiddlewares(string $configName, array $topicConfig): array $topicConfig['consumer']['middlewares'] ?? [] ); } - - /** - * Sometimes that user may pass `--partition=0` as argument. - * So if we just use array_filter here, this option will - * be removed. - * - * This code makes sure that only null values will be removed. - */ - private function filterValues(array $options = []): array - { - return array_filter($options, function ($value) { - return !is_null($value); - }); - } } diff --git a/src/Connectors/Consumer/ConnectorInterface.php b/src/Connectors/Consumer/ConnectorInterface.php index 3163fca6..f9a7934d 100644 --- a/src/Connectors/Consumer/ConnectorInterface.php +++ b/src/Connectors/Consumer/ConnectorInterface.php @@ -2,10 +2,10 @@ namespace Metamorphosis\Connectors\Consumer; -use Metamorphosis\AbstractConfigManager; use Metamorphosis\Consumers\ConsumerInterface; +use Metamorphosis\TopicHandler\ConfigOptions\Consumer as ConfigOptions; interface ConnectorInterface { - public function getConsumer(bool $autoCommit, AbstractConfigManager $configManager): ConsumerInterface; + public function getConsumer(bool $autoCommit, ConfigOptions $configOptions): ConsumerInterface; } diff --git a/src/Connectors/Consumer/Factory.php b/src/Connectors/Consumer/Factory.php index bc7258e9..923fb817 100644 --- a/src/Connectors/Consumer/Factory.php +++ b/src/Connectors/Consumer/Factory.php @@ -2,29 +2,45 @@ namespace Metamorphosis\Connectors\Consumer; -use Metamorphosis\AbstractConfigManager; use Metamorphosis\Consumers\ConsumerInterface; +use Metamorphosis\Middlewares\Handler\Consumer as ConsumerMiddleware; use Metamorphosis\Middlewares\Handler\Dispatcher; +use Metamorphosis\TopicHandler\ConfigOptions\Consumer as ConsumerConfigOptions; /** * This factory will determine what kind of connector will be used. * Basically, if the user pass --partition and --offset as argument * means that we will use the low level approach. */ + class Factory { - public static function make(AbstractConfigManager $configManager): Manager + public static function make(ConsumerConfigOptions $configOptions): Manager { - $autoCommit = $configManager->get('auto_commit', true); - $commitAsync = $configManager->get('commit_async', true); + $autoCommit = $configOptions->isAutoCommit(); + $commitAsync = $configOptions->isCommitASync(); + + $consumer = self::getConsumer($autoCommit, $configOptions); - $consumer = self::getConsumer($autoCommit, $configManager); - $handler = app($configManager->get('handler')); + $handler = app($configOptions->getHandler()); - $dispatcher = self::getMiddlewareDispatcher( - $configManager->middlewares() + $middlewares = $configOptions->getMiddlewares(); + foreach ($middlewares as &$middleware) { + $middleware = is_string($middleware) + ? app( + $middleware, + ['consumerConfigOptions' => $configOptions] + ) + : $middleware; + } + + $middlewares[] = app( + ConsumerMiddleware::class, + ['consumerTopicHandler' => $handler] ); + $dispatcher = self::getMiddlewareDispatcher($middlewares); + return new Manager( $consumer, $handler, @@ -34,21 +50,21 @@ public static function make(AbstractConfigManager $configManager): Manager ); } - public static function getConsumer(bool $autoCommit, AbstractConfigManager $configManager): ConsumerInterface + public static function getConsumer(bool $autoCommit, ConsumerConfigOptions $configOptions): ConsumerInterface { - if (self::requiresPartition($configManager)) { + if (self::requiresPartition($configOptions)) { return app(LowLevel::class)->getConsumer( $autoCommit, - $configManager + $configOptions ); } - return app(HighLevel::class)->getConsumer($autoCommit, $configManager); + return app(HighLevel::class)->getConsumer($autoCommit, $configOptions); } - protected static function requiresPartition(AbstractConfigManager $configManager): bool + protected static function requiresPartition(ConsumerConfigOptions $configOptions): bool { - $partition = $configManager->get('partition'); + $partition = $configOptions->getPartition(); return !is_null($partition) && $partition >= 0; } diff --git a/src/Connectors/Consumer/HighLevel.php b/src/Connectors/Consumer/HighLevel.php index 7a61a171..ede233a4 100644 --- a/src/Connectors/Consumer/HighLevel.php +++ b/src/Connectors/Consumer/HighLevel.php @@ -2,38 +2,39 @@ namespace Metamorphosis\Connectors\Consumer; -use Metamorphosis\AbstractConfigManager; use Metamorphosis\Authentication\Factory; use Metamorphosis\Consumers\ConsumerInterface; use Metamorphosis\Consumers\HighLevel as HighLevelConsumer; +use Metamorphosis\TopicHandler\ConfigOptions\Consumer as ConfigOptions; use RdKafka\Conf; use RdKafka\KafkaConsumer; class HighLevel implements ConnectorInterface { - public function getConsumer(bool $autoCommit, AbstractConfigManager $configManager): ConsumerInterface + public function getConsumer(bool $autoCommit, ConfigOptions $configOptions): ConsumerInterface { - $conf = $this->getConf($configManager); + $conf = $this->getConf($configOptions); - $conf->set('group.id', $configManager->get('consumer_group')); - $conf->set('auto.offset.reset', $configManager->get('offset_reset')); + $conf->set('group.id', $configOptions->getConsumerGroup()); + $conf->set('auto.offset.reset', $configOptions->getOffsetReset()); if (!$autoCommit) { $conf->set('enable.auto.commit', 'false'); } $consumer = app(KafkaConsumer::class, ['conf' => $conf]); - $consumer->subscribe([$configManager->get('topic_id')]); - $timeout = $configManager->get('timeout'); + $consumer->subscribe([$configOptions->getTopicId()]); + $timeout = $configOptions->getTimeout(); return app(HighLevelConsumer::class, compact('consumer', 'timeout')); } - protected function getConf(AbstractConfigManager $configManager): Conf + protected function getConf(ConfigOptions $configOptions): Conf { $conf = resolve(Conf::class); - Factory::authenticate($conf, $configManager); + $broker = $configOptions->getBroker(); + Factory::authenticate($conf, $broker->getAuth()); - $conf->set('metadata.broker.list', $configManager->get('connections')); + $conf->set('metadata.broker.list', $broker->getConnections()); return $conf; } diff --git a/src/Connectors/Consumer/LowLevel.php b/src/Connectors/Consumer/LowLevel.php index 6c4d3993..33d0c1de 100644 --- a/src/Connectors/Consumer/LowLevel.php +++ b/src/Connectors/Consumer/LowLevel.php @@ -2,44 +2,45 @@ namespace Metamorphosis\Connectors\Consumer; -use Metamorphosis\AbstractConfigManager; use Metamorphosis\Authentication\Factory; use Metamorphosis\Consumers\ConsumerInterface; use Metamorphosis\Consumers\LowLevel as LowLevelConsumer; +use Metamorphosis\TopicHandler\ConfigOptions\Consumer as ConfigOptions; use RdKafka\Conf; use RdKafka\Consumer; use RdKafka\TopicConf; class LowLevel implements ConnectorInterface { - public function getConsumer(bool $autoCommit, AbstractConfigManager $configManager): ConsumerInterface + public function getConsumer(bool $autoCommit, ConfigOptions $configOptions): ConsumerInterface { $conf = $this->getConf(); - $conf->set('group.id', $configManager->get('consumer_group')); + $conf->set('group.id', $configOptions->getConsumerGroup()); if (!$autoCommit) { $conf->set('enable.auto.commit', 'false'); } - Factory::authenticate($conf, $configManager); + $broker = $configOptions->getBroker(); + Factory::authenticate($conf, $broker->getAuth()); $consumer = new Consumer($conf); - $consumer->addBrokers($configManager->get('connections')); + $consumer->addBrokers($broker->getConnections()); - $topicConf = $this->getTopicConfigs($configManager); + $topicConf = $this->getTopicConfigs($configOptions); $topicConsumer = $consumer->newTopic( - $configManager->get('topic_id'), + $configOptions->getTopicId(), $topicConf ); $topicConsumer->consumeStart( - $configManager->get('partition'), - $configManager->get('offset') + $configOptions->getPartition(), + $configOptions->getOffset() ); - return new LowLevelConsumer($topicConsumer, $configManager); + return new LowLevelConsumer($topicConsumer, $configOptions); } - protected function getTopicConfigs(AbstractConfigManager $configManager) + protected function getTopicConfigs(ConfigOptions $configOptions) { $topicConfig = new TopicConf(); @@ -48,7 +49,7 @@ protected function getTopicConfigs(AbstractConfigManager $configManager) // 'smallest': start from the beginning $topicConfig->set( 'auto.offset.reset', - $configManager->get('offset_reset') + $configOptions->getOffsetReset() ); return $topicConfig; diff --git a/src/Connectors/Producer/Config.php b/src/Connectors/Producer/Config.php index 90a9cd1d..e3cf8e80 100644 --- a/src/Connectors/Producer/Config.php +++ b/src/Connectors/Producer/Config.php @@ -50,15 +50,9 @@ public function make(ProducerConfigOptions $configOptions): AbstractConfigManage public function makeByTopic(string $topicId): AbstractConfigManager { $topicConfig = $this->getTopicConfig($topicId); - $topicConfig['middlewares'] = array_merge( - config('kafka.middlewares.producer', []), - $topicConfig['producer']['middlewares'] ?? [] - ); - $brokerConfig = $this->getBrokerConfig( - 'kafka', - $topicConfig['broker'] - ); - $schemaConfig = $this->getSchemaConfig('kafka', $topicId); + $topicConfig['middlewares'] = $topicConfig['producer']['middlewares'] ?? []; + $brokerConfig = $this->getBrokerConfig('service'); + $schemaConfig = $this->getSchemaConfig('service'); $config = array_merge($topicConfig, $brokerConfig, $schemaConfig); $this->validate($config); diff --git a/src/Connectors/Producer/Connector.php b/src/Connectors/Producer/Connector.php index ebe696ea..3d427afc 100644 --- a/src/Connectors/Producer/Connector.php +++ b/src/Connectors/Producer/Connector.php @@ -2,8 +2,8 @@ namespace Metamorphosis\Connectors\Producer; -use Metamorphosis\AbstractConfigManager; use Metamorphosis\Authentication\Factory; +use Metamorphosis\TopicHandler\ConfigOptions\Producer as ProducerConfigOptions; use Metamorphosis\TopicHandler\Producer\HandleableResponseInterface; use Metamorphosis\TopicHandler\Producer\HandlerInterface; use RdKafka\Conf; @@ -12,8 +12,10 @@ class Connector { - public function getProducerTopic(HandlerInterface $handler, AbstractConfigManager $configManager): KafkaProducer - { + public function getProducerTopic( + HandlerInterface $handler, + ProducerConfigOptions $producerConfigOptions + ): KafkaProducer { $conf = resolve(Conf::class); if ($this->canHandleResponse($handler)) { @@ -29,9 +31,10 @@ function ($kafka, Message $message) use ($handler) { ); } - $conf->set('metadata.broker.list', $configManager->get('connections')); + $broker = $producerConfigOptions->getBroker(); + $conf->set('metadata.broker.list', $broker->getConnections()); - Factory::authenticate($conf, $configManager); + Factory::authenticate($conf, $broker->getAuth()); return app(KafkaProducer::class, compact('conf')); } diff --git a/src/Console/ConfigOptionsCommand.php b/src/Console/ConfigOptionsCommand.php new file mode 100644 index 00000000..52d62dde --- /dev/null +++ b/src/Console/ConfigOptionsCommand.php @@ -0,0 +1,55 @@ +argument('handler')); + + $configOptions = $consumerHandler->getConfigOptions(); + + $this->writeStartingConsumer($configOptions); + + $manager = Factory::make($configOptions); + + $runner = app(Runner::class, compact('manager')); + $runner->run($this->option('times')); + } + + private function writeStartingConsumer(ConfigOptions $configOptions) + { + $text = 'Starting consumer for topic: ' . $configOptions->getTopicId() . PHP_EOL; + $text .= ' on consumer group: ' . $configOptions->getConsumerGroup() . PHP_EOL; + $text .= 'Connecting in ' . $configOptions->getBroker()->getConnections() . PHP_EOL; + $text .= 'Running consumer..'; + + $this->output->writeln($text); + } +} diff --git a/src/Console/ConsumerCommand.php b/src/Console/ConsumerCommand.php index c4bdab76..ded31d7f 100644 --- a/src/Console/ConsumerCommand.php +++ b/src/Console/ConsumerCommand.php @@ -3,10 +3,10 @@ namespace Metamorphosis\Console; use Illuminate\Console\Command as BaseCommand; -use Metamorphosis\AbstractConfigManager; use Metamorphosis\Connectors\Consumer\Config; use Metamorphosis\Connectors\Consumer\Factory; use Metamorphosis\Consumers\Runner; +use Metamorphosis\TopicHandler\ConfigOptions\Consumer; class ConsumerCommand extends BaseCommand { @@ -34,31 +34,26 @@ class ConsumerCommand extends BaseCommand {--broker= : Override broker connection from config.} {--timeout= : Sets timeout for consumer.} {--times= : Amount of messages to be consumed.} - {--config_name= : Change default name for laravel config file.}'; + {--config_name= : Change default name for laravel config file.} + {--service_name= : Change default name for services config file.}'; public function handle(Config $config) { - $configManager = $config->make($this->option(), $this->argument()); + $consumer = $config->make($this->option(), $this->argument()); - $this->writeStartingConsumer($configManager); + $this->writeStartingConsumer($consumer); - $manager = Factory::make($configManager); + $manager = Factory::make($consumer); $runner = app(Runner::class, compact('manager')); - $runner->run($configManager->get('times')); + $runner->run($this->option('times')); } - private function writeStartingConsumer(AbstractConfigManager $configManager) + private function writeStartingConsumer(Consumer $consumer) { - $text = 'Starting consumer for topic: ' . $configManager->get( - 'topic' - ) . PHP_EOL; - $text .= ' on consumer group: ' . $configManager->get( - 'consumer_group' - ) . PHP_EOL; - $text .= 'Connecting in ' . $configManager->get( - 'connections' - ) . PHP_EOL; + $text = 'Starting consumer for topic: ' . $consumer->getTopicId() . PHP_EOL; + $text .= ' on consumer group: ' . $consumer->getConsumerGroup() . PHP_EOL; + $text .= 'Connecting in ' . $consumer->getBroker()->getConnections() . PHP_EOL; $text .= 'Running consumer..'; $this->output->writeln($text); diff --git a/src/Consumer.php b/src/Consumer.php index ba04c783..367556fa 100644 --- a/src/Consumer.php +++ b/src/Consumer.php @@ -15,12 +15,10 @@ class Consumer private Dispatcher $dispatcher; - public function __construct(ConsumerConfigManager $configManager, ConsumerConfigOptions $configOptions) + public function __construct(ConsumerConfigOptions $configOptions) { - $configManager->set($configOptions->toArray()); - - $this->consumer = Factory::getConsumer(true, $configManager); - $this->dispatcher = new Dispatcher($configManager->middlewares()); + $this->consumer = Factory::getConsumer(true, $configOptions); + $this->dispatcher = new Dispatcher($configOptions->getMiddlewares()); } public function consume(): ?RecordInterface diff --git a/src/ConsumerConfigManager.php b/src/ConsumerConfigManager.php deleted file mode 100644 index 36d31c40..00000000 --- a/src/ConsumerConfigManager.php +++ /dev/null @@ -1,59 +0,0 @@ -setConfig($config, $consumerHandler); - $this->setCommandConfig($commandConfig); - - $middlewares = $this->get('middlewares', []); - $this->middlewares = []; - $this->remove('middlewares'); - - foreach ($middlewares as $middleware) { - $this->middlewares[] = is_string($middleware) - ? app( - $middleware, - ['configManager' => $this] - ) - : $middleware; - } - - if (!$consumerHandler) { - return; - } - - $this->middlewares[] = new ConsumerMiddleware($consumerHandler); - } - - private function setCommandConfig(?array $commandConfig): void - { - if (!$commandConfig) { - return; - } - - $this->setting = array_merge($this->setting, $commandConfig); - } - - private function setConfig(array $config, ?AbstractHandler $handler): void - { - if (!$handler || !$overrideConfig = $handler->getConfigOptions()) { - $this->setting = $config; - - return; - } - - $this->setting = $overrideConfig->toArray(); - } -} diff --git a/src/Consumers/LowLevel.php b/src/Consumers/LowLevel.php index 0a1d762b..e9fee56c 100644 --- a/src/Consumers/LowLevel.php +++ b/src/Consumers/LowLevel.php @@ -2,7 +2,7 @@ namespace Metamorphosis\Consumers; -use Metamorphosis\AbstractConfigManager; +use Metamorphosis\TopicHandler\ConfigOptions\Consumer as ConsumerConfigOptions; use RdKafka\ConsumerTopic; use RdKafka\Message; @@ -14,12 +14,12 @@ class LowLevel implements ConsumerInterface private ?int $timeout; - public function __construct(ConsumerTopic $consumer, AbstractConfigManager $configManager) + public function __construct(ConsumerTopic $consumer, ConsumerConfigOptions $consumerConfigOptions) { $this->consumer = $consumer; - $this->partition = $configManager->get('partition'); - $this->timeout = $configManager->get('timeout'); + $this->partition = $consumerConfigOptions->getPartition(); + $this->timeout = $consumerConfigOptions->getTimeout(); } public function consume(): ?Message diff --git a/src/Facades/Metamorphosis.php b/src/Facades/Metamorphosis.php index 70f903c8..b3c934ca 100644 --- a/src/Facades/Metamorphosis.php +++ b/src/Facades/Metamorphosis.php @@ -4,9 +4,6 @@ use Illuminate\Support\Facades\Facade; -/** - * @method static void produce(HandlerInterface $producerHandler) - */ class Metamorphosis extends Facade { protected static function getFacadeAccessor() diff --git a/src/MetamorphosisServiceProvider.php b/src/MetamorphosisServiceProvider.php index 687c739a..6a64d8c4 100644 --- a/src/MetamorphosisServiceProvider.php +++ b/src/MetamorphosisServiceProvider.php @@ -3,6 +3,7 @@ namespace Metamorphosis; use Illuminate\Support\ServiceProvider; +use Metamorphosis\Console\ConfigOptionsCommand; use Metamorphosis\Console\ConsumerCommand; use Metamorphosis\Console\ConsumerMakeCommand; use Metamorphosis\Console\MiddlewareMakeCommand; @@ -14,9 +15,11 @@ public function boot() { $this->publishes([ __DIR__ . '/../config/kafka.php' => config_path('kafka.php'), + __DIR__ . '/../config/service.php' => config_path('service.php'), ], 'config'); $this->mergeConfigFrom(__DIR__ . '/../config/kafka.php', 'kafka'); + $this->mergeConfigFrom(__DIR__ . '/../config/service.php', 'service'); } public function register() @@ -26,6 +29,7 @@ public function register() ConsumerMakeCommand::class, MiddlewareMakeCommand::class, ProducerMakeCommand::class, + ConfigOptionsCommand::class, ]); $this->app->bind('metamorphosis', function ($app) { diff --git a/src/Middlewares/AvroSchemaDecoder.php b/src/Middlewares/AvroSchemaDecoder.php index 4ae626c9..07f2193d 100644 --- a/src/Middlewares/AvroSchemaDecoder.php +++ b/src/Middlewares/AvroSchemaDecoder.php @@ -3,28 +3,27 @@ namespace Metamorphosis\Middlewares; use Closure; -use Metamorphosis\AbstractConfigManager; use Metamorphosis\Avro\ClientFactory; use Metamorphosis\Avro\Serializer\MessageDecoder; use Metamorphosis\Exceptions\ConfigurationException; use Metamorphosis\Record\RecordInterface; +use Metamorphosis\TopicHandler\ConfigOptions\Consumer as ConsumerConfigOptions; class AvroSchemaDecoder implements MiddlewareInterface { private MessageDecoder $decoder; - private AbstractConfigManager $configManager; - - public function __construct(AbstractConfigManager $configManager, ClientFactory $factory) + public function __construct(ClientFactory $factory, ConsumerConfigOptions $consumerConfigOptions) { - $this->configManager = $configManager; - if (!$this->configManager->get('url')) { + if (!$consumerConfigOptions->getAvroSchema()->getUrl()) { throw new ConfigurationException( "Avro schema url not found, it's required to use AvroSchemaDecoder Middleware" ); } - $this->decoder = new MessageDecoder($factory->make($configManager)); + $this->decoder = new MessageDecoder( + $factory->make($consumerConfigOptions->getAvroSchema()) + ); } public function process(RecordInterface $record, Closure $next) diff --git a/src/Middlewares/AvroSchemaMixedEncoder.php b/src/Middlewares/AvroSchemaMixedEncoder.php index 4df0beb7..67c09076 100644 --- a/src/Middlewares/AvroSchemaMixedEncoder.php +++ b/src/Middlewares/AvroSchemaMixedEncoder.php @@ -3,12 +3,12 @@ namespace Metamorphosis\Middlewares; use Closure; -use Metamorphosis\AbstractConfigManager; use Metamorphosis\Avro\CachedSchemaRegistryClient; use Metamorphosis\Avro\ClientFactory; use Metamorphosis\Avro\Serializer\Encoders\SchemaId; use Metamorphosis\Exceptions\ConfigurationException; use Metamorphosis\Record\RecordInterface; +use Metamorphosis\TopicHandler\ConfigOptions\Producer as ProducerConfigOptions; /** * Fetches a schema for a topic by subject and version (currently only 'latest') @@ -21,30 +21,34 @@ class AvroSchemaMixedEncoder implements MiddlewareInterface private CachedSchemaRegistryClient $schemaRegistry; - private AbstractConfigManager $configManager; + private ProducerConfigOptions $producerConfigOptions; - public function __construct(SchemaId $schemaIdEncoder, ClientFactory $factory, AbstractConfigManager $configManager) - { - if (!$configManager->get('url')) { + public function __construct( + SchemaId $schemaIdEncoder, + ClientFactory $factory, + ProducerConfigOptions $producerConfigOptions + ) { + if (!$producerConfigOptions->getAvroSchema()->getUrl()) { throw new ConfigurationException( "Avro schema url not found, it's required to use AvroSchemaEncoder Middleware" ); } - $schemaRegistry = $factory->make($configManager); + $schemaRegistry = $factory->make( + $producerConfigOptions->getAvroSchema() + ); $this->schemaIdEncoder = $schemaIdEncoder; $this->schemaRegistry = $schemaRegistry; - $this->configManager = $configManager; + $this->producerConfigOptions = $producerConfigOptions; } public function process(RecordInterface $record, Closure $next) { - $topic = $this->configManager->get('topic_id'); + $topic = $this->producerConfigOptions->getTopicId(); $schema = $this->schemaRegistry->getBySubjectAndVersion( "{$topic}-value", 'latest' ); - $arrayPayload = json_decode($record->getPayload(), true); $encodedPayload = $this->schemaIdEncoder->encode( $schema, diff --git a/src/Producer.php b/src/Producer.php index 61c155a3..b3211f7c 100644 --- a/src/Producer.php +++ b/src/Producer.php @@ -7,6 +7,7 @@ use Metamorphosis\Middlewares\Handler\Dispatcher; use Metamorphosis\Middlewares\Handler\Producer as ProducerMiddleware; use Metamorphosis\Producer\Poll; +use Metamorphosis\TopicHandler\ConfigOptions\Producer as ProducerConfigOptions; use Metamorphosis\TopicHandler\Producer\AbstractProducer; use Metamorphosis\TopicHandler\Producer\HandlerInterface; @@ -31,12 +32,21 @@ public function produce(HandlerInterface $producerHandler): void public function build(HandlerInterface $producerHandler): Dispatcher { - $configManager = $this->getConfigManager($producerHandler); + $producerConfigOptions = $producerHandler->getConfigOptions(); + + $middlewares = $producerConfigOptions->getMiddlewares(); + foreach ($middlewares as &$middleware) { + $middleware = is_string($middleware) + ? app( + $middleware, + ['producerConfigOptions' => $producerConfigOptions] + ) + : $middleware; + } - $middlewares = $configManager->middlewares(); $middlewares[] = $this->getProducerMiddleware( $producerHandler, - $configManager + $producerConfigOptions ); return new Dispatcher($middlewares); @@ -44,19 +54,19 @@ public function build(HandlerInterface $producerHandler): Dispatcher public function getProducerMiddleware( HandlerInterface $producerHandler, - AbstractConfigManager $configManager + ProducerConfigOptions $producerConfigOptions ): ProducerMiddleware { $producer = $this->connector->getProducerTopic( $producerHandler, - $configManager + $producerConfigOptions ); - $topic = $producer->newTopic($configManager->get('topic_id')); + $topic = $producer->newTopic($producerConfigOptions->getTopicId()); $poll = app( Poll::class, - ['producer' => $producer, 'configManager' => $configManager] + ['producer' => $producer, 'producerConfigOptions' => $producerConfigOptions] ); - $partition = $configManager->get('partition'); + $partition = $producerConfigOptions->getPartition(); return app( ProducerMiddleware::class, diff --git a/src/Producer/Poll.php b/src/Producer/Poll.php index afb8d081..c07508ec 100644 --- a/src/Producer/Poll.php +++ b/src/Producer/Poll.php @@ -2,7 +2,7 @@ namespace Metamorphosis\Producer; -use Metamorphosis\AbstractConfigManager; +use Metamorphosis\TopicHandler\ConfigOptions\Producer as ProducerConfigOptions; use RdKafka\Producer; use RuntimeException; @@ -24,15 +24,13 @@ class Poll private Producer $producer; - public function __construct(Producer $producer, AbstractConfigManager $configManager) + public function __construct(Producer $producer, ProducerConfigOptions $producerConfigOptions) { - $this->isAsync = $configManager->get('is_async'); - $this->maxPollRecords = $configManager->get('max_poll_records'); - $this->requiredAcknowledgment = $configManager->get( - 'required_acknowledgment' - ); - $this->maxFlushAttempts = $configManager->get('flush_attempts'); - $this->timeout = $configManager->get('timeout'); + $this->isAsync = $producerConfigOptions->isAsync(); + $this->maxPollRecords = $producerConfigOptions->getMaxPollRecords(); + $this->requiredAcknowledgment = $producerConfigOptions->isRequiredAcknowledgment(); + $this->maxFlushAttempts = $producerConfigOptions->getFlushAttempts(); + $this->timeout = $producerConfigOptions->getTimeout(); $this->producer = $producer; } diff --git a/src/TopicHandler/ConfigOptions/Factories/ConsumerFactory.php b/src/TopicHandler/ConfigOptions/Factories/ConsumerFactory.php index 103b882e..2d28a28c 100644 --- a/src/TopicHandler/ConfigOptions/Factories/ConsumerFactory.php +++ b/src/TopicHandler/ConfigOptions/Factories/ConsumerFactory.php @@ -22,20 +22,17 @@ public static function make( private static function getConsumerGroupConfig(array $topicData): array { $topicData['topicId'] = $topicData['topic_id']; + $consumer = $topicData['consumer']; + $topicData['consumerGroup'] = $consumer['consumer_group']; - $consumer = current($topicData['consumer']); - $topicData['consumerGroup'] = key($consumer); - - return array_merge( + return array_merge_recursive( $topicData, self::convertConfigAttributes($consumer) ); } - private static function convertConfigAttributes(array $topic): array + private static function convertConfigAttributes(array $consumerConfig): array { - $consumerConfig = current($topic); - if (isset($consumerConfig['auto_commit'])) { $consumerConfig['autoCommit'] = $consumerConfig['auto_commit']; } diff --git a/src/TopicHandler/Producer/AbstractProducer.php b/src/TopicHandler/Producer/AbstractProducer.php index e95a6f63..ecab7c93 100644 --- a/src/TopicHandler/Producer/AbstractProducer.php +++ b/src/TopicHandler/Producer/AbstractProducer.php @@ -4,7 +4,7 @@ use Metamorphosis\Exceptions\JsonException; use Metamorphosis\Record\ProducerRecord; -use Metamorphosis\TopicHandler\ConfigOptions\Producer as ConfigOptionsProducer; +use Metamorphosis\TopicHandler\ConfigOptions\Producer; class AbstractProducer implements HandlerInterface { @@ -15,18 +15,21 @@ class AbstractProducer implements HandlerInterface protected ?string $key; - private ConfigOptionsProducer $configOptions; + /** + * @var Producer + */ + protected $producer; - public function __construct($record, ConfigOptionsProducer $configOptions, ?string $key = null) + public function __construct($record, Producer $configOptions, ?string $key = null) { $this->record = $record; $this->key = $key; - $this->configOptions = $configOptions; + $this->producer = $configOptions; } - public function getConfigOptions(): ConfigOptionsProducer + public function getConfigOptions(): Producer { - return $this->configOptions; + return $this->producer; } public function getRecord() diff --git a/tests/Integration/ConsumerTest.php b/tests/Integration/ConsumerTest.php index 8c107499..2fb7b1bf 100644 --- a/tests/Integration/ConsumerTest.php +++ b/tests/Integration/ConsumerTest.php @@ -63,12 +63,12 @@ public function testItShouldSetup(): void Consumer::class, ['configOptions' => $consumerConfigOptions] ); - $expected = ['id' => 'MESSAGE_ID']; + $expected = '{"id":"MESSAGE_ID"}'; // Actions - $result = $consumer->consume(); + $result = $consumer->consume()->getPayload(); // Assertions - $this->assertSame($expected, $result->getPayload()); + $this->assertSame($expected, $result); } } diff --git a/tests/Integration/Dummies/MessageProducer.php b/tests/Integration/Dummies/MessageProducer.php index 22aac6f8..38fb8090 100644 --- a/tests/Integration/Dummies/MessageProducer.php +++ b/tests/Integration/Dummies/MessageProducer.php @@ -3,22 +3,12 @@ namespace Tests\Integration\Dummies; use Illuminate\Support\Facades\Log; -use Metamorphosis\TopicHandler\Producer\AbstractHandler; +use Metamorphosis\TopicHandler\Producer\AbstractProducer; use RdKafka\Message; use RuntimeException; -class MessageProducer extends AbstractHandler +class MessageProducer extends AbstractProducer { - public string $topic = 'default'; - - public function __construct($record, string $topic, ?string $key = null, ?int $partition = null) - { - $this->record = $record; - $this->topic = $topic; - $this->key = $key ?? 'recordId123'; - $this->partition = $partition; - } - public function success(Message $message): void { Log::info('Record successfully sent to broker.', [ diff --git a/tests/Integration/ProducerTest.php b/tests/Integration/ProducerTest.php index f7d13966..39cedbfd 100644 --- a/tests/Integration/ProducerTest.php +++ b/tests/Integration/ProducerTest.php @@ -5,6 +5,9 @@ use Illuminate\Support\Facades\Log; use Illuminate\Support\Str; use Metamorphosis\Facades\Metamorphosis; +use Metamorphosis\TopicHandler\ConfigOptions\Auth\None; +use Metamorphosis\TopicHandler\ConfigOptions\Broker; +use Metamorphosis\TopicHandler\ConfigOptions\Producer as ProducerConfigOptions; use Tests\Integration\Dummies\MessageConsumer; use Tests\Integration\Dummies\MessageProducer; use Tests\LaravelTestCase; @@ -17,11 +20,16 @@ class ProducerTest extends LaravelTestCase protected string $secondLowLevelMessage; + protected string $topicId; + + /** + * @group runProducer + */ public function testShouldRunAProducerAndReceiveMessagesWithAHighLevelConsumer(): void { // Given That I $this->haveAConsumerHandlerConfigured(); - $this->haveNoPartitionConfigured(); + $this->haveAConsumerPartitionConfigured(); $this->haveSomeRandomMessagesProduced(); // I Expect That @@ -50,22 +58,30 @@ protected function setUp(): void parent::setUp(); $this->withoutAuthentication(); + $this->topicId = 'kafka-test-' . Str::random(5); } protected function withoutAuthentication(): void { - config(['kafka.brokers.default.auth' => []]); + config(['service.broker.auth' => []]); } protected function haveAConsumerHandlerConfigured(): void { config( - ['kafka.topics.default.consumer.consumer_groups.test-consumer-group.handler' => MessageConsumer::class] + ['kafka.topics.default.consumer.handler' => MessageConsumer::class] ); } + protected function haveAConsumerPartitionConfigured(): void + { + config(['kafka.topics.default.consumer.partition' => -1]); + } + protected function runTheConsumer(): void { + config(['kafka.topics.default.topic_id' => $this->topicId]); + $this->artisan( 'kafka:consume', [ @@ -86,15 +102,12 @@ protected function haveALowLevelConsumerConfigured(): void 'topic_id' => 'low_level', 'broker' => 'default', 'consumer' => [ - 'consumer_groups' => [ - 'test-consumer-group' => [ - 'offset_reset' => 'earliest', - 'offset' => 0, - 'handler' => MessageConsumer::class, - 'timeout' => 20000, - 'middlewares' => [], - ], - ], + 'consumer_group' => 'test-consumer-group', + 'offset_reset' => 'earliest', + 'offset' => 0, + 'handler' => MessageConsumer::class, + 'timeout' => 20000, + 'middlewares' => [], ], 'producer' => [ 'required_acknowledgment' => true, @@ -128,13 +141,15 @@ protected function runTheLowLevelConsumerSkippingTheFirstTwoMessagesAndLimitingT private function haveSomeRandomMessagesProduced(): void { $this->highLevelMessage = Str::random(10); - $producer = app( - MessageProducer::class, - [ - 'record' => $this->highLevelMessage, - 'topic' => 'default', - ] + + $producerConfigOptions = $this->createProducerConfigOptions( + $this->topicId ); + $producer = app(MessageProducer::class, [ + 'record' => $this->highLevelMessage, + 'configOptions' => $producerConfigOptions, + 'key' => 'recordId123', + ]); Metamorphosis::produce($producer); Metamorphosis::produce($producer); @@ -142,8 +157,14 @@ private function haveSomeRandomMessagesProduced(): void private function produceRecordMessage(string $record): string { - $topic = 'low_level'; - $producer = app(MessageProducer::class, compact('record', 'topic')); + $producerConfigOptions = $this->createProducerConfigOptions( + 'low_level' + ); + $producer = app(MessageProducer::class, [ + 'record' => $record, + 'configOptions' => $producerConfigOptions, + 'key' => 'recordId123', + ]); Metamorphosis::produce($producer); Metamorphosis::produce($producer); @@ -183,10 +204,20 @@ private function haveFourProducedMessages(): void $this->produceRecordMessage($this->secondLowLevelMessage); } - private function haveNoPartitionConfigured(): void + private function createProducerConfigOptions(string $topicId): ProducerConfigOptions { - config( - ['kafka.topics.default.consumer.consumer_groups.test-consumer-group.partition' => -1] + $connections = env('KAFKA_BROKER_CONNECTIONS', 'kafka:9092'); + $broker = new Broker($connections, new None()); + + return new ProducerConfigOptions( + $topicId, + $broker, + null, + null, + [], + 2000, + false, + true ); } } diff --git a/tests/Integration/ProducerWithAvroTest.php b/tests/Integration/ProducerWithAvroTest.php index 449370da..09237bfc 100644 --- a/tests/Integration/ProducerWithAvroTest.php +++ b/tests/Integration/ProducerWithAvroTest.php @@ -6,15 +6,31 @@ use GuzzleHttp\Handler\MockHandler; use GuzzleHttp\HandlerStack; use GuzzleHttp\Psr7\Response; +use Illuminate\Support\Facades\Log; use Metamorphosis\Facades\Metamorphosis; use Metamorphosis\Middlewares\AvroSchemaDecoder; use Metamorphosis\Middlewares\AvroSchemaMixedEncoder; +use Metamorphosis\TopicHandler\ConfigOptions\Auth\None; +use Metamorphosis\TopicHandler\ConfigOptions\Broker; +use Metamorphosis\TopicHandler\ConfigOptions\Producer as ProducerConfigOptions; use Tests\Integration\Dummies\MessageConsumer; use Tests\Integration\Dummies\MessageProducer; use Tests\LaravelTestCase; class ProducerWithAvroTest extends LaravelTestCase { + /** + * @var string[] + */ + protected array $records; + + public function setUp(): void + { + parent::setUp(); + + $this->records = ['saleOrderId' => 'SALE_ORDER_ID', 'productId' => 'PRODUCT_ID']; + } + public function testShouldRunAProducerMessagesWithAAvroSchema(): void { // Given That I @@ -22,6 +38,9 @@ public function testShouldRunAProducerMessagesWithAAvroSchema(): void // When I $this->haveSomeRandomMessagesProduced(); + + // I expect that + $this->myMessagesHaveBeenLogged(); $this->expectNotToPerformAssertions(); } @@ -93,15 +112,22 @@ protected function haveAHandlerConfigured(): void private function haveSomeRandomMessagesProduced(): void { - $saleOrderProducer = app( - MessageProducer::class, - ['record' => ['saleOrderId' => 'SALE_ORDER_ID'], 'topic' => 'sale_order'] + $producerConfigOptionsSale = $this->createProducerConfigOptions( + 'sale_order' ); - $productProducer = app( - MessageProducer::class, - ['record' => ['productId' => 'PRODUCT_ID'], 'topic' => 'product'] + $producerConfigOptionsProduct = $this->createProducerConfigOptions( + 'product' ); + $saleOrderProducer = app(MessageProducer::class, [ + 'record' => ['saleOrderId' => 'SALE_ORDER_ID'], + 'configOptions' => $producerConfigOptionsSale, + ]); + $productProducer = app(MessageProducer::class, [ + 'record' => ['productId' => 'PRODUCT_ID'], + 'configOptions' => $producerConfigOptionsProduct, + ]); + $saleOrderSchemaResponse = '{ "subject":"sale_order-value", "version":1, @@ -129,7 +155,34 @@ private function haveSomeRandomMessagesProduced(): void $productDispatcher = Metamorphosis::build($productProducer); $productDispatcher->handle($productProducer->createRecord()); + } - $saleOrderDispatcher->handle($saleOrderProducer->createRecord()); + private function myMessagesHaveBeenLogged(): void + { + $this->setLogExpectationsFor($this->records['saleOrderId']); + $this->setLogExpectationsFor($this->records['productId']); + } + + private function setLogExpectationsFor(string $message): void + { + Log::shouldReceive('info') + ->with($message); + } + + private function createProducerConfigOptions(string $topicId): ProducerConfigOptions + { + $connections = env('KAFKA_BROKER_CONNECTIONS', 'kafka:9092'); + $broker = new Broker($connections, new None()); + + return new ProducerConfigOptions( + $topicId, + $broker, + null, + null, + [], + 2000, + false, + true + ); } } diff --git a/tests/Integration/ProducerWithConfigOptionsTest.php b/tests/Integration/ProducerWithConfigOptionsTest.php index 3382b590..ec37a620 100644 --- a/tests/Integration/ProducerWithConfigOptionsTest.php +++ b/tests/Integration/ProducerWithConfigOptionsTest.php @@ -25,13 +25,13 @@ public function testShouldRunAProducerMessagesWithConfigOptions(): void // I Expect That $this->myMessagesHaveBeenProduced(); - $this->expectNotToPerformAssertions(); // When I $this->haveSomeRandomMessageProduced(); // I Expect That $this->myMessagesHaveBeenLogged(); + $this->expectNotToPerformAssertions(); // When I $this->runTheConsumer(); @@ -41,39 +41,12 @@ protected function runTheConsumer(): void { $dummy = new MessageConsumer($this->consumerConfigOptions); $this->instance('\App\Kafka\Consumers\ConsumerOverride', $dummy); - config([ - 'kafka_new_config' => [ - 'brokers' => [ - 'override' => [ - 'connections' => env( - 'KAFKA_BROKER_CONNECTIONS', - 'kafka:9092' - ), - ], - ], - 'topics' => [ - 'default' => [ - 'broker' => 'override', - 'consumer' => [ - 'consumer_groups' => [ - 'test-consumer-group' => [ - 'handler' => '\App\Kafka\Consumers\ConsumerOverride', - 'offset_reset' => 'earliest', - ], - ], - ], - ], - ], - ], - ]); + $this->artisan( - 'kafka:consume', + 'kafka:consume-config-class', [ - 'topic' => 'default', - 'consumer_group' => 'test-consumer-group', - '--timeout' => 20000, + 'handler' => '\\App\\Kafka\\Consumers\\ConsumerOverride', '--times' => 2, - '--config_name' => 'kafka_new_config', ] ); } diff --git a/tests/Unit/AbstractConfigManagerTest.php b/tests/Unit/AbstractConfigManagerTest.php deleted file mode 100644 index 7cf86812..00000000 --- a/tests/Unit/AbstractConfigManagerTest.php +++ /dev/null @@ -1,75 +0,0 @@ -instance( - AbstractHandler::class, - m::mock(AbstractHandler::class) - ); - $config = [ - 'middlewares' => [], - 'handler' => AbstractHandler::class, - 'broker' => [ - 'default' => [ - 'connections' => 'kafka:9092', - ], - ], - 'topic_id' => 'kafka-test', - ]; - $broker = new Broker('kafka:9092', new None()); - $configOptions = new ConsumerConfigOptions( - 'kafka-override', - $broker, - '\App\Kafka\Consumers\ConsumerExample', - null, - null, - 'default', - null, - [MiddlewareDummy::class], - 200, - false, - true - ); - - $expected = [ - 'topic_id' => 'kafka-override', - 'connections' => 'kafka:9092', - 'auth' => null, - 'timeout' => 200, - 'handler' => '\App\Kafka\Consumers\ConsumerExample', - 'partition' => -1, - 'offset' => null, - 'consumer_group' => 'default', - 'auto_commit' => false, - 'commit_async' => true, - 'offset_reset' => 'smallest', - ]; - - $configManager = new ConsumerConfigManager(); - - // Expectations - $handler->expects() - ->getConfigOptions() - ->andReturn($configOptions); - - // Actions - $configManager->set($config); - - // Expectations - $this->assertEquals($expected, $configManager->get()); - } -} diff --git a/tests/Unit/Authentication/FactoryTest.php b/tests/Unit/Authentication/FactoryTest.php index 53999f8a..f13b2e46 100644 --- a/tests/Unit/Authentication/FactoryTest.php +++ b/tests/Unit/Authentication/FactoryTest.php @@ -3,8 +3,11 @@ namespace Tests\Unit\Authentication; use Metamorphosis\Authentication\Factory; -use Metamorphosis\ConsumerConfigManager; use Metamorphosis\Exceptions\AuthenticationException; +use Metamorphosis\TopicHandler\ConfigOptions\Auth\AuthInterface; +use Metamorphosis\TopicHandler\ConfigOptions\Auth\SaslSsl; +use Metamorphosis\TopicHandler\ConfigOptions\Auth\Ssl; +use Mockery as m; use RdKafka\Conf; use Tests\LaravelTestCase; @@ -13,15 +16,11 @@ class FactoryTest extends LaravelTestCase public function testItMakesSslAuthenticationClass(): void { // Set - $configManager = new ConsumerConfigManager(); - $configManager->set([ - 'auth' => [ - 'type' => 'ssl', - 'ca' => 'path/to/ca', - 'certificate' => 'path/to/certificate', - 'key' => 'path/to/key', - ], - ]); + $configOptionsSsl = new Ssl( + 'path/to/ca', + 'path/to/certificate', + 'path/to/key' + ); $conf = new Conf(); $expected = [ 'security.protocol' => 'ssl', @@ -31,7 +30,7 @@ public function testItMakesSslAuthenticationClass(): void ]; // Actions - Factory::authenticate($conf, $configManager); + Factory::authenticate($conf, $configOptionsSsl); // Assertions $this->assertArraySubset($expected, $conf->dump()); @@ -40,15 +39,11 @@ public function testItMakesSslAuthenticationClass(): void public function testItMakesSASLAuthenticationClass(): void { // Set - $configManager = new ConsumerConfigManager(); - $configManager->set([ - 'auth' => [ - 'type' => 'sasl_ssl', - 'mechanisms' => 'PLAIN', - 'username' => 'some-username', - 'password' => 'some-password', - ], - ]); + $configOptionsSaslSsl = new SaslSsl( + 'PLAIN', + 'some-username', + 'some-password' + ); $conf = new Conf(); $expected = [ 'security.protocol' => 'sasl_ssl', @@ -58,7 +53,7 @@ public function testItMakesSASLAuthenticationClass(): void ]; // Actions - Factory::authenticate($conf, $configManager); + Factory::authenticate($conf, $configOptionsSaslSsl); // Assertions $this->assertArraySubset($expected, $conf->dump()); @@ -67,13 +62,17 @@ public function testItMakesSASLAuthenticationClass(): void public function testItThrowsExceptionWhenInvalidProtocolIsPassed(): void { // Set - $configManager = new ConsumerConfigManager(); - $configManager->set(['auth' => ['type' => 'some-invalid-type']]); + $invalidAuth = m::mock(AuthInterface::class); $conf = new Conf(); + // Expectations + $invalidAuth->expects() + ->getType() + ->andReturn('some-invalid-type'); + $this->expectException(AuthenticationException::class); // Actions - Factory::authenticate($conf, $configManager); + Factory::authenticate($conf, $invalidAuth); } } diff --git a/tests/Unit/Authentication/SASLAuthenticationTest.php b/tests/Unit/Authentication/SASLAuthenticationTest.php index ce1078b3..45a4dc3a 100644 --- a/tests/Unit/Authentication/SASLAuthenticationTest.php +++ b/tests/Unit/Authentication/SASLAuthenticationTest.php @@ -3,7 +3,7 @@ namespace Tests\Unit\Authentication; use Metamorphosis\Authentication\SASLAuthentication; -use Metamorphosis\ConsumerConfigManager; +use Metamorphosis\TopicHandler\ConfigOptions\Auth\SaslSsl; use RdKafka\Conf; use Tests\LaravelTestCase; @@ -12,16 +12,13 @@ class SASLAuthenticationTest extends LaravelTestCase public function testItShouldValidateAuthenticationConfigurations(): void { // Set - $configManager = new ConsumerConfigManager(); - $configManager->set([ - 'auth' => [ - 'type' => 'sasl_ssl', - 'mechanisms' => 'PLAIN', - 'username' => 'some-username', - 'password' => 'some-password', - ], - ]); + $configSaslSsl = new SaslSsl( + 'PLAIN', + 'some-username', + 'some-password' + ); $conf = new Conf(); + $expected = [ 'security.protocol' => 'sasl_ssl', 'sasl.username' => 'some-username', @@ -30,7 +27,7 @@ public function testItShouldValidateAuthenticationConfigurations(): void ]; // Actions - new SASLAuthentication($conf, $configManager); + new SASLAuthentication($conf, $configSaslSsl); // Assertions $this->assertArraySubset($expected, $conf->dump()); diff --git a/tests/Unit/Authentication/SSLAuthenticationTest.php b/tests/Unit/Authentication/SSLAuthenticationTest.php index 11c93691..9ea6c6b0 100644 --- a/tests/Unit/Authentication/SSLAuthenticationTest.php +++ b/tests/Unit/Authentication/SSLAuthenticationTest.php @@ -3,7 +3,7 @@ namespace Tests\Unit\Authentication; use Metamorphosis\Authentication\SSLAuthentication; -use Metamorphosis\ConsumerConfigManager; +use Metamorphosis\TopicHandler\ConfigOptions\Auth\Ssl; use RdKafka\Conf; use Tests\LaravelTestCase; @@ -12,16 +12,12 @@ class SSLAuthenticationTest extends LaravelTestCase public function testItShouldValidateAuthenticationConfigurations(): void { // Set - $configManager = new ConsumerConfigManager(); - $configManager->set([ - 'auth' => [ - 'type' => 'ssl', - 'ca' => 'path/to/ca', - 'certificate' => 'path/to/certificate', - 'key' => 'path/to/key', - ], - ]); $conf = new Conf(); + $configSsl = new Ssl( + 'path/to/ca', + 'path/to/certificate', + 'path/to/key' + ); $expected = [ 'security.protocol' => 'ssl', 'ssl.ca.location' => 'path/to/ca', @@ -30,7 +26,7 @@ public function testItShouldValidateAuthenticationConfigurations(): void ]; // Actions - new SSLAuthentication($conf, $configManager); + new SSLAuthentication($conf, $configSsl); // Assertions $this->assertArraySubset($expected, $conf->dump()); diff --git a/tests/Unit/Connectors/Consumer/ConfigTest.php b/tests/Unit/Connectors/Consumer/ConfigTest.php index 9e1c26f9..224f5666 100644 --- a/tests/Unit/Connectors/Consumer/ConfigTest.php +++ b/tests/Unit/Connectors/Consumer/ConfigTest.php @@ -8,6 +8,7 @@ use Mockery as m; use Tests\LaravelTestCase; use Tests\Unit\Dummies\ConsumerHandlerDummy; +use TypeError; class ConfigTest extends LaravelTestCase { @@ -84,10 +85,12 @@ public function testShouldValidateConsumerConfig(): void ]); // Actions - $configManager = $config->make($options, $arguments); + $configManager = $config->makeWithConfigOptions( + ConsumerHandlerDummy::class + ); // Assertions - $this->assertArraySubset($expected, $configManager->get()); + $this->assertArraySubset($expected, $configManager->toArray()); } public function testShouldNotSetRuntimeConfigWhenOptionsIsInvalid(): void @@ -104,18 +107,20 @@ public function testShouldNotSetRuntimeConfigWhenOptionsIsInvalid(): void 'consumer_group' => 'default', ]; + // Expectations + $this->expectException(TypeError::class); + // Actions - $this->expectException(ConfigurationException::class); $configManager = $config->make($options, $arguments); // Assertions - $this->assertEmpty($configManager->get()); + $this->assertEmpty($configManager->toArray()); } public function testShouldNotSetRuntimeConfigWhenKafkaConfigIsInvalid(): void { // Set - config(['kafka.brokers.default.connections' => null]); + config(['service.broker' => null]); $config = new Config(); $options = [ 'partition' => 0, @@ -132,6 +137,6 @@ public function testShouldNotSetRuntimeConfigWhenKafkaConfigIsInvalid(): void $configManager = $config->make($options, $arguments); // Assertions - $this->assertEmpty($configManager->get()); + $this->assertEmpty($configManager->toArray()); } } diff --git a/tests/Unit/Connectors/Consumer/FactoryTest.php b/tests/Unit/Connectors/Consumer/FactoryTest.php index a90471d6..fcd1a6d3 100644 --- a/tests/Unit/Connectors/Consumer/FactoryTest.php +++ b/tests/Unit/Connectors/Consumer/FactoryTest.php @@ -14,26 +14,29 @@ class FactoryTest extends LaravelTestCase public function testItMakesManagerWithLowLevelConsumer(): void { // Set + $this->haveAConsumerWithPartitionConfigured(); + $config = new Config(); - $configManager = $config->make( + $configConsumer = $config->make( ['timeout' => 61], ['topic' => 'topic_key', 'consumer_group' => 'with-partition'] ); - $manager = Factory::make($configManager); + $manager = Factory::make($configConsumer); // Assertions $this->assertInstanceOf(LowLevel::class, $manager->getConsumer()); } - public function testItMakesManagerWithLowLevelConsumerWhenPartitionIsNotValid(): void + public function testItMakesManagerWithHighLevelConsumerWhenPartitionIsNotValid(): void { // Set + $this->haveAConsumerWithoutPartitionConfigured(); $config = new Config(); - $configManager = $config->make( - ['timeout' => 61], - ['topic' => 'topic_key', 'consumer_group' => 'with-partition', 'partition' => -1] + $configConsumer = $config->make( + ['timeout' => 61, 'partition' => -1], + ['topic' => 'topic_key', 'consumer_group' => 'with-partition'] ); - $manager = Factory::make($configManager); + $manager = Factory::make($configConsumer); // Assertions $this->assertInstanceOf(HighLevel::class, $manager->getConsumer()); @@ -42,12 +45,13 @@ public function testItMakesManagerWithLowLevelConsumerWhenPartitionIsNotValid(): public function testItMakesHighLevelClass(): void { // Set + $this->haveAConsumerWithoutPartitionConfigured(); $config = new Config(); - $configManager = $config->make( + $configConsumer = $config->make( ['timeout' => 61], ['topic' => 'topic_key', 'consumer_group' => 'without-partition'] ); - $manager = Factory::make($configManager); + $manager = Factory::make($configConsumer); // Assertions $this->assertInstanceOf(HighLevel::class, $manager->getConsumer()); @@ -88,4 +92,52 @@ protected function setUp(): void ], ]); } + + private function haveAConsumerWithPartitionConfigured() + { + config([ + 'kafka' => [ + 'topics' => [ + 'topic_key' => [ + 'topic_id' => 'topic_name', + 'consumer' => [ + 'consumer_group' => 'with-partition', + 'offset_reset' => 'earliest', + 'offset' => 0, + 'partition' => 0, + 'handler' => ConsumerHandlerDummy::class, + ], + ], + ], + ], + 'service' => [ + 'broker' => [ + 'connections' => 'kafka:123', + ], + ], + ]); + } + + private function haveAConsumerWithoutPartitionConfigured() + { + config([ + 'kafka' => [ + 'topics' => [ + 'topic_key' => [ + 'topic_id' => 'topic_name', + 'consumer' => [ + 'consumer_group' => 'without-partition', + 'offset_reset' => 'earliest', + 'handler' => ConsumerHandlerDummy::class, + ], + ], + ], + ], + 'service' => [ + 'broker' => [ + 'connections' => 'kafka:123', + ], + ], + ]); + } } diff --git a/tests/Unit/Connectors/Consumer/HighLevelTest.php b/tests/Unit/Connectors/Consumer/HighLevelTest.php index 52d58fc4..97ab8e91 100644 --- a/tests/Unit/Connectors/Consumer/HighLevelTest.php +++ b/tests/Unit/Connectors/Consumer/HighLevelTest.php @@ -3,8 +3,11 @@ namespace Tests\Unit\Connectors\Consumer; use Metamorphosis\Connectors\Consumer\HighLevel; -use Metamorphosis\ConsumerConfigManager; use Metamorphosis\Consumers\HighLevel as HighLevelConsumer; +use Metamorphosis\TopicHandler\ConfigOptions\Auth\None; +use Metamorphosis\TopicHandler\ConfigOptions\AvroSchema as AvroSchemaConfigOptions; +use Metamorphosis\TopicHandler\ConfigOptions\Broker; +use Metamorphosis\TopicHandler\ConfigOptions\Consumer as ConsumerConfigOptions; use Tests\LaravelTestCase; class HighLevelTest extends LaravelTestCase @@ -12,19 +15,20 @@ class HighLevelTest extends LaravelTestCase public function testItShouldMakeConnectorSetup(): void { // Set - $connections = env('KAFKA_BROKER_CONNECTIONS', 'kafka:9092'); - $configManager = new ConsumerConfigManager(); - $configManager->set([ - 'connections' => $connections, - 'consumer_group' => 'some-group', - 'topic_id' => 'some_topic', - 'offset_reset' => 'earliest', - 'timeout' => 1000, - ]); $connector = new HighLevel(); + $brokerOptions = new Broker('kafka:9092', new None()); + $consumerConfigOptions = new ConsumerConfigOptions( + 'kafka-test', + $brokerOptions, + null, + 1, + 0, + 'some-group', + new AvroSchemaConfigOptions('http://url.teste') + ); // Actions - $result = $connector->getConsumer(false, $configManager); + $result = $connector->getConsumer(false, $consumerConfigOptions); // Assertions $this->assertInstanceOf(HighLevelConsumer::class, $result); diff --git a/tests/Unit/Connectors/Consumer/LowLevelTest.php b/tests/Unit/Connectors/Consumer/LowLevelTest.php index 039d096c..21c12291 100644 --- a/tests/Unit/Connectors/Consumer/LowLevelTest.php +++ b/tests/Unit/Connectors/Consumer/LowLevelTest.php @@ -3,8 +3,11 @@ namespace Tests\Unit\Connectors\Consumer; use Metamorphosis\Connectors\Consumer\LowLevel; -use Metamorphosis\ConsumerConfigManager; use Metamorphosis\Consumers\LowLevel as LowLevelConsumer; +use Metamorphosis\TopicHandler\ConfigOptions\Auth\None; +use Metamorphosis\TopicHandler\ConfigOptions\AvroSchema as AvroSchemaConfigOptions; +use Metamorphosis\TopicHandler\ConfigOptions\Broker; +use Metamorphosis\TopicHandler\ConfigOptions\Consumer as ConsumerConfigOptions; use Tests\LaravelTestCase; class LowLevelTest extends LaravelTestCase @@ -12,20 +15,20 @@ class LowLevelTest extends LaravelTestCase public function testItShouldMakeConnectorSetup(): void { // Set - $connections = env('KAFKA_BROKER_CONNECTIONS', 'kafka:9092'); - $configManager = new ConsumerConfigManager(); - $configManager->set([ - 'connections' => $connections, - 'consumer_group' => 'some-group', - 'topic' => 'some_topic', - 'offset_reset' => 'earliest', - 'offset' => 0, - 'partition' => 1, - ]); $connector = new LowLevel(); + $brokerOptions = new Broker('kafka:9092', new None()); + $consumerConfigOptions = new ConsumerConfigOptions( + 'kafka-test', + $brokerOptions, + null, + 1, + 0, + 'some-group', + new AvroSchemaConfigOptions('http://url.teste') + ); // Actions - $result = $connector->getConsumer(true, $configManager); + $result = $connector->getConsumer(true, $consumerConfigOptions); // Assertions $this->assertInstanceOf(LowLevelConsumer::class, $result); diff --git a/tests/Unit/Connectors/Consumer/ManagerTest.php b/tests/Unit/Connectors/Consumer/ManagerTest.php index d69d72eb..52e3bcc1 100644 --- a/tests/Unit/Connectors/Consumer/ManagerTest.php +++ b/tests/Unit/Connectors/Consumer/ManagerTest.php @@ -4,7 +4,6 @@ use Exception; use Metamorphosis\Connectors\Consumer\Manager; -use Metamorphosis\ConsumerConfigManager; use Metamorphosis\Consumers\ConsumerInterface; use Metamorphosis\Exceptions\ResponseTimeoutException; use Metamorphosis\Exceptions\ResponseWarningException; @@ -14,7 +13,6 @@ use Mockery as m; use RdKafka\Message as KafkaMessage; use Tests\LaravelTestCase; -use Tests\Unit\Dummies\ConsumerHandlerDummy; class ManagerTest extends LaravelTestCase { @@ -186,22 +184,4 @@ function () use ($messages, &$count, $exception) { $runner->handleMessage(); $runner->handleMessage(); } - - protected function setUp(): void - { - parent::setUp(); - - $configManager = new ConsumerConfigManager(); - $configManager->set([ - 'connections' => 'kafka:2019', - 'topic' => 'topic_key', - 'broker' => 'default', - 'offset_reset' => 'earliest', - 'offset' => 0, - 'timeout' => 30, - 'handler' => ConsumerHandlerDummy::class, - 'middlewares' => [], - 'consumer_group' => 'consumer-id', - ]); - } } diff --git a/tests/Unit/Connectors/Producer/ConfigTest.php b/tests/Unit/Connectors/Producer/ConfigTest.php index 82adae08..d60567e1 100644 --- a/tests/Unit/Connectors/Producer/ConfigTest.php +++ b/tests/Unit/Connectors/Producer/ConfigTest.php @@ -24,7 +24,6 @@ public function testShouldValidateProducerConfig(): void 'max_poll_records' => 500, 'flush_attempts' => 10, 'partition' => -1, - 'broker' => 'default', 'topic' => 'default', 'connections' => env('KAFKA_BROKER_CONNECTIONS', 'kafka:9092'), 'auth' => [ @@ -70,7 +69,6 @@ public function testShouldNotOverrideDefaultParametersWhenConfigIsSet(): void 'required_acknowledgment' => true, 'max_poll_records' => 3000, 'flush_attempts' => 10, - 'broker' => 'default', 'topic' => 'default', 'connections' => env('KAFKA_BROKER_CONNECTIONS', 'kafka:9092'), 'auth' => [ diff --git a/tests/Unit/Connectors/Producer/ConnectorTest.php b/tests/Unit/Connectors/Producer/ConnectorTest.php index e0cb6123..6652a0e7 100644 --- a/tests/Unit/Connectors/Producer/ConnectorTest.php +++ b/tests/Unit/Connectors/Producer/ConnectorTest.php @@ -3,7 +3,8 @@ namespace Tests\Unit\Connectors\Producer; use Metamorphosis\Connectors\Producer\Connector; -use Metamorphosis\ProducerConfigManager; +use Metamorphosis\TopicHandler\ConfigOptions\Auth\None; +use Metamorphosis\TopicHandler\ConfigOptions\Broker; use Metamorphosis\TopicHandler\ConfigOptions\Producer as ProducerConfigOptions; use Metamorphosis\TopicHandler\Producer\AbstractProducer; use Metamorphosis\TopicHandler\Producer\HandleableResponseInterface; @@ -27,11 +28,13 @@ public function testItShouldMakeSetup(): void KafkaProducer::class, m::mock(KafkaProducer::class) ); - $configManager = m::mock(ProducerConfigManager::class); - $configOptions = m::mock(ProducerConfigOptions::class); + + $connections = env('KAFKA_BROKER_CONNECTIONS', 'kafka:9092'); + $broker = new Broker($connections, new None()); + $producerConfigOptions = m::mock(ProducerConfigOptions::class); $connector = new Connector(); - $handler = new class ('record', $configOptions) extends AbstractProducer implements HandleableResponseInterface { + $handler = new class ('record', $producerConfigOptions) extends AbstractProducer implements HandleableResponseInterface { /** @phpcsSuppress SlevomatCodingStandard.Functions.UnusedParameter.UnusedParameter */ public function success(Message $message): void { @@ -49,18 +52,17 @@ public function failed(Message $message): void ->withAnyArgs(); $conf->expects() - ->set('metadata.broker.list', 'kafka:9092'); - - $configManager->expects() - ->get('connections') - ->andReturn('kafka:9092'); + ->set('metadata.broker.list', $connections); - $configManager->expects() - ->get('auth.type') - ->andReturn('none'); + $producerConfigOptions->expects() + ->getBroker() + ->andReturn($broker); // Actions - $result = $connector->getProducerTopic($handler, $configManager); + $result = $connector->getProducerTopic( + $handler, + $producerConfigOptions + ); // Assertions $this->assertInstanceOf(KafkaProducer::class, $result); @@ -77,11 +79,13 @@ public function testItShouldMakeSetupWithoutHandleResponse(): void KafkaProducer::class, m::mock(KafkaProducer::class) ); - $configManager = m::mock(ProducerConfigManager::class); - $configOptions = m::mock(ProducerConfigOptions::class); + + $connections = env('KAFKA_BROKER_CONNECTIONS', 'kafka:9092'); + $broker = new Broker($connections, new None()); + $producerConfigOptions = m::mock(ProducerConfigOptions::class); $connector = new Connector(); - $handler = new class ('record', $configOptions) extends AbstractProducer implements HandlerInterface { + $handler = new class ('record', $producerConfigOptions) extends AbstractProducer implements HandlerInterface { /** @phpcsSuppress SlevomatCodingStandard.Functions.UnusedParameter.UnusedParameter */ public function success(Message $message): void { @@ -98,18 +102,17 @@ public function failed(Message $message): void ->never(); $conf->expects() - ->set('metadata.broker.list', 'kafka:9092'); - - $configManager->expects() - ->get('connections') - ->andReturn('kafka:9092'); + ->set('metadata.broker.list', $connections); - $configManager->expects() - ->get('auth.type') - ->andReturn('none'); + $producerConfigOptions->expects() + ->getBroker() + ->andReturn($broker); // Actions - $result = $connector->getProducerTopic($handler, $configManager); + $result = $connector->getProducerTopic( + $handler, + $producerConfigOptions + ); // Assertions $this->assertInstanceOf(KafkaProducer::class, $result); diff --git a/tests/Unit/Console/ConsumerCommandTest.php b/tests/Unit/Console/ConsumerCommandTest.php index 26132745..c4737533 100644 --- a/tests/Unit/Console/ConsumerCommandTest.php +++ b/tests/Unit/Console/ConsumerCommandTest.php @@ -138,31 +138,24 @@ protected function setUp(): void config([ 'kafka' => [ - 'brokers' => [ - 'default' => [ - 'connections' => env( - 'KAFKA_BROKER_CONNECTIONS', - 'kafka:9092' - ), - 'auth' => [], - ], - ], 'topics' => [ 'topic_key' => [ 'topic_id' => 'topic_name', - 'broker' => 'default', 'consumer' => [ - 'consumer_groups' => [ - 'default' => [ - 'offset_reset' => 'earliest', - 'handler' => ConsumerHandlerDummy::class, - 'timeout' => 123, - ], - ], + 'consumer_group' => 'default', + 'offset_reset' => 'earliest', + 'handler' => ConsumerHandlerDummy::class, + 'timeout' => 123, ], ], ], ], + 'service' => [ + 'broker' => [ + 'connections' => 'test_kafka:6680', + 'auth' => [], + ], + ], ]); } } diff --git a/tests/Unit/ConsumerConfigManagerTest.php b/tests/Unit/ConsumerConfigManagerTest.php index 347aea6c..e69de29b 100644 --- a/tests/Unit/ConsumerConfigManagerTest.php +++ b/tests/Unit/ConsumerConfigManagerTest.php @@ -1,79 +0,0 @@ -instance( - AbstractHandler::class, - m::mock(AbstractHandler::class) - ); - $config = [ - 'middlewares' => [], - 'handler' => AbstractHandler::class, - 'broker' => [ - 'default' => [ - 'connections' => 'kafka:9092', - ], - ], - 'topic_id' => 'kafka-test', - ]; - $broker = new Broker('kafka:9092', new None()); - $configOptions = new ConsumerConfigOptions( - 'kafka-override', - $broker, - '\App\Kafka\Consumers\ConsumerExample', - null, - null, - 'default', - null, - [MiddlewareDummy::class], - 200, - false - ); - - $expected = [ - 'topic_id' => 'kafka-override', - 'connections' => 'kafka:9092', - 'auth' => null, - 'timeout' => 1000, - 'handler' => '\App\Kafka\Consumers\ConsumerExample', - 'partition' => -1, - 'offset' => null, - 'consumer_group' => 'default', - 'auto_commit' => false, - 'commit_async' => true, - 'offset_reset' => 'smallest', - 'times' => 2, - ]; - - $configManager = new ConsumerConfigManager(); - - $commandConfig = [ - 'timeout' => 1000, - 'times' => 2, - ]; - - // Expectations - $handler->expects() - ->getConfigOptions() - ->andReturn($configOptions); - - // Actions - $configManager->set($config, $commandConfig); - - // Expectations - $this->assertEquals($expected, $configManager->get()); - } -} diff --git a/tests/Unit/Consumers/LowLevelTest.php b/tests/Unit/Consumers/LowLevelTest.php index 6b35f993..e2347b60 100644 --- a/tests/Unit/Consumers/LowLevelTest.php +++ b/tests/Unit/Consumers/LowLevelTest.php @@ -2,8 +2,11 @@ namespace Tests\Unit\Consumers; -use Metamorphosis\ConsumerConfigManager; use Metamorphosis\Consumers\LowLevel; +use Metamorphosis\TopicHandler\ConfigOptions\Auth\None; +use Metamorphosis\TopicHandler\ConfigOptions\AvroSchema as AvroSchemaConfigOptions; +use Metamorphosis\TopicHandler\ConfigOptions\Broker; +use Metamorphosis\TopicHandler\ConfigOptions\Consumer as ConsumerConfigOptions; use Mockery as m; use RdKafka\ConsumerTopic; use RdKafka\Message; @@ -16,13 +19,27 @@ public function testItShouldConsume(): void // Set $timeout = 2; $partition = 3; - $configManager = new ConsumerConfigManager(); - $configManager->set(compact('timeout', 'partition')); + + $brokerOptions = new Broker('kafka:9092', new None()); + $consumerConfigOptions = new ConsumerConfigOptions( + 'kafka-test', + $brokerOptions, + null, + $partition, + null, + '', + new AvroSchemaConfigOptions('http://url.teste'), + [], + $timeout + ); $consumerTopic = m::mock(ConsumerTopic::class); $message = new Message(); - $lowLevelConsumer = new LowLevel($consumerTopic, $configManager); + $lowLevelConsumer = new LowLevel( + $consumerTopic, + $consumerConfigOptions + ); // Expectations $consumerTopic->expects() diff --git a/tests/Unit/ManagerTest.php b/tests/Unit/ManagerTest.php index 846db3f5..e69de29b 100644 --- a/tests/Unit/ManagerTest.php +++ b/tests/Unit/ManagerTest.php @@ -1,66 +0,0 @@ -set([ - 'topic' => 'products', - 'middlewares' => [ - Log::class, - app(JsonDecode::class), - ], - 'other' => 'config', - ]); - - // Actions - $other = $manager->get('other'); - $middlewares = $manager->middlewares(); - - // Assertions - $this->assertSame('config', $other); - $this->assertInstanceOf(Log::class, $middlewares[0]); - $this->assertInstanceOf(JsonDecode::class, $middlewares[1]); - } - - public function testShouldRemoveOldMiddlewareBeforeAddOthers(): void - { - // Set - $manager = new ConsumerConfigManager(); - $firstConfig = [ - 'topic' => 'products', - 'middlewares' => [ - Log::class, - app(JsonDecode::class), - ], - 'other' => 'config', - ]; - $secondConfig = [ - 'topic' => 'products', - 'middlewares' => [ - JsonDecode::class, - ], - 'other' => 'config', - ]; - - // Actions - $manager->set($firstConfig); - $manager->set($secondConfig); - $other = $manager->get('other'); - $middlewares = $manager->middlewares(); - - // Assertions - $this->assertSame('config', $other); - $this->assertInstanceOf(JsonDecode::class, $middlewares[0]); - $this->assertCount(1, $middlewares); - } -} diff --git a/tests/Unit/Middlewares/AvroSchemaDecoderTest.php b/tests/Unit/Middlewares/AvroSchemaDecoderTest.php new file mode 100644 index 00000000..ba9edd29 --- /dev/null +++ b/tests/Unit/Middlewares/AvroSchemaDecoderTest.php @@ -0,0 +1,76 @@ +getAvroSchema(); + $avroSchema = new AvroSchema('string'); + $decoder = m::mock(Schema::class); + $clientFactory = m::mock(ClientFactory::class); + $cachedSchemaRegistryClient = m::mock( + CachedSchemaRegistryClient::class + ); + $expected = 'my awesome message'; + + $message = new Message(); + $message->payload = "\x01\x00\x00\x00\fmy-topic-key\x00\x00\x00\x05\$my awesome message"; + $message->err = 0; + + $closure = Closure::fromCallable(function ($producerRecord) { + return $producerRecord; + }); + + $consumerRecord = new ConsumerRecord($message); + + // Expectations + $clientFactory->expects() + ->make($avroSchemaConfigOptions) + ->andReturn($cachedSchemaRegistryClient); + + $cachedSchemaRegistryClient->expects() + ->getBySubjectAndVersion('my-topic-key', 5) + ->andReturn($decoder); + + $decoder->expects() + ->getAvroSchema() + ->andReturn($avroSchema); + + $avroSchemaDecoder = new AvroSchemaDecoder( + $clientFactory, + $consumerConfigOptions + ); + + $result = $avroSchemaDecoder->process($consumerRecord, $closure); + + $this->assertSame($expected, $result->getPayload()); + } +} diff --git a/tests/Unit/Middlewares/AvroSchemaMixedEncoderTest.php b/tests/Unit/Middlewares/AvroSchemaMixedEncoderTest.php new file mode 100644 index 00000000..7fc33c0f --- /dev/null +++ b/tests/Unit/Middlewares/AvroSchemaMixedEncoderTest.php @@ -0,0 +1,118 @@ +getSchemaFixture(); + $broker = new Broker('kafka:9092', new None()); + $producerConfigOptions = new ProducerConfigOptions( + 'kafka-test', + $broker, + null, + new AvroSchemaConfigOptions( + 'subjects/kafka-test-value/versions/latest', + [] + ) + ); + $avroSchemaConfigOptions = $producerConfigOptions->getAvroSchema(); + + $clientFactory = m::mock(ClientFactory::class); + + $cachedSchemaRegistryClient = m::mock( + CachedSchemaRegistryClient::class + ); + $schemaIdEncoder = m::mock( + SchemaId::class, + [$cachedSchemaRegistryClient] + ); + + $schema = new Schema(); + $parsedSchema = $schema->parse( + $avroSchema, + '123', + 'kafka-test-value', + 'latest' + ); + $record = $this->getRecord($parsedSchema->getAvroSchema()); + $producerRecord = new ProducerRecord($record, 'kafka-test'); + + $closure = Closure::fromCallable(function ($producerRecord) { + return $producerRecord; + }); + + $payload = json_decode($producerRecord->getPayload(), true); + $encodedMessage = 'binary_message'; + + // Expectations + $clientFactory->expects() + ->make($avroSchemaConfigOptions) + ->andReturn($cachedSchemaRegistryClient); + + $cachedSchemaRegistryClient->expects() + ->getBySubjectAndVersion('kafka-test-value', 'latest') + ->andReturn($schema); + + $schemaIdEncoder->expects() + ->encode($schema, $payload) + ->andReturn($encodedMessage); + + // Actions + $avroSchemaMixedEncoder = new AvroSchemaMixedEncoder( + $schemaIdEncoder, + $clientFactory, + $producerConfigOptions + ); + $result = $avroSchemaMixedEncoder->process($producerRecord, $closure); + + // Assertions + $this->assertSame($record, $result->getOriginal()); + $this->assertSame($encodedMessage, $result->getPayload()); + } + + private function getRecord(AvroSchema $avroSchema): string + { + $defaultValues = [ + 'null' => null, + 'boolean' => true, + 'string' => 'abc', + 'int' => 1, + 'long' => 1.0, + 'float' => 1.0, + 'double' => 1.0, + 'array' => [], + ]; + + $result = []; + foreach ($avroSchema->fields() as $field) { + $result[$field->name()] = $defaultValues[$field->type->type]; + } + + return json_encode($result); + } + + private function getSchemaFixture(): string + { + return file_get_contents( + __DIR__ . '/../fixtures/schemas/sales_price.avsc' + ); + } +} diff --git a/tests/Unit/Middlewares/Handler/ProducerTest.php b/tests/Unit/Middlewares/Handler/ProducerTest.php index 3a7bbc70..561a7c78 100644 --- a/tests/Unit/Middlewares/Handler/ProducerTest.php +++ b/tests/Unit/Middlewares/Handler/ProducerTest.php @@ -5,7 +5,6 @@ use Closure; use Metamorphosis\Middlewares\Handler\Producer; use Metamorphosis\Producer\Poll; -use Metamorphosis\ProducerConfigManager; use Metamorphosis\Record\ProducerRecord; use Mockery as m; use RdKafka\ProducerTopic as KafkaTopicProducer; @@ -38,20 +37,4 @@ public function testItShouldSendMessageToKafkaBroker(): void $producerHandler = new Producer($producerTopic, $poll, 1); $producerHandler->process($record, $closure); } - - protected function setUp(): void - { - parent::setUp(); - - $configManager = new ProducerConfigManager(); - $configManager->set([ - 'topic_id' => 'topic_name', - 'timeout' => 4000, - 'is_async' => true, - 'max_poll_records' => 500, - 'flush_attempts' => 10, - 'required_acknowledgment' => true, - 'partition' => 0, - ]); - } } diff --git a/tests/Unit/Producer/PollTest.php b/tests/Unit/Producer/PollTest.php index 8bda848d..ba216928 100644 --- a/tests/Unit/Producer/PollTest.php +++ b/tests/Unit/Producer/PollTest.php @@ -3,7 +3,10 @@ namespace Tests\Unit\Producer; use Metamorphosis\Producer\Poll; -use Metamorphosis\ProducerConfigManager; +use Metamorphosis\TopicHandler\ConfigOptions\Auth\None; +use Metamorphosis\TopicHandler\ConfigOptions\AvroSchema as AvroSchemaConfigOptions; +use Metamorphosis\TopicHandler\ConfigOptions\Broker; +use Metamorphosis\TopicHandler\ConfigOptions\Producer as ProducerConfigOptions; use Mockery as m; use RdKafka\Producer as KafkaProducer; use RuntimeException; @@ -14,17 +17,21 @@ class PollTest extends LaravelTestCase public function testItShouldHandleMessageWithoutAcknowledgment(): void { // Set - $configManager = new ProducerConfigManager(); - $configManager->set([ - 'topic_id' => 'topic_name', - 'timeout' => 4000, - 'is_async' => true, - 'max_poll_records' => 500, - 'flush_attempts' => 10, - 'required_acknowledgment' => false, - ]); + $broker = new Broker('kafka:9092', new None()); + $producerConfigOptions = new ProducerConfigOptions( + 'topic_name', + $broker, + null, + new AvroSchemaConfigOptions('string', []), + [], + 4000, + true, + false, + 500, + 10 + ); $kafkaProducer = m::mock(KafkaProducer::class); - $poll = new Poll($kafkaProducer, $configManager); + $poll = new Poll($kafkaProducer, $producerConfigOptions); // Expectations $kafkaProducer->expects() @@ -40,27 +47,29 @@ public function testItShouldHandleMessageWithoutAcknowledgment(): void public function testShouldThrowExceptionWhenFlushFailed(): void { // Set - $configManager = new ProducerConfigManager(); - $configManager->set([ - 'topic_id' => 'topic_name', - 'timeout' => 1000, - 'is_async' => false, - 'max_poll_records' => 500, - 'flush_attempts' => 3, - 'required_acknowledgment' => true, - 'partition' => 0, - ]); - + $broker = new Broker('kafka:9092', new None()); + $producerConfigOptions = new ProducerConfigOptions( + 'topic_name', + $broker, + null, + new AvroSchemaConfigOptions('string', []), + [], + 100, + false, + true, + 500, + 10 + ); $kafkaProducer = m::mock(KafkaProducer::class); - $poll = new Poll($kafkaProducer, $configManager); + $poll = new Poll($kafkaProducer, $producerConfigOptions); // Expectations $kafkaProducer->expects() ->poll(0); $kafkaProducer->expects() - ->flush(1000) - ->times(3) + ->flush(100) + ->times(10) ->andReturn(1); $this->expectException(RuntimeException::class); @@ -72,18 +81,22 @@ public function testShouldThrowExceptionWhenFlushFailed(): void public function testItShouldHandleResponseEveryTimeWhenAsyncModeIsTrue(): void { // Set - $configManager = new ProducerConfigManager(); - $configManager->set([ - 'topic_id' => 'topic_name', - 'timeout' => 4000, - 'is_async' => false, - 'max_poll_records' => 500, - 'flush_attempts' => 10, - 'required_acknowledgment' => true, - 'partition' => 0, - ]); + $broker = new Broker('kafka:9092', new None()); + $producerConfigOptions = new ProducerConfigOptions( + 'topic_name', + $broker, + null, + new AvroSchemaConfigOptions('string', []), + [], + 4000, + false, + true, + 10, + 500 + ); + $kafkaProducer = m::mock(KafkaProducer::class); - $poll = new Poll($kafkaProducer, $configManager); + $poll = new Poll($kafkaProducer, $producerConfigOptions); // Expectations $kafkaProducer->expects() diff --git a/tests/Unit/ProducerTest.php b/tests/Unit/ProducerTest.php index 5a93dc8c..61f43155 100644 --- a/tests/Unit/ProducerTest.php +++ b/tests/Unit/ProducerTest.php @@ -8,11 +8,10 @@ use Metamorphosis\Middlewares\Handler\Dispatcher; use Metamorphosis\Middlewares\Handler\Producer as ProducerMiddleware; use Metamorphosis\Producer; -use Metamorphosis\ProducerConfigManager; use Metamorphosis\TopicHandler\ConfigOptions\Auth\None; +use Metamorphosis\TopicHandler\ConfigOptions\AvroSchema as AvroSchemaConfigOptions; use Metamorphosis\TopicHandler\ConfigOptions\Broker; use Metamorphosis\TopicHandler\ConfigOptions\Producer as ProducerConfigOptions; -use Metamorphosis\TopicHandler\Producer\AbstractHandler; use Metamorphosis\TopicHandler\Producer\AbstractProducer; use Mockery as m; use RdKafka\Producer as KafkaProducer; @@ -32,38 +31,22 @@ public function testItShouldProduceRecordAsArrayThroughMiddlewareQueue(): void ); $config = m::mock(Config::class); $connector = m::mock(Connector::class); - $configManager = m::mock(ProducerConfigManager::class)->makePartial(); $producer = new Producer($config, $connector); $kafkaProducer = m::mock(KafkaProducer::class); $producerTopic = m::mock(ProducerTopic::class); - $producerHandler = new class ($record, $topic) extends AbstractHandler { + $broker = new Broker('kafka:9092', new None()); + $producerConfigOptions = new ProducerConfigOptions( + $topic, + $broker + ); + $producerHandler = new class ($record, $producerConfigOptions, $topic) extends AbstractProducer { }; // Expectations - $config->expects() - ->makeByTopic($topic) - ->andReturn($configManager); - - $configManager->expects() - ->middlewares() - ->andReturn([]); - - $configManager->expects() - ->get('topic_id') - ->andReturn($topic); - - $configManager->expects() - ->get('partition') - ->andReturn(0); - - $configManager->expects() - ->get('timeout') - ->andReturn(1000); - $connector->expects() - ->getProducerTopic($producerHandler, $configManager) + ->getProducerTopic($producerHandler, $producerConfigOptions) ->andReturn($kafkaProducer); $kafkaProducer->expects() @@ -87,36 +70,27 @@ public function testItShouldProduceRecordAsStringThroughMiddlewareQueue(): void ProducerMiddleware::class, m::mock(ProducerMiddleware::class) ); + $config = m::mock(Config::class); $connector = m::mock(Connector::class); - $configManager = m::mock(ProducerConfigManager::class)->makePartial(); + + $broker = new Broker('kafka:9092', new None()); + $producerConfigOptions = new ProducerConfigOptions( + $topic, + $broker + ); + $producer = new Producer($config, $connector); $kafkaProducer = m::mock(KafkaProducer::class); $producerTopic = m::mock(ProducerTopic::class); - $producerHandler = new class ($record, $topic) extends AbstractHandler { + $producerHandler = new class ($record, $producerConfigOptions, $topic) extends AbstractProducer { }; // Expectations - $config->expects() - ->makeByTopic($topic) - ->andReturn($configManager); - - $configManager->expects() - ->middlewares() - ->andReturn([]); - - $configManager->expects() - ->get('topic_id') - ->andReturn($topic); - - $configManager->expects() - ->get('partition') - ->andReturn(0); - $connector->expects() - ->getProducerTopic($producerHandler, $configManager) + ->getProducerTopic($producerHandler, $producerConfigOptions) ->andReturn($kafkaProducer); $kafkaProducer->expects() @@ -143,52 +117,29 @@ public function testItShouldThrowJsonExceptionWhenPassingMalFormattedArray(): vo $config = m::mock(Config::class); $connector = m::mock(Connector::class); $producer = new Producer($config, $connector); - $configManager = m::mock(ProducerConfigManager::class); + $kafkaProducer = m::mock(KafkaProducer::class); $producerTopic = m::mock(ProducerTopic::class); - $producerHandler = new class ($record, $topic) extends AbstractHandler { + $broker = new Broker('kafka:9092', new None()); + $producerConfigOptions = new ProducerConfigOptions( + $topic, + $broker, + 0, + new AvroSchemaConfigOptions('string'), + [], + 1000, + false, + true, + 500, + 1 + ); + $producerHandler = new class ($record, $producerConfigOptions, $topic) extends AbstractProducer { }; // Expectations - $configManager->expects() - ->middlewares() - ->andReturn([]); - - $configManager->expects() - ->get('topic_id') - ->andReturn($topic); - - $configManager->expects() - ->get('partition') - ->andReturn(0); - - $configManager->expects() - ->get('max_poll_records') - ->andReturn(500); - - $configManager->expects() - ->get('required_acknowledgment') - ->andReturn(true); - - $configManager->expects() - ->get('flush_attempts') - ->andReturn(1); - - $configManager->expects() - ->get('timeout') - ->andReturn(1000); - - $configManager->expects() - ->get('is_async') - ->andReturn(false); - - $config->expects() - ->makeByTopic($topic) - ->andReturn($configManager); - $connector->expects() - ->getProducerTopic($producerHandler, $configManager) + ->getProducerTopic($producerHandler, $producerConfigOptions) ->andReturn($kafkaProducer); $kafkaProducer->expects() @@ -221,50 +172,27 @@ public function testShouldBuildDispatcher(): void $kafkaProducer = m::mock(KafkaProducer::class); $producerTopic = m::mock(ProducerTopic::class); - $configManager = m::mock(ProducerConfigManager::class); - $producerHandler = new class ($record, $topic) extends AbstractHandler { + $broker = new Broker('kafka:9092', new None()); + $producerConfigOptions = new ProducerConfigOptions( + $topic, + $broker, + null, + new AvroSchemaConfigOptions('string'), + [], + 1000, + true, + true, + 500, + 1 + ); + + $producerHandler = new class ($record, $producerConfigOptions, $topic) extends AbstractProducer { }; // Expectations - $config->expects() - ->makeByTopic($topic) - ->andReturn($configManager); - - $configManager->expects() - ->middlewares() - ->andReturn([]); - - $configManager->expects() - ->get('topic_id') - ->andReturn($topic); - - $configManager->expects() - ->get('partition') - ->andReturn(0); - - $configManager->expects() - ->get('max_poll_records') - ->andReturn(500); - - $configManager->expects() - ->get('is_async') - ->andReturn(true); - - $configManager->expects() - ->get('required_acknowledgment') - ->andReturn(true); - - $configManager->expects() - ->get('flush_attempts') - ->andReturn(1); - - $configManager->expects() - ->get('timeout') - ->andReturn(1000); - $connector->expects() - ->getProducerTopic($producerHandler, $configManager) + ->getProducerTopic($producerHandler, $producerConfigOptions) ->andReturn($kafkaProducer); $kafkaProducer->expects() @@ -286,7 +214,7 @@ public function testShouldBuildDispatcherWithConfigOptions(): void { // Set $record = json_encode(['message' => 'some message']); - $topicId = 'TOPIC-ID'; + $topic = 'TOPIC-ID'; $config = m::mock(Config::class); $connector = m::mock(Connector::class); @@ -294,56 +222,30 @@ public function testShouldBuildDispatcherWithConfigOptions(): void $kafkaProducer = m::mock(KafkaProducer::class); $producerTopic = m::mock(ProducerTopic::class); - $configManager = m::mock(ProducerConfigManager::class); - $connections = env('KAFKA_BROKER_CONNECTIONS', 'kafka:9092'); - $broker = new Broker($connections, new None()); - $configOptions = new ProducerConfigOptions($topicId, $broker); - $producerHandler = new class ($record, $configOptions) extends AbstractProducer { + + $broker = new Broker('kafka:9092', new None()); + $producerConfigOptions = new ProducerConfigOptions( + $topic, + $broker, + null, + new AvroSchemaConfigOptions('string'), + [], + 1000, + true, + true, + 500, + 1 + ); + $producerHandler = new class ($record, $producerConfigOptions, $topic) extends AbstractProducer { }; // Expectations - $config->expects() - ->make($configOptions) - ->andReturn($configManager); - - $configManager->expects() - ->middlewares() - ->andReturn([]); - - $configManager->expects() - ->get('topic_id') - ->andReturn($topicId); - - $configManager->expects() - ->get('partition') - ->andReturn(0); - - $configManager->expects() - ->get('max_poll_records') - ->andReturn(500); - - $configManager->expects() - ->get('is_async') - ->andReturn(true); - - $configManager->expects() - ->get('required_acknowledgment') - ->andReturn(true); - - $configManager->expects() - ->get('flush_attempts') - ->andReturn(1); - - $configManager->expects() - ->get('timeout') - ->andReturn(1000); - $connector->expects() - ->getProducerTopic($producerHandler, $configManager) + ->getProducerTopic($producerHandler, $producerConfigOptions) ->andReturn($kafkaProducer); $kafkaProducer->expects() - ->newTopic($topicId) + ->newTopic($topic) ->andReturn($producerTopic); $kafkaProducer->expects() diff --git a/tests/Unit/TopicHandler/ConfigOptions/Factories/ConsumerFactoryTest.php b/tests/Unit/TopicHandler/ConfigOptions/Factories/ConsumerFactoryTest.php index cb78b0f7..02aa3368 100644 --- a/tests/Unit/TopicHandler/ConfigOptions/Factories/ConsumerFactoryTest.php +++ b/tests/Unit/TopicHandler/ConfigOptions/Factories/ConsumerFactoryTest.php @@ -34,18 +34,15 @@ public function testShouldMakeConfigOptionWithAvroSchema(): void $topicData = [ 'topic_id' => 'kafka-test', 'consumer' => [ - 'consumer_groups' => [ - 'test-consumer-group' => [ - 'middlewares' => [], - 'auto_commit' => true, - 'commit_async' => true, - 'offset_reset' => 'earliest', - 'handler' => '\App\Kafka\Consumers\ConsumerExample', - 'partition' => 0, - 'offset' => 0, - 'timeout' => 20000, - ], - ], + 'consumer_group' => 'test-consumer-group', + 'middlewares' => [], + 'auto_commit' => true, + 'commit_async' => true, + 'offset_reset' => 'earliest', + 'handler' => '\App\Kafka\Consumers\ConsumerExample', + 'partition' => 0, + 'offset' => 0, + 'timeout' => 20000, ], ]; $expected = [