Skip to content

Commit

Permalink
Merge pull request #200 from AwesomePatrol/add-new-rate-limiter-types
Browse files Browse the repository at this point in the history
Add new types of rate limiters
  • Loading branch information
google-oss-prow[bot] authored May 9, 2024
2 parents f7f6255 + 0f65d14 commit cf4aded
Show file tree
Hide file tree
Showing 3 changed files with 297 additions and 2 deletions.
20 changes: 19 additions & 1 deletion e2e/testmain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,26 @@ func TestMain(m *testing.M) {
}
}
client := oauth2.NewClient(ctx, ts)

mrl := &cloud.MinimumRateLimiter{RateLimiter: &cloud.NopRateLimiter{}, Minimum: 50 * time.Millisecond}
svc, err := cloud.NewService(ctx, client, &cloud.SingleProjectRouter{ID: testFlags.project}, mrl)
crl := cloud.NewCompositeRateLimiter(mrl)

// The default limit is 1500 per minute. Leave 200 buffer.
computeRL := cloud.NewTickerRateLimiter(1300, time.Minute)
crl.Register("HealthChecks", "", computeRL)
crl.Register("BackendServices", "", computeRL)
crl.Register("NetworkEndpointGroups", "", computeRL)

// The default limit is 1200 per minute. Leave 200 buffer.
networkServicesRL := cloud.NewTickerRateLimiter(1000, time.Minute)
crl.Register("TcpRoutes", "", networkServicesRL)
crl.Register("Meshes", "", networkServicesRL)

// To ensure minimum time between operations, wrap the network services rate limiter.
orl := &cloud.MinimumRateLimiter{RateLimiter: networkServicesRL, Minimum: 100 * time.Millisecond}
crl.Register("Operations", "", orl)

