diff --git a/.checkstyle/checkstyle.xml b/.checkstyle/checkstyle.xml index 3e16655..511c30a 100644 --- a/.checkstyle/checkstyle.xml +++ b/.checkstyle/checkstyle.xml @@ -87,9 +87,15 @@ - + + + + - + + + + diff --git a/src/main/resources/kafka_versions.json b/src/main/resources/kafka_versions.json index e8bfa93..4edaadf 100644 --- a/src/main/resources/kafka_versions.json +++ b/src/main/resources/kafka_versions.json @@ -6,6 +6,7 @@ "3.2.3": "quay.io/strimzi-test-container/test-container:latest-kafka-3.2.3", "3.3.2": "quay.io/strimzi-test-container/test-container:latest-kafka-3.3.2", "3.4.1": "quay.io/strimzi-test-container/test-container:latest-kafka-3.4.1", - "3.5.0": "quay.io/strimzi-test-container/test-container:latest-kafka-3.5.0" + "3.5.1": "quay.io/strimzi-test-container/test-container:latest-kafka-3.5.1", + "3.6.0": "quay.io/strimzi-test-container/test-container:latest-kafka-3.6.0" } } \ No newline at end of file diff --git a/src/test/java/io/strimzi/test/container/StrimziKafkaContainerIT.java b/src/test/java/io/strimzi/test/container/StrimziKafkaContainerIT.java index 9eb3e1a..03e95c2 100644 --- a/src/test/java/io/strimzi/test/container/StrimziKafkaContainerIT.java +++ b/src/test/java/io/strimzi/test/container/StrimziKafkaContainerIT.java @@ -4,6 +4,19 @@ */ package io.strimzi.test.container; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; import org.slf4j.Logger; @@ -13,6 +26,7 @@ import org.testcontainers.containers.Container; import org.testcontainers.containers.Network; import org.testcontainers.containers.ToxiproxyContainer; +import org.testcontainers.shaded.com.google.common.collect.ImmutableMap; import org.testcontainers.utility.DockerImageName; import org.testcontainers.utility.MountableFile; @@ -21,8 +35,16 @@ import java.net.Socket; import java.net.SocketAddress; import java.net.SocketTimeoutException; +import java.time.Duration; +import java.util.Collection; +import java.util.Collections; import java.util.HashMap; +import java.util.Locale; import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.equalTo; @@ -42,15 +64,16 @@ public class StrimziKafkaContainerIT extends AbstractIT { @MethodSource("retrieveKafkaVersionsFile") void testStartContainerWithEmptyConfiguration(final String imageName) { assumeDocker(); - systemUnderTest = new StrimziKafkaContainer(imageName) - .withBrokerId(1) - .waitForRunning(); - systemUnderTest.start(); - assertThat(systemUnderTest.getBootstrapServers(), is("PLAINTEXT://" - + systemUnderTest.getContainerIpAddress() + ":" + systemUnderTest.getMappedPort(9092))); + try (StrimziKafkaContainer systemUnderTest = new StrimziKafkaContainer(imageName) + .withBrokerId(1) + .waitForRunning()) { - systemUnderTest.stop(); + systemUnderTest.start(); + + assertThat(systemUnderTest.getBootstrapServers(), is("PLAINTEXT://" + + systemUnderTest.getContainerIpAddress() + ":" + systemUnderTest.getMappedPort(9092))); + } } @ParameterizedTest(name = "testStartContainerWithSomeConfiguration-{0}") @@ -311,7 +334,7 @@ void testStartBrokerWithProxyContainer(final String imageName) { systemUnderTest.stop(); } - @ParameterizedTest(name = "testStartBrokerWithProxyContainer-{0}") + @ParameterizedTest(name = "testGetProxyWithNoContainer-{0}") @MethodSource("retrieveKafkaVersionsFile") void testGetProxyWithNoContainer(final String imageName) { systemUnderTest = new StrimziKafkaContainer(imageName) @@ -319,4 +342,68 @@ void testGetProxyWithNoContainer(final String imageName) { systemUnderTest.start(); assertThrows(IllegalStateException.class, () -> systemUnderTest.getProxy()); } + + @Test + void testKafkaContainerFunctionality() { + // using try-with-resources for AdminClient, KafkaProducer and KafkaConsumer (implicit closing connection) + try (StrimziKafkaContainer systemUnderTest = new StrimziKafkaContainer() + .waitForRunning()) { + + systemUnderTest.start(); + + try (final AdminClient adminClient = AdminClient.create(ImmutableMap.of( + AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, systemUnderTest.getBootstrapServers())); + KafkaProducer producer = new KafkaProducer<>( + ImmutableMap.of( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, systemUnderTest.getBootstrapServers(), + ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString() + ), + new StringSerializer(), + new StringSerializer() + ); + KafkaConsumer consumer = new KafkaConsumer<>( + ImmutableMap.of( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, systemUnderTest.getBootstrapServers(), + ConsumerConfig.GROUP_ID_CONFIG, "tc-" + UUID.randomUUID(), + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.EARLIEST.name().toLowerCase(Locale.ROOT) + ), + new StringDeserializer(), + new StringDeserializer())) { + + final String topicName = "example-topic"; + final String recordKey = "strimzi"; + final String recordValue = "the-best-project-in-the-world"; + + final Collection topics = Collections.singletonList(new NewTopic(topicName, 1, (short) 1)); + adminClient.createTopics(topics).all().get(30, TimeUnit.SECONDS); + + consumer.subscribe(Collections.singletonList(topicName)); + + producer.send(new ProducerRecord<>(topicName, recordKey, recordValue)).get(); + + Utils.waitFor("Consumer records are present", Duration.ofSeconds(10).toMillis(), Duration.ofMinutes(2).toMillis(), + () -> { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + + if (records.isEmpty()) { + return false; + } + + // verify count + assertThat(records.count(), is(1)); + + ConsumerRecord consumerRecord = records.records(topicName).iterator().next(); + + // verify content of the record + assertThat(consumerRecord.topic(), is(topicName)); + assertThat(consumerRecord.key(), is(recordKey)); + assertThat(consumerRecord.value(), is(recordValue)); + + return true; + }); + } catch (ExecutionException | InterruptedException | TimeoutException e) { + throw new RuntimeException(e); + } + } + } }