You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

560 lines
13 KiB

  1. package server
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/aws/aws-sdk-go/aws"
  6. "github.com/aws/aws-sdk-go/aws/awserr"
  7. "github.com/aws/aws-sdk-go/aws/session"
  8. "github.com/aws/aws-sdk-go/service/s3/s3manager"
  9. "io"
  10. "io/ioutil"
  11. "log"
  12. "mime"
  13. "net/http"
  14. "os"
  15. "path/filepath"
  16. "strings"
  17. "github.com/aws/aws-sdk-go/service/s3"
  18. "golang.org/x/net/context"
  19. "golang.org/x/oauth2"
  20. "golang.org/x/oauth2/google"
  21. "google.golang.org/api/drive/v3"
  22. "google.golang.org/api/googleapi"
  23. )
  24. type Storage interface {
  25. Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error)
  26. Head(token string, filename string) (contentType string, contentLength uint64, err error)
  27. Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error
  28. Delete(token string, filename string) error
  29. IsNotExist(err error) bool
  30. Type() string
  31. }
  32. type LocalStorage struct {
  33. Storage
  34. basedir string
  35. logger *log.Logger
  36. }
  37. func NewLocalStorage(basedir string, logger *log.Logger) (*LocalStorage, error) {
  38. return &LocalStorage{basedir: basedir, logger: logger}, nil
  39. }
  40. func (s *LocalStorage) Type() string {
  41. return "local"
  42. }
  43. func (s *LocalStorage) Head(token string, filename string) (contentType string, contentLength uint64, err error) {
  44. path := filepath.Join(s.basedir, token, filename)
  45. var fi os.FileInfo
  46. if fi, err = os.Lstat(path); err != nil {
  47. return
  48. }
  49. contentLength = uint64(fi.Size())
  50. contentType = mime.TypeByExtension(filepath.Ext(filename))
  51. return
  52. }
  53. func (s *LocalStorage) Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error) {
  54. path := filepath.Join(s.basedir, token, filename)
  55. // content type , content length
  56. if reader, err = os.Open(path); err != nil {
  57. return
  58. }
  59. var fi os.FileInfo
  60. if fi, err = os.Lstat(path); err != nil {
  61. return
  62. }
  63. contentLength = uint64(fi.Size())
  64. contentType = mime.TypeByExtension(filepath.Ext(filename))
  65. return
  66. }
  67. func (s *LocalStorage) Delete(token string, filename string) (err error) {
  68. metadata := filepath.Join(s.basedir, token, fmt.Sprintf("%s.metadata", filename))
  69. os.Remove(metadata)
  70. path := filepath.Join(s.basedir, token, filename)
  71. err = os.Remove(path)
  72. return
  73. }
  74. func (s *LocalStorage) IsNotExist(err error) bool {
  75. if err == nil {
  76. return false
  77. }
  78. return os.IsNotExist(err)
  79. }
  80. func (s *LocalStorage) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error {
  81. var f io.WriteCloser
  82. var err error
  83. path := filepath.Join(s.basedir, token)
  84. if err = os.Mkdir(path, 0700); err != nil && !os.IsExist(err) {
  85. return err
  86. }
  87. if f, err = os.OpenFile(filepath.Join(path, filename), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600); err != nil {
  88. return err
  89. }
  90. defer f.Close()
  91. if _, err = io.Copy(f, reader); err != nil {
  92. return err
  93. }
  94. return nil
  95. }
  96. type S3Storage struct {
  97. Storage
  98. bucket string
  99. session *session.Session
  100. s3 *s3.S3
  101. logger *log.Logger
  102. noMultipart bool
  103. }
  104. func NewS3Storage(accessKey, secretKey, bucketName, endpoint string, logger *log.Logger, disableMultipart bool) (*S3Storage, error) {
  105. sess := getAwsSession(accessKey, secretKey, endpoint)
  106. return &S3Storage{bucket: bucketName, s3: s3.New(sess), session: sess, logger: logger, noMultipart: disableMultipart}, nil
  107. }
  108. func (s *S3Storage) Type() string {
  109. return "s3"
  110. }
  111. func (s *S3Storage) Head(token string, filename string) (contentType string, contentLength uint64, err error) {
  112. key := fmt.Sprintf("%s/%s", token, filename)
  113. headRequest := &s3.HeadObjectInput{
  114. Bucket: aws.String(s.bucket),
  115. Key: aws.String(key),
  116. }
  117. // content type , content length
  118. _, response := s.s3.HeadObjectRequest(headRequest)
  119. if response.ContentType != nil {
  120. contentType = *response.ContentType
  121. }
  122. if response.ContentLength != nil {
  123. contentLength = uint64(*response.ContentLength)
  124. }
  125. return
  126. }
  127. func (s *S3Storage) IsNotExist(err error) bool {
  128. if err == nil {
  129. return false
  130. }
  131. if aerr, ok := err.(awserr.Error); ok {
  132. switch aerr.Code() {
  133. case s3.ErrCodeNoSuchKey:
  134. return true
  135. }
  136. }
  137. return false
  138. }
  139. func (s *S3Storage) Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error) {
  140. key := fmt.Sprintf("%s/%s", token, filename)
  141. getRequest := &s3.GetObjectInput{
  142. Bucket: aws.String(s.bucket),
  143. Key: aws.String(key),
  144. }
  145. _, response := s.s3.GetObjectRequest(getRequest)
  146. if response.ContentType != nil {
  147. contentType = *response.ContentType
  148. }
  149. if response.ContentLength != nil {
  150. contentLength = uint64(*response.ContentLength)
  151. }
  152. reader = response.Body
  153. return
  154. }
  155. func (s *S3Storage) Delete(token string, filename string) (err error) {
  156. metadata := fmt.Sprintf("%s/%s.metadata", token, filename)
  157. deleteRequest := &s3.DeleteObjectInput{
  158. Bucket: aws.String(s.bucket),
  159. Key: aws.String(metadata),
  160. }
  161. _, err = s.s3.DeleteObject(deleteRequest)
  162. if err != nil {
  163. return
  164. }
  165. key := fmt.Sprintf("%s/%s", token, filename)
  166. deleteRequest = &s3.DeleteObjectInput{
  167. Bucket: aws.String(s.bucket),
  168. Key: aws.String(key),
  169. }
  170. _, err = s.s3.DeleteObject(deleteRequest)
  171. return
  172. }
  173. func (s *S3Storage) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) (err error) {
  174. key := fmt.Sprintf("%s/%s", token, filename)
  175. s.logger.Printf("Uploading file %s to S3 Bucket", filename)
  176. var concurrency int
  177. if !s.noMultipart {
  178. concurrency = 20
  179. } else {
  180. concurrency = 1
  181. }
  182. // Create an uploader with the session and custom options
  183. uploader := s3manager.NewUploader(s.session, func(u *s3manager.Uploader) {
  184. u.PartSize = (1 << 20) * 5 // The minimum/default allowed part size is 5MB
  185. u.Concurrency = concurrency // default is 5
  186. u.MaxUploadParts = concurrency
  187. u.LeavePartsOnError = false
  188. })
  189. _, err = uploader.Upload(&s3manager.UploadInput{
  190. Bucket: aws.String(s.bucket),
  191. Key: aws.String(key),
  192. Body: reader,
  193. })
  194. return
  195. }
  196. type GDrive struct {
  197. service *drive.Service
  198. rootId string
  199. basedir string
  200. localConfigPath string
  201. chunkSize int
  202. logger *log.Logger
  203. }
  204. func NewGDriveStorage(clientJsonFilepath string, localConfigPath string, basedir string, chunkSize int, logger *log.Logger) (*GDrive, error) {
  205. b, err := ioutil.ReadFile(clientJsonFilepath)
  206. if err != nil {
  207. return nil, err
  208. }
  209. // If modifying these scopes, delete your previously saved client_secret.json.
  210. config, err := google.ConfigFromJSON(b, drive.DriveScope, drive.DriveMetadataScope)
  211. if err != nil {
  212. return nil, err
  213. }
  214. srv, err := drive.New(getGDriveClient(config, localConfigPath, logger))
  215. if err != nil {
  216. return nil, err
  217. }
  218. chunkSize = chunkSize * 1024 * 1024
  219. storage := &GDrive{service: srv, basedir: basedir, rootId: "", localConfigPath: localConfigPath, chunkSize: chunkSize, logger: logger}
  220. err = storage.setupRoot()
  221. if err != nil {
  222. return nil, err
  223. }
  224. return storage, nil
  225. }
  226. const GDriveRootConfigFile = "root_id.conf"
  227. const GDriveTokenJsonFile = "token.json"
  228. const GDriveDirectoryMimeType = "application/vnd.google-apps.folder"
  229. func (s *GDrive) setupRoot() error {
  230. rootFileConfig := filepath.Join(s.localConfigPath, GDriveRootConfigFile)
  231. rootId, err := ioutil.ReadFile(rootFileConfig)
  232. if err != nil && !os.IsNotExist(err) {
  233. return err
  234. }
  235. if string(rootId) != "" {
  236. s.rootId = string(rootId)
  237. return nil
  238. }
  239. dir := &drive.File{
  240. Name: s.basedir,
  241. MimeType: GDriveDirectoryMimeType,
  242. }
  243. di, err := s.service.Files.Create(dir).Fields("id").Do()
  244. if err != nil {
  245. return err
  246. }
  247. s.rootId = di.Id
  248. err = ioutil.WriteFile(rootFileConfig, []byte(s.rootId), os.FileMode(0600))
  249. if err != nil {
  250. return err
  251. }
  252. return nil
  253. }
  254. func (s *GDrive) hasChecksum(f *drive.File) bool {
  255. return f.Md5Checksum != ""
  256. }
  257. func (s *GDrive) list(nextPageToken string, q string) (*drive.FileList, error) {
  258. return s.service.Files.List().Fields("nextPageToken, files(id, name, mimeType)").Q(q).PageToken(nextPageToken).Do()
  259. }
  260. func (s *GDrive) findId(filename string, token string) (string, error) {
  261. filename = strings.Replace(filename, `'`, `\'`, -1)
  262. filename = strings.Replace(filename, `"`, `\"`, -1)
  263. fileId, tokenId, nextPageToken := "", "", ""
  264. q := fmt.Sprintf("'%s' in parents and name='%s' and mimeType='%s' and trashed=false", s.rootId, token, GDriveDirectoryMimeType)
  265. l, err := s.list(nextPageToken, q)
  266. if err != nil {
  267. return "", err
  268. }
  269. for 0 < len(l.Files) {
  270. for _, fi := range l.Files {
  271. tokenId = fi.Id
  272. break
  273. }
  274. if l.NextPageToken == "" {
  275. break
  276. }
  277. l, err = s.list(l.NextPageToken, q)
  278. }
  279. if filename == "" {
  280. return tokenId, nil
  281. } else if tokenId == "" {
  282. return "", fmt.Errorf("Cannot find file %s/%s", token, filename)
  283. }
  284. q = fmt.Sprintf("'%s' in parents and name='%s' and mimeType!='%s' and trashed=false", tokenId, filename, GDriveDirectoryMimeType)
  285. l, err = s.list(nextPageToken, q)
  286. if err != nil {
  287. return "", err
  288. }
  289. for 0 < len(l.Files) {
  290. for _, fi := range l.Files {
  291. fileId = fi.Id
  292. break
  293. }
  294. if l.NextPageToken == "" {
  295. break
  296. }
  297. l, err = s.list(l.NextPageToken, q)
  298. }
  299. if fileId == "" {
  300. return "", fmt.Errorf("Cannot find file %s/%s", token, filename)
  301. }
  302. return fileId, nil
  303. }
  304. func (s *GDrive) Type() string {
  305. return "gdrive"
  306. }
  307. func (s *GDrive) Head(token string, filename string) (contentType string, contentLength uint64, err error) {
  308. var fileId string
  309. fileId, err = s.findId(filename, token)
  310. if err != nil {
  311. return
  312. }
  313. var fi *drive.File
  314. if fi, err = s.service.Files.Get(fileId).Fields("mimeType", "size").Do(); err != nil {
  315. return
  316. }
  317. contentLength = uint64(fi.Size)
  318. contentType = fi.MimeType
  319. return
  320. }
  321. func (s *GDrive) Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error) {
  322. var fileId string
  323. fileId, err = s.findId(filename, token)
  324. if err != nil {
  325. return
  326. }
  327. var fi *drive.File
  328. fi, err = s.service.Files.Get(fileId).Fields("mimeType", "size", "md5Checksum").Do()
  329. if !s.hasChecksum(fi) {
  330. err = fmt.Errorf("Cannot find file %s/%s", token, filename)
  331. return
  332. }
  333. contentLength = uint64(fi.Size)
  334. contentType = fi.MimeType
  335. ctx := context.Background()
  336. var res *http.Response
  337. res, err = s.service.Files.Get(fileId).Context(ctx).Download()
  338. if err != nil {
  339. return
  340. }
  341. reader = res.Body
  342. return
  343. }
  344. func (s *GDrive) Delete(token string, filename string) (err error) {
  345. metadata, _ := s.findId(fmt.Sprintf("%s.metadata", filename), token)
  346. s.service.Files.Delete(metadata).Do()
  347. var fileId string
  348. fileId, err = s.findId(filename, token)
  349. if err != nil {
  350. return
  351. }
  352. err = s.service.Files.Delete(fileId).Do()
  353. return
  354. }
  355. func (s *GDrive) IsNotExist(err error) bool {
  356. if err != nil {
  357. if e, ok := err.(*googleapi.Error); ok {
  358. return e.Code == http.StatusNotFound
  359. }
  360. }
  361. return false
  362. }
  363. func (s *GDrive) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error {
  364. dirId, err := s.findId("", token)
  365. if err != nil {
  366. return err
  367. }
  368. if dirId == "" {
  369. dir := &drive.File{
  370. Name: token,
  371. Parents: []string{s.rootId},
  372. MimeType: GDriveDirectoryMimeType,
  373. }
  374. di, err := s.service.Files.Create(dir).Fields("id").Do()
  375. if err != nil {
  376. return err
  377. }
  378. dirId = di.Id
  379. }
  380. // Instantiate empty drive file
  381. dst := &drive.File{
  382. Name: filename,
  383. Parents: []string{dirId},
  384. MimeType: contentType,
  385. }
  386. ctx := context.Background()
  387. _, err = s.service.Files.Create(dst).Context(ctx).Media(reader, googleapi.ChunkSize(s.chunkSize)).Do()
  388. if err != nil {
  389. return err
  390. }
  391. return nil
  392. }
  393. // Retrieve a token, saves the token, then returns the generated client.
  394. func getGDriveClient(config *oauth2.Config, localConfigPath string, logger *log.Logger) *http.Client {
  395. tokenFile := filepath.Join(localConfigPath, GDriveTokenJsonFile)
  396. tok, err := gDriveTokenFromFile(tokenFile)
  397. if err != nil {
  398. tok = getGDriveTokenFromWeb(config, logger)
  399. saveGDriveToken(tokenFile, tok, logger)
  400. }
  401. return config.Client(context.Background(), tok)
  402. }
  403. // Request a token from the web, then returns the retrieved token.
  404. func getGDriveTokenFromWeb(config *oauth2.Config, logger *log.Logger) *oauth2.Token {
  405. authURL := config.AuthCodeURL("state-token", oauth2.AccessTypeOffline)
  406. fmt.Printf("Go to the following link in your browser then type the "+
  407. "authorization code: \n%v\n", authURL)
  408. var authCode string
  409. if _, err := fmt.Scan(&authCode); err != nil {
  410. logger.Fatalf("Unable to read authorization code %v", err)
  411. }
  412. tok, err := config.Exchange(context.TODO(), authCode)
  413. if err != nil {
  414. logger.Fatalf("Unable to retrieve token from web %v", err)
  415. }
  416. return tok
  417. }
  418. // Retrieves a token from a local file.
  419. func gDriveTokenFromFile(file string) (*oauth2.Token, error) {
  420. f, err := os.Open(file)
  421. defer f.Close()
  422. if err != nil {
  423. return nil, err
  424. }
  425. tok := &oauth2.Token{}
  426. err = json.NewDecoder(f).Decode(tok)
  427. return tok, err
  428. }
  429. // Saves a token to a file path.
  430. func saveGDriveToken(path string, token *oauth2.Token, logger *log.Logger) {
  431. logger.Printf("Saving credential file to: %s\n", path)
  432. f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600)
  433. defer f.Close()
  434. if err != nil {
  435. logger.Fatalf("Unable to cache oauth token: %v", err)
  436. }
  437. json.NewEncoder(f).Encode(token)
  438. }