Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow bootstrap.servers to be provided for Kafka ingestion. #9693

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/development/extensions-core/kafka-ingestion.md
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ For Concise bitmaps:
|-----|----|-----------|--------|
|`topic`|String|The Kafka topic to read from. This must be a specific topic as topic patterns are not supported.|yes|
|`inputFormat`|Object|[`inputFormat`](../../ingestion/data-formats.md#input-format) to specify how to parse input data. See [the below section](#specifying-data-format) for details about specifying the input format.|yes|
|`consumerProperties`|Map<String, Object>|A map of properties to be passed to the Kafka consumer. This must contain a property `bootstrap.servers` with a list of Kafka brokers in the form: `<BROKER_1>:<PORT_1>,<BROKER_2>:<PORT_2>,...`. For SSL connections, the `keystore`, `truststore` and `key` passwords can be provided as a [Password Provider](../../operations/password-provider.md) or String password.|yes|
|`consumerProperties`|Map<String, Object>|A map of properties to be passed to the Kafka consumer. This must contain a property `bootstrap.servers` with a list of Kafka brokers in the form: `<BROKER_1>:<PORT_1>,<BROKER_2>:<PORT_2>,...`, which may be either provided as a [Password Provider](../../operations/password-provider.md) or as a String. For SSL connections, the `keystore`, `truststore` and `key` passwords can be provided as a [Password Provider](../../operations/password-provider.md) or String password.|yes|
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A bit of an abuse of the "password provider" name, but I can see why it's useful. Perhaps, in the future, we'd like to change the name to something more generic. I wouldn't do that in this patch though. (Each patch should just do one thing, ideally.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, part of #9351 is moving to CredentialProvider from PasswordProvider, which seems like a better term for this use case as well. I suppose eventually it might make sense to just go with Provider or the like if we want to allow any field to be provided at runtime.

|`pollTimeout`|Long|The length of time to wait for the Kafka consumer to poll records, in milliseconds|no (default == 100)|
|`replicas`|Integer|The number of replica sets, where 1 means a single set of tasks (no replication). Replica tasks will always be assigned to different workers to provide resiliency against process failure.|no (default == 1)|
|`taskCount`|Integer|The maximum number of *reading* tasks in a *replica set*. This means that the maximum number of reading tasks will be `taskCount * replicas` and the total number of tasks (*reading* + *publishing*) will be higher than this. See 'Capacity Planning' below for more details. The number of reading tasks will be less than `taskCount` if `taskCount > {numKafkaPartitions}`.|no (default == 1)|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,8 @@ public static void addConsumerPropertiesFromConfig(
String propertyKey = entry.getKey();
if (propertyKey.equals(KafkaSupervisorIOConfig.TRUST_STORE_PASSWORD_KEY)
|| propertyKey.equals(KafkaSupervisorIOConfig.KEY_STORE_PASSWORD_KEY)
|| propertyKey.equals(KafkaSupervisorIOConfig.KEY_PASSWORD_KEY)) {
|| propertyKey.equals(KafkaSupervisorIOConfig.KEY_PASSWORD_KEY)
|| propertyKey.equals(KafkaSupervisorIOConfig.BOOTSTRAP_SERVERS_KEY)) {
PasswordProvider configPasswordProvider = configMapper.convertValue(
entry.getValue(),
PasswordProvider.class
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ public void testSerdeForConsumerPropertiesWithPasswords() throws Exception
String jsonStr = "{\n"
+ " \"type\": \"kafka\",\n"
+ " \"topic\": \"my-topic\",\n"
+ " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\",\n"
+ " \"consumerProperties\": {\"bootstrap.servers\":{\"type\": \"default\", \"password\": \"localhost:9092\"},\n"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part is going to throw a wrench into backwards compatibility (we'll be writing ioConfigs that prior versions can't read). It might negatively affect rolling upgrades.

I think the easiest way to solve this is to adjust DefaultPasswordProvider so it serializes as a simple string. (It already is able to deserialize itself from a simple string, so no problem on that end.) The way to do that is add a @JsonValue annotation to the getPassword() method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah that's a good point (in order to get the DefaultPasswordProvider to serialize back to a simple string I think we also need to add @JsonTypeInfo(use = JsonTypeInfo.Id.NONE) so it won't serialize the type info). The tricky piece here is not breaking the PasswordProviderRedactionMixIn, as it seems that @JsonValue takes precedence over @JsonIgnore, even when I set @JsonValue(false) in the redaction mixin. I'll whip up a custom serializer to handle this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Excellent point about the redaction mixin. Looking forward to what you come up with.

+ " \"ssl.truststore.password\":{\"type\": \"default\", \"password\": \"mytruststorepassword\"},\n"
+ " \"ssl.keystore.password\":{\"type\": \"default\", \"password\": \"mykeystorepassword\"},\n"
+ " \"ssl.key.password\":\"mykeypassword\"}\n"
Expand Down