New (key|value).multi.type option for Avro serialization #680

Merged · 5 commits · Jan 12, 2018

Conversation

@ept (Contributor) commented Dec 1, 2017

In some situations, an application needs to store events of several different types in the same Kafka topic. In particular, when developing a data model in an Event Sourcing style, you might have several kinds of event that affect the state of an entity. For example, for a customer entity there may be customerCreated, customerAddressChanged, customerEnquiryReceived, customerInvoicePaid, etc. events, and the application may require that those events are always read in the same order. Thus, they need to go in the same Kafka partition (to maintain ordering).

The Avro schema registry currently assumes a 1:1 mapping between Kafka topics and Avro schemas, making it difficult to support scenarios like the one above. Users who want several event types in the same topic currently either have to put them in one big Avro union (which works, but gets unwieldy very quickly), or turn off the registry's schema compatibility checking (which would be unfortunate, since the compatibility check is very valuable).

This patch introduces two new boolean config settings, key.multi.type and value.multi.type. When set to true, they allow the key (or value, respectively) of a message to be any Avro record type. The schema of the type is stored in the schema registry as usual; however, instead of
using <topic>-key or <topic>-value as subject, the fully-qualified name of the record type is used as subject.

This has the effect that a Kafka producer will happily accept any mixture of Avro record types and publish them to the same topic. Since the schema registry's ID for a schema is globally unique, the binary message encoding does not need to change, and consumers also handle the mixture of record types without change. When a schema is changed, the registry checks compatibility with previous schemas of the same fully-qualified type name; different record types can be evolved independently without any interference.
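For illustration, here is a minimal producer configuration sketch using the settings as proposed here (a fragment in the style of the snippets later in this thread; broker and registry addresses are assumptions, and the config names were later revised during review, see below):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");          // assumed broker address
props.put("schema.registry.url", "http://localhost:8081"); // assumed registry address
props.put("key.serializer", KafkaAvroSerializer.class.getName());
props.put("value.serializer", KafkaAvroSerializer.class.getName());
// As proposed in this patch: allow any Avro record type as the message value;
// the record's fully-qualified name is then used as the registry subject.
props.put("value.multi.type", "true");

Producer<String, GenericRecord> producer = new KafkaProducer<>(props);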

@ghost commented Dec 1, 2017

It looks like @ept hasn't signed our Contributor License Agreement yet.

The purpose of a CLA is to ensure that the guardian of a project's outputs has the necessary ownership or grants of rights over all contributions to allow them to distribute under the chosen licence. (Wikipedia)

You can read and sign our full Contributor License Agreement here.

Once you've signed, reply with [clabot:check] to prove it.

Appreciation of efforts,

clabot

@ept (Contributor, Author) commented Dec 1, 2017

[clabot:check]

@ghost commented Dec 1, 2017

@confluentinc It looks like @ept just signed our Contributor License Agreement. 👍

Always at your service,

clabot

@mageshn (Member) commented Dec 4, 2017

@ept thanks for your patch. For the most part it looks good; I'm just thinking out loud here about whether it would help to generalize this. We have other scenarios where users want to share the same schema across topics, and your solution could address those as well. So maybe we could name the configs something like key.subject.name.strategy and value.subject.name.strategy. The default strategy could always use topic-key and topic-value. Let me know your thoughts.

@ept (Contributor, Author) commented Dec 11, 2017

@mageshn Thanks for the suggestion — I think that's a good idea, so I've implemented the key.subject.name.strategy and value.subject.name.strategy configs. They currently have three valid settings:

  • default: <topic>-key or <topic>-value as before
  • "type" setting: uses the fully-qualified Avro record name as subject
  • "topic-type" setting: uses the Kafka topic concatenated with the fully-qualified Avro record name as subject

Both the "type" and "topic-type" settings allow multiple event types in the same topic; the difference is just the scope at which the schema compatibility check is performed (per-topic per-type, or globally per-type).

@rhauch (Member) left a comment

This is a really great improvement. Thanks, @ept. I do have one suggestion below:

throw new SerializationException("Unknown value for "
+ AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY + ": "
+ valueSubjectNameStrategy);
}
@rhauch (Member) commented on the diff:

This method is called frequently, and doing all these string comparisons in each call is less than ideal. What do you think about creating a functional interface like:

public interface SubjectNamingStrategy {
    String getSubjectName(String topic, boolean isKey, Object value);
}

with a separate implementation for each strategy (see below). The strategy for keys and values could be instantiated once in configureClientProperties(...) above:

if ("topic-key".equals(keySubjectNameStrategy)) {
    keySubjectStrategy = new TopicNamingStrategy();
} else if ("type".equals(keySubjectNameStrategy)) {
    keySubjectStrategy = new RecordSchemaNamingStrategy("");
} else if ("topic-type".equals(keySubjectNameStrategy)) {
    keySubjectStrategy = new RecordSchemaNamingStrategy("topic-");
} else {
    throw new SerializationException("Unknown value for "
                + AbstractKafkaAvroSerDeConfig.KEY_SUBJECT_NAME_STRATEGY + ": "
                + keySubjectNameStrategy);
}
if ("topic-value".equals(valueSubjectNameStrategy)) {
    keySubjectStrategy = new TopicNamingStrategy();
} else if ("type".equals(valueSubjectNameStrategy)) {
    valueSubjectStrategy = new RecordSchemaNamingStrategy("");
} else if ("topic-type".equals(valueSubjectNameStrategy)) {
    valueSubjectStrategy = new RecordSchemaNamingStrategy("topic-");
} else {
    throw new SerializationException("Unknown value for "
                + AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY + ": "
                + valueSubjectNameStrategy);
}

One benefit is that an incorrect value for the configuration property is detected right away. However, the big benefit is that the getSubjectName member method that is called frequently by subclasses can be far more efficient by delegating to the correct strategy and forgoing all of the string comparisons:

protected String getSubjectName(String topic, boolean isKey, Object value) {
    return isKey
        ? keySubjectStrategy.getSubjectName(topic, isKey, value)
        : valueSubjectStrategy.getSubjectName(topic, isKey, value);
}

Each SubjectNamingStrategy implementation would be quite straightforward. For example, the TopicNamingStrategy might be implemented as follows:

protected static class TopicNamingStrategy implements SubjectNamingStrategy {
    public String getSubjectName(String topic, boolean isKey, Object value) {
        if (isKey) {
            return topic + "-key";
        }
        return topic + "-value";
    }
}

while another RecordSchemaNamingStrategy implementation could handle both the type and topic-type options by optionally including the topic name:

protected static class RecordSchemaNamingStrategy implements SubjectNamingStrategy {
    private final boolean includeTopic;
    public RecordSchemaNamingStrategy(boolean includeTopic) {
        this.includeTopic = includeTopic;
    }
    public String getSubjectName(String topic, boolean isKey, Object value) {
        // Null is passed through unserialized, since it has special meaning in
        // log-compacted Kafka topics.
        if (value == null) {
            return null;
        }

        if (value instanceof GenericContainer) {
            Schema schema = ((GenericContainer) value).getSchema();
            if (schema.getType() == Schema.Type.RECORD) {
                String fullName = schema.getFullName();
                return includeTopic ? topic + "-" + fullName : fullName;
            }
        }

        // isKey is only used to produce a more helpful error message
        if (isKey) {
            throw new SerializationException("In configuration "
                + AbstractKafkaAvroSerDeConfig.KEY_SUBJECT_NAME_STRATEGY
                + ", the message key must be an Avro record");
        } else {
            throw new SerializationException("In configuration "
                + AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY
                + ", the message value must be an Avro record");
        }
    }
}

@mageshn (Member) commented Dec 20, 2017:

IMO, I would prefer making the Strategy interface public so that users can plug in their own topic-to-subject mapping strategy if they need to. Essentially, the config becomes a class name.

@ept (Contributor, Author) commented Jan 5, 2018

@rhauch @mageshn Happy new year! I have updated the patch as you suggested, using different classes to implement the different subject-name choosing strategies. The configuration is now a fully-qualified Java classname, so that people can easily plug in their own strategies if desired. Could you let me know if it looks good now?
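For example, a sketch of what a user-supplied strategy might look like (the TenantPrefixStrategy class and its naming scheme are hypothetical; the interface name and configure() method follow the diff fragments quoted below, and the package is assumed):

import java.util.Map;
import io.confluent.kafka.serializers.subject.SubjectNameStrategy; // package assumed

// Hypothetical custom strategy: prefix every subject with a tenant id.
public class TenantPrefixStrategy implements SubjectNameStrategy {
    @Override
    public void configure(Map<String, ?> config) {
        // Nothing to configure in this sketch.
    }

    @Override
    public String getSubjectName(String topic, boolean isKey, Object value) {
        return "tenant-a." + topic + (isKey ? "-key" : "-value");
    }
}

It would then be selected by setting value.subject.name.strategy (or key.subject.name.strategy) to TenantPrefixStrategy.class.getName().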

@rhauch (Member) left a comment

@ept, happy new year to you! Thanks for the changes. I have one really minor question below -- otherwise this looks great!

Approving as is in case it's difficult to find succinct and clear text to add.

TopicNameStrategy.class.getName();
public static final String KEY_SUBJECT_NAME_STRATEGY_DOC =
"Determines how to construct the subject name under which the key schema is registered "
+ "with the schema registry";
@rhauch (Member) commented on the diff:

Minor nit: perhaps the key and value doc strings could say that by default the older topic naming behavior/strategy is used. It's not essential, but if it can be said clearly and succinctly it might help people understand that the behavior won't change if they use the default.

@ept (Contributor, Author) replied:

Added clarification of default behaviour in bf574fe.

@rhauch (Member) commented Jan 5, 2018

BTW, not sure if these pass locally, but the build is failing with NPEs in the following tests:

  • io.confluent.connect.avro.AvroConverterTest.testPrimitive
  • io.confluent.connect.avro.AvroConverterTest.testVersionExtracted
  • io.confluent.connect.avro.AvroConverterTest.testVersionMaintained
  • io.confluent.connect.avro.AvroConverterTest.testComplex

For example:

java.lang.NullPointerException
at io.confluent.connect.avro.AvroConverterTest.testVersionMaintained(AvroConverterTest.java:210)

@ept (Contributor, Author) commented Jan 5, 2018

Whoops, sorry about the test failures. I had only run the tests on the kafka-avro-serializer module, not the rest. Pushed 6a9092a to fix the build.

@@ -41,6 +43,8 @@

private static final Map<String, Schema> primitiveSchemas;
protected SchemaRegistryClient schemaRegistry;
protected SubjectNameStrategy keySubjectNameStrategy = new TopicNameStrategy();
A reviewer (Member) commented on the diff:

Nit: the default is already specified in the config def, so this is not necessary.

@ept (Contributor, Author) replied:

It is necessary: if the field is not initialised here, we get the NPEs that @rhauch complained about in tests for the kafka-connect-avro-converter module. It might be that those tests aren't properly configuring the SerDe, but I didn't want to get into debugging tests that are unrelated to the feature at hand.

public class RecordNameStrategy implements SubjectNameStrategy {

@Override
public void configure(Map<String, ?> config) {
A reviewer (Member) commented on the diff:

I'm not seeing this being invoked after instance creation. Since this is a public API, users would possibly expect it to work if they use it. We should either invoke it or not extend Configurable in the interface.

Another reviewer (Member) replied:

This is invoked here, right?

I think it's useful to extend Configurable -- it's too hard with Java 7 to add it later when we need it.

The first reviewer (Member) replied:

That's right. I missed that it is using getConfiguredInstance. LGTM
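A sketch of that wiring, assuming the standard semantics of Kafka's AbstractConfig.getConfiguredInstance (it instantiates the configured class and, if the instance implements Configurable, calls configure() with the original config map):

SubjectNameStrategy strategy = config.getConfiguredInstance(
    AbstractKafkaAvroSerDeConfig.KEY_SUBJECT_NAME_STRATEGY,
    SubjectNameStrategy.class);
// Roughly equivalent to:
//   SubjectNameStrategy s = clazz.newInstance();
//   if (s instanceof Configurable) {
//     ((Configurable) s).configure(config.originals());
//   }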

@ept (Contributor, Author) commented Jan 9, 2018

Could someone at Confluent please tell me why the Jenkins check is failing? The branch builds fine for me locally, and jenkins.confluent.io is not accessible to the public.

@mageshn (Member) commented Jan 10, 2018

@ept It's failing due to some FindBugs errors that are already fixed in master. Can you try rebasing your branch?

@ept (Contributor, Author) commented Jan 12, 2018

@mageshn Ok, rebased onto master.

@arnaudbos commented:
Hi,
What release of schema-registry (and which docker image tag) is this patch planned for?
Thanks for the hard work 🤓

@defpearlpilot commented:
I second @arnaudbos's question. What is the release timeline for this?

@mageshn (Member) commented Feb 15, 2018

This will be released with the upcoming 4.1 release. I don't have exact timelines but should be tentatively around end of March or early April.

@tPl0ch commented Feb 18, 2018

@mageshn Are there any plans to port this behaviour to the REST proxy too? Currently the same restriction applies there, since only a single key/value schema is allowed per record batch.

@eventSourcerer commented Feb 20, 2018

This looks like a very nice improvement; our event schema is getting HUGE ;-)

Will this integrate with Kafka Streams? Currently we provide a concrete keySerde and valueSerde per topic there...

@jxbes commented Mar 14, 2018

I second @eventSourcerer 's question. How will we provide the key/value serde when reading a topic into a stream?

@gphilipp commented Mar 23, 2018

Users who want several event types in the same topic currently either have to put them in one big Avro union (which works, but gets unwieldy very quickly).

Is there an example of this somewhere (with a union type as the root element of the schema) ?

@Yahampath commented:

Hi, is there an example of this anywhere? If so, please comment below.

@arnaudbos commented May 24, 2018

I don't think Avro supports this out of the box.
What you'd have to do is use a union-typed field.
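For example, a minimal sketch of that approach with Avro's SchemaBuilder (all record and field names are hypothetical): a wrapper record whose payload field is a union of the concrete event types. A bare union is also accepted by Avro as a top-level schema, though tooling support for that varies.

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import java.util.Arrays;

Schema created = SchemaBuilder.record("customerCreated").namespace("example")
    .fields().requiredString("customerId").endRecord();
Schema addressChanged = SchemaBuilder.record("customerAddressChanged").namespace("example")
    .fields().requiredString("newAddress").endRecord();

// A wrapper record with a union-typed "payload" field:
Schema envelope = SchemaBuilder.record("customerEvent").namespace("example")
    .fields()
    .name("payload").type().unionOf()
        .type(created).and()
        .type(addressChanged)
    .endUnion().noDefault()
    .endRecord();

// Alternatively, a union as the root schema itself:
Schema rootUnion = Schema.createUnion(Arrays.asList(created, addressChanged));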

Now that 4.1 is out, can we use the multi-schema feature? I didn't see it mentioned in the release note.

We implemented our own version of schema-registry based on etcd (we had etcd at hand) because we needed this feature before it was released. It doesn't support schema compatibility enforcement, though, and rather than adding that on top of our own we'd like to migrate whenever possible.

Edit: Yes it has been released in 4.1.0, see changelog and docs.

@buntyray commented:

Where can I get this patch for version 3.3.0?

@teabot (Contributor) commented Jul 16, 2019

Why is the Avro union not a better fit for this? The criticism of the union was that it creates unwieldy schemas, which I assume is a design-time concern. However, I believe this is alleviated through the use of IDL imports to compose multiple smaller schemas into a single union.

By inventing a new out-of-band method of type composition, the utility of Avro compatibility checks is reduced and the burden on consumers increased. It unnecessarily creates another method of encoding/decoding records onto a topic. There is no longer a single contract for the whole topic, and now, to validate a consumer's ability to read, one needs to:

  • list all subjects
  • filter subjects by topic prefix
  • extract schema for each subject

Under this scheme, the 'total schema' for the topic is opaque to the consumer and this feels misaligned with the principles of shared schemas and registries.

@rayokota (Member) commented Jul 8, 2020

@teabot , unions can now be used with schema references to store multiple event types in the same topic:

https://www.confluent.io/blog/multiple-event-types-in-the-same-kafka-topic/
