Ви не можете вибрати більше 25 тем. Теми мають розпочинатися з літери або цифри, можуть містити дефіси (-) і не повинні перевищувати 35 символів.
 
 
 

669 рядків
15 KiB

  1. package server
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io"
  6. "io/ioutil"
  7. "log"
  8. "net/http"
  9. "os"
  10. "path/filepath"
  11. "strings"
  12. "github.com/aws/aws-sdk-go/aws"
  13. "github.com/aws/aws-sdk-go/aws/awserr"
  14. "github.com/aws/aws-sdk-go/aws/session"
  15. "github.com/aws/aws-sdk-go/service/s3"
  16. "github.com/aws/aws-sdk-go/service/s3/s3manager"
  17. "github.com/zeebo/errs"
  18. "golang.org/x/net/context"
  19. "golang.org/x/oauth2"
  20. "golang.org/x/oauth2/google"
  21. "google.golang.org/api/drive/v3"
  22. "google.golang.org/api/googleapi"
  23. "storj.io/common/storj"
  24. "storj.io/uplink"
  25. )
  26. type Storage interface {
  27. Get(token string, filename string) (reader io.ReadCloser, contentLength uint64, err error)
  28. Head(token string, filename string) (contentLength uint64, err error)
  29. Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error
  30. Delete(token string, filename string) error
  31. IsNotExist(err error) bool
  32. Type() string
  33. }
  34. type LocalStorage struct {
  35. Storage
  36. basedir string
  37. logger *log.Logger
  38. }
  39. func NewLocalStorage(basedir string, logger *log.Logger) (*LocalStorage, error) {
  40. return &LocalStorage{basedir: basedir, logger: logger}, nil
  41. }
  42. func (s *LocalStorage) Type() string {
  43. return "local"
  44. }
  45. func (s *LocalStorage) Head(token string, filename string) (contentLength uint64, err error) {
  46. path := filepath.Join(s.basedir, token, filename)
  47. var fi os.FileInfo
  48. if fi, err = os.Lstat(path); err != nil {
  49. return
  50. }
  51. contentLength = uint64(fi.Size())
  52. return
  53. }
  54. func (s *LocalStorage) Get(token string, filename string) (reader io.ReadCloser, contentLength uint64, err error) {
  55. path := filepath.Join(s.basedir, token, filename)
  56. // content type , content length
  57. if reader, err = os.Open(path); err != nil {
  58. return
  59. }
  60. var fi os.FileInfo
  61. if fi, err = os.Lstat(path); err != nil {
  62. return
  63. }
  64. contentLength = uint64(fi.Size())
  65. return
  66. }
  67. func (s *LocalStorage) Delete(token string, filename string) (err error) {
  68. metadata := filepath.Join(s.basedir, token, fmt.Sprintf("%s.metadata", filename))
  69. os.Remove(metadata)
  70. path := filepath.Join(s.basedir, token, filename)
  71. err = os.Remove(path)
  72. return
  73. }
  74. func (s *LocalStorage) IsNotExist(err error) bool {
  75. if err == nil {
  76. return false
  77. }
  78. return os.IsNotExist(err)
  79. }
  80. func (s *LocalStorage) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error {
  81. var f io.WriteCloser
  82. var err error
  83. path := filepath.Join(s.basedir, token)
  84. if err = os.MkdirAll(path, 0700); err != nil && !os.IsExist(err) {
  85. return err
  86. }
  87. if f, err = os.OpenFile(filepath.Join(path, filename), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600); err != nil {
  88. return err
  89. }
  90. defer f.Close()
  91. if _, err = io.Copy(f, reader); err != nil {
  92. return err
  93. }
  94. return nil
  95. }
  96. type S3Storage struct {
  97. Storage
  98. bucket string
  99. session *session.Session
  100. s3 *s3.S3
  101. logger *log.Logger
  102. noMultipart bool
  103. }
  104. func NewS3Storage(accessKey, secretKey, bucketName, region, endpoint string, logger *log.Logger, disableMultipart bool, forcePathStyle bool) (*S3Storage, error) {
  105. sess := getAwsSession(accessKey, secretKey, region, endpoint, forcePathStyle)
  106. return &S3Storage{bucket: bucketName, s3: s3.New(sess), session: sess, logger: logger, noMultipart: disableMultipart}, nil
  107. }
  108. func (s *S3Storage) Type() string {
  109. return "s3"
  110. }
  111. func (s *S3Storage) Head(token string, filename string) (contentLength uint64, err error) {
  112. key := fmt.Sprintf("%s/%s", token, filename)
  113. headRequest := &s3.HeadObjectInput{
  114. Bucket: aws.String(s.bucket),
  115. Key: aws.String(key),
  116. }
  117. // content type , content length
  118. response, err := s.s3.HeadObject(headRequest)
  119. if err != nil {
  120. return
  121. }
  122. if response.ContentLength != nil {
  123. contentLength = uint64(*response.ContentLength)
  124. }
  125. return
  126. }
  127. func (s *S3Storage) IsNotExist(err error) bool {
  128. if err == nil {
  129. return false
  130. }
  131. if aerr, ok := err.(awserr.Error); ok {
  132. switch aerr.Code() {
  133. case s3.ErrCodeNoSuchKey:
  134. return true
  135. }
  136. }
  137. return false
  138. }
  139. func (s *S3Storage) Get(token string, filename string) (reader io.ReadCloser, contentLength uint64, err error) {
  140. key := fmt.Sprintf("%s/%s", token, filename)
  141. getRequest := &s3.GetObjectInput{
  142. Bucket: aws.String(s.bucket),
  143. Key: aws.String(key),
  144. }
  145. response, err := s.s3.GetObject(getRequest)
  146. if err != nil {
  147. return
  148. }
  149. if response.ContentLength != nil {
  150. contentLength = uint64(*response.ContentLength)
  151. }
  152. reader = response.Body
  153. return
  154. }
  155. func (s *S3Storage) Delete(token string, filename string) (err error) {
  156. metadata := fmt.Sprintf("%s/%s.metadata", token, filename)
  157. deleteRequest := &s3.DeleteObjectInput{
  158. Bucket: aws.String(s.bucket),
  159. Key: aws.String(metadata),
  160. }
  161. _, err = s.s3.DeleteObject(deleteRequest)
  162. if err != nil {
  163. return
  164. }
  165. key := fmt.Sprintf("%s/%s", token, filename)
  166. deleteRequest = &s3.DeleteObjectInput{
  167. Bucket: aws.String(s.bucket),
  168. Key: aws.String(key),
  169. }
  170. _, err = s.s3.DeleteObject(deleteRequest)
  171. return
  172. }
  173. func (s *S3Storage) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) (err error) {
  174. key := fmt.Sprintf("%s/%s", token, filename)
  175. s.logger.Printf("Uploading file %s to S3 Bucket", filename)
  176. var concurrency int
  177. if !s.noMultipart {
  178. concurrency = 20
  179. } else {
  180. concurrency = 1
  181. }
  182. // Create an uploader with the session and custom options
  183. uploader := s3manager.NewUploader(s.session, func(u *s3manager.Uploader) {
  184. u.Concurrency = concurrency // default is 5
  185. u.LeavePartsOnError = false
  186. })
  187. _, err = uploader.Upload(&s3manager.UploadInput{
  188. Bucket: aws.String(s.bucket),
  189. Key: aws.String(key),
  190. Body: reader,
  191. })
  192. return
  193. }
  194. type GDrive struct {
  195. service *drive.Service
  196. rootId string
  197. basedir string
  198. localConfigPath string
  199. chunkSize int
  200. logger *log.Logger
  201. }
  202. func NewGDriveStorage(clientJsonFilepath string, localConfigPath string, basedir string, chunkSize int, logger *log.Logger) (*GDrive, error) {
  203. b, err := ioutil.ReadFile(clientJsonFilepath)
  204. if err != nil {
  205. return nil, err
  206. }
  207. // If modifying these scopes, delete your previously saved client_secret.json.
  208. config, err := google.ConfigFromJSON(b, drive.DriveScope, drive.DriveMetadataScope)
  209. if err != nil {
  210. return nil, err
  211. }
  212. srv, err := drive.New(getGDriveClient(config, localConfigPath, logger))
  213. if err != nil {
  214. return nil, err
  215. }
  216. chunkSize = chunkSize * 1024 * 1024
  217. storage := &GDrive{service: srv, basedir: basedir, rootId: "", localConfigPath: localConfigPath, chunkSize: chunkSize, logger: logger}
  218. err = storage.setupRoot()
  219. if err != nil {
  220. return nil, err
  221. }
  222. return storage, nil
  223. }
  224. const GDriveRootConfigFile = "root_id.conf"
  225. const GDriveTokenJsonFile = "token.json"
  226. const GDriveDirectoryMimeType = "application/vnd.google-apps.folder"
  227. func (s *GDrive) setupRoot() error {
  228. rootFileConfig := filepath.Join(s.localConfigPath, GDriveRootConfigFile)
  229. rootId, err := ioutil.ReadFile(rootFileConfig)
  230. if err != nil && !os.IsNotExist(err) {
  231. return err
  232. }
  233. if string(rootId) != "" {
  234. s.rootId = string(rootId)
  235. return nil
  236. }
  237. dir := &drive.File{
  238. Name: s.basedir,
  239. MimeType: GDriveDirectoryMimeType,
  240. }
  241. di, err := s.service.Files.Create(dir).Fields("id").Do()
  242. if err != nil {
  243. return err
  244. }
  245. s.rootId = di.Id
  246. err = ioutil.WriteFile(rootFileConfig, []byte(s.rootId), os.FileMode(0600))
  247. if err != nil {
  248. return err
  249. }
  250. return nil
  251. }
  252. func (s *GDrive) hasChecksum(f *drive.File) bool {
  253. return f.Md5Checksum != ""
  254. }
  255. func (s *GDrive) list(nextPageToken string, q string) (*drive.FileList, error) {
  256. return s.service.Files.List().Fields("nextPageToken, files(id, name, mimeType)").Q(q).PageToken(nextPageToken).Do()
  257. }
  258. func (s *GDrive) findId(filename string, token string) (string, error) {
  259. filename = strings.Replace(filename, `'`, `\'`, -1)
  260. filename = strings.Replace(filename, `"`, `\"`, -1)
  261. fileId, tokenId, nextPageToken := "", "", ""
  262. q := fmt.Sprintf("'%s' in parents and name='%s' and mimeType='%s' and trashed=false", s.rootId, token, GDriveDirectoryMimeType)
  263. l, err := s.list(nextPageToken, q)
  264. if err != nil {
  265. return "", err
  266. }
  267. for 0 < len(l.Files) {
  268. for _, fi := range l.Files {
  269. tokenId = fi.Id
  270. break
  271. }
  272. if l.NextPageToken == "" {
  273. break
  274. }
  275. l, err = s.list(l.NextPageToken, q)
  276. }
  277. if filename == "" {
  278. return tokenId, nil
  279. } else if tokenId == "" {
  280. return "", fmt.Errorf("Cannot find file %s/%s", token, filename)
  281. }
  282. q = fmt.Sprintf("'%s' in parents and name='%s' and mimeType!='%s' and trashed=false", tokenId, filename, GDriveDirectoryMimeType)
  283. l, err = s.list(nextPageToken, q)
  284. if err != nil {
  285. return "", err
  286. }
  287. for 0 < len(l.Files) {
  288. for _, fi := range l.Files {
  289. fileId = fi.Id
  290. break
  291. }
  292. if l.NextPageToken == "" {
  293. break
  294. }
  295. l, err = s.list(l.NextPageToken, q)
  296. }
  297. if fileId == "" {
  298. return "", fmt.Errorf("Cannot find file %s/%s", token, filename)
  299. }
  300. return fileId, nil
  301. }
  302. func (s *GDrive) Type() string {
  303. return "gdrive"
  304. }
  305. func (s *GDrive) Head(token string, filename string) (contentLength uint64, err error) {
  306. var fileId string
  307. fileId, err = s.findId(filename, token)
  308. if err != nil {
  309. return
  310. }
  311. var fi *drive.File
  312. if fi, err = s.service.Files.Get(fileId).Fields("size").Do(); err != nil {
  313. return
  314. }
  315. contentLength = uint64(fi.Size)
  316. return
  317. }
  318. func (s *GDrive) Get(token string, filename string) (reader io.ReadCloser, contentLength uint64, err error) {
  319. var fileId string
  320. fileId, err = s.findId(filename, token)
  321. if err != nil {
  322. return
  323. }
  324. var fi *drive.File
  325. fi, err = s.service.Files.Get(fileId).Fields("size", "md5Checksum").Do()
  326. if !s.hasChecksum(fi) {
  327. err = fmt.Errorf("Cannot find file %s/%s", token, filename)
  328. return
  329. }
  330. contentLength = uint64(fi.Size)
  331. ctx := context.Background()
  332. var res *http.Response
  333. res, err = s.service.Files.Get(fileId).Context(ctx).Download()
  334. if err != nil {
  335. return
  336. }
  337. reader = res.Body
  338. return
  339. }
  340. func (s *GDrive) Delete(token string, filename string) (err error) {
  341. metadata, _ := s.findId(fmt.Sprintf("%s.metadata", filename), token)
  342. s.service.Files.Delete(metadata).Do()
  343. var fileId string
  344. fileId, err = s.findId(filename, token)
  345. if err != nil {
  346. return
  347. }
  348. err = s.service.Files.Delete(fileId).Do()
  349. return
  350. }
  351. func (s *GDrive) IsNotExist(err error) bool {
  352. if err != nil {
  353. if e, ok := err.(*googleapi.Error); ok {
  354. return e.Code == http.StatusNotFound
  355. }
  356. }
  357. return false
  358. }
  359. func (s *GDrive) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error {
  360. dirId, err := s.findId("", token)
  361. if err != nil {
  362. return err
  363. }
  364. if dirId == "" {
  365. dir := &drive.File{
  366. Name: token,
  367. Parents: []string{s.rootId},
  368. MimeType: GDriveDirectoryMimeType,
  369. }
  370. di, err := s.service.Files.Create(dir).Fields("id").Do()
  371. if err != nil {
  372. return err
  373. }
  374. dirId = di.Id
  375. }
  376. // Instantiate empty drive file
  377. dst := &drive.File{
  378. Name: filename,
  379. Parents: []string{dirId},
  380. MimeType: contentType,
  381. }
  382. ctx := context.Background()
  383. _, err = s.service.Files.Create(dst).Context(ctx).Media(reader, googleapi.ChunkSize(s.chunkSize)).Do()
  384. if err != nil {
  385. return err
  386. }
  387. return nil
  388. }
  389. // Retrieve a token, saves the token, then returns the generated client.
  390. func getGDriveClient(config *oauth2.Config, localConfigPath string, logger *log.Logger) *http.Client {
  391. tokenFile := filepath.Join(localConfigPath, GDriveTokenJsonFile)
  392. tok, err := gDriveTokenFromFile(tokenFile)
  393. if err != nil {
  394. tok = getGDriveTokenFromWeb(config, logger)
  395. saveGDriveToken(tokenFile, tok, logger)
  396. }
  397. return config.Client(context.Background(), tok)
  398. }
  399. // Request a token from the web, then returns the retrieved token.
  400. func getGDriveTokenFromWeb(config *oauth2.Config, logger *log.Logger) *oauth2.Token {
  401. authURL := config.AuthCodeURL("state-token", oauth2.AccessTypeOffline)
  402. fmt.Printf("Go to the following link in your browser then type the "+
  403. "authorization code: \n%v\n", authURL)
  404. var authCode string
  405. if _, err := fmt.Scan(&authCode); err != nil {
  406. logger.Fatalf("Unable to read authorization code %v", err)
  407. }
  408. tok, err := config.Exchange(context.TODO(), authCode)
  409. if err != nil {
  410. logger.Fatalf("Unable to retrieve token from web %v", err)
  411. }
  412. return tok
  413. }
  414. // Retrieves a token from a local file.
  415. func gDriveTokenFromFile(file string) (*oauth2.Token, error) {
  416. f, err := os.Open(file)
  417. defer f.Close()
  418. if err != nil {
  419. return nil, err
  420. }
  421. tok := &oauth2.Token{}
  422. err = json.NewDecoder(f).Decode(tok)
  423. return tok, err
  424. }
  425. // Saves a token to a file path.
  426. func saveGDriveToken(path string, token *oauth2.Token, logger *log.Logger) {
  427. logger.Printf("Saving credential file to: %s\n", path)
  428. f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600)
  429. defer f.Close()
  430. if err != nil {
  431. logger.Fatalf("Unable to cache oauth token: %v", err)
  432. }
  433. json.NewEncoder(f).Encode(token)
  434. }
  435. type StorjStorage struct {
  436. Storage
  437. project *uplink.Project
  438. bucket *uplink.Bucket
  439. logger *log.Logger
  440. }
  441. func NewStorjStorage(access, bucket string, logger *log.Logger) (*StorjStorage, error) {
  442. var instance StorjStorage
  443. var err error
  444. ctx := context.TODO()
  445. parsedAccess, err := uplink.ParseAccess(access)
  446. if err != nil {
  447. return nil, err
  448. }
  449. instance.project, err = uplink.OpenProject(ctx, parsedAccess)
  450. if err != nil {
  451. return nil, err
  452. }
  453. instance.bucket, err = instance.project.EnsureBucket(ctx, bucket)
  454. if err != nil {
  455. //Ignoring the error to return the one that occurred first, but try to clean up.
  456. _ = instance.project.Close()
  457. return nil, err
  458. }
  459. instance.logger = logger
  460. return &instance, nil
  461. }
  462. func (s *StorjStorage) Type() string {
  463. return "storj"
  464. }
  465. func (s *StorjStorage) Head(token string, filename string) (contentLength uint64, err error) {
  466. key := storj.JoinPaths(token, filename)
  467. ctx := context.TODO()
  468. obj, err := s.project.StatObject(ctx, s.bucket.Name, key)
  469. if err != nil {
  470. return 0, err
  471. }
  472. contentLength = uint64(obj.System.ContentLength)
  473. return
  474. }
  475. func (s *StorjStorage) Get(token string, filename string) (reader io.ReadCloser, contentLength uint64, err error) {
  476. key := storj.JoinPaths(token, filename)
  477. s.logger.Printf("Getting file %s from Storj Bucket", filename)
  478. ctx := context.TODO()
  479. download, err := s.project.DownloadObject(ctx, s.bucket.Name, key, nil)
  480. if err != nil {
  481. return nil, 0, err
  482. }
  483. contentLength = uint64(download.Info().System.ContentLength)
  484. reader = download
  485. return
  486. }
  487. func (s *StorjStorage) Delete(token string, filename string) (err error) {
  488. key := storj.JoinPaths(token, filename)
  489. s.logger.Printf("Deleting file %s from Storj Bucket", filename)
  490. ctx := context.TODO()
  491. _, err = s.project.DeleteObject(ctx, s.bucket.Name, key)
  492. return
  493. }
  494. func (s *StorjStorage) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) (err error) {
  495. key := storj.JoinPaths(token, filename)
  496. s.logger.Printf("Uploading file %s to Storj Bucket", filename)
  497. ctx := context.TODO()
  498. writer, err := s.project.UploadObject(ctx, s.bucket.Name, key, nil)
  499. if err != nil {
  500. return err
  501. }
  502. n, err := io.Copy(writer, reader)
  503. if err != nil || uint64(n) != contentLength {
  504. //Ignoring the error to return the one that occurred first, but try to clean up.
  505. _ = writer.Abort()
  506. return err
  507. }
  508. err = writer.SetCustomMetadata(ctx, uplink.CustomMetadata{"content-type": contentType})
  509. if err != nil {
  510. //Ignoring the error to return the one that occurred first, but try to clean up.
  511. _ = writer.Abort()
  512. return err
  513. }
  514. err = writer.Commit()
  515. return err
  516. }
  517. func (s *StorjStorage) IsNotExist(err error) bool {
  518. return errs.Is(err, uplink.ErrObjectNotFound)
  519. }