You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

161 lines
5.1 KiB

  1. // Copyright 2017 Google LLC
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package pubsub
  15. import (
  16. "context"
  17. "fmt"
  18. "strings"
  19. "time"
  20. "github.com/golang/protobuf/ptypes"
  21. pb "google.golang.org/genproto/googleapis/pubsub/v1"
  22. )
  23. // Snapshot is a reference to a PubSub snapshot.
  24. type Snapshot struct {
  25. c *Client
  26. // The fully qualified identifier for the snapshot, in the format "projects/<projid>/snapshots/<snap>"
  27. name string
  28. }
  29. // ID returns the unique identifier of the snapshot within its project.
  30. func (s *Snapshot) ID() string {
  31. slash := strings.LastIndex(s.name, "/")
  32. if slash == -1 {
  33. // name is not a fully-qualified name.
  34. panic("bad snapshot name")
  35. }
  36. return s.name[slash+1:]
  37. }
  38. // SnapshotConfig contains the details of a Snapshot.
  39. type SnapshotConfig struct {
  40. *Snapshot
  41. Topic *Topic
  42. Expiration time.Time
  43. }
  44. // Snapshot creates a reference to a snapshot.
  45. func (c *Client) Snapshot(id string) *Snapshot {
  46. return &Snapshot{
  47. c: c,
  48. name: fmt.Sprintf("projects/%s/snapshots/%s", c.projectID, id),
  49. }
  50. }
  51. // Snapshots returns an iterator which returns snapshots for this project.
  52. func (c *Client) Snapshots(ctx context.Context) *SnapshotConfigIterator {
  53. it := c.subc.ListSnapshots(ctx, &pb.ListSnapshotsRequest{
  54. Project: c.fullyQualifiedProjectName(),
  55. })
  56. next := func() (*SnapshotConfig, error) {
  57. snap, err := it.Next()
  58. if err != nil {
  59. return nil, err
  60. }
  61. return toSnapshotConfig(snap, c)
  62. }
  63. return &SnapshotConfigIterator{next: next}
  64. }
  65. // SnapshotConfigIterator is an iterator that returns a series of snapshots.
  66. type SnapshotConfigIterator struct {
  67. next func() (*SnapshotConfig, error)
  68. }
  69. // Next returns the next SnapshotConfig. Its second return value is iterator.Done if there are no more results.
  70. // Once Next returns iterator.Done, all subsequent calls will return iterator.Done.
  71. func (snaps *SnapshotConfigIterator) Next() (*SnapshotConfig, error) {
  72. return snaps.next()
  73. }
  74. // Delete deletes a snapshot.
  75. func (s *Snapshot) Delete(ctx context.Context) error {
  76. return s.c.subc.DeleteSnapshot(ctx, &pb.DeleteSnapshotRequest{Snapshot: s.name})
  77. }
  78. // SeekToTime seeks the subscription to a point in time.
  79. //
  80. // Messages retained in the subscription that were published before this
  81. // time are marked as acknowledged, and messages retained in the
  82. // subscription that were published after this time are marked as
  83. // unacknowledged. Note that this operation affects only those messages
  84. // retained in the subscription (configured by SnapshotConfig). For example,
  85. // if `time` corresponds to a point before the message retention
  86. // window (or to a point before the system's notion of the subscription
  87. // creation time), only retained messages will be marked as unacknowledged,
  88. // and already-expunged messages will not be restored.
  89. func (s *Subscription) SeekToTime(ctx context.Context, t time.Time) error {
  90. ts, err := ptypes.TimestampProto(t)
  91. if err != nil {
  92. return err
  93. }
  94. _, err = s.c.subc.Seek(ctx, &pb.SeekRequest{
  95. Subscription: s.name,
  96. Target: &pb.SeekRequest_Time{ts},
  97. })
  98. return err
  99. }
  100. // CreateSnapshot creates a new snapshot from this subscription.
  101. // The snapshot will be for the topic this subscription is subscribed to.
  102. // If the name is empty string, a unique name is assigned.
  103. //
  104. // The created snapshot is guaranteed to retain:
  105. // (a) The existing backlog on the subscription. More precisely, this is
  106. // defined as the messages in the subscription's backlog that are
  107. // unacknowledged when Snapshot returns without error.
  108. // (b) Any messages published to the subscription's topic following
  109. // Snapshot returning without error.
  110. func (s *Subscription) CreateSnapshot(ctx context.Context, name string) (*SnapshotConfig, error) {
  111. if name != "" {
  112. name = fmt.Sprintf("projects/%s/snapshots/%s", strings.Split(s.name, "/")[1], name)
  113. }
  114. snap, err := s.c.subc.CreateSnapshot(ctx, &pb.CreateSnapshotRequest{
  115. Name: name,
  116. Subscription: s.name,
  117. })
  118. if err != nil {
  119. return nil, err
  120. }
  121. return toSnapshotConfig(snap, s.c)
  122. }
  123. // SeekToSnapshot seeks the subscription to a snapshot.
  124. //
  125. // The snapshot need not be created from this subscription,
  126. // but it must be for the topic this subscription is subscribed to.
  127. func (s *Subscription) SeekToSnapshot(ctx context.Context, snap *Snapshot) error {
  128. _, err := s.c.subc.Seek(ctx, &pb.SeekRequest{
  129. Subscription: s.name,
  130. Target: &pb.SeekRequest_Snapshot{snap.name},
  131. })
  132. return err
  133. }
  134. func toSnapshotConfig(snap *pb.Snapshot, c *Client) (*SnapshotConfig, error) {
  135. exp, err := ptypes.Timestamp(snap.ExpireTime)
  136. if err != nil {
  137. return nil, err
  138. }
  139. return &SnapshotConfig{
  140. Snapshot: &Snapshot{c: c, name: snap.Name},
  141. Topic: newTopic(c, snap.Topic),
  142. Expiration: exp,
  143. }, nil
  144. }