You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

440 lines
12 KiB

  1. package s3
  2. import (
  3. "bytes"
  4. "crypto/md5"
  5. "encoding/base64"
  6. "encoding/hex"
  7. "encoding/xml"
  8. "errors"
  9. "io"
  10. "sort"
  11. "strconv"
  12. )
// Multi represents an unfinished multipart upload.
//
// Multipart uploads allow sending big objects in smaller chunks.
// After all parts have been sent, the upload must be explicitly
// completed by calling Complete with the list of parts.
//
// See http://goo.gl/vJfTG for an overview of multipart uploads.
type Multi struct {
	Bucket   *Bucket // Bucket the upload targets.
	Key      string  // Object key the completed upload will be stored under.
	UploadId string  // Upload identifier assigned by S3 when the upload was initiated.
}
// listMultiMax is the max-uploads value sent with multipart-upload
// listings. That's the server default; it is a variable (rather than a
// constant) only so tests can lower it to exercise pagination.
var listMultiMax = 1000
// listMultiResp mirrors the XML document S3 returns for a
// "list multipart uploads" (GET ?uploads) request.
type listMultiResp struct {
	NextKeyMarker      string   // Key marker to resume a truncated listing.
	NextUploadIdMarker string   // Upload-id marker paired with NextKeyMarker.
	IsTruncated        bool     // True when more results remain.
	Upload             []Multi  // Uploads returned in this page.
	CommonPrefixes     []string `xml:"CommonPrefixes>Prefix"`
}
// ListMulti returns the list of unfinished multipart uploads in b.
//
// The prefix parameter limits the response to keys that begin with the
// specified prefix. You can use prefixes to separate a bucket into different
// groupings of keys (to get the feeling of folders, for example).
//
// The delim parameter causes the response to group all of the keys that
// share a common prefix up to the next delimiter in a single entry within
// the CommonPrefixes field. You can use delimiters to separate a bucket
// into different groupings of keys, similar to how folders would work.
//
// See http://goo.gl/ePioY for details.
func (b *Bucket) ListMulti(prefix, delim string) (multis []*Multi, prefixes []string, err error) {
	params := map[string][]string{
		"uploads":     {""},
		"max-uploads": {strconv.FormatInt(int64(listMultiMax), 10)},
		"prefix":      {prefix},
		"delimiter":   {delim},
	}
	for attempt := b.S3.AttemptStrategy.Start(); attempt.Next(); {
		req := &request{
			method: "GET",
			bucket: b.Name,
			params: params,
		}
		var resp listMultiResp
		err := b.S3.query(req, &resp)
		// Retry transient failures while attempts remain.
		if shouldRetry(err) && attempt.HasNext() {
			continue
		}
		if err != nil {
			return nil, nil, err
		}
		// Take the address of each slice element rather than copying,
		// so the returned *Multi values alias resp.Upload's storage.
		for i := range resp.Upload {
			multi := &resp.Upload[i]
			multi.Bucket = b
			multis = append(multis, multi)
		}
		prefixes = append(prefixes, resp.CommonPrefixes...)
		if !resp.IsTruncated {
			return multis, prefixes, nil
		}
		// Listing was truncated: advance the pagination markers for the
		// next page and reset the retry budget, since this request worked.
		params["key-marker"] = []string{resp.NextKeyMarker}
		params["upload-id-marker"] = []string{resp.NextUploadIdMarker}
		attempt = b.S3.AttemptStrategy.Start() // Last request worked.
	}
	// The loop always returns or continues; reaching here would mean the
	// attempt strategy yielded zero attempts.
	panic("unreachable")
}
  82. // Multi returns a multipart upload handler for the provided key
  83. // inside b. If a multipart upload exists for key, it is returned,
  84. // otherwise a new multipart upload is initiated with contType and perm.
  85. func (b *Bucket) Multi(key, contType string, perm ACL) (*Multi, error) {
  86. multis, _, err := b.ListMulti(key, "")
  87. if err != nil && !hasCode(err, "NoSuchUpload") {
  88. return nil, err
  89. }
  90. for _, m := range multis {
  91. if m.Key == key {
  92. return m, nil
  93. }
  94. }
  95. return b.InitMulti(key, contType, perm)
  96. }
  97. // InitMulti initializes a new multipart upload at the provided
  98. // key inside b and returns a value for manipulating it.
  99. //
  100. // See http://goo.gl/XP8kL for details.
  101. func (b *Bucket) InitMulti(key string, contType string, perm ACL) (*Multi, error) {
  102. headers := map[string][]string{
  103. "Content-Type": {contType},
  104. "Content-Length": {"0"},
  105. "x-amz-acl": {string(perm)},
  106. }
  107. params := map[string][]string{
  108. "uploads": {""},
  109. }
  110. req := &request{
  111. method: "POST",
  112. bucket: b.Name,
  113. path: key,
  114. headers: headers,
  115. params: params,
  116. }
  117. var err error
  118. var resp struct {
  119. UploadId string `xml:"UploadId"`
  120. }
  121. for attempt := b.S3.AttemptStrategy.Start(); attempt.Next(); {
  122. err = b.S3.query(req, &resp)
  123. if !shouldRetry(err) {
  124. break
  125. }
  126. }
  127. if err != nil {
  128. return nil, err
  129. }
  130. return &Multi{Bucket: b, Key: key, UploadId: resp.UploadId}, nil
  131. }
  132. // PutPart sends part n of the multipart upload, reading all the content from r.
  133. // Each part, except for the last one, must be at least 5MB in size.
  134. //
  135. // See http://goo.gl/pqZer for details.
  136. func (m *Multi) PutPart(n int, r io.ReadSeeker) (Part, error) {
  137. partSize, _, md5b64, err := seekerInfo(r)
  138. if err != nil {
  139. return Part{}, err
  140. }
  141. return m.putPart(n, r, partSize, md5b64)
  142. }
