You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

246 lines
5.8 KiB

  1. // Copyright 2016 Google LLC
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package pubsub
  15. import (
  16. "context"
  17. "fmt"
  18. "testing"
  19. "time"
  20. "cloud.google.com/go/internal/testutil"
  21. "cloud.google.com/go/pubsub/pstest"
  22. "google.golang.org/api/iterator"
  23. "google.golang.org/api/option"
  24. "google.golang.org/grpc"
  25. "google.golang.org/grpc/codes"
  26. "google.golang.org/grpc/status"
  27. )
  28. // All returns the remaining subscriptions from this iterator.
  29. func slurpSubs(it *SubscriptionIterator) ([]*Subscription, error) {
  30. var subs []*Subscription
  31. for {
  32. switch sub, err := it.Next(); err {
  33. case nil:
  34. subs = append(subs, sub)
  35. case iterator.Done:
  36. return subs, nil
  37. default:
  38. return nil, err
  39. }
  40. }
  41. }
  42. func TestSubscriptionID(t *testing.T) {
  43. const id = "id"
  44. c := &Client{projectID: "projid"}
  45. s := c.Subscription(id)
  46. if got, want := s.ID(), id; got != want {
  47. t.Errorf("Subscription.ID() = %q; want %q", got, want)
  48. }
  49. }
  50. func TestListProjectSubscriptions(t *testing.T) {
  51. ctx := context.Background()
  52. c, srv := newFake(t)
  53. defer c.Close()
  54. defer srv.Close()
  55. topic := mustCreateTopic(t, c, "t")
  56. var want []string
  57. for i := 1; i <= 2; i++ {
  58. id := fmt.Sprintf("s%d", i)
  59. want = append(want, id)
  60. _, err := c.CreateSubscription(ctx, id, SubscriptionConfig{Topic: topic})
  61. if err != nil {
  62. t.Fatal(err)
  63. }
  64. }
  65. subs, err := slurpSubs(c.Subscriptions(ctx))
  66. if err != nil {
  67. t.Fatal(err)
  68. }
  69. got := getSubIDs(subs)
  70. if !testutil.Equal(got, want) {
  71. t.Errorf("got %v, want %v", got, want)
  72. }
  73. }
  74. func getSubIDs(subs []*Subscription) []string {
  75. var names []string
  76. for _, sub := range subs {
  77. names = append(names, sub.ID())
  78. }
  79. return names
  80. }
  81. func TestListTopicSubscriptions(t *testing.T) {
  82. ctx := context.Background()
  83. c, srv := newFake(t)
  84. defer c.Close()
  85. defer srv.Close()
  86. topics := []*Topic{
  87. mustCreateTopic(t, c, "t0"),
  88. mustCreateTopic(t, c, "t1"),
  89. }
  90. wants := make([][]string, 2)
  91. for i := 0; i < 5; i++ {
  92. id := fmt.Sprintf("s%d", i)
  93. sub, err := c.CreateSubscription(ctx, id, SubscriptionConfig{Topic: topics[i%2]})
  94. if err != nil {
  95. t.Fatal(err)
  96. }
  97. wants[i%2] = append(wants[i%2], sub.ID())
  98. }
  99. for i, topic := range topics {
  100. subs, err := slurpSubs(topic.Subscriptions(ctx))
  101. if err != nil {
  102. t.Fatal(err)
  103. }
  104. got := getSubIDs(subs)
  105. if !testutil.Equal(got, wants[i]) {
  106. t.Errorf("#%d: got %v, want %v", i, got, wants[i])
  107. }
  108. }
  109. }
  110. const defaultRetentionDuration = 168 * time.Hour
  111. func TestUpdateSubscription(t *testing.T) {
  112. ctx := context.Background()
  113. client, srv := newFake(t)
  114. defer client.Close()
  115. defer srv.Close()
  116. topic := mustCreateTopic(t, client, "t")
  117. sub, err := client.CreateSubscription(ctx, "s", SubscriptionConfig{Topic: topic})
  118. if err != nil {
  119. t.Fatal(err)
  120. }
  121. cfg, err := sub.Config(ctx)
  122. if err != nil {
  123. t.Fatal(err)
  124. }
  125. want := SubscriptionConfig{
  126. Topic: topic,
  127. AckDeadline: 10 * time.Second,
  128. RetainAckedMessages: false,
  129. RetentionDuration: defaultRetentionDuration,
  130. }
  131. if !testutil.Equal(cfg, want) {
  132. t.Fatalf("\ngot %+v\nwant %+v", cfg, want)
  133. }
  134. got, err := sub.Update(ctx, SubscriptionConfigToUpdate{
  135. AckDeadline: 20 * time.Second,
  136. RetainAckedMessages: true,
  137. Labels: map[string]string{"label": "value"},
  138. })
  139. if err != nil {
  140. t.Fatal(err)
  141. }
  142. want = SubscriptionConfig{
  143. Topic: topic,
  144. AckDeadline: 20 * time.Second,
  145. RetainAckedMessages: true,
  146. RetentionDuration: defaultRetentionDuration,
  147. Labels: map[string]string{"label": "value"},
  148. }
  149. if !testutil.Equal(got, want) {
  150. t.Fatalf("\ngot %+v\nwant %+v", got, want)
  151. }
  152. got, err = sub.Update(ctx, SubscriptionConfigToUpdate{
  153. RetentionDuration: 2 * time.Hour,
  154. Labels: map[string]string{},
  155. })
  156. if err != nil {
  157. t.Fatal(err)
  158. }
  159. want.RetentionDuration = 2 * time.Hour
  160. want.Labels = nil
  161. if !testutil.Equal(got, want) {
  162. t.Fatalf("\ngot %+v\nwant %+v", got, want)
  163. }
  164. _, err = sub.Update(ctx, SubscriptionConfigToUpdate{})
  165. if err == nil {
  166. t.Fatal("got nil, want error")
  167. }
  168. }
  169. func TestReceive(t *testing.T) {
  170. testReceive(t, true)
  171. testReceive(t, false)
  172. }
  173. func testReceive(t *testing.T, synchronous bool) {
  174. ctx := context.Background()
  175. client, srv := newFake(t)
  176. defer client.Close()
  177. defer srv.Close()
  178. topic := mustCreateTopic(t, client, "t")
  179. sub, err := client.CreateSubscription(ctx, "s", SubscriptionConfig{Topic: topic})
  180. if err != nil {
  181. t.Fatal(err)
  182. }
  183. for i := 0; i < 256; i++ {
  184. srv.Publish(topic.name, []byte{byte(i)}, nil)
  185. }
  186. sub.ReceiveSettings.Synchronous = synchronous
  187. msgs, err := pullN(ctx, sub, 256, func(_ context.Context, m *Message) { m.Ack() })
  188. if c := status.Convert(err); err != nil && c.Code() != codes.Canceled {
  189. t.Fatalf("Pull: %v", err)
  190. }
  191. var seen [256]bool
  192. for _, m := range msgs {
  193. seen[m.Data[0]] = true
  194. }
  195. for i, saw := range seen {
  196. if !saw {
  197. t.Errorf("sync=%t: did not see message #%d", synchronous, i)
  198. }
  199. }
  200. }
  201. func (t1 *Topic) Equal(t2 *Topic) bool {
  202. if t1 == nil && t2 == nil {
  203. return true
  204. }
  205. if t1 == nil || t2 == nil {
  206. return false
  207. }
  208. return t1.c == t2.c && t1.name == t2.name
  209. }
  210. // Note: be sure to close client and server!
  211. func newFake(t *testing.T) (*Client, *pstest.Server) {
  212. ctx := context.Background()
  213. srv := pstest.NewServer()
  214. client, err := NewClient(ctx, "P",
  215. option.WithEndpoint(srv.Addr),
  216. option.WithoutAuthentication(),
  217. option.WithGRPCDialOption(grpc.WithInsecure()))
  218. if err != nil {
  219. t.Fatal(err)
  220. }
  221. return client, srv
  222. }