Skip to content

Commit

Permalink
fix: all rlps should go to the limits config - maybe there are overri…
Browse files Browse the repository at this point in the history
…dden for one gateway but not for all gateways
  • Loading branch information
guicassolato committed May 23, 2024
1 parent 92519d7 commit ad90f28
Showing 1 changed file with 22 additions and 99 deletions.
121 changes: 22 additions & 99 deletions controllers/ratelimitpolicy_limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,11 @@ package controllers

import (
"context"
"slices"
"sort"

"github.com/go-logr/logr"
limitadorv1alpha1 "github.com/kuadrant/limitador-operator/api/v1alpha1"
"github.com/samber/lo"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
gatewayapiv1 "sigs.k8s.io/gateway-api/apis/v1"

kuadrantv1beta2 "github.com/kuadrant/kuadrant-operator/api/v1beta2"
"github.com/kuadrant/kuadrant-operator/pkg/common"
Expand All @@ -21,29 +17,29 @@ import (
)

func (r *RateLimitPolicyReconciler) reconcileLimits(ctx context.Context, rlp *kuadrantv1beta2.RateLimitPolicy) error {
topology, policies, err := r.generateTopology(ctx)
policies, err := r.getPolicies(ctx)
if err != nil {
return err
}
return r.reconcileLimitador(ctx, rlp, topology, policies)
return r.reconcileLimitador(ctx, rlp, policies)
}

func (r *RateLimitPolicyReconciler) deleteLimits(ctx context.Context, rlp *kuadrantv1beta2.RateLimitPolicy) error {
topology, policies, err := r.generateTopology(ctx)
policies, err := r.getPolicies(ctx)
if err != nil {
return err
}
policiesWithoutRLP := utils.Filter(policies, func(policy kuadrantgatewayapi.Policy) bool {
return client.ObjectKeyFromObject(policy) != client.ObjectKeyFromObject(rlp)
})
return r.reconcileLimitador(ctx, rlp, topology, policiesWithoutRLP)
return r.reconcileLimitador(ctx, rlp, policiesWithoutRLP)
}

func (r *RateLimitPolicyReconciler) reconcileLimitador(ctx context.Context, rlp *kuadrantv1beta2.RateLimitPolicy, topology *kuadrantgatewayapi.Topology, policies []kuadrantgatewayapi.Policy) error {
func (r *RateLimitPolicyReconciler) reconcileLimitador(ctx context.Context, rlp *kuadrantv1beta2.RateLimitPolicy, policies []kuadrantgatewayapi.Policy) error {
logger, _ := logr.FromContext(ctx)
logger = logger.WithName("reconcileLimitador").WithValues("policies", utils.Map(policies, func(p kuadrantgatewayapi.Policy) string { return client.ObjectKeyFromObject(p).String() }))

rateLimitIndex := r.buildRateLimitIndex(ctx, topology, policies)
rateLimitIndex := r.buildRateLimitIndex(policies)

// get the current limitador cr for the kuadrant instance so we can compare if it needs to be updated
limitador, err := GetLimitador(ctx, r.Client(), rlp)
Expand Down Expand Up @@ -98,105 +94,32 @@ func GetLimitador(ctx context.Context, k8sclient client.Client, rlp *kuadrantv1b
return limitador, nil
}

func (r *RateLimitPolicyReconciler) buildRateLimitIndex(ctx context.Context, topology *kuadrantgatewayapi.Topology, policies []kuadrantgatewayapi.Policy) *rlptools.RateLimitIndex {
rateLimitIndex := rlptools.NewRateLimitIndex()

for _, policy := range policies {
rlpKey := client.ObjectKeyFromObject(policy)
if _, ok := rateLimitIndex.Get(rlpKey); ok {
continue
}

rlp := policy.(*kuadrantv1beta2.RateLimitPolicy)

// If rlp is targeting a route, limits may be overridden by other policies
if r.overridden(ctx, rlp, topology) {
continue
}

rateLimitIndex.Set(rlpKey, rlptools.LimitadorRateLimitsFromRLP(rlp))
}

return rateLimitIndex
}

func (r *RateLimitPolicyReconciler) overridden(ctx context.Context, rlp *kuadrantv1beta2.RateLimitPolicy, topology *kuadrantgatewayapi.Topology) bool {
// Only route policies can be overridden
if !kuadrantgatewayapi.IsTargetRefHTTPRoute(rlp.GetTargetRef()) {
return false
}

func (r *RateLimitPolicyReconciler) getPolicies(ctx context.Context) ([]kuadrantgatewayapi.Policy, error) {
logger, _ := logr.FromContext(ctx)
logger = logger.WithName("overridden")

// Only gateway policies can override route policies and those must not be marked for deletion
gatewayPolicies := utils.Filter(r.getRelatedPolicies(rlp, topology), func(policy kuadrantgatewayapi.Policy) bool {
return policy.GetDeletionTimestamp() == nil &&
kuadrantgatewayapi.IsTargetRefGateway(policy.GetTargetRef()) &&
policy.GetUID() != rlp.GetUID()
})

for _, policy := range gatewayPolicies {
p := policy.(*kuadrantv1beta2.RateLimitPolicy)
if p.Spec.Overrides != nil {
logger.V(1).Info("policy has been overridden, skipping corresponding limits config", "overridden by", client.ObjectKeyFromObject(p))
return true
}
}

return false
}

func (r *RateLimitPolicyReconciler) getRelatedPolicies(rlp *kuadrantv1beta2.RateLimitPolicy, t *kuadrantgatewayapi.Topology) []kuadrantgatewayapi.Policy {
topologyIndexes := kuadrantgatewayapi.NewTopologyIndexes(t)
policyKeyFunc := func(p kuadrantgatewayapi.Policy) client.ObjectKey { return client.ObjectKeyFromObject(p) }
var relatedPolicies []kuadrantgatewayapi.Policy
for _, gw := range t.Gateways() {
policyList := topologyIndexes.PoliciesFromGateway(gw.Gateway)
if slices.Contains(utils.Map(policyList, policyKeyFunc), client.ObjectKeyFromObject(rlp)) {
relatedPolicies = append(relatedPolicies, policyList...)
}
}
sort.Sort(kuadrantgatewayapi.PolicyByTargetRefKindAndCreationTimeStamp(relatedPolicies))
return lo.Uniq(relatedPolicies)
}

func (r *RateLimitPolicyReconciler) generateTopology(ctx context.Context) (*kuadrantgatewayapi.Topology, []kuadrantgatewayapi.Policy, error) {
logger, _ := logr.FromContext(ctx)

gwList := &gatewayapiv1.GatewayList{}
err := r.Client().List(ctx, gwList)
logger.V(1).Info("topology: list gateways", "#Gateways", len(gwList.Items), "err", err)
if err != nil {
return nil, nil, err
}

routeList := &gatewayapiv1.HTTPRouteList{}
err = r.Client().List(ctx, routeList)
logger.V(1).Info("topology: list httproutes", "#HTTPRoutes", len(routeList.Items), "err", err)
if err != nil {
return nil, nil, err
}

rlpList := &kuadrantv1beta2.RateLimitPolicyList{}
err = r.Client().List(ctx, rlpList)
err := r.Client().List(ctx, rlpList)
logger.V(1).Info("topology: list rate limit policies", "#RLPS", len(rlpList.Items), "err", err)
if err != nil {
return nil, nil, err
return nil, err
}

policies := utils.Map(rlpList.Items, func(p kuadrantv1beta2.RateLimitPolicy) kuadrantgatewayapi.Policy { return &p })

return policies, nil
}

func (r *RateLimitPolicyReconciler) buildRateLimitIndex(policies []kuadrantgatewayapi.Policy) *rlptools.RateLimitIndex {
rateLimitIndex := rlptools.NewRateLimitIndex()

sort.Sort(kuadrantgatewayapi.PolicyByTargetRefKindAndCreationTimeStamp(policies))

topology, err := kuadrantgatewayapi.NewTopology(
kuadrantgatewayapi.WithGateways(utils.Map(gwList.Items, ptr.To[gatewayapiv1.Gateway])),
kuadrantgatewayapi.WithRoutes(utils.Map(routeList.Items, ptr.To[gatewayapiv1.HTTPRoute])),
kuadrantgatewayapi.WithPolicies(policies),
kuadrantgatewayapi.WithLogger(logger),
)
if err != nil {
return nil, nil, err
for i := range policies {
policy := policies[i]
rlpKey := client.ObjectKeyFromObject(policy)
rlp := policy.(*kuadrantv1beta2.RateLimitPolicy)
rateLimitIndex.Set(rlpKey, rlptools.LimitadorRateLimitsFromRLP(rlp))
}

return topology, policies, nil
return rateLimitIndex
}

0 comments on commit ad90f28

Please sign in to comment.