This repository has been archived by the owner on Sep 19, 2018. It is now read-only.

Commit

Merge pull request #14 from utilitywarehouse/opt
optimisations and improvements
alkar committed Nov 16, 2017
2 parents da0d2c4 + 26eb4f2 commit bee792b
Showing 4 changed files with 46 additions and 95 deletions.
40 changes: 14 additions & 26 deletions ingress.go
@@ -15,31 +15,34 @@ import (
type eventHandlerFunc func(eventType watch.EventType, oldIngress *v1beta1.Ingress, newIngress *v1beta1.Ingress)

type ingressWatcher struct {
client kubernetes.Interface
eventHandler eventHandlerFunc
resyncPeriod time.Duration
stopChannel chan struct{}
client kubernetes.Interface
eventHandler eventHandlerFunc
resyncPeriod time.Duration
labelSelector string
stopChannel chan struct{}
}

func newIngressWatcher(client kubernetes.Interface, eventHandler eventHandlerFunc, resyncPeriod time.Duration) *ingressWatcher {
func newIngressWatcher(client kubernetes.Interface, eventHandler eventHandlerFunc, labelSelector string, resyncPeriod time.Duration) *ingressWatcher {
return &ingressWatcher{
client: client,
eventHandler: eventHandler,
resyncPeriod: resyncPeriod,
stopChannel: make(chan struct{}),
client: client,
eventHandler: eventHandler,
resyncPeriod: resyncPeriod,
labelSelector: labelSelector,
stopChannel: make(chan struct{}),
}
}

func (iw *ingressWatcher) Start() {
lw := &cache.ListWatch{
ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
options.LabelSelector = iw.labelSelector
return iw.client.Extensions().Ingresses(v1.NamespaceAll).List(options)
},
WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
options.LabelSelector = iw.labelSelector
return iw.client.Extensions().Ingresses(v1.NamespaceAll).Watch(options)
},
}

eh := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
iw.eventHandler(watch.Added, nil, obj.(*v1beta1.Ingress))
@@ -51,20 +54,9 @@ func (iw *ingressWatcher) Start() {
iw.eventHandler(watch.Deleted, obj.(*v1beta1.Ingress), nil)
},
}

_, controller := cache.NewInformer(lw, &v1beta1.Ingress{}, iw.resyncPeriod, eh)
log.Println("[INFO] starting ingress watcher")

// TODO: change controller startup when it's fixed upstream
// The controller is started like so: wait.Until(c.processLoop, time.Second, stopCh)
// However, the processLoop function does not ever return which means that
// `wait.Until()` is unable to exit cleanly when `stopCh` is closed.
// Closing `stopCh`, however, will stop the controller from processing
// events since the internal reflector is stopped properly. This is why the
// controller is started in a go func below.
go controller.Run(iw.stopChannel)
<-iw.stopChannel

controller.Run(iw.stopChannel)
log.Println("[INFO] ingress watcher stopped")
}

@@ -75,21 +67,17 @@ func (iw *ingressWatcher) Stop() {

func getHostnamesFromIngress(ingress *v1beta1.Ingress) []string {
hostnames := []string{}

for _, rule := range ingress.Spec.Rules {
found := false

for _, h := range hostnames {
if h == rule.Host {
found = true
break
}
}

if !found {
hostnames = append(hostnames, rule.Host)
}
}

return hostnames
}
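
For context, a minimal sketch of how the updated constructor might be wired by a caller; the client, handler and selector values below are illustrative and not taken from this repository. Note that Start now runs the controller in the calling goroutine (controller.Run blocks until the stop channel is closed), so a caller that wants it in the background starts it in a goroutine itself:

	handler := func(eventType watch.EventType, oldIng, newIng *v1beta1.Ingress) {
		log.Printf("[DEBUG] got %s event", eventType) // placeholder handler body
	}
	iw := newIngressWatcher(kubeClient, handler, "team=example", 5*time.Minute)
	go iw.Start()   // blocks internally until the stop channel is closed
	defer iw.Stop() // presumably closes the stop channel so Start returns
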
2 changes: 1 addition & 1 deletion ingress_test.go
@@ -227,7 +227,7 @@ func TestIngressWatcher(t *testing.T) {
pM.Lock()
processed = append(processed, testIngressEvent{t, o, n})
pM.Unlock()
}, 0)
}, "", 0)

wg := sync.WaitGroup{}
wg.Add(1)
75 changes: 27 additions & 48 deletions registrator.go
@@ -1,7 +1,6 @@
package main

import (
"bytes"
"errors"
"fmt"
"log"
@@ -81,37 +80,27 @@ func newRegistratorWithOptions(options registratorOptions) (*registrator, error)
if len(options.Targets) == 0 || options.Route53ZoneID == "" || options.TargetLabelName == "" {
return nil, errRegistratorMissingOption
}

var sats []selectorAndTarget
for _, target := range options.Targets {
var sb bytes.Buffer
sb.WriteString(options.TargetLabelName)
sb.WriteString("=")
sb.WriteString(target)

s, err := labels.Parse(sb.String())
s, err := labels.Parse(options.TargetLabelName + "=" + target)
if err != nil {
return nil, err
}
sats = append(sats, selectorAndTarget{Selector: s, Target: target})
}

if options.AWSSessionOptions == nil {
options.AWSSessionOptions = &session.Options{}
}

if options.KubernetesConfig == nil {
c, err := rest.InClusterConfig()
if err != nil {
return nil, err
}
options.KubernetesConfig = c
}

if options.ResyncPeriod == 0 {
options.ResyncPeriod = defaultResyncPeriod
}

return &registrator{
options: options,
sats: sats,
@@ -130,62 +119,61 @@ func (r *registrator) Start() error {
}
r.dnsZone = dns
log.Println("[INFO] setup route53 session")

kubeClient, err := kubernetes.NewForConfig(r.options.KubernetesConfig)
if err != nil {
return err
}
r.ingressWatcher = newIngressWatcher(kubeClient, r.handler, r.options.ResyncPeriod)
r.ingressWatcher = newIngressWatcher(kubeClient, r.handler, r.options.TargetLabelName, r.options.ResyncPeriod)
log.Println("[INFO] setup kubernetes ingress watcher")

wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
r.processUpdateQueue()
}()

r.ingressWatcher.Start()

wg.Wait()

return nil
}

func (r *registrator) handler(eventType watch.EventType, oldIngress *v1beta1.Ingress, newIngress *v1beta1.Ingress) {
log.Printf("[DEBUG] received %s event", eventType)
switch eventType {
case watch.Added:
log.Printf("[DEBUG] received %s event for %s", eventType, newIngress.Name)
hostnames := getHostnamesFromIngress(newIngress)
target := r.getTargetForIngress(newIngress)
metricUpdatesReceived.WithLabelValues(newIngress.Name, "add").Inc()
if len(hostnames) > 0 && target != "" {
log.Printf("[DEBUG] queued update of %d records for ingress %s, pointing to %s", len(hostnames), newIngress.Name, target)
log.Printf("[DEBUG] queued update of %d record(s) for ingress %s, pointing to %s", len(hostnames), newIngress.Name, target)
r.queueUpdates(route53.ChangeActionUpsert, hostnames, target)
}
case watch.Modified:
log.Printf("[DEBUG] received %s event for %s -> %s", eventType, oldIngress.Name, newIngress.Name)
newHostnames := getHostnamesFromIngress(newIngress)
newTarget := r.getTargetForIngress(newIngress)
metricUpdatesReceived.WithLabelValues(newIngress.Name, "modify").Inc()
if len(newHostnames) > 0 && newTarget != "" {
log.Printf("[DEBUG] queued update of %d records for ingress %s, pointing to %s", len(newHostnames), newIngress.Name, newTarget)
log.Printf("[DEBUG] queued update of %d record(s) for ingress %s, pointing to %s", len(newHostnames), newIngress.Name, newTarget)
r.queueUpdates(route53.ChangeActionUpsert, newHostnames, newTarget)
}
oldHostnames := getHostnamesFromIngress(oldIngress)
oldTarget := r.getTargetForIngress(oldIngress)
diffHostnames := diffStringSlices(oldHostnames, newHostnames)
if len(diffHostnames) > 0 && oldTarget != "" {
log.Printf("[DEBUG] queued deletion of %d records for ingress %s", len(diffHostnames), oldIngress.Name)
log.Printf("[DEBUG] queued deletion of %d record(s) for ingress %s", len(diffHostnames), oldIngress.Name)
r.queueUpdates(route53.ChangeActionDelete, diffHostnames, oldTarget)
}
case watch.Deleted:
log.Printf("[DEBUG] received %s event for %s", eventType, oldIngress.Name)
hostnames := getHostnamesFromIngress(oldIngress)
target := r.getTargetForIngress(oldIngress)
metricUpdatesReceived.WithLabelValues(oldIngress.Name, "delete").Inc()
if len(hostnames) > 0 && target != "" {
log.Printf("[DEBUG] queued deletion of %d records for ingress %s", len(hostnames), oldIngress.Name)
log.Printf("[DEBUG] queued deletion of %d record(s) for ingress %s", len(hostnames), oldIngress.Name)
r.queueUpdates(route53.ChangeActionDelete, hostnames, target)
}
default:
log.Printf("[DEBUG] received %s event: cannot handle", eventType)
}
}

@@ -231,14 +219,12 @@ func (r *registrator) applyBatch(changes []cnameChange) {
if len(pruned) == 0 {
return
}

hostnames := make([]string, len(pruned))
for i, p := range pruned {
hostnames[i] = p.Hostname
}

if action == route53.ChangeActionDelete {
log.Printf("[INFO] deleting %d records: %+v", len(pruned), hostnames)
log.Printf("[INFO] deleting %d record(s): %+v", len(pruned), hostnames)
if !*dryRun {
if err := r.DeleteCnames(pruned); err != nil {
log.Printf("[ERROR] error deleting records: %+v", err)
@@ -250,7 +236,7 @@ func (r *registrator) applyBatch(changes []cnameChange) {
}
}
} else {
log.Printf("[INFO] modifying %d records: %+v", len(pruned), hostnames)
log.Printf("[INFO] modifying %d record(s): %+v", len(pruned), hostnames)
if !*dryRun {
if err := r.UpsertCnames(pruned); err != nil {
log.Printf("[ERROR] error modifying records: %+v", err)
@@ -270,7 +256,7 @@ func (r *registrator) getTargetForIngress(ingress *v1beta1.Ingress) string {
return sat.Target
}
}
log.Printf("[DEBUG] cannot compute target for ingress '%s': invalid selector", ingress.Name) // XXX
log.Printf("[INFO] invalid ingress target for %s: %s", ingress.Name, ingress.Labels[r.options.TargetLabelName])
return ""
}

@@ -279,23 +265,28 @@ func (r *registrator) pruneBatch(action string, records []cnameRecord) []cnameRecord
for _, u := range records {
if !r.canHandleRecord(u.Hostname) {
metricUpdatesRejected.Inc()
log.Printf("[INFO] cannot handle dns record '%s', will ignore it", u.Hostname)
log.Printf("[INFO] cannot handle dns record %s, will ignore it", u.Hostname)
continue
}

t, err := resolveCname(fmt.Sprintf("%s.", strings.Trim(u.Hostname, ".")), r.ListNameservers())
switch action {
case route53.ChangeActionDelete:
if err != errDNSEmptyAnswer {
log.Printf("[DEBUG] error resolving '%s': %+v, will try to update the record", u.Hostname, err)
if err == nil {
pruned = append(pruned, u)
} else if err != errDNSEmptyAnswer {
log.Printf("[DEBUG] error resolving %s: %+v, will try to delete the record", u.Hostname, err)
pruned = append(pruned, u)
} else {
log.Printf("[DEBUG] %s does not resolve, no-op", u.Hostname)
}
case route53.ChangeActionUpsert:
if err != nil {
log.Printf("[DEBUG] error resolving '%s': %+v, will try to update the record", u.Hostname, err)
log.Printf("[DEBUG] error resolving %s: %+v, will try to update the record", u.Hostname, err)
pruned = append(pruned, u)
} else if strings.Trim(t, ".") != u.Target {
pruned = append(pruned, u)
} else {
log.Printf("[DEBUG] %s resolves correctly, no-op", u.Hostname)
}
}
}
@@ -308,41 +299,36 @@ func (r *registrator) canHandleRecord(record string) bool {
record = strings.Trim(record, ".")
matches, err := regexp.MatchString(fmt.Sprintf("^[^.]+\\.%s$", strings.Replace(zone, ".", "\\.", -1)), record)
if err != nil {
log.Printf("[DEBUG] regexp match error, will not handle record '%s': %+v", record, err)
log.Printf("[DEBUG] regexp match error, will not handle record %s: %+v", record, err)
return false
}
return matches
}
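
// Note (not part of the diff): illustrative behaviour of canHandleRecord,
// assuming the registrator's zone is "example.com" (a made-up value). The
// regexp ^[^.]+\.example\.com$ accepts exactly one label in front of the zone:
//
//	canHandleRecord("app.example.com.")   // true  (trailing dot is trimmed first)
//	canHandleRecord("a.b.example.com")    // false (nested one label too deep)
//	canHandleRecord("app.other-zone.org") // false (outside the managed zone)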

func resolveCname(name string, nameservers []string) (string, error) {
m := dns.Msg{}
m.SetQuestion(name, dns.TypeCNAME)

var retError error
var retTarget string

for _, nameserver := range nameservers {
r, _, err := dnsClient.Exchange(&m, nameserver)
if err != nil {
retError = err
continue
}

if len(r.Answer) == 0 {
retError = errDNSEmptyAnswer
continue
}

retTarget = r.Answer[0].(*dns.CNAME).Target
retError = nil
break
}

return retTarget, retError
}
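
// Note (not part of the diff): usage sketch of resolveCname as pruneBatch calls
// it above; the hostname and nameserver address are placeholders. Nameservers
// are tried in order and the first CNAME answer wins.
//
//	target, err := resolveCname("app.example.com.", []string{"203.0.113.10:53"})
//	switch {
//	case err == errDNSEmptyAnswer:
//		// the name does not currently resolve to anything
//	case err != nil:
//		// every nameserver failed; err holds the last error seen
//	default:
//		// target holds the CNAME the name currently points at
//	}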

func diffStringSlices(a []string, b []string) []string {
ret := []string{}

for _, va := range a {
exists := false
for _, vb := range b {
@@ -351,43 +337,36 @@ func diffStringSlices(a []string, b []string) []string {
break
}
}

if !exists {
ret = append(ret, va)
}
}

return ret
}
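
// Note (not part of the diff): diffStringSlices returns the elements of a that
// are missing from b. The Modified handler uses it to find hostnames that were
// dropped from an ingress and should therefore be deleted, e.g. (hypothetical values):
//
//	diffStringSlices([]string{"a.example.com", "b.example.com"}, []string{"b.example.com"})
//	// -> []string{"a.example.com"}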

func uniqueRecords(records []cnameRecord) []cnameRecord {
uniqueRecords := []cnameRecord{}
rejectedRecords := []string{}

for i, r1 := range records {
if stringInSlice(r1.Hostname, rejectedRecords) || recordHostnameInSlice(r1.Hostname, uniqueRecords) {
continue
}

duplicates := []cnameRecord{}
for j, r2 := range records {
if i != j && r1.Hostname == r2.Hostname {
duplicates = append(duplicates, r2)
}
}

if recordTargetsAllMatch(r1.Target, duplicates) {
uniqueRecords = append(uniqueRecords, r1)
} else {
rejectedRecords = append(rejectedRecords, r1.Hostname)
}
}

if len(rejectedRecords) > 0 {
metricUpdatesRejected.Add(float64(len(rejectedRecords)))
log.Printf("[INFO] refusing to modify the following records: [%s]: claimed by multiple ingress pointing to different controllers", strings.Join(rejectedRecords, ", "))
log.Printf("[INFO] refusing to modify the following records: [%s]: they are claimed by multiple ingresses but are pointing to different targets", strings.Join(rejectedRecords, ", "))
}

return uniqueRecords
}
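
// Note (not part of the diff): hypothetical example of the de-duplication above.
// If several ingresses claim the same hostname but point at different targets,
// that hostname is rejected outright (and counted in metricUpdatesRejected)
// rather than letting the conflicting updates race; hostnames with a single,
// consistent target pass through:
//
//	uniqueRecords([]cnameRecord{
//		{Hostname: "a.example.com", Target: "lb-1.example.com"},
//		{Hostname: "a.example.com", Target: "lb-2.example.com"},
//		{Hostname: "b.example.com", Target: "lb-1.example.com"},
//	})
//	// -> only the "b.example.com" record is kept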

