You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

741 lines
17 KiB

  1. package server
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "github.com/aws/aws-sdk-go/aws"
  7. "github.com/aws/aws-sdk-go/aws/awserr"
  8. "github.com/aws/aws-sdk-go/aws/session"
  9. "github.com/aws/aws-sdk-go/service/s3"
  10. "github.com/aws/aws-sdk-go/service/s3/s3manager"
  11. "golang.org/x/net/context"
  12. "golang.org/x/oauth2"
  13. "golang.org/x/oauth2/google"
  14. "google.golang.org/api/drive/v3"
  15. "google.golang.org/api/googleapi"
  16. "io"
  17. "io/ioutil"
  18. "log"
  19. "net/http"
  20. "os"
  21. "path/filepath"
  22. "strings"
  23. "time"
  24. "storj.io/common/storj"
  25. "storj.io/uplink"
  26. )
  27. type Storage interface {
  28. Get(token string, filename string) (reader io.ReadCloser, contentLength uint64, err error)
  29. Head(token string, filename string) (contentLength uint64, err error)
  30. Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error
  31. Delete(token string, filename string) error
  32. IsNotExist(err error) bool
  33. Purge(days time.Duration) error
  34. Type() string
  35. }
  36. type LocalStorage struct {
  37. Storage
  38. basedir string
  39. logger *log.Logger
  40. }
  41. func NewLocalStorage(basedir string, logger *log.Logger) (*LocalStorage, error) {
  42. return &LocalStorage{basedir: basedir, logger: logger}, nil
  43. }
  44. func (s *LocalStorage) Type() string {
  45. return "local"
  46. }
  47. func (s *LocalStorage) Head(token string, filename string) (contentLength uint64, err error) {
  48. path := filepath.Join(s.basedir, token, filename)
  49. var fi os.FileInfo
  50. if fi, err = os.Lstat(path); err != nil {
  51. return
  52. }
  53. contentLength = uint64(fi.Size())
  54. return
  55. }
  56. func (s *LocalStorage) Get(token string, filename string) (reader io.ReadCloser, contentLength uint64, err error) {
  57. path := filepath.Join(s.basedir, token, filename)
  58. // content type , content length
  59. if reader, err = os.Open(path); err != nil {
  60. return
  61. }
  62. var fi os.FileInfo
  63. if fi, err = os.Lstat(path); err != nil {
  64. return
  65. }
  66. contentLength = uint64(fi.Size())
  67. return
  68. }
  69. func (s *LocalStorage) Delete(token string, filename string) (err error) {
  70. metadata := filepath.Join(s.basedir, token, fmt.Sprintf("%s.metadata", filename))
  71. os.Remove(metadata)
  72. path := filepath.Join(s.basedir, token, filename)
  73. err = os.Remove(path)
  74. return
  75. }
  76. func (s *LocalStorage) Purge(days time.Duration) (err error) {
  77. err = filepath.Walk(s.basedir,
  78. func(path string, info os.FileInfo, err error) error {
  79. if err != nil {
  80. return err
  81. }
  82. if info.IsDir() {
  83. return nil
  84. }
  85. if info.ModTime().Before(time.Now().Add(-1 * days)) {
  86. err = os.Remove(path)
  87. return err
  88. }
  89. return nil
  90. })
  91. return
  92. }
  93. func (s *LocalStorage) IsNotExist(err error) bool {
  94. if err == nil {
  95. return false
  96. }
  97. return os.IsNotExist(err)
  98. }
  99. func (s *LocalStorage) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error {
  100. var f io.WriteCloser
  101. var err error
  102. path := filepath.Join(s.basedir, token)
  103. if err = os.MkdirAll(path, 0700); err != nil && !os.IsExist(err) {
  104. return err
  105. }
  106. if f, err = os.OpenFile(filepath.Join(path, filename), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600); err != nil {
  107. return err
  108. }
  109. defer f.Close()
  110. if _, err = io.Copy(f, reader); err != nil {
  111. return err
  112. }
  113. return nil
  114. }
  115. type S3Storage struct {
  116. Storage
  117. bucket string
  118. session *session.Session
  119. s3 *s3.S3
  120. logger *log.Logger
  121. purgeDays time.Duration
  122. noMultipart bool
  123. }
  124. func NewS3Storage(accessKey, secretKey, bucketName string, purgeDays int, region, endpoint string, disableMultipart bool, forcePathStyle bool, logger *log.Logger) (*S3Storage, error) {
  125. sess := getAwsSession(accessKey, secretKey, region, endpoint, forcePathStyle)
  126. return &S3Storage{
  127. bucket: bucketName,
  128. s3: s3.New(sess),
  129. session: sess,
  130. logger: logger,
  131. noMultipart: disableMultipart,
  132. purgeDays: time.Duration(purgeDays*24) * time.Hour,
  133. }, nil
  134. }
  135. func (s *S3Storage) Type() string {
  136. return "s3"
  137. }
  138. func (s *S3Storage) Head(token string, filename string) (contentLength uint64, err error) {
  139. key := fmt.Sprintf("%s/%s", token, filename)
  140. headRequest := &s3.HeadObjectInput{
  141. Bucket: aws.String(s.bucket),
  142. Key: aws.String(key),
  143. }
  144. // content type , content length
  145. response, err := s.s3.HeadObject(headRequest)
  146. if err != nil {
  147. return
  148. }
  149. if response.ContentLength != nil {
  150. contentLength = uint64(*response.ContentLength)
  151. }
  152. return
  153. }
  154. func (s *S3Storage) Purge(days time.Duration) (err error) {
  155. // NOOP expiration is set at upload time
  156. return nil
  157. }
  158. func (s *S3Storage) IsNotExist(err error) bool {
  159. if err == nil {
  160. return false
  161. }
  162. if aerr, ok := err.(awserr.Error); ok {
  163. switch aerr.Code() {
  164. case s3.ErrCodeNoSuchKey:
  165. return true
  166. }
  167. }
  168. return false
  169. }
  170. func (s *S3Storage) Get(token string, filename string) (reader io.ReadCloser, contentLength uint64, err error) {
  171. key := fmt.Sprintf("%s/%s", token, filename)
  172. getRequest := &s3.GetObjectInput{
  173. Bucket: aws.String(s.bucket),
  174. Key: aws.String(key),
  175. }
  176. response, err := s.s3.GetObject(getRequest)
  177. if err != nil {
  178. return
  179. }
  180. if response.ContentLength != nil {
  181. contentLength = uint64(*response.ContentLength)
  182. }
  183. reader = response.Body
  184. return
  185. }
  186. func (s *S3Storage) Delete(token string, filename string) (err error) {
  187. metadata := fmt.Sprintf("%s/%s.metadata", token, filename)
  188. deleteRequest := &s3.DeleteObjectInput{
  189. Bucket: aws.String(s.bucket),
  190. Key: aws.String(metadata),
  191. }
  192. _, err = s.s3.DeleteObject(deleteRequest)
  193. if err != nil {
  194. return
  195. }
  196. key := fmt.Sprintf("%s/%s", token, filename)
  197. deleteRequest = &s3.DeleteObjectInput{
  198. Bucket: aws.String(s.bucket),
  199. Key: aws.String(key),
  200. }
  201. _, err = s.s3.DeleteObject(deleteRequest)
  202. return
  203. }
  204. func (s *S3Storage) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) (err error) {
  205. key := fmt.Sprintf("%s/%s", token, filename)
  206. s.logger.Printf("Uploading file %s to S3 Bucket", filename)
  207. var concurrency int
  208. if !s.noMultipart {
  209. concurrency = 20
  210. } else {
  211. concurrency = 1
  212. }
  213. // Create an uploader with the session and custom options
  214. uploader := s3manager.NewUploader(s.session, func(u *s3manager.Uploader) {
  215. u.Concurrency = concurrency // default is 5
  216. u.LeavePartsOnError = false
  217. })
  218. _, err = uploader.Upload(&s3manager.UploadInput{
  219. Bucket: aws.String(s.bucket),
  220. Key: aws.String(key),
  221. Body: reader,
  222. Expires: aws.Time(time.Now().Add(s.purgeDays)),
  223. })
  224. return
  225. }
  226. type GDrive struct {
  227. service *drive.Service
  228. rootId string
  229. basedir string
  230. localConfigPath string
  231. chunkSize int
  232. logger *log.Logger
  233. }
  234. func NewGDriveStorage(clientJsonFilepath string, localConfigPath string, basedir string, chunkSize int, logger *log.Logger) (*GDrive, error) {
  235. b, err := ioutil.ReadFile(clientJsonFilepath)
  236. if err != nil {
  237. return nil, err
  238. }
  239. // If modifying these scopes, delete your previously saved client_secret.json.
  240. config, err := google.ConfigFromJSON(b, drive.DriveScope, drive.DriveMetadataScope)
  241. if err != nil {
  242. return nil, err
  243. }
  244. srv, err := drive.New(getGDriveClient(config, localConfigPath, logger))
  245. if err != nil {
  246. return nil, err
  247. }
  248. chunkSize = chunkSize * 1024 * 1024
  249. storage := &GDrive{service: srv, basedir: basedir, rootId: "", localConfigPath: localConfigPath, chunkSize: chunkSize, logger: logger}
  250. err = storage.setupRoot()
  251. if err != nil {
  252. return nil, err
  253. }
  254. return storage, nil
  255. }
  256. const GDriveRootConfigFile = "root_id.conf"
  257. const GDriveTokenJsonFile = "token.json"
  258. const GDriveDirectoryMimeType = "application/vnd.google-apps.folder"
  259. func (s *GDrive) setupRoot() error {
  260. rootFileConfig := filepath.Join(s.localConfigPath, GDriveRootConfigFile)
  261. rootId, err := ioutil.ReadFile(rootFileConfig)
  262. if err != nil && !os.IsNotExist(err) {
  263. return err
  264. }
  265. if string(rootId) != "" {
  266. s.rootId = string(rootId)
  267. return nil
  268. }
  269. dir := &drive.File{
  270. Name: s.basedir,
  271. MimeType: GDriveDirectoryMimeType,
  272. }
  273. di, err := s.service.Files.Create(dir).Fields("id").Do()
  274. if err != nil {
  275. return err
  276. }
  277. s.rootId = di.Id
  278. err = ioutil.WriteFile(rootFileConfig, []byte(s.rootId), os.FileMode(0600))
  279. if err != nil {
  280. return err
  281. }
  282. return nil
  283. }
  284. func (s *GDrive) hasChecksum(f *drive.File) bool {
  285. return f.Md5Checksum != ""
  286. }
  287. func (s *GDrive) list(nextPageToken string, q string) (*drive.FileList, error) {
  288. return s.service.Files.List().Fields("nextPageToken, files(id, name, mimeType)").Q(q).PageToken(nextPageToken).Do()
  289. }
  290. func (s *GDrive) findId(filename string, token string) (string, error) {
  291. filename = strings.Replace(filename, `'`, `\'`, -1)
  292. filename = strings.Replace(filename, `"`, `\"`, -1)
  293. fileId, tokenId, nextPageToken := "", "", ""
  294. q := fmt.Sprintf("'%s' in parents and name='%s' and mimeType='%s' and trashed=false", s.rootId, token, GDriveDirectoryMimeType)
  295. l, err := s.list(nextPageToken, q)
  296. if err != nil {
  297. return "", err
  298. }
  299. for 0 < len(l.Files) {
  300. for _, fi := range l.Files {
  301. tokenId = fi.Id
  302. break
  303. }
  304. if l.NextPageToken == "" {
  305. break
  306. }
  307. l, err = s.list(l.NextPageToken, q)
  308. }
  309. if filename == "" {
  310. return tokenId, nil
  311. } else if tokenId == "" {
  312. return "", fmt.Errorf("Cannot find file %s/%s", token, filename)
  313. }
  314. q = fmt.Sprintf("'%s' in parents and name='%s' and mimeType!='%s' and trashed=false", tokenId, filename, GDriveDirectoryMimeType)
  315. l, err = s.list(nextPageToken, q)
  316. if err != nil {
  317. return "", err
  318. }
  319. for 0 < len(l.Files) {
  320. for _, fi := range l.Files {
  321. fileId = fi.Id
  322. break
  323. }
  324. if l.NextPageToken == "" {
  325. break
  326. }
  327. l, err = s.list(l.NextPageToken, q)
  328. }
  329. if fileId == "" {
  330. return "", fmt.Errorf("Cannot find file %s/%s", token, filename)
  331. }
  332. return fileId, nil
  333. }
  334. func (s *GDrive) Type() string {
  335. return "gdrive"
  336. }
  337. func (s *GDrive) Head(token string, filename string) (contentLength uint64, err error) {
  338. var fileId string
  339. fileId, err = s.findId(filename, token)
  340. if err != nil {
  341. return
  342. }
  343. var fi *drive.File
  344. if fi, err = s.service.Files.Get(fileId).Fields("size").Do(); err != nil {
  345. return
  346. }
  347. contentLength = uint64(fi.Size)
  348. return
  349. }
  350. func (s *GDrive) Get(token string, filename string) (reader io.ReadCloser, contentLength uint64, err error) {
  351. var fileId string
  352. fileId, err = s.findId(filename, token)
  353. if err != nil {
  354. return
  355. }
  356. var fi *drive.File
  357. fi, err = s.service.Files.Get(fileId).Fields("size", "md5Checksum").Do()
  358. if !s.hasChecksum(fi) {
  359. err = fmt.Errorf("Cannot find file %s/%s", token, filename)
  360. return
  361. }
  362. contentLength = uint64(fi.Size)
  363. ctx := context.Background()
  364. var res *http.Response
  365. res, err = s.service.Files.Get(fileId).Context(ctx).Download()
  366. if err != nil {
  367. return
  368. }
  369. reader = res.Body
  370. return
  371. }
  372. func (s *GDrive) Delete(token string, filename string) (err error) {
  373. metadata, _ := s.findId(fmt.Sprintf("%s.metadata", filename), token)
  374. s.service.Files.Delete(metadata).Do()
  375. var fileId string
  376. fileId, err = s.findId(filename, token)
  377. if err != nil {
  378. return
  379. }
  380. err = s.service.Files.Delete(fileId).Do()
  381. return
  382. }
  383. func (s *GDrive) Purge(days time.Duration) (err error) {
  384. nextPageToken := ""
  385. expirationDate := time.Now().Add(-1 * days).Format(time.RFC3339)
  386. q := fmt.Sprintf("'%s' in parents and modifiedTime < '%s' and mimeType!='%s' and trashed=false", s.rootId, expirationDate, GDriveDirectoryMimeType)
  387. l, err := s.list(nextPageToken, q)
  388. if err != nil {
  389. return err
  390. }
  391. for 0 < len(l.Files) {
  392. for _, fi := range l.Files {
  393. err = s.service.Files.Delete(fi.Id).Do()
  394. if err != nil {
  395. return
  396. }
  397. }
  398. if l.NextPageToken == "" {
  399. break
  400. }
  401. l, err = s.list(l.NextPageToken, q)
  402. }
  403. return
  404. }
  405. func (s *GDrive) IsNotExist(err error) bool {
  406. if err != nil {
  407. if e, ok := err.(*googleapi.Error); ok {
  408. return e.Code == http.StatusNotFound
  409. }
  410. }
  411. return false
  412. }
  413. func (s *GDrive) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error {
  414. dirId, err := s.findId("", token)
  415. if err != nil {
  416. return err
  417. }
  418. if dirId == "" {
  419. dir := &drive.File{
  420. Name: token,
  421. Parents: []string{s.rootId},
  422. MimeType: GDriveDirectoryMimeType,
  423. }
  424. di, err := s.service.Files.Create(dir).Fields("id").Do()
  425. if err != nil {
  426. return err
  427. }
  428. dirId = di.Id
  429. }
  430. // Instantiate empty drive file
  431. dst := &drive.File{
  432. Name: filename,
  433. Parents: []string{dirId},
  434. MimeType: contentType,
  435. }
  436. ctx := context.Background()
  437. _, err = s.service.Files.Create(dst).Context(ctx).Media(reader, googleapi.ChunkSize(s.chunkSize)).Do()
  438. if err != nil {
  439. return err
  440. }
  441. return nil
  442. }
  443. // Retrieve a token, saves the token, then returns the generated client.
  444. func getGDriveClient(config *oauth2.Config, localConfigPath string, logger *log.Logger) *http.Client {
  445. tokenFile := filepath.Join(localConfigPath, GDriveTokenJsonFile)
  446. tok, err := gDriveTokenFromFile(tokenFile)
  447. if err != nil {
  448. tok = getGDriveTokenFromWeb(config, logger)
  449. saveGDriveToken(tokenFile, tok, logger)
  450. }
  451. return config.Client(context.Background(), tok)
  452. }
  453. // Request a token from the web, then returns the retrieved token.
  454. func getGDriveTokenFromWeb(config *oauth2.Config, logger *log.Logger) *oauth2.Token {
  455. authURL := config.AuthCodeURL("state-token", oauth2.AccessTypeOffline)
  456. fmt.Printf("Go to the following link in your browser then type the "+
  457. "authorization code: \n%v\n", authURL)
  458. var authCode string
  459. if _, err := fmt.Scan(&authCode); err != nil {
  460. logger.Fatalf("Unable to read authorization code %v", err)
  461. }
  462. tok, err := config.Exchange(context.TODO(), authCode)
  463. if err != nil {
  464. logger.Fatalf("Unable to retrieve token from web %v", err)
  465. }
  466. return tok
  467. }
  468. // Retrieves a token from a local file.
  469. func gDriveTokenFromFile(file string) (*oauth2.Token, error) {
  470. f, err := os.Open(file)
  471. defer f.Close()
  472. if err != nil {
  473. return nil, err
  474. }
  475. tok := &oauth2.Token{}
  476. err = json.NewDecoder(f).Decode(tok)
  477. return tok, err
  478. }
  479. // Saves a token to a file path.
  480. func saveGDriveToken(path string, token *oauth2.Token, logger *log.Logger) {
  481. logger.Printf("Saving credential file to: %s\n", path)
  482. f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600)
  483. defer f.Close()
  484. if err != nil {
  485. logger.Fatalf("Unable to cache oauth token: %v", err)
  486. }
  487. json.NewEncoder(f).Encode(token)
  488. }
  489. type StorjStorage struct {
  490. Storage
  491. project *uplink.Project
  492. bucket *uplink.Bucket
  493. purgeDays time.Duration
  494. logger *log.Logger
  495. }
  496. func NewStorjStorage(access, bucket string, purgeDays int, logger *log.Logger) (*StorjStorage, error) {
  497. var instance StorjStorage
  498. var err error
  499. ctx := context.TODO()
  500. parsedAccess, err := uplink.ParseAccess(access)
  501. if err != nil {
  502. return nil, err
  503. }
  504. instance.project, err = uplink.OpenProject(ctx, parsedAccess)
  505. if err != nil {
  506. return nil, err
  507. }
  508. instance.bucket, err = instance.project.EnsureBucket(ctx, bucket)
  509. if err != nil {
  510. //Ignoring the error to return the one that occurred first, but try to clean up.
  511. _ = instance.project.Close()
  512. return nil, err
  513. }
  514. instance.purgeDays = time.Duration(purgeDays*24) * time.Hour
  515. instance.logger = logger
  516. return &instance, nil
  517. }
  518. func (s *StorjStorage) Type() string {
  519. return "storj"
  520. }
  521. func (s *StorjStorage) Head(token string, filename string) (contentLength uint64, err error) {
  522. key := storj.JoinPaths(token, filename)
  523. ctx := context.TODO()
  524. obj, err := s.project.StatObject(ctx, s.bucket.Name, key)
  525. if err != nil {
  526. return 0, err
  527. }
  528. contentLength = uint64(obj.System.ContentLength)
  529. return
  530. }
  531. func (s *StorjStorage) Get(token string, filename string) (reader io.ReadCloser, contentLength uint64, err error) {
  532. key := storj.JoinPaths(token, filename)
  533. s.logger.Printf("Getting file %s from Storj Bucket", filename)
  534. ctx := context.TODO()
  535. download, err := s.project.DownloadObject(ctx, s.bucket.Name, key, nil)
  536. if err != nil {
  537. return nil, 0, err
  538. }
  539. contentLength = uint64(download.Info().System.ContentLength)
  540. reader = download
  541. return
  542. }
  543. func (s *StorjStorage) Delete(token string, filename string) (err error) {
  544. key := storj.JoinPaths(token, filename)
  545. s.logger.Printf("Deleting file %s from Storj Bucket", filename)
  546. ctx := context.TODO()
  547. _, err = s.project.DeleteObject(ctx, s.bucket.Name, key)
  548. return
  549. }
  550. func (s *StorjStorage) Purge(days time.Duration) (err error) {
  551. // NOOP expiration is set at upload time
  552. return nil
  553. }
  554. func (s *StorjStorage) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) (err error) {
  555. key := storj.JoinPaths(token, filename)
  556. s.logger.Printf("Uploading file %s to Storj Bucket", filename)
  557. ctx := context.TODO()
  558. writer, err := s.project.UploadObject(ctx, s.bucket.Name, key, &uplink.UploadOptions{Expires: time.Now().Add(s.purgeDays)})
  559. if err != nil {
  560. return err
  561. }
  562. n, err := io.Copy(writer, reader)
  563. if err != nil || uint64(n) != contentLength {
  564. //Ignoring the error to return the one that occurred first, but try to clean up.
  565. _ = writer.Abort()
  566. return err
  567. }
  568. err = writer.SetCustomMetadata(ctx, uplink.CustomMetadata{"content-type": contentType})
  569. if err != nil {
  570. //Ignoring the error to return the one that occurred first, but try to clean up.
  571. _ = writer.Abort()
  572. return err
  573. }
  574. err = writer.Commit()
  575. return err
  576. }
  577. func (s *StorjStorage) IsNotExist(err error) bool {
  578. return errors.Is(err, uplink.ErrObjectNotFound)
  579. }