diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 2d986597364..76f8d0eb448 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -110,6 +110,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add document_id setting to decode_json_fields processor. {pull}15859[15859] - Include network information by default on add_host_metadata and add_observer_metadata. {issue}15347[15347] {pull}16077[16077] - Add `aws_ec2` provider for autodiscover. {issue}12518[12518] {pull}14823[14823] +- Add support for multiple password in redis output. {issue}16058[16058] {pull}16206[16206] *Auditbeat* diff --git a/libbeat/outputs/redis/redis.go b/libbeat/outputs/redis/redis.go index 97a6b9fdafa..fe090dd150c 100644 --- a/libbeat/outputs/redis/redis.go +++ b/libbeat/outputs/redis/redis.go @@ -19,6 +19,9 @@ package redis import ( "errors" + "fmt" + "net/url" + "strings" "time" "github.com/elastic/beats/libbeat/beat" @@ -42,6 +45,8 @@ const ( defaultWaitRetry = 1 * time.Second defaultMaxWaitRetry = 60 * time.Second defaultPort = 6379 + redisScheme = "redis" + tlsRedisScheme = "rediss" ) func init() { @@ -110,27 +115,63 @@ func makeRedis( return outputs.Fail(err) } - transp := &transport.Config{ - Timeout: config.Timeout, - Proxy: &config.Proxy, - TLS: tls, - Stats: observer, - } - clients := make([]outputs.NetworkClient, len(hosts)) - for i, host := range hosts { - enc, err := codec.CreateEncoder(beat, config.Codec) + for i, h := range hosts { + hasScheme := true + if parts := strings.SplitN(h, "://", 2); len(parts) != 2 { + h = fmt.Sprintf("%s://%s", redisScheme, h) + hasScheme = false + } + + hostUrl, err := url.Parse(h) + if err != nil { + return outputs.Fail(err) + } + + if hostUrl.Host == "" { + return outputs.Fail(fmt.Errorf("invalid redis url host %s", hostUrl.Host)) + } + + if hostUrl.Scheme != redisScheme && hostUrl.Scheme != tlsRedisScheme { + return outputs.Fail(fmt.Errorf("invalid redis url scheme %s", hostUrl.Scheme)) + } + + transp := &transport.Config{ + Timeout: config.Timeout, + Proxy: &config.Proxy, + TLS: tls, + Stats: observer, + } + + switch hostUrl.Scheme { + case redisScheme: + if hasScheme { + transp.TLS = nil // disable TLS if user explicitely set `redis` scheme + } + case tlsRedisScheme: + if transp.TLS == nil { + transp.TLS = &transport.TLSConfig{} // enable with system default if TLS was not configured + } + } + + conn, err := transport.NewClient(transp, "tcp", hostUrl.Host, defaultPort) if err != nil { return outputs.Fail(err) } - conn, err := transport.NewClient(transp, "tcp", host, defaultPort) + pass := config.Password + hostPass, passSet := hostUrl.User.Password() + if passSet { + pass = hostPass + } + + enc, err := codec.CreateEncoder(beat, config.Codec) if err != nil { return outputs.Fail(err) } client := newClient(conn, observer, config.Timeout, - config.Password, config.Db, key, dataType, config.Index, enc) + pass, config.Db, key, dataType, config.Index, enc) clients[i] = newBackoffClient(client, config.Backoff.Init, config.Backoff.Max) } diff --git a/libbeat/outputs/redis/redis_integration_test.go b/libbeat/outputs/redis/redis_integration_test.go index 5d5a5df9f0c..ef691f25d44 100644 --- a/libbeat/outputs/redis/redis_integration_test.go +++ b/libbeat/outputs/redis/redis_integration_test.go @@ -53,7 +53,7 @@ const ( ) func TestPublishListTCP(t *testing.T) { - key := "test_publist_tcp" + key := "test_publish_tcp" db := 0 redisConfig := map[string]interface{}{ "hosts": []string{getRedisAddr()}, @@ -67,7 +67,7 @@ func TestPublishListTCP(t *testing.T) { } func TestPublishListTLS(t *testing.T) { - key := "test_publist_tls" + key := "test_publish_tls" db := 0 redisConfig := map[string]interface{}{ "hosts": []string{getSRedisAddr()}, @@ -85,6 +85,44 @@ func TestPublishListTLS(t *testing.T) { testPublishList(t, redisConfig) } +func TestWithSchema(t *testing.T) { + redisURL := "redis://" + getRedisAddr() + sredisURL := "rediss://" + getSRedisAddr() + + cases := map[string]struct { + host string + }{ + "redis ignores ssl settings": { + host: redisURL, + }, + "sredis schema sends via tls": { + host: sredisURL, + }, + } + + for name, test := range cases { + t.Run(name, func(t *testing.T) { + key := "test_publish_tls" + db := 0 + redisConfig := map[string]interface{}{ + "hosts": []string{test.host}, + "key": key, + "db": db, + "datatype": "list", + "timeout": "5s", + + "ssl.verification_mode": "full", + "ssl.certificate_authorities": []string{ + "../../../testing/environments/docker/sredis/pki/tls/certs/sredis.crt", + }, + } + + testPublishList(t, redisConfig) + }) + } + +} + func testPublishList(t *testing.T, cfg map[string]interface{}) { batches := 100 batchSize := 1000 diff --git a/libbeat/outputs/redis/redis_test.go b/libbeat/outputs/redis/redis_test.go new file mode 100644 index 00000000000..387c6b363c4 --- /dev/null +++ b/libbeat/outputs/redis/redis_test.go @@ -0,0 +1,120 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package redis + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/outputs" + _ "github.com/elastic/beats/libbeat/outputs/codec/json" +) + +type checker func(*testing.T, outputs.Group) + +func checks(cs ...checker) checker { + return func(t *testing.T, g outputs.Group) { + for _, c := range cs { + c(t, g) + } + } +} + +func clientsLen(required int) checker { + return func(t *testing.T, group outputs.Group) { + assert.Len(t, group.Clients, required) + } +} + +func clientPassword(index int, pass string) checker { + return func(t *testing.T, group outputs.Group) { + redisClient := group.Clients[index].(*backoffClient) + assert.Equal(t, redisClient.client.password, pass) + } +} + +func TestMakeRedis(t *testing.T) { + tests := map[string]struct { + config map[string]interface{} + valid bool + checks checker + }{ + "no host": { + config: map[string]interface{}{ + "hosts": []string{}, + }, + }, + "invald scheme": { + config: map[string]interface{}{ + "hosts": []string{"redisss://localhost:6379"}, + }, + }, + "Single host": { + config: map[string]interface{}{ + "hosts": []string{"localhost:6379"}, + }, + valid: true, + checks: checks(clientsLen(1), clientPassword(0, "")), + }, + "Multiple hosts": { + config: map[string]interface{}{ + "hosts": []string{"redis://localhost:6379", "rediss://localhost:6380"}, + }, + valid: true, + checks: clientsLen(2), + }, + "Default password": { + config: map[string]interface{}{ + "hosts": []string{"redis://localhost:6379"}, + "password": "defaultPassword", + }, + valid: true, + checks: checks(clientsLen(1), clientPassword(0, "defaultPassword")), + }, + "Specific and default password": { + config: map[string]interface{}{ + "hosts": []string{"redis://localhost:6379", "rediss://:mypassword@localhost:6380"}, + "password": "defaultPassword", + }, + valid: true, + checks: checks( + clientsLen(2), + clientPassword(0, "defaultPassword"), + clientPassword(1, "mypassword"), + ), + }, + } + beatInfo := beat.Info{Beat: "libbeat", Version: "1.2.3"} + for name, test := range tests { + t.Run(name, func(t *testing.T) { + cfg, err := common.NewConfigFrom(test.config) + assert.NoError(t, err) + groups, err := makeRedis(nil, beatInfo, outputs.NewNilObserver(), cfg) + assert.Equal(t, err == nil, test.valid) + if err != nil && test.valid { + t.Log(err) + } + if test.checks != nil { + test.checks(t, groups) + } + }) + } +}