You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

588 lines
21 KiB

  1. // Copyright 2016 Google LLC
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package pubsub
  15. import (
  16. "context"
  17. "errors"
  18. "fmt"
  19. "io"
  20. "strings"
  21. "sync"
  22. "time"
  23. "cloud.google.com/go/iam"
  24. "cloud.google.com/go/internal/optional"
  25. "github.com/golang/protobuf/ptypes"
  26. durpb "github.com/golang/protobuf/ptypes/duration"
  27. gax "github.com/googleapis/gax-go/v2"
  28. "golang.org/x/sync/errgroup"
  29. pb "google.golang.org/genproto/googleapis/pubsub/v1"
  30. fmpb "google.golang.org/genproto/protobuf/field_mask"
  31. "google.golang.org/grpc"
  32. "google.golang.org/grpc/codes"
  33. )
// Subscription is a reference to a PubSub subscription.
//
// It pairs the client used to issue RPCs with the subscription's fully
// qualified name, and carries the settings consulted by Receive.
type Subscription struct {
	// c is the client this reference was created from; the methods on
	// Subscription issue their RPCs through it.
	c *Client

	// The fully qualified identifier for the subscription, in the format "projects/<projid>/subscriptions/<name>"
	name string

	// Settings for pulling messages. Configure these before calling Receive.
	ReceiveSettings ReceiveSettings

	// mu guards receiveActive.
	mu sync.Mutex
	// receiveActive is true while a call to Receive is in progress; Receive
	// uses it to reject a second concurrent invocation.
	receiveActive bool
}
  44. // Subscription creates a reference to a subscription.
  45. func (c *Client) Subscription(id string) *Subscription {
  46. return c.SubscriptionInProject(id, c.projectID)
  47. }
  48. // SubscriptionInProject creates a reference to a subscription in a given project.
  49. func (c *Client) SubscriptionInProject(id, projectID string) *Subscription {
  50. return &Subscription{
  51. c: c,
  52. name: fmt.Sprintf("projects/%s/subscriptions/%s", projectID, id),
  53. }
  54. }
  55. // String returns the globally unique printable name of the subscription.
  56. func (s *Subscription) String() string {
  57. return s.name
  58. }
  59. // ID returns the unique identifier of the subscription within its project.
  60. func (s *Subscription) ID() string {
  61. slash := strings.LastIndex(s.name, "/")
  62. if slash == -1 {
  63. // name is not a fully-qualified name.
  64. panic("bad subscription name")
  65. }
  66. return s.name[slash+1:]
  67. }
  68. // Subscriptions returns an iterator which returns all of the subscriptions for the client's project.
  69. func (c *Client) Subscriptions(ctx context.Context) *SubscriptionIterator {
  70. it := c.subc.ListSubscriptions(ctx, &pb.ListSubscriptionsRequest{
  71. Project: c.fullyQualifiedProjectName(),
  72. })
  73. return &SubscriptionIterator{
  74. c: c,
  75. next: func() (string, error) {
  76. sub, err := it.Next()
  77. if err != nil {
  78. return "", err
  79. }
  80. return sub.Name, nil
  81. },
  82. }
  83. }
// SubscriptionIterator is an iterator that returns a series of subscriptions.
type SubscriptionIterator struct {
	// c is the client attached to every Subscription the iterator returns.
	c *Client
	// next yields the name of the next subscription, or a non-nil error
	// when no name can be produced.
	next func() (string, error)
}
  89. // Next returns the next subscription. If there are no more subscriptions, iterator.Done will be returned.
  90. func (subs *SubscriptionIterator) Next() (*Subscription, error) {
  91. subName, err := subs.next()
  92. if err != nil {
  93. return nil, err
  94. }
  95. return &Subscription{c: subs.c, name: subName}, nil
  96. }
// PushConfig contains configuration for subscriptions that operate in push mode.
type PushConfig struct {
	// A URL locating the endpoint to which messages should be pushed.
	Endpoint string

	// Endpoint configuration attributes. See
	// https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions#pushconfig
	// for more details.
	Attributes map[string]string
}
  104. func (pc *PushConfig) toProto() *pb.PushConfig {
  105. return &pb.PushConfig{
  106. Attributes: pc.Attributes,
  107. PushEndpoint: pc.Endpoint,
  108. }
  109. }
