You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

287 lines
8.0 KiB

  1. // Copyright 2017 Google LLC
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package firestore
  15. import (
  16. "context"
  17. "errors"
  18. "fmt"
  19. "io"
  20. "os"
  21. "strings"
  22. "time"
  23. vkit "cloud.google.com/go/firestore/apiv1"
  24. "cloud.google.com/go/internal/version"
  25. "github.com/golang/protobuf/ptypes"
  26. gax "github.com/googleapis/gax-go/v2"
  27. "google.golang.org/api/iterator"
  28. "google.golang.org/api/option"
  29. pb "google.golang.org/genproto/googleapis/firestore/v1"
  30. "google.golang.org/grpc"
  31. "google.golang.org/grpc/codes"
  32. "google.golang.org/grpc/metadata"
  33. "google.golang.org/grpc/status"
  34. )
  35. // resourcePrefixHeader is the name of the metadata header used to indicate
  36. // the resource being operated on.
  37. const resourcePrefixHeader = "google-cloud-resource-prefix"
  38. // A Client provides access to the Firestore service.
  39. type Client struct {
  40. c *vkit.Client
  41. projectID string
  42. databaseID string // A client is tied to a single database.
  43. }
  44. // NewClient creates a new Firestore client that uses the given project.
  45. func NewClient(ctx context.Context, projectID string, opts ...option.ClientOption) (*Client, error) {
  46. var o []option.ClientOption
  47. // Environment variables for gcloud emulator:
  48. // https://cloud.google.com/sdk/gcloud/reference/beta/emulators/firestore/
  49. if addr := os.Getenv("FIRESTORE_EMULATOR_HOST"); addr != "" {
  50. conn, err := grpc.Dial(addr, grpc.WithInsecure())
  51. if err != nil {
  52. return nil, fmt.Errorf("firestore: dialing address from env var FIRESTORE_EMULATOR_HOST: %v", err)
  53. }
  54. o = []option.ClientOption{option.WithGRPCConn(conn)}
  55. }
  56. o = append(o, opts...)
  57. vc, err := vkit.NewClient(ctx, o...)
  58. if err != nil {
  59. return nil, err
  60. }
  61. vc.SetGoogleClientInfo("gccl", version.Repo)
  62. c := &Client{
  63. c: vc,
  64. projectID: projectID,
  65. databaseID: "(default)", // always "(default)", for now
  66. }
  67. return c, nil
  68. }
  69. // Close closes any resources held by the client.
  70. //
  71. // Close need not be called at program exit.
  72. func (c *Client) Close() error {
  73. return c.c.Close()
  74. }
  75. func (c *Client) path() string {
  76. return fmt.Sprintf("projects/%s/databases/%s", c.projectID, c.databaseID)
  77. }
  78. func withResourceHeader(ctx context.Context, resource string) context.Context {
  79. md, _ := metadata.FromOutgoingContext(ctx)
  80. md = md.Copy()
  81. md[resourcePrefixHeader] = []string{resource}
  82. return metadata.NewOutgoingContext(ctx, md)
  83. }
  84. // Collection creates a reference to a collection with the given path.
  85. // A path is a sequence of IDs separated by slashes.
  86. //
  87. // Collection returns nil if path contains an even number of IDs or any ID is empty.
  88. func (c *Client) Collection(path string) *CollectionRef {
  89. coll, _ := c.idsToRef(strings.Split(path, "/"), c.path())
  90. return coll
  91. }
  92. // Doc creates a reference to a document with the given path.
  93. // A path is a sequence of IDs separated by slashes.
  94. //
  95. // Doc returns nil if path contains an odd number of IDs or any ID is empty.
  96. func (c *Client) Doc(path string) *DocumentRef {
  97. _, doc := c.idsToRef(strings.Split(path, "/"), c.path())
  98. return doc
  99. }
  100. func (c *Client) idsToRef(IDs []string, dbPath string) (*CollectionRef, *DocumentRef) {
  101. if len(IDs) == 0 {
  102. return nil, nil
  103. }
  104. for _, id := range IDs {
  105. if id == "" {
  106. return nil, nil
  107. }
  108. }
  109. coll := newTopLevelCollRef(c, dbPath, IDs[0])
  110. i := 1
  111. for i < len(IDs) {
  112. doc := newDocRef(coll, IDs[i])
  113. i++
  114. if i == len(IDs) {
  115. return nil, doc
  116. }
  117. coll = newCollRefWithParent(c, doc, IDs[i])
  118. i++
  119. }
  120. return coll, nil
  121. }
  122. // GetAll retrieves multiple documents with a single call. The DocumentSnapshots are
  123. // returned in the order of the given DocumentRefs.
  124. //
  125. // If a document is not present, the corresponding DocumentSnapshot's Exists method will return false.
  126. func (c *Client) GetAll(ctx context.Context, docRefs []*DocumentRef) ([]*DocumentSnapshot, error) {
  127. return c.getAll(ctx, docRefs, nil)
  128. }
  129. func (c *Client) getAll(ctx context.Context, docRefs []*DocumentRef, tid []byte) ([]*DocumentSnapshot, error) {
  130. var docNames []string
  131. docIndex := map[string]int{} // doc name to position in docRefs
  132. for i, dr := range docRefs {
  133. if dr == nil {
  134. return nil, errNilDocRef
  135. }
  136. docNames = append(docNames, dr.Path)
  137. docIndex[dr.Path] = i
  138. }
  139. req := &pb.BatchGetDocumentsRequest{
  140. Database: c.path(),
  141. Documents: docNames,
  142. }
  143. if tid != nil {
  144. req.ConsistencySelector = &pb.BatchGetDocumentsRequest_Transaction{tid}
  145. }
  146. streamClient, err := c.c.BatchGetDocuments(withResourceHeader(ctx, req.Database), req)
  147. if err != nil {
  148. return nil, err
  149. }
  150. // Read and remember all results from the stream.
  151. var resps []*pb.BatchGetDocumentsResponse
  152. for {
  153. resp, err := streamClient.Recv()
  154. if err == io.EOF {
  155. break
  156. }
  157. if err != nil {
  158. return nil, err
  159. }
  160. resps = append(resps, resp)
  161. }
  162. // Results may arrive out of order. Put each at the right index.
  163. docs := make([]*DocumentSnapshot, len(docNames))
  164. for _, resp := range resps {
  165. var (
  166. i int
  167. doc *pb.Document
  168. err error
  169. )
  170. switch r := resp.Result.(type) {
  171. case *pb.BatchGetDocumentsResponse_Found:
  172. i = docIndex[r.Found.Name]
  173. doc = r.Found
  174. case *pb.BatchGetDocumentsResponse_Missing:
  175. i = docIndex[r.Missing]
  176. doc = nil
  177. default:
  178. return nil, errors.New("firestore: unknown BatchGetDocumentsResponse result type")
  179. }
  180. if docs[i] != nil {
  181. return nil, fmt.Errorf("firestore: %q seen twice", docRefs[i].Path)
  182. }
  183. docs[i], err = newDocumentSnapshot(docRefs[i], doc, c, resp.ReadTime)
  184. if err != nil {
  185. return nil, err
  186. }
  187. }
  188. return docs, nil
  189. }
  190. // Collections returns an interator over the top-level collections.
  191. func (c *Client) Collections(ctx context.Context) *CollectionIterator {
  192. it := &CollectionIterator{
  193. client: c,
  194. it: c.c.ListCollectionIds(
  195. withResourceHeader(ctx, c.path()),
  196. &pb.ListCollectionIdsRequest{Parent: c.path() + "/documents"}),
  197. }
  198. it.pageInfo, it.nextFunc = iterator.NewPageInfo(
  199. it.fetch,
  200. func() int { return len(it.items) },
  201. func() interface{} { b := it.items; it.items = nil; return b })
  202. return it
  203. }
  204. // Batch returns a WriteBatch.
  205. func (c *Client) Batch() *WriteBatch {
  206. return &WriteBatch{c: c}
  207. }
  208. // commit calls the Commit RPC outside of a transaction.
  209. func (c *Client) commit(ctx context.Context, ws []*pb.Write) ([]*WriteResult, error) {
  210. req := &pb.CommitRequest{
  211. Database: c.path(),
  212. Writes: ws,
  213. }
  214. res, err := c.c.Commit(withResourceHeader(ctx, req.Database), req)
  215. if err != nil {
  216. return nil, err
  217. }
  218. if len(res.WriteResults) == 0 {
  219. return nil, errors.New("firestore: missing WriteResult")
  220. }
  221. var wrs []*WriteResult
  222. for _, pwr := range res.WriteResults {
  223. wr, err := writeResultFromProto(pwr)
  224. if err != nil {
  225. return nil, err
  226. }
  227. wrs = append(wrs, wr)
  228. }
  229. return wrs, nil
  230. }
  231. func (c *Client) commit1(ctx context.Context, ws []*pb.Write) (*WriteResult, error) {
  232. wrs, err := c.commit(ctx, ws)
  233. if err != nil {
  234. return nil, err
  235. }
  236. return wrs[0], nil
  237. }
  238. // A WriteResult is returned by methods that write documents.
  239. type WriteResult struct {
  240. // The time at which the document was updated, or created if it did not
  241. // previously exist. Writes that do not actually change the document do
  242. // not change the update time.
  243. UpdateTime time.Time
  244. }
  245. func writeResultFromProto(wr *pb.WriteResult) (*WriteResult, error) {
  246. t, err := ptypes.Timestamp(wr.UpdateTime)
  247. if err != nil {
  248. t = time.Time{}
  249. // TODO(jba): Follow up if Delete is supposed to return a nil timestamp.
  250. }
  251. return &WriteResult{UpdateTime: t}, nil
  252. }
  253. func sleep(ctx context.Context, dur time.Duration) error {
  254. switch err := gax.Sleep(ctx, dur); err {
  255. case context.Canceled:
  256. return status.Error(codes.Canceled, "context canceled")
  257. case context.DeadlineExceeded:
  258. return status.Error(codes.DeadlineExceeded, "context deadline exceeded")
  259. default:
  260. return err
  261. }
  262. }