New (key|value).multi.type option for Avro serialization #680
Conversation
It looks like @ept hasn't signed our Contributor License Agreement, yet. You can read and sign our full Contributor License Agreement here. Once you've signed, reply with [clabot:check]. Appreciation of efforts, clabot

[clabot:check]

@confluentinc It looks like @ept just signed our Contributor License Agreement. 👍 Always at your service, clabot
@ept thanks for your patch. For the most part it looks good; I'm just thinking out loud here about whether it would help to generalize this. We have different scenarios where users would want to share the same schema across topics. Your solution could be used to address that scenario as well. So maybe we could call the configs something like key.subject.name.strategy and value.subject.name.strategy. The default strategy could always use topic-key and topic-value. Let me know your thoughts.
@mageshn Thanks for the suggestion — I think that's a good idea, so I've implemented the key.subject.name.strategy and value.subject.name.strategy configs. They currently have three valid settings: topic-key/topic-value (the default, which preserves the existing behaviour), type, and topic-type.
This is a really great improvement. Thanks, @ept. I do have one suggestion below:
```java
  throw new SerializationException("Unknown value for "
      + AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY + ": "
      + valueSubjectNameStrategy);
}
```
This method is called frequently, and doing all these string comparisons on each call is less than ideal. What do you think about creating a functional interface like:

```java
public interface SubjectNamingStrategy {
  String getSubjectName(String topic, boolean isKey, Object value);
}
```

with a separate implementation for each strategy (see below)? The strategy for keys and values could be instantiated once in configureClientProperties(...) above:
```java
if ("topic-key".equals(keySubjectNameStrategy)) {
  keySubjectStrategy = new TopicNamingStrategy();
} else if ("type".equals(keySubjectNameStrategy)) {
  keySubjectStrategy = new RecordSchemaNamingStrategy("");
} else if ("topic-type".equals(keySubjectNameStrategy)) {
  keySubjectStrategy = new RecordSchemaNamingStrategy("topic-");
} else {
  throw new SerializationException("Unknown value for "
      + AbstractKafkaAvroSerDeConfig.KEY_SUBJECT_NAME_STRATEGY + ": "
      + keySubjectNameStrategy);
}

if ("topic-value".equals(valueSubjectNameStrategy)) {
  valueSubjectStrategy = new TopicNamingStrategy();
} else if ("type".equals(valueSubjectNameStrategy)) {
  valueSubjectStrategy = new RecordSchemaNamingStrategy("");
} else if ("topic-type".equals(valueSubjectNameStrategy)) {
  valueSubjectStrategy = new RecordSchemaNamingStrategy("topic-");
} else {
  throw new SerializationException("Unknown value for "
      + AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY + ": "
      + valueSubjectNameStrategy);
}
```
One benefit is that an incorrect value for the configuration property is detected right away. The big benefit, however, is that the getSubjectName member method that subclasses call frequently can be far more efficient by delegating to the correct strategy and forgoing all of the string comparisons:

```java
protected String getSubjectName(String topic, boolean isKey, Object value) {
  return isKey
      ? keySubjectStrategy.getSubjectName(topic, isKey, value)
      : valueSubjectStrategy.getSubjectName(topic, isKey, value);
}
```
Each SubjectNamingStrategy implementation would be quite straightforward. For example, the TopicNamingStrategy might be implemented as follows:
```java
protected static class TopicNamingStrategy implements SubjectNamingStrategy {
  public String getSubjectName(String topic, boolean isKey, Object value) {
    if (isKey) {
      return topic + "-key";
    }
    return topic + "-value";
  }
}
```
while another implementation, RecordSchemaNamingStrategy, could handle both the type and topic-type options by just using different prefixes:
```java
protected static class RecordSchemaNamingStrategy implements SubjectNamingStrategy {
  private final String prefix;

  public RecordSchemaNamingStrategy(String prefix) {
    this.prefix = prefix != null ? prefix : "";
  }

  public String getSubjectName(String topic, boolean isKey, Object value) {
    // Null is passed through unserialized, since it has special meaning in
    // log-compacted Kafka topics.
    if (value == null) {
      return null;
    }
    if (value instanceof GenericContainer) {
      Schema schema = ((GenericContainer) value).getSchema();
      if (schema.getType() == Schema.Type.RECORD) {
        return prefix + schema.getFullName();
      }
    }
    // isKey is only used to produce a more helpful error message
    String config = isKey
        ? AbstractKafkaAvroSerDeConfig.KEY_SUBJECT_NAME_STRATEGY
        : AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY;
    throw new SerializationException("In configuration " + config
        + ", the message " + (isKey ? "key" : "value") + " must be an Avro record");
  }
}
```
IMO, I would prefer making the Strategy interface public so that users could choose to use their own topic to subject mapping strategy if they need to. Essentially, the config becomes a class name.
@rhauch @mageshn Happy new year! I have updated the patch as you suggested, using different classes to implement the different subject-name choosing strategies. The configuration is now a fully-qualified Java classname, so that people can easily plug in their own strategies if desired. Could you let me know if it looks good now? |
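With the class-name-based config described above, choosing a strategy becomes a matter of producer configuration. A minimal sketch of what this might look like (the strategy class names are the ones added by this patch; treat the exact package path as an assumption):

```properties
# Illustrative producer settings. Keys keep the classic <topic>-key subject;
# values are registered under the record's fully-qualified name, so several
# record types can share one topic while being evolved independently.
key.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicNameStrategy
value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy
```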
@ept, happy new year to you! Thanks for the changes. I have one really minor question below -- otherwise this looks great!
Approving as is in case it's difficult to find succinct and clear text to add.
```java
    TopicNameStrategy.class.getName();

public static final String KEY_SUBJECT_NAME_STRATEGY_DOC =
    "Determines how to construct the subject name under which the key schema is registered "
    + "with the schema registry";
```
Minor nit: perhaps the key and value doc strings could say that by default the older topic naming behavior/strategy is used. It's not essential, but if it can be said clearly and succinctly it might help people understand that the behavior won't change if they use the default.
Added clarification of default behaviour in bf574fe.
BTW, not sure if these pass locally, but the build is failing with NPEs in the following tests. For example: java.lang.NullPointerException

Whoops, sorry about the test failures. I had only run the tests in the kafka-avro-serializer module, not the rest. Pushed 6a9092a to fix the build.
```java
// @@ -41,6 +43,8 @@
private static final Map<String, Schema> primitiveSchemas;
protected SchemaRegistryClient schemaRegistry;
protected SubjectNameStrategy keySubjectNameStrategy = new TopicNameStrategy();
```
nit: the default is already specified in the config def, so this is not necessary.
It is necessary: if the field is not initialised here, we get the NPEs that @rhauch complained about in tests for the kafka-connect-avro-converter module. It might be that those tests aren't properly configuring the SerDe, but I didn't want to get into debugging tests that are unrelated to the feature at hand.
```java
public class RecordNameStrategy implements SubjectNameStrategy {

  @Override
  public void configure(Map<String, ?> config) {
```
I'm not seeing this being invoked after instance creation. Since this is a public API, users would possibly expect it to work if they use it. We should either invoke it or not extend Configurable in the interface.
This is invoked here, right?
I think it's useful to extend Configurable -- it's too hard w/ Java 7 to add it later when we need it.
That's right. I missed that it is using getConfiguredInstance. LGTM
Could someone at Confluent please tell me why the Jenkins check is failing? The branch builds fine for me locally, and jenkins.confluent.io is not accessible to the public.

@ept It's failing due to some FindBugs errors that are already fixed in master. Can you try rebasing your branch?
In some situations, an application needs to store events of several different types in the same Kafka topic. In particular, when developing a data model in an Event Sourcing style, you might have several kinds of event that affect the state of an entity. For example, for a customer entity there may be `customerCreated`, `customerAddressChanged`, `customerEnquiryReceived`, `customerInvoicePaid`, etc. events, and the application may require that those events are always read in the same order. Thus, they need to go in the same Kafka partition (to maintain ordering).

The Avro schema registry currently assumes a 1:1 mapping between Kafka topics and Avro schemas, making it difficult to support scenarios like the one above. Users who want several event types in the same topic currently either have to put them in one big Avro union (which works, but gets unwieldy very quickly), or turn off the registry's schema compatibility checking (which would be unfortunate, since the compatibility check is very valuable).

This patch introduces two new boolean config settings, `key.multi.type` and `value.multi.type`. When set to true, they allow the key (or value, respectively) of a message to be *any* Avro record type. The schema of the type is stored in the schema registry as usual; however, instead of using `<topic>-key` or `<topic>-value` as the subject, the fully-qualified name of the record type is used as the subject.

This has the effect that a Kafka producer will happily accept any mixture of Avro record types and publish them to the same topic. Since the schema registry's ID for a schema is globally unique, the binary message encoding does not need to change, and consumers also handle the mixture of record types without change. When a schema is changed, the registry checks compatibility with previous schemas of the same fully-qualified type name; different record types can be evolved independently without any interference.
Using a string value for the configuration allows more than two settings. Added an option to use topic name + record name as subject.
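The subject-naming behaviour described in these commits can be sketched in a few lines. This is illustrative Python, not the actual Java implementation; the strategy labels are shorthand for the classic topic-based naming, the record-name option, and the topic-plus-record-name option:

```python
def subject_name(strategy: str, topic: str, record_full_name: str, is_key: bool) -> str:
    """Sketch of how each naming strategy picks a schema-registry subject."""
    if strategy == "topic":
        # Classic behaviour: one schema per topic, keyed by <topic>-key/-value.
        return topic + ("-key" if is_key else "-value")
    if strategy == "record":
        # Subject is the record's fully-qualified name, so many record
        # types can coexist in one topic.
        return record_full_name
    if strategy == "topic-record":
        # Topic name plus record name: per-topic, per-type subjects.
        return topic + "-" + record_full_name
    raise ValueError("Unknown strategy: " + strategy)
```

Because the subject determines which schemas are compared for compatibility, the record-based strategies make the registry check each record type against its own history rather than against everything on the topic.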
@mageshn Ok, rebased onto master.
Hi,

I second @arnaudbos's question. What is the release timeline for this?

This will be released with the upcoming 4.1 release. I don't have exact timelines, but it should tentatively be around the end of March or early April.
@mageshn Are there any plans to port this behaviour to the REST proxy too? Currently the same restriction applies there, since a record batch only allows a single key/value schema.
This looks like a very nice improvement - our event schema is getting HUGE ;-) Will this integrate with Kafka Streams? Currently we provide a concrete keySerde and valueSerde per topic there...
I second @eventSourcerer's question. How will we provide the key/value serde when reading a topic into a stream?
Is there an example of this somewhere (with a union type as the root element of the schema)?
Hi,

I don't think Avro supports this out of the box. Now that 4.1 is out, can we use the multi-schema feature? I didn't see it mentioned in the release notes. We have implemented our own version of schema-registry based on etcd (we had etcd at hand) because we needed this feature before it was released. However, it doesn't support schema compatibility enforcement, and rather than adding that on top of our own implementation we'd like to migrate whenever possible.

Edit: Yes, it has been released in 4.1.0; see the changelog and docs.
Where can I get this patch for version 3.3.0?
Why is the Avro union approach considered unwieldy? By inventing a new out-of-band method of type composition, the utility of Avro compatibility checks is reduced and the burden on consumers is increased. It unnecessarily creates another method of encoding/decoding records onto a topic. There is no longer a single contract for the whole topic, and to validate a consumer's ability to read it, one now needs to check every subject whose records may appear in the topic.

Under this scheme, the 'total schema' for the topic is opaque to the consumer, and this feels misaligned with the principles of shared schemas and registries.
@teabot, unions can now be used with schema references to store multiple event types in the same topic: https://www.confluent.io/blog/multiple-event-types-in-the-same-kafka-topic/
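For reference, the value schema registered for a topic under that schema-references approach is a top-level union whose branches are references to separately registered record schemas. A rough sketch (the type names here are hypothetical, not from the linked post):

```json
[
  "com.example.customerCreated",
  "com.example.customerAddressChanged"
]
```

Each branch resolves to its own subject, so the topic keeps a single explicit contract while compatibility is still checked per record type.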