You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

628 lines
18 KiB

  1. // Copyright 2014 Google LLC
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package pubsub
  15. import (
  16. "context"
  17. "fmt"
  18. "testing"
  19. "time"
  20. "cloud.google.com/go/iam"
  21. "cloud.google.com/go/internal"
  22. "cloud.google.com/go/internal/testutil"
  23. "cloud.google.com/go/internal/uid"
  24. gax "github.com/googleapis/gax-go/v2"
  25. "google.golang.org/api/iterator"
  26. "google.golang.org/api/option"
  27. "google.golang.org/grpc"
  28. "google.golang.org/grpc/codes"
  29. "google.golang.org/grpc/status"
  30. )
  31. var (
  32. topicIDs = uid.NewSpace("topic", nil)
  33. subIDs = uid.NewSpace("sub", nil)
  34. )
  // messageData carries just the parts of a message that matter when two
  // messages are compared in a test. Any other field a Message may have is
  // deliberately left out so that it cannot influence the comparison.
  type messageData struct {
  	// ID is the message ID.
  	ID string
  	// Data is the message payload.
  	Data []byte
  	// Attributes holds the message's key/value attributes.
  	Attributes map[string]string
  }
  42. func extractMessageData(m *Message) *messageData {
  43. return &messageData{
  44. ID: m.ID,
  45. Data: m.Data,
  46. Attributes: m.Attributes,
  47. }
  48. }
  49. func integrationTestClient(ctx context.Context, t *testing.T) *Client {
  50. if testing.Short() {
  51. t.Skip("Integration tests skipped in short mode")
  52. }
  53. projID := testutil.ProjID()
  54. if projID == "" {
  55. t.Skip("Integration tests skipped. See CONTRIBUTING.md for details")
  56. }
  57. ts := testutil.TokenSource(ctx, ScopePubSub, ScopeCloudPlatform)
  58. if ts == nil {
  59. t.Skip("Integration tests skipped. See CONTRIBUTING.md for details")
  60. }
  61. client, err := NewClient(ctx, projID, option.WithTokenSource(ts))
  62. if err != nil {
  63. t.Fatalf("Creating client error: %v", err)
  64. }
  65. return client
  66. }
  67. func TestIntegration_All(t *testing.T) {
  68. t.Parallel()
  69. ctx := context.Background()
  70. client := integrationTestClient(ctx, t)
  71. defer client.Close()
  72. topic, err := client.CreateTopic(ctx, topicIDs.New())
  73. if err != nil {
  74. t.Errorf("CreateTopic error: %v", err)
  75. }
  76. defer topic.Stop()
  77. exists, err := topic.Exists(ctx)
  78. if err != nil {
  79. t.Fatalf("TopicExists error: %v", err)
  80. }
  81. if !exists {
  82. t.Errorf("topic %v should exist, but it doesn't", topic)
  83. }
  84. var sub *Subscription
  85. if sub, err = client.CreateSubscription(ctx, subIDs.New(), SubscriptionConfig{Topic: topic}); err != nil {
  86. t.Errorf("CreateSub error: %v", err)
  87. }
  88. exists, err = sub.Exists(ctx)
  89. if err != nil {
  90. t.Fatalf("SubExists error: %v", err)
  91. }
  92. if !exists {
  93. t.Errorf("subscription %s should exist, but it doesn't", sub.ID())
  94. }
  95. for _, sync := range []bool{false, true} {
  96. for _, maxMsgs := range []int{0, 3, -1} { // MaxOutstandingMessages = default, 3, unlimited
  97. testPublishAndReceive(t, topic, sub, maxMsgs, sync)
  98. }
  99. }
  100. if msg, ok := testIAM(ctx, topic.IAM(), "pubsub.topics.get"); !ok {
  101. t.Errorf("topic IAM: %s", msg)
  102. }
  103. if msg, ok := testIAM(ctx, sub.IAM(), "pubsub.subscriptions.get"); !ok {
  104. t.Errorf("sub IAM: %s", msg)
  105. }
  106. snap, err := sub.CreateSnapshot(ctx, "")
  107. if err != nil {
  108. t.Fatalf("CreateSnapshot error: %v", err)
  109. }
  110. timeoutCtx, cancel := context.WithTimeout(ctx, time.Minute)
  111. defer cancel()
  112. err = internal.Retry(timeoutCtx, gax.Backoff{}, func() (bool, error) {
  113. snapIt := client.Snapshots(timeoutCtx)
  114. for {
  115. s, err := snapIt.Next()
  116. if err == nil && s.name == snap.name {
  117. return true, nil
  118. }
  119. if err == iterator.Done {
  120. return false, fmt.Errorf("cannot find snapshot: %q", snap.name)
  121. }
  122. if err != nil {
  123. return false, err
  124. }
  125. }
  126. })
  127. if err != nil {
  128. t.Error(err)
  129. }
  130. err = internal.Retry(timeoutCtx, gax.Backoff{}, func() (bool, error) {
  131. err := sub.SeekToSnapshot(timeoutCtx, snap.Snapshot)
  132. return err == nil, err
  133. })
  134. if err != nil {
  135. t.Error(err)
  136. }
  137. err = internal.Retry(timeoutCtx, gax.Backoff{}, func() (bool, error) {
  138. err := sub.SeekToTime(timeoutCtx, time.Now())
  139. return err == nil, err
  140. })
  141. if err != nil {
  142. t.Error(err)
  143. }
  144. err = internal.Retry(timeoutCtx, gax.Backoff{}, func() (bool, error) {
  145. snapHandle := client.Snapshot(snap.ID())
  146. err := snapHandle.Delete(timeoutCtx)
  147. return err == nil, err
  148. })
  149. if err != nil {
  150. t.Error(err)
  151. }
  152. if err := sub.Delete(ctx); err != nil {
  153. t.Errorf("DeleteSub error: %v", err)
  154. }
  155. if err := topic.Delete(ctx); err != nil {
  156. t.Errorf("DeleteTopic error: %v", err)
  157. }
  158. }
  159. func testPublishAndReceive(t *testing.T, topic *Topic, sub *Subscription, maxMsgs int, synchronous bool) {
  160. ctx := context.Background()
  161. var msgs []*Message
  162. for i := 0; i < 10; i++ {
  163. text := fmt.Sprintf("a message with an index %d", i)
  164. attrs := make(map[string]string)
  165. attrs["foo"] = "bar"
  166. msgs = append(msgs, &Message{
  167. Data: []byte(text),
  168. Attributes: attrs,
  169. })
  170. }
  171. // Publish some messages.
  172. type pubResult struct {
  173. m *Message
  174. r *PublishResult
  175. }
  176. var rs []pubResult
  177. for _, m := range msgs {
  178. r := topic.Publish(ctx, m)
  179. rs = append(rs, pubResult{m, r})
  180. }
  181. want := make(map[string]*messageData)
  182. for _, res := range rs {
  183. id, err := res.r.Get(ctx)
  184. if err != nil {
  185. t.Fatal(err)
  186. }
  187. md := extractMessageData(res.m)
  188. md.ID = id
  189. want[md.ID] = md
  190. }
  191. sub.ReceiveSettings.MaxOutstandingMessages = maxMsgs
  192. sub.ReceiveSettings.Synchronous = synchronous
  193. // Use a timeout to ensure that Pull does not block indefinitely if there are
  194. // unexpectedly few messages available.
  195. now := time.Now()
  196. timeoutCtx, _ := context.WithTimeout(ctx, time.Minute)
  197. gotMsgs, err := pullN(timeoutCtx, sub, len(want), func(ctx context.Context, m *Message) {
  198. m.Ack()
  199. })
  200. if err != nil {
  201. if c := status.Convert(err); c.Code() == codes.Canceled {
  202. if time.Now().Sub(now) >= time.Minute {
  203. t.Fatal("pullN took too long")
  204. }
  205. } else {
  206. t.Fatalf("Pull: %v", err)
  207. }
  208. }
  209. got := make(map[string]*messageData)
  210. for _, m := range gotMsgs {
  211. md := extractMessageData(m)
  212. got[md.ID] = md
  213. }
  214. if !testutil.Equal(got, want) {
  215. t.Fatalf("MaxOutstandingMessages=%d, Synchronous=%t, messages got: %v, messages want: %v",
  216. maxMsgs, synchronous, got, want)
  217. }
  218. }
  219. // IAM tests.
  220. // NOTE: for these to succeed, the test runner identity must have the Pub/Sub Admin or Owner roles.
  221. // To set, visit https://console.developers.google.com, select "IAM & Admin" from the top-left
  222. // menu, choose the account, click the Roles dropdown, and select "Pub/Sub > Pub/Sub Admin".
  223. // TODO(jba): move this to a testing package within cloud.google.com/iam, so we can re-use it.
  224. func testIAM(ctx context.Context, h *iam.Handle, permission string) (msg string, ok bool) {
  225. // Attempting to add an non-existent identity (e.g. "alice@example.com") causes the service
  226. // to return an internal error, so use a real identity.
  227. const member = "domain:google.com"
  228. var policy *iam.Policy
  229. var err error
  230. if policy, err = h.Policy(ctx); err != nil {
  231. return fmt.Sprintf("Policy: %v", err), false
  232. }
  233. // The resource is new, so the policy should be empty.
  234. if got := policy.Roles(); len(got) > 0 {
  235. return fmt.Sprintf("initially: got roles %v, want none", got), false
  236. }
  237. // Add a member, set the policy, then check that the member is present.
  238. policy.Add(member, iam.Viewer)
  239. if err := h.SetPolicy(ctx, policy); err != nil {
  240. return fmt.Sprintf("SetPolicy: %v", err), false
  241. }
  242. if policy, err = h.Policy(ctx); err != nil {
  243. return fmt.Sprintf("Policy: %v", err), false
  244. }
  245. if got, want := policy.Members(iam.Viewer), []string{member}; !testutil.Equal(got, want) {
  246. return fmt.Sprintf("after Add: got %v, want %v", got, want), false
  247. }
  248. // Now remove that member, set the policy, and check that it's empty again.
  249. policy.Remove(member, iam.Viewer)
  250. if err := h.SetPolicy(ctx, policy); err != nil {
  251. return fmt.Sprintf("SetPolicy: %v", err), false
  252. }
  253. if policy, err = h.Policy(ctx); err != nil {
  254. return fmt.Sprintf("Policy: %v", err), false
  255. }
  256. if got := policy.Roles(); len(got) > 0 {
  257. return fmt.Sprintf("after Remove: got roles %v, want none", got), false
  258. }
  259. // Call TestPermissions.
  260. // Because this user is an admin, it has all the permissions on the
  261. // resource type. Note: the service fails if we ask for inapplicable
  262. // permissions (e.g. a subscription permission on a topic, or a topic
  263. // create permission on a topic rather than its parent).
  264. wantPerms := []string{permission}
  265. gotPerms, err := h.TestPermissions(ctx, wantPerms)
  266. if err != nil {
  267. return fmt.Sprintf("TestPermissions: %v", err), false
  268. }
  269. if !testutil.Equal(gotPerms, wantPerms) {
  270. return fmt.Sprintf("TestPermissions: got %v, want %v", gotPerms, wantPerms), false
  271. }
  272. return "", true
  273. }
  274. func TestIntegration_CancelReceive(t *testing.T) {
  275. t.Parallel()
  276. ctx, cancel := context.WithCancel(context.Background())
  277. client := integrationTestClient(ctx, t)
  278. defer client.Close()
  279. topic, err := client.CreateTopic(ctx, topicIDs.New())
  280. if err != nil {
  281. t.Fatal(err)
  282. }
  283. defer topic.Delete(ctx)
  284. defer topic.Stop()
  285. var sub *Subscription
  286. if sub, err = client.CreateSubscription(ctx, subIDs.New(), SubscriptionConfig{Topic: topic}); err != nil {
  287. t.Fatal(err)
  288. }
  289. defer sub.Delete(ctx)
  290. sub.ReceiveSettings.MaxOutstandingMessages = -1
  291. sub.ReceiveSettings.MaxOutstandingBytes = -1
  292. sub.ReceiveSettings.NumGoroutines = 1
  293. doneReceiving := make(chan struct{})
  294. // Publish the messages.
  295. go func() {
  296. for {
  297. select {
  298. case <-doneReceiving:
  299. return
  300. default:
  301. topic.Publish(ctx, &Message{Data: []byte("some msg")})
  302. time.Sleep(time.Second)
  303. }
  304. }
  305. }()
  306. go func() {
  307. defer close(doneReceiving)
  308. err = sub.Receive(ctx, func(_ context.Context, msg *Message) {
  309. cancel()
  310. time.AfterFunc(5*time.Second, msg.Ack)
  311. })
  312. if err != nil {
  313. t.Error(err)
  314. }
  315. }()
  316. select {
  317. case <-time.After(60 * time.Second):
  318. t.Fatalf("Waited 60 seconds for Receive to finish, should have finished sooner")
  319. case <-doneReceiving:
  320. }
  321. }
  322. func TestIntegration_UpdateSubscription(t *testing.T) {
  323. t.Parallel()
  324. ctx := context.Background()
  325. client := integrationTestClient(ctx, t)
  326. defer client.Close()
  327. topic, err := client.CreateTopic(ctx, topicIDs.New())
  328. if err != nil {
  329. t.Fatalf("CreateTopic error: %v", err)
  330. }
  331. defer topic.Stop()
  332. defer topic.Delete(ctx)
  333. var sub *Subscription
  334. if sub, err = client.CreateSubscription(ctx, subIDs.New(), SubscriptionConfig{Topic: topic}); err != nil {
  335. t.Fatalf("CreateSub error: %v", err)
  336. }
  337. defer sub.Delete(ctx)
  338. got, err := sub.Config(ctx)
  339. if err != nil {
  340. t.Fatal(err)
  341. }
  342. want := SubscriptionConfig{
  343. Topic: topic,
  344. AckDeadline: 10 * time.Second,
  345. RetainAckedMessages: false,
  346. RetentionDuration: defaultRetentionDuration,
  347. }
  348. if !testutil.Equal(got, want) {
  349. t.Fatalf("\ngot %+v\nwant %+v", got, want)
  350. }
  351. // Add a PushConfig and change other fields.
  352. projID := testutil.ProjID()
  353. pc := PushConfig{
  354. Endpoint: "https://" + projID + ".appspot.com/_ah/push-handlers/push",
  355. Attributes: map[string]string{"x-goog-version": "v1"},
  356. }
  357. got, err = sub.Update(ctx, SubscriptionConfigToUpdate{
  358. PushConfig: &pc,
  359. AckDeadline: 2 * time.Minute,
  360. RetainAckedMessages: true,
  361. RetentionDuration: 2 * time.Hour,
  362. Labels: map[string]string{"label": "value"},
  363. })
  364. if err != nil {
  365. t.Fatal(err)
  366. }
  367. want = SubscriptionConfig{
  368. Topic: topic,
  369. PushConfig: pc,
  370. AckDeadline: 2 * time.Minute,
  371. RetainAckedMessages: true,
  372. RetentionDuration: 2 * time.Hour,
  373. Labels: map[string]string{"label": "value"},
  374. }
  375. if !testutil.Equal(got, want) {
  376. t.Fatalf("\ngot %+v\nwant %+v", got, want)
  377. }
  378. // Remove the PushConfig, turning the subscription back into pull mode.
  379. // Change AckDeadline, remove labels.
  380. pc = PushConfig{}
  381. got, err = sub.Update(ctx, SubscriptionConfigToUpdate{
  382. PushConfig: &pc,
  383. AckDeadline: 30 * time.Second,
  384. Labels: map[string]string{},
  385. })
  386. if err != nil {
  387. t.Fatal(err)
  388. }
  389. want.PushConfig = pc
  390. want.AckDeadline = 30 * time.Second
  391. want.Labels = nil
  392. // service issue: PushConfig attributes are not removed.
  393. // TODO(jba): remove when issue resolved.
  394. want.PushConfig.Attributes = map[string]string{"x-goog-version": "v1"}
  395. if !testutil.Equal(got, want) {
  396. t.Fatalf("\ngot %+v\nwant %+v", got, want)
  397. }
  398. // If nothing changes, our client returns an error.
  399. _, err = sub.Update(ctx, SubscriptionConfigToUpdate{})
  400. if err == nil {
  401. t.Fatal("got nil, wanted error")
  402. }
  403. }
  404. // NOTE: This test should be skipped by open source contributors. It requires
  405. // whitelisting, a (gsuite) organization project, and specific permissions.
  406. func TestIntegration_UpdateTopic(t *testing.T) {
  407. t.Parallel()
  408. ctx := context.Background()
  409. client := integrationTestClient(ctx, t)
  410. defer client.Close()
  411. compareConfig := func(got TopicConfig, wantLabels map[string]string) bool {
  412. if !testutil.Equal(got.Labels, wantLabels) {
  413. return false
  414. }
  415. // For MessageStoragePolicy, we don't want to check for an exact set of regions.
  416. // That set may change at any time. Instead, just make sure that the set isn't empty.
  417. if len(got.MessageStoragePolicy.AllowedPersistenceRegions) == 0 {
  418. return false
  419. }
  420. return true
  421. }
  422. topic, err := client.CreateTopic(ctx, topicIDs.New())
  423. if err != nil {
  424. t.Fatalf("CreateTopic error: %v", err)
  425. }
  426. defer topic.Stop()
  427. defer topic.Delete(ctx)
  428. got, err := topic.Config(ctx)
  429. if err != nil {
  430. t.Fatal(err)
  431. }
  432. if !compareConfig(got, nil) {
  433. t.Fatalf("\ngot %+v\nwant no labels", got)
  434. }
  435. labels := map[string]string{"label": "value"}
  436. got, err = topic.Update(ctx, TopicConfigToUpdate{Labels: labels})
  437. if err != nil {
  438. t.Fatal(err)
  439. }
  440. if !compareConfig(got, labels) {
  441. t.Fatalf("\ngot %+v\nwant labels %+v", got, labels)
  442. }
  443. // Remove all labels.
  444. got, err = topic.Update(ctx, TopicConfigToUpdate{Labels: map[string]string{}})
  445. if err != nil {
  446. t.Fatal(err)
  447. }
  448. if !compareConfig(got, nil) {
  449. t.Fatalf("\ngot %+v\nwant no labels", got)
  450. }
  451. }
  452. func TestIntegration_PublicTopic(t *testing.T) {
  453. t.Parallel()
  454. ctx := context.Background()
  455. client := integrationTestClient(ctx, t)
  456. defer client.Close()
  457. sub, err := client.CreateSubscription(ctx, subIDs.New(), SubscriptionConfig{
  458. Topic: client.TopicInProject("taxirides-realtime", "pubsub-public-data"),
  459. })
  460. if err != nil {
  461. t.Fatal(err)
  462. }
  463. defer sub.Delete(ctx)
  464. // Confirm that Receive works. It doesn't matter if we actually get any
  465. // messages.
  466. ctxt, cancel := context.WithTimeout(ctx, 5*time.Second)
  467. err = sub.Receive(ctxt, func(_ context.Context, msg *Message) {
  468. msg.Ack()
  469. cancel()
  470. })
  471. if err != nil {
  472. t.Fatal(err)
  473. }
  474. }
  475. func TestIntegration_Errors(t *testing.T) {
  476. // Test various edge conditions.
  477. t.Parallel()
  478. ctx := context.Background()
  479. client := integrationTestClient(ctx, t)
  480. defer client.Close()
  481. topic, err := client.CreateTopic(ctx, topicIDs.New())
  482. if err != nil {
  483. t.Fatalf("CreateTopic error: %v", err)
  484. }
  485. defer topic.Stop()
  486. defer topic.Delete(ctx)
  487. // Out-of-range retention duration.
  488. sub, err := client.CreateSubscription(ctx, subIDs.New(), SubscriptionConfig{
  489. Topic: topic,
  490. RetentionDuration: 1 * time.Second,
  491. })
  492. if want := codes.InvalidArgument; grpc.Code(err) != want {
  493. t.Errorf("got <%v>, want %s", err, want)
  494. }
  495. if err == nil {
  496. sub.Delete(ctx)
  497. }
  498. // Ack deadline less than minimum.
  499. sub, err = client.CreateSubscription(ctx, subIDs.New(), SubscriptionConfig{
  500. Topic: topic,
  501. AckDeadline: 5 * time.Second,
  502. })
  503. if want := codes.Unknown; grpc.Code(err) != want {
  504. t.Errorf("got <%v>, want %s", err, want)
  505. }
  506. if err == nil {
  507. sub.Delete(ctx)
  508. }
  509. // Updating a non-existent subscription.
  510. sub = client.Subscription(subIDs.New())
  511. _, err = sub.Update(ctx, SubscriptionConfigToUpdate{AckDeadline: 20 * time.Second})
  512. if want := codes.NotFound; grpc.Code(err) != want {
  513. t.Errorf("got <%v>, want %s", err, want)
  514. }
  515. // Deleting a non-existent subscription.
  516. err = sub.Delete(ctx)
  517. if want := codes.NotFound; grpc.Code(err) != want {
  518. t.Errorf("got <%v>, want %s", err, want)
  519. }
  520. // Updating out-of-range retention duration.
  521. sub, err = client.CreateSubscription(ctx, subIDs.New(), SubscriptionConfig{Topic: topic})
  522. if err != nil {
  523. t.Fatal(err)
  524. }
  525. defer sub.Delete(ctx)
  526. _, err = sub.Update(ctx, SubscriptionConfigToUpdate{RetentionDuration: 1000 * time.Hour})
  527. if want := codes.InvalidArgument; grpc.Code(err) != want {
  528. t.Errorf("got <%v>, want %s", err, want)
  529. }
  530. }
  531. // NOTE: This test should be skipped by open source contributors. It requires
  532. // whitelisting, a (gsuite) organization project, and specific permissions.
  533. //
  534. // Googlers, see internal bug 77920644. Furthermore, be sure to add your
  535. // service account as an owner of ps-geofencing-test.
  536. func TestIntegration_MessageStoragePolicy(t *testing.T) {
  537. // Verify that the message storage policy is populated.
  538. if testing.Short() {
  539. t.Skip("Integration tests skipped in short mode")
  540. }
  541. ctx := context.Background()
  542. // The message storage policy depends on the Resource Location Restriction org policy.
  543. // The usual testing project is in the google.com org, which has no resource location restrictions,
  544. // so we will always see an empty MessageStoragePolicy. Use a project in another org that does
  545. // have a restriction set ("us-east1").
  546. projID := "ps-geofencing-test"
  547. // We can use the same creds as always because the service account of the default testing project
  548. // has permission to use the above project. This test will fail if a different service account
  549. // is used for testing.
  550. ts := testutil.TokenSource(ctx, ScopePubSub, ScopeCloudPlatform)
  551. if ts == nil {
  552. t.Skip("Integration tests skipped. See CONTRIBUTING.md for details")
  553. }
  554. client, err := NewClient(ctx, projID, option.WithTokenSource(ts))
  555. if err != nil {
  556. t.Fatalf("Creating client error: %v", err)
  557. }
  558. topic, err := client.CreateTopic(ctx, topicIDs.New())
  559. if err != nil {
  560. t.Fatalf("CreateTopic error: %v", err)
  561. }
  562. defer topic.Stop()
  563. defer topic.Delete(ctx)
  564. config, err := topic.Config(ctx)
  565. if err != nil {
  566. t.Fatal(err)
  567. }
  568. got := config.MessageStoragePolicy.AllowedPersistenceRegions
  569. want := []string{"us-east1"}
  570. if !testutil.Equal(got, want) {
  571. t.Errorf("got %v, want %v", got, want)
  572. }
  573. }