You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

367 lines
8.6 KiB

  1. // Copyright 2016 Google LLC
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package bundler
  15. import (
  16. "context"
  17. "fmt"
  18. "reflect"
  19. "sync"
  20. "testing"
  21. "time"
  22. )
  23. func TestBundlerCount1(t *testing.T) {
  24. // Unbundled case: one item per bundle.
  25. handler := &testHandler{}
  26. b := NewBundler(int(0), handler.handle)
  27. b.BundleCountThreshold = 1
  28. b.DelayThreshold = time.Second
  29. for i := 0; i < 3; i++ {
  30. if err := b.Add(i, 1); err != nil {
  31. t.Fatal(err)
  32. }
  33. }
  34. b.Flush()
  35. got := handler.bundles()
  36. want := [][]int{{0}, {1}, {2}}
  37. if !reflect.DeepEqual(got, want) {
  38. t.Errorf("bundles: got %v, want %v", got, want)
  39. }
  40. // All bundles should have been handled "immediately": much less
  41. // than the delay threshold of 1s.
  42. tgot := quantizeTimes(handler.times(), 100*time.Millisecond)
  43. twant := []int{0, 0, 0}
  44. if !reflect.DeepEqual(tgot, twant) {
  45. t.Errorf("times: got %v, want %v", tgot, twant)
  46. }
  47. }
  48. func TestBundlerCount3(t *testing.T) {
  49. handler := &testHandler{}
  50. b := NewBundler(int(0), handler.handle)
  51. b.BundleCountThreshold = 3
  52. b.DelayThreshold = 100 * time.Millisecond
  53. // Add 8 items.
  54. // The first two bundles of 3 should both be handled quickly.
  55. // The third bundle of 2 should not be handled for about DelayThreshold ms.
  56. for i := 0; i < 8; i++ {
  57. if err := b.Add(i, 1); err != nil {
  58. t.Fatal(err)
  59. }
  60. }
  61. time.Sleep(5 * b.DelayThreshold)
  62. // We should not need to close the bundler.
  63. bgot := handler.bundles()
  64. bwant := [][]int{{0, 1, 2}, {3, 4, 5}, {6, 7}}
  65. if !reflect.DeepEqual(bgot, bwant) {
  66. t.Errorf("bundles: got %v, want %v", bgot, bwant)
  67. }
  68. tgot := quantizeTimes(handler.times(), b.DelayThreshold)
  69. if len(tgot) != 3 || tgot[0] != 0 || tgot[1] != 0 || tgot[2] == 0 {
  70. t.Errorf("times: got %v, want [0, 0, non-zero]", tgot)
  71. }
  72. }
  73. func TestBundlerByteThreshold(t *testing.T) {
  74. handler := &testHandler{}
  75. b := NewBundler(int(0), handler.handle)
  76. b.BundleCountThreshold = 10
  77. b.BundleByteThreshold = 3
  78. add := func(i interface{}, s int) {
  79. if err := b.Add(i, s); err != nil {
  80. t.Fatal(err)
  81. }
  82. }
  83. add(1, 1)
  84. add(2, 2)
  85. // Hit byte threshold: bundle = 1, 2
  86. add(3, 1)
  87. add(4, 1)
  88. add(5, 2)
  89. // Passed byte threshold, but not limit: bundle = 3, 4, 5
  90. add(6, 1)
  91. b.Flush()
  92. bgot := handler.bundles()
  93. bwant := [][]int{{1, 2}, {3, 4, 5}, {6}}
  94. if !reflect.DeepEqual(bgot, bwant) {
  95. t.Errorf("bundles: got %v, want %v", bgot, bwant)
  96. }
  97. tgot := quantizeTimes(handler.times(), b.DelayThreshold)
  98. twant := []int{0, 0, 0}
  99. if !reflect.DeepEqual(tgot, twant) {
  100. t.Errorf("times: got %v, want %v", tgot, twant)
  101. }
  102. }
  103. func TestBundlerLimit(t *testing.T) {
  104. handler := &testHandler{}
  105. b := NewBundler(int(0), handler.handle)
  106. b.BundleCountThreshold = 10
  107. b.BundleByteLimit = 3
  108. add := func(i interface{}, s int) {
  109. if err := b.Add(i, s); err != nil {
  110. t.Fatal(err)
  111. }
  112. }
  113. add(1, 1)
  114. add(2, 2)
  115. // Hit byte limit: bundle = 1, 2
  116. add(3, 1)
  117. add(4, 1)
  118. add(5, 2)
  119. // Exceeded byte limit: bundle = 3, 4
  120. add(6, 2)
  121. // Exceeded byte limit: bundle = 5
  122. b.Flush()
  123. bgot := handler.bundles()
  124. bwant := [][]int{{1, 2}, {3, 4}, {5}, {6}}
  125. if !reflect.DeepEqual(bgot, bwant) {
  126. t.Errorf("bundles: got %v, want %v", bgot, bwant)
  127. }
  128. tgot := quantizeTimes(handler.times(), b.DelayThreshold)
  129. twant := []int{0, 0, 0, 0}
  130. if !reflect.DeepEqual(tgot, twant) {
  131. t.Errorf("times: got %v, want %v", tgot, twant)
  132. }
  133. }
  134. func TestAddWait(t *testing.T) {
  135. var (
  136. mu sync.Mutex
  137. events []string
  138. )
  139. event := func(s string) {
  140. mu.Lock()
  141. events = append(events, s)
  142. mu.Unlock()
  143. }
  144. handlec := make(chan int)
  145. done := make(chan struct{})
  146. b := NewBundler(int(0), func(interface{}) {
  147. <-handlec
  148. event("handle")
  149. })
  150. b.BufferedByteLimit = 3
  151. addw := func(sz int) {
  152. if err := b.AddWait(context.Background(), 0, sz); err != nil {
  153. t.Fatal(err)
  154. }
  155. event(fmt.Sprintf("addw(%d)", sz))
  156. }
  157. addw(2)
  158. go func() {
  159. addw(3) // blocks until first bundle is handled
  160. close(done)
  161. }()
  162. // Give addw(3) a chance to finish
  163. time.Sleep(100 * time.Millisecond)
  164. handlec <- 1 // handle the first bundle
  165. select {
  166. case <-time.After(time.Second):
  167. t.Fatal("timed out")
  168. case <-done:
  169. }
  170. want := []string{"addw(2)", "handle", "addw(3)"}
  171. if !reflect.DeepEqual(events, want) {
  172. t.Errorf("got %v\nwant%v", events, want)
  173. }
  174. }
  175. func TestAddWaitCancel(t *testing.T) {
  176. b := NewBundler(int(0), func(interface{}) {})
  177. b.BufferedByteLimit = 3
  178. ctx, cancel := context.WithCancel(context.Background())
  179. go func() {
  180. time.Sleep(100 * time.Millisecond)
  181. cancel()
  182. }()
  183. err := b.AddWait(ctx, 0, 4)
  184. if want := context.Canceled; err != want {
  185. t.Fatalf("got %v, want %v", err, want)
  186. }
  187. }
  188. func TestBundlerErrors(t *testing.T) {
  189. // Use a handler that blocks forever, to force the bundler to run out of
  190. // memory.
  191. b := NewBundler(int(0), func(interface{}) { select {} })
  192. b.BundleByteLimit = 3
  193. b.BufferedByteLimit = 10
  194. if got, want := b.Add(1, 4), ErrOversizedItem; got != want {
  195. t.Fatalf("got %v, want %v", got, want)
  196. }
  197. for i := 0; i < 5; i++ {
  198. if err := b.Add(i, 2); err != nil {
  199. t.Fatal(err)
  200. }
  201. }
  202. if got, want := b.Add(5, 1), ErrOverflow; got != want {
  203. t.Fatalf("got %v, want %v", got, want)
  204. }
  205. }
  206. // Check that no more than HandlerLimit handlers are active at once.
  207. func TestConcurrentHandlersMax(t *testing.T) {
  208. const handlerLimit = 10
  209. var (
  210. mu sync.Mutex
  211. active int
  212. maxHandlers int
  213. )
  214. b := NewBundler(int(0), func(s interface{}) {
  215. mu.Lock()
  216. active++
  217. if active > maxHandlers {
  218. maxHandlers = active
  219. }
  220. if maxHandlers > handlerLimit {
  221. t.Errorf("too many handlers running (got %d; want %d)", maxHandlers, handlerLimit)
  222. }
  223. mu.Unlock()
  224. time.Sleep(1 * time.Millisecond) // let the scheduler work
  225. mu.Lock()
  226. active--
  227. mu.Unlock()
  228. })
  229. b.BundleCountThreshold = 5
  230. b.HandlerLimit = 10
  231. defer b.Flush()
  232. more := 0 // extra iterations past saturation
  233. for i := 0; more == 0 || i < more; i++ {
  234. mu.Lock()
  235. m := maxHandlers
  236. mu.Unlock()
  237. if m >= handlerLimit && more == 0 {
  238. // Run past saturation to check that we don't exceed the max.
  239. more = 2 * i
  240. }
  241. b.Add(i, 1)
  242. }
  243. }
  244. // Check that Flush doesn't return until all prior items have been handled.
  245. func TestConcurrentFlush(t *testing.T) {
  246. var (
  247. mu sync.Mutex
  248. items = make(map[int]bool)
  249. )
  250. b := NewBundler(int(0), func(s interface{}) {
  251. mu.Lock()
  252. for _, i := range s.([]int) {
  253. items[i] = true
  254. }
  255. mu.Unlock()
  256. time.Sleep(10 * time.Millisecond)
  257. })
  258. b.BundleCountThreshold = 5
  259. b.HandlerLimit = 10
  260. defer b.Flush()
  261. var wg sync.WaitGroup
  262. defer wg.Wait()
  263. for i := 0; i < 50; i++ {
  264. b.Add(i, 1)
  265. if i%100 == 0 {
  266. i := i
  267. wg.Add(1)
  268. go func() {
  269. defer wg.Done()
  270. b.Flush()
  271. mu.Lock()
  272. defer mu.Unlock()
  273. for j := 0; j <= i; j++ {
  274. if !items[j] {
  275. // Cannot use Fatal, since we're in a non-test goroutine.
  276. t.Errorf("flush(%d): item %d not handled", i, j)
  277. break
  278. }
  279. }
  280. }()
  281. }
  282. }
  283. }
  284. type testHandler struct {
  285. mu sync.Mutex
  286. b [][]int
  287. t []time.Time
  288. }
  289. func (t *testHandler) bundles() [][]int {
  290. t.mu.Lock()
  291. defer t.mu.Unlock()
  292. return t.b
  293. }
  294. func (t *testHandler) times() []time.Time {
  295. t.mu.Lock()
  296. defer t.mu.Unlock()
  297. return t.t
  298. }
  299. func (t *testHandler) handle(b interface{}) {
  300. t.mu.Lock()
  301. defer t.mu.Unlock()
  302. t.b = append(t.b, b.([]int))
  303. t.t = append(t.t, time.Now())
  304. }
  305. // Round times to the nearest q and express them as the number of q
  306. // since the first time.
  307. // E.g. if q is 100ms, then a time within 50ms of the first time
  308. // will be represented as 0, a time 150 to 250ms of the first time
  309. // we be represented as 1, etc.
  310. func quantizeTimes(times []time.Time, q time.Duration) []int {
  311. var rs []int
  312. for _, t := range times {
  313. d := t.Sub(times[0])
  314. r := int((d + q/2) / q)
  315. rs = append(rs, r)
  316. }
  317. return rs
  318. }
  319. func TestQuantizeTimes(t *testing.T) {
  320. quantum := 100 * time.Millisecond
  321. for _, test := range []struct {
  322. millis []int // times in milliseconds
  323. want []int
  324. }{
  325. {[]int{10, 20, 30}, []int{0, 0, 0}},
  326. {[]int{0, 49, 50, 90}, []int{0, 0, 1, 1}},
  327. {[]int{0, 95, 170, 315}, []int{0, 1, 2, 3}},
  328. } {
  329. var times []time.Time
  330. for _, ms := range test.millis {
  331. times = append(times, time.Unix(0, int64(ms*1e6)))
  332. }
  333. got := quantizeTimes(times, quantum)
  334. if !reflect.DeepEqual(got, test.want) {
  335. t.Errorf("%v: got %v, want %v", test.millis, got, test.want)
  336. }
  337. }
  338. }