Skip to content

Commit

Permalink
Merge pull request #2703 from ozangunalp/pulsar_improvements
Browse files Browse the repository at this point in the history
Pulsar improvements
  • Loading branch information
ozangunalp authored Jul 25, 2024
2 parents f6609ee + 6ed59de commit 1a2df0d
Show file tree
Hide file tree
Showing 8 changed files with 172 additions and 115 deletions.
Original file line number Diff line number Diff line change
@@ -1,20 +1,30 @@
package io.smallrye.reactive.messaging.pulsar;

import static io.smallrye.reactive.messaging.providers.helpers.CDIUtils.getInstanceById;
import static io.smallrye.reactive.messaging.pulsar.i18n.PulsarLogging.log;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Any;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;

import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.KeySharedPolicy;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.RedeliveryBackoff;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.MultiplierRedeliveryBackoff;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
Expand All @@ -30,6 +40,7 @@

import io.smallrye.reactive.messaging.ClientCustomizer;
import io.smallrye.reactive.messaging.providers.helpers.ConfigUtils;
import io.smallrye.reactive.messaging.providers.helpers.Validation;
import io.vertx.core.json.JsonObject;

/**
Expand Down Expand Up @@ -110,6 +121,33 @@ public ClientBuilderImpl customize(ClientBuilderImpl builder, PulsarConnectorCom
return (ClientBuilderImpl) ConfigUtils.customize(cc.config(), clientConfigCustomizers, builder);
}

public ClientBuilderImpl configure(PulsarConnectorCommonConfiguration cc, ClientConfigurationData conf)
throws PulsarClientException {
setAuth(conf);
return customize(new ClientBuilderImpl(conf), cc);
}

/**
* Sets the authentication object in the given configuration object using
* `authPluginClassName` and `authParams`/`authParamMap` attributes
* This use to be done by the PulsarClientImpl
*
* @param conf client configuration
* @throws PulsarClientException
*/
private void setAuth(ClientConfigurationData conf) throws PulsarClientException {
if (Validation.isBlank(conf.getAuthPluginClassName())
|| (Validation.isBlank(conf.getAuthParams()) && conf.getAuthParamMap() == null)) {
return;
}

if (!Validation.isBlank(conf.getAuthParams())) {
conf.setAuthentication(AuthenticationFactory.create(conf.getAuthPluginClassName(), conf.getAuthParams()));
} else if (conf.getAuthParamMap() != null) {
conf.setAuthentication(AuthenticationFactory.create(conf.getAuthPluginClassName(), conf.getAuthParamMap()));
}
}

