You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

380 lines
11 KiB

  1. // Copyright 2016 Google LLC
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package pubsub
  15. import (
  16. "sync"
  17. "time"
  18. vkit "cloud.google.com/go/pubsub/apiv1"
  19. "cloud.google.com/go/pubsub/internal/distribution"
  20. "golang.org/x/net/context"
  21. pb "google.golang.org/genproto/googleapis/pubsub/v1"
  22. )
  23. // newMessageIterator starts a new streamingMessageIterator. Stop must be called on the messageIterator
  24. // when it is no longer needed.
  25. // subName is the full name of the subscription to pull messages from.
  26. // ctx is the context to use for acking messages and extending message deadlines.
  27. func newMessageIterator(ctx context.Context, subc *vkit.SubscriberClient, subName string, po *pullOptions) *streamingMessageIterator {
  28. ps := newPullStream(ctx, subc.StreamingPull, subName, int32(po.ackDeadline.Seconds()))
  29. return newStreamingMessageIterator(ctx, ps, po, subc, subName)
  30. }
  31. type streamingMessageIterator struct {
  32. ctx context.Context
  33. po *pullOptions
  34. ps *pullStream
  35. subc *vkit.SubscriberClient
  36. subName string
  37. kaTicker *time.Ticker // keep-alive (deadline extensions)
  38. ackTicker *time.Ticker // message acks
  39. nackTicker *time.Ticker // message nacks (more frequent than acks)
  40. pingTicker *time.Ticker // sends to the stream to keep it open
  41. failed chan struct{} // closed on stream error
  42. stopped chan struct{} // closed when Stop is called
  43. drained chan struct{} // closed when stopped && no more pending messages
  44. wg sync.WaitGroup
  45. mu sync.Mutex
  46. ackTimeDist *distribution.D
  47. keepAliveDeadlines map[string]time.Time
  48. pendingAcks map[string]bool
  49. pendingNacks map[string]bool
  50. pendingModAcks map[string]bool // ack IDs whose ack deadline is to be modified
  51. err error // error from stream failure
  52. }
  53. func newStreamingMessageIterator(ctx context.Context, ps *pullStream, po *pullOptions, subc *vkit.SubscriberClient, subName string) *streamingMessageIterator {
  54. // TODO: make kaTicker frequency more configurable. (ackDeadline - 5s) is a
  55. // reasonable default for now, because the minimum ack period is 10s. This
  56. // gives us 5s grace.
  57. keepAlivePeriod := po.ackDeadline - 5*time.Second
  58. kaTicker := time.NewTicker(keepAlivePeriod)
  59. // Ack promptly so users don't lose work if client crashes.
  60. ackTicker := time.NewTicker(100 * time.Millisecond)
  61. nackTicker := time.NewTicker(100 * time.Millisecond)
  62. pingTicker := time.NewTicker(30 * time.Second)
  63. it := &streamingMessageIterator{
  64. ctx: ctx,
  65. ps: ps,
  66. po: po,
  67. subc: subc,
  68. subName: subName,
  69. kaTicker: kaTicker,
  70. ackTicker: ackTicker,
  71. nackTicker: nackTicker,
  72. pingTicker: pingTicker,
  73. failed: make(chan struct{}),
  74. stopped: make(chan struct{}),
  75. drained: make(chan struct{}),
  76. ackTimeDist: distribution.New(int(maxAckDeadline/time.Second) + 1),
  77. keepAliveDeadlines: map[string]time.Time{},
  78. pendingAcks: map[string]bool{},
  79. pendingNacks: map[string]bool{},
  80. pendingModAcks: map[string]bool{},
  81. }
  82. it.wg.Add(1)
  83. go it.sender()
  84. return it
  85. }
  86. // Subscription.receive will call stop on its messageIterator when finished with it.
  87. // Stop will block until Done has been called on all Messages that have been
  88. // returned by Next, or until the context with which the messageIterator was created
  89. // is cancelled or exceeds its deadline.
  90. func (it *streamingMessageIterator) stop() {
  91. it.mu.Lock()
  92. select {
  93. case <-it.stopped:
  94. default:
  95. close(it.stopped)
  96. }
  97. it.checkDrained()
  98. it.mu.Unlock()
  99. it.wg.Wait()
  100. }
  101. // checkDrained closes the drained channel if the iterator has been stopped and all
  102. // pending messages have either been n/acked or expired.
  103. //
  104. // Called with the lock held.
  105. func (it *streamingMessageIterator) checkDrained() {
  106. select {
  107. case <-it.drained:
  108. return
  109. default:
  110. }
  111. select {
  112. case <-it.stopped:
  113. if len(it.keepAliveDeadlines) == 0 {
  114. close(it.drained)
  115. }
  116. default:
  117. }
  118. }
  119. // Called when a message is acked/nacked.
  120. func (it *streamingMessageIterator) done(ackID string, ack bool, receiveTime time.Time) {
  121. it.ackTimeDist.Record(int(time.Since(receiveTime) / time.Second))
  122. it.mu.Lock()
  123. defer it.mu.Unlock()
  124. delete(it.keepAliveDeadlines, ackID)
  125. if ack {
  126. it.pendingAcks[ackID] = true
  127. } else {
  128. it.pendingNacks[ackID] = true
  129. }
  130. it.checkDrained()
  131. }
  132. // fail is called when a stream method returns a permanent error.
  133. func (it *streamingMessageIterator) fail(err error) {
  134. it.mu.Lock()
  135. if it.err == nil {
  136. it.err = err
  137. close(it.failed)
  138. }
  139. it.mu.Unlock()
  140. }
  141. // receive makes a call to the stream's Recv method and returns
  142. // its messages.
  143. func (it *streamingMessageIterator) receive() ([]*Message, error) {
  144. // Stop retrieving messages if the context is done, the stream
  145. // failed, or the iterator's Stop method was called.
  146. select {
  147. case <-it.ctx.Done():
  148. return nil, it.ctx.Err()
  149. default:
  150. }
  151. it.mu.Lock()
  152. err := it.err
  153. it.mu.Unlock()
  154. if err != nil {
  155. return nil, err
  156. }
  157. // Receive messages from stream. This may block indefinitely.
  158. res, err := it.ps.Recv()
  159. // The pullStream handles retries, so any error here is fatal.
  160. if err != nil {
  161. it.fail(err)
  162. return nil, err
  163. }
  164. msgs, err := convertMessages(res.ReceivedMessages)
  165. if err != nil {
  166. it.fail(err)
  167. return nil, err
  168. }
  169. // We received some messages. Remember them so we can keep them alive. Also,
  170. // do a receipt mod-ack.
  171. maxExt := time.Now().Add(it.po.maxExtension)
  172. ackIDs := map[string]bool{}
  173. it.mu.Lock()
  174. now := time.Now()
  175. for _, m := range msgs {
  176. m.receiveTime = now
  177. addRecv(m.ID, m.ackID, now)
  178. m.doneFunc = it.done
  179. it.keepAliveDeadlines[m.ackID] = maxExt
  180. // The receipt mod-ack uses the subscription's configured ack deadline. Don't
  181. // change the mod-ack if the message is going to be nacked. This is possible
  182. // if there are retries.
  183. if !it.pendingNacks[m.ackID] {
  184. ackIDs[m.ackID] = true
  185. }
  186. }
  187. it.mu.Unlock()
  188. if !it.sendModAck(ackIDs, trunc32(int64(it.po.ackDeadline.Seconds()))) {
  189. return nil, it.err
  190. }
  191. return msgs, nil
  192. }
  193. // sender runs in a goroutine and handles all sends to the stream.
  194. func (it *streamingMessageIterator) sender() {
  195. defer it.wg.Done()
  196. defer it.kaTicker.Stop()
  197. defer it.ackTicker.Stop()
  198. defer it.nackTicker.Stop()
  199. defer it.pingTicker.Stop()
  200. defer it.ps.CloseSend()
  201. done := false
  202. for !done {
  203. sendAcks := false
  204. sendNacks := false
  205. sendModAcks := false
  206. sendPing := false
  207. select {
  208. case <-it.ctx.Done():
  209. // Context canceled or timed out: stop immediately, without
  210. // another RPC.
  211. return
  212. case <-it.failed:
  213. // Stream failed: nothing to do, so stop immediately.
  214. return
  215. case <-it.drained:
  216. // All outstanding messages have been marked done:
  217. // nothing left to do except make the final calls.
  218. it.mu.Lock()
  219. sendAcks = (len(it.pendingAcks) > 0)
  220. sendNacks = (len(it.pendingNacks) > 0)
  221. // No point in sending modacks.
  222. done = true
  223. case <-it.kaTicker.C:
  224. it.mu.Lock()
  225. it.handleKeepAlives()
  226. sendModAcks = (len(it.pendingModAcks) > 0)
  227. case <-it.nackTicker.C:
  228. it.mu.Lock()
  229. sendNacks = (len(it.pendingNacks) > 0)
  230. case <-it.ackTicker.C:
  231. it.mu.Lock()
  232. sendAcks = (len(it.pendingAcks) > 0)
  233. case <-it.pingTicker.C:
  234. it.mu.Lock()
  235. // Ping only if we are processing messages.
  236. sendPing = (len(it.keepAliveDeadlines) > 0)
  237. }
  238. // Lock is held here.
  239. var acks, nacks, modAcks map[string]bool
  240. if sendAcks {
  241. acks = it.pendingAcks
  242. it.pendingAcks = map[string]bool{}
  243. }
  244. if sendNacks {
  245. nacks = it.pendingNacks
  246. it.pendingNacks = map[string]bool{}
  247. }
  248. if sendModAcks {
  249. modAcks = it.pendingModAcks
  250. it.pendingModAcks = map[string]bool{}
  251. }
  252. it.mu.Unlock()
  253. // Make Ack and ModAck RPCs.
  254. if sendAcks {
  255. if !it.sendAck(acks) {
  256. return
  257. }
  258. }
  259. if sendNacks {
  260. // Nack indicated by modifying the deadline to zero.
  261. if !it.sendModAck(nacks, 0) {
  262. return
  263. }
  264. }
  265. if sendModAcks {
  266. if !it.sendModAck(modAcks, trunc32(int64(it.po.ackDeadline.Seconds()))) {
  267. return
  268. }
  269. }
  270. if sendPing {
  271. it.pingStream()
  272. }
  273. }
  274. }
  275. // handleKeepAlives modifies the pending request to include deadline extensions
  276. // for live messages. It also purges expired messages.
  277. //
  278. // Called with the lock held.
  279. func (it *streamingMessageIterator) handleKeepAlives() {
  280. now := time.Now()
  281. for id, expiry := range it.keepAliveDeadlines {
  282. if expiry.Before(now) {
  283. // This delete will not result in skipping any map items, as implied by
  284. // the spec at https://golang.org/ref/spec#For_statements, "For
  285. // statements with range clause", note 3, and stated explicitly at
  286. // https://groups.google.com/forum/#!msg/golang-nuts/UciASUb03Js/pzSq5iVFAQAJ.
  287. delete(it.keepAliveDeadlines, id)
  288. } else {
  289. // This will not conflict with a nack, because nacking removes the ID from keepAliveDeadlines.
  290. it.pendingModAcks[id] = true
  291. }
  292. }
  293. it.checkDrained()
  294. }
  295. func (it *streamingMessageIterator) sendAck(m map[string]bool) bool {
  296. return it.sendAckIDRPC(m, func(ids []string) error {
  297. addAcks(ids)
  298. return it.subc.Acknowledge(it.ctx, &pb.AcknowledgeRequest{
  299. Subscription: it.subName,
  300. AckIds: ids,
  301. })
  302. })
  303. }
  304. func (it *streamingMessageIterator) sendModAck(m map[string]bool, deadlineSecs int32) bool {
  305. return it.sendAckIDRPC(m, func(ids []string) error {
  306. addModAcks(ids, deadlineSecs)
  307. return it.subc.ModifyAckDeadline(it.ctx, &pb.ModifyAckDeadlineRequest{
  308. Subscription: it.subName,
  309. AckDeadlineSeconds: deadlineSecs,
  310. AckIds: ids,
  311. })
  312. })
  313. }
  314. func (it *streamingMessageIterator) sendAckIDRPC(ackIDSet map[string]bool, call func([]string) error) bool {
  315. ackIDs := make([]string, 0, len(ackIDSet))
  316. for k := range ackIDSet {
  317. ackIDs = append(ackIDs, k)
  318. }
  319. var toSend []string
  320. for len(ackIDs) > 0 {
  321. toSend, ackIDs = splitRequestIDs(ackIDs, maxPayload)
  322. if err := call(toSend); err != nil {
  323. // The underlying client handles retries, so any error is fatal to the
  324. // iterator.
  325. it.fail(err)
  326. return false
  327. }
  328. }
  329. return true
  330. }
  331. // Send a message to the stream to keep it open. The stream will close if there's no
  332. // traffic on it for a while. By keeping it open, we delay the start of the
  333. // expiration timer on messages that are buffered by gRPC or elsewhere in the
  334. // network. This matters if it takes a long time to process messages relative to the
  335. // default ack deadline, and if the messages are small enough so that many can fit
  336. // into the buffer.
  337. func (it *streamingMessageIterator) pingStream() {
  338. // Ignore error; if the stream is broken, this doesn't matter anyway.
  339. _ = it.ps.Send(&pb.StreamingPullRequest{})
  340. }
  341. func splitRequestIDs(ids []string, maxSize int) (prefix, remainder []string) {
  342. size := reqFixedOverhead
  343. i := 0
  344. for size < maxSize && i < len(ids) {
  345. size += overheadPerID + len(ids[i])
  346. i++
  347. }
  348. if size > maxSize {
  349. i--
  350. }
  351. return ids[:i], ids[i:]
  352. }