diff --git a/.checkstyle/checkstyle.xml b/.checkstyle/checkstyle.xml
index 3e16655..511c30a 100644
--- a/.checkstyle/checkstyle.xml
+++ b/.checkstyle/checkstyle.xml
@@ -87,9 +87,15 @@
-
+
+
+
+
-
+
+
+
+
diff --git a/src/main/resources/kafka_versions.json b/src/main/resources/kafka_versions.json
index e8bfa93..4edaadf 100644
--- a/src/main/resources/kafka_versions.json
+++ b/src/main/resources/kafka_versions.json
@@ -6,6 +6,7 @@
"3.2.3": "quay.io/strimzi-test-container/test-container:latest-kafka-3.2.3",
"3.3.2": "quay.io/strimzi-test-container/test-container:latest-kafka-3.3.2",
"3.4.1": "quay.io/strimzi-test-container/test-container:latest-kafka-3.4.1",
- "3.5.0": "quay.io/strimzi-test-container/test-container:latest-kafka-3.5.0"
+ "3.5.1": "quay.io/strimzi-test-container/test-container:latest-kafka-3.5.1",
+ "3.6.0": "quay.io/strimzi-test-container/test-container:latest-kafka-3.6.0"
}
}
\ No newline at end of file
diff --git a/src/test/java/io/strimzi/test/container/StrimziKafkaContainerIT.java b/src/test/java/io/strimzi/test/container/StrimziKafkaContainerIT.java
index 9eb3e1a..03e95c2 100644
--- a/src/test/java/io/strimzi/test/container/StrimziKafkaContainerIT.java
+++ b/src/test/java/io/strimzi/test/container/StrimziKafkaContainerIT.java
@@ -4,6 +4,19 @@
*/
package io.strimzi.test.container;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.Logger;
@@ -13,6 +26,7 @@
import org.testcontainers.containers.Container;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.ToxiproxyContainer;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
import org.testcontainers.utility.DockerImageName;
import org.testcontainers.utility.MountableFile;
@@ -21,8 +35,16 @@
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketTimeoutException;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.Locale;
import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
@@ -42,15 +64,16 @@ public class StrimziKafkaContainerIT extends AbstractIT {
@MethodSource("retrieveKafkaVersionsFile")
void testStartContainerWithEmptyConfiguration(final String imageName) {
assumeDocker();
- systemUnderTest = new StrimziKafkaContainer(imageName)
- .withBrokerId(1)
- .waitForRunning();
- systemUnderTest.start();
- assertThat(systemUnderTest.getBootstrapServers(), is("PLAINTEXT://"
- + systemUnderTest.getContainerIpAddress() + ":" + systemUnderTest.getMappedPort(9092)));
+ try (StrimziKafkaContainer systemUnderTest = new StrimziKafkaContainer(imageName)
+ .withBrokerId(1)
+ .waitForRunning()) {
- systemUnderTest.stop();
+ systemUnderTest.start();
+
+ assertThat(systemUnderTest.getBootstrapServers(), is("PLAINTEXT://"
+ + systemUnderTest.getContainerIpAddress() + ":" + systemUnderTest.getMappedPort(9092)));
+ }
}
@ParameterizedTest(name = "testStartContainerWithSomeConfiguration-{0}")
@@ -311,7 +334,7 @@ void testStartBrokerWithProxyContainer(final String imageName) {
systemUnderTest.stop();
}
- @ParameterizedTest(name = "testStartBrokerWithProxyContainer-{0}")
+ @ParameterizedTest(name = "testGetProxyWithNoContainer-{0}")
@MethodSource("retrieveKafkaVersionsFile")
void testGetProxyWithNoContainer(final String imageName) {
systemUnderTest = new StrimziKafkaContainer(imageName)
@@ -319,4 +342,68 @@ void testGetProxyWithNoContainer(final String imageName) {
systemUnderTest.start();
assertThrows(IllegalStateException.class, () -> systemUnderTest.getProxy());
}
+
+ @Test
+ void testKafkaContainerFunctionality() {
+ // using try-with-resources for AdminClient, KafkaProducer and KafkaConsumer (implicit closing connection)
+ try (StrimziKafkaContainer systemUnderTest = new StrimziKafkaContainer()
+ .waitForRunning()) {
+
+ systemUnderTest.start();
+
+ try (final AdminClient adminClient = AdminClient.create(ImmutableMap.of(
+ AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, systemUnderTest.getBootstrapServers()));
+ KafkaProducer producer = new KafkaProducer<>(
+ ImmutableMap.of(
+ ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, systemUnderTest.getBootstrapServers(),
+ ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString()
+ ),
+ new StringSerializer(),
+ new StringSerializer()
+ );
+ KafkaConsumer consumer = new KafkaConsumer<>(
+ ImmutableMap.of(
+ ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, systemUnderTest.getBootstrapServers(),
+ ConsumerConfig.GROUP_ID_CONFIG, "tc-" + UUID.randomUUID(),
+ ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.EARLIEST.name().toLowerCase(Locale.ROOT)
+ ),
+ new StringDeserializer(),
+ new StringDeserializer())) {
+
+ final String topicName = "example-topic";
+ final String recordKey = "strimzi";
+ final String recordValue = "the-best-project-in-the-world";
+
+ final Collection topics = Collections.singletonList(new NewTopic(topicName, 1, (short) 1));
+ adminClient.createTopics(topics).all().get(30, TimeUnit.SECONDS);
+
+ consumer.subscribe(Collections.singletonList(topicName));
+
+ producer.send(new ProducerRecord<>(topicName, recordKey, recordValue)).get();
+
+ Utils.waitFor("Consumer records are present", Duration.ofSeconds(10).toMillis(), Duration.ofMinutes(2).toMillis(),
+ () -> {
+ ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
+
+ if (records.isEmpty()) {
+ return false;
+ }
+
+ // verify count
+ assertThat(records.count(), is(1));
+
+ ConsumerRecord consumerRecord = records.records(topicName).iterator().next();
+
+ // verify content of the record
+ assertThat(consumerRecord.topic(), is(topicName));
+ assertThat(consumerRecord.key(), is(recordKey));
+ assertThat(consumerRecord.value(), is(recordValue));
+
+ return true;
+ });
+ } catch (ExecutionException | InterruptedException | TimeoutException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
}