|
- package redis
-
- import (
- "errors"
- "time"
-
- "github.com/garyburd/redigo/redis"
- )
-
- var (
- PrefixKey = "ratelimit:"
- ErrUnreachable = errors.New("redis is unreachable")
- RetryAfter = time.Second
- )
-
- const skipOnUnhealthy = 1000
-
- type bucketStore struct {
- pool *redis.Pool
-
- rate int
- windowSeconds int
- retryAfter *time.Time
- }
-
- // New creates new in-memory token bucket store.
- func New(pool *redis.Pool) *bucketStore {
- return &bucketStore{
- pool: pool,
- }
- }
-
- func (s *bucketStore) InitRate(rate int, window time.Duration) {
- s.rate = rate
- s.windowSeconds = int(window / time.Second)
- if s.windowSeconds <= 1 {
- s.windowSeconds = 1
- }
- }
-
- // Take implements TokenBucketStore interface. It takes token from a bucket
- // referenced by a given key, if available.
- func (s *bucketStore) Take(key string) (bool, int, time.Time, error) {
- if s.retryAfter != nil {
- if s.retryAfter.After(time.Now()) {
- return false, 0, time.Time{}, ErrUnreachable
- }
- s.retryAfter = nil
- }
- c := s.pool.Get()
- defer c.Close()
-
- // Number of tokens in the bucket.
- bucketLen, err := redis.Int(c.Do("LLEN", PrefixKey+key))
- if err != nil {
- next := time.Now().Add(time.Second)
- s.retryAfter = &next
- return false, 0, time.Time{}, err
- }
-
- // Bucket is full.
- if bucketLen >= s.rate {
- return false, 0, time.Time{}, nil
- }
-
- if bucketLen > 0 {
- // Bucket most probably exists, try to push a new token into it.
- // If RPUSHX returns 0 (ie. key expired between LLEN and RPUSHX), we need
- // to fall-back to RPUSH without returning error.
- c.Send("MULTI")
- c.Send("RPUSHX", PrefixKey+key, "")
- reply, err := redis.Ints(c.Do("EXEC"))
- if err != nil {
- next := time.Now().Add(time.Second)
- s.retryAfter = &next
- return false, 0, time.Time{}, err
- }
- bucketLen = reply[0]
- if bucketLen > 0 {
- return true, s.rate - bucketLen - 1, time.Time{}, nil
- }
- }
-
- c.Send("MULTI")
- c.Send("RPUSH", PrefixKey+key, "")
- c.Send("EXPIRE", PrefixKey+key, s.windowSeconds)
- if _, err := c.Do("EXEC"); err != nil {
- next := time.Now().Add(time.Second)
- s.retryAfter = &next
- return false, 0, time.Time{}, err
- }
-
- return true, s.rate - bucketLen - 1, time.Time{}, nil
- }
|