Skip to content

A test library 🧰 which aim is to make the experience in testing Kafka application less painful 🍺

License

Notifications You must be signed in to change notification settings

farajist/kafkaesque

 
 

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

GitHub Package Registry version

Kafkaesque

Kafkaesque is a test library whose aim is to make the experience in testing Kafka application less painful. By now, the project is in its early stage, defining the API that we will implement in the near future.

Every help will be very useful :)

The library allows to test the following use cases:

Use Case 1: The Application Produces Some Messages on a Topic

The first use case tests the messages produced by an application, reading them from the topic. The code, producing the messages, is external to Kafkaesque. Through Kafkaesque, it is possible to assert some properties on the messages generated by the application.

Kafkaesque
  .at("broker:port")
  .<Key, Value>consume()
  .fromTopic("topic-name")
  .withDeserializers(keyDeserializer, valueDeserializer)
  .waitingAtMost(10, SECONDS)
  .waitingEmptyPolls(2, 50L, MILLISECONDS)
  .expectingConsumed()
  .havingRecordsSize(3) // <-- from here we use a ConsumedResult
  .havingHeaders(headers -> {
    // Assertions on headers
  })
  .havingKeys(keys -> {
    // Assertions on keys
  })
  .havingPayloads(payloads -> {
    // Asserions on payloads
  })
  .havingConsumerRecords(records -> {
    // Assertions on the full list of ConsumerRecord<Key, Value>
  })
  .assertingThatPayloads(contains("42")) // Uses Hamcrest.Matchers on collections :)
  .andCloseConsumer();

Use Case 2: The Application Consumes Some Messages from a Topic

The second use case tests an application that reads messages from a topic. Kafkaesque is responsible to produce such messages to trigger the execution of the application. It is also possible to assert conditions on the system state after the consumption of the messages.

Kafkaesque
  .at("broker:port")
  .<Key, Value>produce()
  .toTopic("topic-name")
  .withDeserializers(keyDeserializer, valueDeserializer)
  .messages( /* Some list of messages */)
  .waitingAtMostForEachAck(100, MILLISECONDS) // Waiting time for each ack from the broker
  .waitingForTheConsumerAtMost(10, SECONDS) // Waiting time for the consumer to read one / all the messages
  .andAfterAll()
  .asserting(messages -> {
    // Assertions on the consumer process after the sending of all the messages
  });

An equivalent method pipeline is available to test assertions after the consumption of each message:

Kafkaesque
  .at("broker:port")
  .<Key, Value>produce()
  .toTopic("topic-name")
  .withDeserializers(keyDeserializer, valueDeserializer)
  .messages( /* Some list of messages */)
  .waitingAtMostForEachAck(100, MILLISECONDS) // Waiting time for each ack from the broker
  .waitingForTheConsumerAtMost(10, SECONDS) // Waiting time for the consumer to read one / all the messages
  .andAfterEach()
  .asserting(message -> {
    // Assertions on the consumer process after the sending of each message
  });

Use Case 3: Synchronize on Produced or Consumed Messages and Test Them Outside Kafkaesque

The kafka-streams-test-utils testing library offers to developers some useful and powerful abstractions. Indeed, the TestInputTopic and the TestOutputTopic let developers manage asynchronous communication with a broker as it is fully synchronous. In this case, the library does not start any broker, not even embedded.

Kafkaesque offers to developers the same abstractions, trying to achieve the same synchronous behavior, using the yolo.Kfksq class.

var kfksq = Kfksq.at("broker:port");
var inputTopic = kfksq.createInputTopic("inputTopic", keySerializer, valueSerializer);
inputTopic.pipeInput("key", "value");

var outputTopic = kfksq.createOutputTopic("outputTopic", keyDeserializer, valueDeserializer);
var records = outputTopic.readRecordsToList();

Modules

Core module

The Kafkaesque library contains many submodules. The kafkaesque-core module contains the interfaces and agnostic concrete classes offering the above fluid API. Add the following dependency to your pom.xml file to use module:

<dependency>
  <groupId>in.rcard</groupId>
  <artifactId>kafkaesque-core</artifactId>
  <version>1.0.0-RC1</version>
  <scope>test</scope>
</dependency>

In detail, the kafkaesque-core module uses the Awaitility Java library to deal with the asynchronicity nature of each of the above use cases.

Since the library is published in the GitHub Packages repository, you need to add also the following definition to your pom.xml:

<repository>
  <id>github</id>
  <name>GitHub rcardin Apache Maven Packages</name>
  <url>https://maven.pkg.github.com/rcardin/kafkaesque</url>
</repository>

About

A test library 🧰 which aim is to make the experience in testing Kafka application less painful 🍺

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages

  • Java 100.0%