You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

350 lines
12 KiB

  1. // Copyright 2016 Google LLC
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. // Package bundler supports bundling (batching) of items. Bundling amortizes an
  15. // action with fixed costs over multiple items. For example, if an API provides
  16. // an RPC that accepts a list of items as input, but clients would prefer
  17. // adding items one at a time, then a Bundler can accept individual items from
  18. // the client and bundle many of them into a single RPC.
  19. //
  20. // This package is experimental and subject to change without notice.
  21. package bundler
  22. import (
  23. "context"
  24. "errors"
  25. "math"
  26. "reflect"
  27. "sync"
  28. "time"
  29. "golang.org/x/sync/semaphore"
  30. )
// Default values that NewBundler assigns to the corresponding Bundler fields.
const (
	// DefaultDelayThreshold is the initial value of Bundler.DelayThreshold.
	DefaultDelayThreshold = time.Second
	// DefaultBundleCountThreshold is the initial value of
	// Bundler.BundleCountThreshold.
	DefaultBundleCountThreshold = 10
	// DefaultBundleByteThreshold is the initial value of
	// Bundler.BundleByteThreshold.
	DefaultBundleByteThreshold = 1e6 // 1M
	// DefaultBufferedByteLimit is the initial value of
	// Bundler.BufferedByteLimit.
	DefaultBufferedByteLimit = 1e9 // 1G
)
  37. var (
  38. // ErrOverflow indicates that Bundler's stored bytes exceeds its BufferedByteLimit.
  39. ErrOverflow = errors.New("bundler reached buffered byte limit")
  40. // ErrOversizedItem indicates that an item's size exceeds the maximum bundle size.
  41. ErrOversizedItem = errors.New("item size exceeds bundle byte limit")
  42. )
