Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NETOBSERV-974 Implement SASL for Kafka (producer+consumer) #424

Merged
merged 2 commits into from
May 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ require (
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/stretchr/objx v0.5.0 // indirect
github.com/subosito/gotenv v1.2.0 // indirect
github.com/xdg/scram v1.0.5 // indirect
github.com/xdg/stringprep v1.0.3 // indirect
go.uber.org/atomic v1.9.0 // indirect
golang.org/x/crypto v0.5.0 // indirect
golang.org/x/oauth2 v0.0.0-20220411215720-9780585627b5 // indirect
Expand Down
17 changes: 9 additions & 8 deletions pkg/api/encode_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@
package api

// EncodeKafka holds the configuration of a Kafka producer (encode) stage:
// target broker/topic, writer tuning knobs, and optional TLS/SASL security.
// NOTE(review): this span contained both the pre- and post-change diff lines;
// reconstructed here as the post-change struct (TLS plus the new SASL field).
type EncodeKafka struct {
	Address      string      `yaml:"address" json:"address" doc:"address of kafka server"`
	Topic        string      `yaml:"topic" json:"topic" doc:"kafka topic to write to"`
	Balancer     string      `yaml:"balancer,omitempty" json:"balancer,omitempty" enum:"KafkaEncodeBalancerEnum" doc:"one of the following:"`
	WriteTimeout int64       `yaml:"writeTimeout,omitempty" json:"writeTimeout,omitempty" doc:"timeout (in seconds) for write operation performed by the Writer"`
	ReadTimeout  int64       `yaml:"readTimeout,omitempty" json:"readTimeout,omitempty" doc:"timeout (in seconds) for read operation performed by the Writer"`
	BatchBytes   int64       `yaml:"batchBytes,omitempty" json:"batchBytes,omitempty" doc:"limit the maximum size of a request in bytes before being sent to a partition"`
	BatchSize    int         `yaml:"batchSize,omitempty" json:"batchSize,omitempty" doc:"limit on how many messages will be buffered before being sent to a partition"`
	TLS          *ClientTLS  `yaml:"tls" json:"tls" doc:"TLS client configuration (optional)"`
	SASL         *SASLConfig `yaml:"sasl" json:"sasl" doc:"SASL configuration (optional)"`
}

type KafkaEncodeBalancerEnum struct {
Expand Down
1 change: 1 addition & 0 deletions pkg/api/enum.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type enums struct {
TransformFilterOperationEnum TransformFilterOperationEnum
TransformGenericOperationEnum TransformGenericOperationEnum
KafkaEncodeBalancerEnum KafkaEncodeBalancerEnum
SASLTypeEnum SASLTypeEnum
ConnTrackOperationEnum ConnTrackOperationEnum
ConnTrackOutputRecordTypeEnum ConnTrackOutputRecordTypeEnum
DecoderEnum DecoderEnum
Expand Down
25 changes: 13 additions & 12 deletions pkg/api/ingest_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,17 @@
package api

// IngestKafka holds the configuration of a Kafka consumer (ingest) stage:
// brokers/topic/consumer-group, reader tuning knobs, and optional TLS/SASL security.
// NOTE(review): this span contained both the pre- and post-change diff lines;
// reconstructed here as the post-change struct (TLS plus the new SASL field).
// Also fixed a typo in the pullQueueCapacity doc tag ("use" -> "used").
type IngestKafka struct {
	Brokers           []string    `yaml:"brokers,omitempty" json:"brokers,omitempty" doc:"list of kafka broker addresses"`
	Topic             string      `yaml:"topic,omitempty" json:"topic,omitempty" doc:"kafka topic to listen on"`
	GroupId           string      `yaml:"groupid,omitempty" json:"groupid,omitempty" doc:"separate groupid for each consumer on specified topic"`
	GroupBalancers    []string    `yaml:"groupBalancers,omitempty" json:"groupBalancers,omitempty" doc:"list of balancing strategies (range, roundRobin, rackAffinity)"`
	StartOffset       string      `yaml:"startOffset,omitempty" json:"startOffset,omitempty" doc:"FirstOffset (least recent - default) or LastOffset (most recent) offset available for a partition"`
	BatchReadTimeout  int64       `yaml:"batchReadTimeout,omitempty" json:"batchReadTimeout,omitempty" doc:"how often (in milliseconds) to process input"`
	Decoder           Decoder     `yaml:"decoder,omitempty" json:"decoder" doc:"decoder to use (E.g. json or protobuf)"`
	BatchMaxLen       int         `yaml:"batchMaxLen,omitempty" json:"batchMaxLen,omitempty" doc:"the number of accumulated flows before being forwarded for processing"`
	PullQueueCapacity int         `yaml:"pullQueueCapacity,omitempty" json:"pullQueueCapacity,omitempty" doc:"the capacity of the queue used to store pulled flows"`
	PullMaxBytes      int         `yaml:"pullMaxBytes,omitempty" json:"pullMaxBytes,omitempty" doc:"the maximum number of bytes being pulled from kafka"`
	CommitInterval    int64       `yaml:"commitInterval,omitempty" json:"commitInterval,omitempty" doc:"the interval (in milliseconds) at which offsets are committed to the broker. If 0, commits will be handled synchronously."`
	TLS               *ClientTLS  `yaml:"tls" json:"tls" doc:"TLS client configuration (optional)"`
	SASL              *SASLConfig `yaml:"sasl" json:"sasl" doc:"SASL configuration (optional)"`
}
16 changes: 16 additions & 0 deletions pkg/api/sasl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package api

// SASLConfig holds SASL authentication settings for Kafka clients.
// The client ID and secret are read from files (typically mounted secrets),
// see SetupSASLMechanism in pkg/pipeline/utils.
type SASLConfig struct {
	// Type was missing its yaml/json/enum tags, unlike every other config field
	// in this package (cf. EncodeKafka.Balancer); without them it (un)marshals
	// as "Type" instead of the conventional lowercase "type".
	Type             string `yaml:"type" json:"type" enum:"SASLTypeEnum" doc:"one of the following:"`
	ClientIDPath     string `yaml:"clientIDPath,omitempty" json:"clientIDPath,omitempty" doc:"path to the client ID / SASL username"`
	ClientSecretPath string `yaml:"clientSecretPath,omitempty" json:"clientSecretPath,omitempty" doc:"path to the client secret / SASL password"`
}

// SASLTypeEnum enumerates the supported SASL mechanisms; the yaml/json tag of
// each field is the value expected in configuration (resolved via SASLTypeName).
type SASLTypeEnum struct {
	Plain string `yaml:"plain" json:"plain" doc:"Plain SASL"`
	ScramSHA512 string `yaml:"scramSHA512" json:"scramSHA512" doc:"SCRAM/SHA512 SASL"`
}

// SASLTypeName resolves the configuration name of the given SASL type
// from the SASLTypeEnum field tags, via the shared GetEnumName helper.
func SASLTypeName(saslType string) string {
	return GetEnumName(SASLTypeEnum{}, saslType)
}
2 changes: 1 addition & 1 deletion pkg/api/tls.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,5 +62,5 @@ func (c *ClientTLS) Build() (*tls.Config, error) {
}
return tlsConfig, nil
}
return nil, nil
return tlsConfig, nil
}
2 changes: 1 addition & 1 deletion pkg/config/pipeline_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func TestKafkaPromPipeline(t *testing.T) {

b, err = json.Marshal(params[0])
require.NoError(t, err)
require.JSONEq(t, `{"name":"ingest","ingest":{"type":"kafka","kafka":{"brokers":["http://kafka"],"topic":"netflows","groupid":"my-group","decoder":{"type":"json"},"tls":{"insecureSkipVerify":true,"caCertPath":"/ca.crt"}}}}`, string(b))
require.JSONEq(t, `{"name":"ingest","ingest":{"type":"kafka","kafka":{"brokers":["http://kafka"],"topic":"netflows","groupid":"my-group","decoder":{"type":"json"},"sasl":null,"tls":{"insecureSkipVerify":true,"caCertPath":"/ca.crt"}}}}`, string(b))

b, err = json.Marshal(params[1])
require.NoError(t, err)
Expand Down
12 changes: 10 additions & 2 deletions pkg/pipeline/encode/encode_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/operational"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
"github.com/prometheus/client_golang/prometheus"
"github.com/segmentio/kafka-go"
kafkago "github.com/segmentio/kafka-go"
log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
Expand Down Expand Up @@ -95,7 +95,7 @@ func NewEncodeKafka(opMetrics *operational.Metrics, params config.StageParam) (E
writeTimeoutSecs = config.WriteTimeout
}

transport := kafka.Transport{}
transport := kafkago.Transport{}
if config.TLS != nil {
log.Infof("Using TLS configuration: %v", config.TLS)
tlsConfig, err := config.TLS.Build()
Expand All @@ -105,6 +105,14 @@ func NewEncodeKafka(opMetrics *operational.Metrics, params config.StageParam) (E
transport.TLS = tlsConfig
}

if config.SASL != nil {
m, err := utils.SetupSASLMechanism(config.SASL)
if err != nil {
return nil, err
}
transport.SASL = m
}

// connect to the kafka server
kafkaWriter := kafkago.Writer{
Addr: kafkago.TCP(config.Address),
Expand Down
8 changes: 8 additions & 0 deletions pkg/pipeline/ingest/ingest_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,14 @@ func NewIngestKafka(opMetrics *operational.Metrics, params config.StageParam) (I
dialer.TLS = tlsConfig
}

if jsonIngestKafka.SASL != nil {
m, err := utils.SetupSASLMechanism(jsonIngestKafka.SASL)
if err != nil {
return nil, err
}
dialer.SASLMechanism = m
}

readerConfig := kafkago.ReaderConfig{
Brokers: jsonIngestKafka.Brokers,
Topic: jsonIngestKafka.Topic,
Expand Down
40 changes: 40 additions & 0 deletions pkg/pipeline/utils/sasl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package utils

import (
"fmt"
"os"
"strings"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/segmentio/kafka-go/sasl"
"github.com/segmentio/kafka-go/sasl/plain"
"github.com/segmentio/kafka-go/sasl/scram"
)

func SetupSASLMechanism(cfg *api.SASLConfig) (sasl.Mechanism, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: Since the function is tied to the kafka-go type sasl.Mechanism may be it should be in the kafka ingest file.

// Read client ID
id, err := os.ReadFile(cfg.ClientIDPath)
if err != nil {
return nil, err
}
strId := strings.TrimSpace(string(id))
// Read password
pwd, err := os.ReadFile(cfg.ClientSecretPath)
if err != nil {
return nil, err
}
strPwd := strings.TrimSpace(string(pwd))
var mechanism sasl.Mechanism
switch cfg.Type {
case api.SASLTypeName("Plain"):
mechanism = plain.Mechanism{Username: strId, Password: strPwd}
case api.SASLTypeName("ScramSHA512"):
mechanism, err = scram.Mechanism(scram.SHA512, strId, strPwd)
default:
return nil, fmt.Errorf("Unknown SASL type: %s", cfg.Type)
}
if err != nil {
return nil, err
}
return mechanism, nil
}
30 changes: 30 additions & 0 deletions vendor/github.com/segmentio/kafka-go/sasl/plain/plain.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

91 changes: 91 additions & 0 deletions vendor/github.com/segmentio/kafka-go/sasl/scram/scram.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Empty file.
11 changes: 11 additions & 0 deletions vendor/github.com/xdg/scram/.travis.yml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading