You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

782 lines
23 KiB

  1. // Copyright 2017 Google LLC
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package firestore
  15. import (
  16. "context"
  17. "errors"
  18. "fmt"
  19. "io"
  20. "math"
  21. "reflect"
  22. "time"
  23. "cloud.google.com/go/internal/btree"
  24. "github.com/golang/protobuf/ptypes/wrappers"
  25. "google.golang.org/api/iterator"
  26. pb "google.golang.org/genproto/googleapis/firestore/v1"
  27. )
// Query represents a Firestore query.
//
// Query values are immutable. Each Query method creates
// a new Query; it does not modify the old.
//
// The builder methods take the Query by value and return the modified copy.
// Problems detected while building are stored in err and reported when the
// query is converted to its protobuf form.
type Query struct {
	c          *Client
	path       string // path to query (collection)
	parentPath string // path of the collection's parent (document)
	// collectionID is sent as the CollectionId of the query's single
	// collection selector; converting a Query with an empty collectionID fails.
	collectionID string
	// selection holds the projected field paths. When empty, no projection
	// is added to the request.
	selection []FieldPath
	// filters are the Where clauses; more than one is combined with AND.
	filters []filter
	// orders are the OrderBy clauses in the order they were added.
	orders []order
	offset int32
	// limit stays nil until Limit is called.
	limit *wrappers.Int32Value
	// Cursor state. A cursor is described either by a list of field values
	// (startVals/endVals) or by a document snapshot (startDoc/endDoc).
	startVals, endVals []interface{}
	startDoc, endDoc   *DocumentSnapshot
	// startBefore and endBefore become the Before flag of the start and end
	// cursors respectively.
	startBefore, endBefore bool
	// err is an error recorded by a builder method.
	err error
}
// DocumentID is the special field name representing the ID of a document
// in queries.
//
// Within this file it is used as a single-element field path, both to select
// only document IDs and to order or compare documents by name.
const DocumentID = "__name__"
  50. // Select returns a new Query that specifies the paths
  51. // to return from the result documents.
  52. // Each path argument can be a single field or a dot-separated sequence of
  53. // fields, and must not contain any of the runes "˜*/[]".
  54. //
  55. // An empty Select call will produce a query that returns only document IDs.
  56. func (q Query) Select(paths ...string) Query {
  57. var fps []FieldPath
  58. for _, s := range paths {
  59. fp, err := parseDotSeparatedString(s)
  60. if err != nil {
  61. q.err = err
  62. return q
  63. }
  64. fps = append(fps, fp)
  65. }
  66. return q.SelectPaths(fps...)
  67. }
  68. // SelectPaths returns a new Query that specifies the field paths
  69. // to return from the result documents.
  70. //
  71. // An empty SelectPaths call will produce a query that returns only document IDs.
  72. func (q Query) SelectPaths(fieldPaths ...FieldPath) Query {
  73. if len(fieldPaths) == 0 {
  74. q.selection = []FieldPath{{DocumentID}}
  75. } else {
  76. q.selection = fieldPaths
  77. }
  78. return q
  79. }
  80. // Where returns a new Query that filters the set of results.
  81. // A Query can have multiple filters.
  82. // The path argument can be a single field or a dot-separated sequence of
  83. // fields, and must not contain any of the runes "˜*/[]".
  84. // The op argument must be one of "==", "<", "<=", ">" or ">=".
  85. func (q Query) Where(path, op string, value interface{}) Query {
  86. fp, err := parseDotSeparatedString(path)
  87. if err != nil {
  88. q.err = err
  89. return q
  90. }
  91. q.filters = append(append([]filter(nil), q.filters...), filter{fp, op, value})
  92. return q
  93. }
  94. // WherePath returns a new Query that filters the set of results.
  95. // A Query can have multiple filters.
  96. // The op argument must be one of "==", "<", "<=", ">" or ">=".
  97. func (q Query) WherePath(fp FieldPath, op string, value interface{}) Query {
  98. q.filters = append(append([]filter(nil), q.filters...), filter{fp, op, value})
  99. return q
  100. }
// Direction is the sort direction for result ordering.
//
// Its values are the numeric values of the protobuf
// StructuredQuery direction enum, so a Direction converts directly
// to the wire representation.
type Direction int32

const (
	// Asc sorts results from smallest to largest.
	Asc Direction = Direction(pb.StructuredQuery_ASCENDING)
	// Desc sorts results from largest to smallest.
	Desc Direction = Direction(pb.StructuredQuery_DESCENDING)
)
  109. // OrderBy returns a new Query that specifies the order in which results are
  110. // returned. A Query can have multiple OrderBy/OrderByPath specifications. OrderBy
  111. // appends the specification to the list of existing ones.
  112. //
  113. // The path argument can be a single field or a dot-separated sequence of
  114. // fields, and must not contain any of the runes "˜*/[]".
  115. //
  116. // To order by document name, use the special field path DocumentID.
  117. func (q Query) OrderBy(path string, dir Direction) Query {
  118. fp, err := parseDotSeparatedString(path)
  119. if err != nil {
  120. q.err = err
  121. return q
  122. }
  123. q.orders = append(q.copyOrders(), order{fp, dir})
  124. return q
  125. }
  126. // OrderByPath returns a new Query that specifies the order in which results are
  127. // returned. A Query can have multiple OrderBy/OrderByPath specifications.
  128. // OrderByPath appends the specification to the list of existing ones.
  129. func (q Query) OrderByPath(fp FieldPath, dir Direction) Query {
  130. q.orders = append(q.copyOrders(), order{fp, dir})
  131. return q
  132. }
  133. func (q *Query) copyOrders() []order {
  134. return append([]order(nil), q.orders...)
  135. }
  136. // Offset returns a new Query that specifies the number of initial results to skip.
  137. // It must not be negative.
  138. func (q Query) Offset(n int) Query {
  139. q.offset = trunc32(n)
  140. return q
  141. }
  142. // Limit returns a new Query that specifies the maximum number of results to return.
  143. // It must not be negative.
  144. func (q Query) Limit(n int) Query {
  145. q.limit = &wrappers.Int32Value{Value: trunc32(n)}
  146. return q
  147. }
  148. // StartAt returns a new Query that specifies that results should start at
  149. // the document with the given field values.
  150. //
  151. // StartAt may be called with a single DocumentSnapshot, representing an
  152. // existing document within the query. The document must be a direct child of
  153. // the location being queried (not a parent document, or document in a
  154. // different collection, or a grandchild document, for example).
  155. //
  156. // Otherwise, StartAt should be called with one field value for each OrderBy clause,
  157. // in the order that they appear. For example, in
  158. // q.OrderBy("X", Asc).OrderBy("Y", Desc).StartAt(1, 2)
  159. // results will begin at the first document where X = 1 and Y = 2.
  160. //
  161. // If an OrderBy call uses the special DocumentID field path, the corresponding value
  162. // should be the document ID relative to the query's collection. For example, to
  163. // start at the document "NewYork" in the "States" collection, write
  164. //
  165. // client.Collection("States").OrderBy(DocumentID, firestore.Asc).StartAt("NewYork")
  166. //
  167. // Calling StartAt overrides a previous call to StartAt or StartAfter.
  168. func (q Query) StartAt(docSnapshotOrFieldValues ...interface{}) Query {
  169. q.startBefore = true
  170. q.startVals, q.startDoc, q.err = q.processCursorArg("StartAt", docSnapshotOrFieldValues)
  171. return q
  172. }
  173. // StartAfter returns a new Query that specifies that results should start just after
  174. // the document with the given field values. See Query.StartAt for more information.
  175. //
  176. // Calling StartAfter overrides a previous call to StartAt or StartAfter.
  177. func (q Query) StartAfter(docSnapshotOrFieldValues ...interface{}) Query {
  178. q.startBefore = false
  179. q.startVals, q.startDoc, q.err = q.processCursorArg("StartAfter", docSnapshotOrFieldValues)
  180. return q
  181. }
  182. // EndAt returns a new Query that specifies that results should end at the
  183. // document with the given field values. See Query.StartAt for more information.
  184. //
  185. // Calling EndAt overrides a previous call to EndAt or EndBefore.
  186. func (q Query) EndAt(docSnapshotOrFieldValues ...interface{}) Query {
  187. q.endBefore = false
  188. q.endVals, q.endDoc, q.err = q.processCursorArg("EndAt", docSnapshotOrFieldValues)
  189. return q
  190. }
  191. // EndBefore returns a new Query that specifies that results should end just before
  192. // the document with the given field values. See Query.StartAt for more information.
  193. //
  194. // Calling EndBefore overrides a previous call to EndAt or EndBefore.
  195. func (q Query) EndBefore(docSnapshotOrFieldValues ...interface{}) Query {
  196. q.endBefore = true
  197. q.endVals, q.endDoc, q.err = q.processCursorArg("EndBefore", docSnapshotOrFieldValues)
  198. return q
  199. }
  200. func (q *Query) processCursorArg(name string, docSnapshotOrFieldValues []interface{}) ([]interface{}, *DocumentSnapshot, error) {
  201. for _, e := range docSnapshotOrFieldValues {
  202. if ds, ok := e.(*DocumentSnapshot); ok {
  203. if len(docSnapshotOrFieldValues) == 1 {
  204. return nil, ds, nil
  205. }
  206. return nil, nil, fmt.Errorf("firestore: a document snapshot must be the only argument to %s", name)
  207. }
  208. }
  209. return docSnapshotOrFieldValues, nil, nil
  210. }
  211. func (q Query) query() *Query { return &q }
  212. func (q Query) toProto() (*pb.StructuredQuery, error) {
  213. if q.err != nil {
  214. return nil, q.err
  215. }
  216. if q.collectionID == "" {
  217. return nil, errors.New("firestore: query created without CollectionRef")
  218. }
  219. if q.startBefore {
  220. if len(q.startVals) == 0 && q.startDoc == nil {
  221. return nil, errors.New("firestore: StartAt/StartAfter must be called with at least one value")
  222. }
  223. }
  224. if q.endBefore {
  225. if len(q.endVals) == 0 && q.endDoc == nil {
  226. return nil, errors.New("firestore: EndAt/EndBefore must be called with at least one value")
  227. }
  228. }
  229. p := &pb.StructuredQuery{
  230. From: []*pb.StructuredQuery_CollectionSelector{{CollectionId: q.collectionID}},
  231. Offset: q.offset,
  232. Limit: q.limit,
  233. }
  234. if len(q.selection) > 0 {
  235. p.Select = &pb.StructuredQuery_Projection{}
  236. for _, fp := range q.selection {
  237. if err := fp.validate(); err != nil {
  238. return nil, err
  239. }
  240. p.Select.Fields = append(p.Select.Fields, fref(fp))
  241. }
  242. }
  243. // If there is only filter, use it directly. Otherwise, construct
  244. // a CompositeFilter.
  245. if len(q.filters) == 1 {
  246. pf, err := q.filters[0].toProto()
  247. if err != nil {
  248. return nil, err
  249. }
  250. p.Where = pf
  251. } else if len(q.filters) > 1 {
  252. cf := &pb.StructuredQuery_CompositeFilter{
  253. Op: pb.StructuredQuery_CompositeFilter_AND,
  254. }
  255. p.Where = &pb.StructuredQuery_Filter{
  256. FilterType: &pb.StructuredQuery_Filter_CompositeFilter{cf},
  257. }
  258. for _, f := range q.filters {
  259. pf, err := f.toProto()
  260. if err != nil {
  261. return nil, err
  262. }
  263. cf.Filters = append(cf.Filters, pf)
  264. }
  265. }
  266. orders := q.orders
  267. if q.startDoc != nil || q.endDoc != nil {
  268. orders = q.adjustOrders()
  269. }
  270. for _, ord := range orders {
  271. po, err := ord.toProto()
  272. if err != nil {
  273. return nil, err
  274. }
  275. p.OrderBy = append(p.OrderBy, po)
  276. }
  277. cursor, err := q.toCursor(q.startVals, q.startDoc, q.startBefore, orders)
  278. if err != nil {
  279. return nil, err
  280. }
  281. p.StartAt = cursor
  282. cursor, err = q.toCursor(q.endVals, q.endDoc, q.endBefore, orders)
  283. if err != nil {
  284. return nil, err
  285. }
  286. p.EndAt = cursor
  287. return p, nil
  288. }
  289. // If there is a start/end that uses a Document Snapshot, we may need to adjust the OrderBy
  290. // clauses that the user provided: we add OrderBy(__name__) if it isn't already present, and
  291. // we make sure we don't invalidate the original query by adding an OrderBy for inequality filters.
  292. func (q *Query) adjustOrders() []order {
  293. // If the user is already ordering by document ID, don't change anything.
  294. for _, ord := range q.orders {
  295. if ord.isDocumentID() {
  296. return q.orders
  297. }
  298. }
  299. // If there are OrderBy clauses, append an OrderBy(DocumentID), using the direction of the last OrderBy clause.
  300. if len(q.orders) > 0 {
  301. return append(q.copyOrders(), order{
  302. fieldPath: FieldPath{DocumentID},
  303. dir: q.orders[len(q.orders)-1].dir,
  304. })
  305. }
  306. // If there are no OrderBy clauses but there is an inequality, add an OrderBy clause
  307. // for the field of the first inequality.
  308. var orders []order
  309. for _, f := range q.filters {
  310. if f.op != "==" {
  311. orders = []order{{fieldPath: f.fieldPath, dir: Asc}}
  312. break
  313. }
  314. }
  315. // Add an ascending OrderBy(DocumentID).
  316. return append(orders, order{fieldPath: FieldPath{DocumentID}, dir: Asc})
  317. }
  318. func (q *Query) toCursor(fieldValues []interface{}, ds *DocumentSnapshot, before bool, orders []order) (*pb.Cursor, error) {
  319. var vals []*pb.Value
  320. var err error
  321. if ds != nil {
  322. vals, err = q.docSnapshotToCursorValues(ds, orders)
  323. } else if len(fieldValues) != 0 {
  324. vals, err = q.fieldValuesToCursorValues(fieldValues)
  325. } else {
  326. return nil, nil
  327. }
  328. if err != nil {
  329. return nil, err
  330. }
  331. return &pb.Cursor{Values: vals, Before: before}, nil
  332. }
  333. // toPositionValues converts the field values to protos.
  334. func (q *Query) fieldValuesToCursorValues(fieldValues []interface{}) ([]*pb.Value, error) {
  335. if len(fieldValues) != len(q.orders) {
  336. return nil, errors.New("firestore: number of field values in StartAt/StartAfter/EndAt/EndBefore does not match number of OrderBy fields")
  337. }
  338. vals := make([]*pb.Value, len(fieldValues))
  339. var err error
  340. for i, ord := range q.orders {
  341. fval := fieldValues[i]
  342. if ord.isDocumentID() {
  343. // TODO(jba): support DocumentRefs as well as strings.
  344. // TODO(jba): error if document ref does not belong to the right collection.
  345. docID, ok := fval.(string)
  346. if !ok {
  347. return nil, fmt.Errorf("firestore: expected doc ID for DocumentID field, got %T", fval)
  348. }
  349. vals[i] = &pb.Value{ValueType: &pb.Value_ReferenceValue{q.path + "/" + docID}}
  350. } else {
  351. var sawTransform bool
  352. vals[i], sawTransform, err = toProtoValue(reflect.ValueOf(fval))
  353. if err != nil {
  354. return nil, err
  355. }
  356. if sawTransform {
  357. return nil, errors.New("firestore: transforms disallowed in query value")
  358. }
  359. }
  360. }
  361. return vals, nil
  362. }
  363. func (q *Query) docSnapshotToCursorValues(ds *DocumentSnapshot, orders []order) ([]*pb.Value, error) {
  364. // TODO(jba): error if doc snap does not belong to the right collection.
  365. vals := make([]*pb.Value, len(orders))
  366. for i, ord := range orders {
  367. if ord.isDocumentID() {
  368. dp, qp := ds.Ref.Parent.Path, q.path
  369. if dp != qp {
  370. return nil, fmt.Errorf("firestore: document snapshot for %s passed to query on %s", dp, qp)
  371. }
  372. vals[i] = &pb.Value{ValueType: &pb.Value_ReferenceValue{ds.Ref.Path}}
  373. } else {
  374. val, err := valueAtPath(ord.fieldPath, ds.proto.Fields)
  375. if err != nil {
  376. return nil, err
  377. }
  378. vals[i] = val
  379. }
  380. }
  381. return vals, nil
  382. }
  383. // Returns a function that compares DocumentSnapshots according to q's ordering.
  384. func (q Query) compareFunc() func(d1, d2 *DocumentSnapshot) (int, error) {
  385. // Add implicit sorting by name, using the last specified direction.
  386. lastDir := Asc
  387. if len(q.orders) > 0 {
  388. lastDir = q.orders[len(q.orders)-1].dir
  389. }
  390. orders := append(q.copyOrders(), order{[]string{DocumentID}, lastDir})
  391. return func(d1, d2 *DocumentSnapshot) (int, error) {
  392. for _, ord := range orders {
  393. var cmp int
  394. if len(ord.fieldPath) == 1 && ord.fieldPath[0] == DocumentID {
  395. cmp = compareReferences(d1.Ref.Path, d2.Ref.Path)
  396. } else {
  397. v1, err := valueAtPath(ord.fieldPath, d1.proto.Fields)
  398. if err != nil {
  399. return 0, err
  400. }
  401. v2, err := valueAtPath(ord.fieldPath, d2.proto.Fields)
  402. if err != nil {
  403. return 0, err
  404. }
  405. cmp = compareValues(v1, v2)
  406. }
  407. if cmp != 0 {
  408. if ord.dir == Desc {
  409. cmp = -cmp
  410. }
  411. return cmp, nil
  412. }
  413. }
  414. return 0, nil
  415. }
  416. }
// filter is a single Where clause: a field path, an operator string and the
// value to compare against.
//
// The field order matters: filters are built with unkeyed composite literals.
type filter struct {
	fieldPath FieldPath
	// op is the operator as written by the caller, e.g. "==" or "<";
	// it is validated when the filter is converted to a proto.
	op string
	// value is the comparison operand; nil and NaN are converted to unary
	// filters.
	value interface{}
}
  422. func (f filter) toProto() (*pb.StructuredQuery_Filter, error) {
  423. if err := f.fieldPath.validate(); err != nil {
  424. return nil, err
  425. }
  426. if uop, ok := unaryOpFor(f.value); ok {
  427. if f.op != "==" {
  428. return nil, fmt.Errorf("firestore: must use '==' when comparing %v", f.value)
  429. }
  430. return &pb.StructuredQuery_Filter{
  431. FilterType: &pb.StructuredQuery_Filter_UnaryFilter{
  432. UnaryFilter: &pb.StructuredQuery_UnaryFilter{
  433. OperandType: &pb.StructuredQuery_UnaryFilter_Field{
  434. Field: fref(f.fieldPath),
  435. },
  436. Op: uop,
  437. },
  438. },
  439. }, nil
  440. }
  441. var op pb.StructuredQuery_FieldFilter_Operator
  442. switch f.op {
  443. case "<":
  444. op = pb.StructuredQuery_FieldFilter_LESS_THAN
  445. case "<=":
  446. op = pb.StructuredQuery_FieldFilter_LESS_THAN_OR_EQUAL
  447. case ">":
  448. op = pb.StructuredQuery_FieldFilter_GREATER_THAN
  449. case ">=":
  450. op = pb.StructuredQuery_FieldFilter_GREATER_THAN_OR_EQUAL
  451. case "==":
  452. op = pb.StructuredQuery_FieldFilter_EQUAL
  453. case "array-contains":
  454. op = pb.StructuredQuery_FieldFilter_ARRAY_CONTAINS
  455. default:
  456. return nil, fmt.Errorf("firestore: invalid operator %q", f.op)
  457. }
  458. val, sawTransform, err := toProtoValue(reflect.ValueOf(f.value))
  459. if err != nil {
  460. return nil, err
  461. }
  462. if sawTransform {
  463. return nil, errors.New("firestore: transforms disallowed in query value")
  464. }
  465. return &pb.StructuredQuery_Filter{
  466. FilterType: &pb.StructuredQuery_Filter_FieldFilter{
  467. FieldFilter: &pb.StructuredQuery_FieldFilter{
  468. Field: fref(f.fieldPath),
  469. Op: op,
  470. Value: val,
  471. },
  472. },
  473. }, nil
  474. }
  475. func unaryOpFor(value interface{}) (pb.StructuredQuery_UnaryFilter_Operator, bool) {
  476. switch {
  477. case value == nil:
  478. return pb.StructuredQuery_UnaryFilter_IS_NULL, true
  479. case isNaN(value):
  480. return pb.StructuredQuery_UnaryFilter_IS_NAN, true
  481. default:
  482. return pb.StructuredQuery_UnaryFilter_OPERATOR_UNSPECIFIED, false
  483. }
  484. }
  485. func isNaN(x interface{}) bool {
  486. switch x := x.(type) {
  487. case float32:
  488. return math.IsNaN(float64(x))
  489. case float64:
  490. return math.IsNaN(x)
  491. default:
  492. return false
  493. }
  494. }
// order is a single OrderBy clause: the field path to sort on and the
// direction to sort in.
//
// The field order matters: orders are built with unkeyed composite literals.
type order struct {
	fieldPath FieldPath
	dir       Direction
}
  499. func (r order) isDocumentID() bool {
  500. return len(r.fieldPath) == 1 && r.fieldPath[0] == DocumentID
  501. }
  502. func (r order) toProto() (*pb.StructuredQuery_Order, error) {
  503. if err := r.fieldPath.validate(); err != nil {
  504. return nil, err
  505. }
  506. return &pb.StructuredQuery_Order{
  507. Field: fref(r.fieldPath),
  508. Direction: pb.StructuredQuery_Direction(r.dir),
  509. }, nil
  510. }
  511. func fref(fp FieldPath) *pb.StructuredQuery_FieldReference {
  512. return &pb.StructuredQuery_FieldReference{FieldPath: fp.toServiceFieldPath()}
  513. }
  514. func trunc32(i int) int32 {
  515. if i > math.MaxInt32 {
  516. i = math.MaxInt32
  517. }
  518. return int32(i)
  519. }
  520. // Documents returns an iterator over the query's resulting documents.
  521. func (q Query) Documents(ctx context.Context) *DocumentIterator {
  522. return &DocumentIterator{
  523. iter: newQueryDocumentIterator(withResourceHeader(ctx, q.c.path()), &q, nil),
  524. }
  525. }
// DocumentIterator is an iterator over documents returned by a query.
type DocumentIterator struct {
	// iter is the underlying implementation that produces the documents.
	iter docIterator
	// err is the first error returned by iter (or iterator.Done once the
	// iterator has been stopped); once set, Next keeps returning it.
	err error
}
// Unexported interface so we can have two different kinds of DocumentIterator: one
// for straight queries, and one for query snapshots. We do it this way instead of
// making DocumentIterator an interface because in the client libraries, iterators are
// always concrete types, and the fact that this one has two different implementations
// is an internal detail.
type docIterator interface {
	// next returns the next document, or iterator.Done when there are no more.
	next() (*DocumentSnapshot, error)
	// stop releases whatever resources the implementation holds.
	stop()
}
  540. // Next returns the next result. Its second return value is iterator.Done if there
  541. // are no more results. Once Next returns Done, all subsequent calls will return
  542. // Done.
  543. func (it *DocumentIterator) Next() (*DocumentSnapshot, error) {
  544. if it.err != nil {
  545. return nil, it.err
  546. }
  547. ds, err := it.iter.next()
  548. if err != nil {
  549. it.err = err
  550. }
  551. return ds, err
  552. }
  553. // Stop stops the iterator, freeing its resources.
  554. // Always call Stop when you are done with a DocumentIterator.
  555. // It is not safe to call Stop concurrently with Next.
  556. func (it *DocumentIterator) Stop() {
  557. if it.iter != nil { // possible in error cases
  558. it.iter.stop()
  559. }
  560. if it.err == nil {
  561. it.err = iterator.Done
  562. }
  563. }
  564. // GetAll returns all the documents remaining from the iterator.
  565. // It is not necessary to call Stop on the iterator after calling GetAll.
  566. func (it *DocumentIterator) GetAll() ([]*DocumentSnapshot, error) {
  567. defer it.Stop()
  568. var docs []*DocumentSnapshot
  569. for {
  570. doc, err := it.Next()
  571. if err == iterator.Done {
  572. break
  573. }
  574. if err != nil {
  575. return nil, err
  576. }
  577. docs = append(docs, doc)
  578. }
  579. return docs, nil
  580. }
