25'ten fazla konu seçemezsiniz. Konular bir harf veya rakamla başlamalı, kısa çizgiler ('-') içerebilir ve en fazla 35 karakter uzunluğunda olabilir.
 
 
 

675 satır
16 KiB

  1. package server
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io"
  6. "io/ioutil"
  7. "log"
  8. "mime"
  9. "net/http"
  10. "os"
  11. "path/filepath"
  12. "strings"
  13. "github.com/aws/aws-sdk-go/aws"
  14. "github.com/aws/aws-sdk-go/aws/awserr"
  15. "github.com/aws/aws-sdk-go/aws/session"
  16. "github.com/aws/aws-sdk-go/service/s3"
  17. "github.com/aws/aws-sdk-go/service/s3/s3manager"
  18. "github.com/zeebo/errs"
  19. "golang.org/x/net/context"
  20. "golang.org/x/oauth2"
  21. "golang.org/x/oauth2/google"
  22. "google.golang.org/api/drive/v3"
  23. "google.golang.org/api/googleapi"
  24. "storj.io/storj/lib/uplink"
  25. )
  // Storage is the common interface of the storage backends. Stored files are
  // addressed by a token together with a filename.
  type Storage interface {
  	// Type returns a short identifier naming the backend.
  	Type() string
  	// Head returns the content type and content length of a stored file.
  	Head(token, filename string) (contentType string, contentLength uint64, err error)
  	// Get returns a reader for a stored file along with its content type and
  	// content length.
  	Get(token, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error)
  	// Put stores the data read from reader under token/filename.
  	Put(token, filename string, reader io.Reader, contentType string, contentLength uint64) error
  	// Delete removes a stored file.
  	Delete(token, filename string) error
  	// IsNotExist reports whether err, as returned by this backend, means
  	// that the requested file does not exist.
  	IsNotExist(err error) bool
  }
  34. type LocalStorage struct {
  35. Storage
  36. basedir string
  37. logger *log.Logger
  38. }
  39. func NewLocalStorage(basedir string, logger *log.Logger) (*LocalStorage, error) {
  40. return &LocalStorage{basedir: basedir, logger: logger}, nil
  41. }
  42. func (s *LocalStorage) Type() string {
  43. return "local"
  44. }
  45. func (s *LocalStorage) Head(token string, filename string) (contentType string, contentLength uint64, err error) {
  46. path := filepath.Join(s.basedir, token, filename)
  47. var fi os.FileInfo
  48. if fi, err = os.Lstat(path); err != nil {
  49. return
  50. }
  51. contentLength = uint64(fi.Size())
  52. contentType = mime.TypeByExtension(filepath.Ext(filename))
  53. return
  54. }
  55. func (s *LocalStorage) Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error) {
  56. path := filepath.Join(s.basedir, token, filename)
  57. // content type , content length
  58. if reader, err = os.Open(path); err != nil {
  59. return
  60. }
  61. var fi os.FileInfo
  62. if fi, err = os.Lstat(path); err != nil {
  63. return
  64. }
  65. contentLength = uint64(fi.Size())
  66. contentType = mime.TypeByExtension(filepath.Ext(filename))
  67. return
  68. }
  69. func (s *LocalStorage) Delete(token string, filename string) (err error) {
  70. metadata := filepath.Join(s.basedir, token, fmt.Sprintf("%s.metadata", filename))
  71. os.Remove(metadata)
  72. path := filepath.Join(s.basedir, token, filename)
  73. err = os.Remove(path)
  74. return
  75. }
  76. func (s *LocalStorage) IsNotExist(err error) bool {
  77. if err == nil {
  78. return false
  79. }
  80. return os.IsNotExist(err)
  81. }
  82. func (s *LocalStorage) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error {
  83. var f io.WriteCloser
  84. var err error
  85. path := filepath.Join(s.basedir, token)
  86. if err = os.MkdirAll(path, 0700); err != nil && !os.IsExist(err) {
  87. return err
  88. }
  89. if f, err = os.OpenFile(filepath.Join(path, filename), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600); err != nil {
  90. return err
  91. }
  92. defer f.Close()
  93. if _, err = io.Copy(f, reader); err != nil {
  94. return err
  95. }
  96. return nil
  97. }
  98. type S3Storage struct {
  99. Storage
  100. bucket string
  101. session *session.Session
  102. s3 *s3.S3
  103. logger *log.Logger
  104. noMultipart bool
  105. }
  106. func NewS3Storage(accessKey, secretKey, bucketName, region, endpoint string, logger *log.Logger, disableMultipart bool, forcePathStyle bool) (*S3Storage, error) {
  107. sess := getAwsSession(accessKey, secretKey, region, endpoint, forcePathStyle)
  108. return &S3Storage{bucket: bucketName, s3: s3.New(sess), session: sess, logger: logger, noMultipart: disableMultipart}, nil
  109. }
  110. func (s *S3Storage) Type() string {
  111. return "s3"
  112. }
  113. func (s *S3Storage) Head(token string, filename string) (contentType string, contentLength uint64, err error) {
  114. key := fmt.Sprintf("%s/%s", token, filename)
  115. headRequest := &s3.HeadObjectInput{
  116. Bucket: aws.String(s.bucket),
  117. Key: aws.String(key),
  118. }
  119. // content type , content length
  120. response, err := s.s3.HeadObject(headRequest)
  121. if err != nil {
  122. return
  123. }
  124. if response.ContentType != nil {
  125. contentType = *response.ContentType
  126. }
  127. if response.ContentLength != nil {
  128. contentLength = uint64(*response.ContentLength)
  129. }
  130. return
  131. }
  132. func (s *S3Storage) IsNotExist(err error) bool {
  133. if err == nil {
  134. return false
  135. }
  136. if aerr, ok := err.(awserr.Error); ok {
  137. switch aerr.Code() {
  138. case s3.ErrCodeNoSuchKey:
  139. return true
  140. }
  141. }
  142. return false
  143. }
  144. func (s *S3Storage) Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error) {
  145. key := fmt.Sprintf("%s/%s", token, filename)
  146. getRequest := &s3.GetObjectInput{
  147. Bucket: aws.String(s.bucket),
  148. Key: aws.String(key),
  149. }
  150. response, err := s.s3.GetObject(getRequest)
  151. if err != nil {
  152. return
  153. }
  154. if response.ContentType != nil {
  155. contentType = *response.ContentType
  156. }
  157. if response.ContentLength != nil {
  158. contentLength = uint64(*response.ContentLength)
  159. }
  160. reader = response.Body
  161. return
  162. }
  163. func (s *S3Storage) Delete(token string, filename string) (err error) {
  164. metadata := fmt.Sprintf("%s/%s.metadata", token, filename)
  165. deleteRequest := &s3.DeleteObjectInput{
  166. Bucket: aws.String(s.bucket),
  167. Key: aws.String(metadata),
  168. }
  169. _, err = s.s3.DeleteObject(deleteRequest)
  170. if err != nil {
  171. return
  172. }
  173. key := fmt.Sprintf("%s/%s", token, filename)
  174. deleteRequest = &s3.DeleteObjectInput{
  175. Bucket: aws.String(s.bucket),
  176. Key: aws.String(key),
  177. }
  178. _, err = s.s3.DeleteObject(deleteRequest)
  179. return
  180. }
  181. func (s *S3Storage) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) (err error) {
  182. key := fmt.Sprintf("%s/%s", token, filename)
  183. s.logger.Printf("Uploading file %s to S3 Bucket", filename)
  184. var concurrency int
  185. if !s.noMultipart {
  186. concurrency = 20
  187. } else {
  188. concurrency = 1
  189. }
  190. // Create an uploader with the session and custom options
  191. uploader := s3manager.NewUploader(s.session, func(u *s3manager.Uploader) {
  192. u.Concurrency = concurrency // default is 5
  193. u.LeavePartsOnError = false
  194. })
  195. _, err = uploader.Upload(&s3manager.UploadInput{
  196. Bucket: aws.String(s.bucket),
  197. Key: aws.String(key),
  198. Body: reader,
  199. })
  200. return
  201. }
  202. type GDrive struct {
  203. service *drive.Service
  204. rootId string
  205. basedir string
  206. localConfigPath string
  207. chunkSize int
  208. logger *log.Logger
  209. }
  210. func NewGDriveStorage(clientJsonFilepath string, localConfigPath string, basedir string, chunkSize int, logger *log.Logger) (*GDrive, error) {
  211. b, err := ioutil.ReadFile(clientJsonFilepath)
  212. if err != nil {
  213. return nil, err
  214. }
  215. // If modifying these scopes, delete your previously saved client_secret.json.
  216. config, err := google.ConfigFromJSON(b, drive.DriveScope, drive.DriveMetadataScope)
  217. if err != nil {
  218. return nil, err
  219. }
  220. srv, err := drive.New(getGDriveClient(config, localConfigPath, logger))
  221. if err != nil {
  222. return nil, err
  223. }
  224. chunkSize = chunkSize * 1024 * 1024
  225. storage := &GDrive{service: srv, basedir: basedir, rootId: "", localConfigPath: localConfigPath, chunkSize: chunkSize, logger: logger}
  226. err = storage.setupRoot()
  227. if err != nil {
  228. return nil, err
  229. }
  230. return storage, nil
  231. }
  // Names and MIME types used by the Google Drive backend.
  const (
  	// GDriveRootConfigFile is the name of the local file caching the Drive
  	// ID of the root folder.
  	GDriveRootConfigFile = "root_id.conf"
  	// GDriveTokenJsonFile is the name of the local file caching the OAuth
  	// token.
  	GDriveTokenJsonFile = "token.json"
  	// GDriveDirectoryMimeType is the MIME type this backend uses to create
  	// and to query for folders.
  	GDriveDirectoryMimeType = "application/vnd.google-apps.folder"
  )
  235. func (s *GDrive) setupRoot() error {
  236. rootFileConfig := filepath.Join(s.localConfigPath, GDriveRootConfigFile)
  237. rootId, err := ioutil.ReadFile(rootFileConfig)
  238. if err != nil && !os.IsNotExist(err) {
  239. return err
  240. }
  241. if string(rootId) != "" {
  242. s.rootId = string(rootId)
  243. return nil
  244. }
  245. dir := &drive.File{
  246. Name: s.basedir,
  247. MimeType: GDriveDirectoryMimeType,
  248. }
  249. di, err := s.service.Files.Create(dir).Fields("id").Do()
  250. if err != nil {
  251. return err
  252. }
  253. s.rootId = di.Id
  254. err = ioutil.WriteFile(rootFileConfig, []byte(s.rootId), os.FileMode(0600))
  255. if err != nil {
  256. return err
  257. }
  258. return nil
  259. }
  260. func (s *GDrive) hasChecksum(f *drive.File) bool {
  261. return f.Md5Checksum != ""
  262. }
  263. func (s *GDrive) list(nextPageToken string, q string) (*drive.FileList, error) {
  264. return s.service.Files.List().Fields("nextPageToken, files(id, name, mimeType)").Q(q).PageToken(nextPageToken).Do()
  265. }
  266. func (s *GDrive) findId(filename string, token string) (string, error) {
  267. filename = strings.Replace(filename, `'`, `\'`, -1)
  268. filename = strings.Replace(filename, `"`, `\"`, -1)
  269. fileId, tokenId, nextPageToken := "", "", ""
  270. q := fmt.Sprintf("'%s' in parents and name='%s' and mimeType='%s' and trashed=false", s.rootId, token, GDriveDirectoryMimeType)
  271. l, err := s.list(nextPageToken, q)
  272. if err != nil {
  273. return "", err
  274. }
  275. for 0 < len(l.Files) {
  276. for _, fi := range l.Files {
  277. tokenId = fi.Id
  278. break
  279. }
  280. if l.NextPageToken == "" {
  281. break
  282. }
  283. l, err = s.list(l.NextPageToken, q)
  284. }
  285. if filename == "" {
  286. return tokenId, nil
  287. } else if tokenId == "" {
  288. return "", fmt.Errorf("Cannot find file %s/%s", token, filename)
  289. }
  290. q = fmt.Sprintf("'%s' in parents and name='%s' and mimeType!='%s' and trashed=false", tokenId, filename, GDriveDirectoryMimeType)
  291. l, err = s.list(nextPageToken, q)
  292. if err != nil {
  293. return "", err
  294. }
  295. for 0 < len(l.Files) {
  296. for _, fi := range l.Files {
  297. fileId = fi.Id
  298. break
  299. }
  300. if l.NextPageToken == "" {
  301. break
  302. }
  303. l, err = s.list(l.NextPageToken, q)
  304. }
  305. if fileId == "" {
  306. return "", fmt.Errorf("Cannot find file %s/%s", token, filename)
  307. }
  308. return fileId, nil
  309. }
  310. func (s *GDrive) Type() string {
  311. return "gdrive"
  312. }
  313. func (s *GDrive) Head(token string, filename string) (contentType string, contentLength uint64, err error) {
  314. var fileId string
  315. fileId, err = s.findId(filename, token)
  316. if err != nil {
  317. return
  318. }
  319. var fi *drive.File
  320. if fi, err = s.service.Files.Get(fileId).Fields("mimeType", "size").Do(); err != nil {
  321. return
  322. }
  323. contentLength = uint64(fi.Size)
  324. contentType = fi.MimeType
  325. return
  326. }
  327. func (s *GDrive) Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error) {
  328. var fileId string
  329. fileId, err = s.findId(filename, token)
  330. if err != nil {
  331. return
  332. }
  333. var fi *drive.File
  334. fi, err = s.service.Files.Get(fileId).Fields("mimeType", "size", "md5Checksum").Do()
  335. if !s.hasChecksum(fi) {
  336. err = fmt.Errorf("Cannot find file %s/%s", token, filename)
  337. return
  338. }
  339. contentLength = uint64(fi.Size)
  340. contentType = fi.MimeType
  341. ctx := context.Background()
  342. var res *http.Response
  343. res, err = s.service.Files.Get(fileId).Context(ctx).Download()
  344. if err != nil {
  345. return
  346. }
  347. reader = res.Body
  348. return
  349. }
  350. func (s *GDrive) Delete(token string, filename string) (err error) {
  351. metadata, _ := s.findId(fmt.Sprintf("%s.metadata", filename), token)
  352. s.service.Files.Delete(metadata).Do()
  353. var fileId string
  354. fileId, err = s.findId(filename, token)
  355. if err != nil {
  356. return
  357. }
  358. err = s.service.Files.Delete(fileId).Do()
  359. return
  360. }
  361. func (s *GDrive) IsNotExist(err error) bool {
  362. if err != nil {
  363. if e, ok := err.(*googleapi.Error); ok {
  364. return e.Code == http.StatusNotFound
  365. }
  366. }
  367. return false
  368. }
  369. func (s *GDrive) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error {
  370. dirId, err := s.findId("", token)
  371. if err != nil {
  372. return err
  373. }
  374. if dirId == "" {
  375. dir := &drive.File{
  376. Name: token,
  377. Parents: []string{s.rootId},
  378. MimeType: GDriveDirectoryMimeType,
  379. }
  380. di, err := s.service.Files.Create(dir).Fields("id").Do()
  381. if err != nil {
  382. return err
  383. }
  384. dirId = di.Id
  385. }
  386. // Instantiate empty drive file
  387. dst := &drive.File{
  388. Name: filename,
  389. Parents: []string{dirId},
  390. MimeType: contentType,
  391. }
  392. ctx := context.Background()
  393. _, err = s.service.Files.Create(dst).Context(ctx).Media(reader, googleapi.ChunkSize(s.chunkSize)).Do()
  394. if err != nil {
  395. return err
  396. }
  397. return nil
  398. }
  399. // Retrieve a token, saves the token, then returns the generated client.
  400. func getGDriveClient(config *oauth2.Config, localConfigPath string, logger *log.Logger) *http.Client {
  401. tokenFile := filepath.Join(localConfigPath, GDriveTokenJsonFile)
  402. tok, err := gDriveTokenFromFile(tokenFile)
  403. if err != nil {
  404. tok = getGDriveTokenFromWeb(config, logger)
  405. saveGDriveToken(tokenFile, tok, logger)
  406. }
  407. return config.Client(context.Background(), tok)
  408. }
  409. // Request a token from the web, then returns the retrieved token.
  410. func getGDriveTokenFromWeb(config *oauth2.Config, logger *log.Logger) *oauth2.Token {
  411. authURL := config.AuthCodeURL("state-token", oauth2.AccessTypeOffline)
  412. fmt.Printf("Go to the following link in your browser then type the "+
  413. "authorization code: \n%v\n", authURL)
  414. var authCode string
  415. if _, err := fmt.Scan(&authCode); err != nil {
  416. logger.Fatalf("Unable to read authorization code %v", err)
  417. }
  418. tok, err := config.Exchange(context.TODO(), authCode)
  419. if err != nil {
  420. logger.Fatalf("Unable to retrieve token from web %v", err)
  421. }
  422. return tok
  423. }
  424. // Retrieves a token from a local file.
  425. func gDriveTokenFromFile(file string) (*oauth2.Token, error) {
  426. f, err := os.Open(file)
  427. defer f.Close()
  428. if err != nil {
  429. return nil, err
  430. }
  431. tok := &oauth2.Token{}
  432. err = json.NewDecoder(f).Decode(tok)
  433. return tok, err
  434. }
  435. // Saves a token to a file path.
  436. func saveGDriveToken(path string, token *oauth2.Token, logger *log.Logger) {
  437. logger.Printf("Saving credential file to: %s\n", path)
  438. f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600)
  439. defer f.Close()
  440. if err != nil {
  441. logger.Fatalf("Unable to cache oauth token: %v", err)
  442. }
  443. json.NewEncoder(f).Encode(token)
  444. }
  // uplinkFailure is the error class used to wrap errors returned by the Storj
  // uplink library.
  var uplinkFailure = errs.Class("storj failure")
  446. type StorjStorage struct {
  447. Storage
  448. uplink *uplink.Uplink
  449. project *uplink.Project
  450. bucket *uplink.Bucket
  451. logger *log.Logger
  452. }
  453. func NewStorjStorage(scope, bucket string, skipCA bool, logger *log.Logger) (*StorjStorage, error) {
  454. var instance StorjStorage
  455. var err error
  456. ctx := context.TODO()
  457. config := uplink.Config{}
  458. if skipCA {
  459. config.Volatile.TLS.SkipPeerCAWhitelist = true
  460. }
  461. parsedScope, err := uplink.ParseScope(scope)
  462. if err != nil {
  463. return nil, uplinkFailure.Wrap(err)
  464. }
  465. instance.uplink, err = uplink.NewUplink(ctx, &config)
  466. if err != nil {
  467. return nil, uplinkFailure.Wrap(err)
  468. }
  469. instance.project, err = instance.uplink.OpenProject(ctx, parsedScope.SatelliteAddr, parsedScope.APIKey)
  470. if err != nil {
  471. return nil, uplinkFailure.Wrap(err)
  472. }
  473. instance.bucket, err = instance.project.OpenBucket(ctx, bucket, parsedScope.EncryptionAccess)
  474. if err != nil {
  475. return nil, uplinkFailure.Wrap(err)
  476. }
  477. instance.logger = logger
  478. return &instance, nil
  479. }
  480. func (s *StorjStorage) Type() string {
  481. return "storj"
  482. }
  483. func (s *StorjStorage) Head(token string, filename string) (contentType string, contentLength uint64, err error) {
  484. key := filepath.Join(token, filename)
  485. ctx := context.TODO()
  486. obj, err := s.bucket.OpenObject(ctx, key)
  487. if err != nil {
  488. return "", 0, uplinkFailure.Wrap(err)
  489. }
  490. contentType = obj.Meta.ContentType
  491. contentLength = uint64(obj.Meta.Size)
  492. return
  493. }
  494. func (s *StorjStorage) Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error) {
  495. key := filepath.Join(token, filename)
  496. s.logger.Printf("Getting file %s from Storj Bucket", filename)
  497. ctx := context.TODO()
  498. obj, err := s.bucket.OpenObject(ctx, key)
  499. if err != nil {
  500. return nil, "", 0, uplinkFailure.Wrap(err)
  501. }
  502. contentType = obj.Meta.ContentType
  503. contentLength = uint64(obj.Meta.Size)
  504. reader, err = obj.DownloadRange(ctx, 0, -1)
  505. return
  506. }
  507. func (s *StorjStorage) Delete(token string, filename string) (err error) {
  508. key := filepath.Join(token, filename)
  509. s.logger.Printf("Deleting file %s from Storj Bucket", filename)
  510. ctx := context.TODO()
  511. err = s.bucket.DeleteObject(ctx, key)
  512. return
  513. }
  514. func (s *StorjStorage) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) (err error) {
  515. key := filepath.Join(token, filename)
  516. s.logger.Printf("Uploading file %s to Storj Bucket", filename)
  517. ctx := context.TODO()
  518. err = s.bucket.UploadObject(ctx, key, reader, &uplink.UploadOptions{ContentType: contentType})
  519. if err != nil {
  520. return uplinkFailure.Wrap(err)
  521. }
  522. return nil
  523. }