Вы не можете выбрать более 25 тем. Темы должны начинаться с буквы или цифры, могут содержать дефисы (-) и должны содержать не более 35 символов.
 
 
 

225 строк
5.8 KiB

  1. package storage
import (
	"fmt"
	"io"
	"log"
	"net/url"
	"strconv"
	"strings"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/awserr"
	"github.com/aws/aws-sdk-go/aws/credentials"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"
)
  15. type S3Storage struct {
  16. Storage
  17. bucket string
  18. session *session.Session
  19. s3 *s3.S3
  20. logger *log.Logger
  21. noMultipart bool
  22. }
  23. func NewS3Storage(accessKey, secretKey, bucketName, region, endpoint string, logger *log.Logger, disableMultipart bool, forcePathStyle bool) (*S3Storage, error) {
  24. sess := getAwsSession(accessKey, secretKey, region, endpoint, forcePathStyle)
  25. return &S3Storage{bucket: bucketName, s3: s3.New(sess), session: sess, logger: logger, noMultipart: disableMultipart}, nil
  26. }
  27. func (s *S3Storage) Type() string {
  28. return "s3"
  29. }
  30. func (s *S3Storage) Head(token string, filename string) (metadata Metadata, err error) {
  31. key := fmt.Sprintf("%s/%s", token, filename)
  32. headRequest := &s3.HeadObjectInput{
  33. Bucket: aws.String(s.bucket),
  34. Key: aws.String(key),
  35. }
  36. // content type , content length
  37. response, err := s.s3.HeadObject(headRequest)
  38. if err != nil {
  39. return Metadata{}, err
  40. }
  41. downloads, err := strconv.Atoi(*response.Metadata["downloads"])
  42. if err != nil {
  43. return Metadata{}, err
  44. }
  45. maxdownloads, err := strconv.Atoi(*response.Metadata["maxDownloads"])
  46. if err != nil {
  47. return Metadata{}, err
  48. }
  49. expires, err := time.Parse("2020-02-02 02:02:02", *response.Expires)
  50. if err != nil {
  51. return Metadata{}, err
  52. }
  53. metadata = Metadata{
  54. ContentType: "",
  55. ContentLength: *response.ContentLength,
  56. Downloads: downloads,
  57. MaxDownloads: maxdownloads,
  58. MaxDate: expires,
  59. DeletionToken: *response.Metadata["deletionToken"],
  60. Secret: *response.Metadata["deletionSecret"],
  61. }
  62. return metadata, nil
  63. }
  64. func (s *S3Storage) Patch(token string, filename string, reader io.Reader, metadata Metadata) error {
  65. if reader != nil {
  66. return s.Put(token, filename, reader, metadata)
  67. }
  68. key := fmt.Sprintf("%s/%s", token, filename)
  69. input := &s3.CopyObjectInput{
  70. Bucket: aws.String(s.bucket),
  71. CopySource: aws.String(key),
  72. Key: aws.String(key),
  73. MetadataDirective: aws.String("REPLACE"),
  74. Metadata: map[string]*string{
  75. "downloads": aws.String(strconv.Itoa(metadata.Downloads)),
  76. "maxDownloads": aws.String(strconv.Itoa(metadata.MaxDownloads)),
  77. "deletionToken": aws.String(metadata.DeletionToken),
  78. "deletionSecret": aws.String(metadata.Secret),
  79. },
  80. ContentType: aws.String(metadata.ContentType),
  81. Expires: aws.Time(metadata.MaxDate),
  82. }
  83. _, err := s.s3.CopyObject(input)
  84. if err != nil {
  85. return err
  86. }
  87. return nil
  88. }
  89. func (s *S3Storage) Get(token string, filename string) (reader io.ReadCloser, metadata Metadata, err error) {
  90. key := fmt.Sprintf("%s/%s", token, filename)
  91. getRequest := &s3.GetObjectInput{
  92. Bucket: aws.String(s.bucket),
  93. Key: aws.String(key),
  94. }
  95. response, err := s.s3.GetObject(getRequest)
  96. if err != nil {
  97. return
  98. }
  99. downloads, err := strconv.Atoi(*response.Metadata["downloads"])
  100. if err != nil {
  101. return nil, Metadata{}, err
  102. }
  103. maxdownloads, err := strconv.Atoi(*response.Metadata["maxDownloads"])
  104. if err != nil {
  105. return nil, Metadata{}, err
  106. }
  107. expires, err := time.Parse("2020-02-02 02:02:02", *response.Expires)
  108. if err != nil {
  109. return nil, Metadata{}, err
  110. }
  111. metadata = Metadata{
  112. ContentType: "",
  113. ContentLength: *response.ContentLength,
  114. Downloads: downloads,
  115. MaxDownloads: maxdownloads,
  116. MaxDate: expires,
  117. DeletionToken: *response.Metadata["deletionToken"],
  118. Secret: *response.Metadata["deletionSecret"],
  119. }
  120. reader = response.Body
  121. return
  122. }
  123. func (s *S3Storage) Delete(token string, filename string) (err error) {
  124. metadata := fmt.Sprintf("%s/%s.metadata", token, filename)
  125. deleteRequest := &s3.DeleteObjectInput{
  126. Bucket: aws.String(s.bucket),
  127. Key: aws.String(metadata),
  128. }
  129. _, err = s.s3.DeleteObject(deleteRequest)
  130. if err != nil {
  131. return
  132. }
  133. key := fmt.Sprintf("%s/%s", token, filename)
  134. deleteRequest = &s3.DeleteObjectInput{
  135. Bucket: aws.String(s.bucket),
  136. Key: aws.String(key),
  137. }
  138. _, err = s.s3.DeleteObject(deleteRequest)
  139. return
  140. }
  141. func (s *S3Storage) Put(token string, filename string, reader io.Reader, metadata Metadata) (err error) {
  142. key := fmt.Sprintf("%s/%s", token, filename)
  143. s.logger.Printf("Uploading file %s to S3 Bucket", filename)
  144. var concurrency int
  145. if !s.noMultipart {
  146. concurrency = 20
  147. } else {
  148. concurrency = 1
  149. }
  150. // Create an uploader with the session and custom options
  151. uploader := s3manager.NewUploader(s.session, func(u *s3manager.Uploader) {
  152. u.Concurrency = concurrency // default is 5
  153. u.LeavePartsOnError = false
  154. })
  155. _, err = uploader.Upload(&s3manager.UploadInput{
  156. Bucket: aws.String(s.bucket),
  157. Key: aws.String(key),
  158. Body: reader,
  159. Metadata: map[string]*string{
  160. "downloads": aws.String(strconv.Itoa(metadata.Downloads)),
  161. "maxDownloads": aws.String(strconv.Itoa(metadata.MaxDownloads)),
  162. "deletionToken": aws.String(metadata.DeletionToken),
  163. "deletionSecret": aws.String(metadata.Secret),
  164. },
  165. ContentType: aws.String(metadata.ContentType),
  166. Expires: aws.Time(metadata.MaxDate),
  167. })
  168. return
  169. }
  170. func (s *S3Storage) IsNotExist(err error) bool {
  171. if err == nil {
  172. return false
  173. }
  174. if aerr, ok := err.(awserr.Error); ok {
  175. switch aerr.Code() {
  176. case s3.ErrCodeNoSuchKey:
  177. return true
  178. }
  179. }
  180. return false
  181. }
  182. func getAwsSession(accessKey, secretKey, region, endpoint string, forcePathStyle bool) *session.Session {
  183. return session.Must(session.NewSession(&aws.Config{
  184. Region: aws.String(region),
  185. Endpoint: aws.String(endpoint),
  186. Credentials: credentials.NewStaticCredentials(accessKey, secretKey, ""),
  187. S3ForcePathStyle: aws.Bool(forcePathStyle),
  188. }))
  189. }