// SubscriptionConfig describes the configuration of a subscription.
type SubscriptionConfig struct {
	// The topic from which the subscription receives messages.
	// CreateSubscription rejects a config whose Topic is nil.
	Topic *Topic

	// Push delivery configuration. When both its Endpoint is empty and it has
	// no Attributes, no push configuration is sent when creating the
	// subscription.
	PushConfig PushConfig

	// The default maximum time after a subscriber receives a message before
	// the subscriber should acknowledge the message. Note: messages which are
	// obtained via Subscription.Receive need not be acknowledged within this
	// deadline, as the deadline will be automatically extended.
	AckDeadline time.Duration

	// Whether to retain acknowledged messages. If true, acknowledged messages
	// will not be expunged until they fall out of the RetentionDuration window.
	RetainAckedMessages bool

	// How long to retain messages in backlog, from the time of publish. If
	// RetainAckedMessages is true, this duration affects the retention of
	// acknowledged messages, otherwise only unacknowledged messages are retained.
	// Defaults to 7 days. Cannot be longer than 7 days or shorter than 10 minutes.
	RetentionDuration time.Duration

	// The set of labels for the subscription.
	Labels map[string]string
}
  130. func (cfg *SubscriptionConfig) toProto(name string) *pb.Subscription {
  131. var pbPushConfig *pb.PushConfig
  132. if cfg.PushConfig.Endpoint != "" || len(cfg.PushConfig.Attributes) != 0 {
  133. pbPushConfig = &pb.PushConfig{
  134. Attributes: cfg.PushConfig.Attributes,
  135. PushEndpoint: cfg.PushConfig.Endpoint,
  136. }
  137. }
  138. var retentionDuration *durpb.Duration
  139. if cfg.RetentionDuration != 0 {
  140. retentionDuration = ptypes.DurationProto(cfg.RetentionDuration)
  141. }
  142. return &pb.Subscription{
  143. Name: name,
  144. Topic: cfg.Topic.name,
  145. PushConfig: pbPushConfig,
  146. AckDeadlineSeconds: trunc32(int64(cfg.AckDeadline.Seconds())),
  147. RetainAckedMessages: cfg.RetainAckedMessages,
  148. MessageRetentionDuration: retentionDuration,
  149. Labels: cfg.Labels,
  150. }
  151. }
  152. func protoToSubscriptionConfig(pbSub *pb.Subscription, c *Client) (SubscriptionConfig, error) {
  153. rd := time.Hour * 24 * 7
  154. var err error
  155. if pbSub.MessageRetentionDuration != nil {
  156. rd, err = ptypes.Duration(pbSub.MessageRetentionDuration)
  157. if err != nil {
  158. return SubscriptionConfig{}, err
  159. }
  160. }
  161. return SubscriptionConfig{
  162. Topic: newTopic(c, pbSub.Topic),
  163. AckDeadline: time.Second * time.Duration(pbSub.AckDeadlineSeconds),
  164. PushConfig: PushConfig{
  165. Endpoint: pbSub.PushConfig.PushEndpoint,
  166. Attributes: pbSub.PushConfig.Attributes,
  167. },
  168. RetainAckedMessages: pbSub.RetainAckedMessages,
  169. RetentionDuration: rd,
  170. Labels: pbSub.Labels,
  171. }, nil
  172. }
