You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

723 lines
16 KiB

  1. package server
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. "log"
  7. "mime"
  8. "os"
  9. "path/filepath"
  10. "strconv"
  11. "sync"
  12. "github.com/goamz/goamz/s3"
  13. "encoding/json"
  14. "golang.org/x/oauth2"
  15. "golang.org/x/net/context"
  16. "golang.org/x/oauth2/google"
  17. "google.golang.org/api/drive/v3"
  18. "google.golang.org/api/googleapi"
  19. "net/http"
  20. "io/ioutil"
  21. "time"
  22. )
  23. type Storage interface {
  24. Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error)
  25. Head(token string, filename string) (contentType string, contentLength uint64, err error)
  26. Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error
  27. Delete(token string, filename string) error
  28. IsNotExist(err error) bool
  29. Type() string
  30. }
  31. type LocalStorage struct {
  32. Storage
  33. basedir string
  34. }
  35. func NewLocalStorage(basedir string) (*LocalStorage, error) {
  36. return &LocalStorage{basedir: basedir}, nil
  37. }
  38. func (s *LocalStorage) Type() string {
  39. return "local"
  40. }
  41. func (s *LocalStorage) Head(token string, filename string) (contentType string, contentLength uint64, err error) {
  42. path := filepath.Join(s.basedir, token, filename)
  43. var fi os.FileInfo
  44. if fi, err = os.Lstat(path); err != nil {
  45. return
  46. }
  47. contentLength = uint64(fi.Size())
  48. contentType = mime.TypeByExtension(filepath.Ext(filename))
  49. return
  50. }
  51. func (s *LocalStorage) Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error) {
  52. path := filepath.Join(s.basedir, token, filename)
  53. // content type , content length
  54. if reader, err = os.Open(path); err != nil {
  55. return
  56. }
  57. var fi os.FileInfo
  58. if fi, err = os.Lstat(path); err != nil {
  59. return
  60. }
  61. contentLength = uint64(fi.Size())
  62. contentType = mime.TypeByExtension(filepath.Ext(filename))
  63. return
  64. }
  65. func (s *LocalStorage) Delete(token string, filename string) (err error) {
  66. metadata := filepath.Join(s.basedir, token, fmt.Sprintf("%s.metadata", filename))
  67. os.Remove(metadata);
  68. path := filepath.Join(s.basedir, token, filename)
  69. err = os.Remove(path);
  70. return
  71. }
  72. func (s *LocalStorage) IsNotExist(err error) bool {
  73. if err == nil {
  74. return false
  75. }
  76. return os.IsNotExist(err)
  77. }
  78. func (s *LocalStorage) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error {
  79. var f io.WriteCloser
  80. var err error
  81. path := filepath.Join(s.basedir, token)
  82. if err = os.Mkdir(path, 0700); err != nil && !os.IsExist(err) {
  83. return err
  84. }
  85. if f, err = os.OpenFile(filepath.Join(path, filename), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600); err != nil {
  86. fmt.Printf("%s", err)
  87. return err
  88. }
  89. defer f.Close()
  90. if _, err = io.Copy(f, reader); err != nil {
  91. return err
  92. }
  93. return nil
  94. }
  95. type S3Storage struct {
  96. Storage
  97. bucket *s3.Bucket
  98. }
  99. func NewS3Storage(accessKey, secretKey, bucketName, endpoint string) (*S3Storage, error) {
  100. bucket, err := getBucket(accessKey, secretKey, bucketName, endpoint)
  101. if err != nil {
  102. return nil, err
  103. }
  104. return &S3Storage{bucket: bucket}, nil
  105. }
  106. func (s *S3Storage) Type() string {
  107. return "s3"
  108. }
  109. func (s *S3Storage) Head(token string, filename string) (contentType string, contentLength uint64, err error) {
  110. key := fmt.Sprintf("%s/%s", token, filename)
  111. // content type , content length
  112. response, err := s.bucket.Head(key, map[string][]string{})
  113. if err != nil {
  114. return
  115. }
  116. contentType = response.Header.Get("Content-Type")
  117. contentLength, err = strconv.ParseUint(response.Header.Get("Content-Length"), 10, 0)
  118. if err != nil {
  119. return
  120. }
  121. return
  122. }
  123. func (s *S3Storage) IsNotExist(err error) bool {
  124. if err == nil {
  125. return false
  126. }
  127. log.Printf("IsNotExist: %s, %#v", err.Error(), err)
  128. b := (err.Error() == "The specified key does not exist.")
  129. b = b || (err.Error() == "Access Denied")
  130. return b
  131. }
  132. func (s *S3Storage) Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error) {
  133. key := fmt.Sprintf("%s/%s", token, filename)
  134. // content type , content length
  135. response, err := s.bucket.GetResponse(key)
  136. if err != nil {
  137. return
  138. }
  139. contentType = response.Header.Get("Content-Type")
  140. contentLength, err = strconv.ParseUint(response.Header.Get("Content-Length"), 10, 0)
  141. if err != nil {
  142. return
  143. }
  144. reader = response.Body
  145. return
  146. }
  147. func (s *S3Storage) Delete(token string, filename string) (err error) {
  148. metadata := fmt.Sprintf("%s/%s.metadata", token, filename)
  149. s.bucket.Del(metadata)
  150. key := fmt.Sprintf("%s/%s", token, filename)
  151. err = s.bucket.Del(key)
  152. return
  153. }
  154. func (s *S3Storage) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) (err error) {
  155. key := fmt.Sprintf("%s/%s", token, filename)
  156. var (
  157. multi *s3.Multi
  158. parts []s3.Part
  159. )
  160. if multi, err = s.bucket.InitMulti(key, contentType, s3.Private); err != nil {
  161. log.Printf(err.Error())
  162. return
  163. }
  164. // 20 mb parts
  165. partsChan := make(chan interface{})
  166. // partsChan := make(chan s3.Part)
  167. go func() {
  168. // maximize to 20 threads
  169. sem := make(chan int, 20)
  170. index := 1
  171. var wg sync.WaitGroup
  172. for {
  173. // buffered in memory because goamz s3 multi needs seekable reader
  174. var (
  175. buffer []byte = make([]byte, (1<<20)*10)
  176. count int
  177. err error
  178. )
  179. // Amazon expects parts of at least 5MB, except for the last one
  180. if count, err = io.ReadAtLeast(reader, buffer, (1<<20)*5); err != nil && err != io.ErrUnexpectedEOF && err != io.EOF {
  181. log.Printf(err.Error())
  182. return
  183. }
  184. // always send minimal 1 part
  185. if err == io.EOF && index > 1 {
  186. log.Printf("Waiting for all parts to finish uploading.")
  187. // wait for all parts to be finished uploading
  188. wg.Wait()
  189. // and close the channel
  190. close(partsChan)
  191. return
  192. }
  193. wg.Add(1)
  194. sem <- 1
  195. // using goroutines because of retries when upload fails
  196. go func(multi *s3.Multi, buffer []byte, index int) {
  197. log.Printf("Uploading part %d %d", index, len(buffer))
  198. defer func() {
  199. log.Printf("Finished part %d %d", index, len(buffer))
  200. wg.Done()
  201. <-sem
  202. }()
  203. partReader := bytes.NewReader(buffer)
  204. var part s3.Part
  205. if part, err = multi.PutPart(index, partReader); err != nil {
  206. log.Printf("Error while uploading part %d %d %s", index, len(buffer), err.Error())
  207. partsChan <- err
  208. return
  209. }
  210. log.Printf("Finished uploading part %d %d", index, len(buffer))
  211. partsChan <- part
  212. }(multi, buffer[:count], index)
  213. index++
  214. }
  215. }()
  216. // wait for all parts to be uploaded
  217. for part := range partsChan {
  218. switch part.(type) {
  219. case s3.Part:
  220. parts = append(parts, part.(s3.Part))
  221. case error:
  222. // abort multi upload
  223. log.Printf("Error during upload, aborting %s.", part.(error).Error())
  224. err = part.(error)
  225. multi.Abort()
  226. return
  227. }
  228. }
  229. log.Printf("Completing upload %d parts", len(parts))
  230. if err = multi.Complete(parts); err != nil {
  231. log.Printf("Error during completing upload %d parts %s", len(parts), err.Error())
  232. return
  233. }
  234. log.Printf("Completed uploading %d", len(parts))
  235. return
  236. }
  237. type GDrive struct {
  238. service *drive.Service
  239. rootId string
  240. basedir string
  241. localConfigPath string
  242. }
  243. func NewGDriveStorage(clientJsonFilepath string, localConfigPath string, basedir string) (*GDrive, error) {
  244. b, err := ioutil.ReadFile(clientJsonFilepath)
  245. if err != nil {
  246. return nil, err
  247. }
  248. // If modifying these scopes, delete your previously saved client_secret.json.
  249. config, err := google.ConfigFromJSON(b, drive.DriveScope, drive.DriveMetadataScope)
  250. if err != nil {
  251. return nil, err
  252. }
  253. srv, err := drive.New(getGDriveClient(config))
  254. if err != nil {
  255. return nil, err
  256. }
  257. storage := &GDrive{service: srv, basedir: basedir, rootId: "", localConfigPath:localConfigPath}
  258. err = storage.setupRoot()
  259. if err != nil {
  260. return nil, err
  261. }
  262. return storage, nil
  263. }
  264. const GDriveRootConfigFile = "root_id.conf"
  265. const GDriveTimeoutTimerInterval = time.Second * 10
  266. const GDriveDirectoryMimeType = "application/vnd.google-apps.folder"
  267. type gDriveTimeoutReaderWrapper func(io.Reader) io.Reader
  268. func (s *GDrive) setupRoot() error {
  269. rootFileConfig := filepath.Join(s.localConfigPath, GDriveRootConfigFile)
  270. rootId, err := ioutil.ReadFile(rootFileConfig)
  271. if err != nil && !os.IsNotExist(err) {
  272. return err
  273. }
  274. if string(rootId) != "" {
  275. s.rootId = string(rootId)
  276. return nil
  277. }
  278. dir := &drive.File{
  279. Name: s.basedir,
  280. MimeType: GDriveDirectoryMimeType,
  281. }
  282. di, err := s.service.Files.Create(dir).Fields("id").Do()
  283. if err != nil {
  284. return err
  285. }
  286. s.rootId = di.Id
  287. err = ioutil.WriteFile(rootFileConfig, []byte(s.rootId), os.FileMode(0600))
  288. if err != nil {
  289. return err
  290. }
  291. return nil
  292. }
  293. func (s *GDrive) getTimeoutReader(r io.Reader, cancel context.CancelFunc, timeout time.Duration) io.Reader {
  294. return &GDriveTimeoutReader{
  295. reader: r,
  296. cancel: cancel,
  297. mutex: &sync.Mutex{},
  298. maxIdleTimeout: timeout,
  299. }
  300. }
  301. type GDriveTimeoutReader struct {
  302. reader io.Reader
  303. cancel context.CancelFunc
  304. lastActivity time.Time
  305. timer *time.Timer
  306. mutex *sync.Mutex
  307. maxIdleTimeout time.Duration
  308. done bool
  309. }
  310. func (r *GDriveTimeoutReader) Read(p []byte) (int, error) {
  311. if r.timer == nil {
  312. r.startTimer()
  313. }
  314. r.mutex.Lock()
  315. // Read
  316. n, err := r.reader.Read(p)
  317. r.lastActivity = time.Now()
  318. r.done = (err != nil)
  319. r.mutex.Unlock()
  320. if r.done {
  321. r.stopTimer()
  322. }
  323. return n, err
  324. }
  325. func (r *GDriveTimeoutReader) Close() error {
  326. return r.reader.(io.ReadCloser).Close()
  327. }
  328. func (r *GDriveTimeoutReader) startTimer() {
  329. r.mutex.Lock()
  330. defer r.mutex.Unlock()
  331. if !r.done {
  332. r.timer = time.AfterFunc(GDriveTimeoutTimerInterval, r.timeout)
  333. }
  334. }
  335. func (r *GDriveTimeoutReader) stopTimer() {
  336. r.mutex.Lock()
  337. defer r.mutex.Unlock()
  338. if r.timer != nil {
  339. r.timer.Stop()
  340. }
  341. }
  342. func (r *GDriveTimeoutReader) timeout() {
  343. r.mutex.Lock()
  344. if r.done {
  345. r.mutex.Unlock()
  346. return
  347. }
  348. if time.Since(r.lastActivity) > r.maxIdleTimeout {
  349. r.cancel()
  350. r.mutex.Unlock()
  351. return
  352. }
  353. r.mutex.Unlock()
  354. r.startTimer()
  355. }
  356. func (s *GDrive) getTimeoutReaderWrapperContext(timeout time.Duration) (gDriveTimeoutReaderWrapper, context.Context) {
  357. ctx, cancel := context.WithCancel(context.TODO())
  358. wrapper := func(r io.Reader) io.Reader {
  359. // Return untouched reader if timeout is 0
  360. if timeout == 0 {
  361. return r
  362. }
  363. return s.getTimeoutReader(r, cancel, timeout)
  364. }
  365. return wrapper, ctx
  366. }
  367. func (s *GDrive) hasChecksum(f *drive.File) bool {
  368. return f.Md5Checksum != ""
  369. }
  370. func (s *GDrive) list(nextPageToken string, q string) (*drive.FileList, error){
  371. return s.service.Files.List().Fields("nextPageToken, files(id, name, mimeType)").Q(q).PageToken(nextPageToken).Do()
  372. }
  373. func (s *GDrive) findId(filename string, token string) (string, error) {
  374. fileId, tokenId, nextPageToken := "", "", ""
  375. q := fmt.Sprintf("'%s' in parents and name='%s' and mimeType='%s' and trashed=false", s.rootId, token, GDriveDirectoryMimeType)
  376. l, err := s.list(nextPageToken, q)
  377. for 0 < len(l.Files) {
  378. if err != nil {
  379. return "", err
  380. }
  381. for _, fi := range l.Files {
  382. tokenId = fi.Id
  383. break
  384. }
  385. if l.NextPageToken == "" {
  386. break
  387. }
  388. l, err = s.list(l.NextPageToken, q)
  389. }
  390. if filename == "" {
  391. return tokenId, nil
  392. } else if tokenId == "" {
  393. return "", fmt.Errorf("Cannot find file %s/%s", token, filename)
  394. }
  395. q = fmt.Sprintf("'%s' in parents and name='%s' and mimeType!='%s' and trashed=false", tokenId, filename, GDriveDirectoryMimeType)
  396. l, err = s.list(nextPageToken, q)
  397. for 0 < len(l.Files) {
  398. if err != nil {
  399. return "", err
  400. }
  401. for _, fi := range l.Files {
  402. fileId = fi.Id
  403. break
  404. }
  405. if l.NextPageToken == "" {
  406. break
  407. }
  408. l, err = s.list(l.NextPageToken, q)
  409. }
  410. if fileId == "" {
  411. return "", fmt.Errorf("Cannot find file %s/%s", token, filename)
  412. }
  413. return fileId, nil
  414. }
  415. func (s *GDrive) Type() string {
  416. return "gdrive"
  417. }
  418. func (s *GDrive) Head(token string, filename string) (contentType string, contentLength uint64, err error) {
  419. var fileId string
  420. fileId, err = s.findId(filename, token)
  421. if err != nil {
  422. return
  423. }
  424. var fi *drive.File
  425. if fi, err = s.service.Files.Get(fileId).Fields("mimeType", "size").Do(); err != nil {
  426. return
  427. }
  428. contentLength = uint64(fi.Size)
  429. contentType = fi.MimeType
  430. return
  431. }
  432. func (s *GDrive) Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error) {
  433. var fileId string
  434. fileId, err = s.findId(filename, token)
  435. if err != nil {
  436. return
  437. }
  438. var fi *drive.File
  439. fi, err = s.service.Files.Get(fileId).Fields("mimeType", "size", "md5Checksum").Do()
  440. if !s.hasChecksum(fi) {
  441. err = fmt.Errorf("Cannot find file %s/%s", token, filename)
  442. return
  443. }
  444. contentLength = uint64(fi.Size)
  445. contentType = fi.MimeType
  446. // Get timeout reader wrapper and context
  447. timeoutReaderWrapper, ctx := s.getTimeoutReaderWrapperContext(time.Duration(GDriveTimeoutTimerInterval))
  448. var res *http.Response
  449. res, err = s.service.Files.Get(fileId).Context(ctx).Download()
  450. if err != nil {
  451. return
  452. }
  453. reader = timeoutReaderWrapper(res.Body).(io.ReadCloser)
  454. return
  455. }
  456. func (s *GDrive) Delete(token string, filename string) (err error) {
  457. metadata, _ := s.findId(fmt.Sprintf("%s.metadata", filename), token)
  458. s.service.Files.Delete(metadata).Do()
  459. var fileId string
  460. fileId, err = s.findId(filename, token)
  461. if err != nil {
  462. return
  463. }
  464. err = s.service.Files.Delete(fileId).Do()
  465. return
  466. }
  467. func (s *GDrive) IsNotExist(err error) bool {
  468. if err == nil {
  469. return false
  470. }
  471. if err != nil {
  472. if e, ok := err.(*googleapi.Error); ok {
  473. return e.Code == http.StatusNotFound
  474. }
  475. }
  476. return false
  477. }
  478. func (s *GDrive) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error {
  479. dirId, err := s.findId("", token)
  480. if err != nil {
  481. return err
  482. }
  483. if dirId == "" {
  484. dir := &drive.File{
  485. Name: token,
  486. Parents: []string{s.rootId},
  487. MimeType: GDriveDirectoryMimeType,
  488. }
  489. di, err := s.service.Files.Create(dir).Fields("id").Do()
  490. if err != nil {
  491. return err
  492. }
  493. dirId = di.Id
  494. }
  495. // Wrap reader in timeout reader
  496. timeoutReaderWrapper, ctx := s.getTimeoutReaderWrapperContext(time.Duration(GDriveTimeoutTimerInterval))
  497. // Instantiate empty drive file
  498. dst := &drive.File{
  499. Name: filename,
  500. Parents: []string{dirId},
  501. MimeType: contentType,
  502. }
  503. _, err = s.service.Files.Create(dst).Context(ctx).Media(timeoutReaderWrapper(reader)).Do()
  504. if err != nil {
  505. return err
  506. }
  507. return nil
  508. }
  509. // Retrieve a token, saves the token, then returns the generated client.
  510. func getGDriveClient(config *oauth2.Config) *http.Client {
  511. tokenFile := "token.json"
  512. tok, err := gDriveTokenFromFile(tokenFile)
  513. if err != nil {
  514. tok = getGDriveTokenFromWeb(config)
  515. saveGDriveToken(tokenFile, tok)
  516. }
  517. return config.Client(context.Background(), tok)
  518. }
  519. // Request a token from the web, then returns the retrieved token.
  520. func getGDriveTokenFromWeb(config *oauth2.Config) *oauth2.Token {
  521. authURL := config.AuthCodeURL("state-token", oauth2.AccessTypeOffline)
  522. fmt.Printf("Go to the following link in your browser then type the "+
  523. "authorization code: \n%v\n", authURL)
  524. var authCode string
  525. if _, err := fmt.Scan(&authCode); err != nil {
  526. log.Fatalf("Unable to read authorization code %v", err)
  527. }
  528. tok, err := config.Exchange(oauth2.NoContext, authCode)
  529. if err != nil {
  530. log.Fatalf("Unable to retrieve token from web %v", err)
  531. }
  532. return tok
  533. }
  534. // Retrieves a token from a local file.
  535. func gDriveTokenFromFile(file string) (*oauth2.Token, error) {
  536. f, err := os.Open(file)
  537. defer f.Close()
  538. if err != nil {
  539. return nil, err
  540. }
  541. tok := &oauth2.Token{}
  542. err = json.NewDecoder(f).Decode(tok)
  543. return tok, err
  544. }
  545. // Saves a token to a file path.
  546. func saveGDriveToken(path string, token *oauth2.Token) {
  547. fmt.Printf("Saving credential file to: %s\n", path)
  548. f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600)
  549. defer f.Close()
  550. if err != nil {
  551. log.Fatalf("Unable to cache oauth token: %v", err)
  552. }
  553. json.NewEncoder(f).Encode(token)
  554. }