From d38d62cf904d1a3c9664985ed58c343afc721e3e Mon Sep 17 00:00:00 2001 From: urso Date: Wed, 14 Dec 2016 16:51:48 +0100 Subject: [PATCH] case insensitive hostname comparison in kafka broker matching - re-use common.LocalIPAddrs in partition module for resolving IPs - add missing net.IPAddr type switch to common.LocalIPAddrs - update matching to extract addresses early on using strings.ToLower => ensure case insensitive matching by lowercasing --- CHANGELOG.asciidoc | 1 + libbeat/common/net.go | 24 +++- .../module/kafka/partition/partition.go | 117 ++++++++---------- 3 files changed, 70 insertions(+), 72 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 8ca838878b6..20794c2d66c 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -32,6 +32,7 @@ https://github.com/elastic/beats/compare/v5.1.1...master[Check the HEAD diff] *Metricbeat* - Fix service times-out at startup. {pull}3056[3056] +- Kafka module case sensitive host name matching. {pull}3193[3193] *Packetbeat* diff --git a/libbeat/common/net.go b/libbeat/common/net.go index 9c9645cc22d..0d258046226 100644 --- a/libbeat/common/net.go +++ b/libbeat/common/net.go @@ -8,15 +8,29 @@ import ( // LocalIPAddrs finds the IP addresses of the hosts on which // the shipper currently runs on. func LocalIPAddrs() ([]net.IP, error) { - var localIPAddrs = []net.IP{} + var localIPAddrs []net.IP ipaddrs, err := net.InterfaceAddrs() if err != nil { - return []net.IP{}, err + return nil, err } - for _, ipaddr := range ipaddrs { - if ipnet, ok := ipaddr.(*net.IPNet); ok { - localIPAddrs = append(localIPAddrs, ipnet.IP) + for _, addr := range ipaddrs { + var ip net.IP + ok := true + + switch v := addr.(type) { + case *net.IPNet: + ip = v.IP + case *net.IPAddr: + ip = v.IP + default: + ok = false } + + if !ok { + continue + } + + localIPAddrs = append(localIPAddrs, ip) } return localIPAddrs, nil } diff --git a/metricbeat/module/kafka/partition/partition.go b/metricbeat/module/kafka/partition/partition.go index ae0c0e6f0e0..6cdd8f02d85 100644 --- a/metricbeat/module/kafka/partition/partition.go +++ b/metricbeat/module/kafka/partition/partition.go @@ -97,7 +97,7 @@ func (m *MetricSet) connect() (*sarama.Broker, error) { return nil, err } - other := findMatchingBroker(b.Addr(), meta.Brokers) + other := findMatchingBroker(brokerAddress(b), meta.Brokers) if other == nil { // no broker found closeBroker(b) return nil, fmt.Errorf("No advertised broker with address %v found", b.Addr()) @@ -328,13 +328,22 @@ func findMatchingBroker( addr string, brokers []*sarama.Broker, ) *sarama.Broker { + lst := brokerAddresses(brokers) + if idx, found := findMatchingAddress(addr, lst); found { + return brokers[idx] + } + return nil +} + +func findMatchingAddress( + addr string, + brokers []string, +) (int, bool) { debugf("Try to match broker to: %v", addr) // compare connection address to list of broker addresses - for _, b := range brokers { - if b.Addr() == addr { - return b - } + if i, found := indexOf(addr, brokers); found { + return i, true } // get connection 'port' @@ -344,9 +353,9 @@ func findMatchingBroker( } // lookup local machines ips for comparing with broker addresses - localIPs, err := interfaceIPs() + localIPs, err := common.LocalIPAddrs() if err != nil || len(localIPs) == 0 { - return nil + return -1, false } debugf("local machine ips: %v", localIPs) @@ -356,9 +365,8 @@ func findMatchingBroker( debugf("local machine addresses: %v", localHosts) for _, host := range localHosts { debugf("try to match with fqdn: %v (%v)", host, port) - - if b := findBroker(host, port, brokers); b != nil { - return b + if i, found := indexOf(net.JoinHostPort(host, port), brokers); found { + return i, true } } @@ -367,16 +375,17 @@ func findMatchingBroker( if host, err := os.Hostname(); err == nil { debugf("try to match with hostname only: %v (%v)", host, port) - if b := findBroker(host, port, brokers); b != nil { - return b + tmp := net.JoinHostPort(strings.ToLower(host), port) + if i, found := indexOf(tmp, brokers); found { + return i, true } } // lookup ips for all brokers debugf("match by ips") - for _, b := range brokers { - debugf("test broker address: %v", b.Addr()) - bh, bp, err := net.SplitHostPort(b.Addr()) + for i, b := range brokers { + debugf("test broker address: %v", b) + bh, bp, err := net.SplitHostPort(b) if err != nil { continue } @@ -396,59 +405,12 @@ func findMatchingBroker( debugf("broker (%v) ips: %v", bh, ips) // check if ip is known - if ipsMatch(ips, localIPs) { - return b - } - } - - return nil -} - -func findBroker(host, port string, brokers []*sarama.Broker) *sarama.Broker { - for _, b := range brokers { - debugf("test broker address: %v", b.Addr()) - - bh, bp, err := net.SplitHostPort(b.Addr()) - if err != nil { - debugf("failed to parse broker address: %v", err) - continue - } - - if bh == host && port == bp { - return b + if anyIPsMatch(ips, localIPs) { + return i, true } } - return nil -} - -func interfaceIPs() ([]net.IP, error) { - var ips []net.IP - addrs, err := net.InterfaceAddrs() - if err != nil { - return nil, err - } - for _, addr := range addrs { - var ip net.IP - ok := true - - switch v := addr.(type) { - case *net.IPNet: - ip = v.IP - case *net.IPAddr: - ip = v.IP - default: - debugf("non ip address: %v", addr) - ok = false - } - - if !ok { - continue - } - - ips = append(ips, ip) - } - return ips, nil + return -1, false } func lookupHosts(ips []net.IP) []string { @@ -466,7 +428,7 @@ func lookupHosts(ips []net.IP) []string { } for _, host := range hosts { - h := strings.TrimSuffix(host, ".") + h := strings.ToLower(strings.TrimSuffix(host, ".")) set[h] = struct{}{} } } @@ -478,7 +440,7 @@ func lookupHosts(ips []net.IP) []string { return hosts } -func ipsMatch(as, bs []net.IP) bool { +func anyIPsMatch(as, bs []net.IP) bool { for _, a := range as { for _, b := range bs { if bytes.Equal(a, b) { @@ -488,3 +450,24 @@ func ipsMatch(as, bs []net.IP) bool { } return false } + +func brokerAddresses(brokers []*sarama.Broker) []string { + addresses := make([]string, len(brokers)) + for i, b := range brokers { + addresses[i] = brokerAddress(b) + } + return addresses +} + +func brokerAddress(b *sarama.Broker) string { + return strings.ToLower(b.Addr()) +} + +func indexOf(s string, lst []string) (int, bool) { + for i, v := range lst { + if s == v { + return i, true + } + } + return -1, false +}