You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

626 lines
14 KiB

  1. package server
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. "log"
  7. "mime"
  8. "os"
  9. "path/filepath"
  10. "strconv"
  11. "sync"
  12. "encoding/json"
  13. "github.com/goamz/goamz/s3"
  14. "golang.org/x/net/context"
  15. "golang.org/x/oauth2"
  16. "golang.org/x/oauth2/google"
  17. "google.golang.org/api/drive/v3"
  18. "google.golang.org/api/googleapi"
  19. "io/ioutil"
  20. "net/http"
  21. "strings"
  22. )
  23. type Storage interface {
  24. Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error)
  25. Head(token string, filename string) (contentType string, contentLength uint64, err error)
  26. Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error
  27. Delete(token string, filename string) error
  28. IsNotExist(err error) bool
  29. Type() string
  30. }
  31. type LocalStorage struct {
  32. Storage
  33. basedir string
  34. logger *log.Logger
  35. }
  36. func NewLocalStorage(basedir string, logger *log.Logger) (*LocalStorage, error) {
  37. return &LocalStorage{basedir: basedir, logger: logger}, nil
  38. }
  39. func (s *LocalStorage) Type() string {
  40. return "local"
  41. }
  42. func (s *LocalStorage) Head(token string, filename string) (contentType string, contentLength uint64, err error) {
  43. path := filepath.Join(s.basedir, token, filename)
  44. var fi os.FileInfo
  45. if fi, err = os.Lstat(path); err != nil {
  46. return
  47. }
  48. contentLength = uint64(fi.Size())
  49. contentType = mime.TypeByExtension(filepath.Ext(filename))
  50. return
  51. }
  52. func (s *LocalStorage) Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error) {
  53. path := filepath.Join(s.basedir, token, filename)
  54. // content type , content length
  55. if reader, err = os.Open(path); err != nil {
  56. return
  57. }
  58. var fi os.FileInfo
  59. if fi, err = os.Lstat(path); err != nil {
  60. return
  61. }
  62. contentLength = uint64(fi.Size())
  63. contentType = mime.TypeByExtension(filepath.Ext(filename))
  64. return
  65. }
  66. func (s *LocalStorage) Delete(token string, filename string) (err error) {
  67. metadata := filepath.Join(s.basedir, token, fmt.Sprintf("%s.metadata", filename))
  68. os.Remove(metadata)
  69. path := filepath.Join(s.basedir, token, filename)
  70. err = os.Remove(path)
  71. return
  72. }
  73. func (s *LocalStorage) IsNotExist(err error) bool {
  74. if err == nil {
  75. return false
  76. }
  77. return os.IsNotExist(err)
  78. }
  79. func (s *LocalStorage) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error {
  80. var f io.WriteCloser
  81. var err error
  82. path := filepath.Join(s.basedir, token)
  83. if err = os.Mkdir(path, 0700); err != nil && !os.IsExist(err) {
  84. return err
  85. }
  86. if f, err = os.OpenFile(filepath.Join(path, filename), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600); err != nil {
  87. return err
  88. }
  89. defer f.Close()
  90. if _, err = io.Copy(f, reader); err != nil {
  91. return err
  92. }
  93. return nil
  94. }
  95. type S3Storage struct {
  96. Storage
  97. bucket *s3.Bucket
  98. logger *log.Logger
  99. }
  100. func NewS3Storage(accessKey, secretKey, bucketName, endpoint string, logger *log.Logger) (*S3Storage, error) {
  101. bucket, err := getBucket(accessKey, secretKey, bucketName, endpoint)
  102. if err != nil {
  103. return nil, err
  104. }
  105. return &S3Storage{bucket: bucket, logger: logger}, nil
  106. }
  107. func (s *S3Storage) Type() string {
  108. return "s3"
  109. }
  110. func (s *S3Storage) Head(token string, filename string) (contentType string, contentLength uint64, err error) {
  111. key := fmt.Sprintf("%s/%s", token, filename)
  112. // content type , content length
  113. response, err := s.bucket.Head(key, map[string][]string{})
  114. if err != nil {
  115. return
  116. }
  117. contentType = response.Header.Get("Content-Type")
  118. contentLength, err = strconv.ParseUint(response.Header.Get("Content-Length"), 10, 0)
  119. if err != nil {
  120. return
  121. }
  122. return
  123. }
  124. func (s *S3Storage) IsNotExist(err error) bool {
  125. if err == nil {
  126. return false
  127. }
  128. s.logger.Printf("IsNotExist: %s, %#v", err.Error(), err)
  129. b := (err.Error() == "The specified key does not exist.")
  130. b = b || (err.Error() == "Access Denied")
  131. return b
  132. }
  133. func (s *S3Storage) Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error) {
  134. key := fmt.Sprintf("%s/%s", token, filename)
  135. // content type , content length
  136. response, err := s.bucket.GetResponse(key)
  137. if err != nil {
  138. return
  139. }
  140. contentType = response.Header.Get("Content-Type")
  141. contentLength, err = strconv.ParseUint(response.Header.Get("Content-Length"), 10, 0)
  142. if err != nil {
  143. return
  144. }
  145. reader = response.Body
  146. return
  147. }
  148. func (s *S3Storage) Delete(token string, filename string) (err error) {
  149. metadata := fmt.Sprintf("%s/%s.metadata", token, filename)
  150. s.bucket.Del(metadata)
  151. key := fmt.Sprintf("%s/%s", token, filename)
  152. err = s.bucket.Del(key)
  153. return
  154. }
  155. func (s *S3Storage) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) (err error) {
  156. key := fmt.Sprintf("%s/%s", token, filename)
  157. var (
  158. multi *s3.Multi
  159. parts []s3.Part
  160. )
  161. if multi, err = s.bucket.InitMulti(key, contentType, s3.Private); err != nil {
  162. s.logger.Printf(err.Error())
  163. return
  164. }
  165. // 20 mb parts
  166. partsChan := make(chan interface{})
  167. // partsChan := make(chan s3.Part)
  168. go func() {
  169. // maximize to 20 threads
  170. sem := make(chan int, 20)
  171. index := 1
  172. var wg sync.WaitGroup
  173. for {
  174. // buffered in memory because goamz s3 multi needs seekable reader
  175. var (
  176. buffer []byte = make([]byte, (1<<20)*10)
  177. count int
  178. err error
  179. )
  180. // Amazon expects parts of at least 5MB, except for the last one
  181. if count, err = io.ReadAtLeast(reader, buffer, (1<<20)*5); err != nil && err != io.ErrUnexpectedEOF && err != io.EOF {
  182. s.logger.Printf(err.Error())
  183. return
  184. }
  185. // always send minimal 1 part
  186. if err == io.EOF && index > 1 {
  187. s.logger.Printf("Waiting for all parts to finish uploading.")
  188. // wait for all parts to be finished uploading
  189. wg.Wait()
  190. // and close the channel
  191. close(partsChan)
  192. return
  193. }
  194. wg.Add(1)
  195. sem <- 1
  196. // using goroutines because of retries when upload fails
  197. go func(multi *s3.Multi, buffer []byte, index int) {
  198. s.logger.Printf("Uploading part %d %d", index, len(buffer))
  199. defer func() {
  200. s.logger.Printf("Finished part %d %d", index, len(buffer))
  201. wg.Done()
  202. <-sem
  203. }()
  204. partReader := bytes.NewReader(buffer)
  205. var part s3.Part
  206. if part, err = multi.PutPart(index, partReader); err != nil {
  207. s.logger.Printf("Error while uploading part %d %d %s", index, len(buffer), err.Error())
  208. partsChan <- err
  209. return
  210. }
  211. s.logger.Printf("Finished uploading part %d %d", index, len(buffer))
  212. partsChan <- part
  213. }(multi, buffer[:count], index)
  214. index++
  215. }
  216. }()
  217. // wait for all parts to be uploaded
  218. for part := range partsChan {
  219. switch part.(type) {
  220. case s3.Part:
  221. parts = append(parts, part.(s3.Part))
  222. case error:
  223. // abort multi upload
  224. s.logger.Printf("Error during upload, aborting %s.", part.(error).Error())
  225. err = part.(error)
  226. multi.Abort()
  227. return
  228. }
  229. }
  230. s.logger.Printf("Completing upload %d parts", len(parts))
  231. if err = multi.Complete(parts); err != nil {
  232. s.logger.Printf("Error during completing upload %d parts %s", len(parts), err.Error())
  233. return
  234. }
  235. s.logger.Printf("Completed uploading %d", len(parts))
  236. return
  237. }
  238. type GDrive struct {
  239. service *drive.Service
  240. rootId string
  241. basedir string
  242. localConfigPath string
  243. logger *log.Logger
  244. }
  245. func NewGDriveStorage(clientJsonFilepath string, localConfigPath string, basedir string, logger *log.Logger) (*GDrive, error) {
  246. b, err := ioutil.ReadFile(clientJsonFilepath)
  247. if err != nil {
  248. return nil, err
  249. }
  250. // If modifying these scopes, delete your previously saved client_secret.json.
  251. config, err := google.ConfigFromJSON(b, drive.DriveScope, drive.DriveMetadataScope)
  252. if err != nil {
  253. return nil, err
  254. }
  255. srv, err := drive.New(getGDriveClient(config, localConfigPath, logger))
  256. if err != nil {
  257. return nil, err
  258. }
  259. storage := &GDrive{service: srv, basedir: basedir, rootId: "", localConfigPath: localConfigPath, logger: logger}
  260. err = storage.setupRoot()
  261. if err != nil {
  262. return nil, err
  263. }
  264. return storage, nil
  265. }
  266. const GDriveRootConfigFile = "root_id.conf"
  267. const GDriveTokenJsonFile = "token.json"
  268. const GDriveDirectoryMimeType = "application/vnd.google-apps.folder"
  269. func (s *GDrive) setupRoot() error {
  270. rootFileConfig := filepath.Join(s.localConfigPath, GDriveRootConfigFile)
  271. rootId, err := ioutil.ReadFile(rootFileConfig)
  272. if err != nil && !os.IsNotExist(err) {
  273. return err
  274. }
  275. if string(rootId) != "" {
  276. s.rootId = string(rootId)
  277. return nil
  278. }
  279. dir := &drive.File{
  280. Name: s.basedir,
  281. MimeType: GDriveDirectoryMimeType,
  282. }
  283. di, err := s.service.Files.Create(dir).Fields("id").Do()
  284. if err != nil {
  285. return err
  286. }
  287. s.rootId = di.Id
  288. err = ioutil.WriteFile(rootFileConfig, []byte(s.rootId), os.FileMode(0600))
  289. if err != nil {
  290. return err
  291. }
  292. return nil
  293. }
  294. func (s *GDrive) hasChecksum(f *drive.File) bool {
  295. return f.Md5Checksum != ""
  296. }
  297. func (s *GDrive) list(nextPageToken string, q string) (*drive.FileList, error) {
  298. return s.service.Files.List().Fields("nextPageToken, files(id, name, mimeType)").Q(q).PageToken(nextPageToken).Do()
  299. }
  300. func (s *GDrive) findId(filename string, token string) (string, error) {
  301. filename = strings.Replace(filename, `'`, `\'`, -1)
  302. filename = strings.Replace(filename, `"`, `\"`, -1)
  303. fileId, tokenId, nextPageToken := "", "", ""
  304. q := fmt.Sprintf("'%s' in parents and name='%s' and mimeType='%s' and trashed=false", s.rootId, token, GDriveDirectoryMimeType)
  305. l, err := s.list(nextPageToken, q)
  306. if err != nil {
  307. return "", err
  308. }
  309. for 0 < len(l.Files) {
  310. for _, fi := range l.Files {
  311. tokenId = fi.Id
  312. break
  313. }
  314. if l.NextPageToken == "" {
  315. break
  316. }
  317. l, err = s.list(l.NextPageToken, q)
  318. }
  319. if filename == "" {
  320. return tokenId, nil
  321. } else if tokenId == "" {
  322. return "", fmt.Errorf("Cannot find file %s/%s", token, filename)
  323. }
  324. q = fmt.Sprintf("'%s' in parents and name='%s' and mimeType!='%s' and trashed=false", tokenId, filename, GDriveDirectoryMimeType)
  325. l, err = s.list(nextPageToken, q)
  326. if err != nil {
  327. return "", err
  328. }
  329. for 0 < len(l.Files) {
  330. for _, fi := range l.Files {
  331. fileId = fi.Id
  332. break
  333. }
  334. if l.NextPageToken == "" {
  335. break
  336. }
  337. l, err = s.list(l.NextPageToken, q)
  338. }
  339. if fileId == "" {
  340. return "", fmt.Errorf("Cannot find file %s/%s", token, filename)
  341. }
  342. return fileId, nil
  343. }
  344. func (s *GDrive) Type() string {
  345. return "gdrive"
  346. }
  347. func (s *GDrive) Head(token string, filename string) (contentType string, contentLength uint64, err error) {
  348. var fileId string
  349. fileId, err = s.findId(filename, token)
  350. if err != nil {
  351. return
  352. }
  353. var fi *drive.File
  354. if fi, err = s.service.Files.Get(fileId).Fields("mimeType", "size").Do(); err != nil {
  355. return
  356. }
  357. contentLength = uint64(fi.Size)
  358. contentType = fi.MimeType
  359. return
  360. }
  361. func (s *GDrive) Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error) {
  362. var fileId string
  363. fileId, err = s.findId(filename, token)
  364. if err != nil {
  365. return
  366. }
  367. var fi *drive.File
  368. fi, err = s.service.Files.Get(fileId).Fields("mimeType", "size", "md5Checksum").Do()
  369. if !s.hasChecksum(fi) {
  370. err = fmt.Errorf("Cannot find file %s/%s", token, filename)
  371. return
  372. }
  373. contentLength = uint64(fi.Size)
  374. contentType = fi.MimeType
  375. ctx := context.Background()
  376. var res *http.Response
  377. res, err = s.service.Files.Get(fileId).Context(ctx).Download()
  378. if err != nil {
  379. return
  380. }
  381. reader = res.Body
  382. return
  383. }
  384. func (s *GDrive) Delete(token string, filename string) (err error) {
  385. metadata, _ := s.findId(fmt.Sprintf("%s.metadata", filename), token)
  386. s.service.Files.Delete(metadata).Do()
  387. var fileId string
  388. fileId, err = s.findId(filename, token)
  389. if err != nil {
  390. return
  391. }
  392. err = s.service.Files.Delete(fileId).Do()
  393. return
  394. }
  395. func (s *GDrive) IsNotExist(err error) bool {
  396. if err == nil {
  397. return false
  398. }
  399. if err != nil {
  400. if e, ok := err.(*googleapi.Error); ok {
  401. return e.Code == http.StatusNotFound
  402. }
  403. }
  404. return false
  405. }
  406. func (s *GDrive) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error {
  407. dirId, err := s.findId("", token)
  408. if err != nil {
  409. return err
  410. }
  411. if dirId == "" {
  412. dir := &drive.File{
  413. Name: token,
  414. Parents: []string{s.rootId},
  415. MimeType: GDriveDirectoryMimeType,
  416. }
  417. di, err := s.service.Files.Create(dir).Fields("id").Do()
  418. if err != nil {
  419. return err
  420. }
  421. dirId = di.Id
  422. }
  423. // Instantiate empty drive file
  424. dst := &drive.File{
  425. Name: filename,
  426. Parents: []string{dirId},
  427. MimeType: contentType,
  428. }
  429. ctx := context.Background()
  430. _, err = s.service.Files.Create(dst).Context(ctx).Media(reader).Do()
  431. if err != nil {
  432. return err
  433. }
  434. return nil
  435. }
  436. // Retrieve a token, saves the token, then returns the generated client.
  437. func getGDriveClient(config *oauth2.Config, localConfigPath string, logger *log.Logger) *http.Client {
  438. tokenFile := filepath.Join(localConfigPath, GDriveTokenJsonFile)
  439. tok, err := gDriveTokenFromFile(tokenFile)
  440. if err != nil {
  441. tok = getGDriveTokenFromWeb(config, logger)
  442. saveGDriveToken(tokenFile, tok, logger)
  443. }
  444. return config.Client(context.Background(), tok)
  445. }
  446. // Request a token from the web, then returns the retrieved token.
  447. func getGDriveTokenFromWeb(config *oauth2.Config, logger *log.Logger) *oauth2.Token {
  448. authURL := config.AuthCodeURL("state-token", oauth2.AccessTypeOffline)
  449. fmt.Printf("Go to the following link in your browser then type the "+
  450. "authorization code: \n%v\n", authURL)
  451. var authCode string
  452. if _, err := fmt.Scan(&authCode); err != nil {
  453. logger.Fatalf("Unable to read authorization code %v", err)
  454. }
  455. tok, err := config.Exchange(context.TODO(), authCode)
  456. if err != nil {
  457. logger.Fatalf("Unable to retrieve token from web %v", err)
  458. }
  459. return tok
  460. }
  461. // Retrieves a token from a local file.
  462. func gDriveTokenFromFile(file string) (*oauth2.Token, error) {
  463. f, err := os.Open(file)
  464. defer f.Close()
  465. if err != nil {
  466. return nil, err
  467. }
  468. tok := &oauth2.Token{}
  469. err = json.NewDecoder(f).Decode(tok)
  470. return tok, err
  471. }
  472. // Saves a token to a file path.
  473. func saveGDriveToken(path string, token *oauth2.Token, logger *log.Logger) {
  474. logger.Printf("Saving credential file to: %s\n", path)
  475. f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600)
  476. defer f.Close()
  477. if err != nil {
  478. logger.Fatalf("Unable to cache oauth token: %v", err)
  479. }
  480. json.NewEncoder(f).Encode(token)
  481. }