You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

128 regels
3.2 KiB

  1. // Copyright 2017 The Go Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. // Package semaphore provides a weighted semaphore implementation.
  5. package semaphore // import "golang.org/x/sync/semaphore"
  6. import (
  7. "container/list"
  8. "context"
  9. "sync"
  10. )
  11. type waiter struct {
  12. n int64
  13. ready chan<- struct{} // Closed when semaphore acquired.
  14. }
  15. // NewWeighted creates a new weighted semaphore with the given
  16. // maximum combined weight for concurrent access.
  17. func NewWeighted(n int64) *Weighted {
  18. w := &Weighted{size: n}
  19. return w
  20. }
  21. // Weighted provides a way to bound concurrent access to a resource.
  22. // The callers can request access with a given weight.
  23. type Weighted struct {
  24. size int64
  25. cur int64
  26. mu sync.Mutex
  27. waiters list.List
  28. }
  29. // Acquire acquires the semaphore with a weight of n, blocking until resources
  30. // are available or ctx is done. On success, returns nil. On failure, returns
  31. // ctx.Err() and leaves the semaphore unchanged.
  32. //
  33. // If ctx is already done, Acquire may still succeed without blocking.
  34. func (s *Weighted) Acquire(ctx context.Context, n int64) error {
  35. s.mu.Lock()
  36. if s.size-s.cur >= n && s.waiters.Len() == 0 {
  37. s.cur += n
  38. s.mu.Unlock()
  39. return nil
  40. }
  41. if n > s.size {
  42. // Don't make other Acquire calls block on one that's doomed to fail.
  43. s.mu.Unlock()
  44. <-ctx.Done()
  45. return ctx.Err()
  46. }
  47. ready := make(chan struct{})
  48. w := waiter{n: n, ready: ready}
  49. elem := s.waiters.PushBack(w)
  50. s.mu.Unlock()
  51. select {
  52. case <-ctx.Done():
  53. err := ctx.Err()
  54. s.mu.Lock()
  55. select {
  56. case <-ready:
  57. // Acquired the semaphore after we were canceled. Rather than trying to
  58. // fix up the queue, just pretend we didn't notice the cancelation.
  59. err = nil
  60. default:
  61. s.waiters.Remove(elem)
  62. }
  63. s.mu.Unlock()
  64. return err
  65. case <-ready:
  66. return nil
  67. }
  68. }
  69. // TryAcquire acquires the semaphore with a weight of n without blocking.
  70. // On success, returns true. On failure, returns false and leaves the semaphore unchanged.
  71. func (s *Weighted) TryAcquire(n int64) bool {
  72. s.mu.Lock()
  73. success := s.size-s.cur >= n && s.waiters.Len() == 0
  74. if success {
  75. s.cur += n
  76. }
  77. s.mu.Unlock()
  78. return success
  79. }
  80. // Release releases the semaphore with a weight of n.
  81. func (s *Weighted) Release(n int64) {
  82. s.mu.Lock()
  83. s.cur -= n
  84. if s.cur < 0 {
  85. s.mu.Unlock()
  86. panic("semaphore: bad release")
  87. }
  88. for {
  89. next := s.waiters.Front()
  90. if next == nil {
  91. break // No more waiters blocked.
  92. }
  93. w := next.Value.(waiter)
  94. if s.size-s.cur < w.n {
  95. // Not enough tokens for the next waiter. We could keep going (to try to
  96. // find a waiter with a smaller request), but under load that could cause
  97. // starvation for large requests; instead, we leave all remaining waiters
  98. // blocked.
  99. //
  100. // Consider a semaphore used as a read-write lock, with N tokens, N
  101. // readers, and one writer. Each reader can Acquire(1) to obtain a read
  102. // lock. The writer can Acquire(N) to obtain a write lock, excluding all
  103. // of the readers. If we allow the readers to jump ahead in the queue,
  104. // the writer will starve — there is always one token available for every
  105. // reader.
  106. break
  107. }
  108. s.cur += w.n
  109. s.waiters.Remove(next)
  110. close(w.ready)
  111. }
  112. s.mu.Unlock()
  113. }