Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Admiral DR support #248

Merged
merged 1 commit into from
Jul 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions admiral/cmd/admiral/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ func GetRootCmd(args []string) *cobra.Command {
rootCmd.PersistentFlags().StringVar(&params.LabelSet.GatewayApp, "gateway_app", "istio-ingressgateway",
"The the value of the `app` label to use to match and find the service that represents the ingress for cross cluster traffic (AUTO_PASSTHROUGH mode)")
rootCmd.PersistentFlags().BoolVar(&params.MetricsEnabled, "metrics", true, "Enable prometheus metrics collections")
rootCmd.PersistentFlags().StringVar(&params.AdmiralStateCheckerName,"admiral_state_checker_name","NoOPStateChecker","The value of the admiral_state_checker_name label to configure the DR Strategy for Admiral")
rootCmd.PersistentFlags().StringVar(&params.DRStateStoreConfigPath,"dr_state_store_config_path","","Location of config file which has details for data store. Ex:- Dynamo DB connection details")
rootCmd.PersistentFlags().StringVar(&params.ServiceEntryIPPrefix,"se_ip_prefix","240.0","IP prefix for the auto generated IPs for service entries. Only the first two octets: Eg- 240.0")


return rootCmd
}
Expand Down
35 changes: 32 additions & 3 deletions admiral/pkg/apis/admiral/routes/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"log"
"net/http"
"strconv"
"strings"

"github.com/gorilla/mux"
Expand All @@ -27,10 +28,38 @@ type IdentityServiceEntry struct {
ClusterNames []string `json:"Clusters,omitempty"`
}

