You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

566 lines
13 KiB

  1. package server
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io"
  6. "io/ioutil"
  7. "log"
  8. "mime"
  9. "net/http"
  10. "os"
  11. "path/filepath"
  12. "strings"
  13. "github.com/aws/aws-sdk-go/aws"
  14. "github.com/aws/aws-sdk-go/aws/awserr"
  15. "github.com/aws/aws-sdk-go/aws/session"
  16. "github.com/aws/aws-sdk-go/service/s3"
  17. "github.com/aws/aws-sdk-go/service/s3/s3manager"
  18. "golang.org/x/net/context"
  19. "golang.org/x/oauth2"
  20. "golang.org/x/oauth2/google"
  21. "google.golang.org/api/drive/v3"
  22. "google.golang.org/api/googleapi"
  23. )
  // Storage describes a backend that stores objects addressed by a token and a
  // filename.
  type Storage interface {
  	// Get returns a reader for the object together with its content type and
  	// length. The caller is responsible for closing the reader.
  	Get(token, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error)

  	// Head returns the content type and length of the object without
  	// returning its contents.
  	Head(token, filename string) (contentType string, contentLength uint64, err error)

  	// Put stores the data read from reader under token/filename.
  	Put(token, filename string, reader io.Reader, contentType string, contentLength uint64) error

  	// Delete removes the object stored under token/filename.
  	Delete(token, filename string) error

  	// IsNotExist reports whether err means that the object does not exist.
  	IsNotExist(err error) bool

  	// Type returns a short identifier of the backend.
  	Type() string
  }
  32. type LocalStorage struct {
  33. Storage
  34. basedir string
  35. logger *log.Logger
  36. }
  37. func NewLocalStorage(basedir string, logger *log.Logger) (*LocalStorage, error) {
  38. return &LocalStorage{basedir: basedir, logger: logger}, nil
  39. }
  40. func (s *LocalStorage) Type() string {
  41. return "local"
  42. }
  43. func (s *LocalStorage) Head(token string, filename string) (contentType string, contentLength uint64, err error) {
  44. path := filepath.Join(s.basedir, token, filename)
  45. var fi os.FileInfo
  46. if fi, err = os.Lstat(path); err != nil {
  47. return
  48. }
  49. contentLength = uint64(fi.Size())
  50. contentType = mime.TypeByExtension(filepath.Ext(filename))
  51. return
  52. }
  53. func (s *LocalStorage) Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error) {
  54. path := filepath.Join(s.basedir, token, filename)
  55. // content type , content length
  56. if reader, err = os.Open(path); err != nil {
  57. return
  58. }
  59. var fi os.FileInfo
  60. if fi, err = os.Lstat(path); err != nil {
  61. return
  62. }
  63. contentLength = uint64(fi.Size())
  64. contentType = mime.TypeByExtension(filepath.Ext(filename))
  65. return
  66. }
  67. func (s *LocalStorage) Delete(token string, filename string) (err error) {
  68. metadata := filepath.Join(s.basedir, token, fmt.Sprintf("%s.metadata", filename))
  69. os.Remove(metadata)
  70. path := filepath.Join(s.basedir, token, filename)
  71. err = os.Remove(path)
  72. return
  73. }
  74. func (s *LocalStorage) IsNotExist(err error) bool {
  75. if err == nil {
  76. return false
  77. }
  78. return os.IsNotExist(err)
  79. }
  80. func (s *LocalStorage) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error {
  81. var f io.WriteCloser
  82. var err error
  83. path := filepath.Join(s.basedir, token)
  84. if err = os.MkdirAll(path, 0700); err != nil && !os.IsExist(err) {
  85. return err
  86. }
  87. if f, err = os.OpenFile(filepath.Join(path, filename), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600); err != nil {
  88. return err
  89. }
  90. defer f.Close()
  91. if _, err = io.Copy(f, reader); err != nil {
  92. return err
  93. }
  94. return nil
  95. }
  96. type S3Storage struct {
  97. Storage
  98. bucket string
  99. session *session.Session
  100. s3 *s3.S3
  101. logger *log.Logger
  102. noMultipart bool
  103. }
  104. func NewS3Storage(accessKey, secretKey, bucketName, region, endpoint string, logger *log.Logger, disableMultipart bool, forcePathStyle bool) (*S3Storage, error) {
  105. sess := getAwsSession(accessKey, secretKey, region, endpoint, forcePathStyle)
  106. return &S3Storage{bucket: bucketName, s3: s3.New(sess), session: sess, logger: logger, noMultipart: disableMultipart}, nil
  107. }
  108. func (s *S3Storage) Type() string {
  109. return "s3"
  110. }
  111. func (s *S3Storage) Head(token string, filename string) (contentType string, contentLength uint64, err error) {
  112. key := fmt.Sprintf("%s/%s", token, filename)
  113. headRequest := &s3.HeadObjectInput{
  114. Bucket: aws.String(s.bucket),
  115. Key: aws.String(key),
  116. }
  117. // content type , content length
  118. response, err := s.s3.HeadObject(headRequest)
  119. if err != nil {
  120. return
  121. }
  122. if response.ContentType != nil {
  123. contentType = *response.ContentType
  124. }
  125. if response.ContentLength != nil {
  126. contentLength = uint64(*response.ContentLength)
  127. }
  128. return
  129. }
  130. func (s *S3Storage) IsNotExist(err error) bool {
  131. if err == nil {
  132. return false
  133. }
  134. if aerr, ok := err.(awserr.Error); ok {
  135. switch aerr.Code() {
  136. case s3.ErrCodeNoSuchKey:
  137. return true
  138. }
  139. }
  140. return false
  141. }
  142. func (s *S3Storage) Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error) {
  143. key := fmt.Sprintf("%s/%s", token, filename)
  144. getRequest := &s3.GetObjectInput{
  145. Bucket: aws.String(s.bucket),
  146. Key: aws.String(key),
  147. }
  148. response, err := s.s3.GetObject(getRequest)
  149. if err != nil {
  150. return
  151. }
  152. if response.ContentType != nil {
  153. contentType = *response.ContentType
  154. }
  155. if response.ContentLength != nil {
  156. contentLength = uint64(*response.ContentLength)
  157. }
  158. reader = response.Body
  159. return
  160. }
  161. func (s *S3Storage) Delete(token string, filename string) (err error) {
  162. metadata := fmt.Sprintf("%s/%s.metadata", token, filename)
  163. deleteRequest := &s3.DeleteObjectInput{
  164. Bucket: aws.String(s.bucket),
  165. Key: aws.String(metadata),
  166. }
  167. _, err = s.s3.DeleteObject(deleteRequest)
  168. if err != nil {
  169. return
  170. }
  171. key := fmt.Sprintf("%s/%s", token, filename)
  172. deleteRequest = &s3.DeleteObjectInput{
  173. Bucket: aws.String(s.bucket),
  174. Key: aws.String(key),
  175. }
  176. _, err = s.s3.DeleteObject(deleteRequest)
  177. return
  178. }
  179. func (s *S3Storage) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) (err error) {
  180. key := fmt.Sprintf("%s/%s", token, filename)
  181. s.logger.Printf("Uploading file %s to S3 Bucket", filename)
  182. var concurrency int
  183. if !s.noMultipart {
  184. concurrency = 20
  185. } else {
  186. concurrency = 1
  187. }
  188. // Create an uploader with the session and custom options
  189. uploader := s3manager.NewUploader(s.session, func(u *s3manager.Uploader) {
  190. u.PartSize = (1 << 20) * 5 // The minimum/default allowed part size is 5MB
  191. u.Concurrency = concurrency // default is 5
  192. u.MaxUploadParts = concurrency
  193. u.LeavePartsOnError = false
  194. })
  195. _, err = uploader.Upload(&s3manager.UploadInput{
  196. Bucket: aws.String(s.bucket),
  197. Key: aws.String(key),
  198. Body: reader,
  199. })
  200. return
  201. }
  202. type GDrive struct {
  203. service *drive.Service
  204. rootId string
  205. basedir string
  206. localConfigPath string
  207. chunkSize int
  208. logger *log.Logger
  209. }
  210. func NewGDriveStorage(clientJsonFilepath string, localConfigPath string, basedir string, chunkSize int, logger *log.Logger) (*GDrive, error) {
  211. b, err := ioutil.ReadFile(clientJsonFilepath)
  212. if err != nil {
  213. return nil, err
  214. }
  215. // If modifying these scopes, delete your previously saved client_secret.json.
  216. config, err := google.ConfigFromJSON(b, drive.DriveScope, drive.DriveMetadataScope)
  217. if err != nil {
  218. return nil, err
  219. }
  220. srv, err := drive.New(getGDriveClient(config, localConfigPath, logger))
  221. if err != nil {
  222. return nil, err
  223. }
  224. chunkSize = chunkSize * 1024 * 1024
  225. storage := &GDrive{service: srv, basedir: basedir, rootId: "", localConfigPath: localConfigPath, chunkSize: chunkSize, logger: logger}
  226. err = storage.setupRoot()
  227. if err != nil {
  228. return nil, err
  229. }
  230. return storage, nil
  231. }
  const (
  	// GDriveRootConfigFile is the name of the file, inside the local config
  	// directory, that caches the Drive ID of the root folder.
  	GDriveRootConfigFile = "root_id.conf"

  	// GDriveTokenJsonFile is the name of the file, inside the local config
  	// directory, that caches the OAuth token.
  	GDriveTokenJsonFile = "token.json"

  	// GDriveDirectoryMimeType is the MIME type used for Drive folders.
  	GDriveDirectoryMimeType = "application/vnd.google-apps.folder"
  )
  235. func (s *GDrive) setupRoot() error {
  236. rootFileConfig := filepath.Join(s.localConfigPath, GDriveRootConfigFile)
  237. rootId, err := ioutil.ReadFile(rootFileConfig)
  238. if err != nil && !os.IsNotExist(err) {
  239. return err
  240. }
  241. if string(rootId) != "" {
  242. s.rootId = string(rootId)
  243. return nil
  244. }
  245. dir := &drive.File{
  246. Name: s.basedir,
  247. MimeType: GDriveDirectoryMimeType,
  248. }
  249. di, err := s.service.Files.Create(dir).Fields("id").Do()
  250. if err != nil {
  251. return err
  252. }
  253. s.rootId = di.Id
  254. err = ioutil.WriteFile(rootFileConfig, []byte(s.rootId), os.FileMode(0600))
  255. if err != nil {
  256. return err
  257. }
  258. return nil
  259. }
  260. func (s *GDrive) hasChecksum(f *drive.File) bool {
  261. return f.Md5Checksum != ""
  262. }
  263. func (s *GDrive) list(nextPageToken string, q string) (*drive.FileList, error) {
  264. return s.service.Files.List().Fields("nextPageToken, files(id, name, mimeType)").Q(q).PageToken(nextPageToken).Do()
  265. }
  266. func (s *GDrive) findId(filename string, token string) (string, error) {
  267. filename = strings.Replace(filename, `'`, `\'`, -1)
  268. filename = strings.Replace(filename, `"`, `\"`, -1)
  269. fileId, tokenId, nextPageToken := "", "", ""
  270. q := fmt.Sprintf("'%s' in parents and name='%s' and mimeType='%s' and trashed=false", s.rootId, token, GDriveDirectoryMimeType)
  271. l, err := s.list(nextPageToken, q)
  272. if err != nil {
  273. return "", err
  274. }
  275. for 0 < len(l.Files) {
  276. for _, fi := range l.Files {
  277. tokenId = fi.Id
  278. break
  279. }
  280. if l.NextPageToken == "" {
  281. break
  282. }
  283. l, err = s.list(l.NextPageToken, q)
  284. }
  285. if filename == "" {
  286. return tokenId, nil
  287. } else if tokenId == "" {
  288. return "", fmt.Errorf("Cannot find file %s/%s", token, filename)
  289. }
  290. q = fmt.Sprintf("'%s' in parents and name='%s' and mimeType!='%s' and trashed=false", tokenId, filename, GDriveDirectoryMimeType)
  291. l, err = s.list(nextPageToken, q)
  292. if err != nil {
  293. return "", err
  294. }
  295. for 0 < len(l.Files) {
  296. for _, fi := range l.Files {
  297. fileId = fi.Id
  298. break
  299. }
  300. if l.NextPageToken == "" {
  301. break
  302. }
  303. l, err = s.list(l.NextPageToken, q)
  304. }
  305. if fileId == "" {
  306. return "", fmt.Errorf("Cannot find file %s/%s", token, filename)
  307. }
  308. return fileId, nil
  309. }
  310. func (s *GDrive) Type() string {
  311. return "gdrive"
  312. }
  313. func (s *GDrive) Head(token string, filename string) (contentType string, contentLength uint64, err error) {
  314. var fileId string
  315. fileId, err = s.findId(filename, token)
  316. if err != nil {
  317. return
  318. }
  319. var fi *drive.File
  320. if fi, err = s.service.Files.Get(fileId).Fields("mimeType", "size").Do(); err != nil {
  321. return
  322. }
  323. contentLength = uint64(fi.Size)
  324. contentType = fi.MimeType
  325. return
  326. }
  327. func (s *GDrive) Get(token string, filename string) (reader io.ReadCloser, contentType string, contentLength uint64, err error) {
  328. var fileId string
  329. fileId, err = s.findId(filename, token)
  330. if err != nil {
  331. return
  332. }
  333. var fi *drive.File
  334. fi, err = s.service.Files.Get(fileId).Fields("mimeType", "size", "md5Checksum").Do()
  335. if !s.hasChecksum(fi) {
  336. err = fmt.Errorf("Cannot find file %s/%s", token, filename)
  337. return
  338. }
  339. contentLength = uint64(fi.Size)
  340. contentType = fi.MimeType
  341. ctx := context.Background()
  342. var res *http.Response
  343. res, err = s.service.Files.Get(fileId).Context(ctx).Download()
  344. if err != nil {
  345. return
  346. }
  347. reader = res.Body
  348. return
  349. }
  350. func (s *GDrive) Delete(token string, filename string) (err error) {
  351. metadata, _ := s.findId(fmt.Sprintf("%s.metadata", filename), token)
  352. s.service.Files.Delete(metadata).Do()
  353. var fileId string
  354. fileId, err = s.findId(filename, token)
  355. if err != nil {
  356. return
  357. }
  358. err = s.service.Files.Delete(fileId).Do()
  359. return
  360. }
  361. func (s *GDrive) IsNotExist(err error) bool {
  362. if err != nil {
  363. if e, ok := err.(*googleapi.Error); ok {
  364. return e.Code == http.StatusNotFound
  365. }
  366. }
  367. return false
  368. }
  369. func (s *GDrive) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error {
  370. dirId, err := s.findId("", token)
  371. if err != nil {
  372. return err
  373. }
  374. if dirId == "" {
  375. dir := &drive.File{
  376. Name: token,
  377. Parents: []string{s.rootId},
  378. MimeType: GDriveDirectoryMimeType,
  379. }
  380. di, err := s.service.Files.Create(dir).Fields("id").Do()
  381. if err != nil {
  382. return err
  383. }
  384. dirId = di.Id
  385. }
  386. // Instantiate empty drive file
  387. dst := &drive.File{
  388. Name: filename,
  389. Parents: []string{dirId},
  390. MimeType: contentType,
  391. }
  392. ctx := context.Background()
  393. _, err = s.service.Files.Create(dst).Context(ctx).Media(reader, googleapi.ChunkSize(s.chunkSize)).Do()
  394. if err != nil {
  395. return err
  396. }
  397. return nil
  398. }
  399. // Retrieve a token, saves the token, then returns the generated client.
  400. func getGDriveClient(config *oauth2.Config, localConfigPath string, logger *log.Logger) *http.Client {
  401. tokenFile := filepath.Join(localConfigPath, GDriveTokenJsonFile)
  402. tok, err := gDriveTokenFromFile(tokenFile)
  403. if err != nil {
  404. tok = getGDriveTokenFromWeb(config, logger)
  405. saveGDriveToken(tokenFile, tok, logger)
  406. }
  407. return config.Client(context.Background(), tok)
  408. }
  409. // Request a token from the web, then returns the retrieved token.
  410. func getGDriveTokenFromWeb(config *oauth2.Config, logger *log.Logger) *oauth2.Token {
  411. authURL := config.AuthCodeURL("state-token", oauth2.AccessTypeOffline)
  412. fmt.Printf("Go to the following link in your browser then type the "+
  413. "authorization code: \n%v\n", authURL)
  414. var authCode string
  415. if _, err := fmt.Scan(&authCode); err != nil {
  416. logger.Fatalf("Unable to read authorization code %v", err)
  417. }
  418. tok, err := config.Exchange(context.TODO(), authCode)
  419. if err != nil {
  420. logger.Fatalf("Unable to retrieve token from web %v", err)
  421. }
  422. return tok
  423. }
  424. // Retrieves a token from a local file.
  425. func gDriveTokenFromFile(file string) (*oauth2.Token, error) {
  426. f, err := os.Open(file)
  427. defer f.Close()
  428. if err != nil {
  429. return nil, err
  430. }
  431. tok := &oauth2.Token{}
  432. err = json.NewDecoder(f).Decode(tok)
  433. return tok, err
  434. }
  435. // Saves a token to a file path.
  436. func saveGDriveToken(path string, token *oauth2.Token, logger *log.Logger) {
  437. logger.Printf("Saving credential file to: %s\n", path)
  438. f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600)
  439. defer f.Close()
  440. if err != nil {
  441. logger.Fatalf("Unable to cache oauth token: %v", err)
  442. }
  443. json.NewEncoder(f).Encode(token)
  444. }