You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

123 lines
3.4 KiB

  1. // Copyright 2017 Google LLC
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package pubsub
  15. import (
  16. "context"
  17. "sync/atomic"
  18. "golang.org/x/sync/semaphore"
  19. )
  20. // flowController implements flow control for Subscription.Receive.
  21. type flowController struct {
  22. maxCount int
  23. maxSize int // max total size of messages
  24. semCount, semSize *semaphore.Weighted // enforces max number and size of messages
  25. // Number of calls to acquire - number of calls to release. This can go
  26. // negative if semCount == nil and a large acquire is followed by multiple
  27. // small releases.
  28. // Atomic.
  29. countRemaining int64
  30. }
  31. // newFlowController creates a new flowController that ensures no more than
  32. // maxCount messages or maxSize bytes are outstanding at once. If maxCount or
  33. // maxSize is < 1, then an unlimited number of messages or bytes is permitted,
  34. // respectively.
  35. func newFlowController(maxCount, maxSize int) *flowController {
  36. fc := &flowController{
  37. maxCount: maxCount,
  38. maxSize: maxSize,
  39. semCount: nil,
  40. semSize: nil,
  41. }
  42. if maxCount > 0 {
  43. fc.semCount = semaphore.NewWeighted(int64(maxCount))
  44. }
  45. if maxSize > 0 {
  46. fc.semSize = semaphore.NewWeighted(int64(maxSize))
  47. }
  48. return fc
  49. }
  50. // acquire blocks until one message of size bytes can proceed or ctx is done.
  51. // It returns nil in the first case, or ctx.Err() in the second.
  52. //
  53. // acquire allows large messages to proceed by treating a size greater than maxSize
  54. // as if it were equal to maxSize.
  55. func (f *flowController) acquire(ctx context.Context, size int) error {
  56. if f.semCount != nil {
  57. if err := f.semCount.Acquire(ctx, 1); err != nil {
  58. return err
  59. }
  60. }
  61. if f.semSize != nil {
  62. if err := f.semSize.Acquire(ctx, f.bound(size)); err != nil {
  63. if f.semCount != nil {
  64. f.semCount.Release(1)
  65. }
  66. return err
  67. }
  68. }
  69. atomic.AddInt64(&f.countRemaining, 1)
  70. return nil
  71. }
  72. // tryAcquire returns false if acquire would block. Otherwise, it behaves like
  73. // acquire and returns true.
  74. //
  75. // tryAcquire allows large messages to proceed by treating a size greater than
  76. // maxSize as if it were equal to maxSize.
  77. func (f *flowController) tryAcquire(size int) bool {
  78. if f.semCount != nil {
  79. if !f.semCount.TryAcquire(1) {
  80. return false
  81. }
  82. }
  83. if f.semSize != nil {
  84. if !f.semSize.TryAcquire(f.bound(size)) {
  85. if f.semCount != nil {
  86. f.semCount.Release(1)
  87. }
  88. return false
  89. }
  90. }
  91. atomic.AddInt64(&f.countRemaining, 1)
  92. return true
  93. }
  94. // release notes that one message of size bytes is no longer outstanding.
  95. func (f *flowController) release(size int) {
  96. atomic.AddInt64(&f.countRemaining, -1)
  97. if f.semCount != nil {
  98. f.semCount.Release(1)
  99. }
  100. if f.semSize != nil {
  101. f.semSize.Release(f.bound(size))
  102. }
  103. }
  104. func (f *flowController) bound(size int) int64 {
  105. if size > f.maxSize {
  106. return int64(f.maxSize)
  107. }
  108. return int64(size)
  109. }
  110. func (f *flowController) count() int {
  111. return int(atomic.LoadInt64(&f.countRemaining))
  112. }