You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

279 lines
9.2 KiB

  1. // Copyright 2017 Google LLC
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package firestore
  15. import (
  16. "context"
  17. "errors"
  18. gax "github.com/googleapis/gax-go/v2"
  19. pb "google.golang.org/genproto/googleapis/firestore/v1"
  20. "google.golang.org/grpc"
  21. "google.golang.org/grpc/codes"
  22. "google.golang.org/grpc/status"
  23. )
  24. // Transaction represents a Firestore transaction.
  25. type Transaction struct {
  26. c *Client
  27. ctx context.Context
  28. id []byte
  29. writes []*pb.Write
  30. maxAttempts int
  31. readOnly bool
  32. readAfterWrite bool
  33. }
  34. // A TransactionOption is an option passed to Client.Transaction.
  35. type TransactionOption interface {
  36. config(t *Transaction)
  37. }
  38. // MaxAttempts is a TransactionOption that configures the maximum number of times to
  39. // try a transaction. In defaults to DefaultTransactionMaxAttempts.
  40. func MaxAttempts(n int) maxAttempts { return maxAttempts(n) }
  41. type maxAttempts int
  42. func (m maxAttempts) config(t *Transaction) { t.maxAttempts = int(m) }
  43. // DefaultTransactionMaxAttempts is the default number of times to attempt a transaction.
  44. const DefaultTransactionMaxAttempts = 5
  45. // ReadOnly is a TransactionOption that makes the transaction read-only. Read-only
  46. // transactions cannot issue write operations, but are more efficient.
  47. var ReadOnly = ro{}
  48. type ro struct{}
  49. func (ro) config(t *Transaction) { t.readOnly = true }
  50. var (
  51. // Defined here for testing.
  52. errReadAfterWrite = errors.New("firestore: read after write in transaction")
  53. errWriteReadOnly = errors.New("firestore: write in read-only transaction")
  54. errNestedTransaction = errors.New("firestore: nested transaction")
  55. )
  56. type transactionInProgressKey struct{}
  57. // RunTransaction runs f in a transaction. f should use the transaction it is given
  58. // for all Firestore operations. For any operation requiring a context, f should use
  59. // the context it is passed, not the first argument to RunTransaction.
  60. //
  61. // f must not call Commit or Rollback on the provided Transaction.
  62. //
  63. // If f returns nil, RunTransaction commits the transaction. If the commit fails due
  64. // to a conflicting transaction, RunTransaction retries f. It gives up and returns an
  65. // error after a number of attempts that can be configured with the MaxAttempts
  66. // option. If the commit succeeds, RunTransaction returns a nil error.
  67. //
  68. // If f returns non-nil, then the transaction will be rolled back and
  69. // this method will return the same error. The function f is not retried.
  70. //
  71. // Note that when f returns, the transaction is not committed. Calling code
  72. // must not assume that any of f's changes have been committed until
  73. // RunTransaction returns nil.
  74. //
  75. // Since f may be called more than once, f should usually be idempotent – that is, it
  76. // should have the same result when called multiple times.
  77. func (c *Client) RunTransaction(ctx context.Context, f func(context.Context, *Transaction) error, opts ...TransactionOption) error {
  78. if ctx.Value(transactionInProgressKey{}) != nil {
  79. return errNestedTransaction
  80. }
  81. db := c.path()
  82. t := &Transaction{
  83. c: c,
  84. ctx: withResourceHeader(ctx, db),
  85. maxAttempts: DefaultTransactionMaxAttempts,
  86. }
  87. for _, opt := range opts {
  88. opt.config(t)
  89. }
  90. var txOpts *pb.TransactionOptions
  91. if t.readOnly {
  92. txOpts = &pb.TransactionOptions{
  93. Mode: &pb.TransactionOptions_ReadOnly_{&pb.TransactionOptions_ReadOnly{}},
  94. }
  95. }
  96. var backoff gax.Backoff
  97. // TODO(jba): use other than the standard backoff parameters?
  98. // TODO(jba): get backoff time from gRPC trailer metadata? See
  99. // extractRetryDelay in https://code.googlesource.com/gocloud/+/master/spanner/retry.go.
  100. var err error
  101. for i := 0; i < t.maxAttempts; i++ {
  102. var res *pb.BeginTransactionResponse
  103. res, err = t.c.c.BeginTransaction(t.ctx, &pb.BeginTransactionRequest{
  104. Database: db,
  105. Options: txOpts,
  106. })
  107. if err != nil {
  108. return err
  109. }
  110. t.id = res.Transaction
  111. err = f(context.WithValue(ctx, transactionInProgressKey{}, 1), t)
  112. // Read after write can only be checked client-side, so we make sure to check
  113. // even if the user does not.
  114. if err == nil && t.readAfterWrite {
  115. err = errReadAfterWrite
  116. }
  117. if err != nil {
  118. t.rollback()
  119. // Prefer f's returned error to rollback error.
  120. return err
  121. }
  122. _, err = t.c.c.Commit(t.ctx, &pb.CommitRequest{
  123. Database: t.c.path(),
  124. Writes: t.writes,
  125. Transaction: t.id,
  126. })
  127. // If a read-write transaction returns Aborted, retry.
  128. // On success or other failures, return here.
  129. if t.readOnly || grpc.Code(err) != codes.Aborted {
  130. // According to the Firestore team, we should not roll back here
  131. // if err != nil. But spanner does.
  132. // See https://code.googlesource.com/gocloud/+/master/spanner/transaction.go#740.
  133. return err
  134. }
  135. if txOpts == nil {
  136. // txOpts can only be nil if is the first retry of a read-write transaction.
  137. // (It is only set here and in the body of "if t.readOnly" above.)
  138. // Mention the transaction ID in BeginTransaction so the service
  139. // knows it is a retry.
  140. txOpts = &pb.TransactionOptions{
  141. Mode: &pb.TransactionOptions_ReadWrite_{
  142. &pb.TransactionOptions_ReadWrite{RetryTransaction: t.id},
  143. },
  144. }
  145. }
  146. // Use exponential backoff to avoid contention with other running
  147. // transactions.
  148. if cerr := sleep(ctx, backoff.Pause()); cerr != nil {
  149. err = cerr
  150. break
  151. }
  152. // Reset state for the next attempt.
  153. t.writes = nil
  154. }
  155. // If we run out of retries, return the last error we saw (which should
  156. // be the Aborted from Commit, or a context error).
  157. if err != nil {
  158. t.rollback()
  159. }
  160. return err
  161. }
  162. func (t *Transaction) rollback() {
  163. _ = t.c.c.Rollback(t.ctx, &pb.RollbackRequest{
  164. Database: t.c.path(),
  165. Transaction: t.id,
  166. })
  167. // Ignore the rollback error.
  168. // TODO(jba): Log it?
  169. // Note: Rollback is idempotent so it will be retried by the gapic layer.
  170. }
  171. // Get gets the document in the context of the transaction. The transaction holds a
  172. // pessimistic lock on the returned document.
  173. func (t *Transaction) Get(dr *DocumentRef) (*DocumentSnapshot, error) {
  174. docsnaps, err := t.GetAll([]*DocumentRef{dr})
  175. if err != nil {
  176. return nil, err
  177. }
  178. ds := docsnaps[0]
  179. if !ds.Exists() {
  180. return ds, status.Errorf(codes.NotFound, "%q not found", dr.Path)
  181. }
  182. return ds, nil
  183. }
  184. // GetAll retrieves multiple documents with a single call. The DocumentSnapshots are
  185. // returned in the order of the given DocumentRefs. If a document is not present, the
  186. // corresponding DocumentSnapshot's Exists method will return false. The transaction
  187. // holds a pessimistic lock on all of the returned documents.
  188. func (t *Transaction) GetAll(drs []*DocumentRef) ([]*DocumentSnapshot, error) {
  189. if len(t.writes) > 0 {
  190. t.readAfterWrite = true
  191. return nil, errReadAfterWrite
  192. }
  193. return t.c.getAll(t.ctx, drs, t.id)
  194. }
  195. // A Queryer is a Query or a CollectionRef. CollectionRefs act as queries whose
  196. // results are all the documents in the collection.
  197. type Queryer interface {
  198. query() *Query
  199. }
  200. // Documents returns a DocumentIterator based on given Query or CollectionRef. The
  201. // results will be in the context of the transaction.
  202. func (t *Transaction) Documents(q Queryer) *DocumentIterator {
  203. if len(t.writes) > 0 {
  204. t.readAfterWrite = true
  205. return &DocumentIterator{err: errReadAfterWrite}
  206. }
  207. return &DocumentIterator{
  208. iter: newQueryDocumentIterator(t.ctx, q.query(), t.id),
  209. }
  210. }
  211. // DocumentRefs returns references to all the documents in the collection, including
  212. // missing documents. A missing document is a document that does not exist but has
  213. // sub-documents.
  214. func (t *Transaction) DocumentRefs(cr *CollectionRef) *DocumentRefIterator {
  215. if len(t.writes) > 0 {
  216. t.readAfterWrite = true
  217. return &DocumentRefIterator{err: errReadAfterWrite}
  218. }
  219. return newDocumentRefIterator(t.ctx, cr, t.id)
  220. }
  221. // Create adds a Create operation to the Transaction.
  222. // See DocumentRef.Create for details.
  223. func (t *Transaction) Create(dr *DocumentRef, data interface{}) error {
  224. return t.addWrites(dr.newCreateWrites(data))
  225. }
  226. // Set adds a Set operation to the Transaction.
  227. // See DocumentRef.Set for details.
  228. func (t *Transaction) Set(dr *DocumentRef, data interface{}, opts ...SetOption) error {
  229. return t.addWrites(dr.newSetWrites(data, opts))
  230. }
  231. // Delete adds a Delete operation to the Transaction.
  232. // See DocumentRef.Delete for details.
  233. func (t *Transaction) Delete(dr *DocumentRef, opts ...Precondition) error {
  234. return t.addWrites(dr.newDeleteWrites(opts))
  235. }
  236. // Update adds a new Update operation to the Transaction.
  237. // See DocumentRef.Update for details.
  238. func (t *Transaction) Update(dr *DocumentRef, data []Update, opts ...Precondition) error {
  239. return t.addWrites(dr.newUpdatePathWrites(data, opts))
  240. }
  241. func (t *Transaction) addWrites(ws []*pb.Write, err error) error {
  242. if t.readOnly {
  243. return errWriteReadOnly
  244. }
  245. if err != nil {
  246. return err
  247. }
  248. t.writes = append(t.writes, ws...)
  249. return nil
  250. }