You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

193 lines
5.6 KiB

  1. // Copyright 2018 Google LLC
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package pubsub
  15. import (
  16. "context"
  17. "io"
  18. "sync"
  19. "time"
  20. gax "github.com/googleapis/gax-go/v2"
  21. pb "google.golang.org/genproto/googleapis/pubsub/v1"
  22. "google.golang.org/grpc"
  23. )
  24. // A pullStream supports the methods of a StreamingPullClient, but re-opens
  25. // the stream on a retryable error.
  26. type pullStream struct {
  27. ctx context.Context
  28. open func() (pb.Subscriber_StreamingPullClient, error)
  29. mu sync.Mutex
  30. spc *pb.Subscriber_StreamingPullClient
  31. err error // permanent error
  32. }
  33. // for testing
  34. type streamingPullFunc func(context.Context, ...gax.CallOption) (pb.Subscriber_StreamingPullClient, error)
  35. func newPullStream(ctx context.Context, streamingPull streamingPullFunc, subName string) *pullStream {
  36. ctx = withSubscriptionKey(ctx, subName)
  37. return &pullStream{
  38. ctx: ctx,
  39. open: func() (pb.Subscriber_StreamingPullClient, error) {
  40. spc, err := streamingPull(ctx, gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(maxSendRecvBytes)))
  41. if err == nil {
  42. recordStat(ctx, StreamRequestCount, 1)
  43. err = spc.Send(&pb.StreamingPullRequest{
  44. Subscription: subName,
  45. // We modack messages when we receive them, so this value doesn't matter too much.
  46. StreamAckDeadlineSeconds: 60,
  47. })
  48. }
  49. if err != nil {
  50. return nil, err
  51. }
  52. return spc, nil
  53. },
  54. }
  55. }
  56. // get returns either a valid *StreamingPullClient (SPC), or a permanent error.
  57. // If the argument is nil, this is the first call for an RPC, and the current
  58. // SPC will be returned (or a new one will be opened). Otherwise, this call is a
  59. // request to re-open the stream because of a retryable error, and the argument
  60. // is a pointer to the SPC that returned the error.
  61. func (s *pullStream) get(spc *pb.Subscriber_StreamingPullClient) (*pb.Subscriber_StreamingPullClient, error) {
  62. s.mu.Lock()
  63. defer s.mu.Unlock()
  64. // A stored error is permanent.
  65. if s.err != nil {
  66. return nil, s.err
  67. }
  68. // If the context is done, so are we.
  69. s.err = s.ctx.Err()
  70. if s.err != nil {
  71. return nil, s.err
  72. }
  73. // If the current and argument SPCs differ, return the current one. This subsumes two cases:
  74. // 1. We have an SPC and the caller is getting the stream for the first time.
  75. // 2. The caller wants to retry, but they have an older SPC; we've already retried.
  76. if spc != s.spc {
  77. return s.spc, nil
  78. }
  79. // Either this is the very first call on this stream (s.spc == nil), or we have a valid
  80. // retry request. Either way, open a new stream.
  81. // The lock is held here for a long time, but it doesn't matter because no callers could get
  82. // anything done anyway.
  83. s.spc = new(pb.Subscriber_StreamingPullClient)
  84. *s.spc, s.err = s.openWithRetry() // Any error from openWithRetry is permanent.
  85. return s.spc, s.err
  86. }
  87. func (s *pullStream) openWithRetry() (pb.Subscriber_StreamingPullClient, error) {
  88. r := defaultRetryer{}
  89. for {
  90. recordStat(s.ctx, StreamOpenCount, 1)
  91. spc, err := s.open()
  92. bo, shouldRetry := r.Retry(err)
  93. if err != nil && shouldRetry {
  94. recordStat(s.ctx, StreamRetryCount, 1)
  95. if err := gax.Sleep(s.ctx, bo); err != nil {
  96. return nil, err
  97. }
  98. continue
  99. }
  100. return spc, err
  101. }
  102. }
  103. func (s *pullStream) call(f func(pb.Subscriber_StreamingPullClient) error, opts ...gax.CallOption) error {
  104. var settings gax.CallSettings
  105. for _, opt := range opts {
  106. opt.Resolve(&settings)
  107. }
  108. var r gax.Retryer = &defaultRetryer{}
  109. if settings.Retry != nil {
  110. r = settings.Retry()
  111. }
  112. var (
  113. spc *pb.Subscriber_StreamingPullClient
  114. err error
  115. )
  116. for {
  117. spc, err = s.get(spc)
  118. if err != nil {
  119. return err
  120. }
  121. start := time.Now()
  122. err = f(*spc)
  123. if err != nil {
  124. bo, shouldRetry := r.Retry(err)
  125. if shouldRetry {
  126. recordStat(s.ctx, StreamRetryCount, 1)
  127. if time.Since(start) < 30*time.Second { // don't sleep if we've been blocked for a while
  128. if err := gax.Sleep(s.ctx, bo); err != nil {
  129. return err
  130. }
  131. }
  132. continue
  133. }
  134. s.mu.Lock()
  135. s.err = err
  136. s.mu.Unlock()
  137. }
  138. return err
  139. }
  140. }
  141. func (s *pullStream) Send(req *pb.StreamingPullRequest) error {
  142. return s.call(func(spc pb.Subscriber_StreamingPullClient) error {
  143. recordStat(s.ctx, AckCount, int64(len(req.AckIds)))
  144. zeroes := 0
  145. for _, mds := range req.ModifyDeadlineSeconds {
  146. if mds == 0 {
  147. zeroes++
  148. }
  149. }
  150. recordStat(s.ctx, NackCount, int64(zeroes))
  151. recordStat(s.ctx, ModAckCount, int64(len(req.ModifyDeadlineSeconds)-zeroes))
  152. recordStat(s.ctx, StreamRequestCount, 1)
  153. return spc.Send(req)
  154. })
  155. }
  156. func (s *pullStream) Recv() (*pb.StreamingPullResponse, error) {
  157. var res *pb.StreamingPullResponse
  158. err := s.call(func(spc pb.Subscriber_StreamingPullClient) error {
  159. var err error
  160. recordStat(s.ctx, StreamResponseCount, 1)
  161. res, err = spc.Recv()
  162. if err == nil {
  163. recordStat(s.ctx, PullCount, int64(len(res.ReceivedMessages)))
  164. }
  165. return err
  166. }, gax.WithRetry(func() gax.Retryer { return &streamingPullRetryer{defaultRetryer: &defaultRetryer{}} }))
  167. return res, err
  168. }
  169. func (s *pullStream) CloseSend() error {
  170. err := s.call(func(spc pb.Subscriber_StreamingPullClient) error {
  171. return spc.CloseSend()
  172. })
  173. s.mu.Lock()
  174. s.err = io.EOF // should not be retried
  175. s.mu.Unlock()
  176. return err
  177. }