Nie możesz wybrać więcej niż 25 tematów. Tematy muszą się zaczynać od litery lub cyfry, mogą zawierać myślniki ('-') i mogą mieć do 35 znaków.
 
 
 

688 wierszy
16 KiB

  1. package server
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io"
  6. "io/ioutil"
  7. "log"
  8. "mime"
  9. "net/http"
  10. "os"
  11. "path/filepath"
  12. "strings"
  13. "github.com/aws/aws-sdk-go/aws"
  14. "github.com/aws/aws-sdk-go/aws/awserr"
  15. "github.com/aws/aws-sdk-go/aws/session"
  16. "github.com/aws/aws-sdk-go/service/s3"
  17. "github.com/aws/aws-sdk-go/service/s3/s3manager"
  18. "golang.org/x/net/context"
  19. "golang.org/x/oauth2"
  20. "golang.org/x/oauth2/google"
  21. "google.golang.org/api/drive/v3"
  22. "google.golang.org/api/googleapi"
  23. "storj.io/common/storj"
  24. "storj.io/uplink"
  25. )
  26. type Storage interface {
  27. Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error)
  28. Head(token string, filename string) (contentType string, contentLength uint64, err error)
  29. Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error
  30. Delete(token string, filename string) error
  31. IsNotExist(err error) bool
  32. Type() string
  33. }
  34. type LocalStorage struct {
  35. Storage
  36. basedir string
  37. logger *log.Logger
  38. }
  39. func NewLocalStorage(basedir string, logger *log.Logger) (*LocalStorage, error) {
  40. return &LocalStorage{basedir: basedir, logger: logger}, nil
  41. }
  42. func (s *LocalStorage) Type() string {
  43. return "local"
  44. }
  45. func (s *LocalStorage) Head(token string, filename string) (contentType string, contentLength uint64, err error) {
  46. path := filepath.Join(s.basedir, token, filename)
  47. var fi os.FileInfo
  48. if fi, err = os.Lstat(path); err != nil {
  49. return
  50. }
  51. contentLength = uint64(fi.Size())
  52. contentType = mime.TypeByExtension(filepath.Ext(filename))
  53. return
  54. }
  55. func (s *LocalStorage) Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error) {
  56. path := filepath.Join(s.basedir, token, filename)
  57. // content type , content length
  58. if reader, err = os.Open(path); err != nil {
  59. return
  60. }
  61. var fi os.FileInfo
  62. if fi, err = os.Lstat(path); err != nil {
  63. return
  64. }
  65. contentLength = uint64(fi.Size())
  66. contentType = mime.TypeByExtension(filepath.Ext(filename))
  67. return
  68. }
  69. func (s *LocalStorage) Delete(token string, filename string) (err error) {
  70. metadata := filepath.Join(s.basedir, token, fmt.Sprintf("%s.metadata", filename))
  71. os.Remove(metadata)
  72. path := filepath.Join(s.basedir, token, filename)
  73. err = os.Remove(path)
  74. return
  75. }
  76. func (s *LocalStorage) IsNotExist(err error) bool {
  77. if err == nil {
  78. return false
  79. }
  80. return os.IsNotExist(err)
  81. }
  82. func (s *LocalStorage) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error {
  83. var f io.WriteCloser
  84. var err error
  85. path := filepath.Join(s.basedir, token)
  86. if err = os.MkdirAll(path, 0700); err != nil && !os.IsExist(err) {
  87. return err
  88. }
  89. if f, err = os.OpenFile(filepath.Join(path, filename), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600); err != nil {
  90. return err
  91. }
  92. defer f.Close()
  93. if _, err = io.Copy(f, reader); err != nil {
  94. return err
  95. }
  96. return nil
  97. }
  98. type S3Storage struct {
  99. Storage
  100. bucket string
  101. session *session.Session
  102. s3 *s3.S3
  103. logger *log.Logger
  104. noMultipart bool
  105. }
  106. func NewS3Storage(accessKey, secretKey, bucketName, region, endpoint string, logger *log.Logger, disableMultipart bool, forcePathStyle bool) (*S3Storage, error) {
  107. sess := getAwsSession(accessKey, secretKey, region, endpoint, forcePathStyle)
  108. return &S3Storage{bucket: bucketName, s3: s3.New(sess), session: sess, logger: logger, noMultipart: disableMultipart}, nil
  109. }
  110. func (s *S3Storage) Type() string {
  111. return "s3"
  112. }
  113. func (s *S3Storage) Head(token string, filename string) (contentType string, contentLength uint64, err error) {
  114. key := fmt.Sprintf("%s/%s", token, filename)
  115. headRequest := &s3.HeadObjectInput{
  116. Bucket: aws.String(s.bucket),
  117. Key: aws.String(key),
  118. }
  119. // content type , content length
  120. response, err := s.s3.HeadObject(headRequest)
  121. if err != nil {
  122. return
  123. }
  124. if response.ContentType != nil {
  125. contentType = *response.ContentType
  126. }
  127. if response.ContentLength != nil {
  128. contentLength = uint64(*response.ContentLength)
  129. }
  130. return
  131. }
  132. func (s *S3Storage) IsNotExist(err error) bool {
  133. if err == nil {
  134. return false
  135. }
  136. if aerr, ok := err.(awserr.Error); ok {
  137. switch aerr.Code() {
  138. case s3.ErrCodeNoSuchKey:
  139. return true
  140. }
  141. }
  142. return false
  143. }
  144. func (s *S3Storage) Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error) {
  145. key := fmt.Sprintf("%s/%s", token, filename)
  146. getRequest := &s3.GetObjectInput{
  147. Bucket: aws.String(s.bucket),
  148. Key: aws.String(key),
  149. }
  150. response, err := s.s3.GetObject(getRequest)
  151. if err != nil {
  152. return
  153. }
  154. if response.ContentType != nil {
  155. contentType = *response.ContentType
  156. }
  157. if response.ContentLength != nil {
  158. contentLength = uint64(*response.ContentLength)
  159. }
  160. reader = response.Body
  161. return
  162. }
  163. func (s *S3Storage) Delete(token string, filename string) (err error) {
  164. metadata := fmt.Sprintf("%s/%s.metadata", token, filename)
  165. deleteRequest := &s3.DeleteObjectInput{
  166. Bucket: aws.String(s.bucket),
  167. Key: aws.String(metadata),
  168. }
  169. _, err = s.s3.DeleteObject(deleteRequest)
  170. if err != nil {
  171. return
  172. }
  173. key := fmt.Sprintf("%s/%s", token, filename)
  174. deleteRequest = &s3.DeleteObjectInput{
  175. Bucket: aws.String(s.bucket),
  176. Key: aws.String(key),
  177. }
  178. _, err = s.s3.DeleteObject(deleteRequest)
  179. return
  180. }
  181. func (s *S3Storage) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) (err error) {
  182. key := fmt.Sprintf("%s/%s", token, filename)
  183. s.logger.Printf("Uploading file %s to S3 Bucket", filename)
  184. var concurrency int
  185. if !s.noMultipart {
  186. concurrency = 20
  187. } else {
  188. concurrency = 1
  189. }
  190. // Create an uploader with the session and custom options
  191. uploader := s3manager.NewUploader(s.session, func(u *s3manager.Uploader) {
  192. u.Concurrency = concurrency // default is 5
  193. u.LeavePartsOnError = false
  194. })
  195. _, err = uploader.Upload(&s3manager.UploadInput{
  196. Bucket: aws.String(s.bucket),
  197. Key: aws.String(key),
  198. Body: reader,
  199. })
  200. return
  201. }
  202. type GDrive struct {
  203. service *drive.Service
  204. rootId string
  205. basedir string
  206. localConfigPath string
  207. chunkSize int
  208. logger *log.Logger
  209. }
  210. func NewGDriveStorage(clientJsonFilepath string, localConfigPath string, basedir string, chunkSize int, logger *log.Logger) (*GDrive, error) {
  211. b, err := ioutil.ReadFile(clientJsonFilepath)
  212. if err != nil {
  213. return nil, err
  214. }
  215. // If modifying these scopes, delete your previously saved client_secret.json.
  216. config, err := google.ConfigFromJSON(b, drive.DriveScope, drive.DriveMetadataScope)
  217. if err != nil {
  218. return nil, err
  219. }
  220. srv, err := drive.New(getGDriveClient(config, localConfigPath, logger))
  221. if err != nil {
  222. return nil, err
  223. }
  224. chunkSize = chunkSize * 1024 * 1024
  225. storage := &GDrive{service: srv, basedir: basedir, rootId: "", localConfigPath: localConfigPath, chunkSize: chunkSize, logger: logger}
  226. err = storage.setupRoot()
  227. if err != nil {
  228. return nil, err
  229. }
  230. return storage, nil
  231. }
  232. const GDriveRootConfigFile = "root_id.conf"
  233. const GDriveTokenJsonFile = "token.json"
  234. const GDriveDirectoryMimeType = "application/vnd.google-apps.folder"
  235. func (s *GDrive) setupRoot() error {
  236. rootFileConfig := filepath.Join(s.localConfigPath, GDriveRootConfigFile)
  237. rootId, err := ioutil.ReadFile(rootFileConfig)
  238. if err != nil && !os.IsNotExist(err) {
  239. return err
  240. }
  241. if string(rootId) != "" {
  242. s.rootId = string(rootId)
  243. return nil
  244. }
  245. dir := &drive.File{
  246. Name: s.basedir,
  247. MimeType: GDriveDirectoryMimeType,
  248. }
  249. di, err := s.service.Files.Create(dir).Fields("id").Do()
  250. if err != nil {
  251. return err
  252. }
  253. s.rootId = di.Id
  254. err = ioutil.WriteFile(rootFileConfig, []byte(s.rootId), os.FileMode(0600))
  255. if err != nil {
  256. return err
  257. }
  258. return nil
  259. }
  260. func (s *GDrive) hasChecksum(f *drive.File) bool {
  261. return f.Md5Checksum != ""
  262. }
  263. func (s *GDrive) list(nextPageToken string, q string) (*drive.FileList, error) {
  264. return s.service.Files.List().Fields("nextPageToken, files(id, name, mimeType)").Q(q).PageToken(nextPageToken).Do()
  265. }
  266. func (s *GDrive) findId(filename string, token string) (string, error) {
  267. filename = strings.Replace(filename, `'`, `\'`, -1)
  268. filename = strings.Replace(filename, `"`, `\"`, -1)
  269. fileId, tokenId, nextPageToken := "", "", ""
  270. q := fmt.Sprintf("'%s' in parents and name='%s' and mimeType='%s' and trashed=false", s.rootId, token, GDriveDirectoryMimeType)
  271. l, err := s.list(nextPageToken, q)
  272. if err != nil {
  273. return "", err
  274. }
  275. for 0 < len(l.Files) {
  276. for _, fi := range l.Files {
  277. tokenId = fi.Id
  278. break
  279. }
  280. if l.NextPageToken == "" {
  281. break
  282. }
  283. l, err = s.list(l.NextPageToken, q)
  284. }
  285. if filename == "" {
  286. return tokenId, nil
  287. } else if tokenId == "" {
  288. return "", fmt.Errorf("Cannot find file %s/%s", token, filename)
  289. }
  290. q = fmt.Sprintf("'%s' in parents and name='%s' and mimeType!='%s' and trashed=false", tokenId, filename, GDriveDirectoryMimeType)
  291. l, err = s.list(nextPageToken, q)
  292. if err != nil {
  293. return "", err
  294. }
  295. for 0 < len(l.Files) {
  296. for _, fi := range l.Files {
  297. fileId = fi.Id
  298. break
  299. }
  300. if l.NextPageToken == "" {
  301. break
  302. }
  303. l, err = s.list(l.NextPageToken, q)
  304. }
  305. if fileId == "" {
  306. return "", fmt.Errorf("Cannot find file %s/%s", token, filename)
  307. }
  308. return fileId, nil
  309. }
  310. func (s *GDrive) Type() string {
  311. return "gdrive"
  312. }
  313. func (s *GDrive) Head(token string, filename string) (contentType string, contentLength uint64, err error) {
  314. var fileId string
  315. fileId, err = s.findId(filename, token)
  316. if err != nil {
  317. return
  318. }
  319. var fi *drive.File
  320. if fi, err = s.service.Files.Get(fileId).Fields("mimeType", "size").Do(); err != nil {
  321. return
  322. }
  323. contentLength = uint64(fi.Size)
  324. contentType = fi.MimeType
  325. return
  326. }
  327. func (s *GDrive) Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error) {
  328. var fileId string
  329. fileId, err = s.findId(filename, token)
  330. if err != nil {
  331. return
  332. }
  333. var fi *drive.File
  334. fi, err = s.service.Files.Get(fileId).Fields("mimeType", "size", "md5Checksum").Do()
  335. if !s.hasChecksum(fi) {
  336. err = fmt.Errorf("Cannot find file %s/%s", token, filename)
  337. return
  338. }
  339. contentLength = uint64(fi.Size)
  340. contentType = fi.MimeType
  341. ctx := context.Background()
  342. var res *http.Response
  343. res, err = s.service.Files.Get(fileId).Context(ctx).Download()
  344. if err != nil {
  345. return
  346. }
  347. reader = res.Body
  348. return
  349. }
  350. func (s *GDrive) Delete(token string, filename string) (err error) {
  351. metadata, _ := s.findId(fmt.Sprintf("%s.metadata", filename), token)
  352. s.service.Files.Delete(metadata).Do()
  353. var fileId string
  354. fileId, err = s.findId(filename, token)
  355. if err != nil {
  356. return
  357. }
  358. err = s.service.Files.Delete(fileId).Do()
  359. return
  360. }
  361. func (s *GDrive) IsNotExist(err error) bool {
  362. if err != nil {
  363. if e, ok := err.(*googleapi.Error); ok {
  364. return e.Code == http.StatusNotFound
  365. }
  366. }
  367. return false
  368. }
  369. func (s *GDrive) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error {
  370. dirId, err := s.findId("", token)
  371. if err != nil {
  372. return err
  373. }
  374. if dirId == "" {
  375. dir := &drive.File{
  376. Name: token,
  377. Parents: []string{s.rootId},
  378. MimeType: GDriveDirectoryMimeType,
  379. }
  380. di, err := s.service.Files.Create(dir).Fields("id").Do()
  381. if err != nil {
  382. return err
  383. }
  384. dirId = di.Id
  385. }
  386. // Instantiate empty drive file
  387. dst := &drive.File{
  388. Name: filename,
  389. Parents: []string{dirId},
  390. MimeType: contentType,
  391. }
  392. ctx := context.Background()
  393. _, err = s.service.Files.Create(dst).Context(ctx).Media(reader, googleapi.ChunkSize(s.chunkSize)).Do()
  394. if err != nil {
  395. return err
  396. }
  397. return nil
  398. }
  399. // Retrieve a token, saves the token, then returns the generated client.
  400. func getGDriveClient(config *oauth2.Config, localConfigPath string, logger *log.Logger) *http.Client {
  401. tokenFile := filepath.Join(localConfigPath, GDriveTokenJsonFile)
  402. tok, err := gDriveTokenFromFile(tokenFile)
  403. if err != nil {
  404. tok = getGDriveTokenFromWeb(config, logger)
  405. saveGDriveToken(tokenFile, tok, logger)
  406. }
  407. return config.Client(context.Background(), tok)
  408. }
  409. // Request a token from the web, then returns the retrieved token.
  410. func getGDriveTokenFromWeb(config *oauth2.Config, logger *log.Logger) *oauth2.Token {
  411. authURL := config.AuthCodeURL("state-token", oauth2.AccessTypeOffline)
  412. fmt.Printf("Go to the following link in your browser then type the "+
  413. "authorization code: \n%v\n", authURL)
  414. var authCode string
  415. if _, err := fmt.Scan(&authCode); err != nil {
  416. logger.Fatalf("Unable to read authorization code %v", err)
  417. }
  418. tok, err := config.Exchange(context.TODO(), authCode)
  419. if err != nil {
  420. logger.Fatalf("Unable to retrieve token from web %v", err)
  421. }
  422. return tok
  423. }
  424. // Retrieves a token from a local file.
  425. func gDriveTokenFromFile(file string) (*oauth2.Token, error) {
  426. f, err := os.Open(file)
  427. defer f.Close()
  428. if err != nil {
  429. return nil, err
  430. }
  431. tok := &oauth2.Token{}
  432. err = json.NewDecoder(f).Decode(tok)
  433. return tok, err
  434. }
  435. // Saves a token to a file path.
  436. func saveGDriveToken(path string, token *oauth2.Token, logger *log.Logger) {
  437. logger.Printf("Saving credential file to: %s\n", path)
  438. f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600)
  439. defer f.Close()
  440. if err != nil {
  441. logger.Fatalf("Unable to cache oauth token: %v", err)
  442. }
  443. json.NewEncoder(f).Encode(token)
  444. }
  445. type StorjStorage struct {
  446. Storage
  447. project *uplink.Project
  448. bucket *uplink.Bucket
  449. logger *log.Logger
  450. }
  451. func NewStorjStorage(access, bucket string, logger *log.Logger) (*StorjStorage, error) {
  452. var instance StorjStorage
  453. var err error
  454. ctx := context.TODO()
  455. parsedAccess, err := uplink.ParseAccess(access)
  456. if err != nil {
  457. return nil, err
  458. }
  459. instance.project, err = uplink.OpenProject(ctx, parsedAccess)
  460. if err != nil {
  461. return nil, err
  462. }
  463. instance.bucket, err = instance.project.EnsureBucket(ctx, bucket)
  464. if err != nil {
  465. //Ignoring the error to return the one that occurred first, but try to clean up.
  466. _ = instance.project.Close()
  467. return nil, err
  468. }
  469. instance.logger = logger
  470. return &instance, nil
  471. }
  472. func (s *StorjStorage) Type() string {
  473. return "storj"
  474. }
  475. func (s *StorjStorage) Head(token string, filename string) (contentType string, contentLength uint64, err error) {
  476. key := storj.JoinPaths(token, filename)
  477. ctx := context.TODO()
  478. obj, err := s.project.StatObject(ctx, s.bucket.Name, key)
  479. if err != nil {
  480. return "", 0, err
  481. }
  482. customMeta := obj.Custom.Clone()
  483. contentType = customMeta["contentType"]
  484. contentLength = uint64(obj.System.ContentLength)
  485. return
  486. }
  487. func (s *StorjStorage) Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error) {
  488. key := storj.JoinPaths(token, filename)
  489. s.logger.Printf("Getting file %s from Storj Bucket", filename)
  490. ctx := context.TODO()
  491. download, err := s.project.DownloadObject(ctx, s.bucket.Name, key, nil)
  492. if err != nil {
  493. return nil, "", 0, err
  494. }
  495. customMeta := download.Info().Custom.Clone()
  496. contentType = customMeta["contentType"]
  497. contentLength = uint64(download.Info().System.ContentLength)
  498. reader = download
  499. return
  500. }
  501. func (s *StorjStorage) Delete(token string, filename string) (err error) {
  502. key := storj.JoinPaths(token, filename)
  503. s.logger.Printf("Deleting file %s from Storj Bucket", filename)
  504. ctx := context.TODO()
  505. _, err = s.project.DeleteObject(ctx, s.bucket.Name, key)
  506. return
  507. }
  508. func (s *StorjStorage) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) (err error) {
  509. key := storj.JoinPaths(token, filename)
  510. s.logger.Printf("Uploading file %s to Storj Bucket", filename)
  511. ctx := context.TODO()
  512. writer, err := s.project.UploadObject(ctx, s.bucket.Name, key, &uplink.UploadOptions{})
  513. if err != nil {
  514. return err
  515. }
  516. _, err = io.Copy(writer, reader)
  517. if err != nil {
  518. //Ignoring the error to return the one that occurred first, but try to clean up.
  519. _ = writer.Abort()
  520. return err
  521. }
  522. err = writer.SetCustomMetadata(ctx, uplink.CustomMetadata{"ContentType": contentType})
  523. if err != nil {
  524. //Ignoring the error to return the one that occurred first, but try to clean up.
  525. _ = writer.Abort()
  526. return err
  527. }
  528. err = writer.Commit()
  529. return err
  530. }
  531. func (s *StorjStorage) IsNotExist(err error) bool {
  532. return uplink.ErrObjectNotFound.Has(err)
  533. }