You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

229 lines
7.9 KiB

  1. // Copyright 2016 Google LLC
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package storage
  15. import (
  16. "context"
  17. "errors"
  18. "fmt"
  19. "cloud.google.com/go/internal/trace"
  20. raw "google.golang.org/api/storage/v1"
  21. )
  22. // CopierFrom creates a Copier that can copy src to dst.
  23. // You can immediately call Run on the returned Copier, or
  24. // you can configure it first.
  25. //
  26. // For Requester Pays buckets, the user project of dst is billed, unless it is empty,
  27. // in which case the user project of src is billed.
  28. func (dst *ObjectHandle) CopierFrom(src *ObjectHandle) *Copier {
  29. return &Copier{dst: dst, src: src}
  30. }
  31. // A Copier copies a source object to a destination.
  32. type Copier struct {
  33. // ObjectAttrs are optional attributes to set on the destination object.
  34. // Any attributes must be initialized before any calls on the Copier. Nil
  35. // or zero-valued attributes are ignored.
  36. ObjectAttrs
  37. // RewriteToken can be set before calling Run to resume a copy
  38. // operation. After Run returns a non-nil error, RewriteToken will
  39. // have been updated to contain the value needed to resume the copy.
  40. RewriteToken string
  41. // ProgressFunc can be used to monitor the progress of a multi-RPC copy
  42. // operation. If ProgressFunc is not nil and copying requires multiple
  43. // calls to the underlying service (see
  44. // https://cloud.google.com/storage/docs/json_api/v1/objects/rewrite), then
  45. // ProgressFunc will be invoked after each call with the number of bytes of
  46. // content copied so far and the total size in bytes of the source object.
  47. //
  48. // ProgressFunc is intended to make upload progress available to the
  49. // application. For example, the implementation of ProgressFunc may update
  50. // a progress bar in the application's UI, or log the result of
  51. // float64(copiedBytes)/float64(totalBytes).
  52. //
  53. // ProgressFunc should return quickly without blocking.
  54. ProgressFunc func(copiedBytes, totalBytes uint64)
  55. // The Cloud KMS key, in the form projects/P/locations/L/keyRings/R/cryptoKeys/K,
  56. // that will be used to encrypt the object. Overrides the object's KMSKeyName, if
  57. // any.
  58. //
  59. // Providing both a DestinationKMSKeyName and a customer-supplied encryption key
  60. // (via ObjectHandle.Key) on the destination object will result in an error when
  61. // Run is called.
  62. DestinationKMSKeyName string
  63. dst, src *ObjectHandle
  64. }
  65. // Run performs the copy.
  66. func (c *Copier) Run(ctx context.Context) (attrs *ObjectAttrs, err error) {
  67. ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.Copier.Run")
  68. defer func() { trace.EndSpan(ctx, err) }()
  69. if err := c.src.validate(); err != nil {
  70. return nil, err
  71. }
  72. if err := c.dst.validate(); err != nil {
  73. return nil, err
  74. }
  75. if c.DestinationKMSKeyName != "" && c.dst.encryptionKey != nil {
  76. return nil, errors.New("storage: cannot use DestinationKMSKeyName with a customer-supplied encryption key")
  77. }
  78. // Convert destination attributes to raw form, omitting the bucket.
  79. // If the bucket is included but name or content-type aren't, the service
  80. // returns a 400 with "Required" as the only message. Omitting the bucket
  81. // does not cause any problems.
  82. rawObject := c.ObjectAttrs.toRawObject("")
  83. for {
  84. res, err := c.callRewrite(ctx, rawObject)
  85. if err != nil {
  86. return nil, err
  87. }
  88. if c.ProgressFunc != nil {
  89. c.ProgressFunc(uint64(res.TotalBytesRewritten), uint64(res.ObjectSize))
  90. }
  91. if res.Done { // Finished successfully.
  92. return newObject(res.Resource), nil
  93. }
  94. }
  95. }
  96. func (c *Copier) callRewrite(ctx context.Context, rawObj *raw.Object) (*raw.RewriteResponse, error) {
  97. call := c.dst.c.raw.Objects.Rewrite(c.src.bucket, c.src.object, c.dst.bucket, c.dst.object, rawObj)
  98. call.Context(ctx).Projection("full")
  99. if c.RewriteToken != "" {
  100. call.RewriteToken(c.RewriteToken)
  101. }
  102. if c.DestinationKMSKeyName != "" {
  103. call.DestinationKmsKeyName(c.DestinationKMSKeyName)
  104. }
  105. if c.PredefinedACL != "" {
  106. call.DestinationPredefinedAcl(c.PredefinedACL)
  107. }
  108. if err := applyConds("Copy destination", c.dst.gen, c.dst.conds, call); err != nil {
  109. return nil, err
  110. }
  111. if c.dst.userProject != "" {
  112. call.UserProject(c.dst.userProject)
  113. } else if c.src.userProject != "" {
  114. call.UserProject(c.src.userProject)
  115. }
  116. if err := applySourceConds(c.src.gen, c.src.conds, call); err != nil {
  117. return nil, err
  118. }
  119. if err := setEncryptionHeaders(call.Header(), c.dst.encryptionKey, false); err != nil {
  120. return nil, err
  121. }
  122. if err := setEncryptionHeaders(call.Header(), c.src.encryptionKey, true); err != nil {
  123. return nil, err
  124. }
  125. var res *raw.RewriteResponse
  126. var err error
  127. setClientHeader(call.Header())
  128. err = runWithRetry(ctx, func() error { res, err = call.Do(); return err })
  129. if err != nil {
  130. return nil, err
  131. }
  132. c.RewriteToken = res.RewriteToken
  133. return res, nil
  134. }
  135. // ComposerFrom creates a Composer that can compose srcs into dst.
  136. // You can immediately call Run on the returned Composer, or you can
  137. // configure it first.
  138. //
  139. // The encryption key for the destination object will be used to decrypt all
  140. // source objects and encrypt the destination object. It is an error
  141. // to specify an encryption key for any of the source objects.
  142. func (dst *ObjectHandle) ComposerFrom(srcs ...*ObjectHandle) *Composer {
  143. return &Composer{dst: dst, srcs: srcs}
  144. }
  145. // A Composer composes source objects into a destination object.
  146. //
  147. // For Requester Pays buckets, the user project of dst is billed.
  148. type Composer struct {
  149. // ObjectAttrs are optional attributes to set on the destination object.
  150. // Any attributes must be initialized before any calls on the Composer. Nil
  151. // or zero-valued attributes are ignored.
  152. ObjectAttrs
  153. dst *ObjectHandle
  154. srcs []*ObjectHandle
  155. }
  156. // Run performs the compose operation.
  157. func (c *Composer) Run(ctx context.Context) (attrs *ObjectAttrs, err error) {
  158. ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.Composer.Run")
  159. defer func() { trace.EndSpan(ctx, err) }()
  160. if err := c.dst.validate(); err != nil {
  161. return nil, err
  162. }
  163. if len(c.srcs) == 0 {
  164. return nil, errors.New("storage: at least one source object must be specified")
  165. }
  166. req := &raw.ComposeRequest{}
  167. // Compose requires a non-empty Destination, so we always set it,
  168. // even if the caller-provided ObjectAttrs is the zero value.
  169. req.Destination = c.ObjectAttrs.toRawObject(c.dst.bucket)
  170. for _, src := range c.srcs {
  171. if err := src.validate(); err != nil {
  172. return nil, err
  173. }
  174. if src.bucket != c.dst.bucket {
  175. return nil, fmt.Errorf("storage: all source objects must be in bucket %q, found %q", c.dst.bucket, src.bucket)
  176. }
  177. if src.encryptionKey != nil {
  178. return nil, fmt.Errorf("storage: compose source %s.%s must not have encryption key", src.bucket, src.object)
  179. }
  180. srcObj := &raw.ComposeRequestSourceObjects{
  181. Name: src.object,
  182. }
  183. if err := applyConds("ComposeFrom source", src.gen, src.conds, composeSourceObj{srcObj}); err != nil {
  184. return nil, err
  185. }
  186. req.SourceObjects = append(req.SourceObjects, srcObj)
  187. }
  188. call := c.dst.c.raw.Objects.Compose(c.dst.bucket, c.dst.object, req).Context(ctx)
  189. if err := applyConds("ComposeFrom destination", c.dst.gen, c.dst.conds, call); err != nil {
  190. return nil, err
  191. }
  192. if c.dst.userProject != "" {
  193. call.UserProject(c.dst.userProject)
  194. }
  195. if c.PredefinedACL != "" {
  196. call.DestinationPredefinedAcl(c.PredefinedACL)
  197. }
  198. if err := setEncryptionHeaders(call.Header(), c.dst.encryptionKey, false); err != nil {
  199. return nil, err
  200. }
  201. var obj *raw.Object
  202. setClientHeader(call.Header())
  203. err = runWithRetry(ctx, func() error { obj, err = call.Do(); return err })
  204. if err != nil {
  205. return nil, err
  206. }
  207. return newObject(obj), nil
  208. }