You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

670 regels
15 KiB

  1. package server
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io"
  6. "io/ioutil"
  7. "log"
  8. "net/http"
  9. "os"
  10. "path/filepath"
  11. "strings"
  12. "github.com/aws/aws-sdk-go/aws"
  13. "github.com/aws/aws-sdk-go/aws/awserr"
  14. "github.com/aws/aws-sdk-go/aws/session"
  15. "github.com/aws/aws-sdk-go/service/s3"
  16. "github.com/aws/aws-sdk-go/service/s3/s3manager"
  17. "golang.org/x/net/context"
  18. "golang.org/x/oauth2"
  19. "golang.org/x/oauth2/google"
  20. "google.golang.org/api/drive/v3"
  21. "google.golang.org/api/googleapi"
  22. "storj.io/common/storj"
  23. "storj.io/uplink"
  24. )
  // Storage is the contract implemented by every backend in this file.
  // Objects are addressed by a (token, filename) pair.
  type Storage interface {
  	// Get opens the object for reading and reports its length in bytes.
  	Get(token, filename string) (reader io.ReadCloser, contentLength uint64, err error)
  	// Head reports the object's length in bytes without opening it.
  	Head(token, filename string) (contentLength uint64, err error)
  	// Put stores everything read from reader under (token, filename).
  	Put(token, filename string, reader io.Reader, contentType string, contentLength uint64) error
  	// Delete removes the object stored under (token, filename).
  	Delete(token, filename string) error
  	// IsNotExist reports whether err, as returned by this backend, means
  	// that the requested object does not exist.
  	IsNotExist(err error) bool
  	// Type returns a short identifier for the backend.
  	Type() string
  }
  33. type LocalStorage struct {
  34. Storage
  35. basedir string
  36. logger *log.Logger
  37. }
  38. func NewLocalStorage(basedir string, logger *log.Logger) (*LocalStorage, error) {
  39. return &LocalStorage{basedir: basedir, logger: logger}, nil
  40. }
  41. func (s *LocalStorage) Type() string {
  42. return "local"
  43. }
  44. func (s *LocalStorage) Head(token string, filename string) (contentLength uint64, err error) {
  45. path := filepath.Join(s.basedir, token, filename)
  46. var fi os.FileInfo
  47. if fi, err = os.Lstat(path); err != nil {
  48. return
  49. }
  50. contentLength = uint64(fi.Size())
  51. return
  52. }
  53. func (s *LocalStorage) Get(token string, filename string) (reader io.ReadCloser, contentLength uint64, err error) {
  54. path := filepath.Join(s.basedir, token, filename)
  55. // content type , content length
  56. if reader, err = os.Open(path); err != nil {
  57. return
  58. }
  59. var fi os.FileInfo
  60. if fi, err = os.Lstat(path); err != nil {
  61. return
  62. }
  63. contentLength = uint64(fi.Size())
  64. return
  65. }
  66. func (s *LocalStorage) Delete(token string, filename string) (err error) {
  67. metadata := filepath.Join(s.basedir, token, fmt.Sprintf("%s.metadata", filename))
  68. os.Remove(metadata)
  69. path := filepath.Join(s.basedir, token, filename)
  70. err = os.Remove(path)
  71. return
  72. }
  73. func (s *LocalStorage) IsNotExist(err error) bool {
  74. if err == nil {
  75. return false
  76. }
  77. return os.IsNotExist(err)
  78. }
  79. func (s *LocalStorage) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error {
  80. var f io.WriteCloser
  81. var err error
  82. path := filepath.Join(s.basedir, token)
  83. if err = os.MkdirAll(path, 0700); err != nil && !os.IsExist(err) {
  84. return err
  85. }
  86. if f, err = os.OpenFile(filepath.Join(path, filename), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600); err != nil {
  87. return err
  88. }
  89. defer f.Close()
  90. if _, err = io.Copy(f, reader); err != nil {
  91. return err
  92. }
  93. return nil
  94. }
  95. type S3Storage struct {
  96. Storage
  97. bucket string
  98. session *session.Session
  99. s3 *s3.S3
  100. logger *log.Logger
  101. noMultipart bool
  102. }
  103. func NewS3Storage(accessKey, secretKey, bucketName, region, endpoint string, logger *log.Logger, disableMultipart bool, forcePathStyle bool) (*S3Storage, error) {
  104. sess := getAwsSession(accessKey, secretKey, region, endpoint, forcePathStyle)
  105. return &S3Storage{bucket: bucketName, s3: s3.New(sess), session: sess, logger: logger, noMultipart: disableMultipart}, nil
  106. }
  107. func (s *S3Storage) Type() string {
  108. return "s3"
  109. }
  110. func (s *S3Storage) Head(token string, filename string) (contentLength uint64, err error) {
  111. key := fmt.Sprintf("%s/%s", token, filename)
  112. headRequest := &s3.HeadObjectInput{
  113. Bucket: aws.String(s.bucket),
  114. Key: aws.String(key),
  115. }
  116. // content type , content length
  117. response, err := s.s3.HeadObject(headRequest)
  118. if err != nil {
  119. return
  120. }
  121. if response.ContentLength != nil {
  122. contentLength = uint64(*response.ContentLength)
  123. }
  124. return
  125. }
  126. func (s *S3Storage) IsNotExist(err error) bool {
  127. if err == nil {
  128. return false
  129. }
  130. if aerr, ok := err.(awserr.Error); ok {
  131. switch aerr.Code() {
  132. case s3.ErrCodeNoSuchKey:
  133. return true
  134. }
  135. }
  136. return false
  137. }
  138. func (s *S3Storage) Get(token string, filename string) (reader io.ReadCloser, contentLength uint64, err error) {
  139. key := fmt.Sprintf("%s/%s", token, filename)
  140. getRequest := &s3.GetObjectInput{
  141. Bucket: aws.String(s.bucket),
  142. Key: aws.String(key),
  143. }
  144. response, err := s.s3.GetObject(getRequest)
  145. if err != nil {
  146. return
  147. }
  148. if response.ContentLength != nil {
  149. contentLength = uint64(*response.ContentLength)
  150. }
  151. reader = response.Body
  152. return
  153. }
  154. func (s *S3Storage) Delete(token string, filename string) (err error) {
  155. metadata := fmt.Sprintf("%s/%s.metadata", token, filename)
  156. deleteRequest := &s3.DeleteObjectInput{
  157. Bucket: aws.String(s.bucket),
  158. Key: aws.String(metadata),
  159. }
  160. _, err = s.s3.DeleteObject(deleteRequest)
  161. if err != nil {
  162. return
  163. }
  164. key := fmt.Sprintf("%s/%s", token, filename)
  165. deleteRequest = &s3.DeleteObjectInput{
  166. Bucket: aws.String(s.bucket),
  167. Key: aws.String(key),
  168. }
  169. _, err = s.s3.DeleteObject(deleteRequest)
  170. return
  171. }
  172. func (s *S3Storage) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) (err error) {
  173. key := fmt.Sprintf("%s/%s", token, filename)
  174. s.logger.Printf("Uploading file %s to S3 Bucket", filename)
  175. var concurrency int
  176. if !s.noMultipart {
  177. concurrency = 20
  178. } else {
  179. concurrency = 1
  180. }
  181. // Create an uploader with the session and custom options
  182. uploader := s3manager.NewUploader(s.session, func(u *s3manager.Uploader) {
  183. u.Concurrency = concurrency // default is 5
  184. u.LeavePartsOnError = false
  185. })
  186. _, err = uploader.Upload(&s3manager.UploadInput{
  187. Bucket: aws.String(s.bucket),
  188. Key: aws.String(key),
  189. Body: reader,
  190. })
  191. return
  192. }
  193. type GDrive struct {
  194. service *drive.Service
  195. rootId string
  196. basedir string
  197. localConfigPath string
  198. chunkSize int
  199. logger *log.Logger
  200. }
  201. func NewGDriveStorage(clientJsonFilepath string, localConfigPath string, basedir string, chunkSize int, logger *log.Logger) (*GDrive, error) {
  202. b, err := ioutil.ReadFile(clientJsonFilepath)
  203. if err != nil {
  204. return nil, err
  205. }
  206. // If modifying these scopes, delete your previously saved client_secret.json.
  207. config, err := google.ConfigFromJSON(b, drive.DriveScope, drive.DriveMetadataScope)
  208. if err != nil {
  209. return nil, err
  210. }
  211. srv, err := drive.New(getGDriveClient(config, localConfigPath, logger))
  212. if err != nil {
  213. return nil, err
  214. }
  215. chunkSize = chunkSize * 1024 * 1024
  216. storage := &GDrive{service: srv, basedir: basedir, rootId: "", localConfigPath: localConfigPath, chunkSize: chunkSize, logger: logger}
  217. err = storage.setupRoot()
  218. if err != nil {
  219. return nil, err
  220. }
  221. return storage, nil
  222. }
  // File names and MIME type used by the Google Drive backend.
  const (
  	// GDriveRootConfigFile is the file, inside the local config directory,
  	// that caches the Drive id of the root folder.
  	GDriveRootConfigFile = "root_id.conf"
  	// GDriveTokenJsonFile is the file, inside the local config directory,
  	// that caches the OAuth token.
  	GDriveTokenJsonFile = "token.json"
  	// GDriveDirectoryMimeType is the MIME type identifying a Drive folder.
  	GDriveDirectoryMimeType = "application/vnd.google-apps.folder"
  )
  226. func (s *GDrive) setupRoot() error {
  227. rootFileConfig := filepath.Join(s.localConfigPath, GDriveRootConfigFile)
  228. rootId, err := ioutil.ReadFile(rootFileConfig)
  229. if err != nil && !os.IsNotExist(err) {
  230. return err
  231. }
  232. if string(rootId) != "" {
  233. s.rootId = string(rootId)
  234. return nil
  235. }
  236. dir := &drive.File{
  237. Name: s.basedir,
  238. MimeType: GDriveDirectoryMimeType,
  239. }
  240. di, err := s.service.Files.Create(dir).Fields("id").Do()
  241. if err != nil {
  242. return err
  243. }
  244. s.rootId = di.Id
  245. err = ioutil.WriteFile(rootFileConfig, []byte(s.rootId), os.FileMode(0600))
  246. if err != nil {
  247. return err
  248. }
  249. return nil
  250. }
  251. func (s *GDrive) hasChecksum(f *drive.File) bool {
  252. return f.Md5Checksum != ""
  253. }
  254. func (s *GDrive) list(nextPageToken string, q string) (*drive.FileList, error) {
  255. return s.service.Files.List().Fields("nextPageToken, files(id, name, mimeType)").Q(q).PageToken(nextPageToken).Do()
  256. }
  257. func (s *GDrive) findId(filename string, token string) (string, error) {
  258. filename = strings.Replace(filename, `'`, `\'`, -1)
  259. filename = strings.Replace(filename, `"`, `\"`, -1)
  260. fileId, tokenId, nextPageToken := "", "", ""
  261. q := fmt.Sprintf("'%s' in parents and name='%s' and mimeType='%s' and trashed=false", s.rootId, token, GDriveDirectoryMimeType)
  262. l, err := s.list(nextPageToken, q)
  263. if err != nil {
  264. return "", err
  265. }
  266. for 0 < len(l.Files) {
  267. for _, fi := range l.Files {
  268. tokenId = fi.Id
  269. break
  270. }
  271. if l.NextPageToken == "" {
  272. break
  273. }
  274. l, err = s.list(l.NextPageToken, q)
  275. }
  276. if filename == "" {
  277. return tokenId, nil
  278. } else if tokenId == "" {
  279. return "", fmt.Errorf("Cannot find file %s/%s", token, filename)
  280. }
  281. q = fmt.Sprintf("'%s' in parents and name='%s' and mimeType!='%s' and trashed=false", tokenId, filename, GDriveDirectoryMimeType)
  282. l, err = s.list(nextPageToken, q)
  283. if err != nil {
  284. return "", err
  285. }
  286. for 0 < len(l.Files) {
  287. for _, fi := range l.Files {
  288. fileId = fi.Id
  289. break
  290. }
  291. if l.NextPageToken == "" {
  292. break
  293. }
  294. l, err = s.list(l.NextPageToken, q)
  295. }
  296. if fileId == "" {
  297. return "", fmt.Errorf("Cannot find file %s/%s", token, filename)
  298. }
  299. return fileId, nil
  300. }
  301. func (s *GDrive) Type() string {
  302. return "gdrive"
  303. }
  304. func (s *GDrive) Head(token string, filename string) (contentLength uint64, err error) {
  305. var fileId string
  306. fileId, err = s.findId(filename, token)
  307. if err != nil {
  308. return
  309. }
  310. var fi *drive.File
  311. if fi, err = s.service.Files.Get(fileId).Fields("size").Do(); err != nil {
  312. return
  313. }
  314. contentLength = uint64(fi.Size)
  315. return
  316. }
  317. func (s *GDrive) Get(token string, filename string) (reader io.ReadCloser, contentLength uint64, err error) {
  318. var fileId string
  319. fileId, err = s.findId(filename, token)
  320. if err != nil {
  321. return
  322. }
  323. var fi *drive.File
  324. fi, err = s.service.Files.Get(fileId).Fields("size", "md5Checksum").Do()
  325. if !s.hasChecksum(fi) {
  326. err = fmt.Errorf("Cannot find file %s/%s", token, filename)
  327. return
  328. }
  329. contentLength = uint64(fi.Size)
  330. ctx := context.Background()
  331. var res *http.Response
  332. res, err = s.service.Files.Get(fileId).Context(ctx).Download()
  333. if err != nil {
  334. return
  335. }
  336. reader = res.Body
  337. return
  338. }
  339. func (s *GDrive) Delete(token string, filename string) (err error) {
  340. metadata, _ := s.findId(fmt.Sprintf("%s.metadata", filename), token)
  341. s.service.Files.Delete(metadata).Do()
  342. var fileId string
  343. fileId, err = s.findId(filename, token)
  344. if err != nil {
  345. return
  346. }
  347. err = s.service.Files.Delete(fileId).Do()
  348. return
  349. }
  350. func (s *GDrive) IsNotExist(err error) bool {
  351. if err != nil {
  352. if e, ok := err.(*googleapi.Error); ok {
  353. return e.Code == http.StatusNotFound
  354. }
  355. }
  356. return false
  357. }
  358. func (s *GDrive) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error {
  359. dirId, err := s.findId("", token)
  360. if err != nil {
  361. return err
  362. }
  363. if dirId == "" {
  364. dir := &drive.File{
  365. Name: token,
  366. Parents: []string{s.rootId},
  367. MimeType: GDriveDirectoryMimeType,
  368. }
  369. di, err := s.service.Files.Create(dir).Fields("id").Do()
  370. if err != nil {
  371. return err
  372. }
  373. dirId = di.Id
  374. }
  375. // Instantiate empty drive file
  376. dst := &drive.File{
  377. Name: filename,
  378. Parents: []string{dirId},
  379. MimeType: contentType,
  380. }
  381. ctx := context.Background()
  382. _, err = s.service.Files.Create(dst).Context(ctx).Media(reader, googleapi.ChunkSize(s.chunkSize)).Do()
  383. if err != nil {
  384. return err
  385. }
  386. return nil
  387. }
  388. // Retrieve a token, saves the token, then returns the generated client.
  389. func getGDriveClient(config *oauth2.Config, localConfigPath string, logger *log.Logger) *http.Client {
  390. tokenFile := filepath.Join(localConfigPath, GDriveTokenJsonFile)
  391. tok, err := gDriveTokenFromFile(tokenFile)
  392. if err != nil {
  393. tok = getGDriveTokenFromWeb(config, logger)
  394. saveGDriveToken(tokenFile, tok, logger)
  395. }
  396. return config.Client(context.Background(), tok)
  397. }
  398. // Request a token from the web, then returns the retrieved token.
  399. func getGDriveTokenFromWeb(config *oauth2.Config, logger *log.Logger) *oauth2.Token {
  400. authURL := config.AuthCodeURL("state-token", oauth2.AccessTypeOffline)
  401. fmt.Printf("Go to the following link in your browser then type the "+
  402. "authorization code: \n%v\n", authURL)
  403. var authCode string
  404. if _, err := fmt.Scan(&authCode); err != nil {
  405. logger.Fatalf("Unable to read authorization code %v", err)
  406. }
  407. tok, err := config.Exchange(context.TODO(), authCode)
  408. if err != nil {
  409. logger.Fatalf("Unable to retrieve token from web %v", err)
  410. }
  411. return tok
  412. }
  413. // Retrieves a token from a local file.
  414. func gDriveTokenFromFile(file string) (*oauth2.Token, error) {
  415. f, err := os.Open(file)
  416. defer f.Close()
  417. if err != nil {
  418. return nil, err
  419. }
  420. tok := &oauth2.Token{}
  421. err = json.NewDecoder(f).Decode(tok)
  422. return tok, err
  423. }
  424. // Saves a token to a file path.
  425. func saveGDriveToken(path string, token *oauth2.Token, logger *log.Logger) {
  426. logger.Printf("Saving credential file to: %s\n", path)
  427. f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600)
  428. defer f.Close()
  429. if err != nil {
  430. logger.Fatalf("Unable to cache oauth token: %v", err)
  431. }
  432. json.NewEncoder(f).Encode(token)
  433. }
  434. type StorjStorage struct {
  435. Storage
  436. project *uplink.Project
  437. bucket *uplink.Bucket
  438. logger *log.Logger
  439. }
  440. func NewStorjStorage(access, bucket string, logger *log.Logger) (*StorjStorage, error) {
  441. var instance StorjStorage
  442. var err error
  443. ctx := context.TODO()
  444. parsedAccess, err := uplink.ParseAccess(access)
  445. if err != nil {
  446. return nil, err
  447. }
  448. instance.project, err = uplink.OpenProject(ctx, parsedAccess)
  449. if err != nil {
  450. return nil, err
  451. }
  452. instance.bucket, err = instance.project.EnsureBucket(ctx, bucket)
  453. if err != nil {
  454. //Ignoring the error to return the one that occurred first, but try to clean up.
  455. _ = instance.project.Close()
  456. return nil, err
  457. }
  458. instance.logger = logger
  459. return &instance, nil
  460. }
  461. func (s *StorjStorage) Type() string {
  462. return "storj"
  463. }
  464. func (s *StorjStorage) Head(token string, filename string) (contentType string, contentLength uint64, err error) {
  465. key := storj.JoinPaths(token, filename)
  466. ctx := context.TODO()
  467. obj, err := s.project.StatObject(ctx, s.bucket.Name, key)
  468. if err != nil {
  469. return "", 0, err
  470. }
  471. contentType = obj.Custom["content-type"]
  472. contentLength = uint64(obj.System.ContentLength)
  473. return
  474. }
  475. func (s *StorjStorage) Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error) {
  476. key := storj.JoinPaths(token, filename)
  477. s.logger.Printf("Getting file %s from Storj Bucket", filename)
  478. ctx := context.TODO()
  479. download, err := s.project.DownloadObject(ctx, s.bucket.Name, key, nil)
  480. if err != nil {
  481. return nil, "", 0, err
  482. }
  483. contentType = download.Info().Custom["content-type"]
  484. contentLength = uint64(download.Info().System.ContentLength)
  485. reader = download
  486. return
  487. }
  488. func (s *StorjStorage) Delete(token string, filename string) (err error) {
  489. key := storj.JoinPaths(token, filename)
  490. s.logger.Printf("Deleting file %s from Storj Bucket", filename)
  491. ctx := context.TODO()
  492. _, err = s.project.DeleteObject(ctx, s.bucket.Name, key)
  493. return
  494. }
  495. func (s *StorjStorage) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) (err error) {
  496. key := storj.JoinPaths(token, filename)
  497. s.logger.Printf("Uploading file %s to Storj Bucket", filename)
  498. ctx := context.TODO()
  499. writer, err := s.project.UploadObject(ctx, s.bucket.Name, key, nil)
  500. if err != nil {
  501. return err
  502. }
  503. n, err := io.Copy(writer, reader)
  504. if err != nil || uint64(n) != contentLength {
  505. //Ignoring the error to return the one that occurred first, but try to clean up.
  506. _ = writer.Abort()
  507. return err
  508. }
  509. err = writer.SetCustomMetadata(ctx, uplink.CustomMetadata{"content-type": contentType})
  510. if err != nil {
  511. //Ignoring the error to return the one that occurred first, but try to clean up.
  512. _ = writer.Abort()
  513. return err
  514. }
  515. err = writer.Commit()
  516. return err
  517. }
  518. func (s *StorjStorage) IsNotExist(err error) bool {
  519. return uplink.ErrObjectNotFound.Has(err)
  520. }