Skip to content

Commit

Permalink
该代码
Browse files Browse the repository at this point in the history
  • Loading branch information
wangyd1988 committed Sep 12, 2023
1 parent 7b0ad0b commit 8729c5b
Showing 1 changed file with 138 additions and 110 deletions.
248 changes: 138 additions & 110 deletions pkg/syncer/resource_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"context"
"fmt"
"reflect"
"strings"
"sync"
"time"

Expand All @@ -36,7 +37,9 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
k8slabels "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes/scheme"
Expand All @@ -46,6 +49,10 @@ import (

const OrigNamespaceLabelKey = "submariner-io/originatingNamespace"

var ErrResourceNotsupported = "could not find the requested resource"
var EndpointSliceVersions = []string{"v1", "v1beta1"}
var EndpointSliceKind = "EndpointSlice"

type SyncDirection int

const (
Expand All @@ -59,16 +66,18 @@ const (
)

func (d SyncDirection) String() string {
s := "unknown"

switch d {
case LocalToRemote:
return "localToRemote"
s = "localToRemote"
case RemoteToLocal:
return "remoteToLocal"
s = "remoteToLocal"
case None:
return "none"
s = "none"
}

return "unknown"
return s
}

type Operation int
Expand All @@ -80,16 +89,18 @@ const (
)

func (o Operation) String() string {
s := "unknown"

switch o {
case Create:
return "create"
s = "create"
case Update:
return "update"
s = "update"
case Delete:
return "delete"
s = "delete"
}

return "unknown"
return s
}

const (
Expand Down Expand Up @@ -263,6 +274,10 @@ func (r *resourceSyncer) Start(stopCh <-chan struct{}) error {

go func() {
defer func() {
if r.config.SyncCounterOpts != nil {
prometheus.Unregister(r.syncCounter)
}

r.stopped <- struct{}{}
r.log.V(log.LIBDEBUG).Infof("Syncer %q stopped", r.config.Name)
}()
Expand Down Expand Up @@ -300,99 +315,122 @@ func (r *resourceSyncer) GetResource(name, namespace string) (runtime.Object, bo
return nil, false, nil
}

converted, err := r.convert(obj.(*unstructured.Unstructured))
return r.mustConvert(obj.(*unstructured.Unstructured)), true, nil
}

func (r *resourceSyncer) RequeueResource(name, namespace string) {
obj, exists, err := r.store.GetByKey(namespace + "/" + name)
if err != nil {
return nil, false, err
r.log.Errorf(err, "Error retrieving resource - unable to requeue")
return
}

return converted, true, nil
if exists {
r.onCreate(obj)
}
}

func (r *resourceSyncer) ListResources() ([]runtime.Object, error) {
if ok := cache.WaitForCacheSync(r.stopCh, r.informer.HasSynced); !ok {
return nil, fmt.Errorf("failed to wait for informer cache to sync")
}
func (r *resourceSyncer) ListResources() []runtime.Object {
return r.ListResourcesBySelector(k8slabels.Everything())
}

list := r.store.List()
retObjects := make([]runtime.Object, 0, len(list))
func (r *resourceSyncer) ListResourcesBySelector(selector k8slabels.Selector) []runtime.Object {
var retObjects []runtime.Object

for _, obj := range list {
converted, err := r.convert(obj.(*unstructured.Unstructured))
if err != nil {
return nil, err
}
return r.runIfCacheSynced(retObjects, func() any {
list := r.store.List()
retObjects := make([]runtime.Object, 0, len(list))

retObjects = append(retObjects, converted)
}
for _, obj := range list {
if !selector.Matches(k8slabels.Set(resourceUtil.MustToMeta(obj).GetLabels())) {
continue
}

retObjects = append(retObjects, r.mustConvert(obj.(*unstructured.Unstructured)))
}

return retObjects, nil
return retObjects
}).([]runtime.Object)
}

func (r *resourceSyncer) Reconcile(resourceLister func() []runtime.Object) {
go func() {
if ok := cache.WaitForCacheSync(r.stopCh, r.informer.HasSynced); !ok {
r.log.Error(nil, "Unable to reconcile - failed to wait for informer cache to sync")
_ = r.runIfCacheSynced(nil, func() any {
r.doReconcile(resourceLister)
return nil
})
}()
}

func (r *resourceSyncer) doReconcile(resourceLister func() []runtime.Object) {
resourceType := reflect.TypeOf(r.config.ResourceType)

for _, resource := range resourceLister() {
if reflect.TypeOf(resource) != resourceType {
// This would happen if the custom transform function returned a different type. We would need a custom
// reverse transform function to handle this. Possible future work, for now bail.
r.log.Warningf("Unable to reconcile type %T - expected type %v", resource, resourceType)
return
}

resourceType := reflect.TypeOf(r.config.ResourceType)
metaObj := resourceUtil.MustToMeta(resource)
clusterID, found := getClusterIDLabel(resource)
ns := r.config.SourceNamespace

for _, resource := range resourceLister() {
if reflect.TypeOf(resource) != resourceType {
// This would happen if the custom transform function returned a different type. We would need a custom
// reverse transform function to handle this. Possible future work, for now bail.
r.log.Warningf("Unable to reconcile type %T - expected type %T", resource, resourceType)
return
switch r.config.Direction {
case None:
ns = metaObj.GetNamespace()
case RemoteToLocal:
if !found || clusterID == r.config.LocalClusterID {
continue
}
case LocalToRemote:
if clusterID != r.config.LocalClusterID {
continue
}

metaObj := resourceUtil.MustToMeta(resource)
clusterID, found := getClusterIDLabel(resource)
ns := r.config.SourceNamespace
labels := metaObj.GetLabels()
delete(labels, federate.ClusterIDLabelKey)
metaObj.SetLabels(labels)

switch r.config.Direction {
case None:
ns = metaObj.GetNamespace()
case RemoteToLocal:
if !found || clusterID == r.config.LocalClusterID {
continue
}
case LocalToRemote:
if clusterID != r.config.LocalClusterID {
continue
}
if ns == metav1.NamespaceAll {
ns = metaObj.GetLabels()[OrigNamespaceLabelKey]
}
}

labels := metaObj.GetLabels()
delete(labels, federate.ClusterIDLabelKey)
metaObj.SetLabels(labels)
if ns == "" {
r.log.Warningf("Unable to reconcile resource %s/%s - cannot determine originating namespace",
metaObj.GetNamespace(), metaObj.GetName())
continue
}

if ns == metav1.NamespaceAll {
ns = metaObj.GetLabels()[OrigNamespaceLabelKey]
}
}
metaObj.SetNamespace(ns)

if ns == "" {
r.log.Warningf("Unable to reconcile resource %s/%s - cannot determine originating namespace",
metaObj.GetNamespace(), metaObj.GetName())
continue
}
key, _ := cache.MetaNamespaceKeyFunc(resource)

metaObj.SetNamespace(ns)
_, exists, _ := r.store.GetByKey(key)
if exists {
continue
}

key, _ := cache.MetaNamespaceKeyFunc(resource)
obj := resourceUtil.MustToUnstructuredUsingScheme(resource, r.config.Scheme)
r.deleted.Store(key, obj)
r.workQueue.Enqueue(obj)
}
}

_, exists, _ := r.store.GetByKey(key)
if exists {
continue
}
func (r *resourceSyncer) runIfCacheSynced(defaultReturn any, run func() any) any {
if ok := cache.WaitForCacheSync(r.stopCh, r.informer.HasSynced); !ok {
// This means the cache was stopped.
r.log.Warningf("Syncer %q failed to wait for informer cache to sync", r.config.Name)

obj, _ := resourceUtil.ToUnstructured(resource)
r.deleted.Store(key, obj)
r.workQueue.Enqueue(obj)
}
}()
return defaultReturn
}

return run()
}

func (r *resourceSyncer) processNextWorkItem(key, name, ns string) (bool, error) {
func (r *resourceSyncer) processNextWorkItem(key, _, _ string) (bool, error) {
obj, exists, err := r.store.GetByKey(key)
if err != nil {
return true, errors.Wrapf(err, "error retrieving resource %q", key)
Expand Down Expand Up @@ -428,6 +466,22 @@ func (r *resourceSyncer) processNextWorkItem(key, name, ns string) (bool, error)
r.log.V(log.LIBDEBUG).Infof("Syncer %q syncing resource %q", r.config.Name, resource.GetName())

err = r.config.Federator.Distribute(resource)

// yd modify
utilruntime.HandleError(fmt.Errorf("#processNextWorkItem,resource.GetKind():%v, err:%w;", resource.GetKind(), err))
if err != nil && strings.EqualFold(resource.GetKind(), EndpointSliceKind) && strings.Contains(err.Error(), ErrResourceNotsupported) {
for _, endpointSliceVersion := range EndpointSliceVersions {
utilruntime.HandleError(fmt.Errorf("#endpointSliceVersion:%v",endpointSliceVersion))
if strings.EqualFold(resource.GetAPIVersion(), endpointSliceVersion) {
continue
}
resource.SetAPIVersion(endpointSliceVersion)
utilruntime.HandleError(fmt.Errorf("#resource:%v",resource))
err = r.config.Federator.Distribute(resource)
utilruntime.HandleError(fmt.Errorf("#processNextWorkItem,resource.GetKind():%v, err:%w;", resource.GetKind(), err))
}
}

if err != nil || r.onSuccessfulSync(resource, transformed, op) {
return true, errors.Wrapf(err, "error distributing resource %q", key)
}
Expand Down Expand Up @@ -505,20 +559,12 @@ func (r *resourceSyncer) handleDeleted(key string) (bool, error) {
return requeue, nil
}

func (r *resourceSyncer) convertNoError(from interface{}) runtime.Object {
converted, err := r.convert(from)
if err != nil {
r.log.Error(err, "Unable to convert: %#v", from)
}

return converted
}

func (r *resourceSyncer) convert(from interface{}) (runtime.Object, error) {
func (r *resourceSyncer) mustConvert(from interface{}) runtime.Object {
converted := r.config.ResourceType.DeepCopyObject()
err := r.config.Scheme.Convert(from, converted, nil)
utilruntime.Must(err)

return converted, errors.Wrapf(err, "Syncer %q: error converting %#v to %T", r.config.Name, from, r.config.ResourceType)
return converted
}

//nolint:interfacer //false positive for "`from` can be `k8s.io/apimachinery/pkg/runtime.Object`" as it returns 'from' as Unstructured
Expand All @@ -531,22 +577,15 @@ func (r *resourceSyncer) transform(from *unstructured.Unstructured, key string,

clusterID, _ := getClusterIDLabel(from)

converted := r.convertNoError(from)
if converted == nil {
return nil, nil, false
}
converted := r.mustConvert(from)

transformed, requeue := r.config.Transform(converted, r.workQueue.NumRequeues(key), op)
if transformed == nil {
r.log.V(log.LIBDEBUG).Infof("Syncer %q: transform function returned nil - not syncing - requeue: %v", r.config.Name, requeue)
return nil, nil, requeue
}

result, err := resourceUtil.ToUnstructured(transformed)
if err != nil {
r.log.Errorf(err, "Syncer %q: error converting transform function result", r.config.Name)
return nil, nil, false
}
result := resourceUtil.MustToUnstructuredUsingScheme(transformed, r.config.Scheme)

// Preserve the cluster ID label
if clusterID != "" {
Expand All @@ -562,10 +601,7 @@ func (r *resourceSyncer) onSuccessfulSync(resource, converted runtime.Object, op
}

if converted == nil {
converted = r.convertNoError(resource)
if converted == nil {
return false
}
converted = r.mustConvert(resource)
}

r.log.V(log.LIBTRACE).Infof("Syncer %q: invoking OnSuccessfulSync function with: %#v", r.config.Name, converted)
Expand Down Expand Up @@ -602,22 +638,14 @@ func (r *resourceSyncer) onUpdate(oldObj, newObj interface{}) {
}

func (r *resourceSyncer) onDelete(obj interface{}) {
var resource *unstructured.Unstructured
var ok bool
if resource, ok = obj.(*unstructured.Unstructured); !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
r.log.Errorf(nil, "Syncer %q: could not convert object %v to DeletedFinalStateUnknown", r.config.Name, obj)
return
}

resource, ok = tombstone.Obj.(*unstructured.Unstructured)
if !ok {
r.log.Errorf(nil, "Syncer %q: could not convert object tombstone %v to Unstructured", r.config.Name, tombstone.Obj)
return
}
switch t := obj.(type) {
case cache.DeletedFinalStateUnknown:
obj = t.Obj
default:
}

resource := r.assertUnstructured(obj)

if !r.shouldProcess(resource, Delete) {
return
}
Expand Down

0 comments on commit 8729c5b

Please sign in to comment.