/**
* Extract the configuration map for building Pulsar consumer
*
Expand All @@ -129,6 +167,62 @@ public <T> ConsumerBuilder<T> customize(ConsumerBuilder<T> builder, PulsarConnec
return (ConsumerBuilder<T>) ConfigUtils.customize(ic.config(), consumerConfigCustomizers, builder);
}

public <T> ConsumerBuilder<T> configure(ConsumerBuilder<T> builder,
PulsarConnectorIncomingConfiguration ic,
ConsumerConfigurationData<?> conf) {
builder.loadConf(configToMap(conf));
ic.getDeadLetterPolicyMaxRedeliverCount().ifPresent(i -> builder.deadLetterPolicy(getDeadLetterPolicy(ic, i)));
ic.getNegativeAckRedeliveryBackoff()
.ifPresent(s -> builder.negativeAckRedeliveryBackoff(parseBackoff(s, ic.getChannel())));
ic.getAckTimeoutRedeliveryBackoff()
.ifPresent(s -> builder.ackTimeoutRedeliveryBackoff(parseBackoff(s, ic.getChannel())));
if (conf.getConsumerEventListener() != null) {
builder.consumerEventListener(conf.getConsumerEventListener());
}
if (conf.getPayloadProcessor() != null) {
builder.messagePayloadProcessor(conf.getPayloadProcessor());
}
if (conf.getKeySharedPolicy() != null) {
builder.keySharedPolicy(conf.getKeySharedPolicy());
} else if (conf.getSubscriptionType() == SubscriptionType.Key_Shared) {
builder.keySharedPolicy(KeySharedPolicy.autoSplitHashRange());
}
if (conf.getCryptoKeyReader() != null) {
builder.cryptoKeyReader(conf.getCryptoKeyReader());
}
if (conf.getMessageCrypto() != null) {
builder.messageCrypto(conf.getMessageCrypto());
}
if (ic.getBatchReceive()) {
builder.batchReceivePolicy(
Objects.requireNonNullElse(conf.getBatchReceivePolicy(), BatchReceivePolicy.DEFAULT_POLICY));
}
return customize(builder, ic);
}

private static DeadLetterPolicy getDeadLetterPolicy(PulsarConnectorIncomingConfiguration ic, Integer redeliverCount) {
return DeadLetterPolicy.builder()
.maxRedeliverCount(redeliverCount)
.deadLetterTopic(ic.getDeadLetterPolicyDeadLetterTopic().orElse(null))
.retryLetterTopic(ic.getDeadLetterPolicyRetryLetterTopic().orElse(null))
.initialSubscriptionName(ic.getDeadLetterPolicyInitialSubscriptionName().orElse(null))
.build();
}

private RedeliveryBackoff parseBackoff(String backoffString, String channel) {
String[] strings = backoffString.split(",");
try {
return MultiplierRedeliveryBackoff.builder()
.minDelayMs(Long.parseLong(strings[0]))
.maxDelayMs(Long.parseLong(strings[1]))
.multiplier(Double.parseDouble(strings[2]))
.build();
} catch (Exception e) {
log.unableToParseRedeliveryBackoff(backoffString, channel);
return null;
}
}

/**
* Extract the configuration map for building Pulsar producer
*
Expand All @@ -148,6 +242,25 @@ public <T> ProducerBuilder<T> customize(ProducerBuilder<T> builder, PulsarConnec
return (ProducerBuilder<T>) ConfigUtils.customize(oc.config(), producerConfigCustomizers, builder);
}

public <T> ProducerBuilder<T> configure(ProducerBuilder<T> builder,
PulsarConnectorOutgoingConfiguration oc,
ProducerConfigurationData conf) {
builder.loadConf(configToMap(conf));
if (conf.getCustomMessageRouter() != null) {
builder.messageRouter(conf.getCustomMessageRouter());
}
if (conf.getBatcherBuilder() != null) {
builder.batcherBuilder(conf.getBatcherBuilder());
}
if (conf.getCryptoKeyReader() != null) {
builder.cryptoKeyReader(conf.getCryptoKeyReader());
}
for (String encryptionKey : conf.getEncryptionKeys()) {
builder.addEncryptionKey(encryptionKey);
}
return customize(builder, oc);
}

private Map<String, Object> mergeMap(Map<String, Object> defaultConfig, Map<String, Object> channelConfig) {
Map<String, Object> map = new HashMap<>(defaultConfig);
map.putAll(channelConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,10 @@
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;

import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.eclipse.microprofile.config.Config;
Expand All @@ -41,7 +39,6 @@
import io.smallrye.reactive.messaging.health.HealthReporter;
import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder;
import io.smallrye.reactive.messaging.providers.helpers.CDIUtils;
import io.smallrye.reactive.messaging.providers.helpers.Validation;
import io.vertx.mutiny.core.Vertx;

@ApplicationScoped
Expand Down Expand Up @@ -166,36 +163,14 @@ public void terminate(

private PulsarClientImpl createPulsarClient(PulsarConnectorCommonConfiguration cc, ClientConfigurationData configuration) {
try {
setAuth(configuration);
log.createdClientWithConfig(configuration);
ClientBuilderImpl customized = configResolver.customize(new ClientBuilderImpl(configuration), cc);
return new PulsarClientImpl(customized.getClientConfigurationData(), vertx.nettyEventLoopGroup());
ClientConfigurationData data = configResolver.configure(cc, configuration).getClientConfigurationData();
log.createdClientWithConfig(data);
return new PulsarClientImpl(data, vertx.nettyEventLoopGroup());
} catch (PulsarClientException e) {
throw ex.illegalStateUnableToBuildClient(e);
}
}

/**
* Sets the authentication object in the given configuration object using
* `authPluginClassName` and `authParams`/`authParamMap` attributes
* This use to be done by the PulsarClientImpl
*
* @param conf client configuration
* @throws PulsarClientException
*/
private void setAuth(ClientConfigurationData conf) throws PulsarClientException {
if (Validation.isBlank(conf.getAuthPluginClassName())
|| (Validation.isBlank(conf.getAuthParams()) && conf.getAuthParamMap() == null)) {
return;
}

if (!Validation.isBlank(conf.getAuthParams())) {
conf.setAuthentication(AuthenticationFactory.create(conf.getAuthPluginClassName(), conf.getAuthParams()));
} else if (conf.getAuthParamMap() != null) {
conf.setAuthentication(AuthenticationFactory.create(conf.getAuthPluginClassName(), conf.getAuthParamMap()));
}
}

