You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

107 lines
2.8 KiB

  1. // Copyright 2018 Google LLC
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // https://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package pubsub_test
  15. import (
  16. "context"
  17. "fmt"
  18. "log"
  19. "time"
  20. pubsub "cloud.google.com/go/pubsub/apiv1"
  21. pubsubpb "google.golang.org/genproto/googleapis/pubsub/v1"
  22. )
  23. func ExampleSubscriberClient_Pull_lengthyClientProcessing() {
  24. projectID := "some-project"
  25. subscriptionID := "some-subscription"
  26. ctx := context.Background()
  27. client, err := pubsub.NewSubscriberClient(ctx)
  28. if err != nil {
  29. log.Fatal(err)
  30. }
  31. defer client.Close()
  32. sub := fmt.Sprintf("projects/%s/subscriptions/%s", projectID, subscriptionID)
  33. // Be sure to tune the MaxMessages parameter per your project's needs, and accordingly
  34. // adjust the ack behavior below to batch acknowledgements.
  35. req := pubsubpb.PullRequest{
  36. Subscription: sub,
  37. MaxMessages: 1,
  38. }
  39. fmt.Println("Listening..")
  40. for {
  41. res, err := client.Pull(ctx, &req)
  42. if err != nil {
  43. log.Fatal(err)
  44. }
  45. // client.Pull returns an empty list if there are no messages available in the
  46. // backlog. We should skip processing steps when that happens.
  47. if len(res.ReceivedMessages) == 0 {
  48. continue
  49. }
  50. var recvdAckIDs []string
  51. for _, m := range res.ReceivedMessages {
  52. recvdAckIDs = append(recvdAckIDs, m.AckId)
  53. }
  54. var done = make(chan struct{})
  55. var delay = 0 * time.Second // Tick immediately upon reception
  56. var ackDeadline = 10 * time.Second
  57. // Continuously notify the server that processing is still happening on this batch.
  58. go func() {
  59. for {
  60. select {
  61. case <-ctx.Done():
  62. return
  63. case <-done:
  64. return
  65. case <-time.After(delay):
  66. err := client.ModifyAckDeadline(ctx, &pubsubpb.ModifyAckDeadlineRequest{
  67. Subscription: sub,
  68. AckIds: recvdAckIDs,
  69. AckDeadlineSeconds: int32(ackDeadline.Seconds()),
  70. })
  71. if err != nil {
  72. log.Fatal(err)
  73. }
  74. delay = ackDeadline - 5*time.Second // 5 seconds grace period.
  75. }
  76. }
  77. }()
  78. for _, m := range res.ReceivedMessages {
  79. // Process the message here, possibly in a goroutine.
  80. log.Printf("Got message: %s", string(m.Message.Data))
  81. err := client.Acknowledge(ctx, &pubsubpb.AcknowledgeRequest{
  82. Subscription: sub,
  83. AckIds: []string{m.AckId},
  84. })
  85. if err != nil {
  86. log.Fatal(err)
  87. }
  88. }
  89. close(done)
  90. }
  91. }