
Allow bootstrap.servers to be provided for Kafka ingestion. #9693

Conversation


@JulianJaffePinterest JulianJaffePinterest commented Apr 13, 2020

Addresses #8685, allowing bootstrap.servers to be provided by a PasswordProvider instead of being hard-coded.

Description

Modified KafkaRecordSupplier.addConsumerPropertiesFromConfig to check for a password provider when deserializing bootstrap.servers as it currently does when deserializing trust and key store passwords. This does not implement the further suggestion on #8685 to allow any field to be provided by a PasswordProvider because #6666 is still an open issue.


This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths.
  • added integration tests.
  • been tested in a test Druid cluster.

Key changed/added classes in this PR
  • KafkaRecordSupplier


@gianm gianm left a comment


Sorry for the delay in getting back to you. I think we need to add a bit more to this patch, as mentioned in the review comments.

  @@ -171,7 +171,7 @@ public void testSerdeForConsumerPropertiesWithPasswords() throws Exception
    String jsonStr = "{\n"
                     + " \"type\": \"kafka\",\n"
                     + " \"topic\": \"my-topic\",\n"
  -                  + " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\",\n"
  +                  + " \"consumerProperties\": {\"bootstrap.servers\":{\"type\": \"default\", \"password\": \"localhost:9092\"},\n"
Contributor


This part is going to throw a wrench into backwards compatibility (we'll be writing ioConfigs that prior versions can't read). It might negatively affect rolling upgrades.

I think the easiest way to solve this is to adjust DefaultPasswordProvider so it serializes as a simple string. (It already is able to deserialize itself from a simple string, so no problem on that end.) The way to do that is add a @JsonValue annotation to the getPassword() method.
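A minimal, self-contained sketch of that suggestion (an illustration using a stand-alone class, not Druid's actual DefaultPasswordProvider; it assumes Jackson is on the classpath):

```java
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonValue;
import com.fasterxml.jackson.databind.ObjectMapper;

public class DefaultPasswordProvider
{
  private final String password;

  @JsonCreator
  public DefaultPasswordProvider(@JsonProperty("password") String password)
  {
    this.password = password;
  }

  // @JsonValue makes Jackson serialize the whole object as this method's
  // return value, i.e. as a plain JSON string instead of a typed object.
  @JsonValue
  public String getPassword()
  {
    return password;
  }

  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();
    // Serializes as the bare JSON string "localhost:9092"
    System.out.println(mapper.writeValueAsString(new DefaultPasswordProvider("localhost:9092")));
  }
}
```

With serialization collapsed to a plain string, prior versions that already accept a bare string for bootstrap.servers can read ioConfigs written by newer versions.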

Contributor Author


Ah that's a good point (in order to get the DefaultPasswordProvider to serialize back to a simple string I think we also need to add @JsonTypeInfo(use = JsonTypeInfo.Id.NONE) so it won't serialize the type info). The tricky piece here is not breaking the PasswordProviderRedactionMixIn, as it seems that @JsonValue takes precedence over @JsonIgnore, even when I set @JsonValue(false) in the redaction mixin. I'll whip up a custom serializer to handle this.
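A custom serializer along those lines might look like the following self-contained sketch (class names and the stand-in provider are illustrative, not Druid's actual code; assumes Jackson on the classpath). Because the serializer is registered on a specific mapper rather than annotated on the class, a redaction mixin on a differently configured mapper can still take precedence:

```java
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import java.io.IOException;

public class PasswordSerializerSketch
{
  // Stand-in for Druid's DefaultPasswordProvider.
  public static class DefaultPasswordProvider
  {
    private final String password;

    public DefaultPasswordProvider(String password)
    {
      this.password = password;
    }

    public String getPassword()
    {
      return password;
    }
  }

  // Writes the provider as its bare password string, without relying on
  // a class-level @JsonValue annotation.
  public static class PlainStringSerializer extends StdSerializer<DefaultPasswordProvider>
  {
    public PlainStringSerializer()
    {
      super(DefaultPasswordProvider.class);
    }

    @Override
    public void serialize(DefaultPasswordProvider value, JsonGenerator gen, SerializerProvider provider) throws IOException
    {
      gen.writeString(value.getPassword());
    }
  }

  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();
    SimpleModule module = new SimpleModule();
    module.addSerializer(DefaultPasswordProvider.class, new PlainStringSerializer());
    mapper.registerModule(module);
    System.out.println(mapper.writeValueAsString(new DefaultPasswordProvider("localhost:9092")));
  }
}
```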

Contributor


Excellent point about the redaction mixin. Looking forward to what you come up with.

  @@ -194,7 +194,7 @@ For Concise bitmaps:
   |-----|----|-----------|--------|
   |`topic`|String|The Kafka topic to read from. This must be a specific topic as topic patterns are not supported.|yes|
   |`inputFormat`|Object|[`inputFormat`](../../ingestion/data-formats.md#input-format) to specify how to parse input data. See [the below section](#specifying-data-format) for details about specifying the input format.|yes|
  -|`consumerProperties`|Map<String, Object>|A map of properties to be passed to the Kafka consumer. This must contain a property `bootstrap.servers` with a list of Kafka brokers in the form: `<BROKER_1>:<PORT_1>,<BROKER_2>:<PORT_2>,...`. For SSL connections, the `keystore`, `truststore` and `key` passwords can be provided as a [Password Provider](../../operations/password-provider.md) or String password.|yes|
  +|`consumerProperties`|Map<String, Object>|A map of properties to be passed to the Kafka consumer. This must contain a property `bootstrap.servers` with a list of Kafka brokers in the form: `<BROKER_1>:<PORT_1>,<BROKER_2>:<PORT_2>,...`, which may be either provided as a [Password Provider](../../operations/password-provider.md) or as a String. For SSL connections, the `keystore`, `truststore` and `key` passwords can be provided as a [Password Provider](../../operations/password-provider.md) or String password.|yes|
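With this doc change, `bootstrap.servers` could be given either as a plain string or via a password provider. For example, using Druid's environment-variable provider (the variable name here is a placeholder):

```json
"consumerProperties": {
  "bootstrap.servers": {"type": "environment", "variable": "KAFKA_BROKERS"}
}
```

A plain string such as `"bootstrap.servers": "broker1:9092,broker2:9092"` continues to work as before.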
Contributor


A bit of an abuse of the "password provider" name, but I can see why it's useful. Perhaps, in the future, we'd like to change the name to something more generic. I wouldn't do that in this patch though. (Each patch should just do one thing, ideally.)

Contributor Author


Yeah, part of #9351 is moving to CredentialProvider from PasswordProvider, which seems like a better term for this use case as well. I suppose eventually it might make sense to just go with Provider or the like if we want to allow any field to be provided at runtime.

stale bot commented Jul 3, 2020

This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 4 weeks if no further activity occurs. If you think that's incorrect or this pull request should instead be reviewed, please simply write any comment. Even if closed, you can still revive the PR at any time or discuss it on the dev@druid.apache.org list. Thank you for your contributions.

@stale stale bot added the stale label Jul 3, 2020
@himanshug
Contributor

I have an alternate proposal for achieving this.

Re #9351: We can still introduce the new interface and use it in this use case. Migrating all other places from PasswordProvider to the new interface could be taken up in #9351. There's no reason to continue using PasswordProvider when there is general agreement on the newer interface.
However, considering this use case, I would name the new interface DynamicConfigProvider instead of CredentialProvider.

Then the KafkaRecordSupplier code change would end up looking something like:

  public static void addConsumerPropertiesFromConfig(
      Properties properties,
      ObjectMapper configMapper,
      Map<String, Object> consumerProperties
  )
  {
    Object dynamicConfigProviderJson = consumerProperties.get("DRUID_DYNAMIC_CONFIG_PROVIDER_KEY");
    if (dynamicConfigProviderJson != null) {
      DynamicConfigProvider dynamicConfigProvider = configMapper.convertValue(dynamicConfigProviderJson, DynamicConfigProvider.class);
      Map<String, String> dynamicConfig = dynamicConfigProvider.getConfig();

      for (Map.Entry<String, Object> entry : consumerProperties.entrySet()) {
        String propertyKey = entry.getKey();

        if ("DRUID_DYNAMIC_CONFIG_PROVIDER_KEY".equals(propertyKey)) {
          continue; // skip the dynamic config entry itself
        }

        String propertyValue = String.valueOf(entry.getValue());
        String valueFromDynamicConfig = dynamicConfig.get(propertyValue);

        if (valueFromDynamicConfig == null) {
          properties.setProperty(propertyKey, propertyValue);
        } else {
          properties.setProperty(propertyKey, valueFromDynamicConfig);
        }
      }
    } else {
      // this is required for backward compatibility.
      for (Map.Entry<String, Object> entry : consumerProperties.entrySet()) {
        String propertyKey = entry.getKey();
        if (propertyKey.equals(KafkaSupervisorIOConfig.TRUST_STORE_PASSWORD_KEY)
            || propertyKey.equals(KafkaSupervisorIOConfig.KEY_STORE_PASSWORD_KEY)
            || propertyKey.equals(KafkaSupervisorIOConfig.KEY_PASSWORD_KEY)) {
          PasswordProvider configPasswordProvider = configMapper.convertValue(
              entry.getValue(),
              PasswordProvider.class
          );
          properties.setProperty(propertyKey, configPasswordProvider.getPassword());
        } else {
          properties.setProperty(propertyKey, String.valueOf(entry.getValue()));
        }
      }
    }
  }

This maintains backward compatibility as well, which was a concern raised in another comment.

@gianm @JulianJaffePinterest does that sound reasonable?
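Under this proposal, a spec's consumerProperties might look like the sketch below (the "my-provider" type and the lookup key are hypothetical; the idea is that each property value is treated as a lookup key into the provider's map when a matching entry exists):

```json
"consumerProperties": {
  "bootstrap.servers": "kafka.bootstrap.servers",
  "DRUID_DYNAMIC_CONFIG_PROVIDER_KEY": {"type": "my-provider"}
}
```

If the provider's getConfig() returns a map containing `"kafka.bootstrap.servers" -> "broker1:9092,broker2:9092"`, the Kafka consumer receives the resolved broker list; any value without a matching entry is passed through unchanged.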


stale bot commented Jul 27, 2020

This issue is no longer marked as stale.

@stale stale bot removed the stale label Jul 27, 2020
@himanshug
Contributor

@JulianJaffePinterest are you still planning to work on this? I need bootstrap server dynamic configurability as well. So, I can probably finish it off if you want.

@himanshug
Contributor

Closing this in favor of #10309, which allows all Kafka consumer properties, including bootstrap.servers, to be provided dynamically via extensions.

@himanshug himanshug closed this Aug 22, 2020
debasatwa29 pushed a commit to debasatwa29/druid that referenced this pull request Jun 2, 2022
Summary: This incorporates the changes in apache#9693 into our fork of Druid. This will allow the `bootstrap.servers` property in KafkaSupervisorSpecs to be provided at run time instead of hardcoded at the time of submission. Coupled with D540761, this will allow bootstrap servers to be read from a serverset at task launch time.

Reviewers: #druideng, fjaros

Reviewed By: #druideng, fjaros

Subscribers: jenkins, #security

Differential Revision: https://phabricator.pinadmin.com/D540968
4 participants