You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

118 lines
3.8 KiB

  1. // Copyright 2017 Google LLC
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package datastore
  15. import (
  16. "context"
  17. "fmt"
  18. "cloud.google.com/go/internal"
  19. "cloud.google.com/go/internal/version"
  20. gax "github.com/googleapis/gax-go/v2"
  21. pb "google.golang.org/genproto/googleapis/datastore/v1"
  22. "google.golang.org/grpc"
  23. "google.golang.org/grpc/codes"
  24. "google.golang.org/grpc/metadata"
  25. "google.golang.org/grpc/status"
  26. )
  27. // datastoreClient is a wrapper for the pb.DatastoreClient that includes gRPC
  28. // metadata to be sent in each request for server-side traffic management.
  29. type datastoreClient struct {
  30. // Embed so we still implement the DatastoreClient interface,
  31. // if the interface adds more methods.
  32. pb.DatastoreClient
  33. c pb.DatastoreClient
  34. md metadata.MD
  35. }
  36. func newDatastoreClient(conn *grpc.ClientConn, projectID string) pb.DatastoreClient {
  37. return &datastoreClient{
  38. c: pb.NewDatastoreClient(conn),
  39. md: metadata.Pairs(
  40. resourcePrefixHeader, "projects/"+projectID,
  41. "x-goog-api-client", fmt.Sprintf("gl-go/%s gccl/%s grpc/", version.Go(), version.Repo)),
  42. }
  43. }
  44. func (dc *datastoreClient) Lookup(ctx context.Context, in *pb.LookupRequest, opts ...grpc.CallOption) (res *pb.LookupResponse, err error) {
  45. err = dc.invoke(ctx, func(ctx context.Context) error {
  46. res, err = dc.c.Lookup(ctx, in, opts...)
  47. return err
  48. })
  49. return res, err
  50. }
  51. func (dc *datastoreClient) RunQuery(ctx context.Context, in *pb.RunQueryRequest, opts ...grpc.CallOption) (res *pb.RunQueryResponse, err error) {
  52. err = dc.invoke(ctx, func(ctx context.Context) error {
  53. res, err = dc.c.RunQuery(ctx, in, opts...)
  54. return err
  55. })
  56. return res, err
  57. }
  58. func (dc *datastoreClient) BeginTransaction(ctx context.Context, in *pb.BeginTransactionRequest, opts ...grpc.CallOption) (res *pb.BeginTransactionResponse, err error) {
  59. err = dc.invoke(ctx, func(ctx context.Context) error {
  60. res, err = dc.c.BeginTransaction(ctx, in, opts...)
  61. return err
  62. })
  63. return res, err
  64. }
  65. func (dc *datastoreClient) Commit(ctx context.Context, in *pb.CommitRequest, opts ...grpc.CallOption) (res *pb.CommitResponse, err error) {
  66. err = dc.invoke(ctx, func(ctx context.Context) error {
  67. res, err = dc.c.Commit(ctx, in, opts...)
  68. return err
  69. })
  70. return res, err
  71. }
  72. func (dc *datastoreClient) Rollback(ctx context.Context, in *pb.RollbackRequest, opts ...grpc.CallOption) (res *pb.RollbackResponse, err error) {
  73. err = dc.invoke(ctx, func(ctx context.Context) error {
  74. res, err = dc.c.Rollback(ctx, in, opts...)
  75. return err
  76. })
  77. return res, err
  78. }
  79. func (dc *datastoreClient) AllocateIds(ctx context.Context, in *pb.AllocateIdsRequest, opts ...grpc.CallOption) (res *pb.AllocateIdsResponse, err error) {
  80. err = dc.invoke(ctx, func(ctx context.Context) error {
  81. res, err = dc.c.AllocateIds(ctx, in, opts...)
  82. return err
  83. })
  84. return res, err
  85. }
  86. func (dc *datastoreClient) invoke(ctx context.Context, f func(ctx context.Context) error) error {
  87. ctx = metadata.NewOutgoingContext(ctx, dc.md)
  88. return internal.Retry(ctx, gax.Backoff{}, func() (stop bool, err error) {
  89. err = f(ctx)
  90. return !shouldRetry(err), err
  91. })
  92. }
  93. func shouldRetry(err error) bool {
  94. if err == nil {
  95. return false
  96. }
  97. s, ok := status.FromError(err)
  98. if !ok {
  99. return false
  100. }
  101. // See https://cloud.google.com/datastore/docs/concepts/errors.
  102. return s.Code() == codes.Unavailable || s.Code() == codes.DeadlineExceeded
  103. }