public PulsarClient getClient(String channel) {
return clientsByChannel.get(channel);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import jakarta.enterprise.inject.Instance;

import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.impl.MultiplierRedeliveryBackoff;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
Expand Down Expand Up @@ -62,7 +61,6 @@ public PulsarIncomingChannel(PulsarClient client, Vertx vertx, Schema<T> schema,
this.channel = ic.getChannel();
this.healthEnabled = ic.getHealthEnabled();
this.tracingEnabled = ic.getTracingEnabled();
ConsumerBuilder<T> builder = client.newConsumer(schema);
ConsumerConfigurationData<?> conf = configResolver.getConsumerConf(ic);
if (conf.getSubscriptionName() == null) {
String s = UUID.randomUUID().toString();
Expand All @@ -75,32 +73,8 @@ public PulsarIncomingChannel(PulsarClient client, Vertx vertx, Schema<T> schema,
if (conf.getConsumerName() == null) {
conf.setConsumerName(channel);
}
builder.loadConf(configResolver.configToMap(conf));
ic.getDeadLetterPolicyMaxRedeliverCount().ifPresent(i -> builder.deadLetterPolicy(getDeadLetterPolicy(ic, i)));
ic.getNegativeAckRedeliveryBackoff().ifPresent(s -> builder.negativeAckRedeliveryBackoff(parseBackoff(s)));
ic.getAckTimeoutRedeliveryBackoff().ifPresent(s -> builder.ackTimeoutRedeliveryBackoff(parseBackoff(s)));
if (conf.getConsumerEventListener() != null) {
builder.consumerEventListener(conf.getConsumerEventListener());
}
if (conf.getPayloadProcessor() != null) {
builder.messagePayloadProcessor(conf.getPayloadProcessor());
}
if (conf.getKeySharedPolicy() != null) {
builder.keySharedPolicy(conf.getKeySharedPolicy());
} else if (conf.getSubscriptionType() == SubscriptionType.Key_Shared) {
builder.keySharedPolicy(KeySharedPolicy.autoSplitHashRange());
}
if (conf.getCryptoKeyReader() != null) {
builder.cryptoKeyReader(conf.getCryptoKeyReader());
}
if (conf.getMessageCrypto() != null) {
builder.messageCrypto(conf.getMessageCrypto());
}
if (ic.getBatchReceive() && conf.getBatchReceivePolicy() == null) {
builder.batchReceivePolicy(BatchReceivePolicy.DEFAULT_POLICY);
}

this.consumer = configResolver.customize(builder, ic).subscribe();
ConsumerBuilder<T> builder = configResolver.configure(client.newConsumer(schema), ic, conf);
this.consumer = builder.subscribe();
log.createdConsumerWithConfig(channel, SchemaResolver.getSchemaName(schema), conf);
this.ackHandler = ackHandlerFactory.create(consumer, ic);
this.failureHandler = failureHandlerFactory.create(consumer, ic, this::reportFailure);
Expand Down Expand Up @@ -196,29 +170,6 @@ private boolean isEndOfStream(PulsarClient client, Throwable throwable) {
return false;
}

private static DeadLetterPolicy getDeadLetterPolicy(PulsarConnectorIncomingConfiguration ic, Integer redeliverCount) {
return DeadLetterPolicy.builder()
.maxRedeliverCount(redeliverCount)
.deadLetterTopic(ic.getDeadLetterPolicyDeadLetterTopic().orElse(null))
.retryLetterTopic(ic.getDeadLetterPolicyRetryLetterTopic().orElse(null))
.initialSubscriptionName(ic.getDeadLetterPolicyInitialSubscriptionName().orElse(null))
.build();
}

private RedeliveryBackoff parseBackoff(String backoffString) {
String[] strings = backoffString.split(",");
try {
return MultiplierRedeliveryBackoff.builder()
.minDelayMs(Long.parseLong(strings[0]))
.maxDelayMs(Long.parseLong(strings[1]))
.multiplier(Double.parseDouble(strings[2]))
.build();
} catch (Exception e) {
log.unableToParseRedeliveryBackoff(backoffString, this.channel);
return null;
}
}

static boolean hasTopicConfig(ConsumerConfigurationData<?> conf) {
return conf.getTopicsPattern() != null
|| (conf.getTopicNames() != null && !conf.getTopicNames().isEmpty());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,19 +57,8 @@ public PulsarOutgoingChannel(PulsarClient client, Schema<T> schema, PulsarConnec
if (conf.getMaxPendingMessages() > 0 && conf.getMaxPendingMessagesAcrossPartitions() == 0) {
conf.setMaxPendingMessagesAcrossPartitions(conf.getMaxPendingMessages());
}
Map<String, Object> producerConf = configResolver.configToMap(conf);
ProducerBuilder<T> builder = client.newProducer(schema)
.loadConf(producerConf);
if (conf.getBatcherBuilder() != null) {
builder.batcherBuilder(conf.getBatcherBuilder());
}
if (conf.getCryptoKeyReader() != null) {
builder.cryptoKeyReader(conf.getCryptoKeyReader());
}
for (String encryptionKey : conf.getEncryptionKeys()) {
builder.addEncryptionKey(encryptionKey);
}
this.producer = configResolver.customize(builder, oc).create();
ProducerBuilder<T> builder = configResolver.configure(client.newProducer(schema), oc, conf);
this.producer = builder.create();
log.createdProducerWithConfig(channel, SchemaResolver.getSchemaName(schema), conf);
long requests = getRequests(oc, conf);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ public PulsarTransactionsImpl(EmitterConfiguration config, long defaultBufferSiz
this.pulsarClient = pulsarClientService.getClient(config.name());
}

private static boolean isConflict(Throwable throwable) {
return throwable instanceof PulsarClientException.TransactionConflictException
|| throwable.getCause() instanceof PulsarClientException.TransactionConflictException;
}

@Override
public <R> Uni<R> withTransaction(Function<TransactionalEmitter<T>, Uni<R>> work) {
return new PulsarTransactionEmitter<R>().execute(work);
Expand All @@ -57,17 +62,14 @@ public <R> Uni<R> withTransaction(Message<?> message, Function<TransactionalEmit
@Override
public <R> Uni<R> withTransaction(Duration txnTimeout, Message<?> message, Function<TransactionalEmitter<T>, Uni<R>> work) {
return new PulsarTransactionEmitter<R>(txnTimeout,
txn -> Uni.createFrom().completionStage(message.ack()),
txn -> Uni.createFrom().completionStage(message.ack())
.onFailure(PulsarTransactionsImpl::isConflict).recoverWithUni(throwable -> VOID_UNI),
PulsarTransactionsImpl::defaultAfterCommit,
(txn, throwable) -> {
if (!(throwable.getCause() instanceof PulsarClientException.TransactionConflictException)) {
// If TransactionConflictException is not thrown,
// you need to redeliver or negativeAcknowledge this message,
// or else this message will not be received again.
return Uni.createFrom().completionStage(() -> message.nack(throwable));
} else {
return VOID_UNI;
}
// If TransactionConflictException is not thrown,
// you need to redeliver or negativeAcknowledge this message,
// or else this message will not be received again.
return isConflict(throwable) ? VOID_UNI : Uni.createFrom().completionStage(message.nack(throwable));
},
PulsarTransactionsImpl::defaultAfterAbort)
.execute(e -> {
Expand Down Expand Up @@ -200,7 +202,8 @@ private Uni<R> executeInTransaction(Function<TransactionalEmitter<T>, Uni<R>> wo
.onCancellation().call(() -> abort(new RuntimeException("Transaction cancelled")))
// when there was no exception,
// commit or rollback the transaction
.call(() -> abort ? abort(new RuntimeException("Transaction aborted")) : commit())
.call(() -> abort ? abort(new RuntimeException("Transaction aborted"))
: commit().onFailure().call(throwable -> abort(throwable)))
.onFailure().recoverWithUni(throwable -> afterAbort.apply(throwable))
.onItem().transformToUni(result -> afterCommit.apply(result));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

public class PulsarContainer extends GenericContainer<PulsarContainer> {

public static final DockerImageName PULSAR_IMAGE = DockerImageName.parse("apachepulsar/pulsar:3.2.2");
public static final DockerImageName PULSAR_IMAGE = DockerImageName.parse("apachepulsar/pulsar:3.3.0");

public static final String STARTER_SCRIPT = "/run_pulsar.sh";

Expand Down Expand Up @@ -45,6 +45,8 @@ protected void containerIsStarting(InspectContainerResponse containerInfo, boole
command += "export PULSAR_PREFIX_advertisedListeners=" + advertisedListeners + " \n";
command += "export PULSAR_PREFIX_transactionCoordinatorEnabled=true\n";
command += "export PULSAR_PREFIX_systemTopicEnabled=true\n";
command += "export PULSAR_PREFIX_brokerDeduplicationEnabled=true\n";
command += "export PULSAR_PREFIX_acknowledgmentAtBatchIndexLevelEnabled=true\n";
command += "bin/apply-config-from-env.py conf/standalone.conf && bin/pulsar standalone -nfw -nss";
copyFileToContainer(
Transferable.of(command.getBytes(StandardCharsets.UTF_8), 700),
Expand Down
Loading

0 comments on commit 1a2df0d

Please sign in to comment.