You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

386 lines
11 KiB

  1. // Copyright 2016 Google LLC
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package storage
  15. import (
  16. "context"
  17. "errors"
  18. "fmt"
  19. "hash/crc32"
  20. "io"
  21. "io/ioutil"
  22. "net/http"
  23. "net/url"
  24. "reflect"
  25. "strconv"
  26. "strings"
  27. "time"
  28. "cloud.google.com/go/internal/trace"
  29. "google.golang.org/api/googleapi"
  30. )
  // crc32cTable is the CRC-32 lookup table for the Castagnoli polynomial
  // (CRC32C). It is built once at package initialization and used by
  // Reader.Read to maintain the running checksum of downloaded bytes.
  var crc32cTable = crc32.MakeTable(crc32.Castagnoli)
  // ReaderObjectAttrs are attributes about the object being read. These are
  // populated from the HTTP response headers when the Reader is created by
  // NewRangeReader (and therefore NewReader). This struct only holds a subset
  // of object attributes: to get the full set of attributes, use
  // ObjectHandle.Attrs.
  //
  // Each field is read-only.
  type ReaderObjectAttrs struct {
  	// Size is the length of the object's content. When the server answers
  	// with 206 Partial Content this is the total object size taken from the
  	// Content-Range header, not the length of the requested range; otherwise
  	// it is the Content-Length of the response.
  	Size int64

  	// ContentType is the MIME type of the object's content, as reported by
  	// the Content-Type response header.
  	ContentType string

  	// ContentEncoding is the encoding of the object's content, as reported
  	// by the Content-Encoding response header.
  	ContentEncoding string

  	// CacheControl specifies whether and for how long browser and Internet
  	// caches are allowed to cache your objects. It is the value of the
  	// Cache-Control response header.
  	CacheControl string

  	// LastModified is the time that the object was last modified, parsed
  	// from the Last-Modified response header. It is the zero time if the
  	// header was absent.
  	LastModified time.Time

  	// Generation is the generation number of the object's content. It is
  	// the generation requested on the handle or, if none was specified, the
  	// value of the X-Goog-Generation header of the first response.
  	Generation int64

  	// Metageneration is the version of the metadata for this object at
  	// this generation, parsed from the X-Goog-Metageneration response
  	// header. This field is used for preconditions and for detecting
  	// changes in metadata. A metageneration number is only meaningful in
  	// the context of a particular generation of a particular object.
  	Metageneration int64
  }
  58. // NewReader creates a new Reader to read the contents of the
  59. // object.
  60. // ErrObjectNotExist will be returned if the object is not found.
  61. //
  62. // The caller must call Close on the returned Reader when done reading.
  63. func (o *ObjectHandle) NewReader(ctx context.Context) (*Reader, error) {
  64. return o.NewRangeReader(ctx, 0, -1)
  65. }
  66. // NewRangeReader reads part of an object, reading at most length bytes
  67. // starting at the given offset. If length is negative, the object is read
  68. // until the end.
  69. func (o *ObjectHandle) NewRangeReader(ctx context.Context, offset, length int64) (r *Reader, err error) {
  70. ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.Object.NewRangeReader")
  71. defer func() { trace.EndSpan(ctx, err) }()
  72. if err := o.validate(); err != nil {
  73. return nil, err
  74. }
  75. if offset < 0 {
  76. return nil, fmt.Errorf("storage: invalid offset %d < 0", offset)
  77. }
  78. if o.conds != nil {
  79. if err := o.conds.validate("NewRangeReader"); err != nil {
  80. return nil, err
  81. }
  82. }
  83. u := &url.URL{
  84. Scheme: "https",
  85. Host: "storage.googleapis.com",
  86. Path: fmt.Sprintf("/%s/%s", o.bucket, o.object),
  87. }
  88. verb := "GET"
  89. if length == 0 {
  90. verb = "HEAD"
  91. }
  92. req, err := http.NewRequest(verb, u.String(), nil)
  93. if err != nil {
  94. return nil, err
  95. }
  96. req = req.WithContext(ctx)
  97. if o.userProject != "" {
  98. req.Header.Set("X-Goog-User-Project", o.userProject)
  99. }
  100. if o.readCompressed {
  101. req.Header.Set("Accept-Encoding", "gzip")
  102. }
  103. if err := setEncryptionHeaders(req.Header, o.encryptionKey, false); err != nil {
  104. return nil, err
  105. }
  106. gen := o.gen
  107. // Define a function that initiates a Read with offset and length, assuming we
  108. // have already read seen bytes.
  109. reopen := func(seen int64) (*http.Response, error) {
  110. start := offset + seen
  111. if length < 0 && start > 0 {
  112. req.Header.Set("Range", fmt.Sprintf("bytes=%d-", start))
  113. } else if length > 0 {
  114. // The end character isn't affected by how many bytes we've seen.
  115. req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", start, offset+length-1))
  116. }
  117. // We wait to assign conditions here because the generation number can change in between reopen() runs.
  118. req.URL.RawQuery = conditionsQuery(gen, o.conds)
  119. var res *http.Response
  120. err = runWithRetry(ctx, func() error {
  121. res, err = o.c.hc.Do(req)
  122. if err != nil {
  123. return err
  124. }
  125. if res.StatusCode == http.StatusNotFound {
  126. res.Body.Close()
  127. return ErrObjectNotExist
  128. }
  129. if res.StatusCode < 200 || res.StatusCode > 299 {
  130. body, _ := ioutil.ReadAll(res.Body)
  131. res.Body.Close()
  132. return &googleapi.Error{
  133. Code: res.StatusCode,
  134. Header: res.Header,
  135. Body: string(body),
  136. }
  137. }
  138. if start > 0 && length != 0 && res.StatusCode != http.StatusPartialContent {
  139. res.Body.Close()
  140. return errors.New("storage: partial request not satisfied")
  141. }
  142. // If a generation hasn't been specified, and this is the first response we get, let's record the
  143. // generation. In future requests we'll use this generation as a precondition to avoid data races.
  144. if gen < 0 && res.Header.Get("X-Goog-Generation") != "" {
  145. gen64, err := strconv.ParseInt(res.Header.Get("X-Goog-Generation"), 10, 64)
  146. if err != nil {
  147. return err
  148. }
  149. gen = gen64
  150. }
  151. return nil
  152. })
  153. if err != nil {
  154. return nil, err
  155. }
  156. return res, nil
  157. }
  158. res, err := reopen(0)
  159. if err != nil {
  160. return nil, err
  161. }
  162. var (
  163. size int64 // total size of object, even if a range was requested.
  164. checkCRC bool
  165. crc uint32
  166. )
  167. if res.StatusCode == http.StatusPartialContent {
  168. cr := strings.TrimSpace(res.Header.Get("Content-Range"))
  169. if !strings.HasPrefix(cr, "bytes ") || !strings.Contains(cr, "/") {
  170. return nil, fmt.Errorf("storage: invalid Content-Range %q", cr)
  171. }
  172. size, err = strconv.ParseInt(cr[strings.LastIndex(cr, "/")+1:], 10, 64)
  173. if err != nil {
  174. return nil, fmt.Errorf("storage: invalid Content-Range %q", cr)
  175. }
  176. } else {
  177. size = res.ContentLength
  178. // Check the CRC iff all of the following hold:
  179. // - We asked for content (length != 0).
  180. // - We got all the content (status != PartialContent).
  181. // - The server sent a CRC header.
  182. // - The Go http stack did not uncompress the file.
  183. // - We were not served compressed data that was uncompressed on download.
  184. // The problem with the last two cases is that the CRC will not match -- GCS
  185. // computes it on the compressed contents, but we compute it on the
  186. // uncompressed contents.
  187. if length != 0 && !res.Uncompressed && !uncompressedByServer(res) {
  188. crc, checkCRC = parseCRC32c(res)
  189. }
  190. }
  191. remain := res.ContentLength
  192. body := res.Body
  193. if length == 0 {
  194. remain = 0
  195. body.Close()
  196. body = emptyBody
  197. }
  198. var metaGen int64
  199. if res.Header.Get("X-Goog-Generation") != "" {
  200. metaGen, err = strconv.ParseInt(res.Header.Get("X-Goog-Metageneration"), 10, 64)
  201. if err != nil {
  202. return nil, err
  203. }
  204. }
  205. var lm time.Time
  206. if res.Header.Get("Last-Modified") != "" {
  207. lm, err = http.ParseTime(res.Header.Get("Last-Modified"))
  208. if err != nil {
  209. return nil, err
  210. }
  211. }
  212. attrs := ReaderObjectAttrs{
  213. Size: size,
  214. ContentType: res.Header.Get("Content-Type"),
  215. ContentEncoding: res.Header.Get("Content-Encoding"),
  216. CacheControl: res.Header.Get("Cache-Control"),
  217. LastModified: lm,
  218. Generation: gen,
  219. Metageneration: metaGen,
  220. }
  221. return &Reader{
  222. Attrs: attrs,
  223. body: body,
  224. size: size,
  225. remain: remain,
  226. wantCRC: crc,
  227. checkCRC: checkCRC,
  228. reopen: reopen,
  229. }, nil
  230. }
  // uncompressedByServer reports whether the response carries data that is
  // stored gzip-compressed but was decompressed by the server before being sent:
  // the X-Goog-Stored-Content-Encoding header says "gzip" while the
  // Content-Encoding header does not.
  func uncompressedByServer(res *http.Response) bool {
  	const gzipEncoding = "gzip"

  	if res.Header.Get("X-Goog-Stored-Content-Encoding") != gzipEncoding {
  		// Not stored compressed, so there was nothing for the server to undo.
  		return false
  	}
  	servedAs := res.Header.Get("Content-Encoding")
  	return servedAs != gzipEncoding
  }
  237. func parseCRC32c(res *http.Response) (uint32, bool) {
  238. const prefix = "crc32c="
  239. for _, spec := range res.Header["X-Goog-Hash"] {
  240. if strings.HasPrefix(spec, prefix) {
  241. c, err := decodeUint32(spec[len(prefix):])
  242. if err == nil {
  243. return c, true
  244. }
  245. }
  246. }
  247. return 0, false
  248. }
  // emptyBody is a shared ReadCloser over an empty string. NewRangeReader
  // installs it as the Reader's body when length is zero, after closing the
  // real response body.
  var emptyBody = ioutil.NopCloser(strings.NewReader(""))
  250. // Reader reads a Cloud Storage object.
  251. // It implements io.Reader.
  252. //
  253. // Typically, a Reader computes the CRC of the downloaded content and compares it to
  254. // the stored CRC, returning an error from Read if there is a mismatch. This integrity check
  255. // is skipped if transcoding occurs. See https://cloud.google.com/storage/docs/transcoding.
  256. type Reader struct {
  257. Attrs ReaderObjectAttrs
  258. body io.ReadCloser
  259. seen, remain, size int64
  260. checkCRC bool // should we check the CRC?
  261. wantCRC uint32 // the CRC32c value the server sent in the header
  262. gotCRC uint32 // running crc
  263. reopen func(seen int64) (*http.Response, error)
  264. }
  265. // Close closes the Reader. It must be called when done reading.
  266. func (r *Reader) Close() error {
  267. return r.body.Close()
  268. }
  269. func (r *Reader) Read(p []byte) (int, error) {
  270. n, err := r.readWithRetry(p)
  271. if r.remain != -1 {
  272. r.remain -= int64(n)
  273. }
  274. if r.checkCRC {
  275. r.gotCRC = crc32.Update(r.gotCRC, crc32cTable, p[:n])
  276. // Check CRC here. It would be natural to check it in Close, but
  277. // everybody defers Close on the assumption that it doesn't return
  278. // anything worth looking at.
  279. if err == io.EOF {
  280. if r.gotCRC != r.wantCRC {
  281. return n, fmt.Errorf("storage: bad CRC on read: got %d, want %d",
  282. r.gotCRC, r.wantCRC)
  283. }
  284. }
  285. }
  286. return n, err
  287. }
  288. func (r *Reader) readWithRetry(p []byte) (int, error) {
  289. n := 0
  290. for len(p[n:]) > 0 {
  291. m, err := r.body.Read(p[n:])
  292. n += m
  293. r.seen += int64(m)
  294. if !shouldRetryRead(err) {
  295. return n, err
  296. }
  297. // Read failed, but we will try again. Send a ranged read request that takes
  298. // into account the number of bytes we've already seen.
  299. res, err := r.reopen(r.seen)
  300. if err != nil {
  301. // reopen already retries
  302. return n, err
  303. }
  304. r.body.Close()
  305. r.body = res.Body
  306. }
  307. return n, nil
  308. }
  // shouldRetryRead reports whether a failed body read is worth retrying. That
  // is the case only when the error message ends in "INTERNAL_ERROR" and the
  // name of the error's dynamic type contains "http2". A nil error is never
  // retried.
  func shouldRetryRead(err error) bool {
  	if err == nil {
  		return false
  	}
  	if !strings.HasSuffix(err.Error(), "INTERNAL_ERROR") {
  		return false
  	}
  	// The error is matched by type name rather than by a type assertion.
  	typeName := reflect.TypeOf(err).String()
  	return strings.Contains(typeName, "http2")
  }
  315. // Size returns the size of the object in bytes.
  316. // The returned value is always the same and is not affected by
  317. // calls to Read or Close.
  318. //
  319. // Deprecated: use Reader.Attrs.Size.
  320. func (r *Reader) Size() int64 {
  321. return r.Attrs.Size
  322. }
  323. // Remain returns the number of bytes left to read, or -1 if unknown.
  324. func (r *Reader) Remain() int64 {
  325. return r.remain
  326. }
  327. // ContentType returns the content type of the object.
  328. //
  329. // Deprecated: use Reader.Attrs.ContentType.
  330. func (r *Reader) ContentType() string {
  331. return r.Attrs.ContentType
  332. }
  333. // ContentEncoding returns the content encoding of the object.
  334. //
  335. // Deprecated: use Reader.Attrs.ContentEncoding.
  336. func (r *Reader) ContentEncoding() string {
  337. return r.Attrs.ContentEncoding
  338. }
  339. // CacheControl returns the cache control of the object.
  340. //
  341. // Deprecated: use Reader.Attrs.CacheControl.
  342. func (r *Reader) CacheControl() string {
  343. return r.Attrs.CacheControl
  344. }
  345. // LastModified returns the value of the Last-Modified header.
  346. //
  347. // Deprecated: use Reader.Attrs.LastModified.
  348. func (r *Reader) LastModified() (time.Time, error) {
  349. return r.Attrs.LastModified, nil
  350. }