Kafka: Support Arbitrary Producer/Consumer Props #9775

Closed
wants to merge 1 commit
@@ -63,7 +63,8 @@ public class KafkaProperties {
private String clientId;

/**
* Additional properties used to configure the client.
* Additional properties, common to producers and consumers, used to configure the
* client.
*/
private Map<String, String> properties = new HashMap<>();

@@ -268,6 +269,11 @@ public static class Consumer {
*/
private Integer maxPollRecords;

/**
* Additional properties used to configure the client.
*/
private Map<String, String> properties = new HashMap<>();

public Ssl getSsl() {
return this.ssl;
}
@@ -368,6 +374,14 @@ public void setMaxPollRecords(Integer maxPollRecords) {
this.maxPollRecords = maxPollRecords;
}

public Map<String, String> getProperties() {
return this.properties;
}

public void setProperties(Map<String, String> properties) {
this.properties = properties;
}

public Map<String, Object> buildProperties() {
Map<String, Object> properties = new HashMap<>();
if (this.autoCommitInterval != null) {
@@ -435,6 +449,7 @@ public Map<String, Object> buildProperties() {
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
this.maxPollRecords);
}
properties.putAll(this.properties);
return properties;
}

@@ -492,6 +507,11 @@ public static class Producer {
*/
private Integer retries;

/**
* Additional properties used to configure the client.
*/
private Map<String, String> properties = new HashMap<>();

public Ssl getSsl() {
return this.ssl;
}
@@ -568,6 +588,14 @@ public void setRetries(Integer retries) {
this.retries = retries;
}

public Map<String, String> getProperties() {
return this.properties;
}

public void setProperties(Map<String, String> properties) {
this.properties = properties;
}

public Map<String, Object> buildProperties() {
Map<String, Object> properties = new HashMap<>();
if (this.acks != null) {
@@ -621,6 +649,7 @@ public Map<String, Object> buildProperties() {
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
this.valueSerializer);
}
properties.putAll(this.properties);
return properties;
}

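Because `buildProperties()` applies the free-form map last, via `properties.putAll(this.properties)`, an entry contributed through `spring.kafka.consumer.properties.*` or `spring.kafka.producer.properties.*` takes precedence over the value produced by the matching dedicated setter. A minimal sketch of that merge order, assuming the usual `getConsumer()` accessor and package location of `KafkaProperties`:

import org.springframework.boot.autoconfigure.kafka.KafkaProperties;

public class KafkaPropertiesMergeOrderSketch {

    public static void main(String[] args) {
        KafkaProperties kafka = new KafkaProperties();
        // as if bound from spring.kafka.consumer.max-poll-records=10
        kafka.getConsumer().setMaxPollRecords(10);
        // as if bound from spring.kafka.consumer.properties.max.poll.records=42
        kafka.getConsumer().getProperties().put("max.poll.records", "42");
        // the free-form map is copied in last, so it wins for the duplicate key
        System.out.println(kafka.getConsumer().buildProperties().get("max.poll.records")); // prints 42
    }

}
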
@@ -78,16 +78,15 @@ public void consumerProperties() {
"spring.kafka.consumer.client-id=ccid", // test override common
"spring.kafka.consumer.enable-auto-commit=false",
"spring.kafka.consumer.fetch-max-wait=456",
"spring.kafka.consumer.properties.fiz.buz=fix.fox",
"spring.kafka.consumer.fetch-min-size=789",
"spring.kafka.consumer.group-id=bar",
"spring.kafka.consumer.heartbeat-interval=234",
"spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.LongDeserializer",
"spring.kafka.consumer.value-deserializer = org.apache.kafka.common.serialization.IntegerDeserializer");
DefaultKafkaConsumerFactory<?, ?> consumerFactory = this.context
.getBean(DefaultKafkaConsumerFactory.class);
@SuppressWarnings("unchecked")
Map<String, Object> configs = (Map<String, Object>) new DirectFieldAccessor(
consumerFactory).getPropertyValue("configs");
Map<String, Object> configs = consumerFactory.getConfigurationProperties();
// common
assertThat(configs.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG))
.isEqualTo(Collections.singletonList("foo:1234"));
@@ -120,17 +119,21 @@ public void consumerProperties() {
assertThat(configs.get("foo")).isEqualTo("bar");
assertThat(configs.get("baz")).isEqualTo("qux");
assertThat(configs.get("foo.bar.baz")).isEqualTo("qux.fiz.buz");
assertThat(configs.get("fiz.buz")).isEqualTo("fix.fox");
}

@Test
public void producerProperties() {
load("spring.kafka.clientId=cid", "spring.kafka.producer.acks=all",
load("spring.kafka.clientId=cid",
"spring.kafka.properties.foo.bar.baz=qux.fiz.buz",
"spring.kafka.producer.acks=all",
"spring.kafka.producer.batch-size=20",
"spring.kafka.producer.bootstrap-servers=bar:1234", // test override
"spring.kafka.producer.buffer-memory=12345",
"spring.kafka.producer.compression-type=gzip",
"spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.LongSerializer",
"spring.kafka.producer.retries=2",
"spring.kafka.producer.properties.fiz.buz=fix.fox",
"spring.kafka.producer.ssl.key-password=p4",
"spring.kafka.producer.ssl.keystore-location=classpath:ksLocP",
"spring.kafka.producer.ssl.keystore-password=p5",
@@ -139,9 +142,7 @@ public void producerProperties() {
"spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.IntegerSerializer");
DefaultKafkaProducerFactory<?, ?> producerFactory = this.context
.getBean(DefaultKafkaProducerFactory.class);
@SuppressWarnings("unchecked")
Map<String, Object> configs = (Map<String, Object>) new DirectFieldAccessor(
producerFactory).getPropertyValue("configs");
Map<String, Object> configs = producerFactory.getConfigurationProperties();
// common
assertThat(configs.get(ProducerConfig.CLIENT_ID_CONFIG)).isEqualTo("cid");
// producer
@@ -166,6 +167,8 @@
.isEqualTo(IntegerSerializer.class);
assertThat(this.context.getBeansOfType(KafkaJaasLoginModuleInitializer.class))
.isEmpty();
assertThat(configs.get("foo.bar.baz")).isEqualTo("qux.fiz.buz");
assertThat(configs.get("fiz.buz")).isEqualTo("fix.fox");
}

@SuppressWarnings("unchecked")
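
The tests above read the effective configuration through `getConfigurationProperties()` instead of reflecting on the factory's internals. The same accessor can be used in application code to confirm which `spring.kafka.consumer.properties.*` entries reached the factory; the helper below is an illustrative sketch, and its name is hypothetical rather than part of this change:

import java.util.Map;

import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

// Illustrative helper: prints the consumer configuration the factory was built
// with, including entries contributed through spring.kafka.consumer.properties.*.
public class ConsumerConfigDumper {

    private final DefaultKafkaConsumerFactory<?, ?> consumerFactory;

    public ConsumerConfigDumper(DefaultKafkaConsumerFactory<?, ?> consumerFactory) {
        this.consumerFactory = consumerFactory;
    }

    public void dump() {
        Map<String, Object> configs = this.consumerFactory.getConfigurationProperties();
        configs.forEach((key, value) -> System.out.println(key + "=" + value));
    }

}
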
@@ -968,6 +968,7 @@ content into your application; rather pick only the properties that you need.
spring.kafka.consumer.heartbeat-interval= # Expected time in milliseconds between heartbeats to the consumer coordinator.
spring.kafka.consumer.key-deserializer= # Deserializer class for keys.
spring.kafka.consumer.max-poll-records= # Maximum number of records returned in a single call to poll().
spring.kafka.consumer.properties.*= # Additional properties used to configure the client.
spring.kafka.consumer.ssl.key-password= # Password of the private key in the key store file.
spring.kafka.consumer.ssl.keystore-location= # Location of the key store file.
spring.kafka.consumer.ssl.keystore-password= # Store password for the key store file.
@@ -991,14 +992,15 @@ content into your application; rather pick only the properties that you need.
spring.kafka.producer.client-id= # Id to pass to the server when making requests; used for server-side logging.
spring.kafka.producer.compression-type= # Compression type for all data generated by the producer.
spring.kafka.producer.key-serializer= # Serializer class for keys.
spring.kafka.producer.properties.*= # Additional properties used to configure the client.
spring.kafka.producer.retries= # When greater than zero, enables retrying of failed sends.
spring.kafka.producer.ssl.key-password= # Password of the private key in the key store file.
spring.kafka.producer.ssl.keystore-location= # Location of the key store file.
spring.kafka.producer.ssl.keystore-password= # Store password for the key store file.
spring.kafka.producer.ssl.truststore-location= # Location of the trust store file.
spring.kafka.producer.ssl.truststore-password= # Store password for the trust store file.
spring.kafka.producer.value-serializer= # Serializer class for values.
spring.kafka.properties.*= # Additional properties used to configure the client.
spring.kafka.properties.*= # Additional properties, common to producers and consumers, used to configure the client.
spring.kafka.ssl.key-password= # Password of the private key in the key store file.
spring.kafka.ssl.keystore-location= # Location of the key store file.
spring.kafka.ssl.keystore-password= # Store password for the key store file.
15 changes: 4 additions & 11 deletions spring-boot-docs/src/main/asciidoc/spring-boot-features.adoc
@@ -5099,20 +5099,13 @@ are not directly supported, use the following:
[source,properties,indent=0]
----
spring.kafka.properties.foo.bar=baz
spring.kafka.consumer.properties.fiz.buz=qux
spring.kafka.producer.properties.baz.qux=fiz
----

This sets the common `foo.bar` Kafka property to `baz`.

These properties will be shared by both the consumer and producer factory beans.
If you wish to customize these components with different properties, such as to use a
different metrics reader for each, you can override the bean definitions, as follows:

[source,java,indent=0]
----
include::{code-examples}/kafka/KafkaSpecialProducerConsumerConfigExample.java[tag=configuration]
----

This sets the common `foo.bar` Kafka property to `baz` (it applies to both producers
and consumers), the consumer property `fiz.buz` to `qux`, and the producer property
`baz.qux` to `fiz`.

IMPORTANT: Properties set in this way override any property in the subset that Spring
Boot explicitly supports.
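
For example, the following (hypothetical) combination results in an effective
`max.poll.records` of `42`, because the free-form entry is applied after the
dedicated one:

[source,properties,indent=0]
----
spring.kafka.consumer.max-poll-records=10
spring.kafka.consumer.properties.max.poll.records=42
----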

[[boot-features-resttemplate]]
== Calling REST services with '`RestTemplate`'
Expand Down

This file was deleted.