Skip to content

Commit

Permalink
Add support for apachepulsar/pulsar-all image (#9448)
Browse files Browse the repository at this point in the history
`apachepulsar/pulsar-all` brings connectors in the image.
  • Loading branch information
eddumelendez authored Oct 22, 2024
1 parent 7c024ed commit 4f9594d
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
/**
* Testcontainers implementation for Apache Pulsar.
* <p>
* Supported image: {@code apachepulsar/pulsar}
* Supported images: {@code apachepulsar/pulsar}, {@code apachepulsar/pulsar-all}
* <p>
* Exposed ports:
* <ul>
Expand Down Expand Up @@ -64,7 +64,7 @@ public PulsarContainer(String pulsarVersion) {

public PulsarContainer(final DockerImageName dockerImageName) {
super(dockerImageName);
dockerImageName.assertCompatibleWith(DockerImageName.parse("apachepulsar/pulsar"));
dockerImageName.assertCompatibleWith(DEFAULT_IMAGE_NAME, DockerImageName.parse("apachepulsar/pulsar-all"));
withExposedPorts(BROKER_PORT, BROKER_HTTP_PORT);
setWaitStrategy(waitAllStrategy);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package org.testcontainers.containers;

import org.apache.pulsar.client.admin.ListTopicsOptions;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.transaction.Transaction;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;

public class AbstractPulsar {

public static final String TEST_TOPIC = "test_topic";

protected void testPulsarFunctionality(String pulsarBrokerUrl) throws Exception {
try (
PulsarClient client = PulsarClient.builder().serviceUrl(pulsarBrokerUrl).build();
Consumer consumer = client.newConsumer().topic(TEST_TOPIC).subscriptionName("test-subs").subscribe();
Producer<byte[]> producer = client.newProducer().topic(TEST_TOPIC).create()
) {
producer.send("test containers".getBytes());
CompletableFuture<Message> future = consumer.receiveAsync();
Message message = future.get(5, TimeUnit.SECONDS);

assertThat(new String(message.getData())).isEqualTo("test containers");
}
}

protected void testTransactionFunctionality(String pulsarBrokerUrl) throws Exception {
try (
PulsarClient client = PulsarClient.builder().serviceUrl(pulsarBrokerUrl).enableTransaction(true).build();
Consumer<String> consumer = client
.newConsumer(Schema.STRING)
.topic("transaction-topic")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscriptionName("test-transaction-sub")
.subscribe();
Producer<String> producer = client
.newProducer(Schema.STRING)
.sendTimeout(0, TimeUnit.SECONDS)
.topic("transaction-topic")
.create()
) {
final Transaction transaction = client.newTransaction().build().get();
producer.newMessage(transaction).value("first").send();
transaction.commit();
Message<String> message = consumer.receive();
assertThat(message.getValue()).isEqualTo("first");
}
}

protected void assertTransactionsTopicCreated(PulsarAdmin pulsarAdmin) throws PulsarAdminException {
final List<String> topics = pulsarAdmin
.topics()
.getPartitionedTopicList("pulsar/system", ListTopicsOptions.builder().includeSystemTopic(true).build());
assertThat(topics).contains("persistent://pulsar/system/transaction_coordinator_assign");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package org.testcontainers.containers;

import org.apache.pulsar.client.admin.PulsarAdmin;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.testcontainers.utility.DockerImageName;

@RunWith(Parameterized.class)
public class CompatibleApachePulsarImageTest extends AbstractPulsar {

@Parameterized.Parameters(name = "{0}")
public static String[] params() {
return new String[] { "apachepulsar/pulsar:3.0.0", "apachepulsar/pulsar-all:3.0.0" };
}

@Parameterized.Parameter
public String imageName;

@Test
public void testUsage() throws Exception {
try (PulsarContainer pulsar = new PulsarContainer(DockerImageName.parse(this.imageName));) {
pulsar.start();
final String pulsarBrokerUrl = pulsar.getPulsarBrokerUrl();

testPulsarFunctionality(pulsarBrokerUrl);
}
}

@Test
public void testTransactions() throws Exception {
try (PulsarContainer pulsar = new PulsarContainer(DockerImageName.parse(this.imageName)).withTransactions();) {
pulsar.start();

try (PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(pulsar.getHttpServiceUrl()).build()) {
assertTransactionsTopicCreated(pulsarAdmin);
}
testTransactionFunctionality(pulsar.getPulsarBrokerUrl());
}
}
}
Original file line number Diff line number Diff line change
@@ -1,29 +1,16 @@
package org.testcontainers.containers;

import org.apache.pulsar.client.admin.ListTopicsOptions;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.junit.Test;
import org.testcontainers.utility.DockerImageName;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

public class PulsarContainerTest {

public static final String TEST_TOPIC = "test_topic";
public class PulsarContainerTest extends AbstractPulsar {

private static final DockerImageName PULSAR_IMAGE = DockerImageName.parse("apachepulsar/pulsar:3.0.0");

Expand Down Expand Up @@ -84,7 +71,8 @@ public void shouldNotEnableFunctionsWorkerByDefault() throws Exception {
public void shouldWaitForFunctionsWorkerStarted() throws Exception {
try (
// constructorWithFunctionsWorker {
PulsarContainer pulsar = new PulsarContainer(PULSAR_IMAGE).withFunctionsWorker();
PulsarContainer pulsar = new PulsarContainer(DockerImageName.parse("apachepulsar/pulsar:3.0.0"))
.withFunctionsWorker();
// }
) {
pulsar.start();
Expand All @@ -111,13 +99,6 @@ public void testTransactions() throws Exception {
}
}

private void assertTransactionsTopicCreated(PulsarAdmin pulsarAdmin) throws PulsarAdminException {
final List<String> topics = pulsarAdmin
.topics()
.getPartitionedTopicList("pulsar/system", ListTopicsOptions.builder().includeSystemTopic(true).build());
assertThat(topics).contains("persistent://pulsar/system/transaction_coordinator_assign");
}

@Test
public void testTransactionsAndFunctionsWorker() throws Exception {
try (PulsarContainer pulsar = new PulsarContainer(PULSAR_IMAGE).withTransactions().withFunctionsWorker()) {
Expand Down Expand Up @@ -149,41 +130,4 @@ public void testStartupTimeoutIsHonored() {
.hasRootCauseMessage("Precondition failed: timeout must be greater than zero");
}
}

protected void testPulsarFunctionality(String pulsarBrokerUrl) throws Exception {
try (
PulsarClient client = PulsarClient.builder().serviceUrl(pulsarBrokerUrl).build();
Consumer consumer = client.newConsumer().topic(TEST_TOPIC).subscriptionName("test-subs").subscribe();
Producer<byte[]> producer = client.newProducer().topic(TEST_TOPIC).create()
) {
producer.send("test containers".getBytes());
CompletableFuture<Message> future = consumer.receiveAsync();
Message message = future.get(5, TimeUnit.SECONDS);

assertThat(new String(message.getData())).isEqualTo("test containers");
}
}

protected void testTransactionFunctionality(String pulsarBrokerUrl) throws Exception {
try (
PulsarClient client = PulsarClient.builder().serviceUrl(pulsarBrokerUrl).enableTransaction(true).build();
Consumer<String> consumer = client
.newConsumer(Schema.STRING)
.topic("transaction-topic")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscriptionName("test-transaction-sub")
.subscribe();
Producer<String> producer = client
.newProducer(Schema.STRING)
.sendTimeout(0, TimeUnit.SECONDS)
.topic("transaction-topic")
.create()
) {
final Transaction transaction = client.newTransaction().build().get();
producer.newMessage(transaction).value("first").send();
transaction.commit();
Message<String> message = consumer.receive();
assertThat(message.getValue()).isEqualTo("first");
}
}
}

0 comments on commit 4f9594d

Please sign in to comment.