You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

409 lines
13 KiB

  1. // Copyright 2014 Google LLC
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package datastore
  15. import (
  16. "context"
  17. "errors"
  18. "cloud.google.com/go/internal/trace"
  19. pb "google.golang.org/genproto/googleapis/datastore/v1"
  20. "google.golang.org/grpc"
  21. "google.golang.org/grpc/codes"
  22. )
  23. // ErrConcurrentTransaction is returned when a transaction is rolled back due
  24. // to a conflict with a concurrent transaction.
  25. var ErrConcurrentTransaction = errors.New("datastore: concurrent transaction")
  26. var errExpiredTransaction = errors.New("datastore: transaction expired")
  27. type transactionSettings struct {
  28. attempts int
  29. readOnly bool
  30. prevID []byte // ID of the transaction to retry
  31. }
  32. // newTransactionSettings creates a transactionSettings with a given TransactionOption slice.
  33. // Unconfigured options will be set to default values.
  34. func newTransactionSettings(opts []TransactionOption) *transactionSettings {
  35. s := &transactionSettings{attempts: 3}
  36. for _, o := range opts {
  37. o.apply(s)
  38. }
  39. return s
  40. }
  41. // TransactionOption configures the way a transaction is executed.
  42. type TransactionOption interface {
  43. apply(*transactionSettings)
  44. }
  45. // MaxAttempts returns a TransactionOption that overrides the default 3 attempt times.
  46. func MaxAttempts(attempts int) TransactionOption {
  47. return maxAttempts(attempts)
  48. }
  49. type maxAttempts int
  50. func (w maxAttempts) apply(s *transactionSettings) {
  51. if w > 0 {
  52. s.attempts = int(w)
  53. }
  54. }
  55. // ReadOnly is a TransactionOption that marks the transaction as read-only.
  56. var ReadOnly TransactionOption
  57. func init() {
  58. ReadOnly = readOnly{}
  59. }
  60. type readOnly struct{}
  61. func (readOnly) apply(s *transactionSettings) {
  62. s.readOnly = true
  63. }
  64. // Transaction represents a set of datastore operations to be committed atomically.
  65. //
  66. // Operations are enqueued by calling the Put and Delete methods on Transaction
  67. // (or their Multi-equivalents). These operations are only committed when the
  68. // Commit method is invoked. To ensure consistency, reads must be performed by
  69. // using Transaction's Get method or by using the Transaction method when
  70. // building a query.
  71. //
  72. // A Transaction must be committed or rolled back exactly once.
  73. type Transaction struct {
  74. id []byte
  75. client *Client
  76. ctx context.Context
  77. mutations []*pb.Mutation // The mutations to apply.
  78. pending map[int]*PendingKey // Map from mutation index to incomplete keys pending transaction completion.
  79. }
  80. // NewTransaction starts a new transaction.
  81. func (c *Client) NewTransaction(ctx context.Context, opts ...TransactionOption) (t *Transaction, err error) {
  82. ctx = trace.StartSpan(ctx, "cloud.google.com/go/datastore.NewTransaction")
  83. defer func() { trace.EndSpan(ctx, err) }()
  84. for _, o := range opts {
  85. if _, ok := o.(maxAttempts); ok {
  86. return nil, errors.New("datastore: NewTransaction does not accept MaxAttempts option")
  87. }
  88. }
  89. return c.newTransaction(ctx, newTransactionSettings(opts))
  90. }
  91. func (c *Client) newTransaction(ctx context.Context, s *transactionSettings) (*Transaction, error) {
  92. req := &pb.BeginTransactionRequest{ProjectId: c.dataset}
  93. if s.readOnly {
  94. req.TransactionOptions = &pb.TransactionOptions{
  95. Mode: &pb.TransactionOptions_ReadOnly_{ReadOnly: &pb.TransactionOptions_ReadOnly{}},
  96. }
  97. } else if s.prevID != nil {
  98. req.TransactionOptions = &pb.TransactionOptions{
  99. Mode: &pb.TransactionOptions_ReadWrite_{ReadWrite: &pb.TransactionOptions_ReadWrite{
  100. PreviousTransaction: s.prevID,
  101. }},
  102. }
  103. }
  104. resp, err := c.client.BeginTransaction(ctx, req)
  105. if err != nil {
  106. return nil, err
  107. }
  108. return &Transaction{
  109. id: resp.Transaction,
  110. ctx: ctx,
  111. client: c,
  112. mutations: nil,
  113. pending: make(map[int]*PendingKey),
  114. }, nil
  115. }
  116. // RunInTransaction runs f in a transaction. f is invoked with a Transaction
  117. // that f should use for all the transaction's datastore operations.
  118. //
  119. // f must not call Commit or Rollback on the provided Transaction.
  120. //
  121. // If f returns nil, RunInTransaction commits the transaction,
  122. // returning the Commit and a nil error if it succeeds. If the commit fails due
  123. // to a conflicting transaction, RunInTransaction retries f with a new
  124. // Transaction. It gives up and returns ErrConcurrentTransaction after three
  125. // failed attempts (or as configured with MaxAttempts).
  126. //
  127. // If f returns non-nil, then the transaction will be rolled back and
  128. // RunInTransaction will return the same error. The function f is not retried.
  129. //
  130. // Note that when f returns, the transaction is not committed. Calling code
  131. // must not assume that any of f's changes have been committed until
  132. // RunInTransaction returns nil.
  133. //
  134. // Since f may be called multiple times, f should usually be idempotent – that
  135. // is, it should have the same result when called multiple times. Note that
  136. // Transaction.Get will append when unmarshalling slice fields, so it is not
  137. // necessarily idempotent.
  138. func (c *Client) RunInTransaction(ctx context.Context, f func(tx *Transaction) error, opts ...TransactionOption) (cmt *Commit, err error) {
  139. ctx = trace.StartSpan(ctx, "cloud.google.com/go/datastore.RunInTransaction")
  140. defer func() { trace.EndSpan(ctx, err) }()
  141. settings := newTransactionSettings(opts)
  142. for n := 0; n < settings.attempts; n++ {
  143. tx, err := c.newTransaction(ctx, settings)
  144. if err != nil {
  145. return nil, err
  146. }
  147. if err := f(tx); err != nil {
  148. _ = tx.Rollback()
  149. return nil, err
  150. }
  151. if cmt, err := tx.Commit(); err != ErrConcurrentTransaction {
  152. return cmt, err
  153. }
  154. // Pass this transaction's ID to the retry transaction to preserve
  155. // transaction priority.
  156. if !settings.readOnly {
  157. settings.prevID = tx.id
  158. }
  159. }
  160. return nil, ErrConcurrentTransaction
  161. }
  162. // Commit applies the enqueued operations atomically.
  163. func (t *Transaction) Commit() (c *Commit, err error) {
  164. t.ctx = trace.StartSpan(t.ctx, "cloud.google.com/go/datastore.Transaction.Commit")
  165. defer func() { trace.EndSpan(t.ctx, err) }()
  166. if t.id == nil {
  167. return nil, errExpiredTransaction
  168. }
  169. req := &pb.CommitRequest{
  170. ProjectId: t.client.dataset,
  171. TransactionSelector: &pb.CommitRequest_Transaction{Transaction: t.id},
  172. Mutations: t.mutations,
  173. Mode: pb.CommitRequest_TRANSACTIONAL,
  174. }
  175. resp, err := t.client.client.Commit(t.ctx, req)
  176. if grpc.Code(err) == codes.Aborted {
  177. return nil, ErrConcurrentTransaction
  178. }
  179. t.id = nil // mark the transaction as expired
  180. if err != nil {
  181. return nil, err
  182. }
  183. // Copy any newly minted keys into the returned keys.
  184. for i, p := range t.pending {
  185. if i >= len(resp.MutationResults) || resp.MutationResults[i].Key == nil {
  186. return nil, errors.New("datastore: internal error: server returned the wrong mutation results")
  187. }
  188. key, err := protoToKey(resp.MutationResults[i].Key)
  189. if err != nil {
  190. return nil, errors.New("datastore: internal error: server returned an invalid key")
  191. }
  192. p.key = key
  193. p.commit = c
  194. }
  195. return c, nil
  196. }
  197. // Rollback abandons a pending transaction.
  198. func (t *Transaction) Rollback() (err error) {
  199. t.ctx = trace.StartSpan(t.ctx, "cloud.google.com/go/datastore.Transaction.Rollback")
  200. defer func() { trace.EndSpan(t.ctx, err) }()
  201. if t.id == nil {
  202. return errExpiredTransaction
  203. }
  204. id := t.id
  205. t.id = nil
  206. _, err = t.client.client.Rollback(t.ctx, &pb.RollbackRequest{
  207. ProjectId: t.client.dataset,
  208. Transaction: id,
  209. })
  210. return err
  211. }
  212. // Get is the transaction-specific version of the package function Get.
  213. // All reads performed during the transaction will come from a single consistent
  214. // snapshot. Furthermore, if the transaction is set to a serializable isolation
  215. // level, another transaction cannot concurrently modify the data that is read
  216. // or modified by this transaction.
  217. func (t *Transaction) Get(key *Key, dst interface{}) (err error) {
  218. t.ctx = trace.StartSpan(t.ctx, "cloud.google.com/go/datastore.Transaction.Get")
  219. defer func() { trace.EndSpan(t.ctx, err) }()
  220. opts := &pb.ReadOptions{
  221. ConsistencyType: &pb.ReadOptions_Transaction{Transaction: t.id},
  222. }
  223. err = t.client.get(t.ctx, []*Key{key}, []interface{}{dst}, opts)
  224. if me, ok := err.(MultiError); ok {
  225. return me[0]
  226. }
  227. return err
  228. }
  229. // GetMulti is a batch version of Get.
  230. func (t *Transaction) GetMulti(keys []*Key, dst interface{}) (err error) {
  231. t.ctx = trace.StartSpan(t.ctx, "cloud.google.com/go/datastore.Transaction.GetMulti")
  232. defer func() { trace.EndSpan(t.ctx, err) }()
  233. if t.id == nil {
  234. return errExpiredTransaction
  235. }
  236. opts := &pb.ReadOptions{
  237. ConsistencyType: &pb.ReadOptions_Transaction{Transaction: t.id},
  238. }
  239. return t.client.get(t.ctx, keys, dst, opts)
  240. }
  241. // Put is the transaction-specific version of the package function Put.
  242. //
  243. // Put returns a PendingKey which can be resolved into a Key using the
  244. // return value from a successful Commit. If key is an incomplete key, the
  245. // returned pending key will resolve to a unique key generated by the
  246. // datastore.
  247. func (t *Transaction) Put(key *Key, src interface{}) (*PendingKey, error) {
  248. h, err := t.PutMulti([]*Key{key}, []interface{}{src})
  249. if err != nil {
  250. if me, ok := err.(MultiError); ok {
  251. return nil, me[0]
  252. }
  253. return nil, err
  254. }
  255. return h[0], nil
  256. }
  257. // PutMulti is a batch version of Put. One PendingKey is returned for each
  258. // element of src in the same order.
  259. // TODO(jba): rewrite in terms of Mutate.
  260. func (t *Transaction) PutMulti(keys []*Key, src interface{}) (ret []*PendingKey, err error) {
  261. t.ctx = trace.StartSpan(t.ctx, "cloud.google.com/go/datastore.Transaction.PutMulti")
  262. defer func() { trace.EndSpan(t.ctx, err) }()
  263. if t.id == nil {
  264. return nil, errExpiredTransaction
  265. }
  266. mutations, err := putMutations(keys, src)
  267. if err != nil {
  268. return nil, err
  269. }
  270. origin := len(t.mutations)
  271. t.mutations = append(t.mutations, mutations...)
  272. // Prepare the returned handles, pre-populating where possible.
  273. ret = make([]*PendingKey, len(keys))
  274. for i, key := range keys {
  275. p := &PendingKey{}
  276. if key.Incomplete() {
  277. // This key will be in the final commit result.
  278. t.pending[origin+i] = p
  279. } else {
  280. p.key = key
  281. }
  282. ret[i] = p
  283. }
  284. return ret, nil
  285. }
  286. // Delete is the transaction-specific version of the package function Delete.
  287. // Delete enqueues the deletion of the entity for the given key, to be
  288. // committed atomically upon calling Commit.
  289. func (t *Transaction) Delete(key *Key) error {
  290. err := t.DeleteMulti([]*Key{key})
  291. if me, ok := err.(MultiError); ok {
  292. return me[0]
  293. }
  294. return err
  295. }
  296. // DeleteMulti is a batch version of Delete.
  297. // TODO(jba): rewrite in terms of Mutate.
  298. func (t *Transaction) DeleteMulti(keys []*Key) (err error) {
  299. t.ctx = trace.StartSpan(t.ctx, "cloud.google.com/go/datastore.Transaction.DeleteMulti")
  300. defer func() { trace.EndSpan(t.ctx, err) }()
  301. if t.id == nil {
  302. return errExpiredTransaction
  303. }
  304. mutations, err := deleteMutations(keys)
  305. if err != nil {
  306. return err
  307. }
  308. t.mutations = append(t.mutations, mutations...)
  309. return nil
  310. }
  311. // Mutate adds the mutations to the transaction. They will all be applied atomically
  312. // upon calling Commit. Mutate returns a PendingKey for each Mutation in the argument
  313. // list, in the same order. PendingKeys for Delete mutations are always nil.
  314. //
  315. // If any of the mutations are invalid, Mutate returns a MultiError with the errors.
  316. // Mutate returns a MultiError in this case even if there is only one Mutation.
  317. //
  318. // For an example, see Client.Mutate.
  319. func (t *Transaction) Mutate(muts ...*Mutation) ([]*PendingKey, error) {
  320. if t.id == nil {
  321. return nil, errExpiredTransaction
  322. }
  323. pmuts, err := mutationProtos(muts)
  324. if err != nil {
  325. return nil, err
  326. }
  327. origin := len(t.mutations)
  328. t.mutations = append(t.mutations, pmuts...)
  329. // Prepare the returned handles, pre-populating where possible.
  330. ret := make([]*PendingKey, len(muts))
  331. for i, mut := range muts {
  332. if mut.isDelete() {
  333. continue
  334. }
  335. p := &PendingKey{}
  336. if mut.key.Incomplete() {
  337. // This key will be in the final commit result.
  338. t.pending[origin+i] = p
  339. } else {
  340. p.key = mut.key
  341. }
  342. ret[i] = p
  343. }
  344. return ret, nil
  345. }
  346. // Commit represents the result of a committed transaction.
  347. type Commit struct{}
  348. // Key resolves a pending key handle into a final key.
  349. func (c *Commit) Key(p *PendingKey) *Key {
  350. if p == nil { // if called on a *PendingKey from a Delete mutation
  351. return nil
  352. }
  353. // If p.commit is nil, the PendingKey did not come from an incomplete key,
  354. // so p.key is valid.
  355. if p.commit != nil && c != p.commit {
  356. panic("PendingKey was not created by corresponding transaction")
  357. }
  358. return p.key
  359. }
  360. // PendingKey represents the key for newly-inserted entity. It can be
  361. // resolved into a Key by calling the Key method of Commit.
  362. type PendingKey struct {
  363. key *Key
  364. commit *Commit
  365. }