// putPart uploads part n of the upload from r. partSize and md5b64 must
// describe r's full content (see seekerInfo); the reader is rewound
// before every attempt so a retry resends the entire payload.
func (m *Multi) putPart(n int, r io.ReadSeeker, partSize int64, md5b64 string) (Part, error) {
	headers := map[string][]string{
		"Content-Length": {strconv.FormatInt(partSize, 10)},
		"Content-MD5":    {md5b64}, // Lets S3 verify payload integrity.
	}
	params := map[string][]string{
		"uploadId":   {m.UploadId},
		"partNumber": {strconv.FormatInt(int64(n), 10)},
	}
	for attempt := m.Bucket.S3.AttemptStrategy.Start(); attempt.Next(); {
		// Rewind so this attempt sends the payload from the beginning.
		_, err := r.Seek(0, 0)
		if err != nil {
			return Part{}, err
		}
		req := &request{
			method:  "PUT",
			bucket:  m.Bucket.Name,
			path:    m.Key,
			headers: headers,
			params:  params,
			payload: r,
		}
		err = m.Bucket.S3.prepare(req)
		if err != nil {
			return Part{}, err
		}
		resp, err := m.Bucket.S3.run(req, nil)
		// Retry transient failures while attempts remain.
		if shouldRetry(err) && attempt.HasNext() {
			continue
		}
		if err != nil {
			return Part{}, err
		}
		// The ETag is required by Complete; a 200 without one is treated
		// as a failure rather than returning an unusable Part.
		etag := resp.Header.Get("ETag")
		if etag == "" {
			return Part{}, errors.New("part upload succeeded with no ETag")
		}
		return Part{n, etag, partSize}, nil
	}
	// The loop always returns or continues; reaching here would mean the
	// attempt strategy yielded zero attempts.
	panic("unreachable")
}
  184. func seekerInfo(r io.ReadSeeker) (size int64, md5hex string, md5b64 string, err error) {
  185. _, err = r.Seek(0, 0)
  186. if err != nil {
  187. return 0, "", "", err
  188. }
  189. digest := md5.New()
  190. size, err = io.Copy(digest, r)
  191. if err != nil {
  192. return 0, "", "", err
  193. }
  194. sum := digest.Sum(nil)
  195. md5hex = hex.EncodeToString(sum)
  196. md5b64 = base64.StdEncoding.EncodeToString(sum)
  197. return size, md5hex, md5b64, nil
  198. }
// Part describes one part of a multipart upload, as returned by
// ListParts or produced by PutPart.
type Part struct {
	N    int    `xml:"PartNumber"` // Part number within the upload.
	ETag string // Entity tag S3 returned for the part (quoted MD5 hex; see PutAll).
	Size int64  // Part size in bytes.
}
// partSlice implements sort.Interface so parts can be ordered by
// ascending part number (see ListParts).
type partSlice []Part

