You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

95 lines
2.1 KiB

  1. package redis
  2. import (
  3. "errors"
  4. "time"
  5. "github.com/garyburd/redigo/redis"
  6. )
  7. var (
  8. PrefixKey = "ratelimit:"
  9. ErrUnreachable = errors.New("redis is unreachable")
  10. RetryAfter = time.Second
  11. )
  12. const skipOnUnhealthy = 1000
  13. type bucketStore struct {
  14. pool *redis.Pool
  15. rate int
  16. windowSeconds int
  17. retryAfter *time.Time
  18. }
  19. // New creates new in-memory token bucket store.
  20. func New(pool *redis.Pool) *bucketStore {
  21. return &bucketStore{
  22. pool: pool,
  23. }
  24. }
  25. func (s *bucketStore) InitRate(rate int, window time.Duration) {
  26. s.rate = rate
  27. s.windowSeconds = int(window / time.Second)
  28. if s.windowSeconds <= 1 {
  29. s.windowSeconds = 1
  30. }
  31. }
  32. // Take implements TokenBucketStore interface. It takes token from a bucket
  33. // referenced by a given key, if available.
  34. func (s *bucketStore) Take(key string) (bool, int, time.Time, error) {
  35. if s.retryAfter != nil {
  36. if s.retryAfter.After(time.Now()) {
  37. return false, 0, time.Time{}, ErrUnreachable
  38. }
  39. s.retryAfter = nil
  40. }
  41. c := s.pool.Get()
  42. defer c.Close()
  43. // Number of tokens in the bucket.
  44. bucketLen, err := redis.Int(c.Do("LLEN", PrefixKey+key))
  45. if err != nil {
  46. next := time.Now().Add(time.Second)
  47. s.retryAfter = &next
  48. return false, 0, time.Time{}, err
  49. }
  50. // Bucket is full.
  51. if bucketLen >= s.rate {
  52. return false, 0, time.Time{}, nil
  53. }
  54. if bucketLen > 0 {
  55. // Bucket most probably exists, try to push a new token into it.
  56. // If RPUSHX returns 0 (ie. key expired between LLEN and RPUSHX), we need
  57. // to fall-back to RPUSH without returning error.
  58. c.Send("MULTI")
  59. c.Send("RPUSHX", PrefixKey+key, "")
  60. reply, err := redis.Ints(c.Do("EXEC"))
  61. if err != nil {
  62. next := time.Now().Add(time.Second)
  63. s.retryAfter = &next
  64. return false, 0, time.Time{}, err
  65. }
  66. bucketLen = reply[0]
  67. if bucketLen > 0 {
  68. return true, s.rate - bucketLen - 1, time.Time{}, nil
  69. }
  70. }
  71. c.Send("MULTI")
  72. c.Send("RPUSH", PrefixKey+key, "")
  73. c.Send("EXPIRE", PrefixKey+key, s.windowSeconds)
  74. if _, err := c.Do("EXEC"); err != nil {
  75. next := time.Now().Add(time.Second)
  76. s.retryAfter = &next
  77. return false, 0, time.Time{}, err
  78. }
  79. return true, s.rate - bucketLen - 1, time.Time{}, nil
  80. }