svc, err := cloud.NewService(ctx, client, &cloud.SingleProjectRouter{ID: testFlags.project}, crl)
if err != nil {
log.Fatal(err)
}
Expand Down
133 changes: 132 additions & 1 deletion pkg/cloud/ratelimit.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,12 @@ type MinimumRateLimiter struct {
// Accept blocks on the minimum duration and context. Once the minimum duration is met,
// the func is blocked on the underlying ratelimiter.
func (m *MinimumRateLimiter) Accept(ctx context.Context, key *RateLimitKey) error {
t := time.NewTimer(m.Minimum)
select {
case <-time.After(m.Minimum):
case <-t.C:
return m.RateLimiter.Accept(ctx, key)
case <-ctx.Done():
t.Stop()
return ctx.Err()
}
}
Expand All @@ -112,3 +114,132 @@ func (m *MinimumRateLimiter) Accept(ctx context.Context, key *RateLimitKey) erro
func (m *MinimumRateLimiter) Observe(ctx context.Context, err error, key *RateLimitKey) {
m.RateLimiter.Observe(ctx, err, key)
}

// TickerRateLimiter uses time.Ticker to spread Accepts over time.
//
// Concurrent calls to Accept will block on the same channel. It is not
// guaranteed what caller will be unblocked first.
type TickerRateLimiter struct {
ticker *time.Ticker
}

// NewTickerRateLimiter creates a new TickerRateLimiter which will space Accept
// calls at least interval/limit time apart.
//
// For example, limit=4 interval=time.Minute will unblock a single Accept call
// every 15 seconds.
func NewTickerRateLimiter(limit int, interval time.Duration) *TickerRateLimiter {
return &TickerRateLimiter{
ticker: time.NewTicker(interval / time.Duration(limit)),
}
}

// Accept will block until a time, specified when creating TickerRateLimiter,
// passes since the last call to Accept.
func (t *TickerRateLimiter) Accept(ctx context.Context, rlk *RateLimitKey) error {
select {
case <-t.ticker.C:
return nil
case <-ctx.Done():
return ctx.Err()
}
}

// Observe does nothing.
func (*TickerRateLimiter) Observe(context.Context, error, *RateLimitKey) {
}

// Make sure that TickerRateLimiter implements RateLimiter.
var _ RateLimiter = new(TickerRateLimiter)

// CompositeRateLimiter combines rate limiters based on RateLimitKey.
type CompositeRateLimiter struct {
// map[resource name]map[operation name]RateLimiter
rateLimiters map[string]map[string]RateLimiter
// defaultRL is used when no matching RateLimiter was found.
defaultRL RateLimiter
}

// NewCompositeRateLimiter creates a new CompositeRateLimiter that will use
// provided default rate limiter if no better match is found. It is intended to
// be used for a single project.
//
// # Example
//
// bsDefaultRL := /* backend service default rate limiter */
// bsGetListRL := /* backend service rate limiter for get and list operations */
//
// rl := NewCompositeRateLimiter(defaultRL)
// rl.Register("BackendServices", "", bsDefaultRL)
// rl.Register("BackendServices", "Get", bsGetListRL)
// rl.Register("BackendServices", "List", bsGetListRL)
//
// This rate limiter is not nesting. Only one rate limiter is used for any
// particular combination of: resource, operation. For the case above, rate
// limiter registered at ("BackendServices", "") won't be applied to operation
// ("BackendServices", "Get"), because a more specific rate limiter was
// registered.
func NewCompositeRateLimiter(defaultRL RateLimiter) *CompositeRateLimiter {
m := map[string]map[string]RateLimiter{
"": {
"": defaultRL,
},
}
return &CompositeRateLimiter{
rateLimiters: m,
defaultRL: defaultRL,
}
}

// ensureExists creates sub-maps as needed.
func (c *CompositeRateLimiter) ensureExists(service string) {
if _, ok := c.rateLimiters[service]; !ok {
c.rateLimiters[service] = map[string]RateLimiter{}
}
}

// fillMissing finds all combinations where resource and/or operation name
// could be omitted and sets it to defaultRL.
func (c *CompositeRateLimiter) fillMissing() {
for _, subService := range c.rateLimiters {
if subService[""] == nil {
subService[""] = c.defaultRL
}
}
}

// Register adds provided rl to the composite rate limiter. Service, operation
// can be omitted by providing an empty string. In this case, the provided rate
// limiter will be used only when there is no other rate limiter matching a
// particular resource, or operation.
//
// It replaces previous rate limiter provided for the same service, operation
// combination. Once a rate limiter is added, it can't be removed.
//
// Same rate limiter can be used for multiple Register calls.
func (c *CompositeRateLimiter) Register(service, operation string, rl RateLimiter) {
c.ensureExists(service)
c.rateLimiters[service][operation] = rl
c.fillMissing()
}

// Accept either calls underlying rate limiter matching rlk or a default rate
// limiter when none is found.
func (c *CompositeRateLimiter) Accept(ctx context.Context, rlk *RateLimitKey) error {
if rlk == nil {
return c.defaultRL.Accept(ctx, rlk)
}
service := rlk.Service
if _, ok := c.rateLimiters[service]; !ok {
service = ""
}
operation := rlk.Operation
if _, ok := c.rateLimiters[service][operation]; !ok {
operation = ""
}
return c.rateLimiters[service][operation].Accept(ctx, rlk)
}

// Observe does nothing.
func (*CompositeRateLimiter) Observe(context.Context, error, *RateLimitKey) {
}
146 changes: 146 additions & 0 deletions pkg/cloud/ratelimit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,149 @@ func TestMinimumRateLimiter(t *testing.T) {
t.Errorf("`called` = true, want false")
}
}

func TestTickerRateLimiter(t *testing.T) {
t.Parallel()

trl := NewTickerRateLimiter(100, time.Second)
start := time.Now()
for i := 0; i < 50; i++ {
err := trl.Accept(context.Background(), nil)
if err != nil {
t.Errorf("TickerRateLimiter.Accept = %v, want nil", err)
}
}
elapsed := time.Since(start)
if elapsed > time.Second {
t.Errorf("TickerRateLimiter.Accept took too long: %v, want <1s", elapsed)
}
if elapsed < 500*time.Millisecond {
t.Errorf("TickerRateLimiter.Accept took too short: %v, want >500ms", elapsed)
}

// Use context that has been cancelled and expect a context error returned.
ctxCancelled, cancelled := context.WithCancel(context.Background())
cancelled()
// Verify context is cancelled by now.
<-ctxCancelled.Done()
err := trl.Accept(ctxCancelled, nil)
if err != ctxCancelled.Err() {
t.Errorf("TickerRateLimiter.Accept() = %v, want %v", err, ctxCancelled.Err())
}
}