func (s partSlice) Len() int           { return len(s) }
func (s partSlice) Less(i, j int) bool { return s[i].N < s[j].N }
func (s partSlice) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }
// listPartsResp mirrors the XML document S3 returns for a
// "list parts" request on a multipart upload.
type listPartsResp struct {
	NextPartNumberMarker string // Marker to resume a truncated listing.
	IsTruncated          bool   // True when more parts remain.
	Part                 []Part // Parts returned in this page.
}

// listPartsMax is the max-parts value sent with part listings. That's
// the server default; it is a variable (rather than a constant) only so
// tests can lower it to exercise pagination.
var listPartsMax = 1000
// ListParts returns the list of previously uploaded parts in m,
// ordered by part number.
//
// See http://goo.gl/ePioY for details.
func (m *Multi) ListParts() ([]Part, error) {
	params := map[string][]string{
		"uploadId":  {m.UploadId},
		"max-parts": {strconv.FormatInt(int64(listPartsMax), 10)},
	}
	var parts partSlice
	for attempt := m.Bucket.S3.AttemptStrategy.Start(); attempt.Next(); {
		req := &request{
			method: "GET",
			bucket: m.Bucket.Name,
			path:   m.Key,
			params: params,
		}
		var resp listPartsResp
		err := m.Bucket.S3.query(req, &resp)
		// Retry transient failures while attempts remain.
		if shouldRetry(err) && attempt.HasNext() {
			continue
		}
		if err != nil {
			return nil, err
		}
		parts = append(parts, resp.Part...)
		if !resp.IsTruncated {
			// All pages collected; return them sorted by part number.
			sort.Sort(parts)
			return parts, nil
		}
		// Listing was truncated: advance the pagination marker and reset
		// the retry budget, since this request worked.
		params["part-number-marker"] = []string{resp.NextPartNumberMarker}
		attempt = m.Bucket.S3.AttemptStrategy.Start() // Last request worked.
	}
	// The loop always returns or continues; reaching here would mean the
	// attempt strategy yielded zero attempts.
	panic("unreachable")
}
// ReaderAtSeeker combines random access (io.ReaderAt) with sequential
// reading and seeking, as PutAll needs both to measure the input and to
// slice it into independent sections.
type ReaderAtSeeker interface {
	io.ReaderAt
	io.ReadSeeker
}
// PutAll sends all of r via a multipart upload with parts no larger
// than partSize bytes, which must be set to at least 5MB.
// Parts previously uploaded are either reused if their checksum
// and size match the new part, or otherwise overwritten with the
// new content.
// PutAll returns all the parts of m (reused or not).
func (m *Multi) PutAll(r ReaderAtSeeker, partSize int64) ([]Part, error) {
	old, err := m.ListParts()
	// A missing upload just means there is nothing to reuse.
	if err != nil && !hasCode(err, "NoSuchUpload") {
		return nil, err
	}
	reuse := 0   // Index of next old part to consider reusing.
	current := 1 // Part number of latest good part handled.
	// Seeking to the end (whence 2 == io.SeekEnd) yields the total size.
	totalSize, err := r.Seek(0, 2)
	if err != nil {
		return nil, err
	}
	first := true // Must send at least one empty part if the file is empty.
	var result []Part
NextSection:
	for offset := int64(0); offset < totalSize || first; offset += partSize {
		first = false
		if offset+partSize > totalSize {
			// Last section: shrink the part to whatever is left.
			partSize = totalSize - offset
		}
		section := io.NewSectionReader(r, offset, partSize)
		_, md5hex, md5b64, err := seekerInfo(section)
		if err != nil {
			return nil, err
		}
		// Scan previously uploaded parts (sorted by ListParts) for one
		// that matches the current part number, size, and checksum.
		for reuse < len(old) && old[reuse].N <= current {
			// Looks like this part was already sent.
			part := &old[reuse]
			etag := `"` + md5hex + `"` // S3 ETags are quoted hex MD5 digests.
			if part.N == current && part.Size == partSize && part.ETag == etag {
				// Checksum matches. Reuse the old part.
				result = append(result, *part)
				current++
				continue NextSection
			}
			reuse++
		}
		// Part wasn't found or doesn't match. Send it.
		part, err := m.putPart(current, section, partSize, md5b64)
		if err != nil {
			return nil, err
		}
		result = append(result, part)
		current++
	}
	return result, nil
}
// completeUpload is the XML request body sent to complete a
// multipart upload.
type completeUpload struct {
	XMLName xml.Name      `xml:"CompleteMultipartUpload"`
	Parts   completeParts `xml:"Part"`
}

