You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

256 lines
6.3 KiB

  1. // Copyright 2017 Google LLC
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package pubsub
import (
	"context"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"golang.org/x/sync/errgroup"
)
  24. func TestFlowControllerCancel(t *testing.T) {
  25. // Test canceling a flow controller's context.
  26. t.Parallel()
  27. fc := newFlowController(3, 10)
  28. if err := fc.acquire(context.Background(), 5); err != nil {
  29. t.Fatal(err)
  30. }
  31. // Experiment: a context that times out should always return an error.
  32. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Millisecond)
  33. defer cancel()
  34. if err := fc.acquire(ctx, 6); err != context.DeadlineExceeded {
  35. t.Fatalf("got %v, expected DeadlineExceeded", err)
  36. }
  37. // Control: a context that is not done should always return nil.
  38. go func() {
  39. time.Sleep(5 * time.Millisecond)
  40. fc.release(5)
  41. }()
  42. if err := fc.acquire(context.Background(), 6); err != nil {
  43. t.Errorf("got %v, expected nil", err)
  44. }
  45. }
  46. func TestFlowControllerLargeRequest(t *testing.T) {
  47. // Large requests succeed, consuming the entire allotment.
  48. t.Parallel()
  49. fc := newFlowController(3, 10)
  50. err := fc.acquire(context.Background(), 11)
  51. if err != nil {
  52. t.Fatal(err)
  53. }
  54. }
  55. func TestFlowControllerNoStarve(t *testing.T) {
  56. // A large request won't starve, because the flowController is
  57. // (best-effort) FIFO.
  58. t.Parallel()
  59. ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
  60. defer cancel()
  61. fc := newFlowController(10, 10)
  62. first := make(chan int)
  63. for i := 0; i < 20; i++ {
  64. go func() {
  65. for {
  66. if err := fc.acquire(ctx, 1); err != nil {
  67. if err != context.Canceled {
  68. t.Error(err)
  69. }
  70. return
  71. }
  72. select {
  73. case first <- 1:
  74. default:
  75. }
  76. fc.release(1)
  77. }
  78. }()
  79. }
  80. <-first // Wait until the flowController's state is non-zero.
  81. if err := fc.acquire(ctx, 11); err != nil {
  82. t.Errorf("got %v, want nil", err)
  83. }
  84. }
  85. func TestFlowControllerSaturation(t *testing.T) {
  86. t.Parallel()
  87. const (
  88. maxCount = 6
  89. maxSize = 10
  90. )
  91. for _, test := range []struct {
  92. acquireSize int
  93. wantCount, wantSize int64
  94. }{
  95. {
  96. // Many small acquires cause the flow controller to reach its max count.
  97. acquireSize: 1,
  98. wantCount: 6,
  99. wantSize: 6,
  100. },
  101. {
  102. // Five acquires of size 2 will cause the flow controller to reach its max size,
  103. // but not its max count.
  104. acquireSize: 2,
  105. wantCount: 5,
  106. wantSize: 10,
  107. },
  108. {
  109. // If the requests are the right size (relatively prime to maxSize),
  110. // the flow controller will not saturate on size. (In this case, not on count either.)
  111. acquireSize: 3,
  112. wantCount: 3,
  113. wantSize: 9,
  114. },
  115. } {
  116. fc := newFlowController(maxCount, maxSize)
  117. // Atomically track flow controller state.
  118. // The flowController itself tracks count.
  119. var curSize int64
  120. success := errors.New("")
  121. // Time out if wantSize or wantCount is never reached.
  122. ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
  123. defer cancel()
  124. g, ctx := errgroup.WithContext(ctx)
  125. for i := 0; i < 10; i++ {
  126. g.Go(func() error {
  127. var hitCount, hitSize bool
  128. // Run at least until we hit the expected values, and at least
  129. // for enough iterations to exceed them if the flow controller
  130. // is broken.
  131. for i := 0; i < 100 || !hitCount || !hitSize; i++ {
  132. select {
  133. case <-ctx.Done():
  134. return ctx.Err()
  135. default:
  136. }
  137. if err := fc.acquire(ctx, test.acquireSize); err != nil {
  138. return err
  139. }
  140. c := int64(fc.count())
  141. if c > test.wantCount {
  142. return fmt.Errorf("count %d exceeds want %d", c, test.wantCount)
  143. }
  144. if c == test.wantCount {
  145. hitCount = true
  146. }
  147. s := atomic.AddInt64(&curSize, int64(test.acquireSize))
  148. if s > test.wantSize {
  149. return fmt.Errorf("size %d exceeds want %d", s, test.wantSize)
  150. }
  151. if s == test.wantSize {
  152. hitSize = true
  153. }
  154. time.Sleep(5 * time.Millisecond) // Let other goroutines make progress.
  155. if atomic.AddInt64(&curSize, -int64(test.acquireSize)) < 0 {
  156. return errors.New("negative size")
  157. }
  158. fc.release(test.acquireSize)
  159. }
  160. return success
  161. })
  162. }
  163. if err := g.Wait(); err != success {
  164. t.Errorf("%+v: %v", test, err)
  165. continue
  166. }
  167. }
  168. }
  169. func TestFlowControllerTryAcquire(t *testing.T) {
  170. t.Parallel()
  171. fc := newFlowController(3, 10)
  172. // Successfully tryAcquire 4 bytes.
  173. if !fc.tryAcquire(4) {
  174. t.Error("got false, wanted true")
  175. }
  176. // Fail to tryAcquire 7 bytes.
  177. if fc.tryAcquire(7) {
  178. t.Error("got true, wanted false")
  179. }
  180. // Successfully tryAcquire 6 byte.
  181. if !fc.tryAcquire(6) {
  182. t.Error("got false, wanted true")
  183. }
  184. }
  185. func TestFlowControllerUnboundedCount(t *testing.T) {
  186. t.Parallel()
  187. ctx := context.Background()
  188. fc := newFlowController(0, 10)
  189. // Successfully acquire 4 bytes.
  190. if err := fc.acquire(ctx, 4); err != nil {
  191. t.Errorf("got %v, wanted no error", err)
  192. }
  193. // Successfully tryAcquire 4 bytes.
  194. if !fc.tryAcquire(4) {
  195. t.Error("got false, wanted true")
  196. }
  197. // Fail to tryAcquire 3 bytes.
  198. if fc.tryAcquire(3) {
  199. t.Error("got true, wanted false")
  200. }
  201. }
  202. func TestFlowControllerUnboundedCount2(t *testing.T) {
  203. t.Parallel()
  204. ctx := context.Background()
  205. fc := newFlowController(0, 0)
  206. // Successfully acquire 4 bytes.
  207. if err := fc.acquire(ctx, 4); err != nil {
  208. t.Errorf("got %v, wanted no error", err)
  209. }
  210. fc.release(1)
  211. fc.release(1)
  212. fc.release(1)
  213. wantCount := int64(-2)
  214. c := int64(fc.count())
  215. if c != wantCount {
  216. t.Fatalf("got count %d, want %d", c, wantCount)
  217. }
  218. }
  219. func TestFlowControllerUnboundedBytes(t *testing.T) {
  220. t.Parallel()
  221. ctx := context.Background()
  222. fc := newFlowController(2, 0)
  223. // Successfully acquire 4GB.
  224. if err := fc.acquire(ctx, 4e9); err != nil {
  225. t.Errorf("got %v, wanted no error", err)
  226. }
  227. // Successfully tryAcquire 4GB bytes.
  228. if !fc.tryAcquire(4e9) {
  229. t.Error("got false, wanted true")
  230. }
  231. // Fail to tryAcquire a third message.
  232. if fc.tryAcquire(3) {
  233. t.Error("got true, wanted false")
  234. }
  235. }