fix: Only create kafka config once in model gateway #4940
Left some comments.
	getEnVar(logger, gateway.EnvMaxNumConsumers, gateway.DefaultMaxNumConsumers))
if err != nil {
	logger.WithError(err).Fatal("Failed to create consumer manager")
}
defer kafkaConsumer.Stop()
If there is an error, can we still call kafkaConsumer.Stop()?
Deferred functions will not be run when Fatal is called.
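To illustrate the point: assuming logger.Fatal ends the process through os.Exit (as logrus-style loggers do), any deferred call such as kafkaConsumer.Stop() is skipped on that path. Below is a minimal sketch of one way to keep cleanup running, using only the standard library. The names fakeConsumer, run and the fail flag are hypothetical and are not code from this PR. Cleanup is deferred inside a function that returns an error, and the caller decides to exit only after that function has returned.

```go
package main

import (
	"errors"
	"fmt"
)

// fakeConsumer is a hypothetical stand-in for the real consumer manager.
type fakeConsumer struct{ stopped bool }

// Stop records that cleanup ran.
func (c *fakeConsumer) Stop() { c.stopped = true }

// run defers cleanup and reports failure by returning an error instead of
// exiting, so the deferred Stop always runs before control returns.
func run(c *fakeConsumer, fail bool) error {
	defer c.Stop()
	if fail {
		return errors.New("simulated startup failure")
	}
	return nil
}

func main() {
	c := &fakeConsumer{}
	err := run(c, true)
	// By this point the deferred Stop has already run. A real main could
	// now call os.Exit(1) (or a Fatal logger) without losing cleanup.
	fmt.Println("error:", err)
	fmt.Println("stopped before exit:", c.stopped)
}
```

In the snippet under review the defer is registered only after the error check, so on the Fatal path there is nothing to stop anyway.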
consumerConfigMap kafka.ConfigMap,
producerConfigMap kafka.ConfigMap,
nit: if these configs are Kafka-specific, could we make that explicit in their names, so it is clear that they are different from consumerConfig? Or rename consumerConfig to managerConfig?
Actually, consumerConfig has KafkaConfig. Is there a way perhaps to merge all configs together to reduce confusion?
The KafkaConfig comes from the Kubernetes/file config. We use it to create the lower-level kafka.ConfigMap.
@@ -97,26 +99,13 @@ func NewInferKafkaHandler(logger log.FieldLogger, consumerConfig *ConsumerConfig
 		numPartitions:    numPartitions,
 		tlsClientOptions: tlsClientOptions,
 	}
-	return ic, ic.setup()
+	return ic, ic.setup(consumerConfigMap, producerConfigMap)
ditto
}

producerConfigAsJSON, err := json.Marshal(&producerConfigMap)
if err != nil {
what happens when we fail serialisation?
It's purely informational, so the output will just be less nicely formatted.
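A rough sketch of that kind of fallback, for illustration only: the PR's actual handling is not shown in this thread, so describeConfig and the plain map type used here are hypothetical stand-ins. The idea is to log the JSON form when marshalling succeeds and fall back to Go's default formatting when it does not, so a serialisation failure only affects how the log line looks.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// describeConfig returns a JSON rendering of cfg for log output, falling back
// to Go's default %v formatting when marshalling fails.
func describeConfig(cfg map[string]interface{}) string {
	b, err := json.Marshal(cfg)
	if err != nil {
		return fmt.Sprintf("%v", cfg)
	}
	return string(b)
}

func main() {
	fmt.Println(describeConfig(map[string]interface{}{"acks": "all"}))
	// A channel cannot be marshalled to JSON, so this takes the fallback path.
	fmt.Println(describeConfig(map[string]interface{}{"bad": make(chan int)}))
}
```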
LGTM, assuming last changes have been pushed.
Fixes: #4936
At present the model gateway creates the Kafka config every time a new consumer/producer is required. This is wasteful, especially when TLS with k8s secrets is used, because k8s calls are needed to fetch the secret contents. It also seems to create a race condition, as the secrets are saved to disk at the same location each time. This change creates the config only once and copies it for each consumer/producer needed.
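The "create once, copy per client" idea can be sketched as follows. This is a minimal illustration under stated assumptions, not the code in this PR: configMap is a hypothetical stand-in for kafka.ConfigMap (a map keyed by string), and cloneConfig, the keys and the values are made up for the example.

```go
package main

import "fmt"

// configMap is a hypothetical stand-in for kafka.ConfigMap.
type configMap map[string]interface{}

// cloneConfig returns a shallow copy so each client can adjust its own
// settings without mutating the shared base config.
func cloneConfig(base configMap) configMap {
	out := make(configMap, len(base))
	for k, v := range base {
		out[k] = v
	}
	return out
}

func main() {
	// Built once at startup; in the real gateway this is the step that
	// would read secrets, so it should not be repeated per client.
	base := configMap{"bootstrap.servers": "localhost:9092"}

	consumerCfg := cloneConfig(base)
	consumerCfg["group.id"] = "example-group" // per-consumer override

	_, leaked := base["group.id"]
	fmt.Println("base modified:", leaked)
}
```

A shallow copy is enough here because the values are treated as immutable settings; nested mutable values would need a deeper copy.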
In future, we should change to passing the certificates directly, now that this issue is fixed. However, we need to wait for a Splunk OTel Kafka PR that upgrades to v2 of confluent kafka-go: the fix is released in v2, and moving to it requires changing to a new module version.
A future PR can make the Kafka change so that files are no longer used for ca.crt and the other certificates.