Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

When netpol is added to a workload, the workload's POD can be accessed using service #1156

Merged
merged 1 commit into from
Dec 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ func NewController(config *Configuration) *Controller {
})

serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.enqueueAddService,
DeleteFunc: controller.enqueueDeleteService,
UpdateFunc: controller.enqueueUpdateService,
})
Expand Down
252 changes: 214 additions & 38 deletions pkg/controller/network_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,58 @@ func (c *Controller) handleUpdateNp(key string) error {
return err
}

// set svc addreess_set
svcAsNameIPv4 := strings.Replace(fmt.Sprintf("%s.%s.service.%s", np.Name, np.Namespace, kubeovnv1.ProtocolIPv4), "-", ".", -1)
svcAsNameIPv6 := strings.Replace(fmt.Sprintf("%s.%s.service.%s", np.Name, np.Namespace, kubeovnv1.ProtocolIPv6), "-", ".", -1)
svcIpv4s, svcIpv6s, err := c.fetchSelectedSvc(np.Namespace, &np.Spec.PodSelector)
if err != nil {
klog.Errorf("failed to fetchSelectedSvc svcIPs result %v", err)
return err
}
for _, cidrBlock := range strings.Split(subnet.Spec.CIDRBlock, ",") {
protocol := util.CheckProtocol(cidrBlock)
svcAsName := svcAsNameIPv4
svcIPs := svcIpv4s
if protocol == kubeovnv1.ProtocolIPv6 {
svcAsName = svcAsNameIPv6
svcIPs = svcIpv6s
}
if err := c.ovnClient.CreateAddressSet(svcAsName, np.Namespace, np.Name, "service"); err != nil {
klog.Errorf("failed to create address_set %s, %v", svcAsNameIPv4, err)
return err
}
if err := c.ovnClient.SetAddressesToAddressSet(svcIPs, svcAsName); err != nil {
klog.Errorf("failed to set netpol svc, %v", err)
return err
}
}

// before update or add ingress info,we should first delete acl and address_set
if err := c.ovnClient.DeleteACL(pgName, "to-lport"); err != nil {
klog.Errorf("failed to delete np %s ingress acls, %v", key, err)
return err
}

ingressAsNames, err := c.ovnClient.ListAddressSet(np.Namespace, np.Name, "ingress")
if err != nil {
klog.Errorf("failed to list ingress address_set, %v", err)
return err
}
for _, ingressAsName := range ingressAsNames {
if err := c.ovnClient.DeleteAddressSet(ingressAsName); err != nil {
klog.Errorf("failed to delete np %s address set, %v", key, err)
return err
}
}