// ReceiveSettings configure the Receive method.
// A zero ReceiveSettings will result in values equivalent to DefaultReceiveSettings.
type ReceiveSettings struct {
	// MaxExtension is the maximum period for which the Subscription should
	// automatically extend the ack deadline for each message.
	//
	// The Subscription will automatically extend the ack deadline of all
	// fetched Messages up to the duration specified. Automatic deadline
	// extension beyond the initial receipt may be disabled by specifying a
	// duration less than 0. If MaxExtension is 0, it will be treated as if it
	// were DefaultReceiveSettings.MaxExtension.
	MaxExtension time.Duration

	// MaxOutstandingMessages is the maximum number of unprocessed messages
	// (unacknowledged but not yet expired). If MaxOutstandingMessages is 0, it
	// will be treated as if it were DefaultReceiveSettings.MaxOutstandingMessages.
	// If the value is negative, then there will be no limit on the number of
	// unprocessed messages.
	MaxOutstandingMessages int

	// MaxOutstandingBytes is the maximum size of unprocessed messages
	// (unacknowledged but not yet expired). If MaxOutstandingBytes is 0, it will
	// be treated as if it were DefaultReceiveSettings.MaxOutstandingBytes. If
	// the value is negative, then there will be no limit on the number of bytes
	// for unprocessed messages.
	MaxOutstandingBytes int

	// NumGoroutines is the number of goroutines Receive will spawn to pull
	// messages concurrently. If NumGoroutines is less than 1, it will be treated
	// as if it were DefaultReceiveSettings.NumGoroutines.
	//
	// NumGoroutines does not limit the number of messages that can be processed
	// concurrently. Even with one goroutine, many messages might be processed at
	// once, because that goroutine may continually receive messages and invoke the
	// function passed to Receive on them. To limit the number of messages being
	// processed concurrently, set MaxOutstandingMessages.
	NumGoroutines int

	// If Synchronous is true, then no more than MaxOutstandingMessages will be in
	// memory at one time. (In contrast, when Synchronous is false, more than
	// MaxOutstandingMessages may have been received from the service and in memory
	// before being processed.) MaxOutstandingBytes still refers to the total bytes
	// processed, rather than in memory. NumGoroutines is ignored: a single
	// goroutine is used.
	// The default is false.
	Synchronous bool
}
  214. // For synchronous receive, the time to wait if we are already processing
  215. // MaxOutstandingMessages. There is no point calling Pull and asking for zero
  216. // messages, so we pause to allow some message-processing callbacks to finish.
  217. //
  218. // The wait time is large enough to avoid consuming significant CPU, but
  219. // small enough to provide decent throughput. Users who want better
  220. // throughput should not be using synchronous mode.
  221. //
  222. // Waiting might seem like polling, so it's natural to think we could do better by
  223. // noticing when a callback is finished and immediately calling Pull. But if
  224. // callbacks finish in quick succession, this will result in frequent Pull RPCs that
  225. // request a single message, which wastes network bandwidth. Better to wait for a few
  226. // callbacks to finish, so we make fewer RPCs fetching more messages.
  227. //
  228. // This value is unexported so the user doesn't have another knob to think about. Note that
  229. // it is the same value as the one used for nackTicker, so it matches this client's
  230. // idea of a duration that is short, but not so short that we perform excessive RPCs.
  231. const synchronousWaitTime = 100 * time.Millisecond
  232. // This is a var so that tests can change it.
  233. var minAckDeadline = 10 * time.Second
  234. // DefaultReceiveSettings holds the default values for ReceiveSettings.
  235. var DefaultReceiveSettings = ReceiveSettings{
  236. MaxExtension: 10 * time.Minute,
  237. MaxOutstandingMessages: 1000,
  238. MaxOutstandingBytes: 1e9, // 1G
  239. NumGoroutines: 1,
  240. }
  241. // Delete deletes the subscription.
  242. func (s *Subscription) Delete(ctx context.Context) error {
  243. return s.c.subc.DeleteSubscription(ctx, &pb.DeleteSubscriptionRequest{Subscription: s.name})
  244. }
  245. // Exists reports whether the subscription exists on the server.
  246. func (s *Subscription) Exists(ctx context.Context) (bool, error) {
  247. _, err := s.c.subc.GetSubscription(ctx, &pb.GetSubscriptionRequest{Subscription: s.name})
  248. if err == nil {
  249. return true, nil
  250. }
  251. if grpc.Code(err) == codes.NotFound {
  252. return false, nil
  253. }
  254. return false, err
  255. }
  256. // Config fetches the current configuration for the subscription.
  257. func (s *Subscription) Config(ctx context.Context) (SubscriptionConfig, error) {
  258. pbSub, err := s.c.subc.GetSubscription(ctx, &pb.GetSubscriptionRequest{Subscription: s.name})
  259. if err != nil {
  260. return SubscriptionConfig{}, err
  261. }
  262. cfg, err := protoToSubscriptionConfig(pbSub, s.c)
  263. if err != nil {
  264. return SubscriptionConfig{}, err
  265. }
  266. return cfg, nil
  267. }
