From 2770ceb517738ea895a732ddb5ba94f608e4ccd8 Mon Sep 17 00:00:00 2001 From: Remco Date: Fri, 14 Nov 2014 23:32:44 +0100 Subject: [PATCH] resolve #11, multipart upload for large files --- transfersh-server/storage.go | 117 ++++++++++++++++++++++++++++++++++- 1 file changed, 114 insertions(+), 3 deletions(-) diff --git a/transfersh-server/storage.go b/transfersh-server/storage.go index ac6aecf..a1c1ba5 100644 --- a/transfersh-server/storage.go +++ b/transfersh-server/storage.go @@ -1,13 +1,16 @@ package main import ( + "bytes" "fmt" "github.com/goamz/goamz/s3" "io" + "log" "mime" "os" "path/filepath" "strconv" + "sync" ) type Storage interface { @@ -136,8 +139,116 @@ func (s *S3Storage) Get(token string, filename string) (reader io.ReadCloser, co return } -func (s *S3Storage) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error { +func (s *S3Storage) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) (err error) { key := fmt.Sprintf("%s/%s", token, filename) - err := s.bucket.PutReader(key, reader, int64(contentLength), contentType, s3.Private, s3.Options{}) - return err + + var ( + multi *s3.Multi + parts []s3.Part + ) + + if multi, err = s.bucket.InitMulti(key, contentType, s3.Private); err != nil { + log.Printf(err.Error()) + return + } + + // 20 mb parts + partsChan := make(chan interface{}) + // partsChan := make(chan s3.Part) + + go func() { + // maximize to 20 threads + sem := make(chan int, 20) + index := 1 + var wg sync.WaitGroup + + for { + // buffered in memory because goamz s3 multi needs seekable reader + var ( + buffer []byte = make([]byte, (1<<20)*10) + count int + err error + ) + + // Amazon expects parts of at least 5MB, except for the last one + if count, err = io.ReadAtLeast(reader, buffer, (1<<20)*5); err != nil && err != io.ErrUnexpectedEOF && err != io.EOF { + log.Printf(err.Error()) + return + } + + // always send minimal 1 part + if err == io.EOF && index > 1 { + log.Printf("Waiting for all parts to finish uploading.") + + // wait for all parts to be finished uploading + wg.Wait() + + // and close the channel + close(partsChan) + + return + } + + wg.Add(1) + + sem <- 1 + + // using goroutines because of retries when upload fails + go func(multi *s3.Multi, buffer []byte, index int) { + log.Printf("Uploading part %d %d", index, len(buffer)) + + defer func() { + log.Printf("Finished part %d %d", index, len(buffer)) + + wg.Done() + + <-sem + }() + + partReader := bytes.NewReader(buffer) + + var part s3.Part + + if part, err = multi.PutPart(index, partReader); err != nil { + log.Printf("Error while uploading part %d %d %s", index, len(buffer), err.Error()) + partsChan <- err + return + } + + log.Printf("Finished uploading part %d %d", index, len(buffer)) + + partsChan <- part + + }(multi, buffer[:count], index) + + index++ + } + }() + + // wait for all parts to be uploaded + for part := range partsChan { + switch part.(type) { + case s3.Part: + parts = append(parts, part.(s3.Part)) + case error: + // abort multi upload + log.Printf("Error during upload, aborting %s.", part.(error).Error()) + err = part.(error) + + multi.Abort() + return + } + + } + + log.Printf("Completing upload %d parts", len(parts)) + + if err = multi.Complete(parts); err != nil { + log.Printf("Error during completing upload %d parts %s", len(parts), err.Error()) + return + } + + log.Printf("Completed uploading %d", len(parts)) + + return }