if hasIngressRule(np) {
for _, cidrBlock := range strings.Split(subnet.Spec.CIDRBlock, ",") {
protocol := util.CheckProtocol(cidrBlock)
svcAsName := svcAsNameIPv4
if protocol == kubeovnv1.ProtocolIPv6 {
svcAsName = svcAsNameIPv6
}

for idx, npr := range np.Spec.Ingress {
// A single address set must contain addresses of the same type and the name must be unique within table, so IPv4 and IPv6 address set should be different
ingressAllowAsName := fmt.Sprintf("%s.%s.%d", ingressAllowAsNamePrefix, protocol, idx)
Expand Down Expand Up @@ -250,7 +299,7 @@ func (c *Controller) handleUpdateNp(key string) error {
}

if len(allows) != 0 || len(excepts) != 0 {
if err := c.ovnClient.CreateIngressACL(fmt.Sprintf("%s/%s", np.Namespace, np.Name), pgName, ingressAllowAsName, ingressExceptAsName, protocol, npr.Ports); err != nil {
if err := c.ovnClient.CreateIngressACL(fmt.Sprintf("%s/%s", np.Namespace, np.Name), pgName, ingressAllowAsName, ingressExceptAsName, protocol, npr.Ports, svcAsName); err != nil {
klog.Errorf("failed to create ingress acls for np %s, %v", key, err)
return err
}
Expand All @@ -269,7 +318,7 @@ func (c *Controller) handleUpdateNp(key string) error {
return err
}
ingressPorts := []netv1.NetworkPolicyPort{}
if err := c.ovnClient.CreateIngressACL(fmt.Sprintf("%s/%s", np.Namespace, np.Name), pgName, ingressAllowAsName, ingressExceptAsName, protocol, ingressPorts); err != nil {
if err := c.ovnClient.CreateIngressACL(fmt.Sprintf("%s/%s", np.Namespace, np.Name), pgName, ingressAllowAsName, ingressExceptAsName, protocol, ingressPorts, svcAsName); err != nil {
klog.Errorf("failed to create ingress acls for np %s, %v", key, err)
return err
}
Expand Down Expand Up @@ -299,28 +348,33 @@ func (c *Controller) handleUpdateNp(key string) error {
}
}
}
} else {
if err := c.ovnClient.DeleteACL(pgName, "to-lport"); err != nil {
klog.Errorf("failed to delete np %s ingress acls, %v", key, err)
return err
}
}

asNames, err := c.ovnClient.ListAddressSet(np.Namespace, np.Name, "ingress")
if err != nil {
klog.Errorf("failed to list address_set, %v", err)
// before update or add egress info,we should first delete acl and address_set
if err := c.ovnClient.DeleteACL(pgName, "from-lport"); err != nil {
klog.Errorf("failed to delete np %s egress acls, %v", key, err)
return err
}

egressAsNames, err := c.ovnClient.ListAddressSet(np.Namespace, np.Name, "egress")
if err != nil {
klog.Errorf("failed to list egress address_set, %v", err)
return err
}
for _, egressAsName := range egressAsNames {
if err := c.ovnClient.DeleteAddressSet(egressAsName); err != nil {
klog.Errorf("failed to delete np %s address set, %v", key, err)
return err
}
for _, asName := range asNames {
if err := c.ovnClient.DeleteAddressSet(asName); err != nil {
klog.Errorf("failed to delete np %s address set, %v", key, err)
return err
}
}
}

if hasEgressRule(np) {
for _, cidrBlock := range strings.Split(subnet.Spec.CIDRBlock, ",") {
protocol := util.CheckProtocol(cidrBlock)
svcAsName := svcAsNameIPv4
if protocol == kubeovnv1.ProtocolIPv6 {
svcAsName = svcAsNameIPv6
}

for idx, npr := range np.Spec.Egress {
// A single address set must contain addresses of the same type and the name must be unique within table, so IPv4 and IPv6 address set should be different
egressAllowAsName := fmt.Sprintf("%s.%s.%d", egressAllowAsNamePrefix, protocol, idx)
Expand Down Expand Up @@ -366,7 +420,7 @@ func (c *Controller) handleUpdateNp(key string) error {
}

if len(allows) != 0 || len(excepts) != 0 {
if err := c.ovnClient.CreateEgressACL(fmt.Sprintf("%s/%s", np.Namespace, np.Name), pgName, egressAllowAsName, egressExceptAsName, protocol, npr.Ports); err != nil {
if err := c.ovnClient.CreateEgressACL(fmt.Sprintf("%s/%s", np.Namespace, np.Name), pgName, egressAllowAsName, egressExceptAsName, protocol, npr.Ports, svcAsName); err != nil {
klog.Errorf("failed to create egress acls for np %s, %v", key, err)
return err
}
Expand All @@ -385,7 +439,7 @@ func (c *Controller) handleUpdateNp(key string) error {
return err
}
egressPorts := []netv1.NetworkPolicyPort{}
if err := c.ovnClient.CreateEgressACL(fmt.Sprintf("%s/%s", np.Namespace, np.Name), pgName, egressAllowAsName, egressExceptAsName, protocol, egressPorts); err != nil {
if err := c.ovnClient.CreateEgressACL(fmt.Sprintf("%s/%s", np.Namespace, np.Name), pgName, egressAllowAsName, egressExceptAsName, protocol, egressPorts, svcAsName); err != nil {
klog.Errorf("failed to create egress acls for np %s, %v", key, err)
return err
}
Expand Down Expand Up @@ -416,24 +470,8 @@ func (c *Controller) handleUpdateNp(key string) error {
}
}
}
} else {
if err := c.ovnClient.DeleteACL(pgName, "from-lport"); err != nil {
klog.Errorf("failed to delete np %s egress acls, %v", key, err)
return err
}

asNames, err := c.ovnClient.ListAddressSet(np.Namespace, np.Name, "egress")
if err != nil {
klog.Errorf("failed to list address_set, %v", err)
return err
}
for _, asName := range asNames {
if err := c.ovnClient.DeleteAddressSet(asName); err != nil {
klog.Errorf("failed to delete np %s address set, %v", key, err)
return err
}
}
}

if err := c.ovnClient.CreateGatewayACL(pgName, subnet.Spec.Gateway, subnet.Spec.CIDRBlock); err != nil {
klog.Errorf("failed to create gateway acl, %v", err)
return err
Expand All @@ -453,6 +491,18 @@ func (c *Controller) handleDeleteNp(key string) error {
klog.Errorf("failed to delete np %s port group, %v", key, err)
}

svcAsNames, err := c.ovnClient.ListAddressSet(namespace, name, "service")
if err != nil {
klog.Errorf("failed to list svc address_set, %v", err)
return err
}
for _, asName := range svcAsNames {
if err := c.ovnClient.DeleteAddressSet(asName); err != nil {
klog.Errorf("failed to delete np %s address set, %v", key, err)
return err
}
}

ingressAsNames, err := c.ovnClient.ListAddressSet(namespace, name, "ingress")
if err != nil {
klog.Errorf("failed to list address_set, %v", err)
Expand Down Expand Up @@ -482,7 +532,7 @@ func (c *Controller) handleDeleteNp(key string) error {
func (c *Controller) fetchSelectedPorts(namespace string, selector *metav1.LabelSelector) ([]string, error) {
sel, err := metav1.LabelSelectorAsSelector(selector)
if err != nil {
return nil, fmt.Errorf("error createing label selector, %v", err)
return nil, fmt.Errorf("error fetch label selector, %v", err)
}
pods, err := c.podsLister.Pods(namespace).List(sel)
if err != nil {
Expand All @@ -501,6 +551,45 @@ func (c *Controller) fetchSelectedPorts(namespace string, selector *metav1.Label
return ports, nil
}

func (c *Controller) fetchSelectedSvc(namespace string, selector *metav1.LabelSelector) ([]string, []string, error) {
sel, err := metav1.LabelSelectorAsSelector(selector)
if err != nil {
return nil, nil, fmt.Errorf("error creating label selector, %v", err)
}
pods, err := c.podsLister.Pods(namespace).List(sel)
if err != nil {
return nil, nil, fmt.Errorf("failed to list pods, %v", err)
}

svcIpv4s := make([]string, 0)
svcIpv6s := make([]string, 0)
svcs, err := c.servicesLister.Services(namespace).List(labels.Everything())
if err != nil {
klog.Errorf("failed to list svc, %v", err)
return nil, nil, err
}

for _, pod := range pods {
if !isPodAlive(pod) {
continue
}
if !pod.Spec.HostNetwork && pod.Annotations[util.AllocatedAnnotation] == "true" {
svcIpv4, err := svcMatchPods(svcs, pod, kubeovnv1.ProtocolIPv4)
if err != nil {
return nil, nil, err
}
svcIpv4s = append(svcIpv4s, svcIpv4...)

svcIpv6, err := svcMatchPods(svcs, pod, kubeovnv1.ProtocolIPv6)
if err != nil {
return nil, nil, err
}
svcIpv6s = append(svcIpv6s, svcIpv6...)
}
}
return svcIpv4s, svcIpv6s, nil
}

func hasIngressRule(np *netv1.NetworkPolicy) bool {
for _, pt := range np.Spec.PolicyTypes {
if strings.Contains(string(pt), string(netv1.PolicyTypeIngress)) {
Expand Down Expand Up @@ -530,7 +619,7 @@ func (c *Controller) fetchPolicySelectedAddresses(namespace, protocol string, np
} else {
sel, err := metav1.LabelSelectorAsSelector(npp.NamespaceSelector)
if err != nil {
return nil, nil, fmt.Errorf("error createing label selector, %v", err)
return nil, nil, fmt.Errorf("error fetch label selector, %v", err)
}
nss, err := c.namespacesLister.List(sel)
if err != nil {
Expand All @@ -554,17 +643,79 @@ func (c *Controller) fetchPolicySelectedAddresses(namespace, protocol string, np
if err != nil {
return nil, nil, fmt.Errorf("failed to list pod, %v", err)
}
svcs, err := c.servicesLister.Services(ns).List(labels.Everything())
if err != nil {
klog.Errorf("failed to list svc, %v", err)
return nil, nil, fmt.Errorf("failed to list svc, %v", err)
}

for _, pod := range pods {
for _, podIP := range pod.Status.PodIPs {
if podIP.IP != "" && util.CheckProtocol(podIP.IP) == protocol {
selectedAddresses = append(selectedAddresses, podIP.IP)
if len(svcs) == 0 {
continue
}
klog.Infof("svc is %v", svcs)
svcIPs, err := svcMatchPods(svcs, pod, protocol)
if err != nil {
return nil, nil, err
}
klog.Infof("svcIPs is %v", svcIPs)
selectedAddresses = append(selectedAddresses, svcIPs...)
}
}
}
}

return selectedAddresses, nil, nil
}

func svcMatchPods(svcs []*corev1.Service, pod *corev1.Pod, protocol string) ([]string, error) {
matchSvcs := []string{}
// find svc ip by pod's info
for _, svc := range svcs {
isMatch, err := isSvcMatchPod(svc, pod)
if err != nil {
return nil, err
}
if isMatch {
clusterIPs := svc.Spec.ClusterIPs
if len(clusterIPs) == 0 && svc.Spec.ClusterIP != "" && svc.Spec.ClusterIP != corev1.ClusterIPNone {
clusterIPs = []string{svc.Spec.ClusterIP}
}
protocolClusterIPs := getProtocolSvcIp(clusterIPs, protocol)
if len(protocolClusterIPs) != 0 {
matchSvcs = append(matchSvcs, protocolClusterIPs...)
}
}
}
return matchSvcs, nil
}
func getProtocolSvcIp(clusterIPs []string, protocol string) []string {
protocolClusterIPs := []string{}
for _, clusterIP := range clusterIPs {
if clusterIP != "" && clusterIP != corev1.ClusterIPNone && util.CheckProtocol(clusterIP) == protocol {
protocolClusterIPs = append(protocolClusterIPs, clusterIP)
}
}
return protocolClusterIPs
}
func isSvcMatchPod(svc *corev1.Service, pod *corev1.Pod) (bool, error) {
ss := metav1.SetAsLabelSelector(svc.Spec.Selector)
sel, err := metav1.LabelSelectorAsSelector(ss)
if err != nil {
return false, fmt.Errorf("error fetch label selector, %v", err)
}
if pod.Labels == nil {
return false, nil
}
if sel.Matches(labels.Set(pod.Labels)) {
return true, nil
}
return false, nil
}

func (c *Controller) podMatchNetworkPolicies(pod *corev1.Pod) []string {
podNs, _ := c.namespacesLister.Get(pod.Namespace)
nps, _ := c.npsLister.NetworkPolicies(corev1.NamespaceAll).List(labels.Everything())
Expand All @@ -577,6 +728,31 @@ func (c *Controller) podMatchNetworkPolicies(pod *corev1.Pod) []string {
return match
}

func (c *Controller) svcMatchNetworkPolicies(svc *corev1.Service) ([]string, error) {
// find all match pod
pods, err := c.podsLister.Pods(svc.Namespace).List(labels.Everything())
if err != nil {
return nil, fmt.Errorf("failed to list pods, %v", err)
}

// find all match netpol
nps, err := c.npsLister.NetworkPolicies(corev1.NamespaceAll).List(labels.Everything())
if err != nil {
return nil, fmt.Errorf("failed to list netpols, %v", err)
}
match := []string{}
for _, pod := range pods {
podNs, _ := c.namespacesLister.Get(pod.Namespace)
for _, np := range nps {
if isPodMatchNetworkPolicy(pod, *podNs, np, np.Namespace) {
match = append(match, fmt.Sprintf("%s/%s", np.Namespace, np.Name))
}
}
}
klog.Infof("match svc is %v", match)
return match, nil
}

func isPodMatchNetworkPolicy(pod *corev1.Pod, podNs corev1.Namespace, policy *netv1.NetworkPolicy, policyNs string) bool {
sel, _ := metav1.LabelSelectorAsSelector(&policy.Spec.PodSelector)
if pod.Labels == nil {
Expand Down
Loading