Browse Source

Timeout reader wrapper actually prevents big upload

tags/v1.0.0
Andrea Spacca 5 years ago
parent
commit
828e693f44
1 changed files with 5 additions and 106 deletions
  1. +5
    -106
      server/storage.go

+ 5
- 106
server/storage.go View File

@@ -21,7 +21,6 @@ import (
"google.golang.org/api/googleapi"
"net/http"
"io/ioutil"
"time"
)

type Storage interface {
@@ -329,11 +328,8 @@ func NewGDriveStorage(clientJsonFilepath string, localConfigPath string, basedir
}

const GDriveRootConfigFile = "root_id.conf"
const GDriveTimeoutTimerInterval = time.Second * 10
const GDriveDirectoryMimeType = "application/vnd.google-apps.folder"

type gDriveTimeoutReaderWrapper func(io.Reader) io.Reader

func (s *GDrive) setupRoot() error {
rootFileConfig := filepath.Join(s.localConfigPath, GDriveRootConfigFile)

@@ -366,100 +362,6 @@ func (s *GDrive) setupRoot() error {
return nil
}

func (s *GDrive) getTimeoutReader(r io.Reader, cancel context.CancelFunc, timeout time.Duration) io.Reader {
return &GDriveTimeoutReader{
reader: r,
cancel: cancel,
mutex: &sync.Mutex{},
maxIdleTimeout: timeout,
}
}

type GDriveTimeoutReader struct {
reader io.Reader
cancel context.CancelFunc
lastActivity time.Time
timer *time.Timer
mutex *sync.Mutex
maxIdleTimeout time.Duration
done bool
}

func (r *GDriveTimeoutReader) Read(p []byte) (int, error) {
if r.timer == nil {
r.startTimer()
}

r.mutex.Lock()

// Read
n, err := r.reader.Read(p)

r.lastActivity = time.Now()
r.done = (err != nil)

r.mutex.Unlock()

if r.done {
r.stopTimer()
}

return n, err
}

func (r *GDriveTimeoutReader) Close() error {
return r.reader.(io.ReadCloser).Close()
}

func (r *GDriveTimeoutReader) startTimer() {
r.mutex.Lock()
defer r.mutex.Unlock()

if !r.done {
r.timer = time.AfterFunc(GDriveTimeoutTimerInterval, r.timeout)
}
}

func (r *GDriveTimeoutReader) stopTimer() {
r.mutex.Lock()
defer r.mutex.Unlock()

if r.timer != nil {
r.timer.Stop()
}
}

func (r *GDriveTimeoutReader) timeout() {
r.mutex.Lock()

if r.done {
r.mutex.Unlock()
return
}

if time.Since(r.lastActivity) > r.maxIdleTimeout {
r.cancel()
r.mutex.Unlock()
return
}

r.mutex.Unlock()
r.startTimer()
}

func (s *GDrive) getTimeoutReaderWrapperContext(timeout time.Duration) (gDriveTimeoutReaderWrapper, context.Context) {
ctx, cancel := context.WithCancel(context.TODO())
wrapper := func(r io.Reader) io.Reader {
// Return untouched reader if timeout is 0
if timeout == 0 {
return r
}

return s.getTimeoutReader(r, cancel, timeout)
}
return wrapper, ctx
}

func (s *GDrive) hasChecksum(f *drive.File) bool {
return f.Md5Checksum != ""
}
@@ -566,16 +468,14 @@ func (s *GDrive) Get(token string, filename string) (reader io.ReadCloser, conte
contentLength = uint64(fi.Size)
contentType = fi.MimeType

// Get timeout reader wrapper and context
timeoutReaderWrapper, ctx := s.getTimeoutReaderWrapperContext(time.Duration(GDriveTimeoutTimerInterval))

ctx := context.Background()
var res *http.Response
res, err = s.service.Files.Get(fileId).Context(ctx).Download()
if err != nil {
return
}

reader = timeoutReaderWrapper(res.Body).(io.ReadCloser)
reader = res.Body

return
}
@@ -616,9 +516,6 @@ func (s *GDrive) Put(token string, filename string, reader io.Reader, contentTyp
dirId = di.Id
}

// Wrap reader in timeout reader
timeoutReaderWrapper, ctx := s.getTimeoutReaderWrapperContext(time.Duration(GDriveTimeoutTimerInterval))

// Instantiate empty drive file
dst := &drive.File{
Name: filename,
@@ -626,7 +523,9 @@ func (s *GDrive) Put(token string, filename string, reader io.Reader, contentTyp
MimeType: contentType,
}

_, err = s.service.Files.Create(dst).Context(ctx).Media(timeoutReaderWrapper(reader)).Do()
ctx := context.Background()
_, err = s.service.Files.Create(dst).Context(ctx).Media(reader).Do()

if err != nil {
return err
}


Loading…
Cancel
Save