From 4f99c09685bfb2c17a91e1d070c0ee1bebdb849d Mon Sep 17 00:00:00 2001 From: Himanshu Date: Wed, 2 Dec 2020 16:38:27 -0800 Subject: [PATCH] introduce DynamicConfigProvider interface and make kafka consumer props extensible (#10309) * introduce DynamicConfigProvider interface and make kafka consumer props extensible * fix intellij inspection error * make DynamicConfigProvider generic Change-Id: I2e3e89f8617b6fe7fc96859deca4011f609dc5a3 * deprecate PasswordProvider --- .../druid/metadata/DynamicConfigProvider.java | 39 +++++++++++++++ .../MapStringDynamicConfigProvider.java | 47 +++++++++++++++++++ .../druid/metadata/PasswordProvider.java | 6 +++ .../MapStringDynamicConfigProviderTest.java | 44 +++++++++++++++++ .../indexing/kafka/KafkaRecordSupplier.java | 35 ++++++++++---- .../supervisor/KafkaSupervisorIOConfig.java | 1 + .../kafka/KafkaRecordSupplierTest.java | 31 ++++++++++++ 7 files changed, 193 insertions(+), 10 deletions(-) create mode 100644 core/src/main/java/org/apache/druid/metadata/DynamicConfigProvider.java create mode 100644 core/src/main/java/org/apache/druid/metadata/MapStringDynamicConfigProvider.java create mode 100644 core/src/test/java/org/apache/druid/metadata/MapStringDynamicConfigProviderTest.java diff --git a/core/src/main/java/org/apache/druid/metadata/DynamicConfigProvider.java b/core/src/main/java/org/apache/druid/metadata/DynamicConfigProvider.java new file mode 100644 index 000000000000..52f032a0a195 --- /dev/null +++ b/core/src/main/java/org/apache/druid/metadata/DynamicConfigProvider.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.metadata; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import org.apache.druid.guice.annotations.ExtensionPoint; + +import java.util.Map; + +/** + * This is used to get [secure] configuration in various places in an extensible way. + */ +@ExtensionPoint +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = MapStringDynamicConfigProvider.class) +@JsonSubTypes(value = { + @JsonSubTypes.Type(name = "mapString", value = MapStringDynamicConfigProvider.class), +}) +public interface DynamicConfigProvider +{ + Map getConfig(); +} diff --git a/core/src/main/java/org/apache/druid/metadata/MapStringDynamicConfigProvider.java b/core/src/main/java/org/apache/druid/metadata/MapStringDynamicConfigProvider.java new file mode 100644 index 000000000000..1ef5a15e8a97 --- /dev/null +++ b/core/src/main/java/org/apache/druid/metadata/MapStringDynamicConfigProvider.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.metadata; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableMap; + +import java.util.Map; + +public class MapStringDynamicConfigProvider implements DynamicConfigProvider +{ + private final ImmutableMap config; + + @JsonCreator + public MapStringDynamicConfigProvider( + @JsonProperty("config") Map config + ) + { + this.config = ImmutableMap.copyOf(config); + } + + + @Override + @JsonProperty + public Map getConfig() + { + return config; + } +} diff --git a/core/src/main/java/org/apache/druid/metadata/PasswordProvider.java b/core/src/main/java/org/apache/druid/metadata/PasswordProvider.java index cd3a01c429f0..ab7ec7c22872 100644 --- a/core/src/main/java/org/apache/druid/metadata/PasswordProvider.java +++ b/core/src/main/java/org/apache/druid/metadata/PasswordProvider.java @@ -26,7 +26,13 @@ /** * Implement this for different ways to (optionally securely) access secrets. + * + * Any further use case of extensible configuration/secrets must use {@link DynamicConfigProvider} interface. Users + * may still implement this interface for existing use cases till https://github.com/apache/druid/issues/9351 is + * resolved. + * */ +@Deprecated @ExtensionPoint @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = DefaultPasswordProvider.class) @JsonSubTypes(value = { diff --git a/core/src/test/java/org/apache/druid/metadata/MapStringDynamicConfigProviderTest.java b/core/src/test/java/org/apache/druid/metadata/MapStringDynamicConfigProviderTest.java new file mode 100644 index 000000000000..cdf46e12b36f --- /dev/null +++ b/core/src/test/java/org/apache/druid/metadata/MapStringDynamicConfigProviderTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.metadata; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; +import org.junit.Assert; +import org.junit.Test; + +public class MapStringDynamicConfigProviderTest +{ + @Test + public void testSerde() throws Exception + { + DynamicConfigProvider original = new MapStringDynamicConfigProvider(ImmutableMap.of("k", "v")); + + ObjectMapper jsonMapper = new ObjectMapper(); + + MapStringDynamicConfigProvider recreated = (MapStringDynamicConfigProvider) jsonMapper.readValue( + jsonMapper.writeValueAsString(original), + DynamicConfigProvider.class + ); + + Assert.assertEquals(1, recreated.getConfig().size()); + Assert.assertEquals("v", recreated.getConfig().get("k")); + } +} diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java index 3ffe7fa83ec0..ad870f3ef000 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java @@ -28,6 +28,7 @@ import org.apache.druid.indexing.seekablestream.common.StreamException; import org.apache.druid.indexing.seekablestream.common.StreamPartition; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.metadata.DynamicConfigProvider; import org.apache.druid.metadata.PasswordProvider; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -192,16 +193,30 @@ public static void addConsumerPropertiesFromConfig( // Extract passwords before SSL connection to Kafka for (Map.Entry entry : consumerProperties.entrySet()) { String propertyKey = entry.getKey(); - if (propertyKey.equals(KafkaSupervisorIOConfig.TRUST_STORE_PASSWORD_KEY) - || propertyKey.equals(KafkaSupervisorIOConfig.KEY_STORE_PASSWORD_KEY) - || propertyKey.equals(KafkaSupervisorIOConfig.KEY_PASSWORD_KEY)) { - PasswordProvider configPasswordProvider = configMapper.convertValue( - entry.getValue(), - PasswordProvider.class - ); - properties.setProperty(propertyKey, configPasswordProvider.getPassword()); - } else { - properties.setProperty(propertyKey, String.valueOf(entry.getValue())); + + if (!KafkaSupervisorIOConfig.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY.equals(propertyKey)) { + if (propertyKey.equals(KafkaSupervisorIOConfig.TRUST_STORE_PASSWORD_KEY) + || propertyKey.equals(KafkaSupervisorIOConfig.KEY_STORE_PASSWORD_KEY) + || propertyKey.equals(KafkaSupervisorIOConfig.KEY_PASSWORD_KEY)) { + PasswordProvider configPasswordProvider = configMapper.convertValue( + entry.getValue(), + PasswordProvider.class + ); + properties.setProperty(propertyKey, configPasswordProvider.getPassword()); + } else { + properties.setProperty(propertyKey, String.valueOf(entry.getValue())); + } + } + } + + // Additional DynamicConfigProvider based extensible support for all consumer properties + Object dynamicConfigProviderJson = consumerProperties.get(KafkaSupervisorIOConfig.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY); + if (dynamicConfigProviderJson != null) { + DynamicConfigProvider dynamicConfigProvider = configMapper.convertValue(dynamicConfigProviderJson, DynamicConfigProvider.class); + Map dynamicConfig = dynamicConfigProvider.getConfig(); + + for (Map.Entry e : dynamicConfig.entrySet()) { + properties.setProperty(e.getKey(), e.getValue()); } } } diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java index a1360b5c63f3..62c1e790657b 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java @@ -32,6 +32,7 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig { + public static final String DRUID_DYNAMIC_CONFIG_PROVIDER_KEY = "druid.dynamic.config.provider"; public static final String BOOTSTRAP_SERVERS_KEY = "bootstrap.servers"; public static final String TRUST_STORE_PASSWORD_KEY = "ssl.truststore.password"; public static final String KEY_STORE_PASSWORD_KEY = "ssl.keystore.password"; diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java index d15803352321..ca152210f57a 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java @@ -24,10 +24,13 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import org.apache.curator.test.TestingCluster; +import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig; import org.apache.druid.indexing.kafka.test.TestBroker; import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord; import org.apache.druid.indexing.seekablestream.common.StreamPartition; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.metadata.DynamicConfigProvider; +import org.apache.druid.metadata.MapStringDynamicConfigProvider; import org.apache.druid.segment.TestHelper; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; @@ -43,6 +46,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; @@ -582,6 +586,33 @@ public void getEarliestSequenceNumberWhenPartitionIsEmptyAndUseLatestOffsetShoul Assert.assertEquals(new Long(0), recordSupplier.getEarliestSequenceNumber(streamPartition)); } + @Test + public void testAddConsumerPropertiesFromConfig() + { + DynamicConfigProvider dynamicConfigProvider = new MapStringDynamicConfigProvider( + ImmutableMap.of("kafka.prop.2", "value.2", KafkaSupervisorIOConfig.TRUST_STORE_PASSWORD_KEY, "pwd2") + ); + + Properties properties = new Properties(); + + Map consumerProperties = ImmutableMap.of( + KafkaSupervisorIOConfig.TRUST_STORE_PASSWORD_KEY, "pwd1", + "kafka.prop.1", "value.1", + "druid.dynamic.config.provider", OBJECT_MAPPER.convertValue(dynamicConfigProvider, Map.class) + ); + + KafkaRecordSupplier.addConsumerPropertiesFromConfig( + properties, + OBJECT_MAPPER, + consumerProperties + ); + + Assert.assertEquals(3, properties.size()); + Assert.assertEquals("value.1", properties.getProperty("kafka.prop.1")); + Assert.assertEquals("value.2", properties.getProperty("kafka.prop.2")); + Assert.assertEquals("pwd2", properties.getProperty(KafkaSupervisorIOConfig.TRUST_STORE_PASSWORD_KEY)); + } + private void insertData() throws ExecutionException, InterruptedException { try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) {