You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

298 lines
8.6 KiB

  1. // Copyright 2017 Google LLC
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package pubsub
  15. import (
  16. "context"
  17. "errors"
  18. "fmt"
  19. "reflect"
  20. "sync"
  21. "sync/atomic"
  22. "testing"
  23. "time"
  24. "cloud.google.com/go/internal/testutil"
  25. "cloud.google.com/go/pubsub/pstest"
  26. "google.golang.org/api/option"
  27. "google.golang.org/grpc"
  28. "google.golang.org/grpc/codes"
  29. "google.golang.org/grpc/status"
  30. )
  31. var (
  32. projName = "some-project"
  33. topicName = "some-topic"
  34. fullyQualifiedTopicName = fmt.Sprintf("projects/%s/topics/%s", projName, topicName)
  35. )
  36. func TestSplitRequestIDs(t *testing.T) {
  37. t.Parallel()
  38. ids := []string{"aaaa", "bbbb", "cccc", "dddd", "eeee"}
  39. for _, test := range []struct {
  40. ids []string
  41. splitIndex int
  42. }{
  43. {[]string{}, 0},
  44. {ids, 2},
  45. {ids[:2], 2},
  46. } {
  47. got1, got2 := splitRequestIDs(test.ids, reqFixedOverhead+20)
  48. want1, want2 := test.ids[:test.splitIndex], test.ids[test.splitIndex:]
  49. if !testutil.Equal(got1, want1) {
  50. t.Errorf("%v, 1: got %v, want %v", test, got1, want1)
  51. }
  52. if !testutil.Equal(got2, want2) {
  53. t.Errorf("%v, 2: got %v, want %v", test, got2, want2)
  54. }
  55. }
  56. }
  57. func TestAckDistribution(t *testing.T) {
  58. if testing.Short() {
  59. t.SkipNow()
  60. }
  61. t.Skip("broken")
  62. ctx, cancel := context.WithCancel(context.Background())
  63. defer cancel()
  64. minAckDeadline = 1 * time.Second
  65. pstest.SetMinAckDeadline(minAckDeadline)
  66. srv := pstest.NewServer()
  67. defer srv.Close()
  68. defer pstest.ResetMinAckDeadline()
  69. // Create the topic via a Publish. It's convenient to do it here as opposed to client.CreateTopic because the client
  70. // has not been established yet, and also because we want to create the topic once whereas the client is established
  71. // below twice.
  72. srv.Publish(fullyQualifiedTopicName, []byte("creating a topic"), nil)
  73. queuedMsgs := make(chan int32, 1024)
  74. go continuouslySend(ctx, srv, queuedMsgs)
  75. for _, testcase := range []struct {
  76. initialProcessSecs int32
  77. finalProcessSecs int32
  78. }{
  79. {initialProcessSecs: 3, finalProcessSecs: 5}, // Process time goes up
  80. {initialProcessSecs: 5, finalProcessSecs: 3}, // Process time goes down
  81. } {
  82. t.Logf("Testing %d -> %d", testcase.initialProcessSecs, testcase.finalProcessSecs)
  83. // processTimeSecs is used by the sender to coordinate with the receiver how long the receiver should
  84. // pretend to process for. e.g. if we test 3s -> 5s, processTimeSecs will start at 3, causing receiver
  85. // to process messages received for 3s while sender sends the first batch. Then, as sender begins to
  86. // send the next batch, sender will swap processTimeSeconds to 5s and begin sending, and receiver will
  87. // process each message for 5s. In this way we simulate a client whose time-to-ack (process time) changes.
  88. processTimeSecs := testcase.initialProcessSecs
  89. s, client, err := initConn(ctx, srv.Addr)
  90. if err != nil {
  91. t.Fatal(err)
  92. }
  93. // recvdWg increments for each message sent, and decrements for each message received.
  94. recvdWg := &sync.WaitGroup{}
  95. go startReceiving(ctx, t, s, recvdWg, &processTimeSecs)
  96. startSending(t, queuedMsgs, &processTimeSecs, testcase.initialProcessSecs, testcase.finalProcessSecs, recvdWg)
  97. recvdWg.Wait()
  98. time.Sleep(100 * time.Millisecond) // Wait a bit more for resources to clean up
  99. err = client.Close()
  100. if err != nil {
  101. t.Fatal(err)
  102. }
  103. modacks := modacksByTime(srv.Messages())
  104. u := modackDeadlines(modacks)
  105. initialDL := int32(minAckDeadline / time.Second)
  106. if !setsAreEqual(u, []int32{initialDL, testcase.initialProcessSecs, testcase.finalProcessSecs}) {
  107. t.Fatalf("Expected modack deadlines to contain (exactly, and only) %ds, %ds, %ds. Instead, got %v",
  108. initialDL, testcase.initialProcessSecs, testcase.finalProcessSecs, toSet(u))
  109. }
  110. }
  111. }
  112. // modacksByTime buckets modacks by time.
  113. func modacksByTime(msgs []*pstest.Message) map[time.Time][]pstest.Modack {
  114. modacks := map[time.Time][]pstest.Modack{}
  115. for _, msg := range msgs {
  116. for _, m := range msg.Modacks {
  117. modacks[m.ReceivedAt] = append(modacks[m.ReceivedAt], m)
  118. }
  119. }
  120. return modacks
  121. }
  122. // setsAreEqual reports whether a and b contain the same values, ignoring duplicates.
  123. func setsAreEqual(haystack, needles []int32) bool {
  124. hMap := map[int32]bool{}
  125. nMap := map[int32]bool{}
  126. for _, n := range needles {
  127. nMap[n] = true
  128. }
  129. for _, n := range haystack {
  130. hMap[n] = true
  131. }
  132. return reflect.DeepEqual(nMap, hMap)
  133. }
  134. // startReceiving pretends to be a client. It calls s.Receive and acks messages after some random delay. It also
  135. // looks out for dupes - any message that arrives twice will cause a failure.
  136. func startReceiving(ctx context.Context, t *testing.T, s *Subscription, recvdWg *sync.WaitGroup, processTimeSecs *int32) {
  137. t.Log("Receiving..")
  138. var recvdMu sync.Mutex
  139. recvd := map[string]bool{}
  140. err := s.Receive(ctx, func(ctx context.Context, msg *Message) {
  141. msgData := string(msg.Data)
  142. recvdMu.Lock()
  143. _, ok := recvd[msgData]
  144. if ok {
  145. recvdMu.Unlock()
  146. t.Fatalf("already saw \"%s\"\n", msgData)
  147. return
  148. }
  149. recvd[msgData] = true
  150. recvdMu.Unlock()
  151. select {
  152. case <-ctx.Done():
  153. msg.Nack()
  154. recvdWg.Done()
  155. case <-time.After(time.Duration(atomic.LoadInt32(processTimeSecs)) * time.Second):
  156. msg.Ack()
  157. recvdWg.Done()
  158. }
  159. })
  160. if err != nil {
  161. if status.Code(err) != codes.Canceled {
  162. t.Error(err)
  163. }
  164. }
  165. }
  166. // startSending sends four batches of messages broken up by minDeadline, initialProcessSecs, and finalProcessSecs.
  167. func startSending(t *testing.T, queuedMsgs chan int32, processTimeSecs *int32, initialProcessSecs int32, finalProcessSecs int32, recvdWg *sync.WaitGroup) {
  168. var msg int32
  169. // We must send this block to force the receiver to send its initially-configured modack time. The time that
  170. // gets sent should be ignorant of the distribution, since there haven't been enough (any, actually) messages
  171. // to create a distribution yet.
  172. t.Log("minAckDeadlineSecsSending an initial message")
  173. recvdWg.Add(1)
  174. msg++
  175. queuedMsgs <- msg
  176. <-time.After(minAckDeadline)
  177. t.Logf("Sending some messages to update distribution to %d. This new distribution will be used "+
  178. "when the next batch of messages go out.", initialProcessSecs)
  179. for i := 0; i < 10; i++ {
  180. recvdWg.Add(1)
  181. msg++
  182. queuedMsgs <- msg
  183. }
  184. atomic.SwapInt32(processTimeSecs, finalProcessSecs)
  185. <-time.After(time.Duration(initialProcessSecs) * time.Second)
  186. t.Logf("Sending many messages to update distribution to %d. This new distribution will be used "+
  187. "when the next batch of messages go out.", finalProcessSecs)
  188. for i := 0; i < 100; i++ {
  189. recvdWg.Add(1)
  190. msg++
  191. queuedMsgs <- msg // Send many messages to drastically change distribution
  192. }
  193. <-time.After(time.Duration(finalProcessSecs) * time.Second)
  194. t.Logf("Last message going out, whose deadline should be %d.", finalProcessSecs)
  195. recvdWg.Add(1)
  196. msg++
  197. queuedMsgs <- msg
  198. }
  199. // continuouslySend continuously sends messages that exist on the queuedMsgs chan.
  200. func continuouslySend(ctx context.Context, srv *pstest.Server, queuedMsgs chan int32) {
  201. for {
  202. select {
  203. case <-ctx.Done():
  204. return
  205. case m := <-queuedMsgs:
  206. srv.Publish(fullyQualifiedTopicName, []byte(fmt.Sprintf("message %d", m)), nil)
  207. }
  208. }
  209. }
  210. func toSet(arr []int32) []int32 {
  211. var s []int32
  212. m := map[int32]bool{}
  213. for _, v := range arr {
  214. _, ok := m[v]
  215. if !ok {
  216. s = append(s, v)
  217. m[v] = true
  218. }
  219. }
  220. return s
  221. }
  222. func initConn(ctx context.Context, addr string) (*Subscription, *Client, error) {
  223. conn, err := grpc.Dial(addr, grpc.WithInsecure())
  224. if err != nil {
  225. return nil, nil, err
  226. }
  227. client, err := NewClient(ctx, projName, option.WithGRPCConn(conn))
  228. if err != nil {
  229. return nil, nil, err
  230. }
  231. topic := client.Topic(topicName)
  232. s, err := client.CreateSubscription(ctx, fmt.Sprintf("sub-%d", time.Now().UnixNano()), SubscriptionConfig{Topic: topic})
  233. if err != nil {
  234. return nil, nil, err
  235. }
  236. exists, err := s.Exists(ctx)
  237. if !exists {
  238. return nil, nil, errors.New("Subscription does not exist")
  239. }
  240. if err != nil {
  241. return nil, nil, err
  242. }
  243. return s, client, nil
  244. }
  245. // modackDeadlines takes a map of time => Modack, gathers all the Modack.AckDeadlines,
  246. // and returns them as a slice
  247. func modackDeadlines(m map[time.Time][]pstest.Modack) []int32 {
  248. var u []int32
  249. for _, vv := range m {
  250. for _, v := range vv {
  251. u = append(u, v.AckDeadline)
  252. }
  253. }
  254. return u
  255. }