You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

497 lines
15 KiB

  1. // Copyright 2016 Google LLC
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package pubsub
  15. import (
  16. "context"
  17. "io"
  18. "sync"
  19. "time"
  20. vkit "cloud.google.com/go/pubsub/apiv1"
  21. "cloud.google.com/go/pubsub/internal/distribution"
  22. gax "github.com/googleapis/gax-go/v2"
  23. pb "google.golang.org/genproto/googleapis/pubsub/v1"
  24. "google.golang.org/grpc/codes"
  25. "google.golang.org/grpc/status"
  26. )
// gracePeriod is how far ahead of a message's ack deadline the next deadline
// extension (modack) is scheduled.
//
// Between message receipt and ack (that is, while a message is being
// processed) the deadline is extended by way of modack. Extending it right as
// the deadline expires would be too late, so the keep-alive tick is scheduled
// gracePeriod before the actual deadline instead.
const gracePeriod = 5 * time.Second
// messageIterator receives messages for one subscription and batches the
// acks, nacks and ack-deadline extensions that must be sent back for them.
// All sends are performed by a single background goroutine (sender).
type messageIterator struct {
	ctx        context.Context
	cancel     func() // the function that will cancel ctx; called in stop
	po         *pullOptions
	ps         *pullStream // streaming-pull stream; left nil when po.synchronous is set
	subc       *vkit.SubscriberClient
	subName    string
	kaTick     <-chan time.Time // keep-alive (deadline extensions); re-armed by sender after every tick
	ackTicker  *time.Ticker     // message acks
	nackTicker *time.Ticker     // message nacks (more frequent than acks)
	pingTicker *time.Ticker     // sends to the stream to keep it open
	failed     chan struct{}    // closed on stream error (see fail)
	drained    chan struct{}    // closed when stopped && no more pending messages (see checkDrained)
	wg         sync.WaitGroup   // counts the sender goroutine; waited on by stop and receive
	// mu is held while the maps below are read or modified, and while err is
	// set by fail.
	mu          sync.Mutex
	ackTimeDist *distribution.D // dist uses seconds
	// keepAliveDeadlines is a map of id to expiration time. This map is used in conjunction with
	// subscription.ReceiveSettings.MaxExtension to record the maximum amount of time (the
	// deadline, more specifically) we're willing to extend a message's ack deadline. As each
	// message arrives, we'll record now+MaxExtension in this table; whenever we have a chance
	// to update ack deadlines (via modack), we'll consult this table and only include IDs
	// that are not beyond their deadline.
	keepAliveDeadlines map[string]time.Time
	pendingAcks        map[string]bool // ack IDs waiting to be acknowledged
	pendingNacks       map[string]bool // ack IDs waiting to be nacked (modack with deadline zero)
	pendingModAcks     map[string]bool // ack IDs whose ack deadline is to be modified
	err                error           // error from stream failure
}
  60. // newMessageIterator starts and returns a new messageIterator.
  61. // subName is the full name of the subscription to pull messages from.
  62. // Stop must be called on the messageIterator when it is no longer needed.
  63. // The iterator always uses the background context for acking messages and extending message deadlines.
  64. func newMessageIterator(subc *vkit.SubscriberClient, subName string, po *pullOptions) *messageIterator {
  65. var ps *pullStream
  66. if !po.synchronous {
  67. ps = newPullStream(context.Background(), subc.StreamingPull, subName)
  68. }
  69. // The period will update each tick based on the distribution of acks. We'll start by arbitrarily sending
  70. // the first keepAlive halfway towards the minimum ack deadline.
  71. keepAlivePeriod := minAckDeadline / 2
  72. // Ack promptly so users don't lose work if client crashes.
  73. ackTicker := time.NewTicker(100 * time.Millisecond)
  74. nackTicker := time.NewTicker(100 * time.Millisecond)
  75. pingTicker := time.NewTicker(30 * time.Second)
  76. cctx, cancel := context.WithCancel(context.Background())
  77. it := &messageIterator{
  78. ctx: cctx,
  79. cancel: cancel,
  80. ps: ps,
  81. po: po,
  82. subc: subc,
  83. subName: subName,
  84. kaTick: time.After(keepAlivePeriod),
  85. ackTicker: ackTicker,
  86. nackTicker: nackTicker,
  87. pingTicker: pingTicker,
  88. failed: make(chan struct{}),
  89. drained: make(chan struct{}),
  90. ackTimeDist: distribution.New(int(maxAckDeadline/time.Second) + 1),
  91. keepAliveDeadlines: map[string]time.Time{},
  92. pendingAcks: map[string]bool{},
  93. pendingNacks: map[string]bool{},
  94. pendingModAcks: map[string]bool{},
  95. }
  96. it.wg.Add(1)
  97. go it.sender()
  98. return it
  99. }
  100. // Subscription.receive will call stop on its messageIterator when finished with it.
  101. // Stop will block until Done has been called on all Messages that have been
  102. // returned by Next, or until the context with which the messageIterator was created
  103. // is cancelled or exceeds its deadline.
  104. func (it *messageIterator) stop() {
  105. it.cancel()
  106. it.mu.Lock()
  107. it.checkDrained()
  108. it.mu.Unlock()
  109. it.wg.Wait()
  110. }
  111. // checkDrained closes the drained channel if the iterator has been stopped and all
  112. // pending messages have either been n/acked or expired.
  113. //
  114. // Called with the lock held.
  115. func (it *messageIterator) checkDrained() {
  116. select {
  117. case <-it.drained:
  118. return
  119. default:
  120. }
  121. select {
  122. case <-it.ctx.Done():
  123. if len(it.keepAliveDeadlines) == 0 {
  124. close(it.drained)
  125. }
  126. default:
  127. }
  128. }
  129. // Called when a message is acked/nacked.
  130. func (it *messageIterator) done(ackID string, ack bool, receiveTime time.Time) {
  131. it.ackTimeDist.Record(int(time.Since(receiveTime) / time.Second))
  132. it.mu.Lock()
  133. defer it.mu.Unlock()
  134. delete(it.keepAliveDeadlines, ackID)
  135. if ack {
  136. it.pendingAcks[ackID] = true
  137. } else {
  138. it.pendingNacks[ackID] = true
  139. }
  140. it.checkDrained()
  141. }
  142. // fail is called when a stream method returns a permanent error.
  143. // fail returns it.err. This may be err, or it may be the error
  144. // set by an earlier call to fail.
  145. func (it *messageIterator) fail(err error) error {
  146. it.mu.Lock()
  147. defer it.mu.Unlock()
  148. if it.err == nil {
  149. it.err = err
  150. close(it.failed)
  151. }
  152. return it.err
  153. }
  154. // receive makes a call to the stream's Recv method, or the Pull RPC, and returns
  155. // its messages.
  156. // maxToPull is the maximum number of messages for the Pull RPC.
  157. func (it *messageIterator) receive(maxToPull int32) ([]*Message, error) {
  158. it.mu.Lock()
  159. ierr := it.err
  160. it.mu.Unlock()
  161. if ierr != nil {
  162. return nil, ierr
  163. }
  164. // Stop retrieving messages if the iterator's Stop method was called.
  165. select {
  166. case <-it.ctx.Done():
  167. it.wg.Wait()
  168. return nil, io.EOF
  169. default:
  170. }
  171. var rmsgs []*pb.ReceivedMessage
  172. var err error
  173. if it.po.synchronous {
  174. rmsgs, err = it.pullMessages(maxToPull)
  175. } else {
  176. rmsgs, err = it.recvMessages()
  177. }
  178. // Any error here is fatal.
  179. if err != nil {
  180. return nil, it.fail(err)
  181. }
  182. msgs, err := convertMessages(rmsgs)
  183. if err != nil {
  184. return nil, it.fail(err)
  185. }
  186. // We received some messages. Remember them so we can keep them alive. Also,
  187. // do a receipt mod-ack when streaming.
  188. maxExt := time.Now().Add(it.po.maxExtension)
  189. ackIDs := map[string]bool{}
  190. it.mu.Lock()
  191. now := time.Now()
  192. for _, m := range msgs {
  193. m.receiveTime = now
  194. addRecv(m.ID, m.ackID, now)
  195. m.doneFunc = it.done
  196. it.keepAliveDeadlines[m.ackID] = maxExt
  197. // Don't change the mod-ack if the message is going to be nacked. This is
  198. // possible if there are retries.
  199. if !it.pendingNacks[m.ackID] {
  200. ackIDs[m.ackID] = true
  201. }
  202. }
  203. deadline := it.ackDeadline()
  204. it.mu.Unlock()
  205. if len(ackIDs) > 0 {
  206. if !it.sendModAck(ackIDs, deadline) {
  207. return nil, it.err
  208. }
  209. }
  210. return msgs, nil
  211. }
  212. // Get messages using the Pull RPC.
  213. // This may block indefinitely. It may also return zero messages, after some time waiting.
  214. func (it *messageIterator) pullMessages(maxToPull int32) ([]*pb.ReceivedMessage, error) {
  215. // Use it.ctx as the RPC context, so that if the iterator is stopped, the call
  216. // will return immediately.
  217. res, err := it.subc.Pull(it.ctx, &pb.PullRequest{
  218. Subscription: it.subName,
  219. MaxMessages: maxToPull,
  220. })
  221. switch {
  222. case err == context.Canceled:
  223. return nil, nil
  224. case err != nil:
  225. return nil, err
  226. default:
  227. return res.ReceivedMessages, nil
  228. }
  229. }
  230. func (it *messageIterator) recvMessages() ([]*pb.ReceivedMessage, error) {
  231. res, err := it.ps.Recv()
  232. if err != nil {
  233. return nil, err
  234. }
  235. return res.ReceivedMessages, nil
  236. }