// queryDocumentIterator is the docIterator that streams the results of a
// RunQuery call.
type queryDocumentIterator struct {
	// ctx is the cancellable context used for the RunQuery stream;
	// cancel cancels it.
	ctx    context.Context
	cancel func()
	// q is the query to run.
	q   *Query
	tid []byte // transaction ID, if any
	// streamClient is the open RunQuery stream; it is nil until the first
	// call to next.
	streamClient pb.Firestore_RunQueryClient
}
  588. func newQueryDocumentIterator(ctx context.Context, q *Query, tid []byte) *queryDocumentIterator {
  589. ctx, cancel := context.WithCancel(ctx)
  590. return &queryDocumentIterator{
  591. ctx: ctx,
  592. cancel: cancel,
  593. q: q,
  594. tid: tid,
  595. }
  596. }
  597. func (it *queryDocumentIterator) next() (*DocumentSnapshot, error) {
  598. client := it.q.c
  599. if it.streamClient == nil {
  600. sq, err := it.q.toProto()
  601. if err != nil {
  602. return nil, err
  603. }
  604. req := &pb.RunQueryRequest{
  605. Parent: it.q.parentPath,
  606. QueryType: &pb.RunQueryRequest_StructuredQuery{sq},
  607. }
  608. if it.tid != nil {
  609. req.ConsistencySelector = &pb.RunQueryRequest_Transaction{it.tid}
  610. }
  611. it.streamClient, err = client.c.RunQuery(it.ctx, req)
  612. if err != nil {
  613. return nil, err
  614. }
  615. }
  616. var res *pb.RunQueryResponse
  617. var err error
  618. for {
  619. res, err = it.streamClient.Recv()
  620. if err == io.EOF {
  621. return nil, iterator.Done
  622. }
  623. if err != nil {
  624. return nil, err
  625. }
  626. if res.Document != nil {
  627. break
  628. }
  629. // No document => partial progress; keep receiving.
  630. }
  631. docRef, err := pathToDoc(res.Document.Name, client)
  632. if err != nil {
  633. return nil, err
  634. }
  635. doc, err := newDocumentSnapshot(docRef, res.Document, client, res.ReadTime)
  636. if err != nil {
  637. return nil, err
  638. }
  639. return doc, nil
  640. }
  641. func (it *queryDocumentIterator) stop() {
  642. it.cancel()
  643. }
  644. // Snapshots returns an iterator over snapshots of the query. Each time the query
  645. // results change, a new snapshot will be generated.
  646. func (q Query) Snapshots(ctx context.Context) *QuerySnapshotIterator {
  647. ws, err := newWatchStreamForQuery(ctx, q)
  648. if err != nil {
  649. return &QuerySnapshotIterator{err: err}
  650. }
  651. return &QuerySnapshotIterator{
  652. Query: q,
  653. ws: ws,
  654. }
  655. }