// SubscriptionConfigToUpdate describes how to update a subscription.
// Only the fields that are set (as described on each field) are included in
// the update.
type SubscriptionConfigToUpdate struct {
	// If non-nil, the push config is changed.
	PushConfig *PushConfig

	// If non-zero, the ack deadline is changed.
	AckDeadline time.Duration

	// If set, RetainAckedMessages is changed.
	RetainAckedMessages optional.Bool

	// If non-zero, RetentionDuration is changed.
	RetentionDuration time.Duration

	// If non-nil, the current set of labels is completely
	// replaced by the new set.
	// This field has beta status. It is not subject to the stability guarantee
	// and may change.
	Labels map[string]string
}
  284. // Update changes an existing subscription according to the fields set in cfg.
  285. // It returns the new SubscriptionConfig.
  286. //
  287. // Update returns an error if no fields were modified.
  288. func (s *Subscription) Update(ctx context.Context, cfg SubscriptionConfigToUpdate) (SubscriptionConfig, error) {
  289. req := s.updateRequest(&cfg)
  290. if len(req.UpdateMask.Paths) == 0 {
  291. return SubscriptionConfig{}, errors.New("pubsub: UpdateSubscription call with nothing to update")
  292. }
  293. rpsub, err := s.c.subc.UpdateSubscription(ctx, req)
  294. if err != nil {
  295. return SubscriptionConfig{}, err
  296. }
  297. return protoToSubscriptionConfig(rpsub, s.c)
  298. }
  299. func (s *Subscription) updateRequest(cfg *SubscriptionConfigToUpdate) *pb.UpdateSubscriptionRequest {
  300. psub := &pb.Subscription{Name: s.name}
  301. var paths []string
  302. if cfg.PushConfig != nil {
  303. psub.PushConfig = cfg.PushConfig.toProto()
  304. paths = append(paths, "push_config")
  305. }
  306. if cfg.AckDeadline != 0 {
  307. psub.AckDeadlineSeconds = trunc32(int64(cfg.AckDeadline.Seconds()))
  308. paths = append(paths, "ack_deadline_seconds")
  309. }
  310. if cfg.RetainAckedMessages != nil {
  311. psub.RetainAckedMessages = optional.ToBool(cfg.RetainAckedMessages)
  312. paths = append(paths, "retain_acked_messages")
  313. }
  314. if cfg.RetentionDuration != 0 {
  315. psub.MessageRetentionDuration = ptypes.DurationProto(cfg.RetentionDuration)
  316. paths = append(paths, "message_retention_duration")
  317. }
  318. if cfg.Labels != nil {
  319. psub.Labels = cfg.Labels
  320. paths = append(paths, "labels")
  321. }
  322. return &pb.UpdateSubscriptionRequest{
  323. Subscription: psub,
  324. UpdateMask: &fmpb.FieldMask{Paths: paths},
  325. }
  326. }
  327. // IAM returns the subscription's IAM handle.
  328. func (s *Subscription) IAM() *iam.Handle {
  329. return iam.InternalNewHandle(s.c.subc.Connection(), s.name)
  330. }
  331. // CreateSubscription creates a new subscription on a topic.
  332. //
  333. // id is the name of the subscription to create. It must start with a letter,
  334. // and contain only letters ([A-Za-z]), numbers ([0-9]), dashes (-),
  335. // underscores (_), periods (.), tildes (~), plus (+) or percent signs (%). It
  336. // must be between 3 and 255 characters in length, and must not start with
  337. // "goog".
  338. //
  339. // cfg.Topic is the topic from which the subscription should receive messages. It
  340. // need not belong to the same project as the subscription. This field is required.
  341. //
  342. // cfg.AckDeadline is the maximum time after a subscriber receives a message before
  343. // the subscriber should acknowledge the message. It must be between 10 and 600
  344. // seconds (inclusive), and is rounded down to the nearest second. If the
  345. // provided ackDeadline is 0, then the default value of 10 seconds is used.
  346. // Note: messages which are obtained via Subscription.Receive need not be
  347. // acknowledged within this deadline, as the deadline will be automatically
  348. // extended.
  349. //
  350. // cfg.PushConfig may be set to configure this subscription for push delivery.
  351. //
  352. // If the subscription already exists an error will be returned.
  353. func (c *Client) CreateSubscription(ctx context.Context, id string, cfg SubscriptionConfig) (*Subscription, error) {
  354. if cfg.Topic == nil {
  355. return nil, errors.New("pubsub: require non-nil Topic")
  356. }
  357. if cfg.AckDeadline == 0 {
  358. cfg.AckDeadline = 10 * time.Second
  359. }
  360. if d := cfg.AckDeadline; d < 10*time.Second || d > 600*time.Second {
  361. return nil, fmt.Errorf("ack deadline must be between 10 and 600 seconds; got: %v", d)
  362. }
  363. sub := c.Subscription(id)
  364. _, err := c.subc.CreateSubscription(ctx, cfg.toProto(sub.name))
  365. if err != nil {
  366. return nil, err
  367. }
  368. return sub, nil
  369. }
  370. var errReceiveInProgress = errors.New("pubsub: Receive already in progress for this subscription")
  371. // Receive calls f with the outstanding messages from the subscription.
  372. // It blocks until ctx is done, or the service returns a non-retryable error.
  373. //
  374. // The standard way to terminate a Receive is to cancel its context:
  375. //
  376. // cctx, cancel := context.WithCancel(ctx)
  377. // err := sub.Receive(cctx, callback)
  378. // // Call cancel from callback, or another goroutine.
  379. //
  380. // If the service returns a non-retryable error, Receive returns that error after
  381. // all of the outstanding calls to f have returned. If ctx is done, Receive
  382. // returns nil after all of the outstanding calls to f have returned and
  383. // all messages have been acknowledged or have expired.
  384. //
  385. // Receive calls f concurrently from multiple goroutines. It is encouraged to
  386. // process messages synchronously in f, even if that processing is relatively
  387. // time-consuming; Receive will spawn new goroutines for incoming messages,
  388. // limited by MaxOutstandingMessages and MaxOutstandingBytes in ReceiveSettings.
  389. //
  390. // The context passed to f will be canceled when ctx is Done or there is a
  391. // fatal service error.
  392. //
  393. // Receive will send an ack deadline extension on message receipt, then
  394. // automatically extend the ack deadline of all fetched Messages up to the
  395. // period specified by s.ReceiveSettings.MaxExtension.
  396. //
  397. // Each Subscription may have only one invocation of Receive active at a time.
  398. func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Message)) error {
  399. s.mu.Lock()
  400. if s.receiveActive {
  401. s.mu.Unlock()
  402. return errReceiveInProgress
  403. }
  404. s.receiveActive = true
  405. s.mu.Unlock()
  406. defer func() { s.mu.Lock(); s.receiveActive = false; s.mu.Unlock() }()
  407. maxCount := s.ReceiveSettings.MaxOutstandingMessages
  408. if maxCount == 0 {
  409. maxCount = DefaultReceiveSettings.MaxOutstandingMessages
  410. }
  411. maxBytes := s.ReceiveSettings.MaxOutstandingBytes
  412. if maxBytes == 0 {
  413. maxBytes = DefaultReceiveSettings.MaxOutstandingBytes
  414. }
  415. maxExt := s.ReceiveSettings.MaxExtension
  416. if maxExt == 0 {
  417. maxExt = DefaultReceiveSettings.MaxExtension
  418. } else if maxExt < 0 {
  419. // If MaxExtension is negative, disable automatic extension.
  420. maxExt = 0
  421. }
  422. var numGoroutines int
  423. switch {
  424. case s.ReceiveSettings.Synchronous:
  425. numGoroutines = 1
  426. case s.ReceiveSettings.NumGoroutines >= 1:
  427. numGoroutines = s.ReceiveSettings.NumGoroutines
  428. default:
  429. numGoroutines = DefaultReceiveSettings.NumGoroutines
  430. }
  431. // TODO(jba): add tests that verify that ReceiveSettings are correctly processed.
  432. po := &pullOptions{
  433. maxExtension: maxExt,
  434. maxPrefetch: trunc32(int64(maxCount)),
  435. synchronous: s.ReceiveSettings.Synchronous,
  436. }
  437. fc := newFlowController(maxCount, maxBytes)
  438. // Wait for all goroutines started by Receive to return, so instead of an
  439. // obscure goroutine leak we have an obvious blocked call to Receive.
  440. group, gctx := errgroup.WithContext(ctx)
  441. for i := 0; i < numGoroutines; i++ {
  442. group.Go(func() error {
  443. return s.receive(gctx, po, fc, f)
  444. })
  445. }
  446. return group.Wait()
  447. }
