You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

548 lines
12 KiB

  1. package server
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io"
  6. "io/ioutil"
  7. "log"
  8. "net/http"
  9. "os"
  10. "path/filepath"
  11. "strings"
  12. "github.com/aws/aws-sdk-go/aws"
  13. "github.com/aws/aws-sdk-go/aws/awserr"
  14. "github.com/aws/aws-sdk-go/aws/session"
  15. "github.com/aws/aws-sdk-go/service/s3"
  16. "github.com/aws/aws-sdk-go/service/s3/s3manager"
  17. "golang.org/x/net/context"
  18. "golang.org/x/oauth2"
  19. "golang.org/x/oauth2/google"
  20. "google.golang.org/api/drive/v3"
  21. "google.golang.org/api/googleapi"
  22. )
  23. type Storage interface {
  24. Get(token string, filename string) (reader io.ReadCloser, contentLength uint64, err error)
  25. Head(token string, filename string) (contentLength uint64, err error)
  26. Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error
  27. Delete(token string, filename string) error
  28. IsNotExist(err error) bool
  29. Type() string
  30. }
  31. type LocalStorage struct {
  32. Storage
  33. basedir string
  34. logger *log.Logger
  35. }
  36. func NewLocalStorage(basedir string, logger *log.Logger) (*LocalStorage, error) {
  37. return &LocalStorage{basedir: basedir, logger: logger}, nil
  38. }
  39. func (s *LocalStorage) Type() string {
  40. return "local"
  41. }
  42. func (s *LocalStorage) Head(token string, filename string) (contentLength uint64, err error) {
  43. path := filepath.Join(s.basedir, token, filename)
  44. var fi os.FileInfo
  45. if fi, err = os.Lstat(path); err != nil {
  46. return
  47. }
  48. contentLength = uint64(fi.Size())
  49. return
  50. }
  51. func (s *LocalStorage) Get(token string, filename string) (reader io.ReadCloser, contentLength uint64, err error) {
  52. path := filepath.Join(s.basedir, token, filename)
  53. // content type , content length
  54. if reader, err = os.Open(path); err != nil {
  55. return
  56. }
  57. var fi os.FileInfo
  58. if fi, err = os.Lstat(path); err != nil {
  59. return
  60. }
  61. contentLength = uint64(fi.Size())
  62. return
  63. }
  64. func (s *LocalStorage) Delete(token string, filename string) (err error) {
  65. metadata := filepath.Join(s.basedir, token, fmt.Sprintf("%s.metadata", filename))
  66. os.Remove(metadata)
  67. path := filepath.Join(s.basedir, token, filename)
  68. err = os.Remove(path)
  69. return
  70. }
  71. func (s *LocalStorage) IsNotExist(err error) bool {
  72. if err == nil {
  73. return false
  74. }
  75. return os.IsNotExist(err)
  76. }
  77. func (s *LocalStorage) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error {
  78. var f io.WriteCloser
  79. var err error
  80. path := filepath.Join(s.basedir, token)
  81. if err = os.MkdirAll(path, 0700); err != nil && !os.IsExist(err) {
  82. return err
  83. }
  84. if f, err = os.OpenFile(filepath.Join(path, filename), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600); err != nil {
  85. return err
  86. }
  87. defer f.Close()
  88. if _, err = io.Copy(f, reader); err != nil {
  89. return err
  90. }
  91. return nil
  92. }
  93. type S3Storage struct {
  94. Storage
  95. bucket string
  96. session *session.Session
  97. s3 *s3.S3
  98. logger *log.Logger
  99. noMultipart bool
  100. }
  101. func NewS3Storage(accessKey, secretKey, bucketName, region, endpoint string, logger *log.Logger, disableMultipart bool, forcePathStyle bool) (*S3Storage, error) {
  102. sess := getAwsSession(accessKey, secretKey, region, endpoint, forcePathStyle)
  103. return &S3Storage{bucket: bucketName, s3: s3.New(sess), session: sess, logger: logger, noMultipart: disableMultipart}, nil
  104. }
  105. func (s *S3Storage) Type() string {
  106. return "s3"
  107. }
  108. func (s *S3Storage) Head(token string, filename string) (contentLength uint64, err error) {
  109. key := fmt.Sprintf("%s/%s", token, filename)
  110. headRequest := &s3.HeadObjectInput{
  111. Bucket: aws.String(s.bucket),
  112. Key: aws.String(key),
  113. }
  114. // content type , content length
  115. response, err := s.s3.HeadObject(headRequest)
  116. if err != nil {
  117. return
  118. }
  119. if response.ContentLength != nil {
  120. contentLength = uint64(*response.ContentLength)
  121. }
  122. return
  123. }
  124. func (s *S3Storage) IsNotExist(err error) bool {
  125. if err == nil {
  126. return false
  127. }
  128. if aerr, ok := err.(awserr.Error); ok {
  129. switch aerr.Code() {
  130. case s3.ErrCodeNoSuchKey:
  131. return true
  132. }
  133. }
  134. return false
  135. }
  136. func (s *S3Storage) Get(token string, filename string) (reader io.ReadCloser, contentLength uint64, err error) {
  137. key := fmt.Sprintf("%s/%s", token, filename)
  138. getRequest := &s3.GetObjectInput{
  139. Bucket: aws.String(s.bucket),
  140. Key: aws.String(key),
  141. }
  142. response, err := s.s3.GetObject(getRequest)
  143. if err != nil {
  144. return
  145. }
  146. if response.ContentLength != nil {
  147. contentLength = uint64(*response.ContentLength)
  148. }
  149. reader = response.Body
  150. return
  151. }
  152. func (s *S3Storage) Delete(token string, filename string) (err error) {
  153. metadata := fmt.Sprintf("%s/%s.metadata", token, filename)
  154. deleteRequest := &s3.DeleteObjectInput{
  155. Bucket: aws.String(s.bucket),
  156. Key: aws.String(metadata),
  157. }
  158. _, err = s.s3.DeleteObject(deleteRequest)
  159. if err != nil {
  160. return
  161. }
  162. key := fmt.Sprintf("%s/%s", token, filename)
  163. deleteRequest = &s3.DeleteObjectInput{
  164. Bucket: aws.String(s.bucket),
  165. Key: aws.String(key),
  166. }
  167. _, err = s.s3.DeleteObject(deleteRequest)
  168. return
  169. }
  170. func (s *S3Storage) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) (err error) {
  171. key := fmt.Sprintf("%s/%s", token, filename)
  172. s.logger.Printf("Uploading file %s to S3 Bucket", filename)
  173. var concurrency int
  174. if !s.noMultipart {
  175. concurrency = 20
  176. } else {
  177. concurrency = 1
  178. }
  179. // Create an uploader with the session and custom options
  180. uploader := s3manager.NewUploader(s.session, func(u *s3manager.Uploader) {
  181. u.Concurrency = concurrency // default is 5
  182. u.LeavePartsOnError = false
  183. })
  184. _, err = uploader.Upload(&s3manager.UploadInput{
  185. Bucket: aws.String(s.bucket),
  186. Key: aws.String(key),
  187. Body: reader,
  188. })
  189. return
  190. }
  191. type GDrive struct {
  192. service *drive.Service
  193. rootId string
  194. basedir string
  195. localConfigPath string
  196. chunkSize int
  197. logger *log.Logger
  198. }
  199. func NewGDriveStorage(clientJsonFilepath string, localConfigPath string, basedir string, chunkSize int, logger *log.Logger) (*GDrive, error) {
  200. b, err := ioutil.ReadFile(clientJsonFilepath)
  201. if err != nil {
  202. return nil, err
  203. }
  204. // If modifying these scopes, delete your previously saved client_secret.json.
  205. config, err := google.ConfigFromJSON(b, drive.DriveScope, drive.DriveMetadataScope)
  206. if err != nil {
  207. return nil, err
  208. }
  209. srv, err := drive.New(getGDriveClient(config, localConfigPath, logger))
  210. if err != nil {
  211. return nil, err
  212. }
  213. chunkSize = chunkSize * 1024 * 1024
  214. storage := &GDrive{service: srv, basedir: basedir, rootId: "", localConfigPath: localConfigPath, chunkSize: chunkSize, logger: logger}
  215. err = storage.setupRoot()
  216. if err != nil {
  217. return nil, err
  218. }
  219. return storage, nil
  220. }
  221. const GDriveRootConfigFile = "root_id.conf"
  222. const GDriveTokenJsonFile = "token.json"
  223. const GDriveDirectoryMimeType = "application/vnd.google-apps.folder"
  224. func (s *GDrive) setupRoot() error {
  225. rootFileConfig := filepath.Join(s.localConfigPath, GDriveRootConfigFile)
  226. rootId, err := ioutil.ReadFile(rootFileConfig)
  227. if err != nil && !os.IsNotExist(err) {
  228. return err
  229. }
  230. if string(rootId) != "" {
  231. s.rootId = string(rootId)
  232. return nil
  233. }
  234. dir := &drive.File{
  235. Name: s.basedir,
  236. MimeType: GDriveDirectoryMimeType,
  237. }
  238. di, err := s.service.Files.Create(dir).Fields("id").Do()
  239. if err != nil {
  240. return err
  241. }
  242. s.rootId = di.Id
  243. err = ioutil.WriteFile(rootFileConfig, []byte(s.rootId), os.FileMode(0600))
  244. if err != nil {
  245. return err
  246. }
  247. return nil
  248. }
  249. func (s *GDrive) hasChecksum(f *drive.File) bool {
  250. return f.Md5Checksum != ""
  251. }
  252. func (s *GDrive) list(nextPageToken string, q string) (*drive.FileList, error) {
  253. return s.service.Files.List().Fields("nextPageToken, files(id, name, mimeType)").Q(q).PageToken(nextPageToken).Do()
  254. }
  255. func (s *GDrive) findId(filename string, token string) (string, error) {
  256. filename = strings.Replace(filename, `'`, `\'`, -1)
  257. filename = strings.Replace(filename, `"`, `\"`, -1)
  258. fileId, tokenId, nextPageToken := "", "", ""
  259. q := fmt.Sprintf("'%s' in parents and name='%s' and mimeType='%s' and trashed=false", s.rootId, token, GDriveDirectoryMimeType)
  260. l, err := s.list(nextPageToken, q)
  261. if err != nil {
  262. return "", err
  263. }
  264. for 0 < len(l.Files) {
  265. for _, fi := range l.Files {
  266. tokenId = fi.Id
  267. break
  268. }
  269. if l.NextPageToken == "" {
  270. break
  271. }
  272. l, err = s.list(l.NextPageToken, q)
  273. }
  274. if filename == "" {
  275. return tokenId, nil
  276. } else if tokenId == "" {
  277. return "", fmt.Errorf("Cannot find file %s/%s", token, filename)
  278. }
  279. q = fmt.Sprintf("'%s' in parents and name='%s' and mimeType!='%s' and trashed=false", tokenId, filename, GDriveDirectoryMimeType)
  280. l, err = s.list(nextPageToken, q)
  281. if err != nil {
  282. return "", err
  283. }
  284. for 0 < len(l.Files) {
  285. for _, fi := range l.Files {
  286. fileId = fi.Id
  287. break
  288. }
  289. if l.NextPageToken == "" {
  290. break
  291. }
  292. l, err = s.list(l.NextPageToken, q)
  293. }
  294. if fileId == "" {
  295. return "", fmt.Errorf("Cannot find file %s/%s", token, filename)
  296. }
  297. return fileId, nil
  298. }
  299. func (s *GDrive) Type() string {
  300. return "gdrive"
  301. }
  302. func (s *GDrive) Head(token string, filename string) (contentLength uint64, err error) {
  303. var fileId string
  304. fileId, err = s.findId(filename, token)
  305. if err != nil {
  306. return
  307. }
  308. var fi *drive.File
  309. if fi, err = s.service.Files.Get(fileId).Fields("size").Do(); err != nil {
  310. return
  311. }
  312. contentLength = uint64(fi.Size)
  313. return
  314. }
  315. func (s *GDrive) Get(token string, filename string) (reader io.ReadCloser, contentLength uint64, err error) {
  316. var fileId string
  317. fileId, err = s.findId(filename, token)
  318. if err != nil {
  319. return
  320. }
  321. var fi *drive.File
  322. fi, err = s.service.Files.Get(fileId).Fields("size", "md5Checksum").Do()
  323. if !s.hasChecksum(fi) {
  324. err = fmt.Errorf("Cannot find file %s/%s", token, filename)
  325. return
  326. }
  327. contentLength = uint64(fi.Size)
  328. ctx := context.Background()
  329. var res *http.Response
  330. res, err = s.service.Files.Get(fileId).Context(ctx).Download()
  331. if err != nil {
  332. return
  333. }
  334. reader = res.Body
  335. return
  336. }
  337. func (s *GDrive) Delete(token string, filename string) (err error) {
  338. metadata, _ := s.findId(fmt.Sprintf("%s.metadata", filename), token)
  339. s.service.Files.Delete(metadata).Do()
  340. var fileId string
  341. fileId, err = s.findId(filename, token)
  342. if err != nil {
  343. return
  344. }
  345. err = s.service.Files.Delete(fileId).Do()
  346. return
  347. }
  348. func (s *GDrive) IsNotExist(err error) bool {
  349. if err != nil {
  350. if e, ok := err.(*googleapi.Error); ok {
  351. return e.Code == http.StatusNotFound
  352. }
  353. }
  354. return false
  355. }
  356. func (s *GDrive) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error {
  357. dirId, err := s.findId("", token)
  358. if err != nil {
  359. return err
  360. }
  361. if dirId == "" {
  362. dir := &drive.File{
  363. Name: token,
  364. Parents: []string{s.rootId},
  365. MimeType: GDriveDirectoryMimeType,
  366. }
  367. di, err := s.service.Files.Create(dir).Fields("id").Do()
  368. if err != nil {
  369. return err
  370. }
  371. dirId = di.Id
  372. }
  373. // Instantiate empty drive file
  374. dst := &drive.File{
  375. Name: filename,
  376. Parents: []string{dirId},
  377. MimeType: contentType,
  378. }
  379. ctx := context.Background()
  380. _, err = s.service.Files.Create(dst).Context(ctx).Media(reader, googleapi.ChunkSize(s.chunkSize)).Do()
  381. if err != nil {
  382. return err
  383. }
  384. return nil
  385. }
  386. // Retrieve a token, saves the token, then returns the generated client.
  387. func getGDriveClient(config *oauth2.Config, localConfigPath string, logger *log.Logger) *http.Client {
  388. tokenFile := filepath.Join(localConfigPath, GDriveTokenJsonFile)
  389. tok, err := gDriveTokenFromFile(tokenFile)
  390. if err != nil {
  391. tok = getGDriveTokenFromWeb(config, logger)
  392. saveGDriveToken(tokenFile, tok, logger)
  393. }
  394. return config.Client(context.Background(), tok)
  395. }
  396. // Request a token from the web, then returns the retrieved token.
  397. func getGDriveTokenFromWeb(config *oauth2.Config, logger *log.Logger) *oauth2.Token {
  398. authURL := config.AuthCodeURL("state-token", oauth2.AccessTypeOffline)
  399. fmt.Printf("Go to the following link in your browser then type the "+
  400. "authorization code: \n%v\n", authURL)
  401. var authCode string
  402. if _, err := fmt.Scan(&authCode); err != nil {
  403. logger.Fatalf("Unable to read authorization code %v", err)
  404. }
  405. tok, err := config.Exchange(context.TODO(), authCode)
  406. if err != nil {
  407. logger.Fatalf("Unable to retrieve token from web %v", err)
  408. }
  409. return tok
  410. }
  411. // Retrieves a token from a local file.
  412. func gDriveTokenFromFile(file string) (*oauth2.Token, error) {
  413. f, err := os.Open(file)
  414. defer f.Close()
  415. if err != nil {
  416. return nil, err
  417. }
  418. tok := &oauth2.Token{}
  419. err = json.NewDecoder(f).Decode(tok)
  420. return tok, err
  421. }
  422. // Saves a token to a file path.
  423. func saveGDriveToken(path string, token *oauth2.Token, logger *log.Logger) {
  424. logger.Printf("Saving credential file to: %s\n", path)
  425. f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600)
  426. defer f.Close()
  427. if err != nil {
  428. logger.Fatalf("Unable to cache oauth token: %v", err)
  429. }
  430. json.NewEncoder(f).Encode(token)
  431. }