diff --git a/api/v1beta1/slicegateway_types.go b/api/v1beta1/slicegateway_types.go index b501687a4..c0ce78cf3 100644 --- a/api/v1beta1/slicegateway_types.go +++ b/api/v1beta1/slicegateway_types.go @@ -132,9 +132,14 @@ type GwPodInfo struct { PeerPodName string `json:"peerPodName,omitempty"` PodIP string `json:"podIP,omitempty"` LocalNsmIP string `json:"localNsmIP,omitempty"` - TunnelStatus TunnelStatus `json:"tunnelStatus,omitempty"` - RouteRemoved int32 `json:"routeRemoved,omitempty"` + // TunnelStatus is the status of the tunnel between this gw pod and its peer + TunnelStatus TunnelStatus `json:"tunnelStatus,omitempty"` + RouteRemoved int32 `json:"routeRemoved,omitempty"` + // RemotePort is the port number this gw pod is connected to on the remote cluster. + // Applicable only for gw clients. Would be set to 0 for gw servers. + RemotePort int32 `json:"remotePort,omitempty"` } + type TunnelStatus struct { IntfName string `json:"IntfName,omitempty"` LocalIP string `json:"LocalIP,omitempty"` @@ -143,7 +148,10 @@ type TunnelStatus struct { TxRate uint64 `json:"TxRate,omitempty"` RxRate uint64 `json:"RxRate,omitempty"` PacketLoss uint64 `json:"PacketLoss,omitempty"` - Status int32 `json:"Status,omitempty"` + // Status is the status of the tunnel. 0: DOWN, 1: UP + Status int32 `json:"Status,omitempty"` + // TunnelState is the state of the tunnel in string format: UP, DOWN, UNKNOWN + TunnelState string `json:"TunnelState,omitempty"` } func init() { diff --git a/config/crd/bases/networking.kubeslice.io_slicegateways.yaml b/config/crd/bases/networking.kubeslice.io_slicegateways.yaml index e05e95083..ac28fcc93 100644 --- a/config/crd/bases/networking.kubeslice.io_slicegateways.yaml +++ b/config/crd/bases/networking.kubeslice.io_slicegateways.yaml @@ -169,10 +169,18 @@ spec: type: string podName: type: string + remotePort: + description: |- + RemotePort is the port number this gw pod is connected to on the remote cluster. + Applicable only for gw clients. Would be set to 0 for gw servers. + format: int32 + type: integer routeRemoved: format: int32 type: integer tunnelStatus: + description: TunnelStatus is the status of the tunnel between + this gw pod and its peer properties: IntfName: type: string @@ -190,8 +198,14 @@ spec: format: int64 type: integer Status: + description: 'Status is the status of the tunnel. 0: DOWN, + 1: UP' format: int32 type: integer + TunnelState: + description: 'TunnelState is the state of the tunnel in + string format: UP, DOWN, UNKNOWN' + type: string TxRate: format: int64 type: integer diff --git a/controllers/slicegateway/slicegateway.go b/controllers/slicegateway/slicegateway.go index b4e779b63..6ce391ef7 100644 --- a/controllers/slicegateway/slicegateway.go +++ b/controllers/slicegateway/slicegateway.go @@ -78,7 +78,7 @@ func labelsForSliceGwDeployment(name, slice, depName string) map[string]string { } } -func labelsForSliceGwService(name, svcName, depName string) map[string]string { +func labelsForSliceGwService(name, depName string) map[string]string { return map[string]string{ controllers.SliceGatewaySelectorLabelKey: name, "kubeslice.io/slice-gw-dep": depName, @@ -360,7 +360,7 @@ func (r *SliceGwReconciler) serviceForGateway(g *kubeslicev1beta1.SliceGateway, }, Spec: corev1.ServiceSpec{ Type: "NodePort", - Selector: labelsForSliceGwService(g.Name, svcName, depName), + Selector: labelsForSliceGwService(g.Name, depName), Ports: []corev1.ServicePort{{ Port: 11194, Protocol: proto, @@ -661,8 +661,25 @@ func (r *SliceGwReconciler) ReconcileGwPodStatus(ctx context.Context, slicegatew return ctrl.Result{}, err, true } gwPod.LocalNsmIP = status.NsmStatus.LocalIP - gwPod.TunnelStatus = kubeslicev1beta1.TunnelStatus(status.TunnelStatus) - // this grpc call fails untill the openvpn tunnel connection is not established, so its better to do not reconcile in case of errors, hence the reconciler does not proceedes further + gwPod.TunnelStatus = kubeslicev1beta1.TunnelStatus{ + IntfName: status.TunnelStatus.IntfName, + LocalIP: status.TunnelStatus.LocalIP, + RemoteIP: status.TunnelStatus.RemoteIP, + Latency: status.TunnelStatus.Latency, + TxRate: status.TunnelStatus.TxRate, + RxRate: status.TunnelStatus.RxRate, + PacketLoss: status.TunnelStatus.PacketLoss, + Status: int32(status.TunnelStatus.Status), + TunnelState: status.TunnelStatus.TunnelState, + } + + if isClient(slicegateway) { + // get the remote port number this gw pod is connected to on the remote cluster + _, remotePortInUse := getClientGwRemotePortInUse(ctx, r.Client, slicegateway, GetDepNameFromPodName(slicegateway.Status.Config.SliceGatewayID, gwPod.PodName)) + gwPod.RemotePort = int32(remotePortInUse) + } + + // this grpc call fails untill the openvpn tunnel connection is not established, so its better to do not reconcile in case of errors, hence the reconciler does not proceeds further gwPod.PeerPodName, err = r.getRemoteGwPodName(ctx, slicegateway.Status.Config.SliceGatewayRemoteVpnIP, gwPod.PodIP) if err != nil { log.Error(err, "Error getting peer pod name", "PodName", gwPod.PodName, "PodIP", gwPod.PodIP) @@ -671,10 +688,11 @@ func (r *SliceGwReconciler) ReconcileGwPodStatus(ctx context.Context, slicegatew if isGatewayStatusChanged(slicegateway, gwPod) { toUpdate = true } - if len(slicegateway.Status.GatewayPodStatus) != len(gwPodsInfo) { - toUpdate = true - } } + if len(slicegateway.Status.GatewayPodStatus) != len(gwPodsInfo) { + toUpdate = true + } + if toUpdate { log.Info("gwPodsInfo", "gwPodsInfo", gwPodsInfo) slicegateway.Status.GatewayPodStatus = gwPodsInfo @@ -725,6 +743,10 @@ func (r *SliceGwReconciler) SendConnectionContextAndQosToGwPod(ctx context.Conte err = retry.RetryOnConflict(retry.DefaultRetry, func() error { err := r.Get(ctx, req.NamespacedName, slicegateway) + if err != nil { + log.Error(err, "Failed to get SliceGateway") + return err + } slicegateway.Status.ConnectionContextUpdatedOn = time.Now().Unix() err = r.Status().Update(ctx, slicegateway) if err != nil { @@ -1094,6 +1116,15 @@ func (r *SliceGwReconciler) gwPodPlacementIsSkewed(ctx context.Context, sliceGw func (r *SliceGwReconciler) ReconcileGwPodPlacement(ctx context.Context, sliceGw *kubeslicev1beta1.SliceGateway) error { log := r.Log + + // if the env variable is set, do not perform any gw pod rebalancing. This is useful in clusters where + // the k8s scheduler does not honor the pod anti-affinity rule and places the gw pods on the same node. Such scenarios + // could occur if the node with the kubeslice gateway label is cordoned off or if the node has insufficient resources or + // if the node has some taints that the gw pods cannot tolerate. + if os.Getenv("DISABLE_GW_POD_REBALANCING") == "true" { + return nil + } + // The gw pod rebalancing is always performed on a deployment. We expect the gw pods belonging to a slicegateway // object between any two clusters to placed on different nodes marked as kubeslice gateway nodes. If they are // initially placed on the same node due to lack of kubeslice-gateway nodes, the rebalancing algorithim is expected @@ -1170,7 +1201,7 @@ func (r *SliceGwReconciler) handleSliceGwSvcCreation(ctx context.Context, sliceG return ctrl.Result{Requeue: true}, nil, true } -func (r *SliceGwReconciler) handleSliceGwSvcDeletion(ctx context.Context, sliceGw *kubeslicev1beta1.SliceGateway, svcName, depName string) error { +func (r *SliceGwReconciler) handleSliceGwSvcDeletion(ctx context.Context, sliceGw *kubeslicev1beta1.SliceGateway, svcName string) error { log := logger.FromContext(ctx).WithName("slicegw") serviceFound := corev1.Service{} err := r.Get(ctx, types.NamespacedName{Namespace: sliceGw.Namespace, Name: svcName}, &serviceFound) @@ -1385,7 +1416,7 @@ func (r *SliceGwReconciler) ReconcileGatewayDeployments(ctx context.Context, sli } // Update the port map gwClientToRemotePortMap.Store(deployment.Name, portNumToUpdate) - err = r.updateGatewayDeploymentNodePort(ctx, r.Client, sliceGw, &deployment, portNumToUpdate) + err = r.updateGatewayDeploymentNodePort(ctx, sliceGw, &deployment, portNumToUpdate) if err != nil { return ctrl.Result{}, err, true } @@ -1399,7 +1430,7 @@ func (r *SliceGwReconciler) ReconcileGatewayDeployments(ctx context.Context, sli if foundInMap { if portInMap != nodePortInUse { // Update the deployment since the port numbers do not match - err := r.updateGatewayDeploymentNodePort(ctx, r.Client, sliceGw, &deployment, portInMap.(int)) + err := r.updateGatewayDeploymentNodePort(ctx, sliceGw, &deployment, portInMap.(int)) if err != nil { return ctrl.Result{}, err, true } @@ -1425,7 +1456,7 @@ func (r *SliceGwReconciler) ReconcileGatewayDeployments(ctx context.Context, sli if deploymentsToDelete != nil { for _, depToDelete := range deploymentsToDelete.Items { // Delete the gw svc associated with the deployment - err := r.handleSliceGwSvcDeletion(ctx, sliceGw, getGwSvcNameFromDepName(depToDelete.Name), depToDelete.Name) + err := r.handleSliceGwSvcDeletion(ctx, sliceGw, getGwSvcNameFromDepName(depToDelete.Name)) if err != nil { log.Error(err, "Failed to delete gw svc", "svcName", depToDelete.Name) return ctrl.Result{}, err, true @@ -1615,7 +1646,7 @@ func (r *SliceGwReconciler) createPodDisruptionBudgetForSliceGatewayPods(ctx con // updateGatewayDeploymentNodePort updates the gateway client deployments with the relevant updated ports // from the workersliceconfig -func (r *SliceGwReconciler) updateGatewayDeploymentNodePort(ctx context.Context, c client.Client, g *kubeslicev1beta1.SliceGateway, deployment *appsv1.Deployment, nodePort int) error { +func (r *SliceGwReconciler) updateGatewayDeploymentNodePort(ctx context.Context, g *kubeslicev1beta1.SliceGateway, deployment *appsv1.Deployment, nodePort int) error { containers := deployment.Spec.Template.Spec.Containers for contIndex, cont := range containers { if cont.Name == "kubeslice-sidecar" { diff --git a/controllers/slicegateway/utils.go b/controllers/slicegateway/utils.go index 1938411f1..7e32a9ecb 100644 --- a/controllers/slicegateway/utils.go +++ b/controllers/slicegateway/utils.go @@ -22,16 +22,17 @@ import ( "context" "errors" "fmt" + "os" + "strconv" + "strings" + "sync" + gwsidecarpb "github.com/kubeslice/gateway-sidecar/pkg/sidecar/sidecarpb" kubeslicev1beta1 "github.com/kubeslice/worker-operator/api/v1beta1" "github.com/kubeslice/worker-operator/controllers" ossEvents "github.com/kubeslice/worker-operator/events" "github.com/kubeslice/worker-operator/pkg/utils" webhook "github.com/kubeslice/worker-operator/pkg/webhook/pod" - "os" - "strconv" - "strings" - "sync" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" @@ -112,6 +113,10 @@ func getPodNames(slicegateway *kubeslicev1beta1.SliceGateway) []string { } func GetDepNameFromPodName(sliceGwID, podName string) string { + if sliceGwID == "" || podName == "" { + return "" + } + after, found := strings.CutPrefix(podName, sliceGwID) if !found { return "" @@ -204,13 +209,13 @@ func getPodPairToRebalance(podsOnNode []corev1.Pod, sliceGw *kubeslicev1beta1.Sl func GetPeerGwPodName(gwPodName string, sliceGw *kubeslicev1beta1.SliceGateway) (string, error) { podInfo := findGwPodInfo(sliceGw.Status.GatewayPodStatus, gwPodName) if podInfo == nil { - return "", errors.New("Gw pod not found") + return "", errors.New("gw pod not found") } if podInfo.TunnelStatus.Status != int32(gwsidecarpb.TunnelStatusType_GW_TUNNEL_STATE_UP) { - return "", errors.New("Gw tunnel is down") + return "", errors.New("gw tunnel is down") } if podInfo.PeerPodName == "" { - return "", errors.New("Gw peer pod info unavailable") + return "", errors.New("gw peer pod info unavailable") } return podInfo.PeerPodName, nil diff --git a/pkg/gwsidecar/gwsidecar.go b/pkg/gwsidecar/gwsidecar.go index 1be8704e3..a9ec5458a 100644 --- a/pkg/gwsidecar/gwsidecar.go +++ b/pkg/gwsidecar/gwsidecar.go @@ -33,16 +33,19 @@ type NsmStatus struct { IntfName string LocalIP string } + type TunnelStatus struct { - IntfName string - LocalIP string - RemoteIP string - Latency uint64 - TxRate uint64 - RxRate uint64 - PacketLoss uint64 - Status int32 + IntfName string + LocalIP string + RemoteIP string + Latency uint64 + TxRate uint64 + RxRate uint64 + PacketLoss uint64 + Status int32 + TunnelState string } + type GwStatus struct { NsmStatus TunnelStatus @@ -79,6 +82,17 @@ func (worker gwSidecarClient) GetSliceGwRemotePodName(ctx context.Context, gwRem return res.GatewayPodName, nil } +func getTunnelState(tunnelState sidecar.TunnelStatusType) string { + switch tunnelState { + case sidecar.TunnelStatusType_GW_TUNNEL_STATE_UP: + return "UP" + case sidecar.TunnelStatusType_GW_TUNNEL_STATE_DOWN: + return "DOWN" + default: + return "UNKNOWN" + } +} + // GetStatus retrieves sidecar status func (worker gwSidecarClient) GetStatus(ctx context.Context, serverAddr string) (*GwStatus, error) { conn, err := grpc.Dial(serverAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) @@ -103,14 +117,19 @@ func (worker gwSidecarClient) GetStatus(ctx context.Context, serverAddr string) } if res.TunnelStatus != nil { gwStatus.TunnelStatus = TunnelStatus{ - IntfName: res.TunnelStatus.NetInterface, - LocalIP: res.TunnelStatus.LocalIP, - RemoteIP: res.TunnelStatus.PeerIP, - PacketLoss: res.TunnelStatus.PacketLoss, - Status: int32(res.TunnelStatus.Status), + IntfName: res.TunnelStatus.NetInterface, + LocalIP: res.TunnelStatus.LocalIP, + RemoteIP: res.TunnelStatus.PeerIP, + Latency: res.TunnelStatus.Latency, + TxRate: res.TunnelStatus.TxRate, + RxRate: res.TunnelStatus.RxRate, + PacketLoss: res.TunnelStatus.PacketLoss, + Status: int32(res.TunnelStatus.Status), + TunnelState: getTunnelState(res.TunnelStatus.Status), } } else { gwStatus.TunnelStatus.Status = int32(sidecar.TunnelStatusType_GW_TUNNEL_STATE_DOWN) + gwStatus.TunnelStatus.TunnelState = getTunnelState(sidecar.TunnelStatusType_GW_TUNNEL_STATE_DOWN) } return gwStatus, err