You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

102 lines
3.1 KiB

  1. // Copyright 2018 Google LLC
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package spanner
  15. import (
  16. "context"
  17. "time"
  18. "google.golang.org/api/iterator"
  19. sppb "google.golang.org/genproto/googleapis/spanner/v1"
  20. "google.golang.org/grpc/codes"
  21. )
  22. // PartitionedUpdate executes a DML statement in parallel across the database, using
  23. // separate, internal transactions that commit independently. The DML statement must
  24. // be fully partitionable: it must be expressible as the union of many statements
  25. // each of which accesses only a single row of the table. The statement should also be
  26. // idempotent, because it may be applied more than once.
  27. //
  28. // PartitionedUpdate returns an estimated count of the number of rows affected. The actual
  29. // number of affected rows may be greater than the estimate.
  30. func (c *Client) PartitionedUpdate(ctx context.Context, statement Statement) (count int64, err error) {
  31. ctx = startSpan(ctx, "cloud.google.com/go/spanner.PartitionedUpdate")
  32. defer func() { endSpan(ctx, err) }()
  33. if err := checkNestedTxn(ctx); err != nil {
  34. return 0, err
  35. }
  36. var (
  37. tx transactionID
  38. s *session
  39. sh *sessionHandle
  40. )
  41. // create session
  42. sc := c.rrNext()
  43. s, err = createSession(ctx, sc, c.database, c.sessionLabels, c.md)
  44. if err != nil {
  45. return 0, toSpannerError(err)
  46. }
  47. defer s.delete(ctx)
  48. sh = &sessionHandle{session: s}
  49. // begin transaction
  50. err = runRetryable(contextWithOutgoingMetadata(ctx, sh.getMetadata()), func(ctx context.Context) error {
  51. res, e := sc.BeginTransaction(ctx, &sppb.BeginTransactionRequest{
  52. Session: sh.getID(),
  53. Options: &sppb.TransactionOptions{
  54. Mode: &sppb.TransactionOptions_PartitionedDml_{PartitionedDml: &sppb.TransactionOptions_PartitionedDml{}},
  55. },
  56. })
  57. if e != nil {
  58. return e
  59. }
  60. tx = res.Id
  61. return nil
  62. })
  63. if err != nil {
  64. return 0, toSpannerError(err)
  65. }
  66. req := &sppb.ExecuteSqlRequest{
  67. Session: sh.getID(),
  68. Transaction: &sppb.TransactionSelector{
  69. Selector: &sppb.TransactionSelector_Id{Id: tx},
  70. },
  71. Sql: statement.SQL,
  72. }
  73. rpc := func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) {
  74. req.ResumeToken = resumeToken
  75. return sc.ExecuteStreamingSql(ctx, req)
  76. }
  77. iter := stream(contextWithOutgoingMetadata(ctx, sh.getMetadata()),
  78. rpc, func(time.Time) {}, func(error) {})
  79. // TODO(jba): factor out the following code from here and ReadWriteTransaction.Update.
  80. defer iter.Stop()
  81. for {
  82. _, err := iter.Next()
  83. if err == iterator.Done {
  84. break
  85. }
  86. if err != nil {
  87. return 0, toSpannerError(err)
  88. }
  89. time.Sleep(time.Second)
  90. }
  91. if !iter.sawStats {
  92. return 0, spannerErrorf(codes.InvalidArgument, "query passed to Update: %q", statement.SQL)
  93. }
  94. return iter.RowCount, nil
  95. }