-
Notifications
You must be signed in to change notification settings - Fork 0
/
controller.go
136 lines (109 loc) · 3.16 KB
/
controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package main
import (
"strings"
"time"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
secrets "k8s.io/api/core/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
informers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)
// controller watches Secret objects via an informer and feeds them through
// a rate-limited workqueue to the scan step.
type controller struct {
	kubeClient     kubernetes.Interface
	metricsHandler *prometheusHandler
	secretLister   listers.SecretLister
	secretsSynced  cache.InformerSynced
	queue          workqueue.RateLimitingInterface
}

// queueEntry is the unit of work placed on the queue: the cache key of a
// Secret together with the informer event reason that caused the enqueue.
type queueEntry struct {
	key    string
	reason string
}
// updateReasonAdd marks a queue entry triggered by a secret creation event.
const updateReasonAdd = "add"

// updateReasonDelete marks a queue entry triggered by a secret deletion event.
const updateReasonDelete = "delete"

// updateReasonUpdate marks a queue entry triggered by a secret update event.
const updateReasonUpdate = "update"
// newController builds a controller around the given secret informer,
// registering event handlers that enqueue every add/update/delete for a scan.
// The informer itself is started by the caller.
func newController(
	kubeClient kubernetes.Interface,
	informer informers.SecretInformer,
	metricsHandler *prometheusHandler,
) *controller {
	ctrl := &controller{
		kubeClient:     kubeClient,
		metricsHandler: metricsHandler,
		secretLister:   informer.Lister(),
		secretsSynced:  informer.Informer().HasSynced,
		queue:          workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "secret"),
	}

	// Every informer event lands on the workqueue tagged with its reason.
	informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    func(obj any) { ctrl.enqueue(obj, updateReasonAdd) },
		UpdateFunc: func(_, obj any) { ctrl.enqueue(obj, updateReasonUpdate) },
		DeleteFunc: func(obj any) { ctrl.enqueue(obj, updateReasonDelete) },
	})

	return ctrl
}
// Run waits for the informer caches to sync, launches the given number of
// worker goroutines, and blocks until stopCh is closed. The workqueue is
// shut down on return so workers drain and exit.
func (c *controller) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	// Shutting down the queue makes processNextWorkItem return false,
	// which in turn stops every worker.
	defer c.queue.ShutDown()

	logrus.Info("starting cert-monitor controller")
	defer logrus.Info("shutting down cert-monitor controller")

	if !cache.WaitForCacheSync(stopCh, c.secretsSynced) {
		// Fix: previously this returned silently, leaving no trace of why
		// no workers were started. Surface the failure like other
		// controllers do.
		utilruntime.HandleError(errors.New("failed to wait for secret caches to sync"))
		return
	}

	for i := 0; i < workers; i++ {
		go wait.Until(c.runWorker, time.Second, stopCh)
	}

	<-stopCh
}
// enqueue derives the namespace/name cache key from obj (expected to be a
// *core/v1.Secret) and adds it to the workqueue together with the event
// reason. Non-secret objects and key errors are reported and dropped.
func (c *controller) enqueue(obj any, reason string) {
	if _, ok := obj.(*secrets.Secret); !ok {
		// Sanity-check, this should not happen but we check this nevertheless
		utilruntime.HandleError(errors.New("object received was not core/v1.Secret"))
		return
	}

	key, err := cache.MetaNamespaceKeyFunc(obj)
	if err != nil {
		utilruntime.HandleError(errors.Wrap(err, "getting key for object")) // We don't log the object for security reasons: It's a secret!
		return
	}

	logrus.WithFields(logrus.Fields{
		"key":    key,
		"reason": reason,
	}).Info("enqueuing secret for scan") // fixed typo: was "enqueing"

	c.queue.Add(&queueEntry{key: key, reason: reason})
}
// processNextWorkItem pulls one entry off the queue, runs the scan for it,
// and handles retry bookkeeping. It returns false only when the queue has
// been shut down and the worker should exit.
func (c *controller) processNextWorkItem() bool {
	qei, quit := c.queue.Get()
	if quit {
		return false
	}
	defer c.queue.Done(qei)

	qe, ok := qei.(*queueEntry)
	if !ok {
		// However this happened: Queue did not contain valid entry.
		// Fix: %T prints the dynamic type; %t is the boolean verb and
		// would have rendered "%!t(...)" noise here.
		utilruntime.HandleError(errors.Errorf("queue entry had wrong type %T", qei))
		return true
	}

	if err := c.scan(qe); err != nil {
		// Requeue with backoff so transient failures get retried.
		c.queue.AddRateLimited(qei)
		utilruntime.HandleError(errors.Wrapf(err, "scan for %q failed", qe))
		return true
	}

	// Success: clear the rate limiter's failure history for this item.
	c.queue.Forget(qei)
	return true
}
// runWorker processes queue items until the queue is shut down.
func (c *controller) runWorker() {
	for {
		if !c.processNextWorkItem() {
			return
		}
	}
}
// String renders the entry as "reason::key" for log and error messages.
func (q queueEntry) String() string {
	parts := []string{q.reason, q.key}
	return strings.Join(parts, "::")
}