You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

622 lines
14 KiB

  1. package server
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. "log"
  7. "mime"
  8. "os"
  9. "path/filepath"
  10. "strconv"
  11. "sync"
  12. "github.com/goamz/goamz/s3"
  13. "encoding/json"
  14. "golang.org/x/oauth2"
  15. "golang.org/x/net/context"
  16. "golang.org/x/oauth2/google"
  17. "google.golang.org/api/drive/v3"
  18. "google.golang.org/api/googleapi"
  19. "net/http"
  20. "io/ioutil"
  21. )
  22. type Storage interface {
  23. Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error)
  24. Head(token string, filename string) (contentType string, contentLength uint64, err error)
  25. Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error
  26. Delete(token string, filename string) error
  27. IsNotExist(err error) bool
  28. Type() string
  29. }
  30. type LocalStorage struct {
  31. Storage
  32. basedir string
  33. }
  34. func NewLocalStorage(basedir string) (*LocalStorage, error) {
  35. return &LocalStorage{basedir: basedir}, nil
  36. }
  37. func (s *LocalStorage) Type() string {
  38. return "local"
  39. }
  40. func (s *LocalStorage) Head(token string, filename string) (contentType string, contentLength uint64, err error) {
  41. path := filepath.Join(s.basedir, token, filename)
  42. var fi os.FileInfo
  43. if fi, err = os.Lstat(path); err != nil {
  44. return
  45. }
  46. contentLength = uint64(fi.Size())
  47. contentType = mime.TypeByExtension(filepath.Ext(filename))
  48. return
  49. }
  50. func (s *LocalStorage) Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error) {
  51. path := filepath.Join(s.basedir, token, filename)
  52. // content type , content length
  53. if reader, err = os.Open(path); err != nil {
  54. return
  55. }
  56. var fi os.FileInfo
  57. if fi, err = os.Lstat(path); err != nil {
  58. return
  59. }
  60. contentLength = uint64(fi.Size())
  61. contentType = mime.TypeByExtension(filepath.Ext(filename))
  62. return
  63. }
  64. func (s *LocalStorage) Delete(token string, filename string) (err error) {
  65. metadata := filepath.Join(s.basedir, token, fmt.Sprintf("%s.metadata", filename))
  66. os.Remove(metadata);
  67. path := filepath.Join(s.basedir, token, filename)
  68. err = os.Remove(path);
  69. return
  70. }
  71. func (s *LocalStorage) IsNotExist(err error) bool {
  72. if err == nil {
  73. return false
  74. }
  75. return os.IsNotExist(err)
  76. }
  77. func (s *LocalStorage) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error {
  78. var f io.WriteCloser
  79. var err error
  80. path := filepath.Join(s.basedir, token)
  81. if err = os.Mkdir(path, 0700); err != nil && !os.IsExist(err) {
  82. return err
  83. }
  84. if f, err = os.OpenFile(filepath.Join(path, filename), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600); err != nil {
  85. fmt.Printf("%s", err)
  86. return err
  87. }
  88. defer f.Close()
  89. if _, err = io.Copy(f, reader); err != nil {
  90. return err
  91. }
  92. return nil
  93. }
  94. type S3Storage struct {
  95. Storage
  96. bucket *s3.Bucket
  97. }
  98. func NewS3Storage(accessKey, secretKey, bucketName, endpoint string) (*S3Storage, error) {
  99. bucket, err := getBucket(accessKey, secretKey, bucketName, endpoint)
  100. if err != nil {
  101. return nil, err
  102. }
  103. return &S3Storage{bucket: bucket}, nil
  104. }
  105. func (s *S3Storage) Type() string {
  106. return "s3"
  107. }
  108. func (s *S3Storage) Head(token string, filename string) (contentType string, contentLength uint64, err error) {
  109. key := fmt.Sprintf("%s/%s", token, filename)
  110. // content type , content length
  111. response, err := s.bucket.Head(key, map[string][]string{})
  112. if err != nil {
  113. return
  114. }
  115. contentType = response.Header.Get("Content-Type")
  116. contentLength, err = strconv.ParseUint(response.Header.Get("Content-Length"), 10, 0)
  117. if err != nil {
  118. return
  119. }
  120. return
  121. }
  122. func (s *S3Storage) IsNotExist(err error) bool {
  123. if err == nil {
  124. return false
  125. }
  126. log.Printf("IsNotExist: %s, %#v", err.Error(), err)
  127. b := (err.Error() == "The specified key does not exist.")
  128. b = b || (err.Error() == "Access Denied")
  129. return b
  130. }
  131. func (s *S3Storage) Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error) {
  132. key := fmt.Sprintf("%s/%s", token, filename)
  133. // content type , content length
  134. response, err := s.bucket.GetResponse(key)
  135. if err != nil {
  136. return
  137. }
  138. contentType = response.Header.Get("Content-Type")
  139. contentLength, err = strconv.ParseUint(response.Header.Get("Content-Length"), 10, 0)
  140. if err != nil {
  141. return
  142. }
  143. reader = response.Body
  144. return
  145. }
  146. func (s *S3Storage) Delete(token string, filename string) (err error) {
  147. metadata := fmt.Sprintf("%s/%s.metadata", token, filename)
  148. s.bucket.Del(metadata)
  149. key := fmt.Sprintf("%s/%s", token, filename)
  150. err = s.bucket.Del(key)
  151. return
  152. }
  153. func (s *S3Storage) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) (err error) {
  154. key := fmt.Sprintf("%s/%s", token, filename)
  155. var (
  156. multi *s3.Multi
  157. parts []s3.Part
  158. )
  159. if multi, err = s.bucket.InitMulti(key, contentType, s3.Private); err != nil {
  160. log.Printf(err.Error())
  161. return
  162. }
  163. // 20 mb parts
  164. partsChan := make(chan interface{})
  165. // partsChan := make(chan s3.Part)
  166. go func() {
  167. // maximize to 20 threads
  168. sem := make(chan int, 20)
  169. index := 1
  170. var wg sync.WaitGroup
  171. for {
  172. // buffered in memory because goamz s3 multi needs seekable reader
  173. var (
  174. buffer []byte = make([]byte, (1<<20)*10)
  175. count int
  176. err error
  177. )
  178. // Amazon expects parts of at least 5MB, except for the last one
  179. if count, err = io.ReadAtLeast(reader, buffer, (1<<20)*5); err != nil && err != io.ErrUnexpectedEOF && err != io.EOF {
  180. log.Printf(err.Error())
  181. return
  182. }
  183. // always send minimal 1 part
  184. if err == io.EOF && index > 1 {
  185. log.Printf("Waiting for all parts to finish uploading.")
  186. // wait for all parts to be finished uploading
  187. wg.Wait()
  188. // and close the channel
  189. close(partsChan)
  190. return
  191. }
  192. wg.Add(1)
  193. sem <- 1
  194. // using goroutines because of retries when upload fails
  195. go func(multi *s3.Multi, buffer []byte, index int) {
  196. log.Printf("Uploading part %d %d", index, len(buffer))
  197. defer func() {
  198. log.Printf("Finished part %d %d", index, len(buffer))
  199. wg.Done()
  200. <-sem
  201. }()
  202. partReader := bytes.NewReader(buffer)
  203. var part s3.Part
  204. if part, err = multi.PutPart(index, partReader); err != nil {
  205. log.Printf("Error while uploading part %d %d %s", index, len(buffer), err.Error())
  206. partsChan <- err
  207. return
  208. }
  209. log.Printf("Finished uploading part %d %d", index, len(buffer))
  210. partsChan <- part
  211. }(multi, buffer[:count], index)
  212. index++
  213. }
  214. }()
  215. // wait for all parts to be uploaded
  216. for part := range partsChan {
  217. switch part.(type) {
  218. case s3.Part:
  219. parts = append(parts, part.(s3.Part))
  220. case error:
  221. // abort multi upload
  222. log.Printf("Error during upload, aborting %s.", part.(error).Error())
  223. err = part.(error)
  224. multi.Abort()
  225. return
  226. }
  227. }
  228. log.Printf("Completing upload %d parts", len(parts))
  229. if err = multi.Complete(parts); err != nil {
  230. log.Printf("Error during completing upload %d parts %s", len(parts), err.Error())
  231. return
  232. }
  233. log.Printf("Completed uploading %d", len(parts))
  234. return
  235. }
  236. type GDrive struct {
  237. service *drive.Service
  238. rootId string
  239. basedir string
  240. localConfigPath string
  241. }
  242. func NewGDriveStorage(clientJsonFilepath string, localConfigPath string, basedir string) (*GDrive, error) {
  243. b, err := ioutil.ReadFile(clientJsonFilepath)
  244. if err != nil {
  245. return nil, err
  246. }
  247. // If modifying these scopes, delete your previously saved client_secret.json.
  248. config, err := google.ConfigFromJSON(b, drive.DriveScope, drive.DriveMetadataScope)
  249. if err != nil {
  250. return nil, err
  251. }
  252. srv, err := drive.New(getGDriveClient(config))
  253. if err != nil {
  254. return nil, err
  255. }
  256. storage := &GDrive{service: srv, basedir: basedir, rootId: "", localConfigPath:localConfigPath}
  257. err = storage.setupRoot()
  258. if err != nil {
  259. return nil, err
  260. }
  261. return storage, nil
  262. }
  263. const GDriveRootConfigFile = "root_id.conf"
  264. const GDriveDirectoryMimeType = "application/vnd.google-apps.folder"
  265. func (s *GDrive) setupRoot() error {
  266. rootFileConfig := filepath.Join(s.localConfigPath, GDriveRootConfigFile)
  267. rootId, err := ioutil.ReadFile(rootFileConfig)
  268. if err != nil && !os.IsNotExist(err) {
  269. return err
  270. }
  271. if string(rootId) != "" {
  272. s.rootId = string(rootId)
  273. return nil
  274. }
  275. dir := &drive.File{
  276. Name: s.basedir,
  277. MimeType: GDriveDirectoryMimeType,
  278. }
  279. di, err := s.service.Files.Create(dir).Fields("id").Do()
  280. if err != nil {
  281. return err
  282. }
  283. s.rootId = di.Id
  284. err = ioutil.WriteFile(rootFileConfig, []byte(s.rootId), os.FileMode(0600))
  285. if err != nil {
  286. return err
  287. }
  288. return nil
  289. }
  290. func (s *GDrive) hasChecksum(f *drive.File) bool {
  291. return f.Md5Checksum != ""
  292. }
  293. func (s *GDrive) list(nextPageToken string, q string) (*drive.FileList, error){
  294. return s.service.Files.List().Fields("nextPageToken, files(id, name, mimeType)").Q(q).PageToken(nextPageToken).Do()
  295. }
  296. func (s *GDrive) findId(filename string, token string) (string, error) {
  297. fileId, tokenId, nextPageToken := "", "", ""
  298. q := fmt.Sprintf("'%s' in parents and name='%s' and mimeType='%s' and trashed=false", s.rootId, token, GDriveDirectoryMimeType)
  299. l, err := s.list(nextPageToken, q)
  300. for 0 < len(l.Files) {
  301. if err != nil {
  302. return "", err
  303. }
  304. for _, fi := range l.Files {
  305. tokenId = fi.Id
  306. break
  307. }
  308. if l.NextPageToken == "" {
  309. break
  310. }
  311. l, err = s.list(l.NextPageToken, q)
  312. }
  313. if filename == "" {
  314. return tokenId, nil
  315. } else if tokenId == "" {
  316. return "", fmt.Errorf("Cannot find file %s/%s", token, filename)
  317. }
  318. q = fmt.Sprintf("'%s' in parents and name='%s' and mimeType!='%s' and trashed=false", tokenId, filename, GDriveDirectoryMimeType)
  319. l, err = s.list(nextPageToken, q)
  320. for 0 < len(l.Files) {
  321. if err != nil {
  322. return "", err
  323. }
  324. for _, fi := range l.Files {
  325. fileId = fi.Id
  326. break
  327. }
  328. if l.NextPageToken == "" {
  329. break
  330. }
  331. l, err = s.list(l.NextPageToken, q)
  332. }
  333. if fileId == "" {
  334. return "", fmt.Errorf("Cannot find file %s/%s", token, filename)
  335. }
  336. return fileId, nil
  337. }
  338. func (s *GDrive) Type() string {
  339. return "gdrive"
  340. }
  341. func (s *GDrive) Head(token string, filename string) (contentType string, contentLength uint64, err error) {
  342. var fileId string
  343. fileId, err = s.findId(filename, token)
  344. if err != nil {
  345. return
  346. }
  347. var fi *drive.File
  348. if fi, err = s.service.Files.Get(fileId).Fields("mimeType", "size").Do(); err != nil {
  349. return
  350. }
  351. contentLength = uint64(fi.Size)
  352. contentType = fi.MimeType
  353. return
  354. }
  355. func (s *GDrive) Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error) {
  356. var fileId string
  357. fileId, err = s.findId(filename, token)
  358. if err != nil {
  359. return
  360. }
  361. var fi *drive.File
  362. fi, err = s.service.Files.Get(fileId).Fields("mimeType", "size", "md5Checksum").Do()
  363. if !s.hasChecksum(fi) {
  364. err = fmt.Errorf("Cannot find file %s/%s", token, filename)
  365. return
  366. }
  367. contentLength = uint64(fi.Size)
  368. contentType = fi.MimeType
  369. ctx := context.Background()
  370. var res *http.Response
  371. res, err = s.service.Files.Get(fileId).Context(ctx).Download()
  372. if err != nil {
  373. return
  374. }
  375. reader = res.Body
  376. return
  377. }
  378. func (s *GDrive) Delete(token string, filename string) (err error) {
  379. metadata, _ := s.findId(fmt.Sprintf("%s.metadata", filename), token)
  380. s.service.Files.Delete(metadata).Do()
  381. var fileId string
  382. fileId, err = s.findId(filename, token)
  383. if err != nil {
  384. return
  385. }
  386. err = s.service.Files.Delete(fileId).Do()
  387. return
  388. }
  389. func (s *GDrive) IsNotExist(err error) bool {
  390. if err == nil {
  391. return false
  392. }
  393. if err != nil {
  394. if e, ok := err.(*googleapi.Error); ok {
  395. return e.Code == http.StatusNotFound
  396. }
  397. }
  398. return false
  399. }
  400. func (s *GDrive) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error {
  401. dirId, err := s.findId("", token)
  402. if err != nil {
  403. return err
  404. }
  405. if dirId == "" {
  406. dir := &drive.File{
  407. Name: token,
  408. Parents: []string{s.rootId},
  409. MimeType: GDriveDirectoryMimeType,
  410. }
  411. di, err := s.service.Files.Create(dir).Fields("id").Do()
  412. if err != nil {
  413. return err
  414. }
  415. dirId = di.Id
  416. }
  417. // Instantiate empty drive file
  418. dst := &drive.File{
  419. Name: filename,
  420. Parents: []string{dirId},
  421. MimeType: contentType,
  422. }
  423. ctx := context.Background()
  424. _, err = s.service.Files.Create(dst).Context(ctx).Media(reader).Do()
  425. if err != nil {
  426. return err
  427. }
  428. return nil
  429. }
  430. // Retrieve a token, saves the token, then returns the generated client.
  431. func getGDriveClient(config *oauth2.Config) *http.Client {
  432. tokenFile := "token.json"
  433. tok, err := gDriveTokenFromFile(tokenFile)
  434. if err != nil {
  435. tok = getGDriveTokenFromWeb(config)
  436. saveGDriveToken(tokenFile, tok)
  437. }
  438. return config.Client(context.Background(), tok)
  439. }
  440. // Request a token from the web, then returns the retrieved token.
  441. func getGDriveTokenFromWeb(config *oauth2.Config) *oauth2.Token {
  442. authURL := config.AuthCodeURL("state-token", oauth2.AccessTypeOffline)
  443. fmt.Printf("Go to the following link in your browser then type the "+
  444. "authorization code: \n%v\n", authURL)
  445. var authCode string
  446. if _, err := fmt.Scan(&authCode); err != nil {
  447. log.Fatalf("Unable to read authorization code %v", err)
  448. }
  449. tok, err := config.Exchange(oauth2.NoContext, authCode)
  450. if err != nil {
  451. log.Fatalf("Unable to retrieve token from web %v", err)
  452. }
  453. return tok
  454. }
  455. // Retrieves a token from a local file.
  456. func gDriveTokenFromFile(file string) (*oauth2.Token, error) {
  457. f, err := os.Open(file)
  458. defer f.Close()
  459. if err != nil {
  460. return nil, err
  461. }
  462. tok := &oauth2.Token{}
  463. err = json.NewDecoder(f).Decode(tok)
  464. return tok, err
  465. }
  466. // Saves a token to a file path.
  467. func saveGDriveToken(path string, token *oauth2.Token) {
  468. fmt.Printf("Saving credential file to: %s\n", path)
  469. f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600)
  470. defer f.Close()
  471. if err != nil {
  472. log.Fatalf("Unable to cache oauth token: %v", err)
  473. }
  474. json.NewEncoder(f).Encode(token)
  475. }