You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

628 lines
14 KiB

  1. package server
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. "log"
  7. "mime"
  8. "os"
  9. "path/filepath"
  10. "strconv"
  11. "sync"
  12. "encoding/json"
  13. "github.com/goamz/goamz/s3"
  14. "golang.org/x/net/context"
  15. "golang.org/x/oauth2"
  16. "golang.org/x/oauth2/google"
  17. "google.golang.org/api/drive/v3"
  18. "google.golang.org/api/googleapi"
  19. "io/ioutil"
  20. "net/http"
  21. "strings"
  22. )
  // Storage is the contract implemented by every file backend in this package.
  // A stored object is addressed by an upload token plus a filename.
  type Storage interface {
  	// Get returns a reader over the stored object together with its content
  	// type and its length in bytes.
  	Get(token, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error)
  	// Head returns the content type and length of the stored object without
  	// returning its content.
  	Head(token, filename string) (contentType string, contentLength uint64, err error)
  	// Put stores the data read from reader as token/filename.
  	Put(token, filename string, reader io.Reader, contentType string, contentLength uint64) error
  	// Delete removes the object stored as token/filename.
  	Delete(token, filename string) error
  	// IsNotExist reports whether err, as returned by this backend, means that
  	// the requested object does not exist.
  	IsNotExist(err error) bool
  	// Type returns a short identifier for the backend.
  	Type() string
  }
  31. type LocalStorage struct {
  32. Storage
  33. basedir string
  34. logger *log.Logger
  35. }
  36. func NewLocalStorage(basedir string, logger *log.Logger) (*LocalStorage, error) {
  37. return &LocalStorage{basedir: basedir, logger: logger}, nil
  38. }
  39. func (s *LocalStorage) Type() string {
  40. return "local"
  41. }
  42. func (s *LocalStorage) Head(token string, filename string) (contentType string, contentLength uint64, err error) {
  43. path := filepath.Join(s.basedir, token, filename)
  44. var fi os.FileInfo
  45. if fi, err = os.Lstat(path); err != nil {
  46. return
  47. }
  48. contentLength = uint64(fi.Size())
  49. contentType = mime.TypeByExtension(filepath.Ext(filename))
  50. return
  51. }
  52. func (s *LocalStorage) Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error) {
  53. path := filepath.Join(s.basedir, token, filename)
  54. // content type , content length
  55. if reader, err = os.Open(path); err != nil {
  56. return
  57. }
  58. var fi os.FileInfo
  59. if fi, err = os.Lstat(path); err != nil {
  60. return
  61. }
  62. contentLength = uint64(fi.Size())
  63. contentType = mime.TypeByExtension(filepath.Ext(filename))
  64. return
  65. }
  66. func (s *LocalStorage) Delete(token string, filename string) (err error) {
  67. metadata := filepath.Join(s.basedir, token, fmt.Sprintf("%s.metadata", filename))
  68. os.Remove(metadata)
  69. path := filepath.Join(s.basedir, token, filename)
  70. err = os.Remove(path)
  71. return
  72. }
  73. func (s *LocalStorage) IsNotExist(err error) bool {
  74. if err == nil {
  75. return false
  76. }
  77. return os.IsNotExist(err)
  78. }
  79. func (s *LocalStorage) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error {
  80. var f io.WriteCloser
  81. var err error
  82. path := filepath.Join(s.basedir, token)
  83. if err = os.Mkdir(path, 0700); err != nil && !os.IsExist(err) {
  84. return err
  85. }
  86. if f, err = os.OpenFile(filepath.Join(path, filename), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600); err != nil {
  87. return err
  88. }
  89. defer f.Close()
  90. if _, err = io.Copy(f, reader); err != nil {
  91. return err
  92. }
  93. return nil
  94. }
  95. type S3Storage struct {
  96. Storage
  97. bucket *s3.Bucket
  98. logger *log.Logger
  99. }
  100. func NewS3Storage(accessKey, secretKey, bucketName, endpoint string, logger *log.Logger) (*S3Storage, error) {
  101. bucket, err := getBucket(accessKey, secretKey, bucketName, endpoint)
  102. if err != nil {
  103. return nil, err
  104. }
  105. return &S3Storage{bucket: bucket, logger: logger}, nil
  106. }
  107. func (s *S3Storage) Type() string {
  108. return "s3"
  109. }
  110. func (s *S3Storage) Head(token string, filename string) (contentType string, contentLength uint64, err error) {
  111. key := fmt.Sprintf("%s/%s", token, filename)
  112. // content type , content length
  113. response, err := s.bucket.Head(key, map[string][]string{})
  114. if err != nil {
  115. return
  116. }
  117. contentType = response.Header.Get("Content-Type")
  118. contentLength, err = strconv.ParseUint(response.Header.Get("Content-Length"), 10, 0)
  119. if err != nil {
  120. return
  121. }
  122. return
  123. }
  124. func (s *S3Storage) IsNotExist(err error) bool {
  125. if err == nil {
  126. return false
  127. }
  128. s.logger.Printf("IsNotExist: %s, %#v", err.Error(), err)
  129. b := (err.Error() == "The specified key does not exist.")
  130. b = b || (err.Error() == "Access Denied")
  131. return b
  132. }
  133. func (s *S3Storage) Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error) {
  134. key := fmt.Sprintf("%s/%s", token, filename)
  135. // content type , content length
  136. response, err := s.bucket.GetResponse(key)
  137. if err != nil {
  138. return
  139. }
  140. contentType = response.Header.Get("Content-Type")
  141. contentLength, err = strconv.ParseUint(response.Header.Get("Content-Length"), 10, 0)
  142. if err != nil {
  143. return
  144. }
  145. reader = response.Body
  146. return
  147. }
  148. func (s *S3Storage) Delete(token string, filename string) (err error) {
  149. metadata := fmt.Sprintf("%s/%s.metadata", token, filename)
  150. s.bucket.Del(metadata)
  151. key := fmt.Sprintf("%s/%s", token, filename)
  152. err = s.bucket.Del(key)
  153. return
  154. }
  155. func (s *S3Storage) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) (err error) {
  156. key := fmt.Sprintf("%s/%s", token, filename)
  157. var (
  158. multi *s3.Multi
  159. parts []s3.Part
  160. )
  161. if multi, err = s.bucket.InitMulti(key, contentType, s3.Private); err != nil {
  162. s.logger.Printf(err.Error())
  163. return
  164. }
  165. // 20 mb parts
  166. partsChan := make(chan interface{})
  167. // partsChan := make(chan s3.Part)
  168. go func() {
  169. // maximize to 20 threads
  170. sem := make(chan int, 20)
  171. index := 1
  172. var wg sync.WaitGroup
  173. for {
  174. // buffered in memory because goamz s3 multi needs seekable reader
  175. var (
  176. buffer []byte = make([]byte, (1<<20)*10)
  177. count int
  178. err error
  179. )
  180. // Amazon expects parts of at least 5MB, except for the last one
  181. if count, err = io.ReadAtLeast(reader, buffer, (1<<20)*5); err != nil && err != io.ErrUnexpectedEOF && err != io.EOF {
  182. s.logger.Printf(err.Error())
  183. return
  184. }
  185. // always send minimal 1 part
  186. if err == io.EOF && index > 1 {
  187. s.logger.Printf("Waiting for all parts to finish uploading.")
  188. // wait for all parts to be finished uploading
  189. wg.Wait()
  190. // and close the channel
  191. close(partsChan)
  192. return
  193. }
  194. wg.Add(1)
  195. sem <- 1
  196. // using goroutines because of retries when upload fails
  197. go func(multi *s3.Multi, buffer []byte, index int) {
  198. s.logger.Printf("Uploading part %d %d", index, len(buffer))
  199. defer func() {
  200. s.logger.Printf("Finished part %d %d", index, len(buffer))
  201. wg.Done()
  202. <-sem
  203. }()
  204. partReader := bytes.NewReader(buffer)
  205. var part s3.Part
  206. if part, err = multi.PutPart(index, partReader); err != nil {
  207. s.logger.Printf("Error while uploading part %d %d %s", index, len(buffer), err.Error())
  208. partsChan <- err
  209. return
  210. }
  211. s.logger.Printf("Finished uploading part %d %d", index, len(buffer))
  212. partsChan <- part
  213. }(multi, buffer[:count], index)
  214. index++
  215. }
  216. }()
  217. // wait for all parts to be uploaded
  218. for part := range partsChan {
  219. switch part.(type) {
  220. case s3.Part:
  221. parts = append(parts, part.(s3.Part))
  222. case error:
  223. // abort multi upload
  224. s.logger.Printf("Error during upload, aborting %s.", part.(error).Error())
  225. err = part.(error)
  226. multi.Abort()
  227. return
  228. }
  229. }
  230. s.logger.Printf("Completing upload %d parts", len(parts))
  231. if err = multi.Complete(parts); err != nil {
  232. s.logger.Printf("Error during completing upload %d parts %s", len(parts), err.Error())
  233. return
  234. }
  235. s.logger.Printf("Completed uploading %d", len(parts))
  236. return
  237. }
  238. type GDrive struct {
  239. service *drive.Service
  240. rootId string
  241. basedir string
  242. localConfigPath string
  243. chunkSize int
  244. logger *log.Logger
  245. }
  246. func NewGDriveStorage(clientJsonFilepath string, localConfigPath string, basedir string, chunkSize int, logger *log.Logger) (*GDrive, error) {
  247. b, err := ioutil.ReadFile(clientJsonFilepath)
  248. if err != nil {
  249. return nil, err
  250. }
  251. // If modifying these scopes, delete your previously saved client_secret.json.
  252. config, err := google.ConfigFromJSON(b, drive.DriveScope, drive.DriveMetadataScope)
  253. if err != nil {
  254. return nil, err
  255. }
  256. srv, err := drive.New(getGDriveClient(config, localConfigPath, logger))
  257. if err != nil {
  258. return nil, err
  259. }
  260. chunkSize = chunkSize * 1024 * 1024
  261. storage := &GDrive{service: srv, basedir: basedir, rootId: "", localConfigPath: localConfigPath, chunkSize: chunkSize, logger: logger}
  262. err = storage.setupRoot()
  263. if err != nil {
  264. return nil, err
  265. }
  266. return storage, nil
  267. }
  // Names of the files the GDrive backend keeps in its local config directory,
  // and the MIME type Google Drive uses to mark a folder.
  const (
  	// GDriveRootConfigFile holds the cached Drive id of the root folder.
  	GDriveRootConfigFile = "root_id.conf"
  	// GDriveTokenJsonFile holds the cached OAuth token as JSON.
  	GDriveTokenJsonFile = "token.json"
  	// GDriveDirectoryMimeType is the MIME type of a Drive folder.
  	GDriveDirectoryMimeType = "application/vnd.google-apps.folder"
  )
  271. func (s *GDrive) setupRoot() error {
  272. rootFileConfig := filepath.Join(s.localConfigPath, GDriveRootConfigFile)
  273. rootId, err := ioutil.ReadFile(rootFileConfig)
  274. if err != nil && !os.IsNotExist(err) {
  275. return err
  276. }
  277. if string(rootId) != "" {
  278. s.rootId = string(rootId)
  279. return nil
  280. }
  281. dir := &drive.File{
  282. Name: s.basedir,
  283. MimeType: GDriveDirectoryMimeType,
  284. }
  285. di, err := s.service.Files.Create(dir).Fields("id").Do()
  286. if err != nil {
  287. return err
  288. }
  289. s.rootId = di.Id
  290. err = ioutil.WriteFile(rootFileConfig, []byte(s.rootId), os.FileMode(0600))
  291. if err != nil {
  292. return err
  293. }
  294. return nil
  295. }
  296. func (s *GDrive) hasChecksum(f *drive.File) bool {
  297. return f.Md5Checksum != ""
  298. }
  299. func (s *GDrive) list(nextPageToken string, q string) (*drive.FileList, error) {
  300. return s.service.Files.List().Fields("nextPageToken, files(id, name, mimeType)").Q(q).PageToken(nextPageToken).Do()
  301. }
  302. func (s *GDrive) findId(filename string, token string) (string, error) {
  303. filename = strings.Replace(filename, `'`, `\'`, -1)
  304. filename = strings.Replace(filename, `"`, `\"`, -1)
  305. fileId, tokenId, nextPageToken := "", "", ""
  306. q := fmt.Sprintf("'%s' in parents and name='%s' and mimeType='%s' and trashed=false", s.rootId, token, GDriveDirectoryMimeType)
  307. l, err := s.list(nextPageToken, q)
  308. if err != nil {
  309. return "", err
  310. }
  311. for 0 < len(l.Files) {
  312. for _, fi := range l.Files {
  313. tokenId = fi.Id
  314. break
  315. }
  316. if l.NextPageToken == "" {
  317. break
  318. }
  319. l, err = s.list(l.NextPageToken, q)
  320. }
  321. if filename == "" {
  322. return tokenId, nil
  323. } else if tokenId == "" {
  324. return "", fmt.Errorf("Cannot find file %s/%s", token, filename)
  325. }
  326. q = fmt.Sprintf("'%s' in parents and name='%s' and mimeType!='%s' and trashed=false", tokenId, filename, GDriveDirectoryMimeType)
  327. l, err = s.list(nextPageToken, q)
  328. if err != nil {
  329. return "", err
  330. }
  331. for 0 < len(l.Files) {
  332. for _, fi := range l.Files {
  333. fileId = fi.Id
  334. break
  335. }
  336. if l.NextPageToken == "" {
  337. break
  338. }
  339. l, err = s.list(l.NextPageToken, q)
  340. }
  341. if fileId == "" {
  342. return "", fmt.Errorf("Cannot find file %s/%s", token, filename)
  343. }
  344. return fileId, nil
  345. }
  346. func (s *GDrive) Type() string {
  347. return "gdrive"
  348. }
  349. func (s *GDrive) Head(token string, filename string) (contentType string, contentLength uint64, err error) {
  350. var fileId string
  351. fileId, err = s.findId(filename, token)
  352. if err != nil {
  353. return
  354. }
  355. var fi *drive.File
  356. if fi, err = s.service.Files.Get(fileId).Fields("mimeType", "size").Do(); err != nil {
  357. return
  358. }
  359. contentLength = uint64(fi.Size)
  360. contentType = fi.MimeType
  361. return
  362. }
  363. func (s *GDrive) Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error) {
  364. var fileId string
  365. fileId, err = s.findId(filename, token)
  366. if err != nil {
  367. return
  368. }
  369. var fi *drive.File
  370. fi, err = s.service.Files.Get(fileId).Fields("mimeType", "size", "md5Checksum").Do()
  371. if !s.hasChecksum(fi) {
  372. err = fmt.Errorf("Cannot find file %s/%s", token, filename)
  373. return
  374. }
  375. contentLength = uint64(fi.Size)
  376. contentType = fi.MimeType
  377. ctx := context.Background()
  378. var res *http.Response
  379. res, err = s.service.Files.Get(fileId).Context(ctx).Download()
  380. if err != nil {
  381. return
  382. }
  383. reader = res.Body
  384. return
  385. }
  386. func (s *GDrive) Delete(token string, filename string) (err error) {
  387. metadata, _ := s.findId(fmt.Sprintf("%s.metadata", filename), token)
  388. s.service.Files.Delete(metadata).Do()
  389. var fileId string
  390. fileId, err = s.findId(filename, token)
  391. if err != nil {
  392. return
  393. }
  394. err = s.service.Files.Delete(fileId).Do()
  395. return
  396. }
  397. func (s *GDrive) IsNotExist(err error) bool {
  398. if err == nil {
  399. return false
  400. }
  401. if err != nil {
  402. if e, ok := err.(*googleapi.Error); ok {
  403. return e.Code == http.StatusNotFound
  404. }
  405. }
  406. return false
  407. }
  408. func (s *GDrive) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error {
  409. dirId, err := s.findId("", token)
  410. if err != nil {
  411. return err
  412. }
  413. if dirId == "" {
  414. dir := &drive.File{
  415. Name: token,
  416. Parents: []string{s.rootId},
  417. MimeType: GDriveDirectoryMimeType,
  418. }
  419. di, err := s.service.Files.Create(dir).Fields("id").Do()
  420. if err != nil {
  421. return err
  422. }
  423. dirId = di.Id
  424. }
  425. // Instantiate empty drive file
  426. dst := &drive.File{
  427. Name: filename,
  428. Parents: []string{dirId},
  429. MimeType: contentType,
  430. }
  431. ctx := context.Background()
  432. _, err = s.service.Files.Create(dst).Context(ctx).Media(reader, googleapi.ChunkSize(s.chunkSize)).Do()
  433. if err != nil {
  434. return err
  435. }
  436. return nil
  437. }
  438. // Retrieve a token, saves the token, then returns the generated client.
  439. func getGDriveClient(config *oauth2.Config, localConfigPath string, logger *log.Logger) *http.Client {
  440. tokenFile := filepath.Join(localConfigPath, GDriveTokenJsonFile)
  441. tok, err := gDriveTokenFromFile(tokenFile)
  442. if err != nil {
  443. tok = getGDriveTokenFromWeb(config, logger)
  444. saveGDriveToken(tokenFile, tok, logger)
  445. }
  446. return config.Client(context.Background(), tok)
  447. }
  448. // Request a token from the web, then returns the retrieved token.
  449. func getGDriveTokenFromWeb(config *oauth2.Config, logger *log.Logger) *oauth2.Token {
  450. authURL := config.AuthCodeURL("state-token", oauth2.AccessTypeOffline)
  451. fmt.Printf("Go to the following link in your browser then type the "+
  452. "authorization code: \n%v\n", authURL)
  453. var authCode string
  454. if _, err := fmt.Scan(&authCode); err != nil {
  455. logger.Fatalf("Unable to read authorization code %v", err)
  456. }
  457. tok, err := config.Exchange(context.TODO(), authCode)
  458. if err != nil {
  459. logger.Fatalf("Unable to retrieve token from web %v", err)
  460. }
  461. return tok
  462. }
  463. // Retrieves a token from a local file.
  464. func gDriveTokenFromFile(file string) (*oauth2.Token, error) {
  465. f, err := os.Open(file)
  466. defer f.Close()
  467. if err != nil {
  468. return nil, err
  469. }
  470. tok := &oauth2.Token{}
  471. err = json.NewDecoder(f).Decode(tok)
  472. return tok, err
  473. }
  474. // Saves a token to a file path.
  475. func saveGDriveToken(path string, token *oauth2.Token, logger *log.Logger) {
  476. logger.Printf("Saving credential file to: %s\n", path)
  477. f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600)
  478. defer f.Close()
  479. if err != nil {
  480. logger.Fatalf("Unable to cache oauth token: %v", err)
  481. }
  482. json.NewEncoder(f).Encode(token)
  483. }