Skip to content

Commit

Permalink
Merge pull request #1156 from wangyd1988/release-1.7-yd
Browse files Browse the repository at this point in the history
When netpol is added to a workload, the workload's POD can be accessed using service
  • Loading branch information
zhangzujian committed Dec 13, 2021
2 parents aaedbd3 + 6fb7d26 commit 2e9d407
Show file tree
Hide file tree
Showing 4 changed files with 269 additions and 49 deletions.
1 change: 1 addition & 0 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ func NewController(config *Configuration) *Controller {
})

serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.enqueueAddService,
DeleteFunc: controller.enqueueDeleteService,
UpdateFunc: controller.enqueueUpdateService,
})
Expand Down
252 changes: 214 additions & 38 deletions pkg/controller/network_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,58 @@ func (c *Controller) handleUpdateNp(key string) error {
return err
}

// set svc addreess_set
svcAsNameIPv4 := strings.Replace(fmt.Sprintf("%s.%s.service.%s", np.Name, np.Namespace, kubeovnv1.ProtocolIPv4), "-", ".", -1)
svcAsNameIPv6 := strings.Replace(fmt.Sprintf("%s.%s.service.%s", np.Name, np.Namespace, kubeovnv1.ProtocolIPv6), "-", ".", -1)
svcIpv4s, svcIpv6s, err := c.fetchSelectedSvc(np.Namespace, &np.Spec.PodSelector)
if err != nil {
klog.Errorf("failed to fetchSelectedSvc svcIPs result %v", err)
return err
}
for _, cidrBlock := range strings.Split(subnet.Spec.CIDRBlock, ",") {
protocol := util.CheckProtocol(cidrBlock)
svcAsName := svcAsNameIPv4
svcIPs := svcIpv4s
if protocol == kubeovnv1.ProtocolIPv6 {
svcAsName = svcAsNameIPv6
svcIPs = svcIpv6s
}
if err := c.ovnClient.CreateAddressSet(svcAsName, np.Namespace, np.Name, "service"); err != nil {
klog.Errorf("failed to create address_set %s, %v", svcAsNameIPv4, err)
return err
}
if err := c.ovnClient.SetAddressesToAddressSet(svcIPs, svcAsName); err != nil {
klog.Errorf("failed to set netpol svc, %v", err)
return err
}
}

// before update or add ingress info,we should first delete acl and address_set
if err := c.ovnClient.DeleteACL(pgName, "to-lport"); err != nil {
klog.Errorf("failed to delete np %s ingress acls, %v", key, err)
return err
}

ingressAsNames, err := c.ovnClient.ListAddressSet(np.Namespace, np.Name, "ingress")
if err != nil {
klog.Errorf("failed to list ingress address_set, %v", err)
return err
}
for _, ingressAsName := range ingressAsNames {
if err := c.ovnClient.DeleteAddressSet(ingressAsName); err != nil {
klog.Errorf("failed to delete np %s address set, %v", key, err)
return err
}
}

