You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

217 lines
6.2 KiB

  1. // Copyright 2016 The Go Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gensupport
  5. import (
  6. "context"
  7. "errors"
  8. "fmt"
  9. "io"
  10. "net/http"
  11. "sync"
  12. "time"
  13. )
  14. const (
  15. // statusTooManyRequests is returned by the storage API if the
  16. // per-project limits have been temporarily exceeded. The request
  17. // should be retried.
  18. // https://cloud.google.com/storage/docs/json_api/v1/status-codes#standardcodes
  19. statusTooManyRequests = 429
  20. )
  21. // ResumableUpload is used by the generated APIs to provide resumable uploads.
  22. // It is not used by developers directly.
  23. type ResumableUpload struct {
  24. Client *http.Client
  25. // URI is the resumable resource destination provided by the server after specifying "&uploadType=resumable".
  26. URI string
  27. UserAgent string // User-Agent for header of the request
  28. // Media is the object being uploaded.
  29. Media *MediaBuffer
  30. // MediaType defines the media type, e.g. "image/jpeg".
  31. MediaType string
  32. mu sync.Mutex // guards progress
  33. progress int64 // number of bytes uploaded so far
  34. // Callback is an optional function that will be periodically called with the cumulative number of bytes uploaded.
  35. Callback func(int64)
  36. // If not specified, a default exponential backoff strategy will be used.
  37. Backoff BackoffStrategy
  38. }
  39. // Progress returns the number of bytes uploaded at this point.
  40. func (rx *ResumableUpload) Progress() int64 {
  41. rx.mu.Lock()
  42. defer rx.mu.Unlock()
  43. return rx.progress
  44. }
  45. // doUploadRequest performs a single HTTP request to upload data.
  46. // off specifies the offset in rx.Media from which data is drawn.
  47. // size is the number of bytes in data.
  48. // final specifies whether data is the final chunk to be uploaded.
  49. func (rx *ResumableUpload) doUploadRequest(ctx context.Context, data io.Reader, off, size int64, final bool) (*http.Response, error) {
  50. req, err := http.NewRequest("POST", rx.URI, data)
  51. if err != nil {
  52. return nil, err
  53. }
  54. req.ContentLength = size
  55. var contentRange string
  56. if final {
  57. if size == 0 {
  58. contentRange = fmt.Sprintf("bytes */%v", off)
  59. } else {
  60. contentRange = fmt.Sprintf("bytes %v-%v/%v", off, off+size-1, off+size)
  61. }
  62. } else {
  63. contentRange = fmt.Sprintf("bytes %v-%v/*", off, off+size-1)
  64. }
  65. req.Header.Set("Content-Range", contentRange)
  66. req.Header.Set("Content-Type", rx.MediaType)
  67. req.Header.Set("User-Agent", rx.UserAgent)
  68. // Google's upload endpoint uses status code 308 for a
  69. // different purpose than the "308 Permanent Redirect"
  70. // since-standardized in RFC 7238. Because of the conflict in
  71. // semantics, Google added this new request header which
  72. // causes it to not use "308" and instead reply with 200 OK
  73. // and sets the upload-specific "X-HTTP-Status-Code-Override:
  74. // 308" response header.
  75. req.Header.Set("X-GUploader-No-308", "yes")
  76. return SendRequest(ctx, rx.Client, req)
  77. }
  78. func statusResumeIncomplete(resp *http.Response) bool {
  79. // This is how the server signals "status resume incomplete"
  80. // when X-GUploader-No-308 is set to "yes":
  81. return resp != nil && resp.Header.Get("X-Http-Status-Code-Override") == "308"
  82. }
  83. // reportProgress calls a user-supplied callback to report upload progress.
  84. // If old==updated, the callback is not called.
  85. func (rx *ResumableUpload) reportProgress(old, updated int64) {
  86. if updated-old == 0 {
  87. return
  88. }
  89. rx.mu.Lock()
  90. rx.progress = updated
  91. rx.mu.Unlock()
  92. if rx.Callback != nil {
  93. rx.Callback(updated)
  94. }
  95. }
  96. // transferChunk performs a single HTTP request to upload a single chunk from rx.Media.
  97. func (rx *ResumableUpload) transferChunk(ctx context.Context) (*http.Response, error) {
  98. chunk, off, size, err := rx.Media.Chunk()
  99. done := err == io.EOF
  100. if !done && err != nil {
  101. return nil, err
  102. }
  103. res, err := rx.doUploadRequest(ctx, chunk, off, int64(size), done)
  104. if err != nil {
  105. return res, err
  106. }
  107. // We sent "X-GUploader-No-308: yes" (see comment elsewhere in
  108. // this file), so we don't expect to get a 308.
  109. if res.StatusCode == 308 {
  110. return nil, errors.New("unexpected 308 response status code")
  111. }
  112. if res.StatusCode == http.StatusOK {
  113. rx.reportProgress(off, off+int64(size))
  114. }
  115. if statusResumeIncomplete(res) {
  116. rx.Media.Next()
  117. }
  118. return res, nil
  119. }
  120. func contextDone(ctx context.Context) bool {
  121. select {
  122. case <-ctx.Done():
  123. return true
  124. default:
  125. return false
  126. }
  127. }
  128. // Upload starts the process of a resumable upload with a cancellable context.
  129. // It retries using the provided back off strategy until cancelled or the
  130. // strategy indicates to stop retrying.
  131. // It is called from the auto-generated API code and is not visible to the user.
  132. // Before sending an HTTP request, Upload calls any registered hook functions,
  133. // and calls the returned functions after the request returns (see send.go).
  134. // rx is private to the auto-generated API code.
  135. // Exactly one of resp or err will be nil. If resp is non-nil, the caller must call resp.Body.Close.
  136. func (rx *ResumableUpload) Upload(ctx context.Context) (resp *http.Response, err error) {
  137. var pause time.Duration
  138. backoff := rx.Backoff
  139. if backoff == nil {
  140. backoff = DefaultBackoffStrategy()
  141. }
  142. for {
  143. // Ensure that we return in the case of cancelled context, even if pause is 0.
  144. if contextDone(ctx) {
  145. return nil, ctx.Err()
  146. }
  147. select {
  148. case <-ctx.Done():
  149. return nil, ctx.Err()
  150. case <-time.After(pause):
  151. }
  152. resp, err = rx.transferChunk(ctx)
  153. var status int
  154. if resp != nil {
  155. status = resp.StatusCode
  156. }
  157. // Check if we should retry the request.
  158. if shouldRetry(status, err) {
  159. var retry bool
  160. pause, retry = backoff.Pause()
  161. if retry {
  162. if resp != nil && resp.Body != nil {
  163. resp.Body.Close()
  164. }
  165. continue
  166. }
  167. }
  168. // If the chunk was uploaded successfully, but there's still
  169. // more to go, upload the next chunk without any delay.
  170. if statusResumeIncomplete(resp) {
  171. pause = 0
  172. backoff.Reset()
  173. resp.Body.Close()
  174. continue
  175. }
  176. // It's possible for err and resp to both be non-nil here, but we expose a simpler
  177. // contract to our callers: exactly one of resp and err will be non-nil. This means
  178. // that any response body must be closed here before returning a non-nil error.
  179. if err != nil {
  180. if resp != nil && resp.Body != nil {
  181. resp.Body.Close()
  182. }
  183. return nil, err
  184. }
  185. return resp, nil
  186. }
  187. }