Browse Source

Merge pull request #337 from dutchcoders/PURGE_FEATURE

Purge feature
tags/v1.2.0
Andrea Spacca 3 years ago
committed by GitHub
parent
commit
76fd83b599
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 139 additions and 20 deletions
  1. +2
    -0
      README.md
  2. +20
    -2
      cmd/cmd.go
  3. +13
    -0
      server/handlers.go
  4. +14
    -0
      server/server.go
  5. +90
    -18
      server/storage.go

+ 2
- 0
README.md View File

@@ -113,6 +113,8 @@ cors-domains | comma separated list of domains for CORS, setting it enable CORS
clamav-host | host for clamav feature | | CLAMAV_HOST |
rate-limit | request per minute | | RATE_LIMIT |
max-upload-size | max upload size in kilobytes | | MAX_UPLOAD_SIZE |
purge-days | number of days after which uploads are purged automatically | | PURGE_DAYS |
purge-interval | interval in hours at which the automatic purge runs (not applicable to S3 and Storj) | | PURGE_INTERVAL |

If you want to use TLS using lets encrypt certificates, set lets-encrypt-hosts to your domain, set tls-listener to :443 and enable force-https.



+ 20
- 2
cmd/cmd.go View File

@@ -191,6 +191,18 @@ var globalFlags = []cli.Flag{
Value: 0,
EnvVar: "RATE_LIMIT",
},
cli.IntFlag{
Name: "purge-days",
Usage: "number of days after uploads are purged automatically",
Value: 0,
EnvVar: "PURGE_DAYS",
},
cli.IntFlag{
Name: "purge-interval",
Usage: "interval in hours to run the automatic purge for",
Value: 0,
EnvVar: "PURGE_INTERVAL",
},
cli.Int64Flag{
Name: "max-upload-size",
Usage: "max limit for upload, in kilobytes",
@@ -365,6 +377,12 @@ func New() *Cmd {
options = append(options, server.RateLimit(v))
}

purgeDays := c.Int("purge-days")
purgeInterval := c.Int("purge-interval")
if purgeDays > 0 && purgeInterval > 0 {
options = append(options, server.Purge(purgeDays, purgeInterval))
}

if cert := c.String("tls-cert-file"); cert == "" {
} else if pk := c.String("tls-private-key"); pk == "" {
} else {
@@ -410,7 +428,7 @@ func New() *Cmd {
panic("secret-key not set.")
} else if bucket := c.String("bucket"); bucket == "" {
panic("bucket not set.")
} else if storage, err := server.NewS3Storage(accessKey, secretKey, bucket, c.String("s3-region"), c.String("s3-endpoint"), logger, c.Bool("s3-no-multipart"), c.Bool("s3-path-style")); err != nil {
} else if storage, err := server.NewS3Storage(accessKey, secretKey, bucket, purgeDays, c.String("s3-region"), c.String("s3-endpoint"), c.Bool("s3-no-multipart"), c.Bool("s3-path-style"), logger); err != nil {
panic(err)
} else {
options = append(options, server.UseStorage(storage))
@@ -434,7 +452,7 @@ func New() *Cmd {
panic("storj-access not set.")
} else if bucket := c.String("storj-bucket"); bucket == "" {
panic("storj-bucket not set.")
} else if storage, err := server.NewStorjStorage(access, bucket, logger); err != nil {
} else if storage, err := server.NewStorjStorage(access, bucket, purgeDays, logger); err != nil {
panic(err)
} else {
options = append(options, server.UseStorage(storage))


+ 13
- 0
server/handlers.go View File

@@ -690,6 +690,19 @@ func (s *Server) CheckDeletionToken(deletionToken, token, filename string) error
return nil
}

// purgeHandler starts a background goroutine that periodically removes
// expired uploads: every purgeInterval it asks the configured storage
// backend to purge files older than purgeDays.
//
// NOTE(review): the ticker and goroutine run for the lifetime of the
// process — there is no stop channel; confirm that matches how Run() uses it.
func (s *Server) purgeHandler() {
	ticker := time.NewTicker(s.purgeInterval)
	go func() {
		// Range over the ticker channel instead of a single-case select.
		for range ticker.C {
			// Only log actual failures; the original logged on every
			// tick, emitting "error cleaning up expired files: <nil>"
			// even when the purge succeeded.
			if err := s.storage.Purge(s.purgeDays); err != nil {
				log.Printf("error cleaning up expired files: %v", err)
			}
		}
	}()
}

func (s *Server) deleteHandler(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)



+ 14
- 0
server/server.go View File

@@ -187,6 +187,13 @@ func RateLimit(requests int) OptionFn {
}
}

// Purge returns an OptionFn that enables automatic purging of uploads:
// files older than the given number of days are deleted, and the purge
// loop fires once every interval hours.
func Purge(days, interval int) OptionFn {
	retention := time.Duration(days) * 24 * time.Hour
	cadence := time.Duration(interval) * time.Hour
	return func(s *Server) {
		s.purgeDays = retention
		s.purgeInterval = cadence
	}
}

func ForceHTTPs() OptionFn {
return func(srvr *Server) {
srvr.forceHTTPs = true
@@ -280,6 +287,9 @@ type Server struct {
maxUploadSize int64
rateLimitRequests int

purgeDays time.Duration
purgeInterval time.Duration

storage Storage

forceHTTPs bool
@@ -500,6 +510,10 @@ func (s *Server) Run() {

s.logger.Printf("---------------------------")

if s.purgeDays > 0 {
go s.purgeHandler()
}

term := make(chan os.Signal, 1)
signal.Notify(term, os.Interrupt)
signal.Notify(term, syscall.SIGTERM)


+ 90
- 18
server/storage.go View File

@@ -4,14 +4,6 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"os"
"path/filepath"
"strings"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/session"
@@ -22,6 +14,14 @@ import (
"golang.org/x/oauth2/google"
"google.golang.org/api/drive/v3"
"google.golang.org/api/googleapi"
"io"
"io/ioutil"
"log"
"net/http"
"os"
"path/filepath"
"strings"
"time"

"storj.io/common/storj"
"storj.io/uplink"
@@ -33,6 +33,7 @@ type Storage interface {
Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error
Delete(token string, filename string) error
IsNotExist(err error) bool
Purge(days time.Duration) error

Type() string
}
@@ -91,6 +92,27 @@ func (s *LocalStorage) Delete(token string, filename string) (err error) {
return
}

// Purge walks the local storage root and removes every regular file whose
// modification time is older than the given retention period. Directories
// are left in place.
func (s *LocalStorage) Purge(days time.Duration) (err error) {
	cutoff := time.Now().Add(-1 * days)

	err = filepath.Walk(s.basedir,
		func(path string, info os.FileInfo, err error) error {
			if err != nil {
				return err
			}
			if info.IsDir() {
				return nil
			}

			// Delete only files last modified *before* the cutoff.
			// The original used After(), which purged fresh uploads
			// and kept the expired ones.
			if info.ModTime().Before(cutoff) {
				return os.Remove(path)
			}

			return nil
		})

	return
}

func (s *LocalStorage) IsNotExist(err error) bool {
if err == nil {
return false
@@ -128,13 +150,21 @@ type S3Storage struct {
session *session.Session
s3 *s3.S3
logger *log.Logger
purgeDays time.Duration
noMultipart bool
}

func NewS3Storage(accessKey, secretKey, bucketName, region, endpoint string, logger *log.Logger, disableMultipart bool, forcePathStyle bool) (*S3Storage, error) {
func NewS3Storage(accessKey, secretKey, bucketName string, purgeDays int, region, endpoint string, disableMultipart bool, forcePathStyle bool, logger *log.Logger) (*S3Storage, error) {
sess := getAwsSession(accessKey, secretKey, region, endpoint, forcePathStyle)

return &S3Storage{bucket: bucketName, s3: s3.New(sess), session: sess, logger: logger, noMultipart: disableMultipart}, nil
return &S3Storage{
bucket: bucketName,
s3: s3.New(sess),
session: sess,
logger: logger,
noMultipart: disableMultipart,
purgeDays: time.Duration(purgeDays*24) * time.Hour,
}, nil
}

func (s *S3Storage) Type() string {
@@ -162,6 +192,11 @@ func (s *S3Storage) Head(token string, filename string) (contentLength uint64, e
return
}

// Purge is a no-op for S3: per-object expiration is set at upload time
// (see Put), so the server-side purge loop has nothing to do here.
func (s *S3Storage) Purge(days time.Duration) (err error) {
	return
}

func (s *S3Storage) IsNotExist(err error) bool {
if err == nil {
return false
@@ -239,9 +274,10 @@ func (s *S3Storage) Put(token string, filename string, reader io.Reader, content
})

_, err = uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(s.bucket),
Key: aws.String(key),
Body: reader,
Bucket: aws.String(s.bucket),
Key: aws.String(key),
Body: reader,
Expires: aws.Time(time.Now().Add(s.purgeDays)),
})

return
@@ -448,6 +484,34 @@ func (s *GDrive) Delete(token string, filename string) (err error) {
return
}

// Purge deletes every non-directory file under the GDrive root folder whose
// modifiedTime is older than the retention period, paging through the file
// listing until it is exhausted.
func (s *GDrive) Purge(days time.Duration) (err error) {
	nextPageToken := ""

	expirationDate := time.Now().Add(-1 * days).Format(time.RFC3339)
	// Select files *older* than the cutoff ("modifiedTime <"). The original
	// query used ">", which deleted fresh uploads and retained expired ones.
	q := fmt.Sprintf("'%s' in parents and modifiedTime < '%s' and mimeType!='%s' and trashed=false", s.rootId, expirationDate, GDriveDirectoryMimeType)
	l, err := s.list(nextPageToken, q)
	if err != nil {
		return err
	}

	for 0 < len(l.Files) {
		for _, fi := range l.Files {
			if err = s.service.Files.Delete(fi.Id).Do(); err != nil {
				return
			}
		}

		if l.NextPageToken == "" {
			break
		}

		// The original ignored the error from this paging call, risking a
		// nil-dereference on l in the next iteration.
		if l, err = s.list(l.NextPageToken, q); err != nil {
			return
		}
	}

	return
}

func (s *GDrive) IsNotExist(err error) bool {
if err != nil {
if e, ok := err.(*googleapi.Error); ok {
@@ -552,12 +616,13 @@ func saveGDriveToken(path string, token *oauth2.Token, logger *log.Logger) {

// StorjStorage stores uploads in a Storj bucket; purgeDays is attached as a
// per-object expiry at upload time (see NewStorjStorage and Put).
type StorjStorage struct {
	Storage
	project   *uplink.Project
	bucket    *uplink.Bucket
	purgeDays time.Duration
	logger    *log.Logger
}

func NewStorjStorage(access, bucket string, logger *log.Logger) (*StorjStorage, error) {
func NewStorjStorage(access, bucket string, purgeDays int, logger *log.Logger) (*StorjStorage, error) {
var instance StorjStorage
var err error

@@ -580,6 +645,8 @@ func NewStorjStorage(access, bucket string, logger *log.Logger) (*StorjStorage,
return nil, err
}

instance.purgeDays = time.Duration(purgeDays*24) * time.Hour

instance.logger = logger

return &instance, nil
@@ -634,6 +701,11 @@ func (s *StorjStorage) Delete(token string, filename string) (err error) {
return
}

// Purge is a no-op for Storj: the expiry is attached to each object at
// upload time via uplink.UploadOptions, so the periodic purge loop has
// nothing to remove here.
func (s *StorjStorage) Purge(days time.Duration) (err error) {
	return
}

func (s *StorjStorage) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) (err error) {
key := storj.JoinPaths(token, filename)

@@ -641,7 +713,7 @@ func (s *StorjStorage) Put(token string, filename string, reader io.Reader, cont

ctx := context.TODO()

writer, err := s.project.UploadObject(ctx, s.bucket.Name, key, nil)
writer, err := s.project.UploadObject(ctx, s.bucket.Name, key, &uplink.UploadOptions{Expires: time.Now().Add(s.purgeDays)})
if err != nil {
return err
}


Loading…
Cancel
Save