Fix concurrent map access and remove unwanted code (#245)
aattuluri committed Jul 14, 2022
1 parent 5541b68 commit fbe8af7
Showing 8 changed files with 70 additions and 316 deletions.
2 changes: 1 addition & 1 deletion Makefile
@@ -10,7 +10,7 @@ SHELL := /bin/bash
GOCMD?=go
GOBUILD?=$(GOCMD) build
GOCLEAN?=$(GOCMD) clean
GOTEST?=$(GOCMD) test
GOTEST?=$(GOCMD) test -race
GOGET?=$(GOCMD) get
GOBIN?=$(GOPATH)/bin
OUT?=./out/
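The `-race` flag added to GOTEST turns on Go's race detector, so whichever Makefile target invokes $(GOTEST) now reports the kind of unsynchronized map reads and writes that the commit title refers to. A minimal sketch of the pattern the detector flags, using a mutex-guarded map; the types and names here are illustrative, not Admiral's actual cache:

```go
package cache

import "sync"

// identityCache guards a plain map with a RWMutex so concurrent
// Store/Load calls from multiple goroutines are safe. Without the
// lock, `go test -race` reports a data race as soon as two
// goroutines touch entries at the same time.
type identityCache struct {
	mu      sync.RWMutex
	entries map[string]string
}

func newIdentityCache() *identityCache {
	return &identityCache{entries: make(map[string]string)}
}

// Store writes under the write lock.
func (c *identityCache) Store(key, value string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.entries[key] = value
}

// Load reads under the read lock.
func (c *identityCache) Load(key string) (string, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	v, ok := c.entries[key]
	return v, ok
}
```

With the flag in place, a test that drives a cache like this from several goroutines fails with a race report instead of passing while silently corrupting the map.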
119 changes: 0 additions & 119 deletions admiral/pkg/clusters/handler.go
@@ -258,10 +258,6 @@ func handleDestinationRuleEvent(obj *v1alpha3.DestinationRule, dh *DestinationRu

clusterId := dh.ClusterID

localDrName := obj.Name + "-local"

var localIdentityId string

syncNamespace := common.GetSyncNamespace()

r := dh.RemoteRegistry
@@ -272,13 +268,6 @@ func handleDestinationRuleEvent(obj *v1alpha3.DestinationRule, dh *DestinationRu

log.Infof(LogFormat, "Event", "DestinationRule", obj.Name, clusterId, "Processing")

//Create label based service entry in source and dependent clusters for subset routing to work
host := destinationRule.Host

basicSEName := getIstioResourceName(host, "-se")

seName := getIstioResourceName(host, "-se")

allDependentClusters := make(map[string]string)

util.MapCopy(allDependentClusters, dependentClusters)
@@ -289,36 +278,6 @@ func handleDestinationRuleEvent(obj *v1alpha3.DestinationRule, dh *DestinationRu

rc := r.RemoteControllers[dependentCluster]

var newServiceEntry *v1alpha3.ServiceEntry

var existsServiceEntry *v1alpha3.ServiceEntry

var drServiceEntries = make(map[string]*v1alpha32.ServiceEntry)

exist, err := rc.ServiceEntryController.IstioClient.NetworkingV1alpha3().ServiceEntries(syncNamespace).Get(basicSEName, v12.GetOptions{})

var identityId = ""

if exist == nil || err != nil {

log.Warnf(LogFormat, "Find", "ServiceEntry", basicSEName, dependentCluster, "Failed")

} else {

serviceEntry := exist.Spec

identityRaw, ok := r.AdmiralCache.CnameIdentityCache.Load(serviceEntry.Hosts[0])

if ok {
identityId = fmt.Sprintf("%v", identityRaw)
if dependentCluster == clusterId {
localIdentityId = identityId
}
drServiceEntries = createSeWithDrLabels(rc, dependentCluster == clusterId, identityId, seName, &serviceEntry, &destinationRule, r.AdmiralCache.ServiceEntryAddressStore, r.AdmiralCache.ConfigMapController)
}

}

if event == common.Delete {

err := rc.DestinationRuleController.IstioClient.NetworkingV1alpha3().DestinationRules(syncNamespace).Delete(obj.Name, &v12.DeleteOptions{})
@@ -327,27 +286,6 @@ func handleDestinationRuleEvent(obj *v1alpha3.DestinationRule, dh *DestinationRu
} else {
log.Errorf(LogErrFormat, "Delete", "DestinationRule", obj.Name, clusterId, err)
}
err = rc.ServiceEntryController.IstioClient.NetworkingV1alpha3().ServiceEntries(syncNamespace).Delete(seName, &v12.DeleteOptions{})
if err != nil {
log.Infof(LogFormat, "Delete", "ServiceEntry", seName, clusterId, "success")
} else {
log.Errorf(LogErrFormat, "Delete", "ServiceEntry", seName, clusterId, err)
}
for _, subset := range destinationRule.Subsets {
sseName := seName + common.Dash + subset.Name
err = rc.ServiceEntryController.IstioClient.NetworkingV1alpha3().ServiceEntries(syncNamespace).Delete(sseName, &v12.DeleteOptions{})
if err != nil {
log.Infof(LogFormat, "Delete", "ServiceEntry", sseName, clusterId, "success")
} else {
log.Errorf(LogErrFormat, "Delete", "ServiceEntry", sseName, clusterId, err)
}
}
err = rc.DestinationRuleController.IstioClient.NetworkingV1alpha3().DestinationRules(syncNamespace).Delete(localDrName, &v12.DeleteOptions{})
if err != nil {
log.Infof(LogFormat, "Delete", "DestinationRule", localDrName, clusterId, "success")
} else {
log.Errorf(LogErrFormat, "Delete", "DestinationRule", localDrName, clusterId, err)
}

} else {

@@ -357,28 +295,6 @@ func handleDestinationRuleEvent(obj *v1alpha3.DestinationRule, dh *DestinationRu
if dependentCluster != clusterId {
addUpdateDestinationRule(obj, exist, syncNamespace, rc)
}

for _seName, se := range drServiceEntries {
existsServiceEntry, _ = rc.ServiceEntryController.IstioClient.NetworkingV1alpha3().ServiceEntries(syncNamespace).Get(_seName, v12.GetOptions{})
newServiceEntry = createServiceEntrySkeletion(*se, _seName, syncNamespace)
if err != nil {
log.Warnf(LogErrFormat, "Create", "ServiceEntry", seName, clusterId, err)
}
if newServiceEntry != nil {
addUpdateServiceEntry(newServiceEntry, existsServiceEntry, syncNamespace, rc)
r.AdmiralCache.SeClusterCache.Put(newServiceEntry.Spec.Hosts[0], rc.ClusterID, rc.ClusterID)
}
//cache the subset service entries for updating them later for pod events
if dependentCluster == clusterId && se.Resolution == v1alpha32.ServiceEntry_STATIC {
r.AdmiralCache.SubsetServiceEntryIdentityCache.Store(identityId, map[string]string{_seName: clusterId})
}
}

if dependentCluster == clusterId {
//we need a destination rule with local fqdn for destination rules created with cnames to work in local cluster
createDestinationRuleForLocal(rc, localDrName, localIdentityId, clusterId, &destinationRule)
}

}
}
return
@@ -404,41 +320,6 @@ func handleDestinationRuleEvent(obj *v1alpha3.DestinationRule, dh *DestinationRu
}
}

func createDestinationRuleForLocal(remoteController *RemoteController, localDrName string, identityId string, clusterId string,
destinationRule *v1alpha32.DestinationRule) {

deployment := remoteController.DeploymentController.Cache.Get(identityId)

if deployment == nil || len(deployment.Deployments) == 0 {
log.Errorf(LogFormat, "Find", "deployment", identityId, remoteController.ClusterID, "Couldn't find deployment with identity")
return
}

//TODO this will pull a random deployment from some cluster which might not be the right deployment
var deploymentInstance *k8sAppsV1.Deployment
for _, value := range deployment.Deployments {
deploymentInstance = value
break
}

syncNamespace := common.GetSyncNamespace()
serviceInstance := getServiceForDeployment(remoteController, deploymentInstance)

cname := common.GetCname(deploymentInstance, common.GetHostnameSuffix(), common.GetWorkloadIdentifier())
if cname == destinationRule.Host {
destinationRule.Host = serviceInstance.Name + common.Sep + serviceInstance.Namespace + common.DotLocalDomainSuffix
existsDestinationRule, err := remoteController.DestinationRuleController.IstioClient.NetworkingV1alpha3().DestinationRules(syncNamespace).Get(localDrName, v12.GetOptions{})
if err != nil {
log.Warnf(LogErrFormat, "Find", "DestinationRule", localDrName, clusterId, err)
}
newDestinationRule := createDestinationRuleSkeletion(*destinationRule, localDrName, syncNamespace)

if newDestinationRule != nil {
addUpdateDestinationRule(newDestinationRule, existsDestinationRule, syncNamespace, remoteController)
}
}
}

func handleVirtualServiceEvent(obj *v1alpha3.VirtualService, vh *VirtualServiceHandler, event common.Event, resourceType common.ResourceType) error {

log.Infof(LogFormat, "Event", resourceType, obj.Name, vh.ClusterID, "Received event")
53 changes: 2 additions & 51 deletions admiral/pkg/clusters/registry_test.go
@@ -48,7 +48,7 @@ func TestDeleteCacheControllerThatDoesntExist(t *testing.T) {

w := RemoteRegistry{
RemoteControllers: make(map[string]*RemoteController),
StartTime: time.Now(),
StartTime: time.Now(),
}

err := w.deleteCacheController("I don't exit")
@@ -62,7 +62,7 @@ func TestDeleteCacheController(t *testing.T) {

w := RemoteRegistry{
RemoteControllers: make(map[string]*RemoteController),
StartTime: time.Now(),
StartTime: time.Now(),
}

r := rest.Config{
@@ -132,55 +132,6 @@ func TestCopySidecar(t *testing.T) {
}
}

func TestCreateDestinationRuleForLocalNoDeployLabel(t *testing.T) {

config := rest.Config{
Host: "localhost",
}

d, e := admiral.NewDeploymentController("", make(chan struct{}), &test.MockDeploymentHandler{}, &config, time.Second*time.Duration(300))

if e != nil {
t.Fail()
}

rc := RemoteController{
DeploymentController: d,
}

des := networking.DestinationRule{
Host: "test.com",
Subsets: []*networking.Subset{
{Name: "subset1", Labels: map[string]string{"foo": "bar"}, TrafficPolicy: nil},
},
}

createDestinationRuleForLocal(&rc, "local.name", "identity", "cluster1", &des)

}

func TestCreateDestinationRuleForLocal(t *testing.T) {

rc, err := createMockRemoteController(
func(i interface{}) {

},
)

if err != nil {
t.Fail()
}
des := networking.DestinationRule{
Host: "dev.bar.global",
Subsets: []*networking.Subset{
{Name: "subset1", Labels: map[string]string{"foo": "bar"}, TrafficPolicy: nil},
},
}

createDestinationRuleForLocal(rc, "local.name", "bar", "cluster1", &des)

}

func createMockRemoteController(f func(interface{})) (*RemoteController, error) {
config := rest.Config{
Host: "localhost",
79 changes: 17 additions & 62 deletions admiral/pkg/clusters/serviceentry.go
@@ -74,7 +74,7 @@ func modifyServiceEntryForNewServiceOrPod(event admiral.EventType, env string, s
cnames := make(map[string]string)
var serviceInstance *k8sV1.Service
var weightedServices map[string]*WeightedService
var rollout *admiral.RolloutClusterEntry
var rollout *argo.Rollout
var gtps = make(map[string][]*v1.GlobalTrafficPolicy)

var namespace string
@@ -84,29 +84,27 @@ func modifyServiceEntryForNewServiceOrPod(event admiral.EventType, env string, s
start := time.Now()
for _, rc := range remoteRegistry.RemoteControllers {

deployment := rc.DeploymentController.Cache.Get(sourceIdentity)
deployment := rc.DeploymentController.Cache.Get(sourceIdentity, env)

if rc.RolloutController != nil {
rollout = rc.RolloutController.Cache.Get(sourceIdentity)
rollout = rc.RolloutController.Cache.Get(sourceIdentity, env)
}

if deployment != nil && deployment.Deployments[env] != nil {
deploymentInstance := deployment.Deployments[env]
if deployment != nil {

serviceInstance = getServiceForDeployment(rc, deploymentInstance)
serviceInstance = getServiceForDeployment(rc, deployment)
if serviceInstance == nil {
continue
}
namespace = deploymentInstance.Namespace
localMeshPorts := GetMeshPorts(rc.ClusterID, serviceInstance, deploymentInstance)
namespace = deployment.Namespace
localMeshPorts := GetMeshPorts(rc.ClusterID, serviceInstance, deployment)

cname = common.GetCname(deploymentInstance, common.GetWorkloadIdentifier(), common.GetHostnameSuffix())
sourceDeployments[rc.ClusterID] = deploymentInstance
createServiceEntry(event, rc, remoteRegistry.AdmiralCache, localMeshPorts, deploymentInstance, serviceEntries)
} else if rollout != nil && rollout.Rollouts[env] != nil {
rolloutInstance := rollout.Rollouts[env]
cname = common.GetCname(deployment, common.GetWorkloadIdentifier(), common.GetHostnameSuffix())
sourceDeployments[rc.ClusterID] = deployment
createServiceEntry(event, rc, remoteRegistry.AdmiralCache, localMeshPorts, deployment, serviceEntries)
} else if rollout != nil {

weightedServices = getServiceForRollout(rc, rolloutInstance)
weightedServices = getServiceForRollout(rc, rollout)
if len(weightedServices) == 0 {
continue
}
@@ -116,13 +114,13 @@ func modifyServiceEntryForNewServiceOrPod(event admiral.EventType, env string, s
serviceInstance = sInstance.Service
break
}
namespace = rolloutInstance.Namespace
localMeshPorts := GetMeshPortsForRollout(rc.ClusterID, serviceInstance, rolloutInstance)
namespace = rollout.Namespace
localMeshPorts := GetMeshPortsForRollout(rc.ClusterID, serviceInstance, rollout)

cname = common.GetCnameForRollout(rolloutInstance, common.GetWorkloadIdentifier(), common.GetHostnameSuffix())
cname = common.GetCnameForRollout(rollout, common.GetWorkloadIdentifier(), common.GetHostnameSuffix())
cnames[cname] = "1"
sourceRollouts[rc.ClusterID] = rolloutInstance
createServiceEntryForRollout(event, rc, remoteRegistry.AdmiralCache, localMeshPorts, rolloutInstance, serviceEntries)
sourceRollouts[rc.ClusterID] = rollout
createServiceEntryForRollout(event, rc, remoteRegistry.AdmiralCache, localMeshPorts, rollout, serviceEntries)
} else {
continue
}
@@ -391,49 +389,6 @@ func copySidecar(sidecar *v1alpha3.Sidecar) *v1alpha3.Sidecar {
return newSidecarObj
}

func createSeWithDrLabels(remoteController *RemoteController, localCluster bool, identityId string, seName string, se *networking.ServiceEntry,
dr *networking.DestinationRule, seAddressCache *ServiceEntryAddressStore, configmapController admiral.ConfigMapControllerInterface) map[string]*networking.ServiceEntry {
var allSes = make(map[string]*networking.ServiceEntry)
var newSe = copyServiceEntry(se)

address, _, err := GetLocalAddressForSe(seName, seAddressCache, configmapController)
if err != nil {
log.Warnf("Failed to get address for dr service entry. Not creating it. err:%v", err)
return nil
}
newSe.Addresses = []string{address}

var endpoints = make([]*networking.ServiceEntry_Endpoint, 0)

for _, endpoint := range se.Endpoints {
for _, subset := range dr.Subsets {
newEndpoint := copyEndpoint(endpoint)
newEndpoint.Labels = subset.Labels

////create a service entry with name subsetSeName
//if localCluster {
// subsetSeName := seName + common.Dash + subset.Name
// subsetSeAddress := strings.Split(se.Hosts[0], common.DotMesh)[0] + common.Sep + subset.Name + common.DotMesh BROKEN MUST FIX //todo fix the cname format here
//
// //TODO uncomment the line below when subset routing across clusters is fixed
// //newEndpoint.Address = subsetSeAddress
//
// subSetSe := createSeWithPodIps(remoteController, identityId, subsetSeName, subsetSeAddress, newSe, newEndpoint, subset, seAddressMap)
// if subSetSe != nil {
// allSes[subsetSeName] = subSetSe
// //TODO create default DestinationRules for these subset SEs
// }
//}

endpoints = append(endpoints, newEndpoint)

}
}
newSe.Endpoints = endpoints
allSes[seName] = newSe
return allSes
}

//This will create the default service entries and also additional ones specified in GTP
func AddServiceEntriesWithDr(cache *AdmiralCache, sourceClusters map[string]string, rcs map[string]*RemoteController, serviceEntries map[string]*networking.ServiceEntry) {
syncNamespace := common.GetSyncNamespace()
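In `modifyServiceEntryForNewServiceOrPod`, the deployment and rollout lookups now call `Cache.Get(sourceIdentity, env)` and receive a single object (`*argo.Rollout`, a deployment) instead of pulling back a per-environment map and indexing it with `Deployments[env]` / `Rollouts[env]` outside any lock. A hedged sketch of a cache shaped that way, assuming an internal `sync.RWMutex` and an identity→env map; field and type names are illustrative, not Admiral's:

```go
package cache

import (
	"sync"

	argo "github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
)

// rolloutCache keys rollouts by identity and then by environment.
// All access goes through the mutex, so callers never hold a
// reference to the inner map and cannot race against writers.
type rolloutCache struct {
	mu    sync.RWMutex
	cache map[string]map[string]*argo.Rollout
}

func newRolloutCache() *rolloutCache {
	return &rolloutCache{cache: make(map[string]map[string]*argo.Rollout)}
}

// Put records the rollout for an identity/env pair under the write lock.
func (c *rolloutCache) Put(identity, env string, r *argo.Rollout) {
	c.mu.Lock()
	defer c.mu.Unlock()
	byEnv, ok := c.cache[identity]
	if !ok {
		byEnv = make(map[string]*argo.Rollout)
		c.cache[identity] = byEnv
	}
	byEnv[env] = r
}

// Get returns the rollout for an identity/env pair, or nil if absent.
// Handing back the single object keeps every map read behind the lock.
func (c *rolloutCache) Get(identity, env string) *argo.Rollout {
	c.mu.RLock()
	defer c.mu.RUnlock()
	if byEnv, ok := c.cache[identity]; ok {
		return byEnv[env]
	}
	return nil
}
```

The same shape would apply to the deployment cache; either way, callers stop indexing a shared map outside the cache's lock, which appears to be the concurrent map access the commit title targets.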