You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

520 lines
15 KiB

  1. // Copyright 2018 Google LLC
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package firestore
  15. import (
  16. "context"
  17. "errors"
  18. "fmt"
  19. "io"
  20. "log"
  21. "sort"
  22. "time"
  23. "cloud.google.com/go/internal/btree"
  24. "github.com/golang/protobuf/ptypes"
  25. gax "github.com/googleapis/gax-go/v2"
  26. pb "google.golang.org/genproto/googleapis/firestore/v1"
  27. "google.golang.org/grpc/codes"
  28. "google.golang.org/grpc/status"
  29. )
  30. // LogWatchStreams controls whether watch stream status changes are logged.
  31. // This feature is EXPERIMENTAL and may disappear at any time.
  32. var LogWatchStreams = false
  33. // DocumentChangeKind describes the kind of change to a document between
  34. // query snapshots.
  35. type DocumentChangeKind int
  36. const (
  37. // DocumentAdded indicates that the document was added for the first time.
  38. DocumentAdded DocumentChangeKind = iota
  39. // DocumentRemoved indicates that the document was removed.
  40. DocumentRemoved
  41. // DocumentModified indicates that the document was modified.
  42. DocumentModified
  43. )
  44. // A DocumentChange describes the change to a document from one query snapshot to the next.
  45. type DocumentChange struct {
  46. Kind DocumentChangeKind
  47. Doc *DocumentSnapshot
  48. // The zero-based index of the document in the sequence of query results prior to this change,
  49. // or -1 if the document was not present.
  50. OldIndex int
  51. // The zero-based index of the document in the sequence of query results after this change,
  52. // or -1 if the document is no longer present.
  53. NewIndex int
  54. }
  55. // Implementation of realtime updates (a.k.a. watch).
  56. // This code is closely based on the Node.js implementation,
  57. // https://github.com/googleapis/nodejs-firestore/blob/master/src/watch.js.
  58. // The sole target ID for all streams from this client.
  59. // Variable for testing.
  60. var watchTargetID int32 = 'g' + 'o'
  61. var defaultBackoff = gax.Backoff{
  62. // Values from https://github.com/googleapis/nodejs-firestore/blob/master/src/backoff.js.
  63. Initial: 1 * time.Second,
  64. Max: 60 * time.Second,
  65. Multiplier: 1.5,
  66. }
  67. // not goroutine-safe
  68. type watchStream struct {
  69. ctx context.Context
  70. c *Client
  71. lc pb.Firestore_ListenClient // the gRPC stream
  72. target *pb.Target // document or query being watched
  73. backoff gax.Backoff // for stream retries
  74. err error // sticky permanent error
  75. readTime time.Time // time of most recent snapshot
  76. current bool // saw CURRENT, but not RESET; precondition for a snapshot
  77. hasReturned bool // have we returned a snapshot yet?
  78. compare func(a, b *DocumentSnapshot) (int, error) // compare documents according to query
  79. // An ordered tree where DocumentSnapshots are the keys.
  80. docTree *btree.BTree
  81. // Map of document name to DocumentSnapshot for the last returned snapshot.
  82. docMap map[string]*DocumentSnapshot
  83. // Map of document name to DocumentSnapshot for accumulated changes for the current snapshot.
  84. // A nil value means the document was removed.
  85. changeMap map[string]*DocumentSnapshot
  86. }
  87. func newWatchStreamForDocument(ctx context.Context, dr *DocumentRef) *watchStream {
  88. // A single document is always equal to itself.
  89. compare := func(_, _ *DocumentSnapshot) (int, error) { return 0, nil }
  90. return newWatchStream(ctx, dr.Parent.c, compare, &pb.Target{
  91. TargetType: &pb.Target_Documents{
  92. Documents: &pb.Target_DocumentsTarget{Documents: []string{dr.Path}},
  93. },
  94. TargetId: watchTargetID,
  95. })
  96. }
  97. func newWatchStreamForQuery(ctx context.Context, q Query) (*watchStream, error) {
  98. qp, err := q.toProto()
  99. if err != nil {
  100. return nil, err
  101. }
  102. target := &pb.Target{
  103. TargetType: &pb.Target_Query{
  104. Query: &pb.Target_QueryTarget{
  105. Parent: q.parentPath,
  106. QueryType: &pb.Target_QueryTarget_StructuredQuery{qp},
  107. },
  108. },
  109. TargetId: watchTargetID,
  110. }
  111. return newWatchStream(ctx, q.c, q.compareFunc(), target), nil
  112. }
  113. const btreeDegree = 4
  114. func newWatchStream(ctx context.Context, c *Client, compare func(_, _ *DocumentSnapshot) (int, error), target *pb.Target) *watchStream {
  115. w := &watchStream{
  116. ctx: ctx,
  117. c: c,
  118. compare: compare,
  119. target: target,
  120. backoff: defaultBackoff,
  121. docMap: map[string]*DocumentSnapshot{},
  122. changeMap: map[string]*DocumentSnapshot{},
  123. }
  124. w.docTree = btree.New(btreeDegree, func(a, b interface{}) bool {
  125. return w.less(a.(*DocumentSnapshot), b.(*DocumentSnapshot))
  126. })
  127. return w
  128. }
  129. func (s *watchStream) less(a, b *DocumentSnapshot) bool {
  130. c, err := s.compare(a, b)
  131. if err != nil {
  132. s.err = err
  133. return false
  134. }
  135. return c < 0
  136. }
  137. // Once nextSnapshot returns an error, it will always return the same error.
  138. func (s *watchStream) nextSnapshot() (*btree.BTree, []DocumentChange, time.Time, error) {
  139. if s.err != nil {
  140. return nil, nil, time.Time{}, s.err
  141. }
  142. var changes []DocumentChange
  143. for {
  144. // Process messages until we are in a consistent state.
  145. for !s.handleNextMessage() {
  146. }
  147. if s.err != nil {
  148. _ = s.close() // ignore error
  149. return nil, nil, time.Time{}, s.err
  150. }
  151. var newDocTree *btree.BTree
  152. newDocTree, changes = s.computeSnapshot(s.docTree, s.docMap, s.changeMap, s.readTime)
  153. if s.err != nil {
  154. return nil, nil, time.Time{}, s.err
  155. }
  156. // Only return a snapshot if something has changed, or this is the first snapshot.
  157. if !s.hasReturned || newDocTree != s.docTree {
  158. s.docTree = newDocTree
  159. break
  160. }
  161. }
  162. s.changeMap = map[string]*DocumentSnapshot{}
  163. s.hasReturned = true
  164. return s.docTree, changes, s.readTime, nil
  165. }
  166. // Read a message from the stream and handle it. Return true when
  167. // we're in a consistent state, or there is a permanent error.
  168. func (s *watchStream) handleNextMessage() bool {
  169. res, err := s.recv()
  170. if err != nil {
  171. s.err = err
  172. // Errors returned by recv are permanent.
  173. return true
  174. }
  175. switch r := res.ResponseType.(type) {
  176. case *pb.ListenResponse_TargetChange:
  177. return s.handleTargetChange(r.TargetChange)
  178. case *pb.ListenResponse_DocumentChange:
  179. name := r.DocumentChange.Document.Name
  180. s.logf("DocumentChange %q", name)
  181. if hasWatchTargetID(r.DocumentChange.TargetIds) { // document changed
  182. ref, err := pathToDoc(name, s.c)
  183. if err == nil {
  184. s.changeMap[name], err = newDocumentSnapshot(ref, r.DocumentChange.Document, s.c, nil)
  185. }
  186. if err != nil {
  187. s.err = err
  188. return true
  189. }
  190. } else if hasWatchTargetID(r.DocumentChange.RemovedTargetIds) { // document removed
  191. s.changeMap[name] = nil
  192. }
  193. case *pb.ListenResponse_DocumentDelete:
  194. s.logf("Delete %q", r.DocumentDelete.Document)
  195. s.changeMap[r.DocumentDelete.Document] = nil
  196. case *pb.ListenResponse_DocumentRemove:
  197. s.logf("Remove %q", r.DocumentRemove.Document)
  198. s.changeMap[r.DocumentRemove.Document] = nil
  199. case *pb.ListenResponse_Filter:
  200. s.logf("Filter %d", r.Filter.Count)
  201. if int(r.Filter.Count) != s.currentSize() {
  202. s.resetDocs() // Remove all the current results.
  203. // The filter didn't match; close the stream so it will be re-opened on the next
  204. // call to nextSnapshot.
  205. _ = s.close() // ignore error
  206. s.lc = nil
  207. }
  208. default:
  209. s.err = fmt.Errorf("unknown response type %T", r)
  210. return true
  211. }
  212. return false
  213. }
  214. // Return true iff in a consistent state, or there is a permanent error.
  215. func (s *watchStream) handleTargetChange(tc *pb.TargetChange) bool {
  216. switch tc.TargetChangeType {
  217. case pb.TargetChange_NO_CHANGE:
  218. s.logf("TargetNoChange %d %v", len(tc.TargetIds), tc.ReadTime)
  219. if len(tc.TargetIds) == 0 && tc.ReadTime != nil && s.current {
  220. // Everything is up-to-date, so we are ready to return a snapshot.
  221. rt, err := ptypes.Timestamp(tc.ReadTime)
  222. if err != nil {
  223. s.err = err
  224. return true
  225. }
  226. s.readTime = rt
  227. s.target.ResumeType = &pb.Target_ResumeToken{tc.ResumeToken}
  228. return true
  229. }
  230. case pb.TargetChange_ADD:
  231. s.logf("TargetAdd")
  232. if tc.TargetIds[0] != watchTargetID {
  233. s.err = errors.New("unexpected target ID sent by server")
  234. return true
  235. }
  236. case pb.TargetChange_REMOVE:
  237. s.logf("TargetRemove")
  238. // We should never see a remove.
  239. if tc.Cause != nil {
  240. s.err = status.Error(codes.Code(tc.Cause.Code), tc.Cause.Message)
  241. } else {
  242. s.err = status.Error(codes.Internal, "firestore: client saw REMOVE")
  243. }
  244. return true
  245. // The targets reflect all changes committed before the targets were added
  246. // to the stream.
  247. case pb.TargetChange_CURRENT:
  248. s.logf("TargetCurrent")
  249. s.current = true
  250. // The targets have been reset, and a new initial state for the targets will be
  251. // returned in subsequent changes. Whatever changes have happened so far no
  252. // longer matter.
  253. case pb.TargetChange_RESET:
  254. s.logf("TargetReset")
  255. s.resetDocs()
  256. default:
  257. s.err = fmt.Errorf("firestore: unknown TargetChange type %s", tc.TargetChangeType)
  258. return true
  259. }
  260. // If we see a resume token and our watch ID is affected, we assume the stream
  261. // is now healthy, so we reset our backoff time to the minimum.
  262. if tc.ResumeToken != nil && (len(tc.TargetIds) == 0 || hasWatchTargetID(tc.TargetIds)) {
  263. s.backoff = defaultBackoff
  264. }
  265. return false // not in a consistent state, keep receiving
  266. }
  267. func (s *watchStream) resetDocs() {
  268. s.target.ResumeType = nil // clear resume token
  269. s.current = false
  270. s.changeMap = map[string]*DocumentSnapshot{}
  271. // Mark each document as deleted. If documents are not deleted, they
  272. // will be send again by the server.
  273. it := s.docTree.BeforeIndex(0)
  274. for it.Next() {
  275. s.changeMap[it.Key.(*DocumentSnapshot).Ref.Path] = nil
  276. }
  277. }
  278. func (s *watchStream) currentSize() int {
  279. _, adds, deletes := extractChanges(s.docMap, s.changeMap)
  280. return len(s.docMap) + len(adds) - len(deletes)
  281. }
  282. // Return the changes that have occurred since the last snapshot.
  283. func extractChanges(docMap, changeMap map[string]*DocumentSnapshot) (updates, adds []*DocumentSnapshot, deletes []string) {
  284. for name, doc := range changeMap {
  285. switch {
  286. case doc == nil:
  287. if _, ok := docMap[name]; ok {
  288. deletes = append(deletes, name)
  289. }
  290. case docMap[name] != nil:
  291. updates = append(updates, doc)
  292. default:
  293. adds = append(adds, doc)
  294. }
  295. }
  296. return updates, adds, deletes
  297. }
  298. // For development only.
  299. // TODO(jba): remove.
  300. func assert(b bool) {
  301. if !b {
  302. panic("assertion failed")
  303. }
  304. }
  305. // Applies the mutations in changeMap to both the document tree and the
  306. // document lookup map. Modifies docMap in place and returns a new docTree.
  307. // If there were no changes, returns docTree unmodified.
  308. func (s *watchStream) computeSnapshot(docTree *btree.BTree, docMap, changeMap map[string]*DocumentSnapshot, readTime time.Time) (*btree.BTree, []DocumentChange) {
  309. var changes []DocumentChange
  310. updatedTree := docTree
  311. assert(docTree.Len() == len(docMap))
  312. updates, adds, deletes := extractChanges(docMap, changeMap)
  313. if len(adds) > 0 || len(deletes) > 0 {
  314. updatedTree = docTree.Clone()
  315. }
  316. // Process the sorted changes in the order that is expected by our clients
  317. // (removals, additions, and then modifications). We also need to sort the
  318. // individual changes to assure that oldIndex/newIndex keep incrementing.
  319. deldocs := make([]*DocumentSnapshot, len(deletes))
  320. for i, d := range deletes {
  321. deldocs[i] = docMap[d]
  322. }
  323. sort.Sort(byLess{deldocs, s.less})
  324. for _, oldDoc := range deldocs {
  325. assert(oldDoc != nil)
  326. delete(docMap, oldDoc.Ref.Path)
  327. _, oldi := updatedTree.GetWithIndex(oldDoc)
  328. // TODO(jba): have btree.Delete return old index
  329. _, found := updatedTree.Delete(oldDoc)
  330. assert(found)
  331. changes = append(changes, DocumentChange{
  332. Kind: DocumentRemoved,
  333. Doc: oldDoc,
  334. OldIndex: oldi,
  335. NewIndex: -1,
  336. })
  337. }
  338. sort.Sort(byLess{adds, s.less})
  339. for _, newDoc := range adds {
  340. name := newDoc.Ref.Path
  341. assert(docMap[name] == nil)
  342. newDoc.ReadTime = readTime
  343. docMap[name] = newDoc
  344. updatedTree.Set(newDoc, nil)
  345. // TODO(jba): change btree so Set returns index as second value.
  346. _, newi := updatedTree.GetWithIndex(newDoc)
  347. changes = append(changes, DocumentChange{
  348. Kind: DocumentAdded,
  349. Doc: newDoc,
  350. OldIndex: -1,
  351. NewIndex: newi,
  352. })
  353. }
  354. sort.Sort(byLess{updates, s.less})
  355. for _, newDoc := range updates {
  356. name := newDoc.Ref.Path
  357. oldDoc := docMap[name]
  358. assert(oldDoc != nil)
  359. if newDoc.UpdateTime.Equal(oldDoc.UpdateTime) {
  360. continue
  361. }
  362. if updatedTree == docTree {
  363. updatedTree = docTree.Clone()
  364. }
  365. newDoc.ReadTime = readTime
  366. docMap[name] = newDoc
  367. _, oldi := updatedTree.GetWithIndex(oldDoc)
  368. updatedTree.Delete(oldDoc)
  369. updatedTree.Set(newDoc, nil)
  370. _, newi := updatedTree.GetWithIndex(newDoc)
  371. changes = append(changes, DocumentChange{
  372. Kind: DocumentModified,
  373. Doc: newDoc,
  374. OldIndex: oldi,
  375. NewIndex: newi,
  376. })
  377. }
  378. assert(updatedTree.Len() == len(docMap))
  379. return updatedTree, changes
  380. }
  381. type byLess struct {
  382. s []*DocumentSnapshot
  383. less func(a, b *DocumentSnapshot) bool
  384. }
  385. func (b byLess) Len() int { return len(b.s) }
  386. func (b byLess) Swap(i, j int) { b.s[i], b.s[j] = b.s[j], b.s[i] }
  387. func (b byLess) Less(i, j int) bool { return b.less(b.s[i], b.s[j]) }
  388. func hasWatchTargetID(ids []int32) bool {
  389. for _, id := range ids {
  390. if id == watchTargetID {
  391. return true
  392. }
  393. }
  394. return false
  395. }
  396. func (s *watchStream) logf(format string, args ...interface{}) {
  397. if LogWatchStreams {
  398. log.Printf(format, args...)
  399. }
  400. }
  401. // Close the stream. From this point on, calls to nextSnapshot will return
  402. // io.EOF, or the error from CloseSend.
  403. func (s *watchStream) stop() {
  404. err := s.close()
  405. if s.err != nil { // don't change existing error
  406. return
  407. }
  408. if err != nil {
  409. s.err = err
  410. }
  411. s.err = io.EOF // normal shutdown
  412. }
  413. func (s *watchStream) close() error {
  414. if s.lc == nil {
  415. return nil
  416. }
  417. return s.lc.CloseSend()
  418. }
  419. // recv receives the next message from the stream. It also handles opening the stream
  420. // initially, and reopening it on non-permanent errors.
  421. // recv doesn't have to be goroutine-safe.
  422. func (s *watchStream) recv() (*pb.ListenResponse, error) {
  423. var err error
  424. for {
  425. if s.lc == nil {
  426. s.lc, err = s.open()
  427. if err != nil {
  428. // Do not retry if open fails.
  429. return nil, err
  430. }
  431. }
  432. res, err := s.lc.Recv()
  433. if err == nil || isPermanentWatchError(err) {
  434. return res, err
  435. }
  436. // Non-permanent error. Sleep and retry.
  437. s.changeMap = map[string]*DocumentSnapshot{} // clear changeMap
  438. dur := s.backoff.Pause()
  439. // If we're out of quota, wait a long time before retrying.
  440. if status.Code(err) == codes.ResourceExhausted {
  441. dur = s.backoff.Max
  442. }
  443. if err := sleep(s.ctx, dur); err != nil {
  444. return nil, err
  445. }
  446. s.lc = nil
  447. }
  448. }
  449. func (s *watchStream) open() (pb.Firestore_ListenClient, error) {
  450. dbPath := s.c.path()
  451. lc, err := s.c.c.Listen(withResourceHeader(s.ctx, dbPath))
  452. if err == nil {
  453. err = lc.Send(&pb.ListenRequest{
  454. Database: dbPath,
  455. TargetChange: &pb.ListenRequest_AddTarget{AddTarget: s.target},
  456. })
  457. }
  458. if err != nil {
  459. return nil, err
  460. }
  461. return lc, nil
  462. }
  463. func isPermanentWatchError(err error) bool {
  464. if err == io.EOF {
  465. // Retry on normal end-of-stream.
  466. return false
  467. }
  468. switch status.Code(err) {
  469. case codes.Unknown, codes.DeadlineExceeded, codes.ResourceExhausted,
  470. codes.Internal, codes.Unavailable, codes.Unauthenticated:
  471. return false
  472. default:
  473. return true
  474. }
  475. }