You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

624 lines
14 KiB

  1. package server
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. "log"
  7. "mime"
  8. "os"
  9. "path/filepath"
  10. "strconv"
  11. "sync"
  12. "encoding/json"
  13. "github.com/goamz/goamz/s3"
  14. "golang.org/x/net/context"
  15. "golang.org/x/oauth2"
  16. "golang.org/x/oauth2/google"
  17. "google.golang.org/api/drive/v3"
  18. "google.golang.org/api/googleapi"
  19. "io/ioutil"
  20. "net/http"
  21. "strings"
  22. )
  // Storage is the contract every file backend in this package implements.
  // Files are addressed by a token plus a filename.
  type Storage interface {
  	// Get returns a reader for the file together with its content type and length.
  	Get(token, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error)
  	// Head returns the content type and length of the file without its body.
  	Head(token, filename string) (contentType string, contentLength uint64, err error)
  	// Put stores the data read from reader under token/filename.
  	Put(token, filename string, reader io.Reader, contentType string, contentLength uint64) error
  	// Delete removes the file stored under token/filename.
  	Delete(token, filename string) error
  	// IsNotExist reports whether err means that the requested file is missing.
  	IsNotExist(err error) bool
  	// Type returns a short identifier for the backend.
  	Type() string
  }
  31. type LocalStorage struct {
  32. Storage
  33. basedir string
  34. }
  35. func NewLocalStorage(basedir string) (*LocalStorage, error) {
  36. return &LocalStorage{basedir: basedir}, nil
  37. }
  38. func (s *LocalStorage) Type() string {
  39. return "local"
  40. }
  41. func (s *LocalStorage) Head(token string, filename string) (contentType string, contentLength uint64, err error) {
  42. path := filepath.Join(s.basedir, token, filename)
  43. var fi os.FileInfo
  44. if fi, err = os.Lstat(path); err != nil {
  45. return
  46. }
  47. contentLength = uint64(fi.Size())
  48. contentType = mime.TypeByExtension(filepath.Ext(filename))
  49. return
  50. }
  51. func (s *LocalStorage) Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error) {
  52. path := filepath.Join(s.basedir, token, filename)
  53. // content type , content length
  54. if reader, err = os.Open(path); err != nil {
  55. return
  56. }
  57. var fi os.FileInfo
  58. if fi, err = os.Lstat(path); err != nil {
  59. return
  60. }
  61. contentLength = uint64(fi.Size())
  62. contentType = mime.TypeByExtension(filepath.Ext(filename))
  63. return
  64. }
  65. func (s *LocalStorage) Delete(token string, filename string) (err error) {
  66. metadata := filepath.Join(s.basedir, token, fmt.Sprintf("%s.metadata", filename))
  67. os.Remove(metadata)
  68. path := filepath.Join(s.basedir, token, filename)
  69. err = os.Remove(path)
  70. return
  71. }
  72. func (s *LocalStorage) IsNotExist(err error) bool {
  73. if err == nil {
  74. return false
  75. }
  76. return os.IsNotExist(err)
  77. }
  78. func (s *LocalStorage) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error {
  79. var f io.WriteCloser
  80. var err error
  81. path := filepath.Join(s.basedir, token)
  82. if err = os.Mkdir(path, 0700); err != nil && !os.IsExist(err) {
  83. return err
  84. }
  85. if f, err = os.OpenFile(filepath.Join(path, filename), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600); err != nil {
  86. fmt.Printf("%s", err)
  87. return err
  88. }
  89. defer f.Close()
  90. if _, err = io.Copy(f, reader); err != nil {
  91. return err
  92. }
  93. return nil
  94. }
  95. type S3Storage struct {
  96. Storage
  97. bucket *s3.Bucket
  98. }
  99. func NewS3Storage(accessKey, secretKey, bucketName, endpoint string) (*S3Storage, error) {
  100. bucket, err := getBucket(accessKey, secretKey, bucketName, endpoint)
  101. if err != nil {
  102. return nil, err
  103. }
  104. return &S3Storage{bucket: bucket}, nil
  105. }
  106. func (s *S3Storage) Type() string {
  107. return "s3"
  108. }
  109. func (s *S3Storage) Head(token string, filename string) (contentType string, contentLength uint64, err error) {
  110. key := fmt.Sprintf("%s/%s", token, filename)
  111. // content type , content length
  112. response, err := s.bucket.Head(key, map[string][]string{})
  113. if err != nil {
  114. return
  115. }
  116. contentType = response.Header.Get("Content-Type")
  117. contentLength, err = strconv.ParseUint(response.Header.Get("Content-Length"), 10, 0)
  118. if err != nil {
  119. return
  120. }
  121. return
  122. }
  123. func (s *S3Storage) IsNotExist(err error) bool {
  124. if err == nil {
  125. return false
  126. }
  127. log.Printf("IsNotExist: %s, %#v", err.Error(), err)
  128. b := (err.Error() == "The specified key does not exist.")
  129. b = b || (err.Error() == "Access Denied")
  130. return b
  131. }
  132. func (s *S3Storage) Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error) {
  133. key := fmt.Sprintf("%s/%s", token, filename)
  134. // content type , content length
  135. response, err := s.bucket.GetResponse(key)
  136. if err != nil {
  137. return
  138. }
  139. contentType = response.Header.Get("Content-Type")
  140. contentLength, err = strconv.ParseUint(response.Header.Get("Content-Length"), 10, 0)
  141. if err != nil {
  142. return
  143. }
  144. reader = response.Body
  145. return
  146. }
  147. func (s *S3Storage) Delete(token string, filename string) (err error) {
  148. metadata := fmt.Sprintf("%s/%s.metadata", token, filename)
  149. s.bucket.Del(metadata)
  150. key := fmt.Sprintf("%s/%s", token, filename)
  151. err = s.bucket.Del(key)
  152. return
  153. }
  154. func (s *S3Storage) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) (err error) {
  155. key := fmt.Sprintf("%s/%s", token, filename)
  156. var (
  157. multi *s3.Multi
  158. parts []s3.Part
  159. )
  160. if multi, err = s.bucket.InitMulti(key, contentType, s3.Private); err != nil {
  161. log.Printf(err.Error())
  162. return
  163. }
  164. // 20 mb parts
  165. partsChan := make(chan interface{})
  166. // partsChan := make(chan s3.Part)
  167. go func() {
  168. // maximize to 20 threads
  169. sem := make(chan int, 20)
  170. index := 1
  171. var wg sync.WaitGroup
  172. for {
  173. // buffered in memory because goamz s3 multi needs seekable reader
  174. var (
  175. buffer []byte = make([]byte, (1<<20)*10)
  176. count int
  177. err error
  178. )
  179. // Amazon expects parts of at least 5MB, except for the last one
  180. if count, err = io.ReadAtLeast(reader, buffer, (1<<20)*5); err != nil && err != io.ErrUnexpectedEOF && err != io.EOF {
  181. log.Printf(err.Error())
  182. return
  183. }
  184. // always send minimal 1 part
  185. if err == io.EOF && index > 1 {
  186. log.Printf("Waiting for all parts to finish uploading.")
  187. // wait for all parts to be finished uploading
  188. wg.Wait()
  189. // and close the channel
  190. close(partsChan)
  191. return
  192. }
  193. wg.Add(1)
  194. sem <- 1
  195. // using goroutines because of retries when upload fails
  196. go func(multi *s3.Multi, buffer []byte, index int) {
  197. log.Printf("Uploading part %d %d", index, len(buffer))
  198. defer func() {
  199. log.Printf("Finished part %d %d", index, len(buffer))
  200. wg.Done()
  201. <-sem
  202. }()
  203. partReader := bytes.NewReader(buffer)
  204. var part s3.Part
  205. if part, err = multi.PutPart(index, partReader); err != nil {
  206. log.Printf("Error while uploading part %d %d %s", index, len(buffer), err.Error())
  207. partsChan <- err
  208. return
  209. }
  210. log.Printf("Finished uploading part %d %d", index, len(buffer))
  211. partsChan <- part
  212. }(multi, buffer[:count], index)
  213. index++
  214. }
  215. }()
  216. // wait for all parts to be uploaded
  217. for part := range partsChan {
  218. switch part.(type) {
  219. case s3.Part:
  220. parts = append(parts, part.(s3.Part))
  221. case error:
  222. // abort multi upload
  223. log.Printf("Error during upload, aborting %s.", part.(error).Error())
  224. err = part.(error)
  225. multi.Abort()
  226. return
  227. }
  228. }
  229. log.Printf("Completing upload %d parts", len(parts))
  230. if err = multi.Complete(parts); err != nil {
  231. log.Printf("Error during completing upload %d parts %s", len(parts), err.Error())
  232. return
  233. }
  234. log.Printf("Completed uploading %d", len(parts))
  235. return
  236. }
  237. type GDrive struct {
  238. service *drive.Service
  239. rootId string
  240. basedir string
  241. localConfigPath string
  242. }
  243. func NewGDriveStorage(clientJsonFilepath string, localConfigPath string, basedir string) (*GDrive, error) {
  244. b, err := ioutil.ReadFile(clientJsonFilepath)
  245. if err != nil {
  246. return nil, err
  247. }
  248. // If modifying these scopes, delete your previously saved client_secret.json.
  249. config, err := google.ConfigFromJSON(b, drive.DriveScope, drive.DriveMetadataScope)
  250. if err != nil {
  251. return nil, err
  252. }
  253. srv, err := drive.New(getGDriveClient(config, localConfigPath))
  254. if err != nil {
  255. return nil, err
  256. }
  257. storage := &GDrive{service: srv, basedir: basedir, rootId: "", localConfigPath: localConfigPath}
  258. err = storage.setupRoot()
  259. if err != nil {
  260. return nil, err
  261. }
  262. return storage, nil
  263. }
  // File names and MIME type used by the Google Drive backend.
  const (
  	// GDriveRootConfigFile is the local file that caches the root folder id.
  	GDriveRootConfigFile = "root_id.conf"
  	// GDriveTokenJsonFile is the local file that caches the OAuth token.
  	GDriveTokenJsonFile = "token.json"
  	// GDriveDirectoryMimeType is the MIME type that marks a Drive folder.
  	GDriveDirectoryMimeType = "application/vnd.google-apps.folder"
  )
  267. func (s *GDrive) setupRoot() error {
  268. rootFileConfig := filepath.Join(s.localConfigPath, GDriveRootConfigFile)
  269. rootId, err := ioutil.ReadFile(rootFileConfig)
  270. if err != nil && !os.IsNotExist(err) {
  271. return err
  272. }
  273. if string(rootId) != "" {
  274. s.rootId = string(rootId)
  275. return nil
  276. }
  277. dir := &drive.File{
  278. Name: s.basedir,
  279. MimeType: GDriveDirectoryMimeType,
  280. }
  281. di, err := s.service.Files.Create(dir).Fields("id").Do()
  282. if err != nil {
  283. return err
  284. }
  285. s.rootId = di.Id
  286. err = ioutil.WriteFile(rootFileConfig, []byte(s.rootId), os.FileMode(0600))
  287. if err != nil {
  288. return err
  289. }
  290. return nil
  291. }
  292. func (s *GDrive) hasChecksum(f *drive.File) bool {
  293. return f.Md5Checksum != ""
  294. }
  295. func (s *GDrive) list(nextPageToken string, q string) (*drive.FileList, error) {
  296. return s.service.Files.List().Fields("nextPageToken, files(id, name, mimeType)").Q(q).PageToken(nextPageToken).Do()
  297. }
  298. func (s *GDrive) findId(filename string, token string) (string, error) {
  299. filename = strings.Replace(filename, `'`, `\'`, -1)
  300. filename = strings.Replace(filename, `"`, `\"`, -1)
  301. fileId, tokenId, nextPageToken := "", "", ""
  302. q := fmt.Sprintf("'%s' in parents and name='%s' and mimeType='%s' and trashed=false", s.rootId, token, GDriveDirectoryMimeType)
  303. l, err := s.list(nextPageToken, q)
  304. for 0 < len(l.Files) {
  305. if err != nil {
  306. return "", err
  307. }
  308. for _, fi := range l.Files {
  309. tokenId = fi.Id
  310. break
  311. }
  312. if l.NextPageToken == "" {
  313. break
  314. }
  315. l, err = s.list(l.NextPageToken, q)
  316. }
  317. if filename == "" {
  318. return tokenId, nil
  319. } else if tokenId == "" {
  320. return "", fmt.Errorf("Cannot find file %s/%s", token, filename)
  321. }
  322. q = fmt.Sprintf("'%s' in parents and name='%s' and mimeType!='%s' and trashed=false", tokenId, filename, GDriveDirectoryMimeType)
  323. l, err = s.list(nextPageToken, q)
  324. for 0 < len(l.Files) {
  325. if err != nil {
  326. return "", err
  327. }
  328. for _, fi := range l.Files {
  329. fileId = fi.Id
  330. break
  331. }
  332. if l.NextPageToken == "" {
  333. break
  334. }
  335. l, err = s.list(l.NextPageToken, q)
  336. }
  337. if fileId == "" {
  338. return "", fmt.Errorf("Cannot find file %s/%s", token, filename)
  339. }
  340. return fileId, nil
  341. }
  342. func (s *GDrive) Type() string {
  343. return "gdrive"
  344. }
  345. func (s *GDrive) Head(token string, filename string) (contentType string, contentLength uint64, err error) {
  346. var fileId string
  347. fileId, err = s.findId(filename, token)
  348. if err != nil {
  349. return
  350. }
  351. var fi *drive.File
  352. if fi, err = s.service.Files.Get(fileId).Fields("mimeType", "size").Do(); err != nil {
  353. return
  354. }
  355. contentLength = uint64(fi.Size)
  356. contentType = fi.MimeType
  357. return
  358. }
  359. func (s *GDrive) Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error) {
  360. var fileId string
  361. fileId, err = s.findId(filename, token)
  362. if err != nil {
  363. return
  364. }
  365. var fi *drive.File
  366. fi, err = s.service.Files.Get(fileId).Fields("mimeType", "size", "md5Checksum").Do()
  367. if !s.hasChecksum(fi) {
  368. err = fmt.Errorf("Cannot find file %s/%s", token, filename)
  369. return
  370. }
  371. contentLength = uint64(fi.Size)
  372. contentType = fi.MimeType
  373. ctx := context.Background()
  374. var res *http.Response
  375. res, err = s.service.Files.Get(fileId).Context(ctx).Download()
  376. if err != nil {
  377. return
  378. }
  379. reader = res.Body
  380. return
  381. }
  382. func (s *GDrive) Delete(token string, filename string) (err error) {
  383. metadata, _ := s.findId(fmt.Sprintf("%s.metadata", filename), token)
  384. s.service.Files.Delete(metadata).Do()
  385. var fileId string
  386. fileId, err = s.findId(filename, token)
  387. if err != nil {
  388. return
  389. }
  390. err = s.service.Files.Delete(fileId).Do()
  391. return
  392. }
  393. func (s *GDrive) IsNotExist(err error) bool {
  394. if err == nil {
  395. return false
  396. }
  397. if err != nil {
  398. if e, ok := err.(*googleapi.Error); ok {
  399. return e.Code == http.StatusNotFound
  400. }
  401. }
  402. return false
  403. }
  404. func (s *GDrive) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error {
  405. dirId, err := s.findId("", token)
  406. if err != nil {
  407. return err
  408. }
  409. if dirId == "" {
  410. dir := &drive.File{
  411. Name: token,
  412. Parents: []string{s.rootId},
  413. MimeType: GDriveDirectoryMimeType,
  414. }
  415. di, err := s.service.Files.Create(dir).Fields("id").Do()
  416. if err != nil {
  417. return err
  418. }
  419. dirId = di.Id
  420. }
  421. // Instantiate empty drive file
  422. dst := &drive.File{
  423. Name: filename,
  424. Parents: []string{dirId},
  425. MimeType: contentType,
  426. }
  427. ctx := context.Background()
  428. _, err = s.service.Files.Create(dst).Context(ctx).Media(reader).Do()
  429. if err != nil {
  430. return err
  431. }
  432. return nil
  433. }
  434. // Retrieve a token, saves the token, then returns the generated client.
  435. func getGDriveClient(config *oauth2.Config, localConfigPath string) *http.Client {
  436. tokenFile := filepath.Join(localConfigPath, GDriveTokenJsonFile)
  437. tok, err := gDriveTokenFromFile(tokenFile)
  438. if err != nil {
  439. tok = getGDriveTokenFromWeb(config)
  440. saveGDriveToken(tokenFile, tok)
  441. }
  442. return config.Client(context.Background(), tok)
  443. }
  444. // Request a token from the web, then returns the retrieved token.
  445. func getGDriveTokenFromWeb(config *oauth2.Config) *oauth2.Token {
  446. authURL := config.AuthCodeURL("state-token", oauth2.AccessTypeOffline)
  447. fmt.Printf("Go to the following link in your browser then type the "+
  448. "authorization code: \n%v\n", authURL)
  449. var authCode string
  450. if _, err := fmt.Scan(&authCode); err != nil {
  451. log.Fatalf("Unable to read authorization code %v", err)
  452. }
  453. tok, err := config.Exchange(context.TODO(), authCode)
  454. if err != nil {
  455. log.Fatalf("Unable to retrieve token from web %v", err)
  456. }
  457. return tok
  458. }
  459. // Retrieves a token from a local file.
  460. func gDriveTokenFromFile(file string) (*oauth2.Token, error) {
  461. f, err := os.Open(file)
  462. defer f.Close()
  463. if err != nil {
  464. return nil, err
  465. }
  466. tok := &oauth2.Token{}
  467. err = json.NewDecoder(f).Decode(tok)
  468. return tok, err
  469. }
  470. // Saves a token to a file path.
  471. func saveGDriveToken(path string, token *oauth2.Token) {
  472. fmt.Printf("Saving credential file to: %s\n", path)
  473. f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600)
  474. defer f.Close()
  475. if err != nil {
  476. log.Fatalf("Unable to cache oauth token: %v", err)
  477. }
  478. json.NewEncoder(f).Encode(token)
  479. }