// receive runs one pull loop for Receive. It creates its own message
// iterator, then repeatedly fetches a batch of messages, acquires
// flow-control capacity from fc for each message, and invokes f on each
// message in a separate goroutine.
//
// It returns nil when the iterator reports io.EOF or when waiting on ctx
// fails (synchronous back-off or flow-control acquisition); any other
// iterator error is returned as-is. Before returning, it cancels the
// sub-context handed to the callbacks and waits for the iterator-stopping
// goroutine and for all callbacks it started.
func (s *Subscription) receive(ctx context.Context, po *pullOptions, fc *flowController, f func(context.Context, *Message)) error {
	// Cancel a sub-context when we return, to kick the context-aware callbacks
	// and the goroutine below.
	ctx2, cancel := context.WithCancel(ctx)
	// The iterator does not use the context passed to Receive. If it did, canceling
	// that context would immediately stop the iterator without waiting for unacked
	// messages.
	iter := newMessageIterator(s.c.subc, s.name, po)

	// We cannot use errgroup from Receive here. Receive might already be calling group.Wait,
	// and group.Wait cannot be called concurrently with group.Go. We give each receive() its
	// own WaitGroup instead.
	// Since wg.Add is only called from the main goroutine, wg.Wait is guaranteed
	// to be called after all Adds.
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		<-ctx2.Done()
		// Call stop when Receive's context is done.
		// Stop will block until all outstanding messages have been acknowledged
		// or there was a fatal service error.
		iter.stop()
		wg.Done()
	}()
	// Deferred calls run in reverse order: cancel fires first, which unblocks
	// the goroutine above, and only then does wg.Wait block for it and for the
	// callbacks.
	defer wg.Wait()
	defer cancel()

	for {
		// maxToPull stays zero unless synchronous mode is enabled.
		// NOTE(review): how iter.receive interprets zero is not visible here.
		var maxToPull int32 // maximum number of messages to pull
		if po.synchronous {
			if po.maxPrefetch < 0 {
				// If there is no limit on the number of messages to pull, use a reasonable default.
				maxToPull = 1000
			} else {
				// Limit the number of messages in memory to MaxOutstandingMessages
				// (here, po.maxPrefetch). For each message currently in memory, we have
				// called fc.acquire but not fc.release: this is fc.count(). The next
				// call to Pull should fetch no more than the difference between these
				// values.
				maxToPull = po.maxPrefetch - int32(fc.count())
				if maxToPull <= 0 {
					// Wait for some callbacks to finish.
					if err := gax.Sleep(ctx, synchronousWaitTime); err != nil {
						// Return nil if the context is done, not err.
						return nil
					}
					continue
				}
			}
		}
		msgs, err := iter.receive(maxToPull)
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		for i, msg := range msgs {
			// Per-iteration copy so the closures below capture this message
			// rather than the shared loop variable.
			msg := msg
			// TODO(jba): call acquire closer to when the message is allocated.
			if err := fc.acquire(ctx, len(msg.Data)); err != nil {
				// TODO(jba): test that these "orphaned" messages are nacked immediately when ctx is done.
				// Nack this message and every later one in the batch; none
				// of them has been handed to f.
				for _, m := range msgs[i:] {
					m.Nack()
				}
				// Return nil if the context is done, not err.
				return nil
			}
			// Wrap the message's done function so that its flow-control
			// capacity is released once the original done function has run.
			old := msg.doneFunc
			msgLen := len(msg.Data)
			msg.doneFunc = func(ackID string, ack bool, receiveTime time.Time) {
				defer fc.release(msgLen)
				old(ackID, ack, receiveTime)
			}
			// Run the callback in its own goroutine, tracked by wg so that
			// receive does not return before it finishes.
			wg.Add(1)
			go func() {
				defer wg.Done()
				f(ctx2, msg)
			}()
		}
	}
}
// pullOptions carries the resolved settings that Receive passes to the
// message iterator and to each receive loop.
type pullOptions struct {
	// maxExtension is the effective ReceiveSettings.MaxExtension; Receive
	// sets it to zero when automatic extension is disabled.
	maxExtension time.Duration
	// maxPrefetch is the effective MaxOutstandingMessages, truncated to 32
	// bits. A negative value means no limit.
	maxPrefetch int32
	// If true, use unary Pull instead of StreamingPull, and never pull more
	// than maxPrefetch messages.
	synchronous bool
}