// completePart is a single <Part> entry inside completeUpload.
type completePart struct {
	PartNumber int    // Number of the part being referenced.
	ETag       string // ETag S3 returned when the part was uploaded.
}

// completeParts implements sort.Interface so the request can list
// parts in ascending part-number order (see Complete).
type completeParts []completePart

func (p completeParts) Len() int           { return len(p) }
func (p completeParts) Less(i, j int) bool { return p[i].PartNumber < p[j].PartNumber }
func (p completeParts) Swap(i, j int)      { p[i], p[j] = p[j], p[i] }
// completeResponse captures the reply to a CompleteMultipartUpload
// request, which may be an error document even under HTTP 200
// (see Complete).
type completeResponse struct {
	// The element name: should be either CompleteMultipartUploadResult or Error.
	XMLName xml.Name
	// If the element was error, then it should have the following:
	Code      string
	Message   string
	RequestId string
	HostId    string
}
// Complete assembles the given previously uploaded parts into the
// final object. This operation may take several minutes.
//
// The complete call to AMZ may still fail after returning HTTP 200,
// so even though it's unused, the body of the reply must be unmarshalled
// and checked to see whether or not the complete succeeded.
//
// See http://goo.gl/2Z7Tw for details.
func (m *Multi) Complete(parts []Part) error {
	params := map[string][]string{
		"uploadId": {m.UploadId},
	}
	c := completeUpload{}
	for _, p := range parts {
		c.Parts = append(c.Parts, completePart{p.N, p.ETag})
	}
	// The API requires parts listed in ascending part-number order.
	sort.Sort(c.Parts)
	data, err := xml.Marshal(&c)
	if err != nil {
		return err
	}
	// Setting Content-Length prevents breakage on DreamObjects
	for attempt := m.Bucket.S3.AttemptStrategy.Start(); attempt.Next(); {
		req := &request{
			method:  "POST",
			bucket:  m.Bucket.Name,
			path:    m.Key,
			params:  params,
			payload: bytes.NewReader(data),
			headers: map[string][]string{
				"Content-Length": []string{strconv.Itoa(len(data))},
			},
		}
		resp := &completeResponse{}
		err := m.Bucket.S3.query(req, resp)
		// Retry transient failures while attempts remain.
		if shouldRetry(err) && attempt.HasNext() {
			continue
		}
		// S3 can answer 200 with an <Error> document in the body;
		// surface that as an *Error so callers see the failure.
		if err == nil && resp.XMLName.Local == "Error" {
			err = &Error{
				StatusCode: 200,
				Code:       resp.Code,
				Message:    resp.Message,
				RequestId:  resp.RequestId,
				HostId:     resp.HostId,
			}
		}
		return err
	}
	// The loop always returns or continues; reaching here would mean the
	// attempt strategy yielded zero attempts.
	panic("unreachable")
}
// Abort deletes an unfinished multipart upload and any previously
// uploaded parts for it.
//
// After a multipart upload is aborted, no additional parts can be
// uploaded using it. However, if any part uploads are currently in
// progress, those part uploads might or might not succeed. As a result,
// it might be necessary to abort a given multipart upload multiple
// times in order to completely free all storage consumed by all parts.
//
// NOTE: If the described scenario happens to you, please report back to
// the goamz authors with details. In the future such retrying should be
// handled internally, but it's not clear what happens precisely (Is an
// error returned? Is the issue completely undetectable?).
//
// See http://goo.gl/dnyJw for details.
func (m *Multi) Abort() error {
	params := map[string][]string{
		"uploadId": {m.UploadId},
	}
	for attempt := m.Bucket.S3.AttemptStrategy.Start(); attempt.Next(); {
		req := &request{
			method: "DELETE",
			bucket: m.Bucket.Name,
			path:   m.Key,
			params: params,
		}
		err := m.Bucket.S3.query(req, nil)
		// Retry transient failures while attempts remain.
		if shouldRetry(err) && attempt.HasNext() {
			continue
		}
		return err
	}
	// The loop always returns or continues; reaching here would mean the
	// attempt strategy yielded zero attempts.
	panic("unreachable")
}