Skip to content

Commit

Permalink
Merge pull request #2695 from ozangunalp/client_config_interceptor
Browse files Browse the repository at this point in the history
ClientCustomizer
  • Loading branch information
cescoffier authored Jul 25, 2024
2 parents cf057ef + 5235ea3 commit f6609ee
Show file tree
Hide file tree
Showing 62 changed files with 826 additions and 260 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package io.smallrye.reactive.messaging;

import jakarta.enterprise.inject.spi.Prioritized;

import org.eclipse.microprofile.config.Config;

/**
* A customizer that can be used to modify the configuration used to create a messaging client.
*
* @param <T> the type of the configuration object
*/
public interface ClientCustomizer<T> extends Prioritized {

/**
* The default priority for config customizers.
*/
int CLIENT_CONFIG_CUSTOMIZER_DEFAULT_PRIORITY = 100;

/**
* Customize the given configuration object.
*
* @param channel the channel name
* @param channelConfig the channel configuration
* @param config the configuration object
* @return the modified configuration object, or {@code null} to skip this customizer
*/
T customize(String channel, Config channelConfig, T config);

@Override
default int getPriority() {
return CLIENT_CONFIG_CUSTOMIZER_DEFAULT_PRIORITY;
}

}
25 changes: 25 additions & 0 deletions documentation/src/main/docs/concepts/client-customizers.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Client Customizers

Client customizers allow to customize the client instance created by the connector.
Only connectors which create their own client instance support customizers.

To define a client customizer, you need to provide a CDI bean that implements the `ClientCustomizer<T>` interface,
parameterized with the config type:

``` java
{{ insert('customizers/MyClientCustomizer.java') }}
```

Connectors which support client customizers will discover all beans and call the `customize` method,
with the _channel name_, the _channel configuration_ and the configuration that'll be used to create the client instance.
If the customizer returns `null` it'll be skipped.

If you have multiple customizers, customizers can override the `getPriority` method to define the order in which they are called.

Currently, the following core connectors support client customizers:

- Kafka: `ClientCustomizer<Map<String, Object>>`
- RabbitMQ: `ClientCustomizer<RabbitMQOptions>`
- AMQP 1.0: `ClientCustomizer<AmqpClientOptions>`
- MQTT: `ClientCustomizer<MqttClientSessionOptions>`
- Pulsar: `ClientCustomizer<ClientBuilder>`, `ClientCustomizer<ConsumerBuilder<?>>`, `ClientCustomizer<ProducerBuilder<?>>`
4 changes: 4 additions & 0 deletions documentation/src/main/java/customizers/ClientConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package customizers;

public class ClientConfig {
}
17 changes: 17 additions & 0 deletions documentation/src/main/java/customizers/MyClientCustomizer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package customizers;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.config.Config;

import io.smallrye.reactive.messaging.ClientCustomizer;

