You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

646 lines
15 KiB

  1. package server
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "io/ioutil"
  8. "log"
  9. "mime"
  10. "net/http"
  11. "os"
  12. "path/filepath"
  13. "strconv"
  14. "strings"
  15. "sync"
  16. "github.com/goamz/goamz/s3"
  17. "golang.org/x/net/context"
  18. "golang.org/x/oauth2"
  19. "golang.org/x/oauth2/google"
  20. "google.golang.org/api/drive/v3"
  21. "google.golang.org/api/googleapi"
  22. )
  23. type Storage interface {
  24. Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error)
  25. Head(token string, filename string) (contentType string, contentLength uint64, err error)
  26. Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error
  27. Delete(token string, filename string) error
  28. IsNotExist(err error) bool
  29. Type() string
  30. }
  31. type LocalStorage struct {
  32. Storage
  33. basedir string
  34. logger *log.Logger
  35. }
  36. func NewLocalStorage(basedir string, logger *log.Logger) (*LocalStorage, error) {
  37. return &LocalStorage{basedir: basedir, logger: logger}, nil
  38. }
  39. func (s *LocalStorage) Type() string {
  40. return "local"
  41. }
  42. func (s *LocalStorage) Head(token string, filename string) (contentType string, contentLength uint64, err error) {
  43. path := filepath.Join(s.basedir, token, filename)
  44. var fi os.FileInfo
  45. if fi, err = os.Lstat(path); err != nil {
  46. return
  47. }
  48. contentLength = uint64(fi.Size())
  49. contentType = mime.TypeByExtension(filepath.Ext(filename))
  50. return
  51. }
  52. func (s *LocalStorage) Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error) {
  53. path := filepath.Join(s.basedir, token, filename)
  54. // content type , content length
  55. if reader, err = os.Open(path); err != nil {
  56. return
  57. }
  58. var fi os.FileInfo
  59. if fi, err = os.Lstat(path); err != nil {
  60. return
  61. }
  62. contentLength = uint64(fi.Size())
  63. contentType = mime.TypeByExtension(filepath.Ext(filename))
  64. return
  65. }
  66. func (s *LocalStorage) Delete(token string, filename string) (err error) {
  67. metadata := filepath.Join(s.basedir, token, fmt.Sprintf("%s.metadata", filename))
  68. os.Remove(metadata)
  69. path := filepath.Join(s.basedir, token, filename)
  70. err = os.Remove(path)
  71. return
  72. }
  73. func (s *LocalStorage) IsNotExist(err error) bool {
  74. if err == nil {
  75. return false
  76. }
  77. return os.IsNotExist(err)
  78. }
  79. func (s *LocalStorage) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error {
  80. var f io.WriteCloser
  81. var err error
  82. path := filepath.Join(s.basedir, token)
  83. if err = os.Mkdir(path, 0700); err != nil && !os.IsExist(err) {
  84. return err
  85. }
  86. if f, err = os.OpenFile(filepath.Join(path, filename), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600); err != nil {
  87. return err
  88. }
  89. defer f.Close()
  90. if _, err = io.Copy(f, reader); err != nil {
  91. return err
  92. }
  93. return nil
  94. }
  95. type S3Storage struct {
  96. Storage
  97. bucket *s3.Bucket
  98. logger *log.Logger
  99. noMultipart bool
  100. }
  101. func NewS3Storage(accessKey, secretKey, bucketName, endpoint string, logger *log.Logger, multipart bool) (*S3Storage, error) {
  102. bucket, err := getBucket(accessKey, secretKey, bucketName, endpoint)
  103. if err != nil {
  104. return nil, err
  105. }
  106. return &S3Storage{bucket: bucket, logger: logger, noMultipart: multipart}, nil
  107. }
  108. func (s *S3Storage) Type() string {
  109. return "s3"
  110. }
  111. func (s *S3Storage) Head(token string, filename string) (contentType string, contentLength uint64, err error) {
  112. key := fmt.Sprintf("%s/%s", token, filename)
  113. // content type , content length
  114. response, err := s.bucket.Head(key, map[string][]string{})
  115. if err != nil {
  116. return
  117. }
  118. contentType = response.Header.Get("Content-Type")
  119. contentLength, err = strconv.ParseUint(response.Header.Get("Content-Length"), 10, 0)
  120. if err != nil {
  121. return
  122. }
  123. return
  124. }
  125. func (s *S3Storage) IsNotExist(err error) bool {
  126. if err == nil {
  127. return false
  128. }
  129. s.logger.Printf("IsNotExist: %s, %#v", err.Error(), err)
  130. b := (err.Error() == "The specified key does not exist.")
  131. b = b || (err.Error() == "Access Denied")
  132. return b
  133. }
  134. func (s *S3Storage) Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error) {
  135. key := fmt.Sprintf("%s/%s", token, filename)
  136. // content type , content length
  137. response, err := s.bucket.GetResponse(key)
  138. if err != nil {
  139. return
  140. }
  141. contentType = response.Header.Get("Content-Type")
  142. contentLength, err = strconv.ParseUint(response.Header.Get("Content-Length"), 10, 0)
  143. if err != nil {
  144. return
  145. }
  146. reader = response.Body
  147. return
  148. }
  149. func (s *S3Storage) Delete(token string, filename string) (err error) {
  150. metadata := fmt.Sprintf("%s/%s.metadata", token, filename)
  151. s.bucket.Del(metadata)
  152. key := fmt.Sprintf("%s/%s", token, filename)
  153. err = s.bucket.Del(key)
  154. return
  155. }
  156. func (s *S3Storage) PutMulti(token string, filename string, reader io.Reader, contentType string, contentLength uint64) (err error) {
  157. key := fmt.Sprintf("%s/%s", token, filename)
  158. var (
  159. multi *s3.Multi
  160. parts []s3.Part
  161. )
  162. if multi, err = s.bucket.InitMulti(key, contentType, s3.Private); err != nil {
  163. s.logger.Printf(err.Error())
  164. return
  165. }
  166. // 20 mb parts
  167. partsChan := make(chan interface{})
  168. // partsChan := make(chan s3.Part)
  169. go func() {
  170. // maximize to 20 threads
  171. sem := make(chan int, 20)
  172. index := 1
  173. var wg sync.WaitGroup
  174. for {
  175. // buffered in memory because goamz s3 multi needs seekable reader
  176. var (
  177. buffer []byte = make([]byte, (1<<20)*10)
  178. count int
  179. err error
  180. )
  181. // Amazon expects parts of at least 5MB, except for the last one
  182. if count, err = io.ReadAtLeast(reader, buffer, (1<<20)*5); err != nil && err != io.ErrUnexpectedEOF && err != io.EOF {
  183. s.logger.Printf(err.Error())
  184. return
  185. }
  186. // always send minimal 1 part
  187. if err == io.EOF && index > 1 {
  188. s.logger.Printf("Waiting for all parts to finish uploading.")
  189. // wait for all parts to be finished uploading
  190. wg.Wait()
  191. // and close the channel
  192. close(partsChan)
  193. return
  194. }
  195. wg.Add(1)
  196. sem <- 1
  197. // using goroutines because of retries when upload fails
  198. go func(multi *s3.Multi, buffer []byte, index int) {
  199. s.logger.Printf("Uploading part %d %d", index, len(buffer))
  200. defer func() {
  201. s.logger.Printf("Finished part %d %d", index, len(buffer))
  202. wg.Done()
  203. <-sem
  204. }()
  205. partReader := bytes.NewReader(buffer)
  206. var part s3.Part
  207. if part, err = multi.PutPart(index, partReader); err != nil {
  208. s.logger.Printf("Error while uploading part %d %d %s", index, len(buffer), err.Error())
  209. partsChan <- err
  210. return
  211. }
  212. s.logger.Printf("Finished uploading part %d %d", index, len(buffer))
  213. partsChan <- part
  214. }(multi, buffer[:count], index)
  215. index++
  216. }
  217. }()
  218. // wait for all parts to be uploaded
  219. for part := range partsChan {
  220. switch part.(type) {
  221. case s3.Part:
  222. parts = append(parts, part.(s3.Part))
  223. case error:
  224. // abort multi upload
  225. s.logger.Printf("Error during upload, aborting %s.", part.(error).Error())
  226. err = part.(error)
  227. multi.Abort()
  228. return
  229. }
  230. }
  231. s.logger.Printf("Completing upload %d parts", len(parts))
  232. if err = multi.Complete(parts); err != nil {
  233. s.logger.Printf("Error during completing upload %d parts %s", len(parts), err.Error())
  234. return
  235. }
  236. s.logger.Printf("Completed uploading %d", len(parts))
  237. return
  238. }
  239. func (s *S3Storage) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) (err error) {
  240. key := fmt.Sprintf("%s/%s", token, filename)
  241. s.logger.Printf("Uploading file %s to S3 Bucket", filename)
  242. if s.noMultipart {
  243. err = s.PutMulti(token, filename, reader, contentType, contentLength)
  244. } else {
  245. err = s.bucket.PutReader(key, reader, int64(contentLength), contentType, s3.Private, s3.Options{})
  246. }
  247. if err != nil {
  248. return
  249. }
  250. s.logger.Printf("Completed uploading %s", filename)
  251. return
  252. }
  253. type GDrive struct {
  254. service *drive.Service
  255. rootId string
  256. basedir string
  257. localConfigPath string
  258. chunkSize int
  259. logger *log.Logger
  260. }
  261. func NewGDriveStorage(clientJsonFilepath string, localConfigPath string, basedir string, chunkSize int, logger *log.Logger) (*GDrive, error) {
  262. b, err := ioutil.ReadFile(clientJsonFilepath)
  263. if err != nil {
  264. return nil, err
  265. }
  266. // If modifying these scopes, delete your previously saved client_secret.json.
  267. config, err := google.ConfigFromJSON(b, drive.DriveScope, drive.DriveMetadataScope)
  268. if err != nil {
  269. return nil, err
  270. }
  271. srv, err := drive.New(getGDriveClient(config, localConfigPath, logger))
  272. if err != nil {
  273. return nil, err
  274. }
  275. chunkSize = chunkSize * 1024 * 1024
  276. storage := &GDrive{service: srv, basedir: basedir, rootId: "", localConfigPath: localConfigPath, chunkSize: chunkSize, logger: logger}
  277. err = storage.setupRoot()
  278. if err != nil {
  279. return nil, err
  280. }
  281. return storage, nil
  282. }
  283. const GDriveRootConfigFile = "root_id.conf"
  284. const GDriveTokenJsonFile = "token.json"
  285. const GDriveDirectoryMimeType = "application/vnd.google-apps.folder"
  286. func (s *GDrive) setupRoot() error {
  287. rootFileConfig := filepath.Join(s.localConfigPath, GDriveRootConfigFile)
  288. rootId, err := ioutil.ReadFile(rootFileConfig)
  289. if err != nil && !os.IsNotExist(err) {
  290. return err
  291. }
  292. if string(rootId) != "" {
  293. s.rootId = string(rootId)
  294. return nil
  295. }
  296. dir := &drive.File{
  297. Name: s.basedir,
  298. MimeType: GDriveDirectoryMimeType,
  299. }
  300. di, err := s.service.Files.Create(dir).Fields("id").Do()
  301. if err != nil {
  302. return err
  303. }
  304. s.rootId = di.Id
  305. err = ioutil.WriteFile(rootFileConfig, []byte(s.rootId), os.FileMode(0600))
  306. if err != nil {
  307. return err
  308. }
  309. return nil
  310. }
  311. func (s *GDrive) hasChecksum(f *drive.File) bool {
  312. return f.Md5Checksum != ""
  313. }
  314. func (s *GDrive) list(nextPageToken string, q string) (*drive.FileList, error) {
  315. return s.service.Files.List().Fields("nextPageToken, files(id, name, mimeType)").Q(q).PageToken(nextPageToken).Do()
  316. }
  317. func (s *GDrive) findId(filename string, token string) (string, error) {
  318. filename = strings.Replace(filename, `'`, `\'`, -1)
  319. filename = strings.Replace(filename, `"`, `\"`, -1)
  320. fileId, tokenId, nextPageToken := "", "", ""
  321. q := fmt.Sprintf("'%s' in parents and name='%s' and mimeType='%s' and trashed=false", s.rootId, token, GDriveDirectoryMimeType)
  322. l, err := s.list(nextPageToken, q)
  323. if err != nil {
  324. return "", err
  325. }
  326. for 0 < len(l.Files) {
  327. for _, fi := range l.Files {
  328. tokenId = fi.Id
  329. break
  330. }
  331. if l.NextPageToken == "" {
  332. break
  333. }
  334. l, err = s.list(l.NextPageToken, q)
  335. }
  336. if filename == "" {
  337. return tokenId, nil
  338. } else if tokenId == "" {
  339. return "", fmt.Errorf("Cannot find file %s/%s", token, filename)
  340. }
  341. q = fmt.Sprintf("'%s' in parents and name='%s' and mimeType!='%s' and trashed=false", tokenId, filename, GDriveDirectoryMimeType)
  342. l, err = s.list(nextPageToken, q)
  343. if err != nil {
  344. return "", err
  345. }
  346. for 0 < len(l.Files) {
  347. for _, fi := range l.Files {
  348. fileId = fi.Id
  349. break
  350. }
  351. if l.NextPageToken == "" {
  352. break
  353. }
  354. l, err = s.list(l.NextPageToken, q)
  355. }
  356. if fileId == "" {
  357. return "", fmt.Errorf("Cannot find file %s/%s", token, filename)
  358. }
  359. return fileId, nil
  360. }
  361. func (s *GDrive) Type() string {
  362. return "gdrive"
  363. }
  364. func (s *GDrive) Head(token string, filename string) (contentType string, contentLength uint64, err error) {
  365. var fileId string
  366. fileId, err = s.findId(filename, token)
  367. if err != nil {
  368. return
  369. }
  370. var fi *drive.File
  371. if fi, err = s.service.Files.Get(fileId).Fields("mimeType", "size").Do(); err != nil {
  372. return
  373. }
  374. contentLength = uint64(fi.Size)
  375. contentType = fi.MimeType
  376. return
  377. }
  378. func (s *GDrive) Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error) {
  379. var fileId string
  380. fileId, err = s.findId(filename, token)
  381. if err != nil {
  382. return
  383. }
  384. var fi *drive.File
  385. fi, err = s.service.Files.Get(fileId).Fields("mimeType", "size", "md5Checksum").Do()
  386. if !s.hasChecksum(fi) {
  387. err = fmt.Errorf("Cannot find file %s/%s", token, filename)
  388. return
  389. }
  390. contentLength = uint64(fi.Size)
  391. contentType = fi.MimeType
  392. ctx := context.Background()
  393. var res *http.Response
  394. res, err = s.service.Files.Get(fileId).Context(ctx).Download()
  395. if err != nil {
  396. return
  397. }
  398. reader = res.Body
  399. return
  400. }
  401. func (s *GDrive) Delete(token string, filename string) (err error) {
  402. metadata, _ := s.findId(fmt.Sprintf("%s.metadata", filename), token)
  403. s.service.Files.Delete(metadata).Do()
  404. var fileId string
  405. fileId, err = s.findId(filename, token)
  406. if err != nil {
  407. return
  408. }
  409. err = s.service.Files.Delete(fileId).Do()
  410. return
  411. }
  412. func (s *GDrive) IsNotExist(err error) bool {
  413. if err == nil {
  414. return false
  415. }
  416. if err != nil {
  417. if e, ok := err.(*googleapi.Error); ok {
  418. return e.Code == http.StatusNotFound
  419. }
  420. }
  421. return false
  422. }
  423. func (s *GDrive) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error {
  424. dirId, err := s.findId("", token)
  425. if err != nil {
  426. return err
  427. }
  428. if dirId == "" {
  429. dir := &drive.File{
  430. Name: token,
  431. Parents: []string{s.rootId},
  432. MimeType: GDriveDirectoryMimeType,
  433. }
  434. di, err := s.service.Files.Create(dir).Fields("id").Do()
  435. if err != nil {
  436. return err
  437. }
  438. dirId = di.Id
  439. }
  440. // Instantiate empty drive file
  441. dst := &drive.File{
  442. Name: filename,
  443. Parents: []string{dirId},
  444. MimeType: contentType,
  445. }
  446. ctx := context.Background()
  447. _, err = s.service.Files.Create(dst).Context(ctx).Media(reader, googleapi.ChunkSize(s.chunkSize)).Do()
  448. if err != nil {
  449. return err
  450. }
  451. return nil
  452. }
  453. // Retrieve a token, saves the token, then returns the generated client.
  454. func getGDriveClient(config *oauth2.Config, localConfigPath string, logger *log.Logger) *http.Client {
  455. tokenFile := filepath.Join(localConfigPath, GDriveTokenJsonFile)
  456. tok, err := gDriveTokenFromFile(tokenFile)
  457. if err != nil {
  458. tok = getGDriveTokenFromWeb(config, logger)
  459. saveGDriveToken(tokenFile, tok, logger)
  460. }
  461. return config.Client(context.Background(), tok)
  462. }
  463. // Request a token from the web, then returns the retrieved token.
  464. func getGDriveTokenFromWeb(config *oauth2.Config, logger *log.Logger) *oauth2.Token {
  465. authURL := config.AuthCodeURL("state-token", oauth2.AccessTypeOffline)
  466. fmt.Printf("Go to the following link in your browser then type the "+
  467. "authorization code: \n%v\n", authURL)
  468. var authCode string
  469. if _, err := fmt.Scan(&authCode); err != nil {
  470. logger.Fatalf("Unable to read authorization code %v", err)
  471. }
  472. tok, err := config.Exchange(context.TODO(), authCode)
  473. if err != nil {
  474. logger.Fatalf("Unable to retrieve token from web %v", err)
  475. }
  476. return tok
  477. }
  478. // Retrieves a token from a local file.
  479. func gDriveTokenFromFile(file string) (*oauth2.Token, error) {
  480. f, err := os.Open(file)
  481. defer f.Close()
  482. if err != nil {
  483. return nil, err
  484. }
  485. tok := &oauth2.Token{}
  486. err = json.NewDecoder(f).Decode(tok)
  487. return tok, err
  488. }
  489. // Saves a token to a file path.
  490. func saveGDriveToken(path string, token *oauth2.Token, logger *log.Logger) {
  491. logger.Printf("Saving credential file to: %s\n", path)
  492. f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600)
  493. defer f.Close()
  494. if err != nil {
  495. logger.Fatalf("Unable to cache oauth token: %v", err)
  496. }
  497. json.NewEncoder(f).Encode(token)
  498. }