// A Bundler collects items added to it into a bundle until the bundle
// exceeds a given size, then calls a user-provided function to handle the bundle.
//
// The exported fields are the thresholds and limits; NewBundler fills in their
// defaults. Because a Bundler contains mutexes, it must not be copied once it
// is in use.
type Bundler struct {
	// Starting from the time that the first message is added to a bundle, once
	// this delay has passed, handle the bundle. The default is DefaultDelayThreshold.
	DelayThreshold time.Duration

	// Once a bundle has this many items, handle the bundle. Since only one
	// item at a time is added to a bundle, no bundle will exceed this
	// threshold, so it also serves as a limit. The default is
	// DefaultBundleCountThreshold.
	BundleCountThreshold int

	// Once the number of bytes in current bundle reaches this threshold, handle
	// the bundle. The default is DefaultBundleByteThreshold. This triggers handling,
	// but does not cap the total size of a bundle.
	BundleByteThreshold int

	// The maximum size of a bundle, in bytes. Zero means unlimited.
	BundleByteLimit int

	// The maximum number of bytes that the Bundler will keep in memory before
	// returning ErrOverflow. The default is DefaultBufferedByteLimit.
	BufferedByteLimit int

	// The maximum number of handler invocations that can be running at once.
	// The default is 1.
	HandlerLimit int

	handler       func(interface{}) // called to handle a bundle
	itemSliceZero reflect.Value     // nil (zero value) for slice of items
	flushTimer    *time.Timer       // implements DelayThreshold; nil when no flush is scheduled

	mu        sync.Mutex          // held while curBundle, flushTimer and nextTicket are modified
	sem       *semaphore.Weighted // enforces BufferedByteLimit
	semOnce   sync.Once           // ensures sem is created exactly once, on first use
	curBundle bundle              // incoming items added to this bundle

	// Each bundle is assigned a unique ticket that determines the order in which the
	// handler is called. The ticket is assigned with mu locked, but waiting for tickets
	// to be handled is done via mu2 and cond, below.
	nextTicket uint64 // next ticket to be assigned

	mu2         sync.Mutex // held while nextHandled and active are read or modified
	cond        *sync.Cond // built on mu2; broadcast whenever nextHandled or active changes
	nextHandled uint64     // next ticket to be handled

	// In this implementation, active uses space proportional to HandlerLimit, and
	// waitUntilAllHandled takes time proportional to HandlerLimit each time an acquire
	// or release occurs, so large values of HandlerLimit may cause performance
	// issues.
	active map[uint64]bool // tickets of bundles actively being handled
}
// bundle is a group of items that is handed to the handler as a unit,
// together with the total size that was reported for those items.
type bundle struct {
	items reflect.Value // slice of item type
	size  int           // size in bytes of all items, as the sum of the sizes passed when adding
}
  90. // NewBundler creates a new Bundler.
  91. //
  92. // itemExample is a value of the type that will be bundled. For example, if you
  93. // want to create bundles of *Entry, you could pass &Entry{} for itemExample.
  94. //
  95. // handler is a function that will be called on each bundle. If itemExample is
  96. // of type T, the argument to handler is of type []T. handler is always called
  97. // sequentially for each bundle, and never in parallel.
  98. //
  99. // Configure the Bundler by setting its thresholds and limits before calling
  100. // any of its methods.
  101. func NewBundler(itemExample interface{}, handler func(interface{})) *Bundler {
  102. b := &Bundler{
  103. DelayThreshold: DefaultDelayThreshold,
  104. BundleCountThreshold: DefaultBundleCountThreshold,
  105. BundleByteThreshold: DefaultBundleByteThreshold,
  106. BufferedByteLimit: DefaultBufferedByteLimit,
  107. HandlerLimit: 1,
  108. handler: handler,
  109. itemSliceZero: reflect.Zero(reflect.SliceOf(reflect.TypeOf(itemExample))),
  110. active: map[uint64]bool{},
  111. }
  112. b.curBundle.items = b.itemSliceZero
  113. b.cond = sync.NewCond(&b.mu2)
  114. return b
  115. }
  116. func (b *Bundler) initSemaphores() {
  117. // Create the semaphores lazily, because the user may set limits
  118. // after NewBundler.
  119. b.semOnce.Do(func() {
  120. b.sem = semaphore.NewWeighted(int64(b.BufferedByteLimit))
  121. })
  122. }
  123. // Add adds item to the current bundle. It marks the bundle for handling and
  124. // starts a new one if any of the thresholds or limits are exceeded.
  125. //
  126. // If the item's size exceeds the maximum bundle size (Bundler.BundleByteLimit), then
  127. // the item can never be handled. Add returns ErrOversizedItem in this case.
  128. //
  129. // If adding the item would exceed the maximum memory allowed
  130. // (Bundler.BufferedByteLimit) or an AddWait call is blocked waiting for
  131. // memory, Add returns ErrOverflow.
  132. //
  133. // Add never blocks.
  134. func (b *Bundler) Add(item interface{}, size int) error {
  135. // If this item exceeds the maximum size of a bundle,
  136. // we can never send it.
  137. if b.BundleByteLimit > 0 && size > b.BundleByteLimit {
  138. return ErrOversizedItem
  139. }
  140. // If adding this item would exceed our allotted memory
  141. // footprint, we can't accept it.
  142. // (TryAcquire also returns false if anything is waiting on the semaphore,
  143. // so calls to Add and AddWait shouldn't be mixed.)
  144. b.initSemaphores()
  145. if !b.sem.TryAcquire(int64(size)) {
  146. return ErrOverflow
  147. }
  148. b.add(item, size)
  149. return nil
  150. }
  151. // add adds item to the current bundle. It marks the bundle for handling and
  152. // starts a new one if any of the thresholds or limits are exceeded.
  153. func (b *Bundler) add(item interface{}, size int) {
  154. b.mu.Lock()
  155. defer b.mu.Unlock()
  156. // If adding this item to the current bundle would cause it to exceed the
  157. // maximum bundle size, close the current bundle and start a new one.
  158. if b.BundleByteLimit > 0 && b.curBundle.size+size > b.BundleByteLimit {
  159. b.startFlushLocked()
  160. }
  161. // Add the item.
  162. b.curBundle.items = reflect.Append(b.curBundle.items, reflect.ValueOf(item))
  163. b.curBundle.size += size
  164. // Start a timer to flush the item if one isn't already running.
  165. // startFlushLocked clears the timer and closes the bundle at the same time,
  166. // so we only allocate a new timer for the first item in each bundle.
  167. // (We could try to call Reset on the timer instead, but that would add a lot
  168. // of complexity to the code just to save one small allocation.)
  169. if b.flushTimer == nil {
  170. b.flushTimer = time.AfterFunc(b.DelayThreshold, b.Flush)
  171. }
  172. // If the current bundle equals the count threshold, close it.
  173. if b.curBundle.items.Len() == b.BundleCountThreshold {
  174. b.startFlushLocked()
  175. }
  176. // If the current bundle equals or exceeds the byte threshold, close it.
  177. if b.curBundle.size >= b.BundleByteThreshold {
  178. b.startFlushLocked()
  179. }
  180. }
  181. // AddWait adds item to the current bundle. It marks the bundle for handling and
  182. // starts a new one if any of the thresholds or limits are exceeded.
  183. //
  184. // If the item's size exceeds the maximum bundle size (Bundler.BundleByteLimit), then
  185. // the item can never be handled. AddWait returns ErrOversizedItem in this case.
  186. //
  187. // If adding the item would exceed the maximum memory allowed (Bundler.BufferedByteLimit),
  188. // AddWait blocks until space is available or ctx is done.
  189. //
  190. // Calls to Add and AddWait should not be mixed on the same Bundler.
  191. func (b *Bundler) AddWait(ctx context.Context, item interface{}, size int) error {
  192. // If this item exceeds the maximum size of a bundle,
  193. // we can never send it.
  194. if b.BundleByteLimit > 0 && size > b.BundleByteLimit {
  195. return ErrOversizedItem
  196. }
  197. // If adding this item would exceed our allotted memory footprint, block
  198. // until space is available. The semaphore is FIFO, so there will be no
  199. // starvation.
  200. b.initSemaphores()
  201. if err := b.sem.Acquire(ctx, int64(size)); err != nil {
  202. return err
  203. }
  204. // Here, we've reserved space for item. Other goroutines can call AddWait
  205. // and even acquire space, but no one can take away our reservation
  206. // (assuming sem.Release is used correctly). So there is no race condition
  207. // resulting from locking the mutex after sem.Acquire returns.
  208. b.add(item, size)
  209. return nil
  210. }
  211. // Flush invokes the handler for all remaining items in the Bundler and waits
  212. // for it to return.
  213. func (b *Bundler) Flush() {
  214. b.mu.Lock()
  215. b.startFlushLocked()
  216. // Here, all bundles with tickets < b.nextTicket are
  217. // either finished or active. Those are the ones
  218. // we want to wait for.
  219. t := b.nextTicket
  220. b.mu.Unlock()
  221. b.initSemaphores()
  222. b.waitUntilAllHandled(t)
  223. }
  224. func (b *Bundler) startFlushLocked() {
  225. if b.flushTimer != nil {
  226. b.flushTimer.Stop()
  227. b.flushTimer = nil
  228. }
  229. if b.curBundle.items.Len() == 0 {
  230. return
  231. }
  232. // Here, both semaphores must have been initialized.
  233. bun := b.curBundle
  234. b.curBundle = bundle{items: b.itemSliceZero}
  235. ticket := b.nextTicket
  236. b.nextTicket++
  237. go func() {
  238. defer func() {
  239. b.sem.Release(int64(bun.size))
  240. b.release(ticket)
  241. }()
  242. b.acquire(ticket)
  243. b.handler(bun.items.Interface())
  244. }()
  245. }
  246. // acquire blocks until ticket is the next to be served, then returns. In order for N
  247. // acquire calls to return, the tickets must be in the range [0, N). A ticket must
  248. // not be presented to acquire more than once.
  249. func (b *Bundler) acquire(ticket uint64) {
  250. b.mu2.Lock()
  251. defer b.mu2.Unlock()
  252. if ticket < b.nextHandled {
  253. panic("bundler: acquire: arg too small")
  254. }
  255. for !(ticket == b.nextHandled && len(b.active) < b.HandlerLimit) {
  256. b.cond.Wait()
  257. }
  258. // Here,
  259. // ticket == b.nextHandled: the caller is the next one to be handled;
  260. // and len(b.active) < b.HandlerLimit: there is space available.
  261. b.active[ticket] = true
  262. b.nextHandled++
  263. // Broadcast, not Signal: although at most one acquire waiter can make progress,
  264. // there might be waiters in waitUntilAllHandled.
  265. b.cond.Broadcast()
  266. }
  267. // If a ticket is used for a call to acquire, it must later be passed to release. A
  268. // ticket must not be presented to release more than once.
  269. func (b *Bundler) release(ticket uint64) {
  270. b.mu2.Lock()
  271. defer b.mu2.Unlock()
  272. if !b.active[ticket] {
  273. panic("bundler: release: not an active ticket")
  274. }
  275. delete(b.active, ticket)
  276. b.cond.Broadcast()
  277. }
  278. // waitUntilAllHandled blocks until all tickets < n have called release, meaning
  279. // all bundles with tickets < n have been handled.
  280. func (b *Bundler) waitUntilAllHandled(n uint64) {
  281. // Proof of correctness of this function.
  282. // "N is acquired" means acquire(N) has returned.
  283. // "N is released" means release(N) has returned.
  284. // 1. If N is acquired, N-1 is acquired.
  285. // Follows from the loop test in acquire, and the fact
  286. // that nextHandled is incremented by 1.
  287. // 2. If nextHandled >= N, then N-1 is acquired.
  288. // Because we only increment nextHandled to N after N-1 is acquired.
  289. // 3. If nextHandled >= N, then all n < N is acquired.
  290. // Follows from #1 and #2.
  291. // 4. If N is acquired and N is not in active, then N is released.
  292. // Because we put N in active before acquire returns, and only
  293. // remove it when it is released.
  294. // Let min(active) be the smallest member of active, or infinity if active is empty.
  295. // 5. If nextHandled >= N and N <= min(active), then all n < N is released.
  296. // From nextHandled >= N and #3, all n < N is acquired.
  297. // N <= min(active) implies n < min(active) for all n < N. So all n < N is not in active.
  298. // So from #4, all n < N is released.
  299. // The loop test below is the antecedent of #5.
  300. b.mu2.Lock()
  301. defer b.mu2.Unlock()
  302. for !(b.nextHandled >= n && n <= min(b.active)) {
  303. b.cond.Wait()
  304. }
  305. }
  306. // min returns the minimum value of the set s, or the largest uint64 if
  307. // s is empty.
  308. func min(s map[uint64]bool) uint64 {
  309. var m uint64 = math.MaxUint64
  310. for n := range s {
  311. if n < m {
  312. m = n
  313. }
  314. }
  315. return m
  316. }