Skip to content

Commit

Permalink
Merge pull request #404 from michalvavrik/feature/fix-daily-failure-v…
Browse files Browse the repository at this point in the history
…ertx-kafka

Fix Quarkus Vert.x scenarios with Kafka failures
  • Loading branch information
mjurc authored Aug 6, 2023
2 parents 1cd327a + 30a7624 commit f5e9576
Show file tree
Hide file tree
Showing 12 changed files with 42 additions and 200 deletions.
29 changes: 0 additions & 29 deletions 301-quarkus-vertx-kafka/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,35 +40,6 @@ so we don't really need a new verticle to consume Kafka in a reactive Non-blocki

[1]: https://vertx.io/docs/vertx-core/java/#_verticles
[2]: https://vertx.io/docs/vertx-core/java/#_executing_periodic_and_delayed_actions

### Live coding with Quarkus

#### Strimzi:

> docker-compose -f docker-compose-strimzi.yaml up
>
> mvn quarkus:dev
#### Confluent:
> docker-compose -f docker-compose-confluent.yaml up
>
> mvn -Dquarkus.profile=confluent quarkus:dev

#### Run automated tests
> mvn test
### Troubleshooting

#### When I swap from Strimzi/confluent sometimes Kafka broker doesn't weak up (docker-compose).
Is something that We should investigate, must be for some service name collisions. It's the middle time you could clean your docker containers by running the following commands:
> docker stop $(docker ps -a -q)
>
> docker rm $(docker ps -a -q)
#### [Only Fedora] When I launch a docker compose, kafka broker can't reach Zookeper node rather a internal network issue.
`error: kafka.zookeeper.ZooKeeperClientTimeoutException: Timed out waiting for connection while in state: CONNECTING

Looks that the default backend for firewalld was changed from iptables to nftables, since Fedora32. A quick patch it's a rollback to iptables as firewalld backend.
> sudo sed -i 's/FirewallBackend=nftables/FirewallBackend=iptables/g' /etc/firewalld/firewalld.conf
>
> sudo systemctl restart firewalld docker
37 changes: 17 additions & 20 deletions 301-quarkus-vertx-kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,48 +23,46 @@
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-opentelemetry-exporter-otlp</artifactId>
<artifactId>quarkus-vertx</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-vertx</artifactId>
<artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
</dependency>
<!-- Kafka -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
<artifactId>quarkus-kafka-client</artifactId>
</dependency>
<!-- Avro -->
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-avro</artifactId>
<artifactId>quarkus-confluent-registry-avro</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-confluent-registry-avro</artifactId>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-common-rest-client-vertx</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-jsonb</artifactId>
<artifactId>quarkus-apicurio-registry-avro</artifactId>
</dependency>
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-utils-serde</artifactId>
<exclusions>
<exclusion>
<groupId>org.jboss.spec.javax.interceptor</groupId>
<artifactId>jboss-interceptors-api_1.2_spec</artifactId>
</exclusion>
</exclusions>
<artifactId>apicurio-registry-client</artifactId>
</dependency>
<!-- quarkiverse-apicurio-registry-client: native mode dependency -->
<dependency>
<groupId>io.quarkiverse.apicurio</groupId>
<artifactId>quarkiverse-apicurio-registry-client</artifactId>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-jsonb</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest-client-jsonb</artifactId>
</dependency>
<!-- quarkiverse-apicurio-registry-client: native mode dependency end-->
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
Expand All @@ -83,8 +81,7 @@
</dependencies>
<profiles>
<!--
Skip native build and run due to stale deps
apicurio-registry-utils-serde and quarkiverse-apicurio-registry-client are not developed anymore
Skip native build as Confluent Kafka serializers don't work in native OOTB
-->
<profile>
<id>native</id>
Expand Down
23 changes: 11 additions & 12 deletions 301-quarkus-vertx-kafka/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -2,35 +2,34 @@
kafka.bootstrap.servers=localhost:9092

# Kafka AVRO scenario
mp.messaging.connector.smallrye-kafka.apicurio.registry.url=http://localhost:8081/api
mp.messaging.connector.smallrye-kafka.apicurio.registry.url=http://localhost:8081/apis/registry/v2

vertx.kafka.producer.delay-ms=100
vertx.kafka.producer.batch-size=1

mp.messaging.outgoing.source-stock-price.connector=smallrye-kafka
mp.messaging.outgoing.source-stock-price.topic=stock-price
mp.messaging.outgoing.source-stock-price.value.serializer=io.apicurio.registry.utils.serde.AvroKafkaSerializer
mp.messaging.outgoing.source-stock-price.apicurio.registry.artifact-id=io.apicurio.registry.utils.serde.strategy.SimpleTopicIdStrategy
mp.messaging.outgoing.source-stock-price.apicurio.registry.global-id=io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy
mp.messaging.outgoing.source-stock-price.apicurio.registry.avro-datum-provider=io.apicurio.registry.utils.serde.avro.ReflectAvroDatumProvider
mp.messaging.outgoing.source-stock-price.value.serializer=io.apicurio.registry.serde.avro.AvroKafkaSerializer
mp.messaging.outgoing.source-stock-price.apicurio.registry.auto-register=true
mp.messaging.outgoing.source-stock-price.apicurio.registry.avro-datum-provider=io.apicurio.registry.serde.avro.ReflectAvroDatumProvider

mp.messaging.incoming.channel-stock-price.connector=smallrye-kafka
mp.messaging.incoming.channel-stock-price.specific.avro.reader=true
mp.messaging.incoming.channel-stock-price.topic=stock-price
mp.messaging.incoming.channel-stock-price.value.deserializer=io.apicurio.registry.utils.serde.AvroKafkaDeserializer
mp.messaging.incoming.channel-stock-price.value.deserializer=io.apicurio.registry.serde.avro.AvroKafkaDeserializer
mp.messaging.incoming.channel-stock-price.auto.offset.reset=earliest
mp.messaging.incoming.channel-stock-price.enable.auto.commit=true
mp.messaging.incoming.channel-stock-price.apicurio.registry.avro-datum-provider=io.apicurio.registry.utils.serde.avro.ReflectAvroDatumProvider
mp.messaging.incoming.channel-stock-price.apicurio.registry.avro-datum-provider=io.apicurio.registry.serde.avro.ReflectAvroDatumProvider
mp.messaging.incoming.channel-stock-price.apicurio.registry.use-specific-avro-reader=true

mp.messaging.outgoing.sink-stock-price.connector=smallrye-kafka
mp.messaging.outgoing.sink-stock-price.topic=end-stock-price
mp.messaging.outgoing.sink-stock-price.value.serializer=io.apicurio.registry.utils.serde.AvroKafkaSerializer
mp.messaging.outgoing.sink-stock-price.apicurio.registry.artifact-id=io.apicurio.registry.utils.serde.strategy.SimpleTopicIdStrategy
mp.messaging.outgoing.sink-stock-price.apicurio.registry.global-id=io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy
mp.messaging.outgoing.sink-stock-price.apicurio.registry.avro-datum-provider=io.apicurio.registry.utils.serde.avro.ReflectAvroDatumProvider
mp.messaging.outgoing.sink-stock-price.value.serializer=io.apicurio.registry.serde.avro.AvroKafkaSerializer
mp.messaging.outgoing.sink-stock-price.apicurio.registry.auto-register=true
mp.messaging.outgoing.sink-stock-price.apicurio.registry.avro-datum-provider=io.apicurio.registry.serde.avro.ReflectAvroDatumProvider

# Jaeger
quarkus.opentelemetry.tracer.exporter.otlp.endpoint=http://localhost:4317/api/traces
quarkus.otel.exporter.otlp.traces.endpoint=http://localhost:4317/api/traces

# Configuration file - Quarkus profile: Confluent

Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public void producerConsumerTest() throws InterruptedException {
@Test
public void kafkaProducerShouldTrace() {
final int pageLimit = 50;
final String expectedOperationName = "stock-price send";
final String expectedOperationName = "stock-price publish";
await().atMost(1, TimeUnit.MINUTES).pollInterval(Duration.ofSeconds(1)).untilAsserted(() -> {
thenRetrieveTraces(pageLimit, "1h", expectedOperationName);
thenStatusCodeMustBe(HttpStatus.SC_OK);
Expand Down Expand Up @@ -70,7 +70,8 @@ public void kafkaConsumerShouldTrace() throws InterruptedException {
}

private void thenRetrieveTraces(int pageLimit, String lookBack, String operationName) {
resp = given().when()
resp = given()
.when()
.queryParam("limit", pageLimit)
.queryParam("lookback", lookBack)
.queryParam("service", getServiceName())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ public class ConfluentKafkaResource implements QuarkusTestResourceLifecycleManag
@Override
public Map<String, String> start() {
Network network = Network.newNetwork();
kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.1.1")).withNetwork(network);
schemaRegistry = new SchemaRegistryContainer("confluentinc/cp-schema-registry", "6.1.1", 8081).withNetwork(network)
kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.1")).withNetwork(network);
schemaRegistry = new SchemaRegistryContainer("confluentinc/cp-schema-registry", "7.4.1", 8081).withNetwork(network)
.withKafka(kafkaContainer, 9092);

Startables.deepStart(Stream.of(kafkaContainer, schemaRegistry)).join();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@

public class JaegerContainer extends GenericContainer<JaegerContainer> {
private static final String COLLECTOR_OTLP_ENABLED = "COLLECTOR_OTLP_ENABLED";
private static final int OLTP_PORT = 4317;
public static final int OLTP_PORT = 4317;
public static final int TRACE_PORT = 16686;

public JaegerContainer() {
super("quay.io/jaegertracing/all-in-one:1.37.0");
super("quay.io/jaegertracing/all-in-one:1.41.0");
waitingFor(new LogMessageWaitStrategy().withRegEx(".*\"Health Check state change\",\"status\":\"ready\".*\\s"));
addFixedExposedPort(OLTP_PORT, OLTP_PORT);
addFixedExposedPort(TRACE_PORT, TRACE_PORT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ public Map<String, String> start() {
container.start();

return Collections.singletonMap(
"quarkus.opentelemetry.tracer.exporter.jaeger.endpoint",
String.format("http://%s:%s/api/traces", container.getContainerIpAddress(), JaegerContainer.TRACE_PORT));
"quarkus.otel.exporter.otlp.traces.endpoint",
String.format("http://%s:%s/api/traces", container.getHost(), JaegerContainer.OLTP_PORT));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,6 @@ protected SchemaRegistryContainer withConfluentKafka(Network network, String boo
* @return Schema Registry URL
*/
public String getSchemaRegistryUrl() {
return "http://" + getContainerIpAddress() + ":" + getMappedPort(port);
return "http://" + getHost() + ":" + getMappedPort(port);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@ public class StrimziKafkaResource implements QuarkusTestResourceLifecycleManager
public Map<String, String> start() {
Network network = Network.newNetwork();

kafkaContainer = new StrimziKafkaContainer("quay.io/strimzi/kafka:latest-kafka-2.8.0").withNetwork(network);
schemaRegistry = new SchemaRegistryContainer("apicurio/apicurio-registry-mem", "1.2.2.Final", 8080).withNetwork(network)
.withKafka(kafkaContainer, 9092);
kafkaContainer = new StrimziKafkaContainer("quay.io/strimzi/kafka:0.34.0-kafka-3.4.0").withNetwork(network);
schemaRegistry = new SchemaRegistryContainer("quay.io/apicurio/apicurio-registry-mem", "2.4.2.Final", 8080);

Startables.deepStart(Stream.of(kafkaContainer, schemaRegistry)).join();

Expand All @@ -38,7 +37,7 @@ public Map<String, String> start() {
Map<String, String> config = new HashMap<>();
config.put("kafka.bootstrap.servers", kafkaUrl);
config.put("quarkus.kafka-streams.bootstrap-servers", kafkaUrl);
config.put("mp.messaging.connector.smallrye-kafka.apicurio.registry.url", registryUrl + "/api");
config.put("mp.messaging.connector.smallrye-kafka.apicurio.registry.url", registryUrl + "/apis/registry/v2");

return config;
}
Expand Down
14 changes: 1 addition & 13 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,8 @@
<surefire-plugin.version>3.1.0</surefire-plugin.version>
<testcontainers.version>1.18.3</testcontainers.version>
<wiremock.version>2.35.0</wiremock.version>
<apicurio-registry-utils-serde.version>1.3.2.Final</apicurio-registry-utils-serde.version>
<confluent.kafka-avro-serializer.version>6.0.0</confluent.kafka-avro-serializer.version>
<confluent.kafka-avro-serializer.version>7.4.0</confluent.kafka-avro-serializer.version>
<version.assertj>3.24.2</version.assertj>
<version.quarkiverse.apicurio.registry.client>0.0.2</version.quarkiverse.apicurio.registry.client>
<version.quarkiverse.consul>2.0.1</version.quarkiverse.consul>
<version.quarkus.camel>2.16.0</version.quarkus.camel>
<formatter-maven-plugin.version>2.23.0</formatter-maven-plugin.version>
Expand Down Expand Up @@ -85,21 +83,11 @@
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-utils-serde</artifactId>
<version>${apicurio-registry-utils-serde.version}</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>${confluent.kafka-avro-serializer.version}</version>
</dependency>
<dependency>
<groupId>io.quarkiverse.apicurio</groupId>
<artifactId>quarkiverse-apicurio-registry-client</artifactId>
<version>${version.quarkiverse.apicurio.registry.client}</version>
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-kamelet</artifactId>
Expand Down

0 comments on commit f5e9576

Please sign in to comment.