You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

622 lines
14 KiB

  1. package server
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. "log"
  7. "mime"
  8. "os"
  9. "path/filepath"
  10. "strconv"
  11. "sync"
  12. "encoding/json"
  13. "github.com/goamz/goamz/s3"
  14. "golang.org/x/net/context"
  15. "golang.org/x/oauth2"
  16. "golang.org/x/oauth2/google"
  17. "google.golang.org/api/drive/v3"
  18. "google.golang.org/api/googleapi"
  19. "io/ioutil"
  20. "net/http"
  21. "strings"
  22. )
  23. type Storage interface {
  24. Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error)
  25. Head(token string, filename string) (contentType string, contentLength uint64, err error)
  26. Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error
  27. Delete(token string, filename string) error
  28. IsNotExist(err error) bool
  29. Type() string
  30. }
  31. type LocalStorage struct {
  32. Storage
  33. basedir string
  34. }
  35. func NewLocalStorage(basedir string) (*LocalStorage, error) {
  36. return &LocalStorage{basedir: basedir}, nil
  37. }
  38. func (s *LocalStorage) Type() string {
  39. return "local"
  40. }
  41. func (s *LocalStorage) Head(token string, filename string) (contentType string, contentLength uint64, err error) {
  42. path := filepath.Join(s.basedir, token, filename)
  43. var fi os.FileInfo
  44. if fi, err = os.Lstat(path); err != nil {
  45. return
  46. }
  47. contentLength = uint64(fi.Size())
  48. contentType = mime.TypeByExtension(filepath.Ext(filename))
  49. return
  50. }
  51. func (s *LocalStorage) Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error) {
  52. path := filepath.Join(s.basedir, token, filename)
  53. // content type , content length
  54. if reader, err = os.Open(path); err != nil {
  55. return
  56. }
  57. var fi os.FileInfo
  58. if fi, err = os.Lstat(path); err != nil {
  59. return
  60. }
  61. contentLength = uint64(fi.Size())
  62. contentType = mime.TypeByExtension(filepath.Ext(filename))
  63. return
  64. }
  65. func (s *LocalStorage) Delete(token string, filename string) (err error) {
  66. metadata := filepath.Join(s.basedir, token, fmt.Sprintf("%s.metadata", filename))
  67. os.Remove(metadata)
  68. path := filepath.Join(s.basedir, token, filename)
  69. err = os.Remove(path)
  70. return
  71. }
  72. func (s *LocalStorage) IsNotExist(err error) bool {
  73. if err == nil {
  74. return false
  75. }
  76. return os.IsNotExist(err)
  77. }
  78. func (s *LocalStorage) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error {
  79. var f io.WriteCloser
  80. var err error
  81. path := filepath.Join(s.basedir, token)
  82. if err = os.Mkdir(path, 0700); err != nil && !os.IsExist(err) {
  83. return err
  84. }
  85. if f, err = os.OpenFile(filepath.Join(path, filename), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600); err != nil {
  86. fmt.Printf("%s", err)
  87. return err
  88. }
  89. defer f.Close()
  90. if _, err = io.Copy(f, reader); err != nil {
  91. return err
  92. }
  93. return nil
  94. }
  95. type S3Storage struct {
  96. Storage
  97. bucket *s3.Bucket
  98. }
  99. func NewS3Storage(accessKey, secretKey, bucketName, endpoint string) (*S3Storage, error) {
  100. bucket, err := getBucket(accessKey, secretKey, bucketName, endpoint)
  101. if err != nil {
  102. return nil, err
  103. }
  104. return &S3Storage{bucket: bucket}, nil
  105. }
  106. func (s *S3Storage) Type() string {
  107. return "s3"
  108. }
  109. func (s *S3Storage) Head(token string, filename string) (contentType string, contentLength uint64, err error) {
  110. key := fmt.Sprintf("%s/%s", token, filename)
  111. // content type , content length
  112. response, err := s.bucket.Head(key, map[string][]string{})
  113. if err != nil {
  114. return
  115. }
  116. contentType = response.Header.Get("Content-Type")
  117. contentLength, err = strconv.ParseUint(response.Header.Get("Content-Length"), 10, 0)
  118. if err != nil {
  119. return
  120. }
  121. return
  122. }
  123. func (s *S3Storage) IsNotExist(err error) bool {
  124. if err == nil {
  125. return false
  126. }
  127. log.Printf("IsNotExist: %s, %#v", err.Error(), err)
  128. b := (err.Error() == "The specified key does not exist.")
  129. b = b || (err.Error() == "Access Denied")
  130. return b
  131. }
  132. func (s *S3Storage) Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error) {
  133. key := fmt.Sprintf("%s/%s", token, filename)
  134. // content type , content length
  135. response, err := s.bucket.GetResponse(key)
  136. if err != nil {
  137. return
  138. }
  139. contentType = response.Header.Get("Content-Type")
  140. contentLength, err = strconv.ParseUint(response.Header.Get("Content-Length"), 10, 0)
  141. if err != nil {
  142. return
  143. }
  144. reader = response.Body
  145. return
  146. }
  147. func (s *S3Storage) Delete(token string, filename string) (err error) {
  148. metadata := fmt.Sprintf("%s/%s.metadata", token, filename)
  149. s.bucket.Del(metadata)
  150. key := fmt.Sprintf("%s/%s", token, filename)
  151. err = s.bucket.Del(key)
  152. return
  153. }
  154. func (s *S3Storage) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) (err error) {
  155. key := fmt.Sprintf("%s/%s", token, filename)
  156. var (
  157. multi *s3.Multi
  158. parts []s3.Part
  159. )
  160. if multi, err = s.bucket.InitMulti(key, contentType, s3.Private); err != nil {
  161. log.Printf(err.Error())
  162. return
  163. }
  164. // 20 mb parts
  165. partsChan := make(chan interface{})
  166. // partsChan := make(chan s3.Part)
  167. go func() {
  168. // maximize to 20 threads
  169. sem := make(chan int, 20)
  170. index := 1
  171. var wg sync.WaitGroup
  172. for {
  173. // buffered in memory because goamz s3 multi needs seekable reader
  174. var (
  175. buffer []byte = make([]byte, (1<<20)*10)
  176. count int
  177. err error
  178. )
  179. // Amazon expects parts of at least 5MB, except for the last one
  180. if count, err = io.ReadAtLeast(reader, buffer, (1<<20)*5); err != nil && err != io.ErrUnexpectedEOF && err != io.EOF {
  181. log.Printf(err.Error())
  182. return
  183. }
  184. // always send minimal 1 part
  185. if err == io.EOF && index > 1 {
  186. log.Printf("Waiting for all parts to finish uploading.")
  187. // wait for all parts to be finished uploading
  188. wg.Wait()
  189. // and close the channel
  190. close(partsChan)
  191. return
  192. }
  193. wg.Add(1)
  194. sem <- 1
  195. // using goroutines because of retries when upload fails
  196. go func(multi *s3.Multi, buffer []byte, index int) {
  197. log.Printf("Uploading part %d %d", index, len(buffer))
  198. defer func() {
  199. log.Printf("Finished part %d %d", index, len(buffer))
  200. wg.Done()
  201. <-sem
  202. }()
  203. partReader := bytes.NewReader(buffer)
  204. var part s3.Part
  205. if part, err = multi.PutPart(index, partReader); err != nil {
  206. log.Printf("Error while uploading part %d %d %s", index, len(buffer), err.Error())
  207. partsChan <- err
  208. return
  209. }
  210. log.Printf("Finished uploading part %d %d", index, len(buffer))
  211. partsChan <- part
  212. }(multi, buffer[:count], index)
  213. index++
  214. }
  215. }()
  216. // wait for all parts to be uploaded
  217. for part := range partsChan {
  218. switch part.(type) {
  219. case s3.Part:
  220. parts = append(parts, part.(s3.Part))
  221. case error:
  222. // abort multi upload
  223. log.Printf("Error during upload, aborting %s.", part.(error).Error())
  224. err = part.(error)
  225. multi.Abort()
  226. return
  227. }
  228. }
  229. log.Printf("Completing upload %d parts", len(parts))
  230. if err = multi.Complete(parts); err != nil {
  231. log.Printf("Error during completing upload %d parts %s", len(parts), err.Error())
  232. return
  233. }
  234. log.Printf("Completed uploading %d", len(parts))
  235. return
  236. }
  237. type GDrive struct {
  238. service *drive.Service
  239. rootId string
  240. basedir string
  241. localConfigPath string
  242. }
  243. func NewGDriveStorage(clientJsonFilepath string, localConfigPath string, basedir string) (*GDrive, error) {
  244. b, err := ioutil.ReadFile(clientJsonFilepath)
  245. if err != nil {
  246. return nil, err
  247. }
  248. // If modifying these scopes, delete your previously saved client_secret.json.
  249. config, err := google.ConfigFromJSON(b, drive.DriveScope, drive.DriveMetadataScope)
  250. if err != nil {
  251. return nil, err
  252. }
  253. srv, err := drive.New(getGDriveClient(config))
  254. if err != nil {
  255. return nil, err
  256. }
  257. storage := &GDrive{service: srv, basedir: basedir, rootId: "", localConfigPath: localConfigPath}
  258. err = storage.setupRoot()
  259. if err != nil {
  260. return nil, err
  261. }
  262. return storage, nil
  263. }
  264. const GDriveRootConfigFile = "root_id.conf"
  265. const GDriveDirectoryMimeType = "application/vnd.google-apps.folder"
  266. func (s *GDrive) setupRoot() error {
  267. rootFileConfig := filepath.Join(s.localConfigPath, GDriveRootConfigFile)
  268. rootId, err := ioutil.ReadFile(rootFileConfig)
  269. if err != nil && !os.IsNotExist(err) {
  270. return err
  271. }
  272. if string(rootId) != "" {
  273. s.rootId = string(rootId)
  274. return nil
  275. }
  276. dir := &drive.File{
  277. Name: s.basedir,
  278. MimeType: GDriveDirectoryMimeType,
  279. }
  280. di, err := s.service.Files.Create(dir).Fields("id").Do()
  281. if err != nil {
  282. return err
  283. }
  284. s.rootId = di.Id
  285. err = ioutil.WriteFile(rootFileConfig, []byte(s.rootId), os.FileMode(0600))
  286. if err != nil {
  287. return err
  288. }
  289. return nil
  290. }
  291. func (s *GDrive) hasChecksum(f *drive.File) bool {
  292. return f.Md5Checksum != ""
  293. }
  294. func (s *GDrive) list(nextPageToken string, q string) (*drive.FileList, error) {
  295. return s.service.Files.List().Fields("nextPageToken, files(id, name, mimeType)").Q(q).PageToken(nextPageToken).Do()
  296. }
  297. func (s *GDrive) findId(filename string, token string) (string, error) {
  298. filename = strings.Replace(filename, `'`, `\'`, -1)
  299. filename = strings.Replace(filename, `"`, `\"`, -1)
  300. fileId, tokenId, nextPageToken := "", "", ""
  301. q := fmt.Sprintf("'%s' in parents and name='%s' and mimeType='%s' and trashed=false", s.rootId, token, GDriveDirectoryMimeType)
  302. l, err := s.list(nextPageToken, q)
  303. for 0 < len(l.Files) {
  304. if err != nil {
  305. return "", err
  306. }
  307. for _, fi := range l.Files {
  308. tokenId = fi.Id
  309. break
  310. }
  311. if l.NextPageToken == "" {
  312. break
  313. }
  314. l, err = s.list(l.NextPageToken, q)
  315. }
  316. if filename == "" {
  317. return tokenId, nil
  318. } else if tokenId == "" {
  319. return "", fmt.Errorf("Cannot find file %s/%s", token, filename)
  320. }
  321. q = fmt.Sprintf("'%s' in parents and name='%s' and mimeType!='%s' and trashed=false", tokenId, filename, GDriveDirectoryMimeType)
  322. l, err = s.list(nextPageToken, q)
  323. for 0 < len(l.Files) {
  324. if err != nil {
  325. return "", err
  326. }
  327. for _, fi := range l.Files {
  328. fileId = fi.Id
  329. break
  330. }
  331. if l.NextPageToken == "" {
  332. break
  333. }
  334. l, err = s.list(l.NextPageToken, q)
  335. }
  336. if fileId == "" {
  337. return "", fmt.Errorf("Cannot find file %s/%s", token, filename)
  338. }
  339. return fileId, nil
  340. }
  341. func (s *GDrive) Type() string {
  342. return "gdrive"
  343. }
  344. func (s *GDrive) Head(token string, filename string) (contentType string, contentLength uint64, err error) {
  345. var fileId string
  346. fileId, err = s.findId(filename, token)
  347. if err != nil {
  348. return
  349. }
  350. var fi *drive.File
  351. if fi, err = s.service.Files.Get(fileId).Fields("mimeType", "size").Do(); err != nil {
  352. return
  353. }
  354. contentLength = uint64(fi.Size)
  355. contentType = fi.MimeType
  356. return
  357. }
  358. func (s *GDrive) Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error) {
  359. var fileId string
  360. fileId, err = s.findId(filename, token)
  361. if err != nil {
  362. return
  363. }
  364. var fi *drive.File
  365. fi, err = s.service.Files.Get(fileId).Fields("mimeType", "size", "md5Checksum").Do()
  366. if !s.hasChecksum(fi) {
  367. err = fmt.Errorf("Cannot find file %s/%s", token, filename)
  368. return
  369. }
  370. contentLength = uint64(fi.Size)
  371. contentType = fi.MimeType
  372. ctx := context.Background()
  373. var res *http.Response
  374. res, err = s.service.Files.Get(fileId).Context(ctx).Download()
  375. if err != nil {
  376. return
  377. }
  378. reader = res.Body
  379. return
  380. }
  381. func (s *GDrive) Delete(token string, filename string) (err error) {
  382. metadata, _ := s.findId(fmt.Sprintf("%s.metadata", filename), token)
  383. s.service.Files.Delete(metadata).Do()
  384. var fileId string
  385. fileId, err = s.findId(filename, token)
  386. if err != nil {
  387. return
  388. }
  389. err = s.service.Files.Delete(fileId).Do()
  390. return
  391. }
  392. func (s *GDrive) IsNotExist(err error) bool {
  393. if err == nil {
  394. return false
  395. }
  396. if err != nil {
  397. if e, ok := err.(*googleapi.Error); ok {
  398. return e.Code == http.StatusNotFound
  399. }
  400. }
  401. return false
  402. }
  403. func (s *GDrive) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error {
  404. dirId, err := s.findId("", token)
  405. if err != nil {
  406. return err
  407. }
  408. if dirId == "" {
  409. dir := &drive.File{
  410. Name: token,
  411. Parents: []string{s.rootId},
  412. MimeType: GDriveDirectoryMimeType,
  413. }
  414. di, err := s.service.Files.Create(dir).Fields("id").Do()
  415. if err != nil {
  416. return err
  417. }
  418. dirId = di.Id
  419. }
  420. // Instantiate empty drive file
  421. dst := &drive.File{
  422. Name: filename,
  423. Parents: []string{dirId},
  424. MimeType: contentType,
  425. }
  426. ctx := context.Background()
  427. _, err = s.service.Files.Create(dst).Context(ctx).Media(reader).Do()
  428. if err != nil {
  429. return err
  430. }
  431. return nil
  432. }
  433. // Retrieve a token, saves the token, then returns the generated client.
  434. func getGDriveClient(config *oauth2.Config) *http.Client {
  435. tokenFile := "token.json"
  436. tok, err := gDriveTokenFromFile(tokenFile)
  437. if err != nil {
  438. tok = getGDriveTokenFromWeb(config)
  439. saveGDriveToken(tokenFile, tok)
  440. }
  441. return config.Client(context.Background(), tok)
  442. }
  443. // Request a token from the web, then returns the retrieved token.
  444. func getGDriveTokenFromWeb(config *oauth2.Config) *oauth2.Token {
  445. authURL := config.AuthCodeURL("state-token", oauth2.AccessTypeOffline)
  446. fmt.Printf("Go to the following link in your browser then type the "+
  447. "authorization code: \n%v\n", authURL)
  448. var authCode string
  449. if _, err := fmt.Scan(&authCode); err != nil {
  450. log.Fatalf("Unable to read authorization code %v", err)
  451. }
  452. tok, err := config.Exchange(oauth2.NoContext, authCode)
  453. if err != nil {
  454. log.Fatalf("Unable to retrieve token from web %v", err)
  455. }
  456. return tok
  457. }
  458. // Retrieves a token from a local file.
  459. func gDriveTokenFromFile(file string) (*oauth2.Token, error) {
  460. f, err := os.Open(file)
  461. defer f.Close()
  462. if err != nil {
  463. return nil, err
  464. }
  465. tok := &oauth2.Token{}
  466. err = json.NewDecoder(f).Decode(tok)
  467. return tok, err
  468. }
  469. // Saves a token to a file path.
  470. func saveGDriveToken(path string, token *oauth2.Token) {
  471. fmt.Printf("Saving credential file to: %s\n", path)
  472. f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600)
  473. defer f.Close()
  474. if err != nil {
  475. log.Fatalf("Unable to cache oauth token: %v", err)
  476. }
  477. json.NewEncoder(f).Encode(token)
  478. }