Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support not wait take func #100

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions limiter_atomic.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@
package ratelimit // import "go.uber.org/ratelimit"

import (
"time"

"sync/atomic"
"time"
"unsafe"
)

Expand Down Expand Up @@ -66,6 +65,16 @@ func newAtomicBased(rate int, opts ...Option) *atomicLimiter {
// Take blocks to ensure that the time spent between multiple
// Take calls is on average per/rate.
func (t *atomicLimiter) Take() time.Time {
last, interval := t.take()
t.clock.Sleep(interval)
return last
}

func (t *atomicLimiter) TakeNoWait() (time.Time, time.Duration) {
return t.take()
}

func (t *atomicLimiter) take() (time.Time, time.Duration) {
var (
newState state
taken bool
Expand Down Expand Up @@ -105,6 +114,5 @@ func (t *atomicLimiter) Take() time.Time {
}
taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))
}
t.clock.Sleep(interval)
return newState.last
return newState.last, interval
}
16 changes: 12 additions & 4 deletions limiter_atomic_int64.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@
package ratelimit // import "go.uber.org/ratelimit"

import (
"time"

"sync/atomic"
"time"
)

type atomicInt64Limiter struct {
Expand Down Expand Up @@ -57,6 +56,16 @@ func newAtomicInt64Based(rate int, opts ...Option) *atomicInt64Limiter {
// Take blocks to ensure that the time spent between multiple
// Take calls is on average time.Second/rate.
func (t *atomicInt64Limiter) Take() time.Time {
last, interval := t.take()
t.clock.Sleep(interval)
return last
}

func (t *atomicInt64Limiter) TakeNoWait() (time.Time, time.Duration) {
return t.take()
}

func (t *atomicInt64Limiter) take() (time.Time, time.Duration) {
var (
newTimeOfNextPermissionIssue int64
now int64
Expand All @@ -82,6 +91,5 @@ func (t *atomicInt64Limiter) Take() time.Time {
break
}
}
t.clock.Sleep(time.Duration(newTimeOfNextPermissionIssue - now))
return time.Unix(0, newTimeOfNextPermissionIssue)
return time.Unix(0, newTimeOfNextPermissionIssue), time.Duration(newTimeOfNextPermissionIssue - now)
}
19 changes: 16 additions & 3 deletions limiter_mutexbased.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,19 @@ func newMutexBased(rate int, opts ...Option) *mutexLimiter {
// Take blocks to ensure that the time spent between multiple
// Take calls is on average per/rate.
func (t *mutexLimiter) Take() time.Time {
last, interval := t.take()
t.clock.Sleep(interval)

return last
}

func (t *mutexLimiter) TakeNoWait() (time.Time, time.Duration) {
return t.take()
}

func (t *mutexLimiter) take() (time.Time, time.Duration) {
interval := time.Duration(0)

t.Lock()
defer t.Unlock()

Expand All @@ -59,7 +72,7 @@ func (t *mutexLimiter) Take() time.Time {
// If this is our first request, then we allow it.
if t.last.IsZero() {
t.last = now
return t.last
return t.last, interval
}

// sleepFor calculates how much time we should sleep based on
Expand All @@ -77,12 +90,12 @@ func (t *mutexLimiter) Take() time.Time {

// If sleepFor is positive, then we should sleep now.
if t.sleepFor > 0 {
t.clock.Sleep(t.sleepFor)
interval = t.sleepFor
t.last = now.Add(t.sleepFor)
t.sleepFor = 0
} else {
t.last = now
}

return t.last
return t.last, interval
}
5 changes: 5 additions & 0 deletions ratelimit.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
type Limiter interface {
// Take should block to make sure that the RPS is met.
Take() time.Time
TakeNoWait() (time.Time, time.Duration)
}

// Clock is the minimum necessary interface to instantiate a rate limiter with
Expand Down Expand Up @@ -133,3 +134,7 @@ func NewUnlimited() Limiter {
func (unlimited) Take() time.Time {
return time.Now()
}

func (unlimited) TakeNoWait() (time.Time, time.Duration) {
return time.Now(), time.Duration(0)
}