Skip to content

Commit

Permalink
Allow bootstrap.servers to be provided for Kafka ingestion.
Browse files Browse the repository at this point in the history
Summary: This incorporates the changes in apache#9693 into our fork of Druid. This will allow the `bootstrap.servers` property in KafkaSupervisorSpecs to be provided at run time instead of hardcoded at the time of submission. Coupled with D540761, this will allow bootstrap servers to be read from a serverset at task launch time.

Reviewers: #druideng, fjaros

Reviewed By: #druideng, fjaros

Subscribers: jenkins, #security

Differential Revision: https://phabricator.pinadmin.com/D540968
  • Loading branch information
JulianJaffePinterest committed Apr 13, 2020
1 parent b037c14 commit b5e88a8
Show file tree
Hide file tree
Showing 3 changed files with 4 additions and 3 deletions.
2 changes: 1 addition & 1 deletion docs/development/extensions-core/kafka-ingestion.md
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ For Roaring bitmaps:
|Field|Type|Description|Required|
|-----|----|-----------|--------|
|`topic`|String|The Kafka topic to read from. This must be a specific topic as topic patterns are not supported.|yes|
|`consumerProperties`|Map<String, Object>|A map of properties to be passed to the Kafka consumer. This must contain a property `bootstrap.servers` with a list of Kafka brokers in the form: `<BROKER_1>:<PORT_1>,<BROKER_2>:<PORT_2>,...`. For SSL connections, the `keystore`, `truststore` and `key` passwords can be provided as a [Password Provider](../../operations/password-provider.md) or String password.|yes|
|`consumerProperties`|Map<String, Object>|A map of properties to be passed to the Kafka consumer. This must contain a property `bootstrap.servers` with a list of Kafka brokers in the form: `<BROKER_1>:<PORT_1>,<BROKER_2>:<PORT_2>,...`, which may be provided either as a [Password Provider](../../operations/password-provider.md) or as a String. For SSL connections, the `keystore`, `truststore` and `key` passwords can be provided as a [Password Provider](../../operations/password-provider.md) or String password.|yes|
|`pollTimeout`|Long|The length of time to wait for the Kafka consumer to poll records, in milliseconds|no (default == 100)|
|`replicas`|Integer|The number of replica sets, where 1 means a single set of tasks (no replication). Replica tasks will always be assigned to different workers to provide resiliency against process failure.|no (default == 1)|
|`taskCount`|Integer|The maximum number of *reading* tasks in a *replica set*. This means that the maximum number of reading tasks will be `taskCount * replicas` and the total number of tasks (*reading* + *publishing*) will be higher than this. See 'Capacity Planning' below for more details. The number of reading tasks will be less than `taskCount` if `taskCount > {numKafkaPartitions}`.|no (default == 1)|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,8 @@ public static void addConsumerPropertiesFromConfig(
String propertyKey = entry.getKey();
if (propertyKey.equals(KafkaSupervisorIOConfig.TRUST_STORE_PASSWORD_KEY)
|| propertyKey.equals(KafkaSupervisorIOConfig.KEY_STORE_PASSWORD_KEY)
|| propertyKey.equals(KafkaSupervisorIOConfig.KEY_PASSWORD_KEY)) {
|| propertyKey.equals(KafkaSupervisorIOConfig.KEY_PASSWORD_KEY)
|| propertyKey.equals(KafkaSupervisorIOConfig.BOOTSTRAP_SERVERS_KEY)) {
PasswordProvider configPasswordProvider = configMapper.convertValue(
entry.getValue(),
PasswordProvider.class
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public void testSerdeForConsumerPropertiesWithPasswords() throws Exception
String jsonStr = "{\n"
+ " \"type\": \"kafka\",\n"
+ " \"topic\": \"my-topic\",\n"
+ " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\",\n"
+ " \"consumerProperties\": {\"bootstrap.servers\":{\"type\": \"default\", \"password\": \"localhost:9092\"},\n"
+ " \"ssl.truststore.password\":{\"type\": \"default\", \"password\": \"mytruststorepassword\"},\n"
+ " \"ssl.keystore.password\":{\"type\": \"default\", \"password\": \"mykeystorepassword\"},\n"
+ " \"ssl.key.password\":\"mykeypassword\"}\n"
Expand Down

0 comments on commit b5e88a8

Please sign in to comment.