You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

523 line
18 KiB

  1. // Copyright 2016 Google LLC
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package pubsub
  15. import (
  16. "errors"
  17. "fmt"
  18. "io"
  19. "strings"
  20. "sync"
  21. "time"
  22. "cloud.google.com/go/iam"
  23. "cloud.google.com/go/internal/optional"
  24. "github.com/golang/protobuf/ptypes"
  25. durpb "github.com/golang/protobuf/ptypes/duration"
  26. "golang.org/x/net/context"
  27. "golang.org/x/sync/errgroup"
  28. pb "google.golang.org/genproto/googleapis/pubsub/v1"
  29. fmpb "google.golang.org/genproto/protobuf/field_mask"
  30. "google.golang.org/grpc"
  31. "google.golang.org/grpc/codes"
  32. )
  33. // Subscription is a reference to a PubSub subscription.
  34. type Subscription struct {
  35. c *Client
  36. // The fully qualified identifier for the subscription, in the format "projects/<projid>/subscriptions/<name>"
  37. name string
  38. // Settings for pulling messages. Configure these before calling Receive.
  39. ReceiveSettings ReceiveSettings
  40. mu sync.Mutex
  41. receiveActive bool
  42. }
  43. // Subscription creates a reference to a subscription.
  44. func (c *Client) Subscription(id string) *Subscription {
  45. return c.SubscriptionInProject(id, c.projectID)
  46. }
  47. // SubscriptionInProject creates a reference to a subscription in a given project.
  48. func (c *Client) SubscriptionInProject(id, projectID string) *Subscription {
  49. return &Subscription{
  50. c: c,
  51. name: fmt.Sprintf("projects/%s/subscriptions/%s", projectID, id),
  52. }
  53. }
  54. // String returns the globally unique printable name of the subscription.
  55. func (s *Subscription) String() string {
  56. return s.name
  57. }
  58. // ID returns the unique identifier of the subscription within its project.
  59. func (s *Subscription) ID() string {
  60. slash := strings.LastIndex(s.name, "/")
  61. if slash == -1 {
  62. // name is not a fully-qualified name.
  63. panic("bad subscription name")
  64. }
  65. return s.name[slash+1:]
  66. }
  67. // Subscriptions returns an iterator which returns all of the subscriptions for the client's project.
  68. func (c *Client) Subscriptions(ctx context.Context) *SubscriptionIterator {
  69. it := c.subc.ListSubscriptions(ctx, &pb.ListSubscriptionsRequest{
  70. Project: c.fullyQualifiedProjectName(),
  71. })
  72. return &SubscriptionIterator{
  73. c: c,
  74. next: func() (string, error) {
  75. sub, err := it.Next()
  76. if err != nil {
  77. return "", err
  78. }
  79. return sub.Name, nil
  80. },
  81. }
  82. }
  83. // SubscriptionIterator is an iterator that returns a series of subscriptions.
  84. type SubscriptionIterator struct {
  85. c *Client
  86. next func() (string, error)
  87. }
  88. // Next returns the next subscription. If there are no more subscriptions, iterator.Done will be returned.
  89. func (subs *SubscriptionIterator) Next() (*Subscription, error) {
  90. subName, err := subs.next()
  91. if err != nil {
  92. return nil, err
  93. }
  94. return &Subscription{c: subs.c, name: subName}, nil
  95. }
  96. // PushConfig contains configuration for subscriptions that operate in push mode.
  97. type PushConfig struct {
  98. // A URL locating the endpoint to which messages should be pushed.
  99. Endpoint string
  100. // Endpoint configuration attributes. See https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions#pushconfig for more details.
  101. Attributes map[string]string
  102. }
  103. func (pc *PushConfig) toProto() *pb.PushConfig {
  104. return &pb.PushConfig{
  105. Attributes: pc.Attributes,
  106. PushEndpoint: pc.Endpoint,
  107. }
  108. }
  109. // Subscription config contains the configuration of a subscription.
  110. type SubscriptionConfig struct {
  111. Topic *Topic
  112. PushConfig PushConfig
  113. // The default maximum time after a subscriber receives a message before
  114. // the subscriber should acknowledge the message. Note: messages which are
  115. // obtained via Subscription.Receive need not be acknowledged within this
  116. // deadline, as the deadline will be automatically extended.
  117. AckDeadline time.Duration
  118. // Whether to retain acknowledged messages. If true, acknowledged messages
  119. // will not be expunged until they fall out of the RetentionDuration window.
  120. RetainAckedMessages bool
  121. // How long to retain messages in backlog, from the time of publish. If
  122. // RetainAckedMessages is true, this duration affects the retention of
  123. // acknowledged messages, otherwise only unacknowledged messages are retained.
  124. // Defaults to 7 days. Cannot be longer than 7 days or shorter than 10 minutes.
  125. RetentionDuration time.Duration
  126. }
  127. func (cfg *SubscriptionConfig) toProto(name string) *pb.Subscription {
  128. var pbPushConfig *pb.PushConfig
  129. if cfg.PushConfig.Endpoint != "" || len(cfg.PushConfig.Attributes) != 0 {
  130. pbPushConfig = &pb.PushConfig{
  131. Attributes: cfg.PushConfig.Attributes,
  132. PushEndpoint: cfg.PushConfig.Endpoint,
  133. }
  134. }
  135. var retentionDuration *durpb.Duration
  136. if cfg.RetentionDuration != 0 {
  137. retentionDuration = ptypes.DurationProto(cfg.RetentionDuration)
  138. }
  139. return &pb.Subscription{
  140. Name: name,
  141. Topic: cfg.Topic.name,
  142. PushConfig: pbPushConfig,
  143. AckDeadlineSeconds: trunc32(int64(cfg.AckDeadline.Seconds())),
  144. RetainAckedMessages: cfg.RetainAckedMessages,
  145. MessageRetentionDuration: retentionDuration,
  146. }
  147. }
  148. func protoToSubscriptionConfig(pbSub *pb.Subscription, c *Client) (SubscriptionConfig, error) {
  149. rd := time.Hour * 24 * 7
  150. var err error
  151. if pbSub.MessageRetentionDuration != nil {
  152. rd, err = ptypes.Duration(pbSub.MessageRetentionDuration)
  153. if err != nil {
  154. return SubscriptionConfig{}, err
  155. }
  156. }
  157. return SubscriptionConfig{
  158. Topic: newTopic(c, pbSub.Topic),
  159. AckDeadline: time.Second * time.Duration(pbSub.AckDeadlineSeconds),
  160. PushConfig: PushConfig{
  161. Endpoint: pbSub.PushConfig.PushEndpoint,
  162. Attributes: pbSub.PushConfig.Attributes,
  163. },
  164. RetainAckedMessages: pbSub.RetainAckedMessages,
  165. RetentionDuration: rd,
  166. }, nil
  167. }
  168. // ReceiveSettings configure the Receive method.
  169. // A zero ReceiveSettings will result in values equivalent to DefaultReceiveSettings.
  170. type ReceiveSettings struct {
  171. // MaxExtension is the maximum period for which the Subscription should
  172. // automatically extend the ack deadline for each message.
  173. //
  174. // The Subscription will automatically extend the ack deadline of all
  175. // fetched Messages for the duration specified. Automatic deadline
  176. // extension may be disabled by specifying a duration less than 0.
  177. //
  178. // Connections may be terminated if they last longer than 30m, which
  179. // effectively makes that the ceiling for this value. For longer message
  180. // processing, see the example at https://godoc.org/cloud.google.com/go/pubsub/apiv1#example_SubscriberClient_Pull_lengthyClientProcessing
  181. MaxExtension time.Duration
  182. // MaxOutstandingMessages is the maximum number of unprocessed messages
  183. // (unacknowledged but not yet expired). If MaxOutstandingMessages is 0, it
  184. // will be treated as if it were DefaultReceiveSettings.MaxOutstandingMessages.
  185. // If the value is negative, then there will be no limit on the number of
  186. // unprocessed messages.
  187. MaxOutstandingMessages int
  188. // MaxOutstandingBytes is the maximum size of unprocessed messages
  189. // (unacknowledged but not yet expired). If MaxOutstandingBytes is 0, it will
  190. // be treated as if it were DefaultReceiveSettings.MaxOutstandingBytes. If
  191. // the value is negative, then there will be no limit on the number of bytes
  192. // for unprocessed messages.
  193. MaxOutstandingBytes int
  194. // NumGoroutines is the number of goroutines Receive will spawn to pull
  195. // messages concurrently. If NumGoroutines is less than 1, it will be treated
  196. // as if it were DefaultReceiveSettings.NumGoroutines.
  197. //
  198. // NumGoroutines does not limit the number of messages that can be processed
  199. // concurrently. Even with one goroutine, many messages might be processed at
  200. // once, because that goroutine may continually receive messages and invoke the
  201. // function passed to Receive on them. To limit the number of messages being
  202. // processed concurrently, set MaxOutstandingMessages.
  203. NumGoroutines int
  204. }
  205. // DefaultReceiveSettings holds the default values for ReceiveSettings.
  206. var DefaultReceiveSettings = ReceiveSettings{
  207. MaxExtension: 10 * time.Minute,
  208. MaxOutstandingMessages: 1000,
  209. MaxOutstandingBytes: 1e9, // 1G
  210. NumGoroutines: 1,
  211. }
  212. // Delete deletes the subscription.
  213. func (s *Subscription) Delete(ctx context.Context) error {
  214. return s.c.subc.DeleteSubscription(ctx, &pb.DeleteSubscriptionRequest{Subscription: s.name})
  215. }
  216. // Exists reports whether the subscription exists on the server.
  217. func (s *Subscription) Exists(ctx context.Context) (bool, error) {
  218. _, err := s.c.subc.GetSubscription(ctx, &pb.GetSubscriptionRequest{Subscription: s.name})
  219. if err == nil {
  220. return true, nil
  221. }
  222. if grpc.Code(err) == codes.NotFound {
  223. return false, nil
  224. }
  225. return false, err
  226. }
  227. // Config fetches the current configuration for the subscription.
  228. func (s *Subscription) Config(ctx context.Context) (SubscriptionConfig, error) {
  229. pbSub, err := s.c.subc.GetSubscription(ctx, &pb.GetSubscriptionRequest{Subscription: s.name})
  230. if err != nil {
  231. return SubscriptionConfig{}, err
  232. }
  233. cfg, err := protoToSubscriptionConfig(pbSub, s.c)
  234. if err != nil {
  235. return SubscriptionConfig{}, err
  236. }
  237. return cfg, nil
  238. }
  239. // SubscriptionConfigToUpdate describes how to update a subscription.
  240. type SubscriptionConfigToUpdate struct {
  241. // If non-nil, the push config is changed.
  242. PushConfig *PushConfig
  243. // If non-zero, the ack deadline is changed.
  244. AckDeadline time.Duration
  245. // If set, RetainAckedMessages is changed.
  246. RetainAckedMessages optional.Bool
  247. // If non-zero, RetentionDuration is changed.
  248. RetentionDuration time.Duration
  249. }
  250. // Update changes an existing subscription according to the fields set in cfg.
  251. // It returns the new SubscriptionConfig.
  252. //
  253. // Update returns an error if no fields were modified.
  254. func (s *Subscription) Update(ctx context.Context, cfg SubscriptionConfigToUpdate) (SubscriptionConfig, error) {
  255. req := s.updateRequest(&cfg)
  256. if len(req.UpdateMask.Paths) == 0 {
  257. return SubscriptionConfig{}, errors.New("pubsub: UpdateSubscription call with nothing to update")
  258. }
  259. rpsub, err := s.c.subc.UpdateSubscription(ctx, req)
  260. if err != nil {
  261. return SubscriptionConfig{}, err
  262. }
  263. return protoToSubscriptionConfig(rpsub, s.c)
  264. }
  265. func (s *Subscription) updateRequest(cfg *SubscriptionConfigToUpdate) *pb.UpdateSubscriptionRequest {
  266. psub := &pb.Subscription{Name: s.name}
  267. var paths []string
  268. if cfg.PushConfig != nil {
  269. psub.PushConfig = cfg.PushConfig.toProto()
  270. paths = append(paths, "push_config")
  271. }
  272. if cfg.AckDeadline != 0 {
  273. psub.AckDeadlineSeconds = trunc32(int64(cfg.AckDeadline.Seconds()))
  274. paths = append(paths, "ack_deadline_seconds")
  275. }
  276. if cfg.RetainAckedMessages != nil {
  277. psub.RetainAckedMessages = optional.ToBool(cfg.RetainAckedMessages)
  278. paths = append(paths, "retain_acked_messages")
  279. }
  280. if cfg.RetentionDuration != 0 {
  281. psub.MessageRetentionDuration = ptypes.DurationProto(cfg.RetentionDuration)
  282. paths = append(paths, "message_retention_duration")
  283. }
  284. return &pb.UpdateSubscriptionRequest{
  285. Subscription: psub,
  286. UpdateMask: &fmpb.FieldMask{Paths: paths},
  287. }
  288. }
  289. func (s *Subscription) IAM() *iam.Handle {
  290. return iam.InternalNewHandle(s.c.subc.Connection(), s.name)
  291. }
  292. // CreateSubscription creates a new subscription on a topic.
  293. //
  294. // id is the name of the subscription to create. It must start with a letter,
  295. // and contain only letters ([A-Za-z]), numbers ([0-9]), dashes (-),
  296. // underscores (_), periods (.), tildes (~), plus (+) or percent signs (%). It
  297. // must be between 3 and 255 characters in length, and must not start with
  298. // "goog".
  299. //
  300. // cfg.Topic is the topic from which the subscription should receive messages. It
  301. // need not belong to the same project as the subscription. This field is required.
  302. //
  303. // cfg.AckDeadline is the maximum time after a subscriber receives a message before
  304. // the subscriber should acknowledge the message. It must be between 10 and 600
  305. // seconds (inclusive), and is rounded down to the nearest second. If the
  306. // provided ackDeadline is 0, then the default value of 10 seconds is used.
  307. // Note: messages which are obtained via Subscription.Receive need not be
  308. // acknowledged within this deadline, as the deadline will be automatically
  309. // extended.
  310. //
  311. // cfg.PushConfig may be set to configure this subscription for push delivery.
  312. //
  313. // If the subscription already exists an error will be returned.
  314. func (c *Client) CreateSubscription(ctx context.Context, id string, cfg SubscriptionConfig) (*Subscription, error) {
  315. if cfg.Topic == nil {
  316. return nil, errors.New("pubsub: require non-nil Topic")
  317. }
  318. if cfg.AckDeadline == 0 {
  319. cfg.AckDeadline = 10 * time.Second
  320. }
  321. if d := cfg.AckDeadline; d < 10*time.Second || d > 600*time.Second {
  322. return nil, fmt.Errorf("ack deadline must be between 10 and 600 seconds; got: %v", d)
  323. }
  324. sub := c.Subscription(id)
  325. _, err := c.subc.CreateSubscription(ctx, cfg.toProto(sub.name))
  326. if err != nil {
  327. return nil, err
  328. }
  329. return sub, nil
  330. }
  331. var errReceiveInProgress = errors.New("pubsub: Receive already in progress for this subscription")
  332. // Receive calls f with the outstanding messages from the subscription.
  333. // It blocks until ctx is done, or the service returns a non-retryable error.
  334. //
  335. // The standard way to terminate a Receive is to cancel its context:
  336. //
  337. // cctx, cancel := context.WithCancel(ctx)
  338. // err := sub.Receive(cctx, callback)
  339. // // Call cancel from callback, or another goroutine.
  340. //
  341. // If the service returns a non-retryable error, Receive returns that error after
  342. // all of the outstanding calls to f have returned. If ctx is done, Receive
  343. // returns nil after all of the outstanding calls to f have returned and
  344. // all messages have been acknowledged or have expired.
  345. //
  346. // Receive calls f concurrently from multiple goroutines. It is encouraged to
  347. // process messages synchronously in f, even if that processing is relatively
  348. // time-consuming; Receive will spawn new goroutines for incoming messages,
  349. // limited by MaxOutstandingMessages and MaxOutstandingBytes in ReceiveSettings.
  350. //
  351. // The context passed to f will be canceled when ctx is Done or there is a
  352. // fatal service error.
  353. //
  354. // Receive will automatically extend the ack deadline of all fetched Messages for the
  355. // period specified by s.ReceiveSettings.MaxExtension.
  356. //
  357. // Each Subscription may have only one invocation of Receive active at a time.
  358. func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Message)) error {
  359. s.mu.Lock()
  360. if s.receiveActive {
  361. s.mu.Unlock()
  362. return errReceiveInProgress
  363. }
  364. s.receiveActive = true
  365. s.mu.Unlock()
  366. defer func() { s.mu.Lock(); s.receiveActive = false; s.mu.Unlock() }()
  367. config, err := s.Config(ctx)
  368. if err != nil {
  369. if grpc.Code(err) == codes.Canceled {
  370. return nil
  371. }
  372. return err
  373. }
  374. maxCount := s.ReceiveSettings.MaxOutstandingMessages
  375. if maxCount == 0 {
  376. maxCount = DefaultReceiveSettings.MaxOutstandingMessages
  377. }
  378. maxBytes := s.ReceiveSettings.MaxOutstandingBytes
  379. if maxBytes == 0 {
  380. maxBytes = DefaultReceiveSettings.MaxOutstandingBytes
  381. }
  382. maxExt := s.ReceiveSettings.MaxExtension
  383. if maxExt == 0 {
  384. maxExt = DefaultReceiveSettings.MaxExtension
  385. } else if maxExt < 0 {
  386. // If MaxExtension is negative, disable automatic extension.
  387. maxExt = 0
  388. }
  389. numGoroutines := s.ReceiveSettings.NumGoroutines
  390. if numGoroutines < 1 {
  391. numGoroutines = DefaultReceiveSettings.NumGoroutines
  392. }
  393. // TODO(jba): add tests that verify that ReceiveSettings are correctly processed.
  394. po := &pullOptions{
  395. maxExtension: maxExt,
  396. maxPrefetch: trunc32(int64(maxCount)),
  397. ackDeadline: config.AckDeadline,
  398. }
  399. fc := newFlowController(maxCount, maxBytes)
  400. // Wait for all goroutines started by Receive to return, so instead of an
  401. // obscure goroutine leak we have an obvious blocked call to Receive.
  402. group, gctx := errgroup.WithContext(ctx)
  403. for i := 0; i < numGoroutines; i++ {
  404. group.Go(func() error {
  405. return s.receive(gctx, po, fc, f)
  406. })
  407. }
  408. return group.Wait()
  409. }
  410. func (s *Subscription) receive(ctx context.Context, po *pullOptions, fc *flowController, f func(context.Context, *Message)) error {
  411. // Cancel a sub-context when we return, to kick the context-aware callbacks
  412. // and the goroutine below.
  413. ctx2, cancel := context.WithCancel(ctx)
  414. // Call stop when Receive's context is done.
  415. // Stop will block until all outstanding messages have been acknowledged
  416. // or there was a fatal service error.
  417. // The iterator does not use the context passed to Receive. If it did, canceling
  418. // that context would immediately stop the iterator without waiting for unacked
  419. // messages.
  420. iter := newMessageIterator(context.Background(), s.c.subc, s.name, po)
  421. // We cannot use errgroup from Receive here. Receive might already be calling group.Wait,
  422. // and group.Wait cannot be called concurrently with group.Go. We give each receive() its
  423. // own WaitGroup instead.
  424. // Since wg.Add is only called from the main goroutine, wg.Wait is guaranteed
  425. // to be called after all Adds.
  426. var wg sync.WaitGroup
  427. wg.Add(1)
  428. go func() {
  429. <-ctx2.Done()
  430. iter.stop()
  431. wg.Done()
  432. }()
  433. defer wg.Wait()
  434. defer cancel()
  435. for {
  436. msgs, err := iter.receive()
  437. if err == io.EOF {
  438. return nil
  439. }
  440. if err != nil {
  441. return err
  442. }
  443. for i, msg := range msgs {
  444. msg := msg
  445. // TODO(jba): call acquire closer to when the message is allocated.
  446. if err := fc.acquire(ctx, len(msg.Data)); err != nil {
  447. // TODO(jba): test that these "orphaned" messages are nacked immediately when ctx is done.
  448. for _, m := range msgs[i:] {
  449. m.Nack()
  450. }
  451. return nil
  452. }
  453. old := msg.doneFunc
  454. msgLen := len(msg.Data)
  455. msg.doneFunc = func(ackID string, ack bool, receiveTime time.Time) {
  456. defer fc.release(msgLen)
  457. old(ackID, ack, receiveTime)
  458. }
  459. wg.Add(1)
  460. go func() {
  461. defer wg.Done()
  462. f(ctx2, msg)
  463. }()
  464. }
  465. }
  466. }
  467. // TODO(jba): remove when we delete messageIterator.
  468. type pullOptions struct {
  469. maxExtension time.Duration
  470. maxPrefetch int32
  471. // ackDeadline is the default ack deadline for the subscription. Not
  472. // configurable.
  473. ackDeadline time.Duration
  474. }