Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix concurrent map access and remove unwanted code #245

Merged
merged 3 commits into from
Jul 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ SHELL := /bin/bash
GOCMD?=go
GOBUILD?=$(GOCMD) build
GOCLEAN?=$(GOCMD) clean
GOTEST?=$(GOCMD) test
GOTEST?=$(GOCMD) test -race
GOGET?=$(GOCMD) get
GOBIN?=$(GOPATH)/bin
OUT?=./out/
Expand Down
119 changes: 0 additions & 119 deletions admiral/pkg/clusters/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,10 +258,6 @@ func handleDestinationRuleEvent(obj *v1alpha3.DestinationRule, dh *DestinationRu

clusterId := dh.ClusterID

localDrName := obj.Name + "-local"

var localIdentityId string

syncNamespace := common.GetSyncNamespace()

r := dh.RemoteRegistry
Expand All @@ -272,13 +268,6 @@ func handleDestinationRuleEvent(obj *v1alpha3.DestinationRule, dh *DestinationRu

log.Infof(LogFormat, "Event", "DestinationRule", obj.Name, clusterId, "Processing")

//Create label based service entry in source and dependent clusters for subset routing to work
host := destinationRule.Host

basicSEName := getIstioResourceName(host, "-se")

seName := getIstioResourceName(host, "-se")

allDependentClusters := make(map[string]string)

util.MapCopy(allDependentClusters, dependentClusters)
Expand All @@ -289,36 +278,6 @@ func handleDestinationRuleEvent(obj *v1alpha3.DestinationRule, dh *DestinationRu

rc := r.RemoteControllers[dependentCluster]

var newServiceEntry *v1alpha3.ServiceEntry

var existsServiceEntry *v1alpha3.ServiceEntry

var drServiceEntries = make(map[string]*v1alpha32.ServiceEntry)

exist, err := rc.ServiceEntryController.IstioClient.NetworkingV1alpha3().ServiceEntries(syncNamespace).Get(basicSEName, v12.GetOptions{})

var identityId = ""

if exist == nil || err != nil {

log.Warnf(LogFormat, "Find", "ServiceEntry", basicSEName, dependentCluster, "Failed")

} else {

serviceEntry := exist.Spec

identityRaw, ok := r.AdmiralCache.CnameIdentityCache.Load(serviceEntry.Hosts[0])

if ok {
identityId = fmt.Sprintf("%v", identityRaw)
if dependentCluster == clusterId {
localIdentityId = identityId
}
drServiceEntries = createSeWithDrLabels(rc, dependentCluster == clusterId, identityId, seName, &serviceEntry, &destinationRule, r.AdmiralCache.ServiceEntryAddressStore, r.AdmiralCache.ConfigMapController)
}

}

if event == common.Delete {

err := rc.DestinationRuleController.IstioClient.NetworkingV1alpha3().DestinationRules(syncNamespace).Delete(obj.Name, &v12.DeleteOptions{})
Expand All @@ -327,27 +286,6 @@ func handleDestinationRuleEvent(obj *v1alpha3.DestinationRule, dh *DestinationRu
} else {
log.Errorf(LogErrFormat, "Delete", "DestinationRule", obj.Name, clusterId, err)
}
err = rc.ServiceEntryController.IstioClient.NetworkingV1alpha3().ServiceEntries(syncNamespace).Delete(seName, &v12.DeleteOptions{})
if err != nil {
log.Infof(LogFormat, "Delete", "ServiceEntry", seName, clusterId, "success")
} else {
log.Errorf(LogErrFormat, "Delete", "ServiceEntry", seName, clusterId, err)
}
for _, subset := range destinationRule.Subsets {
sseName := seName + common.Dash + subset.Name
err = rc.ServiceEntryController.IstioClient.NetworkingV1alpha3().ServiceEntries(syncNamespace).Delete(sseName, &v12.DeleteOptions{})
if err != nil {
log.Infof(LogFormat, "Delete", "ServiceEntry", sseName, clusterId, "success")
} else {
log.Errorf(LogErrFormat, "Delete", "ServiceEntry", sseName, clusterId, err)
}
}
err = rc.DestinationRuleController.IstioClient.NetworkingV1alpha3().DestinationRules(syncNamespace).Delete(localDrName, &v12.DeleteOptions{})
if err != nil {
log.Infof(LogFormat, "Delete", "DestinationRule", localDrName, clusterId, "success")
} else {
log.Errorf(LogErrFormat, "Delete", "DestinationRule", localDrName, clusterId, err)
}

} else {

Expand All @@ -357,28 +295,6 @@ func handleDestinationRuleEvent(obj *v1alpha3.DestinationRule, dh *DestinationRu
if dependentCluster != clusterId {
addUpdateDestinationRule(obj, exist, syncNamespace, rc)
}

for _seName, se := range drServiceEntries {
existsServiceEntry, _ = rc.ServiceEntryController.IstioClient.NetworkingV1alpha3().ServiceEntries(syncNamespace).Get(_seName, v12.GetOptions{})
newServiceEntry = createServiceEntrySkeletion(*se, _seName, syncNamespace)
if err != nil {
log.Warnf(LogErrFormat, "Create", "ServiceEntry", seName, clusterId, err)
}
if newServiceEntry != nil {
addUpdateServiceEntry(newServiceEntry, existsServiceEntry, syncNamespace, rc)
r.AdmiralCache.SeClusterCache.Put(newServiceEntry.Spec.Hosts[0], rc.ClusterID, rc.ClusterID)
}
//cache the subset service entries for updating them later for pod events
if dependentCluster == clusterId && se.Resolution == v1alpha32.ServiceEntry_STATIC {
r.AdmiralCache.SubsetServiceEntryIdentityCache.Store(identityId, map[string]string{_seName: clusterId})
}
}

if dependentCluster == clusterId {
//we need a destination rule with local fqdn for destination rules created with cnames to work in local cluster
createDestinationRuleForLocal(rc, localDrName, localIdentityId, clusterId, &destinationRule)
}

}
}
return
Expand All @@ -404,41 +320,6 @@ func handleDestinationRuleEvent(obj *v1alpha3.DestinationRule, dh *DestinationRu
}
}

func createDestinationRuleForLocal(remoteController *RemoteController, localDrName string, identityId string, clusterId string,
destinationRule *v1alpha32.DestinationRule) {

deployment := remoteController.DeploymentController.Cache.Get(identityId)

if deployment == nil || len(deployment.Deployments) == 0 {
log.Errorf(LogFormat, "Find", "deployment", identityId, remoteController.ClusterID, "Couldn't find deployment with identity")
return
}

//TODO this will pull a random deployment from some cluster which might not be the right deployment
var deploymentInstance *k8sAppsV1.Deployment
for _, value := range deployment.Deployments {
deploymentInstance = value
break
}

syncNamespace := common.GetSyncNamespace()
serviceInstance := getServiceForDeployment(remoteController, deploymentInstance)

cname := common.GetCname(deploymentInstance, common.GetHostnameSuffix(), common.GetWorkloadIdentifier())
if cname == destinationRule.Host {
destinationRule.Host = serviceInstance.Name + common.Sep + serviceInstance.Namespace + common.DotLocalDomainSuffix
existsDestinationRule, err := remoteController.DestinationRuleController.IstioClient.NetworkingV1alpha3().DestinationRules(syncNamespace).Get(localDrName, v12.GetOptions{})
if err != nil {
log.Warnf(LogErrFormat, "Find", "DestinationRule", localDrName, clusterId, err)
}
newDestinationRule := createDestinationRuleSkeletion(*destinationRule, localDrName, syncNamespace)

if newDestinationRule != nil {
addUpdateDestinationRule(newDestinationRule, existsDestinationRule, syncNamespace, remoteController)
}
}
}

func handleVirtualServiceEvent(obj *v1alpha3.VirtualService, vh *VirtualServiceHandler, event common.Event, resourceType common.ResourceType) error {

log.Infof(LogFormat, "Event", resourceType, obj.Name, vh.ClusterID, "Received event")
Expand Down
53 changes: 2 additions & 51 deletions admiral/pkg/clusters/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestDeleteCacheControllerThatDoesntExist(t *testing.T) {

w := RemoteRegistry{
RemoteControllers: make(map[string]*RemoteController),
StartTime: time.Now(),
StartTime: time.Now(),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick, formatting issues.

}

err := w.deleteCacheController("I don't exit")
Expand All @@ -62,7 +62,7 @@ func TestDeleteCacheController(t *testing.T) {

w := RemoteRegistry{
RemoteControllers: make(map[string]*RemoteController),
StartTime: time.Now(),
StartTime: time.Now(),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick, formatting issues.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems to be the right formatting now. Was messed up before.

}

r := rest.Config{
Expand Down Expand Up @@ -132,55 +132,6 @@ func TestCopySidecar(t *testing.T) {
}
}

func TestCreateDestinationRuleForLocalNoDeployLabel(t *testing.T) {

config := rest.Config{
Host: "localhost",
}

d, e := admiral.NewDeploymentController("", make(chan struct{}), &test.MockDeploymentHandler{}, &config, time.Second*time.Duration(300))

if e != nil {
t.Fail()
}

rc := RemoteController{
DeploymentController: d,
}

des := networking.DestinationRule{
Host: "test.com",
Subsets: []*networking.Subset{
{Name: "subset1", Labels: map[string]string{"foo": "bar"}, TrafficPolicy: nil},
},
}

createDestinationRuleForLocal(&rc, "local.name", "identity", "cluster1", &des)

}

func TestCreateDestinationRuleForLocal(t *testing.T) {

rc, err := createMockRemoteController(
func(i interface{}) {

},
)

if err != nil {
t.Fail()
}
des := networking.DestinationRule{
Host: "dev.bar.global",
Subsets: []*networking.Subset{
{Name: "subset1", Labels: map[string]string{"foo": "bar"}, TrafficPolicy: nil},
},
}

createDestinationRuleForLocal(rc, "local.name", "bar", "cluster1", &des)

}

func createMockRemoteController(f func(interface{})) (*RemoteController, error) {
config := rest.Config{
Host: "localhost",
Expand Down
79 changes: 17 additions & 62 deletions admiral/pkg/clusters/serviceentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func modifyServiceEntryForNewServiceOrPod(event admiral.EventType, env string, s
cnames := make(map[string]string)
var serviceInstance *k8sV1.Service
var weightedServices map[string]*WeightedService
var rollout *admiral.RolloutClusterEntry
var rollout *argo.Rollout
var gtps = make(map[string][]*v1.GlobalTrafficPolicy)

var namespace string
Expand All @@ -84,29 +84,27 @@ func modifyServiceEntryForNewServiceOrPod(event admiral.EventType, env string, s
start := time.Now()
for _, rc := range remoteRegistry.RemoteControllers {

deployment := rc.DeploymentController.Cache.Get(sourceIdentity)
deployment := rc.DeploymentController.Cache.Get(sourceIdentity, env)

if rc.RolloutController != nil {
rollout = rc.RolloutController.Cache.Get(sourceIdentity)
rollout = rc.RolloutController.Cache.Get(sourceIdentity, env)
}

if deployment != nil && deployment.Deployments[env] != nil {
deploymentInstance := deployment.Deployments[env]
if deployment != nil {

serviceInstance = getServiceForDeployment(rc, deploymentInstance)
serviceInstance = getServiceForDeployment(rc, deployment)
if serviceInstance == nil {
continue
}
namespace = deploymentInstance.Namespace
localMeshPorts := GetMeshPorts(rc.ClusterID, serviceInstance, deploymentInstance)
namespace = deployment.Namespace
localMeshPorts := GetMeshPorts(rc.ClusterID, serviceInstance, deployment)

cname = common.GetCname(deploymentInstance, common.GetWorkloadIdentifier(), common.GetHostnameSuffix())
sourceDeployments[rc.ClusterID] = deploymentInstance
createServiceEntry(event, rc, remoteRegistry.AdmiralCache, localMeshPorts, deploymentInstance, serviceEntries)
} else if rollout != nil && rollout.Rollouts[env] != nil {
rolloutInstance := rollout.Rollouts[env]
cname = common.GetCname(deployment, common.GetWorkloadIdentifier(), common.GetHostnameSuffix())
sourceDeployments[rc.ClusterID] = deployment
createServiceEntry(event, rc, remoteRegistry.AdmiralCache, localMeshPorts, deployment, serviceEntries)
} else if rollout != nil {

weightedServices = getServiceForRollout(rc, rolloutInstance)
weightedServices = getServiceForRollout(rc, rollout)
if len(weightedServices) == 0 {
continue
}
Expand All @@ -116,13 +114,13 @@ func modifyServiceEntryForNewServiceOrPod(event admiral.EventType, env string, s
serviceInstance = sInstance.Service
break
}
namespace = rolloutInstance.Namespace
localMeshPorts := GetMeshPortsForRollout(rc.ClusterID, serviceInstance, rolloutInstance)
namespace = rollout.Namespace
localMeshPorts := GetMeshPortsForRollout(rc.ClusterID, serviceInstance, rollout)

cname = common.GetCnameForRollout(rolloutInstance, common.GetWorkloadIdentifier(), common.GetHostnameSuffix())
cname = common.GetCnameForRollout(rollout, common.GetWorkloadIdentifier(), common.GetHostnameSuffix())
cnames[cname] = "1"
sourceRollouts[rc.ClusterID] = rolloutInstance
createServiceEntryForRollout(event, rc, remoteRegistry.AdmiralCache, localMeshPorts, rolloutInstance, serviceEntries)
sourceRollouts[rc.ClusterID] = rollout
createServiceEntryForRollout(event, rc, remoteRegistry.AdmiralCache, localMeshPorts, rollout, serviceEntries)
} else {
continue
}
Expand Down Expand Up @@ -391,49 +389,6 @@ func copySidecar(sidecar *v1alpha3.Sidecar) *v1alpha3.Sidecar {
return newSidecarObj
}

func createSeWithDrLabels(remoteController *RemoteController, localCluster bool, identityId string, seName string, se *networking.ServiceEntry,
dr *networking.DestinationRule, seAddressCache *ServiceEntryAddressStore, configmapController admiral.ConfigMapControllerInterface) map[string]*networking.ServiceEntry {
var allSes = make(map[string]*networking.ServiceEntry)
var newSe = copyServiceEntry(se)

address, _, err := GetLocalAddressForSe(seName, seAddressCache, configmapController)
if err != nil {
log.Warnf("Failed to get address for dr service entry. Not creating it. err:%v", err)
return nil
}
newSe.Addresses = []string{address}

var endpoints = make([]*networking.ServiceEntry_Endpoint, 0)

for _, endpoint := range se.Endpoints {
for _, subset := range dr.Subsets {
newEndpoint := copyEndpoint(endpoint)
newEndpoint.Labels = subset.Labels

////create a service entry with name subsetSeName
//if localCluster {
// subsetSeName := seName + common.Dash + subset.Name
// subsetSeAddress := strings.Split(se.Hosts[0], common.DotMesh)[0] + common.Sep + subset.Name + common.DotMesh BROKEN MUST FIX //todo fix the cname format here
//
// //TODO uncomment the line below when subset routing across clusters is fixed
// //newEndpoint.Address = subsetSeAddress
//
// subSetSe := createSeWithPodIps(remoteController, identityId, subsetSeName, subsetSeAddress, newSe, newEndpoint, subset, seAddressMap)
// if subSetSe != nil {
// allSes[subsetSeName] = subSetSe
// //TODO create default DestinationRules for these subset SEs
// }
//}

endpoints = append(endpoints, newEndpoint)

}
}
newSe.Endpoints = endpoints
allSes[seName] = newSe
return allSes
}

//This will create the default service entries and also additional ones specified in GTP
func AddServiceEntriesWithDr(cache *AdmiralCache, sourceClusters map[string]string, rcs map[string]*RemoteController, serviceEntries map[string]*networking.ServiceEntry) {
syncNamespace := common.GetSyncNamespace()
Expand Down
Loading