You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

101 lines
3.0 KiB

  1. // Copyright 2016 Google LLC
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package pubsub
  15. import (
  16. "time"
  17. "github.com/golang/protobuf/ptypes"
  18. pb "google.golang.org/genproto/googleapis/pubsub/v1"
  19. )
// Message represents a Pub/Sub message.
type Message struct {
	// ID identifies this message.
	// This ID is assigned by the server and is populated for Messages obtained from a subscription.
	// This field is read-only.
	ID string

	// Data is the actual data in the message.
	Data []byte

	// Attributes represents the key-value pairs the current message
	// is labelled with.
	Attributes map[string]string

	// ackID is the identifier to acknowledge this message.
	ackID string

	// PublishTime is the time at which the message was published.
	// This is populated by the server for Messages obtained from a subscription.
	// This field is read-only.
	PublishTime time.Time

	// receiveTime is the time the message was received by the client.
	receiveTime time.Time

	// size is the approximate size of the message's data and attributes.
	size int

	// calledDone records whether done has already run for this message,
	// so that only the first Ack or Nack has an effect.
	calledDone bool

	// doneFunc is the done method of the iterator that created this Message.
	// It is invoked with ackID, whether the message was acked (true) or
	// nacked (false), and receiveTime.
	doneFunc func(string, bool, time.Time)
}
  45. func toMessage(resp *pb.ReceivedMessage) (*Message, error) {
  46. if resp.Message == nil {
  47. return &Message{ackID: resp.AckId}, nil
  48. }
  49. pubTime, err := ptypes.Timestamp(resp.Message.PublishTime)
  50. if err != nil {
  51. return nil, err
  52. }
  53. return &Message{
  54. ackID: resp.AckId,
  55. Data: resp.Message.Data,
  56. Attributes: resp.Message.Attributes,
  57. ID: resp.Message.MessageId,
  58. PublishTime: pubTime,
  59. }, nil
  60. }
  61. // Ack indicates successful processing of a Message passed to the Subscriber.Receive callback.
  62. // It should not be called on any other Message value.
  63. // If message acknowledgement fails, the Message will be redelivered.
  64. // Client code must call Ack or Nack when finished for each received Message.
  65. // Calls to Ack or Nack have no effect after the first call.
  66. func (m *Message) Ack() {
  67. m.done(true)
  68. }
  69. // Nack indicates that the client will not or cannot process a Message passed to the Subscriber.Receive callback.
  70. // It should not be called on any other Message value.
  71. // Nack will result in the Message being redelivered more quickly than if it were allowed to expire.
  72. // Client code must call Ack or Nack when finished for each received Message.
  73. // Calls to Ack or Nack have no effect after the first call.
  74. func (m *Message) Nack() {
  75. m.done(false)
  76. }
  77. func (m *Message) done(ack bool) {
  78. if m.calledDone {
  79. return
  80. }
  81. m.calledDone = true
  82. m.doneFunc(m.ackID, ack, m.receiveTime)
  83. }