From 30a7624004797a4857f5219090f870f73a5161d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Vav=C5=99=C3=ADk?= Date: Sat, 5 Aug 2023 16:23:50 +0200 Subject: [PATCH] Fix Quarkus Vert.x scenarios with Kafka failures --- 301-quarkus-vertx-kafka/README.md | 29 -------- 301-quarkus-vertx-kafka/pom.xml | 37 +++++----- .../src/main/resources/application.properties | 23 +++---- .../resources/docker-compose-confluent.yaml | 69 ------------------- .../resources/docker-compose-strimzi.yaml | 44 ------------ .../io/quarkus/qe/kafka/KafkaCommonTest.java | 5 +- .../resources/ConfluentKafkaResource.java | 4 +- .../qe/kafka/resources/JaegerContainer.java | 4 +- .../kafka/resources/JaegerTestResource.java | 4 +- .../resources/SchemaRegistryContainer.java | 2 +- .../kafka/resources/StrimziKafkaResource.java | 7 +- pom.xml | 14 +--- 12 files changed, 42 insertions(+), 200 deletions(-) delete mode 100644 301-quarkus-vertx-kafka/src/main/resources/docker-compose-confluent.yaml delete mode 100644 301-quarkus-vertx-kafka/src/main/resources/docker-compose-strimzi.yaml diff --git a/301-quarkus-vertx-kafka/README.md b/301-quarkus-vertx-kafka/README.md index 19966917..4f468c26 100644 --- a/301-quarkus-vertx-kafka/README.md +++ b/301-quarkus-vertx-kafka/README.md @@ -40,35 +40,6 @@ so we don't really need a new verticle to consume Kafka in a reactive Non-blocki [1]: https://vertx.io/docs/vertx-core/java/#_verticles [2]: https://vertx.io/docs/vertx-core/java/#_executing_periodic_and_delayed_actions - -### Live coding with Quarkus - -#### Strimzi: - -> docker-compose -f docker-compose-strimzi.yaml up -> -> mvn quarkus:dev - -#### Confluent: -> docker-compose -f docker-compose-confluent.yaml up -> -> mvn -Dquarkus.profile=confluent quarkus:dev #### Run automated tests > mvn test - -### Troubleshooting - -#### When I swap from Strimzi/confluent sometimes Kafka broker doesn't weak up (docker-compose). -Is something that We should investigate, must be for some service name collisions. It's the middle time you could clean your docker containers by running the following commands: -> docker stop $(docker ps -a -q) -> -> docker rm $(docker ps -a -q) - -#### [Only Fedora] When I launch a docker compose, kafka broker can't reach Zookeper node rather a internal network issue. -`error: kafka.zookeeper.ZooKeeperClientTimeoutException: Timed out waiting for connection while in state: CONNECTING - -Looks that the default backend for firewalld was changed from iptables to nftables, since Fedora32. A quick patch it's a rollback to iptables as firewalld backend. -> sudo sed -i 's/FirewallBackend=nftables/FirewallBackend=iptables/g' /etc/firewalld/firewalld.conf -> -> sudo systemctl restart firewalld docker diff --git a/301-quarkus-vertx-kafka/pom.xml b/301-quarkus-vertx-kafka/pom.xml index 9ceda001..076854b9 100644 --- a/301-quarkus-vertx-kafka/pom.xml +++ b/301-quarkus-vertx-kafka/pom.xml @@ -23,48 +23,46 @@ io.quarkus - quarkus-opentelemetry-exporter-otlp + quarkus-vertx io.quarkus - quarkus-vertx + quarkus-smallrye-reactive-messaging-kafka + io.quarkus - quarkus-smallrye-reactive-messaging-kafka + quarkus-kafka-client + io.confluent kafka-avro-serializer io.quarkus - quarkus-avro + quarkus-confluent-registry-avro - io.quarkus - quarkus-confluent-registry-avro + io.apicurio + apicurio-common-rest-client-vertx io.quarkus - quarkus-resteasy-jsonb + quarkus-apicurio-registry-avro io.apicurio - apicurio-registry-utils-serde - - - org.jboss.spec.javax.interceptor - jboss-interceptors-api_1.2_spec - - + apicurio-registry-client - - io.quarkiverse.apicurio - quarkiverse-apicurio-registry-client + io.quarkus + quarkus-resteasy-jsonb + + + io.quarkus + quarkus-rest-client-jsonb - org.testcontainers kafka @@ -83,8 +81,7 @@ native diff --git a/301-quarkus-vertx-kafka/src/main/resources/application.properties b/301-quarkus-vertx-kafka/src/main/resources/application.properties index d04008a5..e7bed21f 100644 --- a/301-quarkus-vertx-kafka/src/main/resources/application.properties +++ b/301-quarkus-vertx-kafka/src/main/resources/application.properties @@ -2,35 +2,34 @@ kafka.bootstrap.servers=localhost:9092 # Kafka AVRO scenario -mp.messaging.connector.smallrye-kafka.apicurio.registry.url=http://localhost:8081/api +mp.messaging.connector.smallrye-kafka.apicurio.registry.url=http://localhost:8081/apis/registry/v2 vertx.kafka.producer.delay-ms=100 vertx.kafka.producer.batch-size=1 mp.messaging.outgoing.source-stock-price.connector=smallrye-kafka mp.messaging.outgoing.source-stock-price.topic=stock-price -mp.messaging.outgoing.source-stock-price.value.serializer=io.apicurio.registry.utils.serde.AvroKafkaSerializer -mp.messaging.outgoing.source-stock-price.apicurio.registry.artifact-id=io.apicurio.registry.utils.serde.strategy.SimpleTopicIdStrategy -mp.messaging.outgoing.source-stock-price.apicurio.registry.global-id=io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy -mp.messaging.outgoing.source-stock-price.apicurio.registry.avro-datum-provider=io.apicurio.registry.utils.serde.avro.ReflectAvroDatumProvider +mp.messaging.outgoing.source-stock-price.value.serializer=io.apicurio.registry.serde.avro.AvroKafkaSerializer +mp.messaging.outgoing.source-stock-price.apicurio.registry.auto-register=true +mp.messaging.outgoing.source-stock-price.apicurio.registry.avro-datum-provider=io.apicurio.registry.serde.avro.ReflectAvroDatumProvider mp.messaging.incoming.channel-stock-price.connector=smallrye-kafka mp.messaging.incoming.channel-stock-price.specific.avro.reader=true mp.messaging.incoming.channel-stock-price.topic=stock-price -mp.messaging.incoming.channel-stock-price.value.deserializer=io.apicurio.registry.utils.serde.AvroKafkaDeserializer +mp.messaging.incoming.channel-stock-price.value.deserializer=io.apicurio.registry.serde.avro.AvroKafkaDeserializer mp.messaging.incoming.channel-stock-price.auto.offset.reset=earliest mp.messaging.incoming.channel-stock-price.enable.auto.commit=true -mp.messaging.incoming.channel-stock-price.apicurio.registry.avro-datum-provider=io.apicurio.registry.utils.serde.avro.ReflectAvroDatumProvider +mp.messaging.incoming.channel-stock-price.apicurio.registry.avro-datum-provider=io.apicurio.registry.serde.avro.ReflectAvroDatumProvider +mp.messaging.incoming.channel-stock-price.apicurio.registry.use-specific-avro-reader=true mp.messaging.outgoing.sink-stock-price.connector=smallrye-kafka mp.messaging.outgoing.sink-stock-price.topic=end-stock-price -mp.messaging.outgoing.sink-stock-price.value.serializer=io.apicurio.registry.utils.serde.AvroKafkaSerializer -mp.messaging.outgoing.sink-stock-price.apicurio.registry.artifact-id=io.apicurio.registry.utils.serde.strategy.SimpleTopicIdStrategy -mp.messaging.outgoing.sink-stock-price.apicurio.registry.global-id=io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy -mp.messaging.outgoing.sink-stock-price.apicurio.registry.avro-datum-provider=io.apicurio.registry.utils.serde.avro.ReflectAvroDatumProvider +mp.messaging.outgoing.sink-stock-price.value.serializer=io.apicurio.registry.serde.avro.AvroKafkaSerializer +mp.messaging.outgoing.sink-stock-price.apicurio.registry.auto-register=true +mp.messaging.outgoing.sink-stock-price.apicurio.registry.avro-datum-provider=io.apicurio.registry.serde.avro.ReflectAvroDatumProvider # Jaeger -quarkus.opentelemetry.tracer.exporter.otlp.endpoint=http://localhost:4317/api/traces +quarkus.otel.exporter.otlp.traces.endpoint=http://localhost:4317/api/traces # Configuration file - Quarkus profile: Confluent diff --git a/301-quarkus-vertx-kafka/src/main/resources/docker-compose-confluent.yaml b/301-quarkus-vertx-kafka/src/main/resources/docker-compose-confluent.yaml deleted file mode 100644 index 2f21443b..00000000 --- a/301-quarkus-vertx-kafka/src/main/resources/docker-compose-confluent.yaml +++ /dev/null @@ -1,69 +0,0 @@ -version: '2' -services: - zookeeper: - image: confluentinc/cp-zookeeper:5.3.0 - hostname: zookeeper - container_name: zookeeper - ports: - - "2181:2181" - environment: - ZOOKEEPER_CLIENT_PORT: 2181 - ZOOKEEPER_TICK_TIME: 2000 - - broker: - image: confluentinc/cp-enterprise-kafka:5.4.1 - hostname: broker - container_name: broker - depends_on: - - zookeeper - ports: - - "29092:29092" - - "9092:9092" - environment: - KAFKA_BROKER_ID: 1 - KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT - KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092 - KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 - KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 - CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092 - CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181 - CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1 - CONFLUENT_METRICS_ENABLE: 'true' - CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous' - - schema-registry: - image: confluentinc/cp-schema-registry:5.4.1 - hostname: schema-registry - container_name: schema-registry - depends_on: - - zookeeper - - broker - ports: - - "8081:8081" - environment: - SCHEMA_REGISTRY_HOST_NAME: schema-registry - SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181' - control-center: - image: confluentinc/cp-enterprise-control-center:5.4.1 - hostname: control-center - container_name: control-center - depends_on: - - zookeeper - - broker - - schema-registry - ports: - - "9021:9021" - environment: - CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092' - CONTROL_CENTER_ZOOKEEPER_CONNECT: 'zookeeper:2181' - CONTROL_CENTER_CONNECT_CLUSTER: 'connect:8083' - CONTROL_CENTER_KSQL_URL: "http://ksql-server:8088" - CONTROL_CENTER_KSQL_ADVERTISED_URL: "http://localhost:8088" - CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081" - CONTROL_CENTER_REPLICATION_FACTOR: 1 - CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1 - CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1 - CONFLUENT_METRICS_TOPIC_REPLICATION: 1 - PORT: 9021 \ No newline at end of file diff --git a/301-quarkus-vertx-kafka/src/main/resources/docker-compose-strimzi.yaml b/301-quarkus-vertx-kafka/src/main/resources/docker-compose-strimzi.yaml deleted file mode 100644 index 9040a5ad..00000000 --- a/301-quarkus-vertx-kafka/src/main/resources/docker-compose-strimzi.yaml +++ /dev/null @@ -1,44 +0,0 @@ -version: '2' - -services: - - zookeeper: - image: strimzi/kafka:0.18.0-kafka-2.5.0 - command: [ - "sh", "-c", - "bin/zookeeper-server-start.sh config/zookeeper.properties" - ] - ports: - - "2181:2181" - environment: - LOG_DIR: /tmp/logs - - kafka: - image: strimzi/kafka:0.18.0-kafka-2.5.0 - command: [ - "sh", "-c", - "bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}" - ] - depends_on: - - zookeeper - links: - - "zookeeper:zookeeper" - ports: - - "9092:9092" - environment: - LOG_DIR: "/tmp/logs" - KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 - KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092 - KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - - schema-registry: - image: apicurio/apicurio-registry-mem:1.3.1.Final - ports: - - 8081:8080 - depends_on: - - kafka - environment: - QUARKUS_PROFILE: prod - KAFKA_BOOTSTRAP_SERVERS: localhost:9092 - APPLICATION_ID: registry_id - APPLICATION_SERVER: localhost:9000 diff --git a/301-quarkus-vertx-kafka/src/test/java/io/quarkus/qe/kafka/KafkaCommonTest.java b/301-quarkus-vertx-kafka/src/test/java/io/quarkus/qe/kafka/KafkaCommonTest.java index 995cbb8b..e708d386 100644 --- a/301-quarkus-vertx-kafka/src/test/java/io/quarkus/qe/kafka/KafkaCommonTest.java +++ b/301-quarkus-vertx-kafka/src/test/java/io/quarkus/qe/kafka/KafkaCommonTest.java @@ -41,7 +41,7 @@ public void producerConsumerTest() throws InterruptedException { @Test public void kafkaProducerShouldTrace() { final int pageLimit = 50; - final String expectedOperationName = "stock-price send"; + final String expectedOperationName = "stock-price publish"; await().atMost(1, TimeUnit.MINUTES).pollInterval(Duration.ofSeconds(1)).untilAsserted(() -> { thenRetrieveTraces(pageLimit, "1h", expectedOperationName); thenStatusCodeMustBe(HttpStatus.SC_OK); @@ -70,7 +70,8 @@ public void kafkaConsumerShouldTrace() throws InterruptedException { } private void thenRetrieveTraces(int pageLimit, String lookBack, String operationName) { - resp = given().when() + resp = given() + .when() .queryParam("limit", pageLimit) .queryParam("lookback", lookBack) .queryParam("service", getServiceName()) diff --git a/301-quarkus-vertx-kafka/src/test/java/io/quarkus/qe/kafka/resources/ConfluentKafkaResource.java b/301-quarkus-vertx-kafka/src/test/java/io/quarkus/qe/kafka/resources/ConfluentKafkaResource.java index c5af141a..8345aa51 100644 --- a/301-quarkus-vertx-kafka/src/test/java/io/quarkus/qe/kafka/resources/ConfluentKafkaResource.java +++ b/301-quarkus-vertx-kafka/src/test/java/io/quarkus/qe/kafka/resources/ConfluentKafkaResource.java @@ -23,8 +23,8 @@ public class ConfluentKafkaResource implements QuarkusTestResourceLifecycleManag @Override public Map start() { Network network = Network.newNetwork(); - kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.1.1")).withNetwork(network); - schemaRegistry = new SchemaRegistryContainer("confluentinc/cp-schema-registry", "6.1.1", 8081).withNetwork(network) + kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.1")).withNetwork(network); + schemaRegistry = new SchemaRegistryContainer("confluentinc/cp-schema-registry", "7.4.1", 8081).withNetwork(network) .withKafka(kafkaContainer, 9092); Startables.deepStart(Stream.of(kafkaContainer, schemaRegistry)).join(); diff --git a/301-quarkus-vertx-kafka/src/test/java/io/quarkus/qe/kafka/resources/JaegerContainer.java b/301-quarkus-vertx-kafka/src/test/java/io/quarkus/qe/kafka/resources/JaegerContainer.java index c5adc624..5c0d96f2 100644 --- a/301-quarkus-vertx-kafka/src/test/java/io/quarkus/qe/kafka/resources/JaegerContainer.java +++ b/301-quarkus-vertx-kafka/src/test/java/io/quarkus/qe/kafka/resources/JaegerContainer.java @@ -5,11 +5,11 @@ public class JaegerContainer extends GenericContainer { private static final String COLLECTOR_OTLP_ENABLED = "COLLECTOR_OTLP_ENABLED"; - private static final int OLTP_PORT = 4317; + public static final int OLTP_PORT = 4317; public static final int TRACE_PORT = 16686; public JaegerContainer() { - super("quay.io/jaegertracing/all-in-one:1.37.0"); + super("quay.io/jaegertracing/all-in-one:1.41.0"); waitingFor(new LogMessageWaitStrategy().withRegEx(".*\"Health Check state change\",\"status\":\"ready\".*\\s")); addFixedExposedPort(OLTP_PORT, OLTP_PORT); addFixedExposedPort(TRACE_PORT, TRACE_PORT); diff --git a/301-quarkus-vertx-kafka/src/test/java/io/quarkus/qe/kafka/resources/JaegerTestResource.java b/301-quarkus-vertx-kafka/src/test/java/io/quarkus/qe/kafka/resources/JaegerTestResource.java index fec69461..d557c1cc 100644 --- a/301-quarkus-vertx-kafka/src/test/java/io/quarkus/qe/kafka/resources/JaegerTestResource.java +++ b/301-quarkus-vertx-kafka/src/test/java/io/quarkus/qe/kafka/resources/JaegerTestResource.java @@ -18,8 +18,8 @@ public Map start() { container.start(); return Collections.singletonMap( - "quarkus.opentelemetry.tracer.exporter.jaeger.endpoint", - String.format("http://%s:%s/api/traces", container.getContainerIpAddress(), JaegerContainer.TRACE_PORT)); + "quarkus.otel.exporter.otlp.traces.endpoint", + String.format("http://%s:%s/api/traces", container.getHost(), JaegerContainer.OLTP_PORT)); } @Override diff --git a/301-quarkus-vertx-kafka/src/test/java/io/quarkus/qe/kafka/resources/SchemaRegistryContainer.java b/301-quarkus-vertx-kafka/src/test/java/io/quarkus/qe/kafka/resources/SchemaRegistryContainer.java index 37fcb8c3..c7b4d234 100644 --- a/301-quarkus-vertx-kafka/src/test/java/io/quarkus/qe/kafka/resources/SchemaRegistryContainer.java +++ b/301-quarkus-vertx-kafka/src/test/java/io/quarkus/qe/kafka/resources/SchemaRegistryContainer.java @@ -50,6 +50,6 @@ protected SchemaRegistryContainer withConfluentKafka(Network network, String boo * @return Schema Registry URL */ public String getSchemaRegistryUrl() { - return "http://" + getContainerIpAddress() + ":" + getMappedPort(port); + return "http://" + getHost() + ":" + getMappedPort(port); } } diff --git a/301-quarkus-vertx-kafka/src/test/java/io/quarkus/qe/kafka/resources/StrimziKafkaResource.java b/301-quarkus-vertx-kafka/src/test/java/io/quarkus/qe/kafka/resources/StrimziKafkaResource.java index 11da4161..eb5b8075 100644 --- a/301-quarkus-vertx-kafka/src/test/java/io/quarkus/qe/kafka/resources/StrimziKafkaResource.java +++ b/301-quarkus-vertx-kafka/src/test/java/io/quarkus/qe/kafka/resources/StrimziKafkaResource.java @@ -23,9 +23,8 @@ public class StrimziKafkaResource implements QuarkusTestResourceLifecycleManager public Map start() { Network network = Network.newNetwork(); - kafkaContainer = new StrimziKafkaContainer("quay.io/strimzi/kafka:latest-kafka-2.8.0").withNetwork(network); - schemaRegistry = new SchemaRegistryContainer("apicurio/apicurio-registry-mem", "1.2.2.Final", 8080).withNetwork(network) - .withKafka(kafkaContainer, 9092); + kafkaContainer = new StrimziKafkaContainer("quay.io/strimzi/kafka:0.34.0-kafka-3.4.0").withNetwork(network); + schemaRegistry = new SchemaRegistryContainer("quay.io/apicurio/apicurio-registry-mem", "2.4.2.Final", 8080); Startables.deepStart(Stream.of(kafkaContainer, schemaRegistry)).join(); @@ -38,7 +37,7 @@ public Map start() { Map config = new HashMap<>(); config.put("kafka.bootstrap.servers", kafkaUrl); config.put("quarkus.kafka-streams.bootstrap-servers", kafkaUrl); - config.put("mp.messaging.connector.smallrye-kafka.apicurio.registry.url", registryUrl + "/api"); + config.put("mp.messaging.connector.smallrye-kafka.apicurio.registry.url", registryUrl + "/apis/registry/v2"); return config; } diff --git a/pom.xml b/pom.xml index a4b8cd47..dcb444c7 100644 --- a/pom.xml +++ b/pom.xml @@ -44,10 +44,8 @@ 3.1.0 1.18.3 2.35.0 - 1.3.2.Final - 6.0.0 + 7.4.0 3.24.2 - 0.0.2 2.0.1 2.16.0 2.23.0 @@ -85,21 +83,11 @@ pom import - - io.apicurio - apicurio-registry-utils-serde - ${apicurio-registry-utils-serde.version} - io.confluent kafka-avro-serializer ${confluent.kafka-avro-serializer.version} - - io.quarkiverse.apicurio - quarkiverse-apicurio-registry-client - ${version.quarkiverse.apicurio.registry.client} - org.apache.camel.quarkus camel-quarkus-kamelet