Nie możesz wybrać więcej niż 25 tematów. Tematy muszą się zaczynać od litery lub cyfry, mogą zawierać myślniki ('-') i mogą mieć do 35 znaków.
 
 
 

255 wierszy
5.5 KiB

  1. package main
  2. import (
  3. "bytes"
  4. "fmt"
  5. "github.com/goamz/goamz/s3"
  6. "io"
  7. "log"
  8. "mime"
  9. "os"
  10. "path/filepath"
  11. "strconv"
  12. "sync"
  13. )
  14. type Storage interface {
  15. Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error)
  16. Head(token string, filename string) (contentType string, contentLength uint64, err error)
  17. Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error
  18. }
  19. type LocalStorage struct {
  20. Storage
  21. basedir string
  22. }
  23. func NewLocalStorage(basedir string) (*LocalStorage, error) {
  24. return &LocalStorage{basedir: basedir}, nil
  25. }
  26. func (s *LocalStorage) Head(token string, filename string) (contentType string, contentLength uint64, err error) {
  27. path := filepath.Join(s.basedir, token, filename)
  28. var fi os.FileInfo
  29. if fi, err = os.Lstat(path); err != nil {
  30. return
  31. }
  32. contentLength = uint64(fi.Size())
  33. contentType = mime.TypeByExtension(filepath.Ext(filename))
  34. return
  35. }
  36. func (s *LocalStorage) Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error) {
  37. path := filepath.Join(s.basedir, token, filename)
  38. // content type , content length
  39. if reader, err = os.Open(path); err != nil {
  40. return
  41. }
  42. var fi os.FileInfo
  43. if fi, err = os.Lstat(path); err != nil {
  44. return
  45. }
  46. contentLength = uint64(fi.Size())
  47. contentType = mime.TypeByExtension(filepath.Ext(filename))
  48. return
  49. }
  50. func (s *LocalStorage) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error {
  51. var f io.WriteCloser
  52. var err error
  53. path := filepath.Join(s.basedir, token)
  54. if err = os.Mkdir(path, 0700); err != nil && !os.IsExist(err) {
  55. return err
  56. }
  57. if f, err = os.OpenFile(filepath.Join(path, filename), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600); err != nil {
  58. fmt.Printf("%s", err)
  59. return err
  60. }
  61. defer f.Close()
  62. if _, err = io.Copy(f, reader); err != nil {
  63. return err
  64. }
  65. return nil
  66. }
  67. type S3Storage struct {
  68. Storage
  69. bucket *s3.Bucket
  70. }
  71. func NewS3Storage() (*S3Storage, error) {
  72. bucket, err := getBucket()
  73. if err != nil {
  74. return nil, err
  75. }
  76. return &S3Storage{bucket: bucket}, nil
  77. }
  78. func (s *S3Storage) Head(token string, filename string) (contentType string, contentLength uint64, err error) {
  79. key := fmt.Sprintf("%s/%s", token, filename)
  80. // content type , content length
  81. response, err := s.bucket.Head(key, map[string][]string{})
  82. if err != nil {
  83. return
  84. }
  85. contentType = response.Header.Get("Content-Type")
  86. contentLength, err = strconv.ParseUint(response.Header.Get("Content-Length"), 10, 0)
  87. if err != nil {
  88. return
  89. }
  90. return
  91. }
  92. func (s *S3Storage) Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error) {
  93. key := fmt.Sprintf("%s/%s", token, filename)
  94. // content type , content length
  95. response, err := s.bucket.GetResponse(key)
  96. if err != nil {
  97. return
  98. }
  99. contentType = response.Header.Get("Content-Type")
  100. contentLength, err = strconv.ParseUint(response.Header.Get("Content-Length"), 10, 0)
  101. if err != nil {
  102. return
  103. }
  104. reader = response.Body
  105. return
  106. }
  107. func (s *S3Storage) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) (err error) {
  108. key := fmt.Sprintf("%s/%s", token, filename)
  109. var (
  110. multi *s3.Multi
  111. parts []s3.Part
  112. )
  113. if multi, err = s.bucket.InitMulti(key, contentType, s3.Private); err != nil {
  114. log.Printf(err.Error())
  115. return
  116. }
  117. // 20 mb parts
  118. partsChan := make(chan interface{})
  119. // partsChan := make(chan s3.Part)
  120. go func() {
  121. // maximize to 20 threads
  122. sem := make(chan int, 20)
  123. index := 1
  124. var wg sync.WaitGroup
  125. for {
  126. // buffered in memory because goamz s3 multi needs seekable reader
  127. var (
  128. buffer []byte = make([]byte, (1<<20)*10)
  129. count int
  130. err error
  131. )
  132. // Amazon expects parts of at least 5MB, except for the last one
  133. if count, err = io.ReadAtLeast(reader, buffer, (1<<20)*5); err != nil && err != io.ErrUnexpectedEOF && err != io.EOF {
  134. log.Printf(err.Error())
  135. return
  136. }
  137. // always send minimal 1 part
  138. if err == io.EOF && index > 1 {
  139. log.Printf("Waiting for all parts to finish uploading.")
  140. // wait for all parts to be finished uploading
  141. wg.Wait()
  142. // and close the channel
  143. close(partsChan)
  144. return
  145. }
  146. wg.Add(1)
  147. sem <- 1
  148. // using goroutines because of retries when upload fails
  149. go func(multi *s3.Multi, buffer []byte, index int) {
  150. log.Printf("Uploading part %d %d", index, len(buffer))
  151. defer func() {
  152. log.Printf("Finished part %d %d", index, len(buffer))
  153. wg.Done()
  154. <-sem
  155. }()
  156. partReader := bytes.NewReader(buffer)
  157. var part s3.Part
  158. if part, err = multi.PutPart(index, partReader); err != nil {
  159. log.Printf("Error while uploading part %d %d %s", index, len(buffer), err.Error())
  160. partsChan <- err
  161. return
  162. }
  163. log.Printf("Finished uploading part %d %d", index, len(buffer))
  164. partsChan <- part
  165. }(multi, buffer[:count], index)
  166. index++
  167. }
  168. }()
  169. // wait for all parts to be uploaded
  170. for part := range partsChan {
  171. switch part.(type) {
  172. case s3.Part:
  173. parts = append(parts, part.(s3.Part))
  174. case error:
  175. // abort multi upload
  176. log.Printf("Error during upload, aborting %s.", part.(error).Error())
  177. err = part.(error)
  178. multi.Abort()
  179. return
  180. }
  181. }
  182. log.Printf("Completing upload %d parts", len(parts))
  183. if err = multi.Complete(parts); err != nil {
  184. log.Printf("Error during completing upload %d parts %s", len(parts), err.Error())
  185. return
  186. }
  187. log.Printf("Completed uploading %d", len(parts))
  188. return
  189. }