func TestCompositeRateLimiter(t *testing.T) {
t.Parallel()

var calledA bool
fa := &FakeAcceptor{accept: func() { calledA = true }}
arl := &AcceptRateLimiter{fa}
rl := NewCompositeRateLimiter(arl)

// Call default.
err := rl.Accept(context.Background(), nil)
if err != nil {
t.Errorf("CompositeRateLimiter.Accept = %v, want nil", err)
}
if !calledA {
t.Errorf("`calledA` = false, want true")
}

calledA = false
calledB := false
fb := &FakeAcceptor{accept: func() { calledB = true }}
brl := &AcceptRateLimiter{fb}
rl.Register("Meshes", "", brl)

// Call registered rate limiter.
err = rl.Accept(context.Background(), &CallContextKey{Service: "Meshes"})
if err != nil {
t.Errorf("CompositeRateLimiter.Accept = %v, want nil", err)
}
if !calledB {
t.Errorf("`calledB` = false, want true")
}
if calledA {
t.Errorf("`calledA` = true, want false")
}

calledB = false
// Call default rate limiter when registered is not found
err = rl.Accept(context.Background(), &CallContextKey{Service: "service-does-not-exist"})
if err != nil {
t.Errorf("CompositeRateLimiter.Accept = %v, want nil", err)
}
if !calledA {
t.Errorf("`calledA` = false, want true")
}
if calledB {
t.Errorf("`calledB` = true, want false")
}

calledA = false
calledC := false
fc := &FakeAcceptor{accept: func() { calledC = true }}
crl := &AcceptRateLimiter{fc}
rl.Register("", "Get", crl)

// Call rate limiter for network service when no project was specified
err = rl.Accept(context.Background(), &CallContextKey{ProjectID: "project-does-not-exist", Service: "Networks", Operation: "Get"})
if err != nil {
t.Errorf("CompositeRateLimiter.Accept = %v, want nil", err)
}
if !calledC {
t.Errorf("`calledC` = false, want true")
}
if calledA {
t.Errorf("`calledA` = true, want false")
}
if calledB {
t.Errorf("`calledB` = true, want false")
}
}

type CountingRateLimiter int

func (crl *CountingRateLimiter) Accept(_ context.Context, key *CallContextKey) error {
*crl++
return nil
}

func (*CountingRateLimiter) Observe(context.Context, error, *RateLimitKey) {}

func TestCompositeRateLimiter_Table(t *testing.T) {
t.Parallel()

def := new(CountingRateLimiter)
rl := NewCompositeRateLimiter(def)
defNetRL := new(CountingRateLimiter)
rl.Register("networks", "", defNetRL)
getNetRL := new(CountingRateLimiter)
rl.Register("networks", "get", getNetRL)

for _, project := range []string{"", "projectB", "project-does-not-exist"} {
for _, service := range []string{"", "networks", "service-does-not-exist"} {
for _, operation := range []string{"", "get", "operation-does-not-exist"} {
key := &CallContextKey{
ProjectID: project,
Service: service,
Operation: operation,
}
err := rl.Accept(context.Background(), key)
if err != nil {
t.Errorf("CompositeRateLimiter.Accept = %v, want nil", err)
}
}
}
}

if *def != 18 {
t.Errorf("def served %d calls, want = 18", *def)
}
if *defNetRL != 6 {
t.Errorf("defNetRL served %d calls, want = 6", *defNetRL)
}
if *getNetRL != 3 {
t.Errorf("getNetRL served %d calls, want = 3", *getNetRL)
}
}

0 comments on commit cf4aded

Please sign in to comment.