// sender runs in a goroutine and handles all sends to the stream.
//
// Each loop iteration waits for one event (failure, drain, keep-alive tick,
// ack/nack tick or ping tick), decides under the lock which pending sets to
// flush, swaps those sets for fresh empty maps, releases the lock and then
// issues the RPCs. It exits when the iterator has failed, when an ack/modack
// RPC fails, or after the final flush once the iterator is drained. On exit
// it stops the tickers, closes the send side of the stream (if any) and marks
// the wait group done.
func (it *messageIterator) sender() {
	defer it.wg.Done()
	defer it.ackTicker.Stop()
	defer it.nackTicker.Stop()
	defer it.pingTicker.Stop()
	defer func() {
		// There is no stream in synchronous mode.
		if it.ps != nil {
			it.ps.CloseSend()
		}
	}()
	done := false
	for !done {
		sendAcks := false
		sendNacks := false
		sendModAcks := false
		sendPing := false
		// Current ack deadline; used both for modacks sent in this iteration
		// and for scheduling the next keep-alive tick.
		dl := it.ackDeadline()
		// Every arm except the failed one acquires it.mu and leaves it held
		// for the bookkeeping that follows the select.
		select {
		case <-it.failed:
			// Stream failed: nothing to do, so stop immediately.
			return
		case <-it.drained:
			// All outstanding messages have been marked done:
			// nothing left to do except make the final calls.
			it.mu.Lock()
			sendAcks = (len(it.pendingAcks) > 0)
			sendNacks = (len(it.pendingNacks) > 0)
			// No point in sending modacks.
			done = true
		case <-it.kaTick:
			it.mu.Lock()
			it.handleKeepAlives()
			sendModAcks = (len(it.pendingModAcks) > 0)
			// Schedule the next keep-alive gracePeriod ahead of the deadline.
			nextTick := dl - gracePeriod
			if nextTick <= 0 {
				// If the deadline is <= gracePeriod, let's tick again halfway to
				// the deadline.
				nextTick = dl / 2
			}
			it.kaTick = time.After(nextTick)
		case <-it.nackTicker.C:
			it.mu.Lock()
			sendNacks = (len(it.pendingNacks) > 0)
		case <-it.ackTicker.C:
			it.mu.Lock()
			sendAcks = (len(it.pendingAcks) > 0)
		case <-it.pingTicker.C:
			it.mu.Lock()
			// Ping only if we are processing messages via streaming.
			sendPing = !it.po.synchronous && (len(it.keepAliveDeadlines) > 0)
		}
		// Lock is held here.
		// Take ownership of the sets to be flushed and replace them with empty
		// maps, so the RPCs below can run without holding the lock.
		var acks, nacks, modAcks map[string]bool
		if sendAcks {
			acks = it.pendingAcks
			it.pendingAcks = map[string]bool{}
		}
		if sendNacks {
			nacks = it.pendingNacks
			it.pendingNacks = map[string]bool{}
		}
		if sendModAcks {
			modAcks = it.pendingModAcks
			it.pendingModAcks = map[string]bool{}
		}
		it.mu.Unlock()
		// Make Ack and ModAck RPCs.
		// A false return means the RPC failed and the iterator has been marked
		// failed, so the goroutine exits.
		if sendAcks {
			if !it.sendAck(acks) {
				return
			}
		}
		if sendNacks {
			// Nack indicated by modifying the deadline to zero.
			if !it.sendModAck(nacks, 0) {
				return
			}
		}
		if sendModAcks {
			if !it.sendModAck(modAcks, dl) {
				return
			}
		}
		if sendPing {
			it.pingStream()
		}
	}
}
  326. // handleKeepAlives modifies the pending request to include deadline extensions
  327. // for live messages. It also purges expired messages.
  328. //
  329. // Called with the lock held.
  330. func (it *messageIterator) handleKeepAlives() {
  331. now := time.Now()
  332. for id, expiry := range it.keepAliveDeadlines {
  333. if expiry.Before(now) {
  334. // This delete will not result in skipping any map items, as implied by
  335. // the spec at https://golang.org/ref/spec#For_statements, "For
  336. // statements with range clause", note 3, and stated explicitly at
  337. // https://groups.google.com/forum/#!msg/golang-nuts/UciASUb03Js/pzSq5iVFAQAJ.
  338. delete(it.keepAliveDeadlines, id)
  339. } else {
  340. // This will not conflict with a nack, because nacking removes the ID from keepAliveDeadlines.
  341. it.pendingModAcks[id] = true
  342. }
  343. }
  344. it.checkDrained()
  345. }
  346. func (it *messageIterator) sendAck(m map[string]bool) bool {
  347. return it.sendAckIDRPC(m, func(ids []string) error {
  348. recordStat(it.ctx, AckCount, int64(len(ids)))
  349. addAcks(ids)
  350. // Use context.Background() as the call's context, not it.ctx. We don't
  351. // want to cancel this RPC when the iterator is stopped.
  352. return it.subc.Acknowledge(context.Background(), &pb.AcknowledgeRequest{
  353. Subscription: it.subName,
  354. AckIds: ids,
  355. })
  356. })
  357. }
  358. // The receipt mod-ack amount is derived from a percentile distribution based
  359. // on the time it takes to process messages. The percentile chosen is the 99%th
  360. // percentile in order to capture the highest amount of time necessary without
  361. // considering 1% outliers.
  362. func (it *messageIterator) sendModAck(m map[string]bool, deadline time.Duration) bool {
  363. return it.sendAckIDRPC(m, func(ids []string) error {
  364. if deadline == 0 {
  365. recordStat(it.ctx, NackCount, int64(len(ids)))
  366. } else {
  367. recordStat(it.ctx, ModAckCount, int64(len(ids)))
  368. }
  369. addModAcks(ids, int32(deadline/time.Second))
  370. // Retry this RPC on Unavailable for a short amount of time, then give up
  371. // without returning a fatal error. The utility of this RPC is by nature
  372. // transient (since the deadline is relative to the current time) and it
  373. // isn't crucial for correctness (since expired messages will just be
  374. // resent).
  375. cctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
  376. defer cancel()
  377. bo := gax.Backoff{
  378. Initial: 100 * time.Millisecond,
  379. Max: time.Second,
  380. Multiplier: 2,
  381. }
  382. for {
  383. err := it.subc.ModifyAckDeadline(cctx, &pb.ModifyAckDeadlineRequest{
  384. Subscription: it.subName,
  385. AckDeadlineSeconds: int32(deadline / time.Second),
  386. AckIds: ids,
  387. })
  388. switch status.Code(err) {
  389. case codes.Unavailable:
  390. if err := gax.Sleep(cctx, bo.Pause()); err == nil {
  391. continue
  392. }
  393. // Treat sleep timeout like RPC timeout.
  394. fallthrough
  395. case codes.DeadlineExceeded:
  396. // Timeout. Not a fatal error, but note that it happened.
  397. recordStat(it.ctx, ModAckTimeoutCount, 1)
  398. return nil
  399. default:
  400. // Any other error is fatal.
  401. return err
  402. }
  403. }
  404. })
  405. }
  406. func (it *messageIterator) sendAckIDRPC(ackIDSet map[string]bool, call func([]string) error) bool {
  407. ackIDs := make([]string, 0, len(ackIDSet))
  408. for k := range ackIDSet {
  409. ackIDs = append(ackIDs, k)
  410. }
  411. var toSend []string
  412. for len(ackIDs) > 0 {
  413. toSend, ackIDs = splitRequestIDs(ackIDs, maxPayload)
  414. if err := call(toSend); err != nil {
  415. // The underlying client handles retries, so any error is fatal to the
  416. // iterator.
  417. it.fail(err)
  418. return false
  419. }
  420. }
  421. return true
  422. }
  423. // Send a message to the stream to keep it open. The stream will close if there's no
  424. // traffic on it for a while. By keeping it open, we delay the start of the
  425. // expiration timer on messages that are buffered by gRPC or elsewhere in the
  426. // network. This matters if it takes a long time to process messages relative to the
  427. // default ack deadline, and if the messages are small enough so that many can fit
  428. // into the buffer.
  429. func (it *messageIterator) pingStream() {
  430. // Ignore error; if the stream is broken, this doesn't matter anyway.
  431. _ = it.ps.Send(&pb.StreamingPullRequest{})
  432. }
  433. func splitRequestIDs(ids []string, maxSize int) (prefix, remainder []string) {
  434. size := reqFixedOverhead
  435. i := 0
  436. for size < maxSize && i < len(ids) {
  437. size += overheadPerID + len(ids[i])
  438. i++
  439. }
  440. if size > maxSize {
  441. i--
  442. }
  443. return ids[:i], ids[i:]
  444. }
  445. // The deadline to ack is derived from a percentile distribution based
  446. // on the time it takes to process messages. The percentile chosen is the 99%th
  447. // percentile - that is, processing times up to the 99%th longest processing
  448. // times should be safe. The highest 1% may expire. This number was chosen
  449. // as a way to cover most users' usecases without losing the value of
  450. // expiration.
  451. func (it *messageIterator) ackDeadline() time.Duration {
  452. pt := time.Duration(it.ackTimeDist.Percentile(.99)) * time.Second
  453. if pt > maxAckDeadline {
  454. return maxAckDeadline
  455. }
  456. if pt < minAckDeadline {
  457. return minAckDeadline
  458. }
  459. return pt
  460. }