Vous ne pouvez pas sélectionner plus de 25 sujets Les noms de sujets doivent commencer par une lettre ou un nombre, peuvent contenir des tirets ('-') et peuvent comporter jusqu'à 35 caractères.
 
 
 

682 lignes
16 KiB

  1. package server
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io"
  6. "io/ioutil"
  7. "log"
  8. "mime"
  9. "net/http"
  10. "os"
  11. "path/filepath"
  12. "strings"
  13. "github.com/aws/aws-sdk-go/aws"
  14. "github.com/aws/aws-sdk-go/aws/awserr"
  15. "github.com/aws/aws-sdk-go/aws/session"
  16. "github.com/aws/aws-sdk-go/service/s3"
  17. "github.com/aws/aws-sdk-go/service/s3/s3manager"
  18. "github.com/zeebo/errs"
  19. "golang.org/x/net/context"
  20. "golang.org/x/oauth2"
  21. "golang.org/x/oauth2/google"
  22. "google.golang.org/api/drive/v3"
  23. "google.golang.org/api/googleapi"
  24. "storj.io/storj/lib/uplink"
  25. "storj.io/storj/pkg/storj"
  26. )
  27. type Storage interface {
  28. Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error)
  29. Head(token string, filename string) (contentType string, contentLength uint64, err error)
  30. Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error
  31. Delete(token string, filename string) error
  32. IsNotExist(err error) bool
  33. Type() string
  34. }
  35. type LocalStorage struct {
  36. Storage
  37. basedir string
  38. logger *log.Logger
  39. }
  40. func NewLocalStorage(basedir string, logger *log.Logger) (*LocalStorage, error) {
  41. return &LocalStorage{basedir: basedir, logger: logger}, nil
  42. }
  43. func (s *LocalStorage) Type() string {
  44. return "local"
  45. }
  46. func (s *LocalStorage) Head(token string, filename string) (contentType string, contentLength uint64, err error) {
  47. path := filepath.Join(s.basedir, token, filename)
  48. var fi os.FileInfo
  49. if fi, err = os.Lstat(path); err != nil {
  50. return
  51. }
  52. contentLength = uint64(fi.Size())
  53. contentType = mime.TypeByExtension(filepath.Ext(filename))
  54. return
  55. }
  56. func (s *LocalStorage) Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error) {
  57. path := filepath.Join(s.basedir, token, filename)
  58. // content type , content length
  59. if reader, err = os.Open(path); err != nil {
  60. return
  61. }
  62. var fi os.FileInfo
  63. if fi, err = os.Lstat(path); err != nil {
  64. return
  65. }
  66. contentLength = uint64(fi.Size())
  67. contentType = mime.TypeByExtension(filepath.Ext(filename))
  68. return
  69. }
  70. func (s *LocalStorage) Delete(token string, filename string) (err error) {
  71. metadata := filepath.Join(s.basedir, token, fmt.Sprintf("%s.metadata", filename))
  72. os.Remove(metadata)
  73. path := filepath.Join(s.basedir, token, filename)
  74. err = os.Remove(path)
  75. return
  76. }
  77. func (s *LocalStorage) IsNotExist(err error) bool {
  78. if err == nil {
  79. return false
  80. }
  81. return os.IsNotExist(err)
  82. }
  83. func (s *LocalStorage) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error {
  84. var f io.WriteCloser
  85. var err error
  86. path := filepath.Join(s.basedir, token)
  87. if err = os.MkdirAll(path, 0700); err != nil && !os.IsExist(err) {
  88. return err
  89. }
  90. if f, err = os.OpenFile(filepath.Join(path, filename), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600); err != nil {
  91. return err
  92. }
  93. defer f.Close()
  94. if _, err = io.Copy(f, reader); err != nil {
  95. return err
  96. }
  97. return nil
  98. }
  99. type S3Storage struct {
  100. Storage
  101. bucket string
  102. session *session.Session
  103. s3 *s3.S3
  104. logger *log.Logger
  105. noMultipart bool
  106. }
  107. func NewS3Storage(accessKey, secretKey, bucketName, region, endpoint string, logger *log.Logger, disableMultipart bool, forcePathStyle bool) (*S3Storage, error) {
  108. sess := getAwsSession(accessKey, secretKey, region, endpoint, forcePathStyle)
  109. return &S3Storage{bucket: bucketName, s3: s3.New(sess), session: sess, logger: logger, noMultipart: disableMultipart}, nil
  110. }
  111. func (s *S3Storage) Type() string {
  112. return "s3"
  113. }
  114. func (s *S3Storage) Head(token string, filename string) (contentType string, contentLength uint64, err error) {
  115. key := fmt.Sprintf("%s/%s", token, filename)
  116. headRequest := &s3.HeadObjectInput{
  117. Bucket: aws.String(s.bucket),
  118. Key: aws.String(key),
  119. }
  120. // content type , content length
  121. response, err := s.s3.HeadObject(headRequest)
  122. if err != nil {
  123. return
  124. }
  125. if response.ContentType != nil {
  126. contentType = *response.ContentType
  127. }
  128. if response.ContentLength != nil {
  129. contentLength = uint64(*response.ContentLength)
  130. }
  131. return
  132. }
  133. func (s *S3Storage) IsNotExist(err error) bool {
  134. if err == nil {
  135. return false
  136. }
  137. if aerr, ok := err.(awserr.Error); ok {
  138. switch aerr.Code() {
  139. case s3.ErrCodeNoSuchKey:
  140. return true
  141. }
  142. }
  143. return false
  144. }
  145. func (s *S3Storage) Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error) {
  146. key := fmt.Sprintf("%s/%s", token, filename)
  147. getRequest := &s3.GetObjectInput{
  148. Bucket: aws.String(s.bucket),
  149. Key: aws.String(key),
  150. }
  151. response, err := s.s3.GetObject(getRequest)
  152. if err != nil {
  153. return
  154. }
  155. if response.ContentType != nil {
  156. contentType = *response.ContentType
  157. }
  158. if response.ContentLength != nil {
  159. contentLength = uint64(*response.ContentLength)
  160. }
  161. reader = response.Body
  162. return
  163. }
  164. func (s *S3Storage) Delete(token string, filename string) (err error) {
  165. metadata := fmt.Sprintf("%s/%s.metadata", token, filename)
  166. deleteRequest := &s3.DeleteObjectInput{
  167. Bucket: aws.String(s.bucket),
  168. Key: aws.String(metadata),
  169. }
  170. _, err = s.s3.DeleteObject(deleteRequest)
  171. if err != nil {
  172. return
  173. }
  174. key := fmt.Sprintf("%s/%s", token, filename)
  175. deleteRequest = &s3.DeleteObjectInput{
  176. Bucket: aws.String(s.bucket),
  177. Key: aws.String(key),
  178. }
  179. _, err = s.s3.DeleteObject(deleteRequest)
  180. return
  181. }
  182. func (s *S3Storage) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) (err error) {
  183. key := fmt.Sprintf("%s/%s", token, filename)
  184. s.logger.Printf("Uploading file %s to S3 Bucket", filename)
  185. var concurrency int
  186. if !s.noMultipart {
  187. concurrency = 20
  188. } else {
  189. concurrency = 1
  190. }
  191. // Create an uploader with the session and custom options
  192. uploader := s3manager.NewUploader(s.session, func(u *s3manager.Uploader) {
  193. u.Concurrency = concurrency // default is 5
  194. u.LeavePartsOnError = false
  195. })
  196. _, err = uploader.Upload(&s3manager.UploadInput{
  197. Bucket: aws.String(s.bucket),
  198. Key: aws.String(key),
  199. Body: reader,
  200. })
  201. return
  202. }
  203. type GDrive struct {
  204. service *drive.Service
  205. rootId string
  206. basedir string
  207. localConfigPath string
  208. chunkSize int
  209. logger *log.Logger
  210. }
  211. func NewGDriveStorage(clientJsonFilepath string, localConfigPath string, basedir string, chunkSize int, logger *log.Logger) (*GDrive, error) {
  212. b, err := ioutil.ReadFile(clientJsonFilepath)
  213. if err != nil {
  214. return nil, err
  215. }
  216. // If modifying these scopes, delete your previously saved client_secret.json.
  217. config, err := google.ConfigFromJSON(b, drive.DriveScope, drive.DriveMetadataScope)
  218. if err != nil {
  219. return nil, err
  220. }
  221. srv, err := drive.New(getGDriveClient(config, localConfigPath, logger))
  222. if err != nil {
  223. return nil, err
  224. }
  225. chunkSize = chunkSize * 1024 * 1024
  226. storage := &GDrive{service: srv, basedir: basedir, rootId: "", localConfigPath: localConfigPath, chunkSize: chunkSize, logger: logger}
  227. err = storage.setupRoot()
  228. if err != nil {
  229. return nil, err
  230. }
  231. return storage, nil
  232. }
  233. const GDriveRootConfigFile = "root_id.conf"
  234. const GDriveTokenJsonFile = "token.json"
  235. const GDriveDirectoryMimeType = "application/vnd.google-apps.folder"
  236. func (s *GDrive) setupRoot() error {
  237. rootFileConfig := filepath.Join(s.localConfigPath, GDriveRootConfigFile)
  238. rootId, err := ioutil.ReadFile(rootFileConfig)
  239. if err != nil && !os.IsNotExist(err) {
  240. return err
  241. }
  242. if string(rootId) != "" {
  243. s.rootId = string(rootId)
  244. return nil
  245. }
  246. dir := &drive.File{
  247. Name: s.basedir,
  248. MimeType: GDriveDirectoryMimeType,
  249. }
  250. di, err := s.service.Files.Create(dir).Fields("id").Do()
  251. if err != nil {
  252. return err
  253. }
  254. s.rootId = di.Id
  255. err = ioutil.WriteFile(rootFileConfig, []byte(s.rootId), os.FileMode(0600))
  256. if err != nil {
  257. return err
  258. }
  259. return nil
  260. }
  261. func (s *GDrive) hasChecksum(f *drive.File) bool {
  262. return f.Md5Checksum != ""
  263. }
  264. func (s *GDrive) list(nextPageToken string, q string) (*drive.FileList, error) {
  265. return s.service.Files.List().Fields("nextPageToken, files(id, name, mimeType)").Q(q).PageToken(nextPageToken).Do()
  266. }
  267. func (s *GDrive) findId(filename string, token string) (string, error) {
  268. filename = strings.Replace(filename, `'`, `\'`, -1)
  269. filename = strings.Replace(filename, `"`, `\"`, -1)
  270. fileId, tokenId, nextPageToken := "", "", ""
  271. q := fmt.Sprintf("'%s' in parents and name='%s' and mimeType='%s' and trashed=false", s.rootId, token, GDriveDirectoryMimeType)
  272. l, err := s.list(nextPageToken, q)
  273. if err != nil {
  274. return "", err
  275. }
  276. for 0 < len(l.Files) {
  277. for _, fi := range l.Files {
  278. tokenId = fi.Id
  279. break
  280. }
  281. if l.NextPageToken == "" {
  282. break
  283. }
  284. l, err = s.list(l.NextPageToken, q)
  285. }
  286. if filename == "" {
  287. return tokenId, nil
  288. } else if tokenId == "" {
  289. return "", fmt.Errorf("Cannot find file %s/%s", token, filename)
  290. }
  291. q = fmt.Sprintf("'%s' in parents and name='%s' and mimeType!='%s' and trashed=false", tokenId, filename, GDriveDirectoryMimeType)
  292. l, err = s.list(nextPageToken, q)
  293. if err != nil {
  294. return "", err
  295. }
  296. for 0 < len(l.Files) {
  297. for _, fi := range l.Files {
  298. fileId = fi.Id
  299. break
  300. }
  301. if l.NextPageToken == "" {
  302. break
  303. }
  304. l, err = s.list(l.NextPageToken, q)
  305. }
  306. if fileId == "" {
  307. return "", fmt.Errorf("Cannot find file %s/%s", token, filename)
  308. }
  309. return fileId, nil
  310. }
  311. func (s *GDrive) Type() string {
  312. return "gdrive"
  313. }
  314. func (s *GDrive) Head(token string, filename string) (contentType string, contentLength uint64, err error) {
  315. var fileId string
  316. fileId, err = s.findId(filename, token)
  317. if err != nil {
  318. return
  319. }
  320. var fi *drive.File
  321. if fi, err = s.service.Files.Get(fileId).Fields("mimeType", "size").Do(); err != nil {
  322. return
  323. }
  324. contentLength = uint64(fi.Size)
  325. contentType = fi.MimeType
  326. return
  327. }
  328. func (s *GDrive) Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error) {
  329. var fileId string
  330. fileId, err = s.findId(filename, token)
  331. if err != nil {
  332. return
  333. }
  334. var fi *drive.File
  335. fi, err = s.service.Files.Get(fileId).Fields("mimeType", "size", "md5Checksum").Do()
  336. if !s.hasChecksum(fi) {
  337. err = fmt.Errorf("Cannot find file %s/%s", token, filename)
  338. return
  339. }
  340. contentLength = uint64(fi.Size)
  341. contentType = fi.MimeType
  342. ctx := context.Background()
  343. var res *http.Response
  344. res, err = s.service.Files.Get(fileId).Context(ctx).Download()
  345. if err != nil {
  346. return
  347. }
  348. reader = res.Body
  349. return
  350. }
  351. func (s *GDrive) Delete(token string, filename string) (err error) {
  352. metadata, _ := s.findId(fmt.Sprintf("%s.metadata", filename), token)
  353. s.service.Files.Delete(metadata).Do()
  354. var fileId string
  355. fileId, err = s.findId(filename, token)
  356. if err != nil {
  357. return
  358. }
  359. err = s.service.Files.Delete(fileId).Do()
  360. return
  361. }
  362. func (s *GDrive) IsNotExist(err error) bool {
  363. if err != nil {
  364. if e, ok := err.(*googleapi.Error); ok {
  365. return e.Code == http.StatusNotFound
  366. }
  367. }
  368. return false
  369. }
  370. func (s *GDrive) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error {
  371. dirId, err := s.findId("", token)
  372. if err != nil {
  373. return err
  374. }
  375. if dirId == "" {
  376. dir := &drive.File{
  377. Name: token,
  378. Parents: []string{s.rootId},
  379. MimeType: GDriveDirectoryMimeType,
  380. }
  381. di, err := s.service.Files.Create(dir).Fields("id").Do()
  382. if err != nil {
  383. return err
  384. }
  385. dirId = di.Id
  386. }
  387. // Instantiate empty drive file
  388. dst := &drive.File{
  389. Name: filename,
  390. Parents: []string{dirId},
  391. MimeType: contentType,
  392. }
  393. ctx := context.Background()
  394. _, err = s.service.Files.Create(dst).Context(ctx).Media(reader, googleapi.ChunkSize(s.chunkSize)).Do()
  395. if err != nil {
  396. return err
  397. }
  398. return nil
  399. }
  400. // Retrieve a token, saves the token, then returns the generated client.
  401. func getGDriveClient(config *oauth2.Config, localConfigPath string, logger *log.Logger) *http.Client {
  402. tokenFile := filepath.Join(localConfigPath, GDriveTokenJsonFile)
  403. tok, err := gDriveTokenFromFile(tokenFile)
  404. if err != nil {
  405. tok = getGDriveTokenFromWeb(config, logger)
  406. saveGDriveToken(tokenFile, tok, logger)
  407. }
  408. return config.Client(context.Background(), tok)
  409. }
  410. // Request a token from the web, then returns the retrieved token.
  411. func getGDriveTokenFromWeb(config *oauth2.Config, logger *log.Logger) *oauth2.Token {
  412. authURL := config.AuthCodeURL("state-token", oauth2.AccessTypeOffline)
  413. fmt.Printf("Go to the following link in your browser then type the "+
  414. "authorization code: \n%v\n", authURL)
  415. var authCode string
  416. if _, err := fmt.Scan(&authCode); err != nil {
  417. logger.Fatalf("Unable to read authorization code %v", err)
  418. }
  419. tok, err := config.Exchange(context.TODO(), authCode)
  420. if err != nil {
  421. logger.Fatalf("Unable to retrieve token from web %v", err)
  422. }
  423. return tok
  424. }
  425. // Retrieves a token from a local file.
  426. func gDriveTokenFromFile(file string) (*oauth2.Token, error) {
  427. f, err := os.Open(file)
  428. defer f.Close()
  429. if err != nil {
  430. return nil, err
  431. }
  432. tok := &oauth2.Token{}
  433. err = json.NewDecoder(f).Decode(tok)
  434. return tok, err
  435. }
  436. // Saves a token to a file path.
  437. func saveGDriveToken(path string, token *oauth2.Token, logger *log.Logger) {
  438. logger.Printf("Saving credential file to: %s\n", path)
  439. f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600)
  440. defer f.Close()
  441. if err != nil {
  442. logger.Fatalf("Unable to cache oauth token: %v", err)
  443. }
  444. json.NewEncoder(f).Encode(token)
  445. }
  446. var uplinkFailure = errs.Class("storj failure")
  447. type StorjStorage struct {
  448. Storage
  449. uplink *uplink.Uplink
  450. project *uplink.Project
  451. bucket *uplink.Bucket
  452. logger *log.Logger
  453. }
  454. func NewStorjStorage(endpoint, apiKey, bucket, encKey string, skipCA bool, logger *log.Logger) (*StorjStorage, error) {
  455. var instance StorjStorage
  456. var err error
  457. ctx := context.TODO()
  458. config := uplink.Config{}
  459. if skipCA {
  460. config.Volatile.TLS.SkipPeerCAWhitelist = true
  461. }
  462. instance.uplink, err = uplink.NewUplink(ctx, &config)
  463. if err != nil {
  464. return nil, uplinkFailure.Wrap(err)
  465. }
  466. key, err := uplink.ParseAPIKey(apiKey)
  467. if err != nil {
  468. return nil, uplinkFailure.Wrap(err)
  469. }
  470. instance.project, err = instance.uplink.OpenProject(ctx, endpoint, key)
  471. if err != nil {
  472. return nil, uplinkFailure.Wrap(err)
  473. }
  474. saltEncKey, err := instance.project.SaltedKeyFromPassphrase(ctx, encKey)
  475. if err != nil {
  476. return nil, uplinkFailure.Wrap(err)
  477. }
  478. access := uplink.NewEncryptionAccessWithDefaultKey(*saltEncKey)
  479. instance.bucket, err = instance.project.OpenBucket(ctx, bucket, access)
  480. if err != nil {
  481. return nil, uplinkFailure.Wrap(err)
  482. }
  483. instance.logger = logger
  484. return &instance, nil
  485. }
  486. func (s *StorjStorage) Type() string {
  487. return "storj"
  488. }
  489. func (s *StorjStorage) Head(token string, filename string) (contentType string, contentLength uint64, err error) {
  490. key := storj.JoinPaths(token, filename)
  491. ctx := context.TODO()
  492. obj, err := s.bucket.OpenObject(ctx, key)
  493. if err != nil {
  494. return "", 0, uplinkFailure.Wrap(err)
  495. }
  496. contentType = obj.Meta.ContentType
  497. contentLength = uint64(obj.Meta.Size)
  498. return
  499. }
  500. func (s *StorjStorage) Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error) {
  501. key := storj.JoinPaths(token, filename)
  502. s.logger.Printf("Getting file %s from Storj Bucket", filename)
  503. ctx := context.TODO()
  504. obj, err := s.bucket.OpenObject(ctx, key)
  505. if err != nil {
  506. return nil, "", 0, uplinkFailure.Wrap(err)
  507. }
  508. contentType = obj.Meta.ContentType
  509. contentLength = uint64(obj.Meta.Size)
  510. reader, err = obj.DownloadRange(ctx, 0, -1)
  511. return
  512. }
  513. func (s *StorjStorage) Delete(token string, filename string) (err error) {
  514. key := storj.JoinPaths(token, filename)
  515. s.logger.Printf("Deleting file %s from Storj Bucket", filename)
  516. ctx := context.TODO()
  517. err = s.bucket.DeleteObject(ctx, key)
  518. return
  519. }
  520. func (s *StorjStorage) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) (err error) {
  521. key := storj.JoinPaths(token, filename)
  522. s.logger.Printf("Uploading file %s to Storj Bucket", filename)
  523. ctx := context.TODO()
  524. err = s.bucket.UploadObject(ctx, key, reader, &uplink.UploadOptions{ContentType: contentType})
  525. if err != nil {
  526. return uplinkFailure.Wrap(err)
  527. }
  528. return nil
  529. }