Seldon add ssl (#3813)
* Add SSL for Kafka loggers

* Remove SeldonDeployment used for testing

* Add ssl producer for workers as well

* Add tests that don't work

* more tests

* Refactor code and change tests

To avoid having to write the same piece of code twice, I refactored the
code.
I cannot get the test to do what I want, though: if I don't set a
`broker` then it just hangs, and if I do set one then it throws an
error with
```
%7|1640872656.856|BROKERFAIL|rdkafka#producer-1| [thrd:ssl://broker:9092/bootstrap]: ssl://broker:9092/bootstrap: failed: err: Local: Host resolution failure: (errno: Bad address)
%7|1640872656.856|STATE|rdkafka#producer-1| [thrd:ssl://broker:9092/bootstrap]: ssl://broker:9092/bootstrap: Broker changed state CONNECT -> DOWN
%7|1640872656.856|BROADCAST|rdkafka#producer-1| [thrd:ssl://broker:9092/bootstrap]: Broadcasting state change
%7|1640872656.856|BUFQ|rdkafka#producer-1| [thrd:ssl://broker:9092/bootstrap]: ssl://broker:9092/bootstrap: Purging bufq with 0 buffers
%7|1640872656.856|BUFQ|rdkafka#producer-1| [thrd:ssl://broker:9092/bootstrap]: ssl://broker:9092/bootstrap: Purging bufq with 0 buffers
%7|1640872656.856|BUFQ|rdkafka#producer-1| [thrd:ssl://broker:9092/bootstrap]: ssl://broker:9092/bootstrap: Updating 0 buffers on connection reset
```

* Address comments from PR

* Update executor/logger/worker_test.go

Co-authored-by: Alex Rakowski <20504869+agrski@users.noreply.github.com>

* Add documentation on how to use SSL with Kafka

* Improve documentation based on feedback

* Update doc/source/analytics/logging.md

Co-authored-by: Alex Rakowski <20504869+agrski@users.noreply.github.com>

* use function to get KafkaSecurityProtocol

* Add tests for config files

* Add links to Confluent page and librdkafka

* Cleanup logging and remove `/certs`

* Update doc/source/analytics/logging.md

Co-authored-by: Alex Rakowski <20504869+agrski@users.noreply.github.com>

* Apply suggestions from code review

Co-authored-by: Alex Rakowski <20504869+agrski@users.noreply.github.com>

* Remove test that doesn't work, add gomega for test

* Import gomega as immediate identifiers

* Only check sslElements if we use SSL :)

Co-authored-by: Alex Rakowski <20504869+agrski@users.noreply.github.com>
stephen37 and agrski authored Jan 17, 2022
1 parent 77ba6b0 commit e99888f
Showing 9 changed files with 323 additions and 18 deletions.
54 changes: 54 additions & 0 deletions doc/source/analytics/logging.md
@@ -80,6 +80,60 @@ The two required environment variables are:
* LOGGER_KAFKA_BROKER : The Kafka broker service endpoint.
* LOGGER_KAFKA_TOPIC : The Kafka topic to log the requests to.

### Logging to encrypted Kafka with SSL

You can log requests to an encrypted Kafka cluster with SSL. SSL uses private-key/certificate pairs, which are used during the SSL handshake process.

To be able to log payloads, the client needs:
* to authenticate with SSL
* its own keystore, made up of a key pair and a signed certificate
* the CA certificate used to sign the key-certificate pair

The CA certificate needs to be recognised by the broker and can also be used for verifying the broker's certificate.

You can read more about the available options on the [Confluent documentation](https://docs.confluent.io/platform/current/kafka/authentication_ssl.html) and [librdkafka Configuration](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) pages.

Here is an example of how to define these for a deployment:

```yaml
apiVersion: machinelearning.seldon.io/v1
kind: SeldonDeployment
metadata:
  name: cifar10
  namespace: seldon
spec:
  name: resnet32
  predictors:
    - graph:
        implementation: TRITON_SERVER
        logger:
          mode: all
        modelUri: gs://seldon-models/triton/tf_cifar10
        name: cifar10
      name: default
      svcOrchSpec:
        env:
          - name: LOGGER_KAFKA_BROKER
            value: seldon-kafka-plain-0.kafka:9092
          - name: LOGGER_KAFKA_TOPIC
            value: seldon
          - name: KAFKA_SECURITY_PROTOCOL
            value: ssl
          - name: KAFKA_SSL_CA_CERT_FILE
            value: /path/to/ca.pem
          - name: KAFKA_SSL_CLIENT_CERT_FILE
            value: /path/to/access.cert
          - name: KAFKA_SSL_CLIENT_KEY_FILE
            value: /path/to/access.key
          - name: KAFKA_SSL_CLIENT_KEY_PASS
            valueFrom:
              secretKeyRef:
                name: my-kafka-secret
                key: ssl-password # Key password, if any (optional field)
      replicas: 1
  protocol: kfserving
```
See the [benchmarking notebook for CIFAR10 image payload logging](../examples/kafka_logger.html), which shows 3K predictions per second with Triton Inference Server.
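For reference, the executor copies each `KAFKA_*` environment variable above into the corresponding librdkafka producer property. A minimal sketch of that mapping (the Go map below is purely illustrative, not part of the executor's API):

```go
package main

import "fmt"

// envToProp maps the executor's KAFKA_* environment variables to the
// librdkafka producer properties they populate.
var envToProp = map[string]string{
	"KAFKA_SECURITY_PROTOCOL":    "security.protocol",
	"KAFKA_SSL_CA_CERT_FILE":     "ssl.ca.location",
	"KAFKA_SSL_CLIENT_CERT_FILE": "ssl.certificate.location",
	"KAFKA_SSL_CLIENT_KEY_FILE":  "ssl.key.location",
	"KAFKA_SSL_CLIENT_KEY_PASS":  "ssl.key.password", // key password, if any
}

func main() {
	// Print each environment variable next to the property it sets.
	for env, prop := range envToProp {
		fmt.Printf("%s -> %s\n", env, prop)
	}
}
```

The property names come from librdkafka's CONFIGURATION.md, linked above.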

## Setting Global Default
1 change: 1 addition & 0 deletions executor/.gitignore
@@ -10,3 +10,4 @@ openapi/
 executor/api/rest/openapi/
 triton-inference-server/
 _operator
+certs/
31 changes: 22 additions & 9 deletions executor/api/kafka/server.go
@@ -2,6 +2,13 @@ package kafka

 import (
 	"fmt"
+	"net/url"
+	"os"
+	"os/signal"
+	"reflect"
+	"syscall"
+	"time"
+
 	"github.com/cloudevents/sdk-go/pkg/bindings/http"
 	"github.com/confluentinc/confluent-kafka-go/kafka"
 	"github.com/go-logr/logr"
@@ -13,14 +20,9 @@ import (
 	"github.com/seldonio/seldon-core/executor/api/grpc/tensorflow"
 	"github.com/seldonio/seldon-core/executor/api/payload"
 	"github.com/seldonio/seldon-core/executor/api/rest"
+	"github.com/seldonio/seldon-core/executor/api/util"
 	"github.com/seldonio/seldon-core/executor/predictor"
 	v1 "github.com/seldonio/seldon-core/operator/apis/machinelearning.seldon.io/v1"
-	"net/url"
-	"os"
-	"os/signal"
-	"reflect"
-	"syscall"
-	"time"
 )
 
 const (
@@ -76,12 +78,23 @@ func NewKafkaServer(fullGraph bool, workers int, deploymentName, namespace, prot
 			return nil, fmt.Errorf("Unknown transport %s", transport)
 		}
 	}
+	var producerConfigMap = kafka.ConfigMap{"bootstrap.servers": broker,
+		"go.delivery.reports": false, // Need this otherwise will get memory leak
+	}
+	if broker != "" {
+		if util.GetKafkaSecurityProtocol() == "SSL" {
+			sslKakfaServer := util.GetSslElements()
+			producerConfigMap["security.protocol"] = util.GetKafkaSecurityProtocol()
+			producerConfigMap["ssl.ca.location"] = sslKakfaServer.CACertFile
+			producerConfigMap["ssl.key.location"] = sslKakfaServer.ClientKeyFile
+			producerConfigMap["ssl.certificate.location"] = sslKakfaServer.ClientCertFile
+			producerConfigMap["ssl.key.password"] = sslKakfaServer.ClientKeyPass // Key password, if any
+		}
+	}
 	// Create Producer
 	log.Info("Creating producer", "broker", broker)
-	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker,
-		"go.delivery.reports": false, // Need this otherwise will get memory leak
-	})
+	p, err := kafka.NewProducer(&producerConfigMap)
 	if err != nil {
 		return nil, err
 	}
26 changes: 26 additions & 0 deletions executor/api/util/utils.go
@@ -4,6 +4,7 @@ import (
 	"encoding/json"
 	"os"
 	"strconv"
+	"strings"
 
 	"github.com/golang/protobuf/jsonpb"
 	"github.com/seldonio/seldon-core/executor/api/grpc/seldon/proto"
@@ -141,3 +142,28 @@ func GetEnvAsBool(key string, fallback bool) bool {
 
 	return fallback
 }
+
+type SslKakfa struct {
+	ClientCertFile string
+	ClientKeyFile  string
+	CACertFile     string
+	ClientKeyPass  string
+}
+
+func (o SslKakfa) String() string {
+	return "SslKakfa"
+}
+
+func GetKafkaSecurityProtocol() string {
+	return strings.ToUpper(GetEnv("KAFKA_SECURITY_PROTOCOL", ""))
+}
+
+func GetSslElements() *SslKakfa {
+	sslElements := SslKakfa{
+		ClientCertFile: GetEnv("KAFKA_SSL_CLIENT_CERT_FILE", ""),
+		ClientKeyFile:  GetEnv("KAFKA_SSL_CLIENT_KEY_FILE", ""),
+		CACertFile:     GetEnv("KAFKA_SSL_CA_CERT_FILE", ""),
+		ClientKeyPass:  GetEnv("KAFKA_SSL_CLIENT_KEY_PASS", ""),
+	}
+	return &sslElements
+}
7 changes: 7 additions & 0 deletions executor/api/util/utils_test.go
@@ -141,3 +141,10 @@ func TestInjectRouteSeldonJson(t *testing.T) {
 
 	g.Expect(routes).To(Equal(testRouting))
 }
+
+func TestSSLSecurityProtocol(t *testing.T) {
+	g := NewGomegaWithT(t)
+	os.Setenv("KAFKA_SECURITY_PROTOCOL", "ssl")
+	val := GetKafkaSecurityProtocol()
+	g.Expect(val).To(Equal("SSL"))
+}
4 changes: 2 additions & 2 deletions executor/logger/dispatcher.go
@@ -1,8 +1,9 @@
 package logger
 
 import (
-	"github.com/go-logr/logr"
 	"os"
+
+	"github.com/go-logr/logr"
 )
 
 const (
@@ -25,7 +26,6 @@ func StartDispatcher(nworkers int, logBufferSize int, writeTimeoutMs int, log lo
 
 	workQueue = make(chan LogRequest, logBufferSize)
 	writeTimeoutMilliseconds = writeTimeoutMs
-
 	// Now, create all of our workers.
 	for i := 0; i < nworkers; i++ {
 		log.Info("Starting", "worker", i+1)
20 changes: 15 additions & 5 deletions executor/logger/worker.go
@@ -12,6 +12,7 @@ import (
 	"github.com/confluentinc/confluent-kafka-go/kafka"
 	"github.com/go-logr/logr"
 	"github.com/seldonio/seldon-core/executor/api/payload"
+	"github.com/seldonio/seldon-core/executor/api/util"
 )
 
 const (
@@ -37,10 +38,20 @@ func NewWorker(id int, workQueue chan LogRequest, log logr.Logger, sdepName stri
 	var err error
 	if kafkaBroker != "" {
 		log.Info("Creating producer", "broker", kafkaBroker, "topic", kafkaTopic)
-		producer, err = kafka.NewProducer(&kafka.ConfigMap{
-			"bootstrap.servers": kafkaBroker,
+		var producerConfigMap = kafka.ConfigMap{"bootstrap.servers": kafkaBroker,
 			"go.delivery.reports": false, // Need this otherwise will get memory leak
-		})
+		}
+		log.Info("kafkaSecurityProtocol", "kafkaSecurityProtocol", util.GetKafkaSecurityProtocol())
+		if util.GetKafkaSecurityProtocol() == "SSL" {
+			sslKafka := util.GetSslElements()
+			producerConfigMap["security.protocol"] = util.GetKafkaSecurityProtocol()
+			producerConfigMap["ssl.ca.location"] = sslKafka.CACertFile
+			producerConfigMap["ssl.key.location"] = sslKafka.ClientKeyFile
+			producerConfigMap["ssl.certificate.location"] = sslKafka.ClientCertFile
+			producerConfigMap["ssl.key.password"] = sslKafka.ClientKeyPass // Key password, if any
+		}
+
+		producer, err = kafka.NewProducer(&producerConfigMap)
 		if err != nil {
 			return nil, err
 		}
@@ -94,7 +105,6 @@ func getCEType(logReq LogRequest) (string, error) {
 }
 
 func (w *Worker) sendKafkaEvent(logReq LogRequest) error {
-
 	reqType, err := getCEType(logReq)
 	if err != nil {
 		return err
@@ -109,7 +119,7 @@ func (w *Worker) sendKafkaEvent(logReq LogRequest) error {
 		{Key: NamespaceAttr, Value: []byte(w.Namespace)},
 		{Key: EndpointAttr, Value: []byte(w.PredictorName)},
 	}
-
+	w.Log.Info("kafkaHeaders is", "kafkaHeaders", kafkaHeaders)
 	err = w.Producer.Produce(&kafka.Message{
 		TopicPartition: kafka.TopicPartition{Topic: &w.KafkaTopic, Partition: kafka.PartitionAny},
 		Value: *logReq.Bytes,