You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

139 lines
3.6 KiB

  1. // Copyright 2018 Google LLC
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package pubsub
  15. import (
  16. "context"
  17. "testing"
  18. "time"
  19. "cloud.google.com/go/internal/testutil"
  20. "cloud.google.com/go/pubsub/pstest"
  21. gax "github.com/googleapis/gax-go/v2"
  22. "google.golang.org/api/option"
  23. pb "google.golang.org/genproto/googleapis/pubsub/v1"
  24. "google.golang.org/grpc"
  25. "google.golang.org/grpc/codes"
  26. "google.golang.org/grpc/status"
  27. )
  28. func TestPullStreamGet(t *testing.T) {
  29. // Test that we retry on the initial Send call from pullstream.get. We don't do this
  30. // test with the server in fake_test.go because there's no clear way to get Send
  31. // to fail from the server.
  32. t.Parallel()
  33. for _, test := range []struct {
  34. desc string
  35. errors []error
  36. wantCode codes.Code
  37. }{
  38. {
  39. desc: "nil error",
  40. errors: []error{nil},
  41. wantCode: codes.OK,
  42. },
  43. {
  44. desc: "non-retryable error",
  45. errors: []error{status.Errorf(codes.InvalidArgument, "")},
  46. wantCode: codes.InvalidArgument,
  47. },
  48. {
  49. desc: "retryable errors",
  50. errors: []error{
  51. status.Errorf(codes.Unavailable, "first"),
  52. status.Errorf(codes.Unavailable, "second"),
  53. nil,
  54. },
  55. wantCode: codes.OK,
  56. },
  57. } {
  58. streamingPull := func(context.Context, ...gax.CallOption) (pb.Subscriber_StreamingPullClient, error) {
  59. if len(test.errors) == 0 {
  60. panic("out of errors")
  61. }
  62. err := test.errors[0]
  63. test.errors = test.errors[1:]
  64. return &testStreamingPullClient{sendError: err}, nil
  65. }
  66. ps := newPullStream(context.Background(), streamingPull, "")
  67. _, err := ps.get(nil)
  68. if got := status.Code(err); got != test.wantCode {
  69. t.Errorf("%s: got %s, want %s", test.desc, got, test.wantCode)
  70. }
  71. }
  72. }
  73. func TestPullStreamGet_ResourceUnavailable(t *testing.T) {
  74. ctx := context.Background()
  75. srv, err := testutil.NewServer()
  76. if err != nil {
  77. t.Fatal(err)
  78. }
  79. defer srv.Close()
  80. ps := pstest.NewServer()
  81. defer ps.Close()
  82. s := ExhaustedServer{ps.GServer}
  83. pb.RegisterPublisherServer(srv.Gsrv, &s)
  84. pb.RegisterSubscriberServer(srv.Gsrv, &s)
  85. srv.Start()
  86. client, err := NewClient(ctx, "P",
  87. option.WithEndpoint(srv.Addr),
  88. option.WithoutAuthentication(),
  89. option.WithGRPCDialOption(grpc.WithInsecure()))
  90. if err != nil {
  91. t.Fatal(err)
  92. }
  93. defer client.Close()
  94. errc := make(chan error)
  95. go func() {
  96. errc <- client.Subscription("foo").Receive(ctx, func(context.Context, *Message) {
  97. t.Error("should not have received any data")
  98. })
  99. }()
  100. select {
  101. case <-time.After(5 * time.Second):
  102. t.Fatal("Receive should have failed immediately")
  103. case err := <-errc:
  104. if gerr, ok := status.FromError(err); ok {
  105. if gerr.Code() != codes.ResourceExhausted {
  106. t.Fatal("expected to receive a grpc ResourceExhausted error")
  107. }
  108. } else {
  109. t.Fatal("expected to receive a grpc ResourceExhausted error")
  110. }
  111. }
  112. }
  113. type ExhaustedServer struct {
  114. pstest.GServer
  115. }
  116. func (*ExhaustedServer) StreamingPull(_ pb.Subscriber_StreamingPullServer) error {
  117. return status.Errorf(codes.ResourceExhausted, "This server is exhausted!")
  118. }
  119. type testStreamingPullClient struct {
  120. pb.Subscriber_StreamingPullClient
  121. sendError error
  122. }
  123. func (c *testStreamingPullClient) Send(*pb.StreamingPullRequest) error { return c.sendError }