You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

180 lines
6.5 KiB

  1. // Copyright 2016 Google LLC
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. // Package longrunning supports Long Running Operations for the Google Cloud Libraries.
  15. // See google.golang.org/genproto/googleapis/longrunning for its service definition.
  16. //
  17. // Users of the Google Cloud Libraries will typically not use this package directly.
  18. // Instead they will call functions returning Operations and call their methods.
  19. //
  20. // This package is still experimental and subject to change.
  21. package longrunning // import "cloud.google.com/go/longrunning"
  22. import (
  23. "context"
  24. "errors"
  25. "fmt"
  26. "time"
  27. autogen "cloud.google.com/go/longrunning/autogen"
  28. "github.com/golang/protobuf/proto"
  29. "github.com/golang/protobuf/ptypes"
  30. gax "github.com/googleapis/gax-go/v2"
  31. pb "google.golang.org/genproto/googleapis/longrunning"
  32. "google.golang.org/grpc/codes"
  33. "google.golang.org/grpc/status"
  34. )
  35. // ErrNoMetadata is the error returned by Metadata if the operation contains no metadata.
  36. var ErrNoMetadata = errors.New("operation contains no metadata")
  37. // Operation represents the result of an API call that may not be ready yet.
  38. type Operation struct {
  39. c operationsClient
  40. proto *pb.Operation
  41. }
  42. type operationsClient interface {
  43. GetOperation(context.Context, *pb.GetOperationRequest, ...gax.CallOption) (*pb.Operation, error)
  44. CancelOperation(context.Context, *pb.CancelOperationRequest, ...gax.CallOption) error
  45. DeleteOperation(context.Context, *pb.DeleteOperationRequest, ...gax.CallOption) error
  46. }
  47. // InternalNewOperation is for use by the google Cloud Libraries only.
  48. //
  49. // InternalNewOperation returns an long-running operation, abstracting the raw pb.Operation.
  50. // The conn parameter refers to a server that proto was received from.
  51. func InternalNewOperation(inner *autogen.OperationsClient, proto *pb.Operation) *Operation {
  52. return &Operation{
  53. c: inner,
  54. proto: proto,
  55. }
  56. }
  57. // Name returns the name of the long-running operation.
  58. // The name is assigned by the server and is unique within the service
  59. // from which the operation is created.
  60. func (op *Operation) Name() string {
  61. return op.proto.Name
  62. }
  63. // Done reports whether the long-running operation has completed.
  64. func (op *Operation) Done() bool {
  65. return op.proto.Done
  66. }
  67. // Metadata unmarshals op's metadata into meta.
  68. // If op does not contain any metadata, Metadata returns ErrNoMetadata and meta is unmodified.
  69. func (op *Operation) Metadata(meta proto.Message) error {
  70. if m := op.proto.Metadata; m != nil {
  71. return ptypes.UnmarshalAny(m, meta)
  72. }
  73. return ErrNoMetadata
  74. }
  75. // Poll fetches the latest state of a long-running operation.
  76. //
  77. // If Poll fails, the error is returned and op is unmodified.
  78. // If Poll succeeds and the operation has completed with failure,
  79. // the error is returned and op.Done will return true.
  80. // If Poll succeeds and the operation has completed successfully,
  81. // op.Done will return true; if resp != nil, the response of the operation
  82. // is stored in resp.
  83. func (op *Operation) Poll(ctx context.Context, resp proto.Message, opts ...gax.CallOption) error {
  84. if !op.Done() {
  85. p, err := op.c.GetOperation(ctx, &pb.GetOperationRequest{Name: op.Name()}, opts...)
  86. if err != nil {
  87. return err
  88. }
  89. op.proto = p
  90. }
  91. if !op.Done() {
  92. return nil
  93. }
  94. switch r := op.proto.Result.(type) {
  95. case *pb.Operation_Error:
  96. // TODO (pongad): r.Details may contain further information
  97. return status.Errorf(codes.Code(r.Error.Code), "%s", r.Error.Message)
  98. case *pb.Operation_Response:
  99. if resp == nil {
  100. return nil
  101. }
  102. return ptypes.UnmarshalAny(r.Response, resp)
  103. default:
  104. return fmt.Errorf("unsupported result type %[1]T: %[1]v", r)
  105. }
  106. }
  107. // DefaultWaitInterval is the polling interval used by Operation.Wait.
  108. const DefaultWaitInterval = 60 * time.Second
  109. // Wait is equivalent to WaitWithInterval using DefaultWaitInterval.
  110. func (op *Operation) Wait(ctx context.Context, resp proto.Message, opts ...gax.CallOption) error {
  111. return op.WaitWithInterval(ctx, resp, DefaultWaitInterval, opts...)
  112. }
  113. // WaitWithInterval blocks until the operation is completed.
  114. // If resp != nil, Wait stores the response in resp.
  115. // WaitWithInterval polls every interval, except initially
  116. // when it polls using exponential backoff.
  117. //
  118. // See documentation of Poll for error-handling information.
  119. func (op *Operation) WaitWithInterval(ctx context.Context, resp proto.Message, interval time.Duration, opts ...gax.CallOption) error {
  120. bo := gax.Backoff{
  121. Initial: 1 * time.Second,
  122. Max: interval,
  123. }
  124. if bo.Max < bo.Initial {
  125. bo.Max = bo.Initial
  126. }
  127. return op.wait(ctx, resp, &bo, gax.Sleep, opts...)
  128. }
  129. type sleeper func(context.Context, time.Duration) error
  130. // wait implements Wait, taking exponentialBackoff and sleeper arguments for testing.
  131. func (op *Operation) wait(ctx context.Context, resp proto.Message, bo *gax.Backoff, sl sleeper, opts ...gax.CallOption) error {
  132. for {
  133. if err := op.Poll(ctx, resp, opts...); err != nil {
  134. return err
  135. }
  136. if op.Done() {
  137. return nil
  138. }
  139. if err := sl(ctx, bo.Pause()); err != nil {
  140. return err
  141. }
  142. }
  143. }
  144. // Cancel starts asynchronous cancellation on a long-running operation. The server
  145. // makes a best effort to cancel the operation, but success is not
  146. // guaranteed. If the server doesn't support this method, it returns
  147. // grpc.Code(error) == codes.Unimplemented. Clients can use
  148. // Poll or other methods to check whether the cancellation succeeded or whether the
  149. // operation completed despite cancellation. On successful cancellation,
  150. // the operation is not deleted; instead, op.Poll returns an error
  151. // with code Canceled.
  152. func (op *Operation) Cancel(ctx context.Context, opts ...gax.CallOption) error {
  153. return op.c.CancelOperation(ctx, &pb.CancelOperationRequest{Name: op.Name()}, opts...)
  154. }
  155. // Delete deletes a long-running operation. This method indicates that the client is
  156. // no longer interested in the operation result. It does not cancel the
  157. // operation. If the server doesn't support this method, grpc.Code(error) == codes.Unimplemented.
  158. func (op *Operation) Delete(ctx context.Context, opts ...gax.CallOption) error {
  159. return op.c.DeleteOperation(ctx, &pb.DeleteOperationRequest{Name: op.Name()}, opts...)
  160. }