// QuerySnapshotIterator is an iterator over snapshots of a query.
// Call Next on the iterator to get a snapshot of the query's results each time they change.
// Call Stop on the iterator when done.
//
// For an example, see Query.Snapshots.
type QuerySnapshotIterator struct {
	// The Query used to construct this iterator.
	Query Query
	// ws is the watch stream that produces the snapshots; it is nil when the
	// iterator was created in an error state.
	ws *watchStream
	// err is the error the iterator failed with; once set, Next keeps
	// returning it.
	err error
}
  667. // Next blocks until the query's results change, then returns a QuerySnapshot for
  668. // the current results.
  669. //
  670. // Next never returns iterator.Done unless it is called after Stop.
  671. func (it *QuerySnapshotIterator) Next() (*QuerySnapshot, error) {
  672. if it.err != nil {
  673. return nil, it.err
  674. }
  675. btree, changes, readTime, err := it.ws.nextSnapshot()
  676. if err != nil {
  677. if err == io.EOF {
  678. err = iterator.Done
  679. }
  680. it.err = err
  681. return nil, it.err
  682. }
  683. return &QuerySnapshot{
  684. Documents: &DocumentIterator{
  685. iter: (*btreeDocumentIterator)(btree.BeforeIndex(0)),
  686. },
  687. Size: btree.Len(),
  688. Changes: changes,
  689. ReadTime: readTime,
  690. }, nil
  691. }
  692. // Stop stops receiving snapshots. You should always call Stop when you are done with
  693. // a QuerySnapshotIterator, to free up resources. It is not safe to call Stop
  694. // concurrently with Next.
  695. func (it *QuerySnapshotIterator) Stop() {
  696. if it.ws != nil {
  697. it.ws.stop()
  698. }
  699. }
// A QuerySnapshot is a snapshot of query results. It is returned by
// QuerySnapshotIterator.Next whenever the results of a query change.
type QuerySnapshot struct {
	// An iterator over the query results.
	// It is not necessary to call Stop on this iterator.
	Documents *DocumentIterator

	// The number of results in this snapshot.
	Size int

	// The changes since the previous snapshot.
	Changes []DocumentChange

	// The time at which this snapshot was obtained from Firestore.
	ReadTime time.Time
}
// btreeDocumentIterator adapts a btree.Iterator to the docIterator interface.
// It is the implementation used for the documents of a QuerySnapshot.
type btreeDocumentIterator btree.Iterator
  714. func (it *btreeDocumentIterator) next() (*DocumentSnapshot, error) {
  715. if !(*btree.Iterator)(it).Next() {
  716. return nil, iterator.Done
  717. }
  718. return it.Key.(*DocumentSnapshot), nil
  719. }
  720. func (*btreeDocumentIterator) stop() {}