Protobuf usage example #16

Closed
leosunmo opened this issue Sep 10, 2020 · 15 comments

@leosunmo

Hey, I am trying to figure out if I can use this client to fetch Protobuf schemas in a Go consumer. Does anyone have any examples of how you could accomplish this?
Without running protoc to pre-generate the code, I am not sure how this would work at runtime.

Any pointers or example would be appreciated!

@shivakumarss

@leosunmo I have not tried protobuf, but per the Confluent documentation you will have to use the wire format.
https://docs.confluent.io/current/schema-registry/serdes-develop/index.html#wire-format

On the consumer side you have to skip the first 5 bytes of the message (a magic byte followed by a 4-byte schema ID) and then deserialize the rest.
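A minimal sketch of that step, assuming the layout described in the wire format document (one magic byte with value 0, then a 4-byte big-endian schema ID). The function name `splitWireFormat` is hypothetical and not part of this client:

```go
package main

import (
	"encoding/binary"
	"errors"
)

// headerLen is the magic byte plus the 4-byte schema ID.
const headerLen = 5

// splitWireFormat (hypothetical helper) separates the Confluent header
// from the rest of a Kafka message value. It returns the schema ID and
// the bytes that follow the 5-byte header.
func splitWireFormat(msg []byte) (schemaID uint32, payload []byte, err error) {
	if len(msg) < headerLen {
		return 0, nil, errors.New("message shorter than the 5-byte header")
	}
	if msg[0] != 0 {
		return 0, nil, errors.New("unexpected magic byte")
	}
	// The schema ID is stored in network byte order (big-endian).
	return binary.BigEndian.Uint32(msg[1:headerLen]), msg[headerLen:], nil
}
```

The returned schema ID is what you would look up in the Schema Registry. For Protobuf messages the returned payload is not yet the serialized message: it still starts with the message-indexes array described in the same document.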

@leosunmo
Author

Yeah I've got that far. The approach I'll be taking is to compile all the schemas I care about, and then use the Schema Registry as supplementary information I think.
At first I was trying to use purely Schema Registry to handle the Protobuf at runtime. That seems like a very bad idea :P

@shivakumarss

Hi @leosunmo, did you get a chance to get this working?

@leosunmo
Author

I've ended up using https://developers.google.com/protocol-buffers/docs/proto3#any instead to communicate what type the message is. I rely on having compiled the types already, so no "runtime" dynamic fetching of protobuf files.

@dineshgowda24

dineshgowda24 commented Apr 7, 2021

@leosunmo will you be able to share a working example with protobufs?
I am stuck on an Avro error (#17) and cannot get a producer and consumer working with Protobuf and the registry.

@leosunmo
Author

leosunmo commented Apr 7, 2021

@dineshgowda24 It's been a while since I looked at this, but I utilise Google's Any format, https://developers.google.com/protocol-buffers/docs/reference/csharp/class/google/protobuf/well-known-types/any.

On the producer side I do something like this:

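// Needs google.golang.org/protobuf/proto, .../types/known/anypb and
// .../types/known/timestamppb, plus the generated metrics package.
func makeNotifEventMessage() []byte {
	// metrics.Event is my own compiled Protobuf message.
	event := &metrics.Event{
		Desc:         "Email got sent successfully",
		SourceSystem: "test-producer",
		Ts:           timestamppb.Now(),
		Type:         metrics.Event_EMAIL_SUCCESS,
	}
	// Wrap the event in an "Any" Protobuf type, which carries the
	// message's type URL next to its bytes. Errors are ignored for brevity.
	metricAny, _ := anypb.New(event)

	out, _ := proto.Marshal(metricAny)
	return out
}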

And on the consumer side it ends up looking something like this:

// Handle getting Kafka messages

// Unmarshal into an "Any" type, which wraps the "real" message.
wrapper := &anypb.Any{}
if err := proto.Unmarshal(kafkaMessage.Value, wrapper); err != nil {
	c.log.Error("Failed to parse event", zap.Error(err))
	continue
}

// This is probably where you could involve protoregistry in some fancy way
// and anypb.UnmarshalNew(), but for now we can keep it simple.

var msgBytes []byte

// Iterate over the possible message types and process them.
// Any's MessageName() method returns the full name of the wrapped message type.
switch wrapper.MessageName() {
case "metrics.Event":
	me := &metrics.Event{}

	if err := anypb.UnmarshalTo(wrapper, me, proto.UnmarshalOptions{}); err != nil {
		c.log.Error("Failed to unwrap event", zap.Error(err))
		continue
	}
	msgBytes = processMetricsEvent(me)
default:
	c.log.Info(fmt.Sprintf("Unknown message: %s, skipping.", wrapper.MessageName()))
	c.commitMessages(ctx, []kafka.Message{})
	continue
}

I hope this makes some sense.

@dineshgowda24

@leosunmo Thanks, I get this part.

I wanted to know how you are registering the Any protobuf schema with the Schema Registry and using it in the producer and consumer.
If you look at the project's README, it shows an example of registering and using an Avro schema.
I wanted to know the equivalent for protobuf.

@leosunmo
Author

leosunmo commented Apr 8, 2021

I am not registering anything with the Schema Registry in these examples. This is a way to work with multiple schemas without using Schema Registry. If you wanted to use the Schema Registry you probably wouldn't use Any at all here.

@dineshgowda24

Got it, thanks 👍🏽

@bigkraig
Contributor

@riferrei

Should Proto serialization/deserialization contributions be made to this repository? Something along the lines of what is available with Codec() for Avro.

@riferrei
Owner

I welcome any change that adds value to this library. 😊

What do you have in mind?

@bigkraig
Contributor

👍🏽
Basically parity with Avro. The unfortunate side effect is that the proto wire format has a bit more to it, so the serde is a bit more complicated and, IMO, probably not right to land in the client for the Schema Registry. I only ask about putting it here because it already supports the Avro & JSON serdes.

If you look at the Wire Format document and scroll past the table to the fine print, there is a message-indexes part of the payload that needs to be calculated. Similarly for deserialization we need to look up the proto message at the message-indexes location in the schema — which involves fetching the schema and reflecting through it. Lots of stuff that I feel like is outside the scope of the SR client — yet necessary to put proto on the same ground as Avro & JSON in the client. Most of this work is documented and completed in this PR already.
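To make the message-indexes part concrete, here is a rough sketch of the decoding side. It assumes the encoding described in the Wire Format document: the array length followed by each index, all written as zigzag varints, with a single 0 byte as shorthand for the index path [0]. The name `readMessageIndexes` is hypothetical, and the input is the message value after the 5-byte header has been removed:

```go
package main

import (
	"encoding/binary"
	"errors"
)

// readMessageIndexes (hypothetical helper) decodes the message-indexes
// array and returns it together with the remaining bytes, which hold
// the serialized Protobuf message.
func readMessageIndexes(buf []byte) (indexes []int, rest []byte, err error) {
	// binary.Varint reads a zigzag-encoded signed varint.
	count, n := binary.Varint(buf)
	if n <= 0 {
		return nil, nil, errors.New("cannot read message-index count")
	}
	buf = buf[n:]
	// Every index needs at least one byte, so count cannot exceed len(buf).
	if count < 0 || count > int64(len(buf)) {
		return nil, nil, errors.New("invalid message-index count")
	}
	if count == 0 {
		// Shorthand: a lone 0 byte means the first message in the schema.
		return []int{0}, buf, nil
	}
	indexes = make([]int, 0, count)
	for i := int64(0); i < count; i++ {
		v, n := binary.Varint(buf)
		if n <= 0 {
			return nil, nil, errors.New("truncated message-index array")
		}
		indexes = append(indexes, int(v))
		buf = buf[n:]
	}
	return indexes, buf, nil
}
```

The returned index path then has to be walked through the fetched schema to find the message descriptor, which is the reflection work described above and is not shown here.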

Then we get to publishing and reading the schema itself, which involves multiple schemas when there are references to external types, as documented in their Protobuf Serializer docs. I haven’t had the need for this yet and have yet to look into solving it.

@AtakanColak
Collaborator

AtakanColak commented May 24, 2022

I'm inclined to add these to an examples folder and mention them in the README.md to close this issue and #17. What do you think, @riferrei?

@riferrei
Owner

Good idea. 👍🏻

@AtakanColak
Collaborator

I'm closing this issue as we have included the existing examples in the repo, and the rest is in PR #85.
