You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

225 lines
7.3 KiB

  1. // Copyright 2018 Google LLC
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package firestore
  15. import (
  16. "context"
  17. "sort"
  18. "testing"
  19. "time"
  20. "cloud.google.com/go/internal/btree"
  21. "github.com/golang/protobuf/proto"
  22. gax "github.com/googleapis/gax-go/v2"
  23. pb "google.golang.org/genproto/googleapis/firestore/v1"
  24. "google.golang.org/grpc/codes"
  25. "google.golang.org/grpc/status"
  26. )
  27. func TestWatchRecv(t *testing.T) {
  28. ctx := context.Background()
  29. c, srv := newMock(t)
  30. db := defaultBackoff
  31. defaultBackoff = gax.Backoff{Initial: 1, Max: 1, Multiplier: 1}
  32. defer func() { defaultBackoff = db }()
  33. ws := newWatchStream(ctx, c, nil, &pb.Target{})
  34. request := &pb.ListenRequest{
  35. Database: "projects/projectID/databases/(default)",
  36. TargetChange: &pb.ListenRequest_AddTarget{&pb.Target{}},
  37. }
  38. response := &pb.ListenResponse{ResponseType: &pb.ListenResponse_DocumentChange{&pb.DocumentChange{}}}
  39. // Stream should retry on non-permanent errors, returning only the responses.
  40. srv.addRPC(request, []interface{}{response, status.Error(codes.Unknown, "")})
  41. srv.addRPC(request, []interface{}{response}) // stream will return io.EOF
  42. srv.addRPC(request, []interface{}{response, status.Error(codes.DeadlineExceeded, "")})
  43. srv.addRPC(request, []interface{}{status.Error(codes.ResourceExhausted, "")})
  44. srv.addRPC(request, []interface{}{status.Error(codes.Internal, "")})
  45. srv.addRPC(request, []interface{}{status.Error(codes.Unavailable, "")})
  46. srv.addRPC(request, []interface{}{status.Error(codes.Unauthenticated, "")})
  47. srv.addRPC(request, []interface{}{response})
  48. for i := 0; i < 4; i++ {
  49. res, err := ws.recv()
  50. if err != nil {
  51. t.Fatal(err)
  52. }
  53. if !proto.Equal(res, response) {
  54. t.Fatalf("got %v, want %v", res, response)
  55. }
  56. }
  57. // Stream should not retry on a permanent error.
  58. srv.addRPC(request, []interface{}{status.Error(codes.AlreadyExists, "")})
  59. _, err := ws.recv()
  60. if got, want := status.Code(err), codes.AlreadyExists; got != want {
  61. t.Fatalf("got %s, want %s", got, want)
  62. }
  63. }
  64. func TestComputeSnapshot(t *testing.T) {
  65. c := &Client{
  66. projectID: "projID",
  67. databaseID: "(database)",
  68. }
  69. ws := newWatchStream(context.Background(), c, nil, &pb.Target{})
  70. tm := time.Now()
  71. i := 0
  72. doc := func(path, value string) *DocumentSnapshot {
  73. i++
  74. return &DocumentSnapshot{
  75. Ref: c.Doc(path),
  76. proto: &pb.Document{Fields: map[string]*pb.Value{"foo": strval(value)}},
  77. UpdateTime: tm.Add(time.Duration(i) * time.Second), // need unique time for updates
  78. }
  79. }
  80. val := func(d *DocumentSnapshot) string { return d.proto.Fields["foo"].GetStringValue() }
  81. less := func(a, b *DocumentSnapshot) bool { return val(a) < val(b) }
  82. type dmap map[string]*DocumentSnapshot
  83. ds1 := doc("C/d1", "a")
  84. ds2 := doc("C/d2", "b")
  85. ds2c := doc("C/d2", "c")
  86. docTree := btree.New(4, func(a, b interface{}) bool { return less(a.(*DocumentSnapshot), b.(*DocumentSnapshot)) })
  87. var gotChanges []DocumentChange
  88. docMap := dmap{}
  89. // The following test cases are not independent; each builds on the output of the previous.
  90. for _, test := range []struct {
  91. desc string
  92. changeMap dmap
  93. wantDocs []*DocumentSnapshot
  94. wantChanges []DocumentChange
  95. }{
  96. {
  97. "no changes",
  98. nil,
  99. nil,
  100. nil,
  101. },
  102. {
  103. "add a doc",
  104. dmap{ds1.Ref.Path: ds1},
  105. []*DocumentSnapshot{ds1},
  106. []DocumentChange{{Kind: DocumentAdded, Doc: ds1, OldIndex: -1, NewIndex: 0}},
  107. },
  108. {
  109. "add, remove",
  110. dmap{ds1.Ref.Path: nil, ds2.Ref.Path: ds2},
  111. []*DocumentSnapshot{ds2},
  112. []DocumentChange{
  113. {Kind: DocumentRemoved, Doc: ds1, OldIndex: 0, NewIndex: -1},
  114. {Kind: DocumentAdded, Doc: ds2, OldIndex: -1, NewIndex: 0},
  115. },
  116. },
  117. {
  118. "add back, modify",
  119. dmap{ds1.Ref.Path: ds1, ds2c.Ref.Path: ds2c},
  120. []*DocumentSnapshot{ds1, ds2c},
  121. []DocumentChange{
  122. {Kind: DocumentAdded, Doc: ds1, OldIndex: -1, NewIndex: 0},
  123. {Kind: DocumentModified, Doc: ds2c, OldIndex: 1, NewIndex: 1},
  124. },
  125. },
  126. } {
  127. docTree, gotChanges = ws.computeSnapshot(docTree, docMap, test.changeMap, time.Time{})
  128. gotDocs := treeDocs(docTree)
  129. if diff := testDiff(gotDocs, test.wantDocs); diff != "" {
  130. t.Fatalf("%s: %s", test.desc, diff)
  131. }
  132. mgot := mapDocs(docMap, less)
  133. if diff := testDiff(gotDocs, mgot); diff != "" {
  134. t.Fatalf("%s: docTree and docMap disagree: %s", test.desc, diff)
  135. }
  136. if diff := testDiff(gotChanges, test.wantChanges); diff != "" {
  137. t.Fatalf("%s: %s", test.desc, diff)
  138. }
  139. }
  140. // Verify that if there are no changes, the returned docTree is identical to the first arg.
  141. // docTree already has ds2c.
  142. got, _ := ws.computeSnapshot(docTree, docMap, dmap{ds2c.Ref.Path: ds2c}, time.Time{})
  143. if got != docTree {
  144. t.Error("returned docTree != arg docTree")
  145. }
  146. }
  147. func treeDocs(bt *btree.BTree) []*DocumentSnapshot {
  148. var ds []*DocumentSnapshot
  149. it := bt.BeforeIndex(0)
  150. for it.Next() {
  151. ds = append(ds, it.Key.(*DocumentSnapshot))
  152. }
  153. return ds
  154. }
  155. func mapDocs(m map[string]*DocumentSnapshot, less func(a, b *DocumentSnapshot) bool) []*DocumentSnapshot {
  156. var ds []*DocumentSnapshot
  157. for _, d := range m {
  158. ds = append(ds, d)
  159. }
  160. sort.Sort(byLess{ds, less})
  161. return ds
  162. }
  163. func TestWatchCancel(t *testing.T) {
  164. // Canceling the context of a watch should result in a codes.Canceled error from the next
  165. // call to the iterator's Next method.
  166. ctx := context.Background()
  167. c, srv := newMock(t)
  168. q := Query{c: c, collectionID: "x"}
  169. // Cancel before open.
  170. ctx2, cancel := context.WithCancel(ctx)
  171. ws, err := newWatchStreamForQuery(ctx2, q)
  172. if err != nil {
  173. t.Fatal(err)
  174. }
  175. cancel()
  176. _, _, _, err = ws.nextSnapshot()
  177. codeEq(t, "cancel before open", codes.Canceled, err)
  178. request := &pb.ListenRequest{
  179. Database: "projects/projectID/databases/(default)",
  180. TargetChange: &pb.ListenRequest_AddTarget{ws.target},
  181. }
  182. current := &pb.ListenResponse{ResponseType: &pb.ListenResponse_TargetChange{&pb.TargetChange{
  183. TargetChangeType: pb.TargetChange_CURRENT,
  184. }}}
  185. noChange := &pb.ListenResponse{ResponseType: &pb.ListenResponse_TargetChange{&pb.TargetChange{
  186. TargetChangeType: pb.TargetChange_NO_CHANGE,
  187. ReadTime: aTimestamp,
  188. }}}
  189. // Cancel from gax.Sleep. We should still see a gRPC error with codes.Canceled, not a
  190. // context.Canceled error.
  191. ctx2, cancel = context.WithCancel(ctx)
  192. ws, err = newWatchStreamForQuery(ctx2, q)
  193. if err != nil {
  194. t.Fatal(err)
  195. }
  196. srv.addRPC(request, []interface{}{current, noChange})
  197. _, _, _, _ = ws.nextSnapshot()
  198. cancel()
  199. // Because of how the mock works, the following results in an EOF on the stream, which
  200. // is a non-permanent error that causes a retry. That retry ends up in gax.Sleep, which
  201. // finds that the context is done and returns ctx.Err(), which is context.Canceled.
  202. // Verify that we transform that context.Canceled into a gRPC Status with code Canceled.
  203. _, _, _, err = ws.nextSnapshot()
  204. codeEq(t, "cancel from gax.Sleep", codes.Canceled, err)
  205. // TODO(jba): Test that we get codes.Canceled when canceling an RPC.
  206. // We had a test for this in a21236af, but it was flaky for unclear reasons.
  207. }