func (opts *RouteOpts) ReturnSuccessGET(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
response := fmt.Sprintf("Heath check method called: %v, URI: %v, Method: %v\n", r.Host, r.RequestURI, r.Method)
/*
We expect the DNS health checker to include the query param checkifreadonly with value set to true.
The query param is used to check if the current Admiral instance is running in Active Mode or Passive Mode (also called read only mode).
If Running in passive mode, the health check returns 502 which forces DNS lookup to always return reference to Admiral in Active state.
*/

func (opts *RouteOpts) ReturnSuccessGET(w http.ResponseWriter, r *http.Request) {
allQueryParams:= r.URL.Query()
checkIfReadOnlyStringVal := allQueryParams.Get("checkifreadonly")
//Remove all spaces
checkIfReadOnlyStringVal = strings.ReplaceAll(checkIfReadOnlyStringVal," ","")
// checkIfReadOnlyStringVal will be empty in case ""checkifreadonly" query param is not sent in the request. checkIfReadOnlyBoolVal will be false
checkIfReadOnlyBoolVal, err := strconv.ParseBool(checkIfReadOnlyStringVal)
var response string

if len(checkIfReadOnlyStringVal) ==0 || nil==err {
if checkIfReadOnlyBoolVal{

if clusters.CurrentAdmiralState.ReadOnly{
//Force fail health check if Admiral is in Readonly mode
w.WriteHeader(503)
}else {
w.WriteHeader(200)
}
}else {
w.WriteHeader(200)
}
response = fmt.Sprintf("Heath check method called: %v, URI: %v, Method: %v\n", r.Host, r.RequestURI, r.Method)
}else {
w.WriteHeader(400)
response = fmt.Sprintf("Health check method called with bad query param value %v for checkifreadonly",checkIfReadOnlyStringVal)
}
_, writeErr := w.Write([]byte(response))
if writeErr != nil {
log.Printf("Error writing body: %v", writeErr)
Expand Down
53 changes: 53 additions & 0 deletions admiral/pkg/clusters/DRUtil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package clusters

import (
"context"
"github.com/istio-ecosystem/admiral/admiral/pkg/controller/common"
log "github.com/sirupsen/logrus"
)
const ReadWriteEnabled = false
const ReadOnlyEnabled = true;
const StateNotInitialized = false;
const StateInitialized =true;

type AdmiralState struct {
ReadOnly bool
IsStateInitialized bool
}
var CurrentAdmiralState AdmiralState

type AdmiralStateChecker interface {
runStateCheck(ctx context.Context)
shouldRunOnIndependentGoRoutine() bool
}
/*
Utility function to start Admiral DR checks.
DR checks can be run either on the main go routine or a new go routine
*/
func RunAdmiralStateCheck(ctx context.Context,asc AdmiralStateChecker){
log.Infof("Starting Disaster Recovery state checks")
if asc.shouldRunOnIndependentGoRoutine() {
log.Info("Starting Admiral State Checker on a new Go Routine")
go asc.runStateCheck(ctx)
}else {
log.Infof("Starting Admiral State Checker on existing Go Routine")
asc.runStateCheck(ctx)
}
}

/*
utility function to identify the Admiral DR implementation based on the program parameters
*/
func startAdmiralStateChecker (ctx context.Context,params common.AdmiralParams){
var admiralStateChecker AdmiralStateChecker
switch params.AdmiralStateCheckerName {
/*
Add entries for your custom Disaster Recovery state checkers below
case "keywordforsomecustomchecker":
admiralStateChecker = customChecker{}
*/
default:
admiralStateChecker = NoOPStateChecker{}
}
RunAdmiralStateCheck(ctx,admiralStateChecker)
}
23 changes: 23 additions & 0 deletions admiral/pkg/clusters/NoOpDR.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package clusters

import (
"context"
log "github.com/sirupsen/logrus"
)

/*
Default implementation of the interface defined for DR
*/

type NoOPStateChecker struct {}

func (NoOPStateChecker) shouldRunOnIndependentGoRoutine() bool{
return false;
}

func (NoOPStateChecker) runStateCheck(ctx context.Context){
log.Info("NoOP State Checker called. Marking Admiral state as Read/Write enabled")
CurrentAdmiralState.ReadOnly = ReadWriteEnabled
CurrentAdmiralState.IsStateInitialized = StateInitialized
}

36 changes: 36 additions & 0 deletions admiral/pkg/clusters/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,27 +148,43 @@ func getOutlierDetection(se *v1alpha32.ServiceEntry, locality string, gtpTraffic
}

func (se *ServiceEntryHandler) Added(obj *v1alpha3.ServiceEntry) {
if CurrentAdmiralState.ReadOnly{
log.Infof(LogFormat, "Add", "ServiceEntry", obj.Name, se.ClusterID, "Admiral is in read-only mode. Skipping resource from namespace="+obj.Namespace)
return
}
if IgnoreIstioResource(obj.Spec.ExportTo, obj.Annotations, obj.Namespace) {
log.Infof(LogFormat, "Add", "ServiceEntry", obj.Name, se.ClusterID, "Skipping resource from namespace="+obj.Namespace)
return
}
}

func (se *ServiceEntryHandler) Updated(obj *v1alpha3.ServiceEntry) {
if CurrentAdmiralState.ReadOnly{
log.Infof(LogFormat, "Update", "ServiceEntry", obj.Name, se.ClusterID, "Admiral is in read-only mode. Skipping resource from namespace="+obj.Namespace)
return
}
if IgnoreIstioResource(obj.Spec.ExportTo, obj.Annotations, obj.Namespace) {
log.Infof(LogFormat, "Update", "ServiceEntry", obj.Name, se.ClusterID, "Skipping resource from namespace="+obj.Namespace)
return
}
}

func (se *ServiceEntryHandler) Deleted(obj *v1alpha3.ServiceEntry) {
if CurrentAdmiralState.ReadOnly{
log.Infof(LogFormat, "Delete", "ServiceEntry", obj.Name, se.ClusterID, "Admiral is in read-only mode. Skipping resource from namespace="+obj.Namespace)
return
}
if IgnoreIstioResource(obj.Spec.ExportTo, obj.Annotations, obj.Namespace) {
log.Infof(LogFormat, "Delete", "ServiceEntry", obj.Name, se.ClusterID, "Skipping resource from namespace="+obj.Namespace)
return
}
}

func (dh *DestinationRuleHandler) Added(obj *v1alpha3.DestinationRule) {
if CurrentAdmiralState.ReadOnly{
log.Infof(LogFormat, "Add", "DestinationRule", obj.Name, dh.ClusterID, "Admiral is in read-only mode. Skipping resource from namespace="+obj.Namespace)
return
}
if IgnoreIstioResource(obj.Spec.ExportTo, obj.Annotations, obj.Namespace) {
log.Infof(LogFormat, "Add", "DestinationRule", obj.Name, dh.ClusterID, "Skipping resource from namespace="+obj.Namespace)
return
Expand All @@ -177,6 +193,10 @@ func (dh *DestinationRuleHandler) Added(obj *v1alpha3.DestinationRule) {
}

func (dh *DestinationRuleHandler) Updated(obj *v1alpha3.DestinationRule) {
if CurrentAdmiralState.ReadOnly{
log.Infof(LogFormat, "Update", "DestinationRule", obj.Name, dh.ClusterID, "Admiral is in read-only mode. Skipping resource from namespace="+obj.Namespace)
return
}
if IgnoreIstioResource(obj.Spec.ExportTo, obj.Annotations, obj.Namespace) {
log.Infof(LogFormat, "Update", "DestinationRule", obj.Name, dh.ClusterID, "Skipping resource from namespace="+obj.Namespace)
return
Expand All @@ -185,6 +205,10 @@ func (dh *DestinationRuleHandler) Updated(obj *v1alpha3.DestinationRule) {
}

func (dh *DestinationRuleHandler) Deleted(obj *v1alpha3.DestinationRule) {
if CurrentAdmiralState.ReadOnly{
log.Infof(LogFormat, "Delete", "DestinationRule", obj.Name, dh.ClusterID, "Admiral is in read-only mode. Skipping resource from namespace="+obj.Namespace)
return
}
if IgnoreIstioResource(obj.Spec.ExportTo, obj.Annotations, obj.Namespace) {
log.Infof(LogFormat, "Delete", "DestinationRule", obj.Name, dh.ClusterID, "Skipping resource from namespace="+obj.Namespace)
return
Expand All @@ -193,6 +217,10 @@ func (dh *DestinationRuleHandler) Deleted(obj *v1alpha3.DestinationRule) {
}

func (vh *VirtualServiceHandler) Added(obj *v1alpha3.VirtualService) {
if CurrentAdmiralState.ReadOnly{
log.Infof(LogFormat, "Add", "VirtualService", obj.Name, vh.ClusterID, "Admiral is in read-only mode. Skipping resource from namespace="+obj.Namespace)
return
}
if IgnoreIstioResource(obj.Spec.ExportTo, obj.Annotations, obj.Namespace) {
log.Infof(LogFormat, "Add", "VirtualService", obj.Name, vh.ClusterID, "Skipping resource from namespace="+obj.Namespace)
return
Expand All @@ -204,6 +232,10 @@ func (vh *VirtualServiceHandler) Added(obj *v1alpha3.VirtualService) {
}

func (vh *VirtualServiceHandler) Updated(obj *v1alpha3.VirtualService) {
if CurrentAdmiralState.ReadOnly{
log.Infof(LogFormat, "Update", "VirtualService", obj.Name, vh.ClusterID, "Admiral is in read-only mode. Skipping resource from namespace="+obj.Namespace)
return
}
if IgnoreIstioResource(obj.Spec.ExportTo, obj.Annotations, obj.Namespace) {
log.Infof(LogFormat, "Update", "VirtualService", obj.Name, vh.ClusterID, "Skipping resource from namespace="+obj.Namespace)
return
Expand All @@ -215,6 +247,10 @@ func (vh *VirtualServiceHandler) Updated(obj *v1alpha3.VirtualService) {
}

func (vh *VirtualServiceHandler) Deleted(obj *v1alpha3.VirtualService) {
if CurrentAdmiralState.ReadOnly {
log.Infof(LogFormat, "Delete", "VirtualService", obj.Name, vh.ClusterID, "Admiral is in read-only mode. Skipping resource from namespace="+obj.Namespace)
return
}
if IgnoreIstioResource(obj.Spec.ExportTo, obj.Annotations, obj.Namespace) {
log.Infof(LogFormat, "Delete", "VirtualService", obj.Name, vh.ClusterID, "Skipping resource from namespace="+obj.Namespace)
return
Expand Down
27 changes: 26 additions & 1 deletion admiral/pkg/clusters/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/istio-ecosystem/admiral/admiral/pkg/apis/admiral/v1"
"github.com/istio-ecosystem/admiral/admiral/pkg/controller/istio"
"k8s.io/client-go/rest"
"os"
"sync"
"time"

Expand All @@ -25,6 +26,11 @@ func InitAdmiral(ctx context.Context, params common.AdmiralParams) (*RemoteRegis
log.Infof("Initializing Admiral with params: %v", params)

common.InitializeConfig(params)

CurrentAdmiralState = AdmiralState{ReadOnly: ReadOnlyEnabled,IsStateInitialized: StateNotInitialized}
startAdmiralStateChecker(ctx,params)
pauseForAdmiralToInitializeState()

w := RemoteRegistry{
ctx: ctx,
StartTime: time.Now(),
Expand Down Expand Up @@ -66,7 +72,7 @@ func InitAdmiral(ctx context.Context, params common.AdmiralParams) (*RemoteRegis
log.Info("argo rollouts disabled")
}

configMapController, err := admiral.NewConfigMapController()
configMapController, err := admiral.NewConfigMapController(params.ServiceEntryIPPrefix)
if err != nil {
return nil, fmt.Errorf(" Error with configmap controller init: %v", err)
}
Expand All @@ -83,6 +89,25 @@ func InitAdmiral(ctx context.Context, params common.AdmiralParams) (*RemoteRegis
return &w, nil
}

func pauseForAdmiralToInitializeState(){
// Sleep until Admiral determines state. This is done to make sure events are not skipped during startup while determining READ-WRITE state
start := time.Now()
log.Info("Pausing thread to let Admiral determine it's READ-WRITE state. This is to let Admiral determine it's state during startup")
for {
if CurrentAdmiralState.IsStateInitialized {
log.Infof("Time taken for Admiral to complete state initialization =%v ms", time.Since(start).Milliseconds())
break
}
if time.Since(start).Milliseconds() > 60000 {
log.Error("Admiral not initialized after 60 seconds. Exiting now!!")
os.Exit(-1)
}
log.Debug("Admiral is waiting to determine state before proceeding with boot up")
time.Sleep(100 * time.Millisecond)
}

}

func createSecretController(ctx context.Context, w *RemoteRegistry) error {
var err error
var controller *secret.Controller
Expand Down
9 changes: 7 additions & 2 deletions admiral/pkg/clusters/serviceentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ func modifyServiceEntryForNewServiceOrPod(event admiral.EventType, env string, s

defer util.LogElapsedTime("modifyServiceEntryForNewServiceOrPod", sourceIdentity, env, "")()

if CurrentAdmiralState.ReadOnly {
log.Infof(LogFormat, event, env, sourceIdentity, "", "Processing skipped as Admiral is in Read-only mode")
return nil
}

if IsCacheWarmupTime(remoteRegistry) {
log.Infof(LogFormat, event, env, sourceIdentity, "", "Processing skipped during cache warm up state")
return nil
Expand Down Expand Up @@ -573,7 +578,7 @@ func GenerateNewAddressAndAddToConfigMap(seName string, configMapController admi

secondIndex := (len(newAddressState.Addresses) / 255) + 10
firstIndex := (len(newAddressState.Addresses) % 255) + 1
address := common.LocalAddressPrefix + common.Sep + strconv.Itoa(secondIndex) + common.Sep + strconv.Itoa(firstIndex)
address := configMapController.GetIPPrefixForServiceEntries() + common.Sep + strconv.Itoa(secondIndex) + common.Sep + strconv.Itoa(firstIndex)

for util.Contains(newAddressState.Addresses, address) {
if firstIndex < 255 {
Expand All @@ -582,7 +587,7 @@ func GenerateNewAddressAndAddToConfigMap(seName string, configMapController admi
secondIndex++
firstIndex = 0
}
address = common.LocalAddressPrefix + common.Sep + strconv.Itoa(secondIndex) + common.Sep + strconv.Itoa(firstIndex)
address = configMapController.GetIPPrefixForServiceEntries() + common.Sep + strconv.Itoa(secondIndex) + common.Sep + strconv.Itoa(firstIndex)
}
newAddressState.Addresses = append(newAddressState.Addresses, address)
newAddressState.EntryAddresses[seName] = address
Expand Down
11 changes: 10 additions & 1 deletion admiral/pkg/controller/admiral/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,21 @@ const configmapName = "se-address-configmap"
type ConfigMapControllerInterface interface {
GetConfigMap() (*v1.ConfigMap, error)
PutConfigMap(newMap *v1.ConfigMap) error
GetIPPrefixForServiceEntries()(seIPPrefix string)
}

type ConfigMapController struct {
K8sClient kubernetes.Interface
ConfigmapNamespace string
ServiceEntryIPPrefix string
}

//todo this is a temp state, eventually changes will have to be made to give each cluster it's own configmap

func NewConfigMapController() (*ConfigMapController, error) {
func NewConfigMapController(seIPPrefix string) (*ConfigMapController, error) {
kubeconfigPath := common.GetKubeconfigPath()
namespaceToUse := common.GetSyncNamespace()

if kubeconfigPath == "" {
config, err := rest.InClusterConfig()
if err != nil {
Expand All @@ -39,6 +42,7 @@ func NewConfigMapController() (*ConfigMapController, error) {
controller := ConfigMapController{
K8sClient: client,
ConfigmapNamespace: namespaceToUse,
ServiceEntryIPPrefix: seIPPrefix,
}
return &controller, nil
} else {
Expand All @@ -56,6 +60,7 @@ func NewConfigMapController() (*ConfigMapController, error) {
controller := ConfigMapController{
K8sClient: client,
ConfigmapNamespace: namespaceToUse,
ServiceEntryIPPrefix: seIPPrefix,
}
return &controller, nil
}
Expand Down Expand Up @@ -85,3 +90,7 @@ func (c *ConfigMapController) PutConfigMap(newMap *v1.ConfigMap) error {
_, err := c.K8sClient.CoreV1().ConfigMaps(c.ConfigmapNamespace).Update(newMap)
return err
}

func (c *ConfigMapController)GetIPPrefixForServiceEntries() (string) {
return c.ServiceEntryIPPrefix
}
2 changes: 1 addition & 1 deletion admiral/pkg/controller/admiral/configmap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func TestNewConfigMapController(t *testing.T) {
for _, c := range testCases {
t.Run(c.name, func(t *testing.T) {
common.SetKubeconfigPath(c.kubeconfigPath)
controller, err := NewConfigMapController()
controller, err := NewConfigMapController("240.0")
if err == nil && c.expectedError == nil {
//only do these in an error-less context
if c.namespace != controller.ConfigmapNamespace {
Expand Down
Loading