You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

644 lines
15 KiB

  1. package server
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. "log"
  7. "mime"
  8. "os"
  9. "path/filepath"
  10. "strconv"
  11. "sync"
  12. "encoding/json"
  13. "github.com/goamz/goamz/s3"
  14. "golang.org/x/net/context"
  15. "golang.org/x/oauth2"
  16. "golang.org/x/oauth2/google"
  17. "google.golang.org/api/drive/v3"
  18. "google.golang.org/api/googleapi"
  19. "io/ioutil"
  20. "net/http"
  21. "strings"
  22. )
  23. type Storage interface {
  24. Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error)
  25. Head(token string, filename string) (contentType string, contentLength uint64, err error)
  26. PutMulti(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error
  27. Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error
  28. Delete(token string, filename string) error
  29. IsNotExist(err error) bool
  30. Type() string
  31. }
  32. type LocalStorage struct {
  33. Storage
  34. basedir string
  35. logger *log.Logger
  36. }
  37. func NewLocalStorage(basedir string, logger *log.Logger) (*LocalStorage, error) {
  38. return &LocalStorage{basedir: basedir, logger: logger}, nil
  39. }
  40. func (s *LocalStorage) Type() string {
  41. return "local"
  42. }
  43. func (s *LocalStorage) Head(token string, filename string) (contentType string, contentLength uint64, err error) {
  44. path := filepath.Join(s.basedir, token, filename)
  45. var fi os.FileInfo
  46. if fi, err = os.Lstat(path); err != nil {
  47. return
  48. }
  49. contentLength = uint64(fi.Size())
  50. contentType = mime.TypeByExtension(filepath.Ext(filename))
  51. return
  52. }
  53. func (s *LocalStorage) Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error) {
  54. path := filepath.Join(s.basedir, token, filename)
  55. // content type , content length
  56. if reader, err = os.Open(path); err != nil {
  57. return
  58. }
  59. var fi os.FileInfo
  60. if fi, err = os.Lstat(path); err != nil {
  61. return
  62. }
  63. contentLength = uint64(fi.Size())
  64. contentType = mime.TypeByExtension(filepath.Ext(filename))
  65. return
  66. }
  67. func (s *LocalStorage) Delete(token string, filename string) (err error) {
  68. metadata := filepath.Join(s.basedir, token, fmt.Sprintf("%s.metadata", filename))
  69. os.Remove(metadata)
  70. path := filepath.Join(s.basedir, token, filename)
  71. err = os.Remove(path)
  72. return
  73. }
  74. func (s *LocalStorage) IsNotExist(err error) bool {
  75. if err == nil {
  76. return false
  77. }
  78. return os.IsNotExist(err)
  79. }
  80. func (s *LocalStorage) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error {
  81. var f io.WriteCloser
  82. var err error
  83. path := filepath.Join(s.basedir, token)
  84. if err = os.Mkdir(path, 0700); err != nil && !os.IsExist(err) {
  85. return err
  86. }
  87. if f, err = os.OpenFile(filepath.Join(path, filename), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600); err != nil {
  88. return err
  89. }
  90. defer f.Close()
  91. if _, err = io.Copy(f, reader); err != nil {
  92. return err
  93. }
  94. return nil
  95. }
  96. type S3Storage struct {
  97. Storage
  98. bucket *s3.Bucket
  99. logger *log.Logger
  100. }
  101. func NewS3Storage(accessKey, secretKey, bucketName, endpoint string, logger *log.Logger) (*S3Storage, error) {
  102. bucket, err := getBucket(accessKey, secretKey, bucketName, endpoint)
  103. if err != nil {
  104. return nil, err
  105. }
  106. return &S3Storage{bucket: bucket, logger: logger}, nil
  107. }
  108. func (s *S3Storage) Type() string {
  109. return "s3"
  110. }
  111. func (s *S3Storage) Head(token string, filename string) (contentType string, contentLength uint64, err error) {
  112. key := fmt.Sprintf("%s/%s", token, filename)
  113. // content type , content length
  114. response, err := s.bucket.Head(key, map[string][]string{})
  115. if err != nil {
  116. return
  117. }
  118. contentType = response.Header.Get("Content-Type")
  119. contentLength, err = strconv.ParseUint(response.Header.Get("Content-Length"), 10, 0)
  120. if err != nil {
  121. return
  122. }
  123. return
  124. }
  125. func (s *S3Storage) IsNotExist(err error) bool {
  126. if err == nil {
  127. return false
  128. }
  129. s.logger.Printf("IsNotExist: %s, %#v", err.Error(), err)
  130. b := (err.Error() == "The specified key does not exist.")
  131. b = b || (err.Error() == "Access Denied")
  132. return b
  133. }
  134. func (s *S3Storage) Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error) {
  135. key := fmt.Sprintf("%s/%s", token, filename)
  136. // content type , content length
  137. response, err := s.bucket.GetResponse(key)
  138. if err != nil {
  139. return
  140. }
  141. contentType = response.Header.Get("Content-Type")
  142. contentLength, err = strconv.ParseUint(response.Header.Get("Content-Length"), 10, 0)
  143. if err != nil {
  144. return
  145. }
  146. reader = response.Body
  147. return
  148. }
  149. func (s *S3Storage) Delete(token string, filename string) (err error) {
  150. metadata := fmt.Sprintf("%s/%s.metadata", token, filename)
  151. s.bucket.Del(metadata)
  152. key := fmt.Sprintf("%s/%s", token, filename)
  153. err = s.bucket.Del(key)
  154. return
  155. }
  156. func (s *S3Storage) PutMulti(token string, filename string, reader io.Reader, contentType string, contentLength uint64) (err error) {
  157. key := fmt.Sprintf("%s/%s", token, filename)
  158. var (
  159. multi *s3.Multi
  160. parts []s3.Part
  161. )
  162. if multi, err = s.bucket.InitMulti(key, contentType, s3.Private); err != nil {
  163. s.logger.Printf(err.Error())
  164. return
  165. }
  166. // 20 mb parts
  167. partsChan := make(chan interface{})
  168. // partsChan := make(chan s3.Part)
  169. go func() {
  170. // maximize to 20 threads
  171. sem := make(chan int, 20)
  172. index := 1
  173. var wg sync.WaitGroup
  174. for {
  175. // buffered in memory because goamz s3 multi needs seekable reader
  176. var (
  177. buffer []byte = make([]byte, (1<<20)*10)
  178. count int
  179. err error
  180. )
  181. // Amazon expects parts of at least 5MB, except for the last one
  182. if count, err = io.ReadAtLeast(reader, buffer, (1<<20)*5); err != nil && err != io.ErrUnexpectedEOF && err != io.EOF {
  183. s.logger.Printf(err.Error())
  184. return
  185. }
  186. // always send minimal 1 part
  187. if err == io.EOF && index > 1 {
  188. s.logger.Printf("Waiting for all parts to finish uploading.")
  189. // wait for all parts to be finished uploading
  190. wg.Wait()
  191. // and close the channel
  192. close(partsChan)
  193. return
  194. }
  195. wg.Add(1)
  196. sem <- 1
  197. // using goroutines because of retries when upload fails
  198. go func(multi *s3.Multi, buffer []byte, index int) {
  199. s.logger.Printf("Uploading part %d %d", index, len(buffer))
  200. defer func() {
  201. s.logger.Printf("Finished part %d %d", index, len(buffer))
  202. wg.Done()
  203. <-sem
  204. }()
  205. partReader := bytes.NewReader(buffer)
  206. var part s3.Part
  207. if part, err = multi.PutPart(index, partReader); err != nil {
  208. s.logger.Printf("Error while uploading part %d %d %s", index, len(buffer), err.Error())
  209. partsChan <- err
  210. return
  211. }
  212. s.logger.Printf("Finished uploading part %d %d", index, len(buffer))
  213. partsChan <- part
  214. }(multi, buffer[:count], index)
  215. index++
  216. }
  217. }()
  218. // wait for all parts to be uploaded
  219. for part := range partsChan {
  220. switch part.(type) {
  221. case s3.Part:
  222. parts = append(parts, part.(s3.Part))
  223. case error:
  224. // abort multi upload
  225. s.logger.Printf("Error during upload, aborting %s.", part.(error).Error())
  226. err = part.(error)
  227. multi.Abort()
  228. return
  229. }
  230. }
  231. s.logger.Printf("Completing upload %d parts", len(parts))
  232. if err = multi.Complete(parts); err != nil {
  233. s.logger.Printf("Error during completing upload %d parts %s", len(parts), err.Error())
  234. return
  235. }
  236. s.logger.Printf("Completed uploading %d", len(parts))
  237. return
  238. }
  239. func (s *S3Storage) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) (err error) {
  240. key := fmt.Sprintf("%s/%s", token, filename)
  241. s.logger.Printf("Uploading file %s to S3 Bucket", filename)
  242. err = s.bucket.PutReader(key, reader, contentType, s3.Private, s3.Options{})
  243. if err != nil {
  244. return
  245. }
  246. s.logger.Printf("Completed uploading %s", filename)
  247. return
  248. }
  249. type GDrive struct {
  250. service *drive.Service
  251. rootId string
  252. basedir string
  253. localConfigPath string
  254. chunkSize int
  255. logger *log.Logger
  256. }
  257. func NewGDriveStorage(clientJsonFilepath string, localConfigPath string, basedir string, chunkSize int, logger *log.Logger) (*GDrive, error) {
  258. b, err := ioutil.ReadFile(clientJsonFilepath)
  259. if err != nil {
  260. return nil, err
  261. }
  262. // If modifying these scopes, delete your previously saved client_secret.json.
  263. config, err := google.ConfigFromJSON(b, drive.DriveScope, drive.DriveMetadataScope)
  264. if err != nil {
  265. return nil, err
  266. }
  267. srv, err := drive.New(getGDriveClient(config, localConfigPath, logger))
  268. if err != nil {
  269. return nil, err
  270. }
  271. chunkSize = chunkSize * 1024 * 1024
  272. storage := &GDrive{service: srv, basedir: basedir, rootId: "", localConfigPath: localConfigPath, chunkSize: chunkSize, logger: logger}
  273. err = storage.setupRoot()
  274. if err != nil {
  275. return nil, err
  276. }
  277. return storage, nil
  278. }
  279. const GDriveRootConfigFile = "root_id.conf"
  280. const GDriveTokenJsonFile = "token.json"
  281. const GDriveDirectoryMimeType = "application/vnd.google-apps.folder"
  282. func (s *GDrive) setupRoot() error {
  283. rootFileConfig := filepath.Join(s.localConfigPath, GDriveRootConfigFile)
  284. rootId, err := ioutil.ReadFile(rootFileConfig)
  285. if err != nil && !os.IsNotExist(err) {
  286. return err
  287. }
  288. if string(rootId) != "" {
  289. s.rootId = string(rootId)
  290. return nil
  291. }
  292. dir := &drive.File{
  293. Name: s.basedir,
  294. MimeType: GDriveDirectoryMimeType,
  295. }
  296. di, err := s.service.Files.Create(dir).Fields("id").Do()
  297. if err != nil {
  298. return err
  299. }
  300. s.rootId = di.Id
  301. err = ioutil.WriteFile(rootFileConfig, []byte(s.rootId), os.FileMode(0600))
  302. if err != nil {
  303. return err
  304. }
  305. return nil
  306. }
  307. func (s *GDrive) hasChecksum(f *drive.File) bool {
  308. return f.Md5Checksum != ""
  309. }
  310. func (s *GDrive) list(nextPageToken string, q string) (*drive.FileList, error) {
  311. return s.service.Files.List().Fields("nextPageToken, files(id, name, mimeType)").Q(q).PageToken(nextPageToken).Do()
  312. }
  313. func (s *GDrive) findId(filename string, token string) (string, error) {
  314. filename = strings.Replace(filename, `'`, `\'`, -1)
  315. filename = strings.Replace(filename, `"`, `\"`, -1)
  316. fileId, tokenId, nextPageToken := "", "", ""
  317. q := fmt.Sprintf("'%s' in parents and name='%s' and mimeType='%s' and trashed=false", s.rootId, token, GDriveDirectoryMimeType)
  318. l, err := s.list(nextPageToken, q)
  319. if err != nil {
  320. return "", err
  321. }
  322. for 0 < len(l.Files) {
  323. for _, fi := range l.Files {
  324. tokenId = fi.Id
  325. break
  326. }
  327. if l.NextPageToken == "" {
  328. break
  329. }
  330. l, err = s.list(l.NextPageToken, q)
  331. }
  332. if filename == "" {
  333. return tokenId, nil
  334. } else if tokenId == "" {
  335. return "", fmt.Errorf("Cannot find file %s/%s", token, filename)
  336. }
  337. q = fmt.Sprintf("'%s' in parents and name='%s' and mimeType!='%s' and trashed=false", tokenId, filename, GDriveDirectoryMimeType)
  338. l, err = s.list(nextPageToken, q)
  339. if err != nil {
  340. return "", err
  341. }
  342. for 0 < len(l.Files) {
  343. for _, fi := range l.Files {
  344. fileId = fi.Id
  345. break
  346. }
  347. if l.NextPageToken == "" {
  348. break
  349. }
  350. l, err = s.list(l.NextPageToken, q)
  351. }
  352. if fileId == "" {
  353. return "", fmt.Errorf("Cannot find file %s/%s", token, filename)
  354. }
  355. return fileId, nil
  356. }
  357. func (s *GDrive) Type() string {
  358. return "gdrive"
  359. }
  360. func (s *GDrive) Head(token string, filename string) (contentType string, contentLength uint64, err error) {
  361. var fileId string
  362. fileId, err = s.findId(filename, token)
  363. if err != nil {
  364. return
  365. }
  366. var fi *drive.File
  367. if fi, err = s.service.Files.Get(fileId).Fields("mimeType", "size").Do(); err != nil {
  368. return
  369. }
  370. contentLength = uint64(fi.Size)
  371. contentType = fi.MimeType
  372. return
  373. }
  374. func (s *GDrive) Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error) {
  375. var fileId string
  376. fileId, err = s.findId(filename, token)
  377. if err != nil {
  378. return
  379. }
  380. var fi *drive.File
  381. fi, err = s.service.Files.Get(fileId).Fields("mimeType", "size", "md5Checksum").Do()
  382. if !s.hasChecksum(fi) {
  383. err = fmt.Errorf("Cannot find file %s/%s", token, filename)
  384. return
  385. }
  386. contentLength = uint64(fi.Size)
  387. contentType = fi.MimeType
  388. ctx := context.Background()
  389. var res *http.Response
  390. res, err = s.service.Files.Get(fileId).Context(ctx).Download()
  391. if err != nil {
  392. return
  393. }
  394. reader = res.Body
  395. return
  396. }
  397. func (s *GDrive) Delete(token string, filename string) (err error) {
  398. metadata, _ := s.findId(fmt.Sprintf("%s.metadata", filename), token)
  399. s.service.Files.Delete(metadata).Do()
  400. var fileId string
  401. fileId, err = s.findId(filename, token)
  402. if err != nil {
  403. return
  404. }
  405. err = s.service.Files.Delete(fileId).Do()
  406. return
  407. }
  408. func (s *GDrive) IsNotExist(err error) bool {
  409. if err == nil {
  410. return false
  411. }
  412. if err != nil {
  413. if e, ok := err.(*googleapi.Error); ok {
  414. return e.Code == http.StatusNotFound
  415. }
  416. }
  417. return false
  418. }
  419. func (s *GDrive) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error {
  420. dirId, err := s.findId("", token)
  421. if err != nil {
  422. return err
  423. }
  424. if dirId == "" {
  425. dir := &drive.File{
  426. Name: token,
  427. Parents: []string{s.rootId},
  428. MimeType: GDriveDirectoryMimeType,
  429. }
  430. di, err := s.service.Files.Create(dir).Fields("id").Do()
  431. if err != nil {
  432. return err
  433. }
  434. dirId = di.Id
  435. }
  436. // Instantiate empty drive file
  437. dst := &drive.File{
  438. Name: filename,
  439. Parents: []string{dirId},
  440. MimeType: contentType,
  441. }
  442. ctx := context.Background()
  443. _, err = s.service.Files.Create(dst).Context(ctx).Media(reader, googleapi.ChunkSize(s.chunkSize)).Do()
  444. if err != nil {
  445. return err
  446. }
  447. return nil
  448. }
  449. // Retrieve a token, saves the token, then returns the generated client.
  450. func getGDriveClient(config *oauth2.Config, localConfigPath string, logger *log.Logger) *http.Client {
  451. tokenFile := filepath.Join(localConfigPath, GDriveTokenJsonFile)
  452. tok, err := gDriveTokenFromFile(tokenFile)
  453. if err != nil {
  454. tok = getGDriveTokenFromWeb(config, logger)
  455. saveGDriveToken(tokenFile, tok, logger)
  456. }
  457. return config.Client(context.Background(), tok)
  458. }
  459. // Request a token from the web, then returns the retrieved token.
  460. func getGDriveTokenFromWeb(config *oauth2.Config, logger *log.Logger) *oauth2.Token {
  461. authURL := config.AuthCodeURL("state-token", oauth2.AccessTypeOffline)
  462. fmt.Printf("Go to the following link in your browser then type the "+
  463. "authorization code: \n%v\n", authURL)
  464. var authCode string
  465. if _, err := fmt.Scan(&authCode); err != nil {
  466. logger.Fatalf("Unable to read authorization code %v", err)
  467. }
  468. tok, err := config.Exchange(context.TODO(), authCode)
  469. if err != nil {
  470. logger.Fatalf("Unable to retrieve token from web %v", err)
  471. }
  472. return tok
  473. }
  474. // Retrieves a token from a local file.
  475. func gDriveTokenFromFile(file string) (*oauth2.Token, error) {
  476. f, err := os.Open(file)
  477. defer f.Close()
  478. if err != nil {
  479. return nil, err
  480. }
  481. tok := &oauth2.Token{}
  482. err = json.NewDecoder(f).Decode(tok)
  483. return tok, err
  484. }
  485. // Saves a token to a file path.
  486. func saveGDriveToken(path string, token *oauth2.Token, logger *log.Logger) {
  487. logger.Printf("Saving credential file to: %s\n", path)
  488. f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600)
  489. defer f.Close()
  490. if err != nil {
  491. logger.Fatalf("Unable to cache oauth token: %v", err)
  492. }
  493. json.NewEncoder(f).Encode(token)
  494. }