You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

645 lines
15 KiB

  1. package server
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "io/ioutil"
  8. "log"
  9. "mime"
  10. "net/http"
  11. "os"
  12. "path/filepath"
  13. "strconv"
  14. "strings"
  15. "sync"
  16. "github.com/goamz/goamz/s3"
  17. "golang.org/x/net/context"
  18. "golang.org/x/oauth2"
  19. "golang.org/x/oauth2/google"
  20. "google.golang.org/api/drive/v3"
  21. "google.golang.org/api/googleapi"
  22. )
  23. type Storage interface {
  24. Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error)
  25. Head(token string, filename string) (contentType string, contentLength uint64, err error)
  26. Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error
  27. Delete(token string, filename string) error
  28. IsNotExist(err error) bool
  29. Type() string
  30. }
  31. type LocalStorage struct {
  32. Storage
  33. basedir string
  34. logger *log.Logger
  35. }
  36. func NewLocalStorage(basedir string, logger *log.Logger) (*LocalStorage, error) {
  37. return &LocalStorage{basedir: basedir, logger: logger}, nil
  38. }
  39. func (s *LocalStorage) Type() string {
  40. return "local"
  41. }
  42. func (s *LocalStorage) Head(token string, filename string) (contentType string, contentLength uint64, err error) {
  43. path := filepath.Join(s.basedir, token, filename)
  44. var fi os.FileInfo
  45. if fi, err = os.Lstat(path); err != nil {
  46. return
  47. }
  48. contentLength = uint64(fi.Size())
  49. contentType = mime.TypeByExtension(filepath.Ext(filename))
  50. return
  51. }
  52. func (s *LocalStorage) Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error) {
  53. path := filepath.Join(s.basedir, token, filename)
  54. // content type , content length
  55. if reader, err = os.Open(path); err != nil {
  56. return
  57. }
  58. var fi os.FileInfo
  59. if fi, err = os.Lstat(path); err != nil {
  60. return
  61. }
  62. contentLength = uint64(fi.Size())
  63. contentType = mime.TypeByExtension(filepath.Ext(filename))
  64. return
  65. }
  66. func (s *LocalStorage) Delete(token string, filename string) (err error) {
  67. metadata := filepath.Join(s.basedir, token, fmt.Sprintf("%s.metadata", filename))
  68. os.Remove(metadata)
  69. path := filepath.Join(s.basedir, token, filename)
  70. err = os.Remove(path)
  71. return
  72. }
  73. func (s *LocalStorage) IsNotExist(err error) bool {
  74. if err == nil {
  75. return false
  76. }
  77. return os.IsNotExist(err)
  78. }
  79. func (s *LocalStorage) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error {
  80. var f io.WriteCloser
  81. var err error
  82. path := filepath.Join(s.basedir, token)
  83. if err = os.Mkdir(path, 0700); err != nil && !os.IsExist(err) {
  84. return err
  85. }
  86. if f, err = os.OpenFile(filepath.Join(path, filename), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600); err != nil {
  87. return err
  88. }
  89. defer f.Close()
  90. if _, err = io.Copy(f, reader); err != nil {
  91. return err
  92. }
  93. return nil
  94. }
  95. type S3Storage struct {
  96. Storage
  97. bucket *s3.Bucket
  98. logger *log.Logger
  99. noMultipart bool
  100. }
  101. func NewS3Storage(accessKey, secretKey, bucketName, endpoint string, logger *log.Logger, disableMultipart bool) (*S3Storage, error) {
  102. bucket, err := getBucket(accessKey, secretKey, bucketName, endpoint)
  103. if err != nil {
  104. return nil, err
  105. }
  106. return &S3Storage{bucket: bucket, logger: logger, noMultipart: disableMultipart}, nil
  107. }
  108. func (s *S3Storage) Type() string {
  109. return "s3"
  110. }
  111. func (s *S3Storage) Head(token string, filename string) (contentType string, contentLength uint64, err error) {
  112. key := fmt.Sprintf("%s/%s", token, filename)
  113. // content type , content length
  114. response, err := s.bucket.Head(key, map[string][]string{})
  115. if err != nil {
  116. return
  117. }
  118. contentType = response.Header.Get("Content-Type")
  119. contentLength, err = strconv.ParseUint(response.Header.Get("Content-Length"), 10, 0)
  120. if err != nil {
  121. return
  122. }
  123. return
  124. }
  125. func (s *S3Storage) IsNotExist(err error) bool {
  126. if err == nil {
  127. return false
  128. }
  129. s.logger.Printf("IsNotExist: %s, %#v", err.Error(), err)
  130. b := (err.Error() == "The specified key does not exist.")
  131. b = b || (err.Error() == "Access Denied")
  132. return b
  133. }
  134. func (s *S3Storage) Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error) {
  135. key := fmt.Sprintf("%s/%s", token, filename)
  136. // content type , content length
  137. response, err := s.bucket.GetResponse(key)
  138. if err != nil {
  139. return
  140. }
  141. contentType = response.Header.Get("Content-Type")
  142. contentLength, err = strconv.ParseUint(response.Header.Get("Content-Length"), 10, 0)
  143. if err != nil {
  144. return
  145. }
  146. reader = response.Body
  147. return
  148. }
  149. func (s *S3Storage) Delete(token string, filename string) (err error) {
  150. metadata := fmt.Sprintf("%s/%s.metadata", token, filename)
  151. s.bucket.Del(metadata)
  152. key := fmt.Sprintf("%s/%s", token, filename)
  153. err = s.bucket.Del(key)
  154. return
  155. }
  156. func (s *S3Storage) putMulti(key string, reader io.Reader, contentType string, contentLength uint64) (err error) {
  157. var (
  158. multi *s3.Multi
  159. parts []s3.Part
  160. )
  161. if multi, err = s.bucket.InitMulti(key, contentType, s3.Private); err != nil {
  162. s.logger.Printf(err.Error())
  163. return
  164. }
  165. // 20 mb parts
  166. partsChan := make(chan interface{})
  167. // partsChan := make(chan s3.Part)
  168. go func() {
  169. // maximize to 20 threads
  170. sem := make(chan int, 20)
  171. index := 1
  172. var wg sync.WaitGroup
  173. for {
  174. // buffered in memory because goamz s3 multi needs seekable reader
  175. var (
  176. buffer []byte = make([]byte, (1<<20)*10)
  177. count int
  178. err error
  179. )
  180. // Amazon expects parts of at least 5MB, except for the last one
  181. if count, err = io.ReadAtLeast(reader, buffer, (1<<20)*5); err != nil && err != io.ErrUnexpectedEOF && err != io.EOF {
  182. s.logger.Printf(err.Error())
  183. return
  184. }
  185. // always send minimal 1 part
  186. if err == io.EOF && index > 1 {
  187. s.logger.Printf("Waiting for all parts to finish uploading.")
  188. // wait for all parts to be finished uploading
  189. wg.Wait()
  190. // and close the channel
  191. close(partsChan)
  192. return
  193. }
  194. wg.Add(1)
  195. sem <- 1
  196. // using goroutines because of retries when upload fails
  197. go func(multi *s3.Multi, buffer []byte, index int) {
  198. s.logger.Printf("Uploading part %d %d", index, len(buffer))
  199. defer func() {
  200. s.logger.Printf("Finished part %d %d", index, len(buffer))
  201. wg.Done()
  202. <-sem
  203. }()
  204. partReader := bytes.NewReader(buffer)
  205. var part s3.Part
  206. if part, err = multi.PutPart(index, partReader); err != nil {
  207. s.logger.Printf("Error while uploading part %d %d %s", index, len(buffer), err.Error())
  208. partsChan <- err
  209. return
  210. }
  211. s.logger.Printf("Finished uploading part %d %d", index, len(buffer))
  212. partsChan <- part
  213. }(multi, buffer[:count], index)
  214. index++
  215. }
  216. }()
  217. // wait for all parts to be uploaded
  218. for part := range partsChan {
  219. switch part.(type) {
  220. case s3.Part:
  221. parts = append(parts, part.(s3.Part))
  222. case error:
  223. // abort multi upload
  224. s.logger.Printf("Error during upload, aborting %s.", part.(error).Error())
  225. err = part.(error)
  226. multi.Abort()
  227. return
  228. }
  229. }
  230. s.logger.Printf("Completing upload %d parts", len(parts))
  231. if err = multi.Complete(parts); err != nil {
  232. s.logger.Printf("Error during completing upload %d parts %s", len(parts), err.Error())
  233. return
  234. }
  235. s.logger.Printf("Completed uploading %d", len(parts))
  236. return
  237. }
  238. func (s *S3Storage) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) (err error) {
  239. key := fmt.Sprintf("%s/%s", token, filename)
  240. s.logger.Printf("Uploading file %s to S3 Bucket", filename)
  241. if !s.noMultipart {
  242. err = s.putMulti(key, reader, contentType, contentLength)
  243. } else {
  244. err = s.bucket.PutReader(key, reader, int64(contentLength), contentType, s3.Private, s3.Options{})
  245. }
  246. if err != nil {
  247. return
  248. }
  249. s.logger.Printf("Completed uploading %s", filename)
  250. return
  251. }
  252. type GDrive struct {
  253. service *drive.Service
  254. rootId string
  255. basedir string
  256. localConfigPath string
  257. chunkSize int
  258. logger *log.Logger
  259. }
  260. func NewGDriveStorage(clientJsonFilepath string, localConfigPath string, basedir string, chunkSize int, logger *log.Logger) (*GDrive, error) {
  261. b, err := ioutil.ReadFile(clientJsonFilepath)
  262. if err != nil {
  263. return nil, err
  264. }
  265. // If modifying these scopes, delete your previously saved client_secret.json.
  266. config, err := google.ConfigFromJSON(b, drive.DriveScope, drive.DriveMetadataScope)
  267. if err != nil {
  268. return nil, err
  269. }
  270. srv, err := drive.New(getGDriveClient(config, localConfigPath, logger))
  271. if err != nil {
  272. return nil, err
  273. }
  274. chunkSize = chunkSize * 1024 * 1024
  275. storage := &GDrive{service: srv, basedir: basedir, rootId: "", localConfigPath: localConfigPath, chunkSize: chunkSize, logger: logger}
  276. err = storage.setupRoot()
  277. if err != nil {
  278. return nil, err
  279. }
  280. return storage, nil
  281. }
  282. const GDriveRootConfigFile = "root_id.conf"
  283. const GDriveTokenJsonFile = "token.json"
  284. const GDriveDirectoryMimeType = "application/vnd.google-apps.folder"
  285. func (s *GDrive) setupRoot() error {
  286. rootFileConfig := filepath.Join(s.localConfigPath, GDriveRootConfigFile)
  287. rootId, err := ioutil.ReadFile(rootFileConfig)
  288. if err != nil && !os.IsNotExist(err) {
  289. return err
  290. }
  291. if string(rootId) != "" {
  292. s.rootId = string(rootId)
  293. return nil
  294. }
  295. dir := &drive.File{
  296. Name: s.basedir,
  297. MimeType: GDriveDirectoryMimeType,
  298. }
  299. di, err := s.service.Files.Create(dir).Fields("id").Do()
  300. if err != nil {
  301. return err
  302. }
  303. s.rootId = di.Id
  304. err = ioutil.WriteFile(rootFileConfig, []byte(s.rootId), os.FileMode(0600))
  305. if err != nil {
  306. return err
  307. }
  308. return nil
  309. }
  310. func (s *GDrive) hasChecksum(f *drive.File) bool {
  311. return f.Md5Checksum != ""
  312. }
  313. func (s *GDrive) list(nextPageToken string, q string) (*drive.FileList, error) {
  314. return s.service.Files.List().Fields("nextPageToken, files(id, name, mimeType)").Q(q).PageToken(nextPageToken).Do()
  315. }
  316. func (s *GDrive) findId(filename string, token string) (string, error) {
  317. filename = strings.Replace(filename, `'`, `\'`, -1)
  318. filename = strings.Replace(filename, `"`, `\"`, -1)
  319. fileId, tokenId, nextPageToken := "", "", ""
  320. q := fmt.Sprintf("'%s' in parents and name='%s' and mimeType='%s' and trashed=false", s.rootId, token, GDriveDirectoryMimeType)
  321. l, err := s.list(nextPageToken, q)
  322. if err != nil {
  323. return "", err
  324. }
  325. for 0 < len(l.Files) {
  326. for _, fi := range l.Files {
  327. tokenId = fi.Id
  328. break
  329. }
  330. if l.NextPageToken == "" {
  331. break
  332. }
  333. l, err = s.list(l.NextPageToken, q)
  334. }
  335. if filename == "" {
  336. return tokenId, nil
  337. } else if tokenId == "" {
  338. return "", fmt.Errorf("Cannot find file %s/%s", token, filename)
  339. }
  340. q = fmt.Sprintf("'%s' in parents and name='%s' and mimeType!='%s' and trashed=false", tokenId, filename, GDriveDirectoryMimeType)
  341. l, err = s.list(nextPageToken, q)
  342. if err != nil {
  343. return "", err
  344. }
  345. for 0 < len(l.Files) {
  346. for _, fi := range l.Files {
  347. fileId = fi.Id
  348. break
  349. }
  350. if l.NextPageToken == "" {
  351. break
  352. }
  353. l, err = s.list(l.NextPageToken, q)
  354. }
  355. if fileId == "" {
  356. return "", fmt.Errorf("Cannot find file %s/%s", token, filename)
  357. }
  358. return fileId, nil
  359. }
  360. func (s *GDrive) Type() string {
  361. return "gdrive"
  362. }
  363. func (s *GDrive) Head(token string, filename string) (contentType string, contentLength uint64, err error) {
  364. var fileId string
  365. fileId, err = s.findId(filename, token)
  366. if err != nil {
  367. return
  368. }
  369. var fi *drive.File
  370. if fi, err = s.service.Files.Get(fileId).Fields("mimeType", "size").Do(); err != nil {
  371. return
  372. }
  373. contentLength = uint64(fi.Size)
  374. contentType = fi.MimeType
  375. return
  376. }
  377. func (s *GDrive) Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error) {
  378. var fileId string
  379. fileId, err = s.findId(filename, token)
  380. if err != nil {
  381. return
  382. }
  383. var fi *drive.File
  384. fi, err = s.service.Files.Get(fileId).Fields("mimeType", "size", "md5Checksum").Do()
  385. if !s.hasChecksum(fi) {
  386. err = fmt.Errorf("Cannot find file %s/%s", token, filename)
  387. return
  388. }
  389. contentLength = uint64(fi.Size)
  390. contentType = fi.MimeType
  391. ctx := context.Background()
  392. var res *http.Response
  393. res, err = s.service.Files.Get(fileId).Context(ctx).Download()
  394. if err != nil {
  395. return
  396. }
  397. reader = res.Body
  398. return
  399. }
  400. func (s *GDrive) Delete(token string, filename string) (err error) {
  401. metadata, _ := s.findId(fmt.Sprintf("%s.metadata", filename), token)
  402. s.service.Files.Delete(metadata).Do()
  403. var fileId string
  404. fileId, err = s.findId(filename, token)
  405. if err != nil {
  406. return
  407. }
  408. err = s.service.Files.Delete(fileId).Do()
  409. return
  410. }
  411. func (s *GDrive) IsNotExist(err error) bool {
  412. if err == nil {
  413. return false
  414. }
  415. if err != nil {
  416. if e, ok := err.(*googleapi.Error); ok {
  417. return e.Code == http.StatusNotFound
  418. }
  419. }
  420. return false
  421. }
  422. func (s *GDrive) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error {
  423. dirId, err := s.findId("", token)
  424. if err != nil {
  425. return err
  426. }
  427. if dirId == "" {
  428. dir := &drive.File{
  429. Name: token,
  430. Parents: []string{s.rootId},
  431. MimeType: GDriveDirectoryMimeType,
  432. }
  433. di, err := s.service.Files.Create(dir).Fields("id").Do()
  434. if err != nil {
  435. return err
  436. }
  437. dirId = di.Id
  438. }
  439. // Instantiate empty drive file
  440. dst := &drive.File{
  441. Name: filename,
  442. Parents: []string{dirId},
  443. MimeType: contentType,
  444. }
  445. ctx := context.Background()
  446. _, err = s.service.Files.Create(dst).Context(ctx).Media(reader, googleapi.ChunkSize(s.chunkSize)).Do()
  447. if err != nil {
  448. return err
  449. }
  450. return nil
  451. }
  452. // Retrieve a token, saves the token, then returns the generated client.
  453. func getGDriveClient(config *oauth2.Config, localConfigPath string, logger *log.Logger) *http.Client {
  454. tokenFile := filepath.Join(localConfigPath, GDriveTokenJsonFile)
  455. tok, err := gDriveTokenFromFile(tokenFile)
  456. if err != nil {
  457. tok = getGDriveTokenFromWeb(config, logger)
  458. saveGDriveToken(tokenFile, tok, logger)
  459. }
  460. return config.Client(context.Background(), tok)
  461. }
  462. // Request a token from the web, then returns the retrieved token.
  463. func getGDriveTokenFromWeb(config *oauth2.Config, logger *log.Logger) *oauth2.Token {
  464. authURL := config.AuthCodeURL("state-token", oauth2.AccessTypeOffline)
  465. fmt.Printf("Go to the following link in your browser then type the "+
  466. "authorization code: \n%v\n", authURL)
  467. var authCode string
  468. if _, err := fmt.Scan(&authCode); err != nil {
  469. logger.Fatalf("Unable to read authorization code %v", err)
  470. }
  471. tok, err := config.Exchange(context.TODO(), authCode)
  472. if err != nil {
  473. logger.Fatalf("Unable to retrieve token from web %v", err)
  474. }
  475. return tok
  476. }
  477. // Retrieves a token from a local file.
  478. func gDriveTokenFromFile(file string) (*oauth2.Token, error) {
  479. f, err := os.Open(file)
  480. defer f.Close()
  481. if err != nil {
  482. return nil, err
  483. }
  484. tok := &oauth2.Token{}
  485. err = json.NewDecoder(f).Decode(tok)
  486. return tok, err
  487. }
  488. // Saves a token to a file path.
  489. func saveGDriveToken(path string, token *oauth2.Token, logger *log.Logger) {
  490. logger.Printf("Saving credential file to: %s\n", path)
  491. f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600)
  492. defer f.Close()
  493. if err != nil {
  494. logger.Fatalf("Unable to cache oauth token: %v", err)
  495. }
  496. json.NewEncoder(f).Encode(token)
  497. }