You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

462 lines
14 KiB

  1. // Copyright 2016 Google LLC
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package pubsub
  15. import (
  16. "context"
  17. "errors"
  18. "fmt"
  19. "runtime"
  20. "strings"
  21. "sync"
  22. "time"
  23. "cloud.google.com/go/iam"
  24. "github.com/golang/protobuf/proto"
  25. gax "github.com/googleapis/gax-go/v2"
  26. "google.golang.org/api/support/bundler"
  27. pb "google.golang.org/genproto/googleapis/pubsub/v1"
  28. fmpb "google.golang.org/genproto/protobuf/field_mask"
  29. "google.golang.org/grpc"
  30. "google.golang.org/grpc/codes"
  31. )
  32. const (
  33. // MaxPublishRequestCount is the maximum number of messages that can be in a single publish request, as
  34. // defined by the PubSub service.
  35. MaxPublishRequestCount = 1000
  36. // MaxPublishRequestBytes is the maximum size of a single publish request in bytes, as defined by the PubSub
  37. // service.
  38. MaxPublishRequestBytes = 1e7
  39. maxInt = int(^uint(0) >> 1)
  40. )
  41. // ErrOversizedMessage indicates that a message's size exceeds MaxPublishRequestBytes.
  42. var ErrOversizedMessage = bundler.ErrOversizedItem
  43. // Topic is a reference to a PubSub topic.
  44. //
  45. // The methods of Topic are safe for use by multiple goroutines.
  46. type Topic struct {
  47. c *Client
  48. // The fully qualified identifier for the topic, in the format "projects/<projid>/topics/<name>"
  49. name string
  50. // Settings for publishing messages. All changes must be made before the
  51. // first call to Publish. The default is DefaultPublishSettings.
  52. PublishSettings PublishSettings
  53. mu sync.RWMutex
  54. stopped bool
  55. bundler *bundler.Bundler
  56. }
  57. // PublishSettings control the bundling of published messages.
  58. type PublishSettings struct {
  59. // Publish a non-empty batch after this delay has passed.
  60. DelayThreshold time.Duration
  61. // Publish a batch when it has this many messages. The maximum is
  62. // MaxPublishRequestCount.
  63. CountThreshold int
  64. // Publish a batch when its size in bytes reaches this value.
  65. ByteThreshold int
  66. // The number of goroutines that invoke the Publish RPC concurrently.
  67. // Defaults to a multiple of GOMAXPROCS.
  68. NumGoroutines int
  69. // The maximum time that the client will attempt to publish a bundle of messages.
  70. Timeout time.Duration
  71. }
  72. // DefaultPublishSettings holds the default values for topics' PublishSettings.
  73. var DefaultPublishSettings = PublishSettings{
  74. DelayThreshold: 1 * time.Millisecond,
  75. CountThreshold: 100,
  76. ByteThreshold: 1e6,
  77. Timeout: 60 * time.Second,
  78. }
  79. // CreateTopic creates a new topic.
  80. // The specified topic ID must start with a letter, and contain only letters
  81. // ([A-Za-z]), numbers ([0-9]), dashes (-), underscores (_), periods (.),
  82. // tildes (~), plus (+) or percent signs (%). It must be between 3 and 255
  83. // characters in length, and must not start with "goog".
  84. // If the topic already exists an error will be returned.
  85. func (c *Client) CreateTopic(ctx context.Context, id string) (*Topic, error) {
  86. t := c.Topic(id)
  87. _, err := c.pubc.CreateTopic(ctx, &pb.Topic{Name: t.name})
  88. if err != nil {
  89. return nil, err
  90. }
  91. return t, nil
  92. }
  93. // Topic creates a reference to a topic in the client's project.
  94. //
  95. // If a Topic's Publish method is called, it has background goroutines
  96. // associated with it. Clean them up by calling Topic.Stop.
  97. //
  98. // Avoid creating many Topic instances if you use them to publish.
  99. func (c *Client) Topic(id string) *Topic {
  100. return c.TopicInProject(id, c.projectID)
  101. }
  102. // TopicInProject creates a reference to a topic in the given project.
  103. //
  104. // If a Topic's Publish method is called, it has background goroutines
  105. // associated with it. Clean them up by calling Topic.Stop.
  106. //
  107. // Avoid creating many Topic instances if you use them to publish.
  108. func (c *Client) TopicInProject(id, projectID string) *Topic {
  109. return newTopic(c, fmt.Sprintf("projects/%s/topics/%s", projectID, id))
  110. }
  111. func newTopic(c *Client, name string) *Topic {
  112. return &Topic{
  113. c: c,
  114. name: name,
  115. PublishSettings: DefaultPublishSettings,
  116. }
  117. }
  118. // TopicConfig describes the configuration of a topic.
  119. type TopicConfig struct {
  120. // The set of labels for the topic.
  121. Labels map[string]string
  122. // The topic's message storage policy.
  123. MessageStoragePolicy MessageStoragePolicy
  124. }
  125. // TopicConfigToUpdate describes how to update a topic.
  126. type TopicConfigToUpdate struct {
  127. // If non-nil, the current set of labels is completely
  128. // replaced by the new set.
  129. // This field has beta status. It is not subject to the stability guarantee
  130. // and may change.
  131. Labels map[string]string
  132. }
  133. func protoToTopicConfig(pbt *pb.Topic) TopicConfig {
  134. return TopicConfig{
  135. Labels: pbt.Labels,
  136. MessageStoragePolicy: protoToMessageStoragePolicy(pbt.MessageStoragePolicy),
  137. }
  138. }
  139. // MessageStoragePolicy constrains how messages published to the topic may be stored. It
  140. // is determined when the topic is created based on the policy configured at
  141. // the project level.
  142. type MessageStoragePolicy struct {
  143. // The list of GCP regions where messages that are published to the topic may
  144. // be persisted in storage. Messages published by publishers running in
  145. // non-allowed GCP regions (or running outside of GCP altogether) will be
  146. // routed for storage in one of the allowed regions. An empty list indicates a
  147. // misconfiguration at the project or organization level, which will result in
  148. // all Publish operations failing.
  149. AllowedPersistenceRegions []string
  150. }
  151. func protoToMessageStoragePolicy(msp *pb.MessageStoragePolicy) MessageStoragePolicy {
  152. if msp == nil {
  153. return MessageStoragePolicy{}
  154. }
  155. return MessageStoragePolicy{AllowedPersistenceRegions: msp.AllowedPersistenceRegions}
  156. }
  157. // Config returns the TopicConfig for the topic.
  158. func (t *Topic) Config(ctx context.Context) (TopicConfig, error) {
  159. pbt, err := t.c.pubc.GetTopic(ctx, &pb.GetTopicRequest{Topic: t.name})
  160. if err != nil {
  161. return TopicConfig{}, err
  162. }
  163. return protoToTopicConfig(pbt), nil
  164. }
  165. // Update changes an existing topic according to the fields set in cfg. It returns
  166. // the new TopicConfig.
  167. //
  168. // Any call to Update (even with an empty TopicConfigToUpdate) will update the
  169. // MessageStoragePolicy for the topic from the organization's settings.
  170. func (t *Topic) Update(ctx context.Context, cfg TopicConfigToUpdate) (TopicConfig, error) {
  171. req := t.updateRequest(cfg)
  172. if len(req.UpdateMask.Paths) == 0 {
  173. return TopicConfig{}, errors.New("pubsub: UpdateTopic call with nothing to update")
  174. }
  175. rpt, err := t.c.pubc.UpdateTopic(ctx, req)
  176. if err != nil {
  177. return TopicConfig{}, err
  178. }
  179. return protoToTopicConfig(rpt), nil
  180. }
  181. func (t *Topic) updateRequest(cfg TopicConfigToUpdate) *pb.UpdateTopicRequest {
  182. pt := &pb.Topic{Name: t.name}
  183. paths := []string{"message_storage_policy"} // always fetch
  184. if cfg.Labels != nil {
  185. pt.Labels = cfg.Labels
  186. paths = append(paths, "labels")
  187. }
  188. return &pb.UpdateTopicRequest{
  189. Topic: pt,
  190. UpdateMask: &fmpb.FieldMask{Paths: paths},
  191. }
  192. }
  193. // Topics returns an iterator which returns all of the topics for the client's project.
  194. func (c *Client) Topics(ctx context.Context) *TopicIterator {
  195. it := c.pubc.ListTopics(ctx, &pb.ListTopicsRequest{Project: c.fullyQualifiedProjectName()})
  196. return &TopicIterator{
  197. c: c,
  198. next: func() (string, error) {
  199. topic, err := it.Next()
  200. if err != nil {
  201. return "", err
  202. }
  203. return topic.Name, nil
  204. },
  205. }
  206. }
  207. // TopicIterator is an iterator that returns a series of topics.
  208. type TopicIterator struct {
  209. c *Client
  210. next func() (string, error)
  211. }
  212. // Next returns the next topic. If there are no more topics, iterator.Done will be returned.
  213. func (tps *TopicIterator) Next() (*Topic, error) {
  214. topicName, err := tps.next()
  215. if err != nil {
  216. return nil, err
  217. }
  218. return newTopic(tps.c, topicName), nil
  219. }
  220. // ID returns the unique identifier of the topic within its project.
  221. func (t *Topic) ID() string {
  222. slash := strings.LastIndex(t.name, "/")
  223. if slash == -1 {
  224. // name is not a fully-qualified name.
  225. panic("bad topic name")
  226. }
  227. return t.name[slash+1:]
  228. }
  229. // String returns the printable globally unique name for the topic.
  230. func (t *Topic) String() string {
  231. return t.name
  232. }
  233. // Delete deletes the topic.
  234. func (t *Topic) Delete(ctx context.Context) error {
  235. return t.c.pubc.DeleteTopic(ctx, &pb.DeleteTopicRequest{Topic: t.name})
  236. }
  237. // Exists reports whether the topic exists on the server.
  238. func (t *Topic) Exists(ctx context.Context) (bool, error) {
  239. if t.name == "_deleted-topic_" {
  240. return false, nil
  241. }
  242. _, err := t.c.pubc.GetTopic(ctx, &pb.GetTopicRequest{Topic: t.name})
  243. if err == nil {
  244. return true, nil
  245. }
  246. if grpc.Code(err) == codes.NotFound {
  247. return false, nil
  248. }
  249. return false, err
  250. }
  251. // IAM returns the topic's IAM handle.
  252. func (t *Topic) IAM() *iam.Handle {
  253. return iam.InternalNewHandle(t.c.pubc.Connection(), t.name)
  254. }
  255. // Subscriptions returns an iterator which returns the subscriptions for this topic.
  256. //
  257. // Some of the returned subscriptions may belong to a project other than t.
  258. func (t *Topic) Subscriptions(ctx context.Context) *SubscriptionIterator {
  259. it := t.c.pubc.ListTopicSubscriptions(ctx, &pb.ListTopicSubscriptionsRequest{
  260. Topic: t.name,
  261. })
  262. return &SubscriptionIterator{
  263. c: t.c,
  264. next: it.Next,
  265. }
  266. }
  267. var errTopicStopped = errors.New("pubsub: Stop has been called for this topic")
  268. // Publish publishes msg to the topic asynchronously. Messages are batched and
  269. // sent according to the topic's PublishSettings. Publish never blocks.
  270. //
  271. // Publish returns a non-nil PublishResult which will be ready when the
  272. // message has been sent (or has failed to be sent) to the server.
  273. //
  274. // Publish creates goroutines for batching and sending messages. These goroutines
  275. // need to be stopped by calling t.Stop(). Once stopped, future calls to Publish
  276. // will immediately return a PublishResult with an error.
  277. func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult {
  278. // TODO(jba): if this turns out to take significant time, try to approximate it.
  279. // Or, convert the messages to protos in Publish, instead of in the service.
  280. msg.size = proto.Size(&pb.PubsubMessage{
  281. Data: msg.Data,
  282. Attributes: msg.Attributes,
  283. })
  284. r := &PublishResult{ready: make(chan struct{})}
  285. t.initBundler()
  286. t.mu.RLock()
  287. defer t.mu.RUnlock()
  288. // TODO(aboulhosn) [from bcmills] consider changing the semantics of bundler to perform this logic so we don't have to do it here
  289. if t.stopped {
  290. r.set("", errTopicStopped)
  291. return r
  292. }
  293. // TODO(jba) [from bcmills] consider using a shared channel per bundle
  294. // (requires Bundler API changes; would reduce allocations)
  295. // The call to Add should never return an error because the bundler's
  296. // BufferedByteLimit is set to maxInt; we do not perform any flow
  297. // control in the client.
  298. err := t.bundler.Add(&bundledMessage{msg, r}, msg.size)
  299. if err != nil {
  300. r.set("", err)
  301. }
  302. return r
  303. }
  304. // Stop sends all remaining published messages and stop goroutines created for handling
  305. // publishing. Returns once all outstanding messages have been sent or have
  306. // failed to be sent.
  307. func (t *Topic) Stop() {
  308. t.mu.Lock()
  309. noop := t.stopped || t.bundler == nil
  310. t.stopped = true
  311. t.mu.Unlock()
  312. if noop {
  313. return
  314. }
  315. t.bundler.Flush()
  316. }
  317. // A PublishResult holds the result from a call to Publish.
  318. type PublishResult struct {
  319. ready chan struct{}
  320. serverID string
  321. err error
  322. }
  323. // Ready returns a channel that is closed when the result is ready.
  324. // When the Ready channel is closed, Get is guaranteed not to block.
  325. func (r *PublishResult) Ready() <-chan struct{} { return r.ready }
  326. // Get returns the server-generated message ID and/or error result of a Publish call.
  327. // Get blocks until the Publish call completes or the context is done.
  328. func (r *PublishResult) Get(ctx context.Context) (serverID string, err error) {
  329. // If the result is already ready, return it even if the context is done.
  330. select {
  331. case <-r.Ready():
  332. return r.serverID, r.err
  333. default:
  334. }
  335. select {
  336. case <-ctx.Done():
  337. return "", ctx.Err()
  338. case <-r.Ready():
  339. return r.serverID, r.err
  340. }
  341. }
  342. func (r *PublishResult) set(sid string, err error) {
  343. r.serverID = sid
  344. r.err = err
  345. close(r.ready)
  346. }
  347. type bundledMessage struct {
  348. msg *Message
  349. res *PublishResult
  350. }
  351. func (t *Topic) initBundler() {
  352. t.mu.RLock()
  353. noop := t.stopped || t.bundler != nil
  354. t.mu.RUnlock()
  355. if noop {
  356. return
  357. }
  358. t.mu.Lock()
  359. defer t.mu.Unlock()
  360. // Must re-check, since we released the lock.
  361. if t.stopped || t.bundler != nil {
  362. return
  363. }
  364. timeout := t.PublishSettings.Timeout
  365. t.bundler = bundler.NewBundler(&bundledMessage{}, func(items interface{}) {
  366. // TODO(jba): use a context detached from the one passed to NewClient.
  367. ctx := context.TODO()
  368. if timeout != 0 {
  369. var cancel func()
  370. ctx, cancel = context.WithTimeout(ctx, timeout)
  371. defer cancel()
  372. }
  373. t.publishMessageBundle(ctx, items.([]*bundledMessage))
  374. })
  375. t.bundler.DelayThreshold = t.PublishSettings.DelayThreshold
  376. t.bundler.BundleCountThreshold = t.PublishSettings.CountThreshold
  377. if t.bundler.BundleCountThreshold > MaxPublishRequestCount {
  378. t.bundler.BundleCountThreshold = MaxPublishRequestCount
  379. }
  380. t.bundler.BundleByteThreshold = t.PublishSettings.ByteThreshold
  381. t.bundler.BufferedByteLimit = maxInt
  382. t.bundler.BundleByteLimit = MaxPublishRequestBytes
  383. // Unless overridden, allow many goroutines per CPU to call the Publish RPC concurrently.
  384. // The default value was determined via extensive load testing (see the loadtest subdirectory).
  385. if t.PublishSettings.NumGoroutines > 0 {
  386. t.bundler.HandlerLimit = t.PublishSettings.NumGoroutines
  387. } else {
  388. t.bundler.HandlerLimit = 25 * runtime.GOMAXPROCS(0)
  389. }
  390. }
  391. func (t *Topic) publishMessageBundle(ctx context.Context, bms []*bundledMessage) {
  392. pbMsgs := make([]*pb.PubsubMessage, len(bms))
  393. for i, bm := range bms {
  394. pbMsgs[i] = &pb.PubsubMessage{
  395. Data: bm.msg.Data,
  396. Attributes: bm.msg.Attributes,
  397. }
  398. bm.msg = nil // release bm.msg for GC
  399. }
  400. res, err := t.c.pubc.Publish(ctx, &pb.PublishRequest{
  401. Topic: t.name,
  402. Messages: pbMsgs,
  403. }, gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(maxSendRecvBytes)))
  404. for i, bm := range bms {
  405. if err != nil {
  406. bm.res.set("", err)
  407. } else {
  408. bm.res.set(res.MessageIds[i], nil)
  409. }
  410. }
  411. }