-
Notifications
You must be signed in to change notification settings - Fork 828
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(gateway): wait for kafka topic creation #5359
fix(gateway): wait for kafka topic creation #5359
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am assuming that we do not need to do anything for pipelinegateway (waiting for topic creation - pipeline input and pipeline output) as this is is done by dataflow engine and it will wait for the these topics to be created (with the other PR).
@@ -241,7 +287,7 @@ func (kc *InferKafkaHandler) AddModel(modelName string) error { | |||
kc.subscribedTopics[inputTopic] = true | |||
err := kc.subscribeTopics() | |||
if err != nil { | |||
kc.logger.WithError(err).Warn("Failed to subscribe to topics") | |||
kc.logger.WithError(err).Errorf("failed to subscribe to topics") | |||
return nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should investigate more on why all of these errors are being swallowed in infer.go
instead of being returned
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lgtm
Kafka topic creation happens asynchronously. This means that even when the return value from createTopics(...) indicates that the topic has been created successfuly, the topic can not be immediately subscribed to. We retry DescribeTopics until all of the topics for the pipeline can be described successfully. This indicates that the topic has been fully created at least on one broker, and can now be subscribed to. Which issue(s) this PR fixes: Fixes gateway component for #INFRA-663 (internal): Pipeline creation goes into ERROR state
- timeout config now in gateway/constants.go - reversal of the `pipeline.wg.Add(1)` code move - adding error returns when reaching error states - tidying-up of error reporting
ea3101d
to
79fc658
Compare
What this PR does / why we need it:
Kafka topic creation happens asynchronously. This means that even when the
return value from createTopics(...) indicates that the topic has been created
successfuly, the topic can not be immediately subscribed to.
We retry DescribeTopics until all of the topics for the pipeline can be
described successfully. This indicates that the topic has been fully created at
least on one broker, and can now be subscribed to.
Which issue(s) this PR fixes:
Fixes gateway component for #INFRA-663 (internal): Pipeline creation goes into ERROR state