Skip to content

Commit

Permalink
resource_syncer
Browse files Browse the repository at this point in the history
  • Loading branch information
wangyd1988 committed Sep 13, 2023
1 parent 376e916 commit 9b72958
Showing 1 changed file with 113 additions and 120 deletions.
233 changes: 113 additions & 120 deletions pkg/syncer/resource_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package syncer
import (
"context"
"fmt"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"reflect"
"strings"
"sync"
Expand All @@ -37,9 +38,7 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
k8slabels "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes/scheme"
Expand All @@ -48,7 +47,6 @@ import (
)

const OrigNamespaceLabelKey = "submariner-io/originatingNamespace"

var ErrResourceNotsupported = "could not find the requested resource"
var EndpointSliceVersions = []string{"v1", "v1beta1"}
var EndpointSliceKind = "EndpointSlice"
Expand All @@ -66,18 +64,16 @@ const (
)

func (d SyncDirection) String() string {
s := "unknown"

switch d {
case LocalToRemote:
s = "localToRemote"
return "localToRemote"
case RemoteToLocal:
s = "remoteToLocal"
return "remoteToLocal"
case None:
s = "none"
return "none"
}

return s
return "unknown"
}

type Operation int
Expand All @@ -89,18 +85,16 @@ const (
)

func (o Operation) String() string {
s := "unknown"

switch o {
case Create:
s = "create"
return "create"
case Update:
s = "update"
return "update"
case Delete:
s = "delete"
return "delete"
}

return s
return "unknown"
}

const (
Expand Down Expand Up @@ -274,10 +268,6 @@ func (r *resourceSyncer) Start(stopCh <-chan struct{}) error {

go func() {
defer func() {
if r.config.SyncCounterOpts != nil {
prometheus.Unregister(r.syncCounter)
}

r.stopped <- struct{}{}
r.log.V(log.LIBDEBUG).Infof("Syncer %q stopped", r.config.Name)
}()
Expand Down Expand Up @@ -315,122 +305,99 @@ func (r *resourceSyncer) GetResource(name, namespace string) (runtime.Object, bo
return nil, false, nil
}

return r.mustConvert(obj.(*unstructured.Unstructured)), true, nil
}

func (r *resourceSyncer) RequeueResource(name, namespace string) {
obj, exists, err := r.store.GetByKey(namespace + "/" + name)
converted, err := r.convert(obj.(*unstructured.Unstructured))
if err != nil {
r.log.Errorf(err, "Error retrieving resource - unable to requeue")
return
return nil, false, err
}

if exists {
r.onCreate(obj)
}
}

func (r *resourceSyncer) ListResources() []runtime.Object {
return r.ListResourcesBySelector(k8slabels.Everything())
return converted, true, nil
}

func (r *resourceSyncer) ListResourcesBySelector(selector k8slabels.Selector) []runtime.Object {
var retObjects []runtime.Object

return r.runIfCacheSynced(retObjects, func() any {
list := r.store.List()
retObjects := make([]runtime.Object, 0, len(list))
func (r *resourceSyncer) ListResources() ([]runtime.Object, error) {
if ok := cache.WaitForCacheSync(r.stopCh, r.informer.HasSynced); !ok {
return nil, fmt.Errorf("failed to wait for informer cache to sync")
}

for _, obj := range list {
if !selector.Matches(k8slabels.Set(resourceUtil.MustToMeta(obj).GetLabels())) {
continue
}
list := r.store.List()
retObjects := make([]runtime.Object, 0, len(list))

retObjects = append(retObjects, r.mustConvert(obj.(*unstructured.Unstructured)))
for _, obj := range list {
converted, err := r.convert(obj.(*unstructured.Unstructured))
if err != nil {
return nil, err
}

return retObjects
}).([]runtime.Object)
retObjects = append(retObjects, converted)
}

return retObjects, nil
}

func (r *resourceSyncer) Reconcile(resourceLister func() []runtime.Object) {
go func() {
_ = r.runIfCacheSynced(nil, func() any {
r.doReconcile(resourceLister)
return nil
})
}()
}

func (r *resourceSyncer) doReconcile(resourceLister func() []runtime.Object) {
resourceType := reflect.TypeOf(r.config.ResourceType)

for _, resource := range resourceLister() {
if reflect.TypeOf(resource) != resourceType {
// This would happen if the custom transform function returned a different type. We would need a custom
// reverse transform function to handle this. Possible future work, for now bail.
r.log.Warningf("Unable to reconcile type %T - expected type %v", resource, resourceType)
if ok := cache.WaitForCacheSync(r.stopCh, r.informer.HasSynced); !ok {
r.log.Error(nil, "Unable to reconcile - failed to wait for informer cache to sync")
return
}

metaObj := resourceUtil.MustToMeta(resource)
clusterID, found := getClusterIDLabel(resource)
ns := r.config.SourceNamespace
resourceType := reflect.TypeOf(r.config.ResourceType)

switch r.config.Direction {
case None:
ns = metaObj.GetNamespace()
case RemoteToLocal:
if !found || clusterID == r.config.LocalClusterID {
continue
}
case LocalToRemote:
if clusterID != r.config.LocalClusterID {
continue
for _, resource := range resourceLister() {
if reflect.TypeOf(resource) != resourceType {
// This would happen if the custom transform function returned a different type. We would need a custom
// reverse transform function to handle this. Possible future work, for now bail.
r.log.Warningf("Unable to reconcile type %T - expected type %T", resource, resourceType)
return
}

labels := metaObj.GetLabels()
delete(labels, federate.ClusterIDLabelKey)
metaObj.SetLabels(labels)
metaObj := resourceUtil.MustToMeta(resource)
clusterID, found := getClusterIDLabel(resource)
ns := r.config.SourceNamespace

if ns == metav1.NamespaceAll {
ns = metaObj.GetLabels()[OrigNamespaceLabelKey]
}
}

if ns == "" {
r.log.Warningf("Unable to reconcile resource %s/%s - cannot determine originating namespace",
metaObj.GetNamespace(), metaObj.GetName())
continue
}
switch r.config.Direction {
case None:
ns = metaObj.GetNamespace()
case RemoteToLocal:
if !found || clusterID == r.config.LocalClusterID {
continue
}
case LocalToRemote:
if clusterID != r.config.LocalClusterID {
continue
}

metaObj.SetNamespace(ns)
labels := metaObj.GetLabels()
delete(labels, federate.ClusterIDLabelKey)
metaObj.SetLabels(labels)

key, _ := cache.MetaNamespaceKeyFunc(resource)
if ns == metav1.NamespaceAll {
ns = metaObj.GetLabels()[OrigNamespaceLabelKey]
}
}

_, exists, _ := r.store.GetByKey(key)
if exists {
continue
}
if ns == "" {
r.log.Warningf("Unable to reconcile resource %s/%s - cannot determine originating namespace",
metaObj.GetNamespace(), metaObj.GetName())
continue
}

obj := resourceUtil.MustToUnstructuredUsingScheme(resource, r.config.Scheme)
r.deleted.Store(key, obj)
r.workQueue.Enqueue(obj)
}
}
metaObj.SetNamespace(ns)

func (r *resourceSyncer) runIfCacheSynced(defaultReturn any, run func() any) any {
if ok := cache.WaitForCacheSync(r.stopCh, r.informer.HasSynced); !ok {
// This means the cache was stopped.
r.log.Warningf("Syncer %q failed to wait for informer cache to sync", r.config.Name)
key, _ := cache.MetaNamespaceKeyFunc(resource)

return defaultReturn
}
_, exists, _ := r.store.GetByKey(key)
if exists {
continue
}

return run()
obj, _ := resourceUtil.ToUnstructured(resource)
r.deleted.Store(key, obj)
r.workQueue.Enqueue(obj)
}
}()
}

func (r *resourceSyncer) processNextWorkItem(key, _, _ string) (bool, error) {
func (r *resourceSyncer) processNextWorkItem(key, name, ns string) (bool, error) {
obj, exists, err := r.store.GetByKey(key)
if err != nil {
return true, errors.Wrapf(err, "error retrieving resource %q", key)
Expand Down Expand Up @@ -466,7 +433,6 @@ func (r *resourceSyncer) processNextWorkItem(key, _, _ string) (bool, error) {
r.log.V(log.LIBDEBUG).Infof("Syncer %q syncing resource %q", r.config.Name, resource.GetName())

err = r.config.Federator.Distribute(resource)

// yd modify
utilruntime.HandleError(fmt.Errorf("#processNextWorkItem,resource.GetKind():%v, err:%w;", resource.GetKind(), err))
if err != nil && strings.EqualFold(resource.GetKind(), EndpointSliceKind) && strings.Contains(err.Error(), ErrResourceNotsupported) {
Expand All @@ -482,6 +448,7 @@ func (r *resourceSyncer) processNextWorkItem(key, _, _ string) (bool, error) {
}
}


if err != nil || r.onSuccessfulSync(resource, transformed, op) {
return true, errors.Wrapf(err, "error distributing resource %q", key)
}
Expand Down Expand Up @@ -559,12 +526,20 @@ func (r *resourceSyncer) handleDeleted(key string) (bool, error) {
return requeue, nil
}

func (r *resourceSyncer) mustConvert(from interface{}) runtime.Object {
func (r *resourceSyncer) convertNoError(from interface{}) runtime.Object {
converted, err := r.convert(from)
if err != nil {
r.log.Error(err, "Unable to convert: %#v", from)
}

return converted
}

func (r *resourceSyncer) convert(from interface{}) (runtime.Object, error) {
converted := r.config.ResourceType.DeepCopyObject()
err := r.config.Scheme.Convert(from, converted, nil)
utilruntime.Must(err)

return converted
return converted, errors.Wrapf(err, "Syncer %q: error converting %#v to %T", r.config.Name, from, r.config.ResourceType)
}

//nolint:interfacer //false positive for "`from` can be `k8s.io/apimachinery/pkg/runtime.Object`" as it returns 'from' as Unstructured
Expand All @@ -577,15 +552,22 @@ func (r *resourceSyncer) transform(from *unstructured.Unstructured, key string,

clusterID, _ := getClusterIDLabel(from)

converted := r.mustConvert(from)
converted := r.convertNoError(from)
if converted == nil {
return nil, nil, false
}

transformed, requeue := r.config.Transform(converted, r.workQueue.NumRequeues(key), op)
if transformed == nil {
r.log.V(log.LIBDEBUG).Infof("Syncer %q: transform function returned nil - not syncing - requeue: %v", r.config.Name, requeue)
return nil, nil, requeue
}

result := resourceUtil.MustToUnstructuredUsingScheme(transformed, r.config.Scheme)
result, err := resourceUtil.ToUnstructured(transformed)
if err != nil {
r.log.Errorf(err, "Syncer %q: error converting transform function result", r.config.Name)
return nil, nil, false
}

// Preserve the cluster ID label
if clusterID != "" {
Expand All @@ -601,7 +583,10 @@ func (r *resourceSyncer) onSuccessfulSync(resource, converted runtime.Object, op
}

if converted == nil {
converted = r.mustConvert(resource)
converted = r.convertNoError(resource)
if converted == nil {
return false
}
}

r.log.V(log.LIBTRACE).Infof("Syncer %q: invoking OnSuccessfulSync function with: %#v", r.config.Name, converted)
Expand Down Expand Up @@ -638,13 +623,21 @@ func (r *resourceSyncer) onUpdate(oldObj, newObj interface{}) {
}

func (r *resourceSyncer) onDelete(obj interface{}) {
switch t := obj.(type) {
case cache.DeletedFinalStateUnknown:
obj = t.Obj
default:
}
var resource *unstructured.Unstructured
var ok bool
if resource, ok = obj.(*unstructured.Unstructured); !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
r.log.Errorf(nil, "Syncer %q: could not convert object %v to DeletedFinalStateUnknown", r.config.Name, obj)
return
}

resource := r.assertUnstructured(obj)
resource, ok = tombstone.Obj.(*unstructured.Unstructured)
if !ok {
r.log.Errorf(nil, "Syncer %q: could not convert object tombstone %v to Unstructured", r.config.Name, tombstone.Obj)
return
}
}

if !r.shouldProcess(resource, Delete) {
return
Expand Down Expand Up @@ -701,4 +694,4 @@ func (r *resourceSyncer) assertUnstructured(obj interface{}) *unstructured.Unstr
func getClusterIDLabel(resource runtime.Object) (string, bool) {
clusterID, found := resourceUtil.MustToMeta(resource).GetLabels()[federate.ClusterIDLabelKey]
return clusterID, found
}
}

0 comments on commit 9b72958

Please sign in to comment.