@ApplicationScoped
public class MyClientCustomizer implements ClientCustomizer<ClientConfig> {
@Override
public ClientConfig customize(String channel, Config channelConfig, ClientConfig config) {
// customize the client configuration
return config;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,15 @@

import java.util.Optional;

import javax.net.ssl.SSLContext;

import jakarta.enterprise.inject.Instance;
import jakarta.enterprise.inject.literal.NamedLiteral;

import io.netty.handler.ssl.ApplicationProtocolConfig;
import io.netty.handler.ssl.IdentityCipherSuiteFilter;
import io.netty.handler.ssl.JdkSslContext;
import io.netty.handler.ssl.SslContext;
import io.smallrye.common.annotation.Identifier;
import io.smallrye.reactive.messaging.ClientCustomizer;
import io.smallrye.reactive.messaging.providers.helpers.ConfigUtils;
import io.smallrye.reactive.messaging.providers.i18n.ProviderLogging;
import io.vertx.amqp.AmqpClientOptions;
import io.vertx.core.net.JdkSSLEngineOptions;
import io.vertx.core.spi.tls.SslContextFactory;
import io.vertx.mutiny.amqp.AmqpClient;
import io.vertx.mutiny.core.Vertx;

public class AmqpClientHelper {

Expand All @@ -30,25 +23,22 @@ private AmqpClientHelper() {
}

static AmqpClient createClient(AmqpConnector connector, AmqpConnectorCommonConfiguration config,
Instance<AmqpClientOptions> amqpClientOptions, Instance<SSLContext> clientSslContexts) {
AmqpClient client;
Instance<AmqpClientOptions> amqpClientOptions,
Instance<ClientCustomizer<AmqpClientOptions>> configCustomizers) {
Optional<String> clientOptionsName = config.getClientOptionsName();
Optional<String> clientSslContextName = config.getClientSslContextName();
if (clientOptionsName.isPresent() && clientSslContextName.isPresent()) {
throw ProviderLogging.log.cannotSpecifyBothClientOptionsNameAndClientSslContextName();
}
Vertx vertx = connector.getVertx();
AmqpClientOptions options;
if (clientOptionsName.isPresent()) {
client = createClientFromClientOptionsBean(vertx, amqpClientOptions, clientOptionsName.get(), config);
options = createClientFromClientOptionsBean(amqpClientOptions, clientOptionsName.get(), config);
} else {
SSLContext sslContext = getClientSslContext(clientSslContexts, clientSslContextName);
client = getClient(vertx, config, sslContext);
options = getOptionsFromChannel(config);
}
AmqpClientOptions clientOptions = ConfigUtils.customize(config.config(), configCustomizers, options);
AmqpClient client = AmqpClient.create(connector.getVertx(), clientOptions);
connector.addClient(client);
return client;
}

static AmqpClient createClientFromClientOptionsBean(Vertx vertx, Instance<AmqpClientOptions> instance,
static AmqpClientOptions createClientFromClientOptionsBean(Instance<AmqpClientOptions> instance,
String optionsBeanName, AmqpConnectorCommonConfiguration config) {
Instance<AmqpClientOptions> options = instance.select(Identifier.Literal.of(optionsBeanName));
if (options.isUnsatisfied()) {
Expand All @@ -67,7 +57,7 @@ static AmqpClient createClientFromClientOptionsBean(Vertx vertx, Instance<AmqpCl
// In case of conflict, use the channel config.
AmqpClientOptions customizerOptions = options.get();
merge(customizerOptions, config);
return AmqpClient.create(vertx, customizerOptions);
return customizerOptions;
}

/**
Expand Down Expand Up @@ -183,47 +173,4 @@ static AmqpClientOptions getOptionsFromChannel(AmqpConnectorCommonConfiguration
return options;
}

static AmqpClient getClient(Vertx vertx, AmqpConnectorCommonConfiguration config, SSLContext sslContext) {
try {
AmqpClientOptions options = getOptionsFromChannel(config);
if (sslContext != null) {
options.setSslEngineOptions(new JdkSSLEngineOptions() {
@Override
public SslContextFactory sslContextFactory() {
return new SslContextFactory() {
@Override
public SslContext create() {
return new JdkSslContext(
sslContext,
true,
null,
IdentityCipherSuiteFilter.INSTANCE,
ApplicationProtocolConfig.DISABLED,
io.netty.handler.ssl.ClientAuth.NONE,
null,
false);
}
};
}
});
}
return AmqpClient.create(vertx, options);
} catch (Exception e) {
log.unableToCreateClient(e);
throw ex.illegalStateUnableToCreateClient(e);
}
}

private static SSLContext getClientSslContext(Instance<SSLContext> clientSslContexts,
Optional<String> clientSslContextName) {
if (clientSslContextName.isPresent()) {
Instance<SSLContext> context = clientSslContexts
.select(Identifier.Literal.of(clientSslContextName.get()));
if (context.isUnsatisfied()) {
throw ProviderLogging.log.couldFindSslContextWithIdentifier(clientSslContextName.get());
}
return context.get();
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import javax.net.ssl.SSLContext;

import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.context.BeforeDestroyed;
Expand All @@ -37,6 +35,7 @@
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.operators.multi.processors.BroadcastProcessor;
import io.smallrye.reactive.messaging.ClientCustomizer;
import io.smallrye.reactive.messaging.amqp.fault.AmqpAccept;
import io.smallrye.reactive.messaging.amqp.fault.AmqpFailStop;
import io.smallrye.reactive.messaging.amqp.fault.AmqpFailureHandler;
Expand Down Expand Up @@ -119,7 +118,7 @@ public class AmqpConnector implements InboundConnector, OutboundConnector, Healt

@Inject
@Any
private Instance<SSLContext> clientSslContexts;
private Instance<ClientCustomizer<AmqpClientOptions>> configCustomizers;

@Inject
private Instance<OpenTelemetry> openTelemetryInstance;
Expand Down Expand Up @@ -221,7 +220,7 @@ public Flow.Publisher<? extends Message<?>> getPublisher(Config config) {
.setCapabilities(getClientCapabilities(ic))
.setSelector(ic.getSelector().orElse(null));

AmqpClient client = AmqpClientHelper.createClient(this, ic, clientOptions, clientSslContexts);
AmqpClient client = AmqpClientHelper.createClient(this, ic, clientOptions, configCustomizers);

Context root = Context.newInstance(((VertxInternal) getVertx().getDelegate()).createEventLoopContext());
ConnectionHolder holder = new ConnectionHolder(client, ic, getVertx(), root);
Expand Down Expand Up @@ -265,7 +264,7 @@ public Flow.Subscriber<? extends Message<?>> getSubscriber(Config config) {
opened.put(oc.getChannel(), false);

AtomicReference<AmqpSender> sender = new AtomicReference<>();
AmqpClient client = AmqpClientHelper.createClient(this, oc, clientOptions, clientSslContexts);
AmqpClient client = AmqpClientHelper.createClient(this, oc, clientOptions, configCustomizers);
String link = oc.getLinkName().orElseGet(oc::getChannel);
ConnectionHolder holder = new ConnectionHolder(client, oc, getVertx(), null);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package io.smallrye.reactive.messaging.amqp;

import static io.smallrye.reactive.messaging.amqp.i18n.AMQPExceptions.ex;
import static io.smallrye.reactive.messaging.amqp.i18n.AMQPLogging.log;

import java.util.Optional;

import javax.net.ssl.SSLContext;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Any;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;

import org.eclipse.microprofile.config.Config;

import io.netty.handler.ssl.ApplicationProtocolConfig;
import io.netty.handler.ssl.IdentityCipherSuiteFilter;
import io.netty.handler.ssl.JdkSslContext;
import io.netty.handler.ssl.SslContext;
import io.smallrye.reactive.messaging.ClientCustomizer;
import io.smallrye.reactive.messaging.providers.helpers.CDIUtils;
import io.vertx.amqp.AmqpClientOptions;
import io.vertx.core.net.JdkSSLEngineOptions;
import io.vertx.core.spi.tls.SslContextFactory;

@ApplicationScoped
public class SslContextClientCustomizer implements ClientCustomizer<AmqpClientOptions> {

@Inject
@Any
private Instance<SSLContext> clientSslContexts;

@Override
public AmqpClientOptions customize(String channel, Config channelConfig, AmqpClientOptions config) {
AmqpConnectorCommonConfiguration commonConfiguration = new AmqpConnectorCommonConfiguration(channelConfig);
Optional<String> clientSslContextName = commonConfiguration.getClientSslContextName();
if (clientSslContextName.isPresent()) {
SSLContext sslContext = CDIUtils.getInstanceById(clientSslContexts, clientSslContextName.get(), () -> null);
if (sslContext != null) {
try {
config.setSslEngineOptions(new JdkSSLEngineOptions() {
@Override
public SslContextFactory sslContextFactory() {
return new SslContextFactory() {
@Override
public SslContext create() {
return new JdkSslContext(
sslContext,
true,
null,
IdentityCipherSuiteFilter.INSTANCE,
ApplicationProtocolConfig.DISABLED,
io.netty.handler.ssl.ClientAuth.NONE,
null,
false);
}
};
}
});
} catch (Exception e) {
log.unableToCreateClient(e);
throw ex.illegalStateUnableToCreateClient(e);
}
}
}
return config;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.Tracer;
import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.ClientCustomizer;
import io.smallrye.reactive.messaging.annotations.ConnectorAttribute;
import io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction;
import io.smallrye.reactive.messaging.connector.InboundConnector;
Expand Down Expand Up @@ -150,6 +151,10 @@ public SpanBuilder spanBuilder(final String spanName) {
@Any
Instance<DeserializationFailureHandler<?>> deserializationFailureHandlers;

@Inject
@Any
Instance<ClientCustomizer<Map<String, Object>>> configCustomizers;

@Inject
@Any
Instance<SerializationFailureHandler<?>> serializationFailureHandlers;
Expand Down Expand Up @@ -216,7 +221,7 @@ public Flow.Publisher<? extends Message<?>> getPublisher(Config config) {
KafkaSource<Object, Object> source = new KafkaSource<>(vertx, group, ic, openTelemetryInstance,
commitHandlerFactories, failureHandlerFactories,
consumerRebalanceListeners,
kafkaCDIEvents, deserializationFailureHandlers, -1);
kafkaCDIEvents, configCustomizers, deserializationFailureHandlers, -1);
sources.add(source);
boolean broadcast = ic.getBroadcast();
Multi<? extends Message<?>> stream;
Expand All @@ -238,7 +243,7 @@ public Flow.Publisher<? extends Message<?>> getPublisher(Config config) {
KafkaSource<Object, Object> source = new KafkaSource<>(vertx, group, ic, openTelemetryInstance,
commitHandlerFactories, failureHandlerFactories,
consumerRebalanceListeners,
kafkaCDIEvents, deserializationFailureHandlers, i);
kafkaCDIEvents, configCustomizers, deserializationFailureHandlers, i);
sources.add(source);
if (!ic.getBatch()) {
streams.add(source.getStream());
Expand Down Expand Up @@ -273,7 +278,7 @@ public Flow.Subscriber<? extends Message<?>> getSubscriber(Config config) {
log.deprecatedConfig("health-readiness-timeout", "health-topic-verification-timeout");
}
KafkaSink sink = new KafkaSink(oc, kafkaCDIEvents, openTelemetryInstance,
serializationFailureHandlers, producerInterceptors);
configCustomizers, serializationFailureHandlers, producerInterceptors);
sinks.add(sink);
return sink.getSink();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.smallrye.common.annotation.Identifier;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.operators.multi.processors.UnicastProcessor;
import io.smallrye.reactive.messaging.ClientCustomizer;
import io.smallrye.reactive.messaging.SubscriberDecorator;
import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
import io.smallrye.reactive.messaging.kafka.KafkaCDIEvents;
Expand Down Expand Up @@ -85,6 +86,10 @@ public static class Factory implements KafkaFailureHandler.Factory {
@Any
Instance<SerializationFailureHandler<?>> serializationFailureHandlers;

@Inject
@Any
Instance<ClientCustomizer<Map<String, Object>>> configCustomizers;

@Inject
@Any
Instance<ProducerInterceptor<?, ?>> producerInterceptors;
Expand Down Expand Up @@ -132,7 +137,7 @@ public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration config,

UnicastProcessor<Message<?>> processor = UnicastProcessor.create();
KafkaSink kafkaSink = new KafkaSink(producerConfig, kafkaCDIEvents, openTelemetryInstance,
serializationFailureHandlers, producerInterceptors);
configCustomizers, serializationFailureHandlers, producerInterceptors);
wireOutgoingConnectorToUpstream(processor, kafkaSink.getSink(), subscriberDecorators,
producerConfig.getChannel() + "-" + CHANNEL_DLQ_SUFFIX);
return new KafkaDeadLetterQueue(config.getChannel(), deadQueueTopic, kafkaSink, processor);
Expand Down
Loading

0 comments on commit f6609ee

Please sign in to comment.