Skip to content

Commit

Permalink
Add optional manual commit to seldon kafka server (#4117)
Browse files Browse the repository at this point in the history
* Add optional flag to disable kafka autocommit

Signed-off-by: Emirhan Karagül <emirhan350z@gmail.com>

* Remove reqKey from KafkaJob as it is available from the message object

Signed-off-by: Emirhan Karagül <emirhan350z@gmail.com>
  • Loading branch information
YmirKhang authored Jun 7, 2022
1 parent d4d3ef1 commit 9086778
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 7 deletions.
19 changes: 15 additions & 4 deletions executor/api/kafka/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,13 @@ const (
ENV_KAFKA_OUTPUT_TOPIC = "KAFKA_OUTPUT_TOPIC"
ENV_KAFKA_FULL_GRAPH = "KAFKA_FULL_GRAPH"
ENV_KAFKA_WORKERS = "KAFKA_WORKERS"
ENV_KAFKA_AUTO_COMMIT = "KAFKA_AUTO_COMMIT"
)

type SeldonKafkaServer struct {
Client client.SeldonApiClient
Producer *kafka.Producer
Consumer *kafka.Consumer
DeploymentName string
Namespace string
Transport string
Expand All @@ -53,9 +55,10 @@ type SeldonKafkaServer struct {
Log logr.Logger
Protocol string
FullHealthCheck bool
AutoCommit bool
}

func NewKafkaServer(fullGraph bool, workers int, deploymentName, namespace, protocol, transport string, annotations map[string]string, serverUrl *url.URL, predictor *v1.PredictorSpec, broker, topicIn, topicOut string, log logr.Logger, fullHealthCheck bool) (*SeldonKafkaServer, error) {
func NewKafkaServer(fullGraph bool, workers int, deploymentName, namespace, protocol, transport string, annotations map[string]string, serverUrl *url.URL, predictor *v1.PredictorSpec, broker, topicIn, topicOut string, log logr.Logger, fullHealthCheck bool, autoCommit bool) (*SeldonKafkaServer, error) {
var apiClient client.SeldonApiClient
var err error
if fullGraph {
Expand Down Expand Up @@ -101,6 +104,11 @@ func NewKafkaServer(fullGraph bool, workers int, deploymentName, namespace, prot

}
}

if !autoCommit && workers > 1 {
log.Info("Disabling auto commit for kafka can have undesired side effects with multiple workers")
}

// Create Producer
log.Info("Creating producer", "broker", broker)
p, err := kafka.NewProducer(&producerConfigMap)
Expand All @@ -124,6 +132,7 @@ func NewKafkaServer(fullGraph bool, workers int, deploymentName, namespace, prot
Log: log.WithName("KafkaServer"),
Protocol: protocol,
FullHealthCheck: fullHealthCheck,
AutoCommit: autoCommit,
}, nil
}

Expand Down Expand Up @@ -167,8 +176,9 @@ func (ks *SeldonKafkaServer) Serve() error {
"broker.address.family": "v4",
"group.id": ks.getGroupName(),
"session.timeout.ms": 6000,
"enable.auto.commit": true,
"auto.offset.reset": "earliest"}
"enable.auto.commit": ks.AutoCommit,
"auto.offset.reset": "earliest",
}

if util.GetKafkaSecurityProtocol() == "SSL" {
sslKakfaServer := util.GetSslElements()
Expand All @@ -190,6 +200,7 @@ func (ks *SeldonKafkaServer) Serve() error {
if err != nil {
return err
}
ks.Consumer = c
ks.Log.Info("Created", "consumer", c.String(), "consumer group", ks.getGroupName(), "topic", ks.TopicIn)

err = c.SubscribeTopics([]string{ks.TopicIn}, nil)
Expand Down Expand Up @@ -275,7 +286,7 @@ func (ks *SeldonKafkaServer) Serve() error {

job := KafkaJob{
headers: headers,
reqKey: e.Key,
message: e,
reqPayload: reqPayload,
}
// enqueue a job
Expand Down
16 changes: 14 additions & 2 deletions executor/api/kafka/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package kafka

import (
"context"

"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
Expand All @@ -12,7 +13,7 @@ import (

type KafkaJob struct {
headers map[string][]string
reqKey []byte
message *kafka.Message
reqPayload payload.SeldonPayload
}

Expand Down Expand Up @@ -63,11 +64,22 @@ func (ks *SeldonKafkaServer) processKafkaRequest(job *KafkaJob) {

err = ks.Producer.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &ks.TopicOut, Partition: kafka.PartitionAny},
Key: job.reqKey,
Key: job.message.Key,
Value: resBytes,
Headers: kafkaHeaders,
}, nil)

if err != nil {
ks.Log.Error(err, "Failed to produce response")
}

// Commit the messages here
if !ks.AutoCommit {
_, err = ks.Consumer.CommitMessage(job.message)

if err != nil {
ks.Log.Error(err, "Failed to commit offsets")
}
}

}
14 changes: 13 additions & 1 deletion executor/cmd/executor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ var (
kafkaTopicOut = flag.String("kafka_output_topic", "", "The kafka output topic")
kafkaFullGraph = flag.Bool("kafka_full_graph", false, "Use kafka for internal graph processing")
kafkaWorkers = flag.Int("kafka_workers", 4, "Number of kafka workers")
kafkaAutoCommit = flag.Bool("kafka_auto_commit", true, "Use auto committing in the kafka consumer")
logKafkaBroker = flag.String("log_kafka_broker", "", "The kafka log broker")
logKafkaTopic = flag.String("log_kafka_topic", "", "The kafka log topic")
fullHealthChecks = flag.Bool("full_health_checks", false, "Full health checks via chosen protocol API")
Expand Down Expand Up @@ -285,6 +286,17 @@ func main() {
}
}

// Get Kafka Auto Commit
kafkaAutoCommitFromEnv := os.Getenv(kafka.ENV_KAFKA_AUTO_COMMIT)
if kafkaAutoCommitFromEnv != "" {
kafkaAutoCommitFromEnvBool, err := strconv.ParseBool(kafkaAutoCommitFromEnv)
if err != nil {
log.Fatalf("Failed to parse %s %s", kafka.ENV_KAFKA_AUTO_COMMIT, kafkaAutoCommitFromEnv)
} else {
*kafkaAutoCommit = kafkaAutoCommitFromEnvBool
}
}

//Kafka workers
kafkaWorkersFromEnv := os.Getenv(kafka.ENV_KAFKA_WORKERS)
if kafkaWorkersFromEnv != "" {
Expand Down Expand Up @@ -364,7 +376,7 @@ func main() {

if *serverType == "kafka" {
logger.Info("Starting kafka server")
kafkaServer, err := kafka.NewKafkaServer(*kafkaFullGraph, *kafkaWorkers, *sdepName, *namespace, *protocol, *transport, annotations, serverUrl, predictor, *kafkaBroker, *kafkaTopicIn, *kafkaTopicOut, logger, *fullHealthChecks)
kafkaServer, err := kafka.NewKafkaServer(*kafkaFullGraph, *kafkaWorkers, *sdepName, *namespace, *protocol, *transport, annotations, serverUrl, predictor, *kafkaBroker, *kafkaTopicIn, *kafkaTopicOut, logger, *fullHealthChecks, *kafkaAutoCommit)
if err != nil {
log.Fatalf("Failed to create kafka server: %v", err)
}
Expand Down

0 comments on commit 9086778

Please sign in to comment.