From 6ac6c8fa99a57e61a7bf65806d80c966fd022caf Mon Sep 17 00:00:00 2001 From: Andrea Spacca Date: Tue, 5 Jan 2021 17:23:47 +0100 Subject: [PATCH] PURGE FEATURE --- README.md | 2 + cmd/cmd.go | 23 ++++++++++- server/handlers.go | 13 +++++++ server/server.go | 15 ++++++++ server/storage.go | 96 ++++++++++++++++++++++++++++++++++++++++------ 5 files changed, 135 insertions(+), 14 deletions(-) diff --git a/README.md b/README.md index c37629f..474c91e 100644 --- a/README.md +++ b/README.md @@ -113,6 +113,8 @@ cors-domains | comma separated list of domains for CORS, setting it enable CORS clamav-host | host for clamav feature | | CLAMAV_HOST | rate-limit | request per minute | | RATE_LIMIT | max-upload-size | max upload size in kilobytes | | MAX_UPLOAD_SIZE | +purge-days | number of days after the uploads are purged automatically | | PURGE_DAYS | +purge-interval | interval in hours to run the automatic purge for (not applicable to S3 and Storj) | | PURGE_INTERVAL | If you want to use TLS using lets encrypt certificates, set lets-encrypt-hosts to your domain, set tls-listener to :443 and enable force-https. 
diff --git a/cmd/cmd.go b/cmd/cmd.go index f289af6..fde914c 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -191,6 +191,18 @@ var globalFlags = []cli.Flag{ Value: 0, EnvVar: "RATE_LIMIT", }, + cli.IntFlag{ + Name: "purge-days", + Usage: "number of days after uploads are purged automatically", + Value: 0, + EnvVar: "PURGE_DAYS", + }, + cli.IntFlag{ + Name: "purge-interval", + Usage: "interval in hours to run the automatic purge for", + Value: 0, + EnvVar: "PURGE_INTERVAL", + }, cli.Int64Flag{ Name: "max-upload-size", Usage: "max limit for upload, in kilobytes", @@ -365,6 +377,13 @@ func New() *Cmd { options = append(options, server.RateLimit(v)) } + + purgeDays := c.Int("purge-days") + purgeInterval := c.Int("purge-interval") + if purgeDays > 0 && purgeInterval > 0 { + options = append(options, server.Purge(purgeDays, purgeInterval)) + } + if cert := c.String("tls-cert-file"); cert == "" { } else if pk := c.String("tls-private-key"); pk == "" { } else { @@ -410,7 +429,7 @@ func New() *Cmd { panic("secret-key not set.") } else if bucket := c.String("bucket"); bucket == "" { panic("bucket not set.") - } else if storage, err := server.NewS3Storage(accessKey, secretKey, bucket, c.String("s3-region"), c.String("s3-endpoint"), logger, c.Bool("s3-no-multipart"), c.Bool("s3-path-style")); err != nil { + } else if storage, err := server.NewS3Storage(accessKey, secretKey, bucket, purgeDays, c.String("s3-region"), c.String("s3-endpoint"), c.Bool("s3-no-multipart"), c.Bool("s3-path-style"), logger); err != nil { panic(err) } else { options = append(options, server.UseStorage(storage)) @@ -434,7 +453,7 @@ func New() *Cmd { panic("storj-access not set.") } else if bucket := c.String("storj-bucket"); bucket == "" { panic("storj-bucket not set.") - } else if storage, err := server.NewStorjStorage(access, bucket, logger); err != nil { + } else if storage, err := server.NewStorjStorage(access, bucket, purgeDays, logger); err != nil { panic(err) } else { options = append(options, 
server.UseStorage(storage)) diff --git a/server/handlers.go b/server/handlers.go index 67f3779..a03e511 100644 --- a/server/handlers.go +++ b/server/handlers.go @@ -690,6 +690,19 @@ func (s *Server) CheckDeletionToken(deletionToken, token, filename string) error return nil } +func (s *Server) purgeHandler() { + ticker := time.NewTicker(s.purgeInterval) + go func() { + for { + select { + case <-ticker.C: + err := s.storage.Purge(s.purgeDays) + if err != nil { log.Printf("error cleaning up expired files: %v", err) } + } + } + }() +} + func (s *Server) deleteHandler(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) diff --git a/server/server.go b/server/server.go index 2c2d6ec..ff999b0 100644 --- a/server/server.go +++ b/server/server.go @@ -187,6 +187,14 @@ func RateLimit(requests int) OptionFn { } } +func Purge(days, interval int) OptionFn { + return func(srvr *Server) { + srvr.purgeDays = time.Duration(days) * time.Hour * 24 + srvr.purgeInterval = time.Duration(interval) * time.Hour + } +} + + func ForceHTTPs() OptionFn { return func(srvr *Server) { srvr.forceHTTPs = true @@ -280,6 +288,9 @@ type Server struct { maxUploadSize int64 rateLimitRequests int + purgeDays time.Duration + purgeInterval time.Duration + storage Storage forceHTTPs bool @@ -500,6 +511,10 @@ func (s *Server) Run() { s.logger.Printf("---------------------------") + if s.purgeDays > 0 { + go s.purgeHandler() + } + term := make(chan os.Signal, 1) signal.Notify(term, os.Interrupt) signal.Notify(term, syscall.SIGTERM) diff --git a/server/storage.go b/server/storage.go index 0f610b6..ab3c839 100644 --- a/server/storage.go +++ b/server/storage.go @@ -4,14 +4,6 @@ import ( "encoding/json" "errors" "fmt" - "io" - "io/ioutil" - "log" - "net/http" - "os" - "path/filepath" - "strings" - "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/s3" "golang.org/x/oauth2" "golang.org/x/oauth2/google" "google.golang.org/api/drive/v3" 
"google.golang.org/api/googleapi" + "io" + "io/ioutil" + "log" + "net/http" + "os" + "path/filepath" + "strings" + "time" "storj.io/common/storj" "storj.io/uplink" @@ -33,6 +33,7 @@ type Storage interface { Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error Delete(token string, filename string) error IsNotExist(err error) bool + Purge(days time.Duration) error Type() string } @@ -91,6 +92,27 @@ func (s *LocalStorage) Delete(token string, filename string) (err error) { return } +func (s *LocalStorage) Purge(days time.Duration) (err error) { + err = filepath.Walk(s.basedir, + func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if info.IsDir() { + return nil + } + + if info.ModTime().After(time.Now().Add(-1 * days)) { + err = os.Remove(path) + return err + } + + return nil + }) + + return +} + func (s *LocalStorage) IsNotExist(err error) bool { if err == nil { return false @@ -128,13 +150,21 @@ type S3Storage struct { session *session.Session s3 *s3.S3 logger *log.Logger + purgeDays time.Duration noMultipart bool } -func NewS3Storage(accessKey, secretKey, bucketName, region, endpoint string, logger *log.Logger, disableMultipart bool, forcePathStyle bool) (*S3Storage, error) { +func NewS3Storage(accessKey, secretKey, bucketName string, purgeDays int, region, endpoint string, disableMultipart bool, forcePathStyle bool, logger *log.Logger) (*S3Storage, error) { sess := getAwsSession(accessKey, secretKey, region, endpoint, forcePathStyle) - return &S3Storage{bucket: bucketName, s3: s3.New(sess), session: sess, logger: logger, noMultipart: disableMultipart}, nil + return &S3Storage{ + bucket: bucketName, + s3: s3.New(sess), + session: sess, + logger: logger, + noMultipart: disableMultipart, + purgeDays: time.Duration(purgeDays * 24) * time.Hour, + }, nil } func (s *S3Storage) Type() string { @@ -162,6 +192,11 @@ func (s *S3Storage) Head(token string, filename string) (contentLength 
uint64, e return } +func (s *S3Storage) Purge(days time.Duration) (err error) { + // NOOP expiration is set at upload time + return nil +} + func (s *S3Storage) IsNotExist(err error) bool { if err == nil { return false @@ -242,6 +277,7 @@ func (s *S3Storage) Put(token string, filename string, reader io.Reader, content Bucket: aws.String(s.bucket), Key: aws.String(key), Body: reader, + Expires: aws.Time(time.Now().Add(s.purgeDays)), }) return @@ -448,6 +484,34 @@ func (s *GDrive) Delete(token string, filename string) (err error) { return } +func (s *GDrive) Purge(days time.Duration) (err error) { + nextPageToken := "" + + expirationDate := time.Now().Add(-1 * days).Format(time.RFC3339) + q := fmt.Sprintf("'%s' in parents and modifiedTime < '%s' and mimeType!='%s' and trashed=false", s.rootId, expirationDate, GDriveDirectoryMimeType) + l, err := s.list(nextPageToken, q) + if err != nil { + return err + } + + for 0 < len(l.Files) { + for _, fi := range l.Files { + err = s.service.Files.Delete(fi.Id).Do() + if err != nil { + return + } + } + + if l.NextPageToken == "" { + break + } + + if l, err = s.list(l.NextPageToken, q); err != nil { return } + } + + return +} + func (s *GDrive) IsNotExist(err error) bool { if err != nil { if e, ok := err.(*googleapi.Error); ok { @@ -554,10 +618,11 @@ type StorjStorage struct { Storage project *uplink.Project bucket *uplink.Bucket + purgeDays time.Duration logger *log.Logger } -func NewStorjStorage(access, bucket string, logger *log.Logger) (*StorjStorage, error) { +func NewStorjStorage(access, bucket string, purgeDays int, logger *log.Logger) (*StorjStorage, error) { var instance StorjStorage var err error @@ -580,6 +645,8 @@ func NewStorjStorage(access, bucket string, logger *log.Logger) (*StorjStorage, return nil, err } + instance.purgeDays = time.Duration(purgeDays * 24) * time.Hour + instance.logger = logger return &instance, nil @@ -634,6 +701,11 @@ func (s *StorjStorage) Delete(token string, filename string) (err error) { return } +func (s 
*StorjStorage) Purge(days time.Duration) (err error) { + // NOOP expiration is set at upload time + return nil +} + func (s *StorjStorage) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) (err error) { key := storj.JoinPaths(token, filename) @@ -641,7 +713,7 @@ func (s *StorjStorage) Put(token string, filename string, reader io.Reader, cont ctx := context.TODO() - writer, err := s.project.UploadObject(ctx, s.bucket.Name, key, nil) + writer, err := s.project.UploadObject(ctx, s.bucket.Name, key, &uplink.UploadOptions{Expires: time.Now().Add(s.purgeDays)}) if err != nil { return err }