if hasIngressRule(np) {
for _, cidrBlock := range strings.Split(subnet.Spec.CIDRBlock, ",") {
protocol := util.CheckProtocol(cidrBlock)
svcAsName := svcAsNameIPv4
if protocol == kubeovnv1.ProtocolIPv6 {
svcAsName = svcAsNameIPv6
}

for idx, npr := range np.Spec.Ingress {
// A single address set must contain addresses of the same type and the name must be unique within table, so IPv4 and IPv6 address set should be different
ingressAllowAsName := fmt.Sprintf("%s.%s.%d", ingressAllowAsNamePrefix, protocol, idx)
Expand Down Expand Up @@ -250,7 +299,7 @@ func (c *Controller) handleUpdateNp(key string) error {
}

if len(allows) != 0 || len(excepts) != 0 {
if err := c.ovnClient.CreateIngressACL(fmt.Sprintf("%s/%s", np.Namespace, np.Name), pgName, ingressAllowAsName, ingressExceptAsName, protocol, npr.Ports); err != nil {
if err := c.ovnClient.CreateIngressACL(fmt.Sprintf("%s/%s", np.Namespace, np.Name), pgName, ingressAllowAsName, ingressExceptAsName, protocol, npr.Ports, svcAsName); err != nil {
klog.Errorf("failed to create ingress acls for np %s, %v", key, err)
return err
}
Expand All @@ -269,7 +318,7 @@ func (c *Controller) handleUpdateNp(key string) error {
return err
}
ingressPorts := []netv1.NetworkPolicyPort{}
if err := c.ovnClient.CreateIngressACL(fmt.Sprintf("%s/%s", np.Namespace, np.Name), pgName, ingressAllowAsName, ingressExceptAsName, protocol, ingressPorts); err != nil {
if err := c.ovnClient.CreateIngressACL(fmt.Sprintf("%s/%s", np.Namespace, np.Name), pgName, ingressAllowAsName, ingressExceptAsName, protocol, ingressPorts, svcAsName); err != nil {
klog.Errorf("failed to create ingress acls for np %s, %v", key, err)
return err
}
Expand Down Expand Up @@ -299,28 +348,33 @@ func (c *Controller) handleUpdateNp(key string) error {
}
}
}
} else {
if err := c.ovnClient.DeleteACL(pgName, "to-lport"); err != nil {
klog.Errorf("failed to delete np %s ingress acls, %v", key, err)
return err
}
}

asNames, err := c.ovnClient.ListAddressSet(np.Namespace, np.Name, "ingress")
if err != nil {
klog.Errorf("failed to list address_set, %v", err)
// before update or add egress info,we should first delete acl and address_set
if err := c.ovnClient.DeleteACL(pgName, "from-lport"); err != nil {
klog.Errorf("failed to delete np %s egress acls, %v", key, err)
return err
}

egressAsNames, err := c.ovnClient.ListAddressSet(np.Namespace, np.Name, "egress")
if err != nil {
klog.Errorf("failed to list egress address_set, %v", err)
return err
}
for _, egressAsName := range egressAsNames {
if err := c.ovnClient.DeleteAddressSet(egressAsName); err != nil {
klog.Errorf("failed to delete np %s address set, %v", key, err)
return err
}
for _, asName := range asNames {
if err := c.ovnClient.DeleteAddressSet(asName); err != nil {
klog.Errorf("failed to delete np %s address set, %v", key, err)
return err
}
}
}

if hasEgressRule(np) {
for _, cidrBlock := range strings.Split(subnet.Spec.CIDRBlock, ",") {
protocol := util.CheckProtocol(cidrBlock)
svcAsName := svcAsNameIPv4
if protocol == kubeovnv1.ProtocolIPv6 {
svcAsName = svcAsNameIPv6
}

for idx, npr := range np.Spec.Egress {
// A single address set must contain addresses of the same type and the name must be unique within table, so IPv4 and IPv6 address set should be different
egressAllowAsName := fmt.Sprintf("%s.%s.%d", egressAllowAsNamePrefix, protocol, idx)
Expand Down Expand Up @@ -366,7 +420,7 @@ func (c *Controller) handleUpdateNp(key string) error {
}

if len(allows) != 0 || len(excepts) != 0 {
if err := c.ovnClient.CreateEgressACL(fmt.Sprintf("%s/%s", np.Namespace, np.Name), pgName, egressAllowAsName, egressExceptAsName, protocol, npr.Ports); err != nil {
if err := c.ovnClient.CreateEgressACL(fmt.Sprintf("%s/%s", np.Namespace, np.Name), pgName, egressAllowAsName, egressExceptAsName, protocol, npr.Ports, svcAsName); err != nil {
klog.Errorf("failed to create egress acls for np %s, %v", key, err)
return err
}
Expand All @@ -385,7 +439,7 @@ func (c *Controller) handleUpdateNp(key string) error {
return err
}
egressPorts := []netv1.NetworkPolicyPort{}
if err := c.ovnClient.CreateEgressACL(fmt.Sprintf("%s/%s", np.Namespace, np.Name), pgName, egressAllowAsName, egressExceptAsName, protocol, egressPorts); err != nil {
if err := c.ovnClient.CreateEgressACL(fmt.Sprintf("%s/%s", np.Namespace, np.Name), pgName, egressAllowAsName, egressExceptAsName, protocol, egressPorts, svcAsName); err != nil {
klog.Errorf("failed to create egress acls for np %s, %v", key, err)
return err
}
Expand Down Expand Up @@ -416,24 +470,8 @@ func (c *Controller) handleUpdateNp(key string) error {
}
}
}
} else {
if err := c.ovnClient.DeleteACL(pgName, "from-lport"); err != nil {
klog.Errorf("failed to delete np %s egress acls, %v", key, err)
return err
}

asNames, err := c.ovnClient.ListAddressSet(np.Namespace, np.Name, "egress")
if err != nil {
klog.Errorf("failed to list address_set, %v", err)
return err
}
for _, asName := range asNames {
if err := c.ovnClient.DeleteAddressSet(asName); err != nil {
klog.Errorf("failed to delete np %s address set, %v", key, err)
return err
}
}
}

if err := c.ovnClient.CreateGatewayACL(pgName, subnet.Spec.Gateway, subnet.Spec.CIDRBlock); err != nil {
klog.Errorf("failed to create gateway acl, %v", err)
return err
Expand All @@ -453,6 +491,18 @@ func (c *Controller) handleDeleteNp(key string) error {
klog.Errorf("failed to delete np %s port group, %v", key, err)
}

svcAsNames, err := c.ovnClient.ListAddressSet(namespace, name, "service")
if err != nil {
klog.Errorf("failed to list svc address_set, %v", err)
return err
}
for _, asName := range svcAsNames {
if err := c.ovnClient.DeleteAddressSet(asName); err != nil {
klog.Errorf("failed to delete np %s address set, %v", key, err)
return err
}
}

ingressAsNames, err := c.ovnClient.ListAddressSet(namespace, name, "ingress")
if err != nil {
klog.Errorf("failed to list address_set, %v", err)
Expand Down Expand Up @@ -482,7 +532,7 @@ func (c *Controller) handleDeleteNp(key string) error {
func (c *Controller) fetchSelectedPorts(namespace string, selector *metav1.LabelSelector) ([]string, error) {
sel, err := metav1.LabelSelectorAsSelector(selector)
if err != nil {
return nil, fmt.Errorf("error createing label selector, %v", err)
return nil, fmt.Errorf("error fetch label selector, %v", err)
}
pods, err := c.podsLister.Pods(namespace).List(sel)
if err != nil {
Expand All @@ -501,6 +551,45 @@ func (c *Controller) fetchSelectedPorts(namespace string, selector *metav1.Label
return ports, nil
}

func (c *Controller) fetchSelectedSvc(namespace string, selector *metav1.LabelSelector) ([]string, []string, error) {
sel, err := metav1.LabelSelectorAsSelector(selector)
if err != nil {
return nil, nil, fmt.Errorf("error creating label selector, %v", err)
}
pods, err := c.podsLister.Pods(namespace).List(sel)
if err != nil {
return nil, nil, fmt.Errorf("failed to list pods, %v", err)
}

svcIpv4s := make([]string, 0)
svcIpv6s := make([]string, 0)
svcs, err := c.servicesLister.Services(namespace).List(labels.Everything())
if err != nil {
klog.Errorf("failed to list svc, %v", err)
return nil, nil, err
}

for _, pod := range pods {
if !isPodAlive(pod) {
continue
}
if !pod.Spec.HostNetwork && pod.Annotations[util.AllocatedAnnotation] == "true" {
svcIpv4, err := svcMatchPods(svcs, pod, kubeovnv1.ProtocolIPv4)
if err != nil {
return nil, nil, err
}
svcIpv4s = append(svcIpv4s, svcIpv4...)

svcIpv6, err := svcMatchPods(svcs, pod, kubeovnv1.ProtocolIPv6)
if err != nil {
return nil, nil, err
}
svcIpv6s = append(svcIpv6s, svcIpv6...)
}
}
return svcIpv4s, svcIpv6s, nil
}

func hasIngressRule(np *netv1.NetworkPolicy) bool {
for _, pt := range np.Spec.PolicyTypes {
if strings.Contains(string(pt), string(netv1.PolicyTypeIngress)) {
Expand Down Expand Up @@ -530,7 +619,7 @@ func (c *Controller) fetchPolicySelectedAddresses(namespace, protocol string, np
} else {
sel, err := metav1.LabelSelectorAsSelector(npp.NamespaceSelector)
if err != nil {
return nil, nil, fmt.Errorf("error createing label selector, %v", err)
return nil, nil, fmt.Errorf("error fetch label selector, %v", err)
}
nss, err := c.namespacesLister.List(sel)
if err != nil {
Expand All @@ -554,17 +643,79 @@ func (c *Controller) fetchPolicySelectedAddresses(namespace, protocol string, np
if err != nil {
return nil, nil, fmt.Errorf("failed to list pod, %v", err)
}
svcs, err := c.servicesLister.Services(ns).List(labels.Everything())
if err != nil {
klog.Errorf("failed to list svc, %v", err)
return nil, nil, fmt.Errorf("failed to list svc, %v", err)
}

for _, pod := range pods {
for _, podIP := range pod.Status.PodIPs {
if podIP.IP != "" && util.CheckProtocol(podIP.IP) == protocol {
selectedAddresses = append(selectedAddresses, podIP.IP)
if len(svcs) == 0 {
continue
}
klog.Infof("svc is %v", svcs)
svcIPs, err := svcMatchPods(svcs, pod, protocol)
if err != nil {
return nil, nil, err
}
klog.Infof("svcIPs is %v", svcIPs)
selectedAddresses = append(selectedAddresses, svcIPs...)
}
}
}
}

return selectedAddresses, nil, nil
}

func svcMatchPods(svcs []*corev1.Service, pod *corev1.Pod, protocol string) ([]string, error) {
matchSvcs := []string{}
// find svc ip by pod's info
for _, svc := range svcs {
isMatch, err := isSvcMatchPod(svc, pod)
if err != nil {
return nil, err
}
if isMatch {
clusterIPs := svc.Spec.ClusterIPs
if len(clusterIPs) == 0 && svc.Spec.ClusterIP != "" && svc.Spec.ClusterIP != corev1.ClusterIPNone {
clusterIPs = []string{svc.Spec.ClusterIP}
}
protocolClusterIPs := getProtocolSvcIp(clusterIPs, protocol)
if len(protocolClusterIPs) != 0 {
matchSvcs = append(matchSvcs, protocolClusterIPs...)
}
}
}
return matchSvcs, nil
}
func getProtocolSvcIp(clusterIPs []string, protocol string) []string {
protocolClusterIPs := []string{}
for _, clusterIP := range clusterIPs {
if clusterIP != "" && clusterIP != corev1.ClusterIPNone && util.CheckProtocol(clusterIP) == protocol {
protocolClusterIPs = append(protocolClusterIPs, clusterIP)
}
}
return protocolClusterIPs
}
func isSvcMatchPod(svc *corev1.Service, pod *corev1.Pod) (bool, error) {
ss := metav1.SetAsLabelSelector(svc.Spec.Selector)
sel, err := metav1.LabelSelectorAsSelector(ss)
if err != nil {
return false, fmt.Errorf("error fetch label selector, %v", err)
}
if pod.Labels == nil {
return false, nil
}
if sel.Matches(labels.Set(pod.Labels)) {
return true, nil
}
return false, nil
}

func (c *Controller) podMatchNetworkPolicies(pod *corev1.Pod) []string {
podNs, _ := c.namespacesLister.Get(pod.Namespace)
nps, _ := c.npsLister.NetworkPolicies(corev1.NamespaceAll).List(labels.Everything())
Expand All @@ -577,6 +728,31 @@ func (c *Controller) podMatchNetworkPolicies(pod *corev1.Pod) []string {
return match
}

func (c *Controller) svcMatchNetworkPolicies(svc *corev1.Service) ([]string, error) {
// find all match pod
pods, err := c.podsLister.Pods(svc.Namespace).List(labels.Everything())
if err != nil {
return nil, fmt.Errorf("failed to list pods, %v", err)
}

// find all match netpol
nps, err := c.npsLister.NetworkPolicies(corev1.NamespaceAll).List(labels.Everything())
if err != nil {
return nil, fmt.Errorf("failed to list netpols, %v", err)
}
match := []string{}
for _, pod := range pods {
podNs, _ := c.namespacesLister.Get(pod.Namespace)
for _, np := range nps {
if isPodMatchNetworkPolicy(pod, *podNs, np, np.Namespace) {
match = append(match, fmt.Sprintf("%s/%s", np.Namespace, np.Name))
}
}
}
klog.Infof("match svc is %v", match)
return match, nil
}

func isPodMatchNetworkPolicy(pod *corev1.Pod, podNs corev1.Namespace, policy *netv1.NetworkPolicy, policyNs string) bool {
sel, _ := metav1.LabelSelectorAsSelector(&policy.Spec.PodSelector)
if pod.Labels == nil {
Expand Down
Loading

0 comments on commit 2e9d407

Please sign in to comment.