|
|
@@ -37,10 +37,11 @@ type Storage interface { |
|
|
|
type LocalStorage struct { |
|
|
|
Storage |
|
|
|
basedir string |
|
|
|
logger *log.Logger |
|
|
|
} |
|
|
|
|
|
|
|
func NewLocalStorage(basedir string) (*LocalStorage, error) { |
|
|
|
return &LocalStorage{basedir: basedir}, nil |
|
|
|
func NewLocalStorage(basedir string, logger *log.Logger) (*LocalStorage, error) { |
|
|
|
return &LocalStorage{basedir: basedir, logger: logger}, nil |
|
|
|
} |
|
|
|
|
|
|
|
func (s *LocalStorage) Type() string { |
|
|
@@ -110,7 +111,6 @@ func (s *LocalStorage) Put(token string, filename string, reader io.Reader, cont |
|
|
|
} |
|
|
|
|
|
|
|
if f, err = os.OpenFile(filepath.Join(path, filename), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600); err != nil { |
|
|
|
fmt.Printf("%s", err) |
|
|
|
return err |
|
|
|
} |
|
|
|
|
|
|
@@ -126,15 +126,16 @@ func (s *LocalStorage) Put(token string, filename string, reader io.Reader, cont |
|
|
|
type S3Storage struct { |
|
|
|
Storage |
|
|
|
bucket *s3.Bucket |
|
|
|
logger *log.Logger |
|
|
|
} |
|
|
|
|
|
|
|
func NewS3Storage(accessKey, secretKey, bucketName, endpoint string) (*S3Storage, error) { |
|
|
|
func NewS3Storage(accessKey, secretKey, bucketName, endpoint string, logger *log.Logger) (*S3Storage, error) { |
|
|
|
bucket, err := getBucket(accessKey, secretKey, bucketName, endpoint) |
|
|
|
if err != nil { |
|
|
|
return nil, err |
|
|
|
} |
|
|
|
|
|
|
|
return &S3Storage{bucket: bucket}, nil |
|
|
|
return &S3Storage{bucket: bucket, logger: logger}, nil |
|
|
|
} |
|
|
|
|
|
|
|
func (s *S3Storage) Type() string { |
|
|
@@ -165,7 +166,7 @@ func (s *S3Storage) IsNotExist(err error) bool { |
|
|
|
return false |
|
|
|
} |
|
|
|
|
|
|
|
log.Printf("IsNotExist: %s, %#v", err.Error(), err) |
|
|
|
s.logger.Printf("IsNotExist: %s, %#v", err.Error(), err) |
|
|
|
|
|
|
|
b := (err.Error() == "The specified key does not exist.") |
|
|
|
b = b || (err.Error() == "Access Denied") |
|
|
@@ -210,7 +211,7 @@ func (s *S3Storage) Put(token string, filename string, reader io.Reader, content |
|
|
|
) |
|
|
|
|
|
|
|
if multi, err = s.bucket.InitMulti(key, contentType, s3.Private); err != nil { |
|
|
|
log.Printf(err.Error()) |
|
|
|
s.logger.Printf(err.Error()) |
|
|
|
return |
|
|
|
} |
|
|
|
|
|
|
@@ -234,13 +235,13 @@ func (s *S3Storage) Put(token string, filename string, reader io.Reader, content |
|
|
|
|
|
|
|
// Amazon expects parts of at least 5MB, except for the last one |
|
|
|
if count, err = io.ReadAtLeast(reader, buffer, (1<<20)*5); err != nil && err != io.ErrUnexpectedEOF && err != io.EOF { |
|
|
|
log.Printf(err.Error()) |
|
|
|
s.logger.Printf(err.Error()) |
|
|
|
return |
|
|
|
} |
|
|
|
|
|
|
|
// always send minimal 1 part |
|
|
|
if err == io.EOF && index > 1 { |
|
|
|
log.Printf("Waiting for all parts to finish uploading.") |
|
|
|
s.logger.Printf("Waiting for all parts to finish uploading.") |
|
|
|
|
|
|
|
// wait for all parts to be finished uploading |
|
|
|
wg.Wait() |
|
|
@@ -257,10 +258,10 @@ func (s *S3Storage) Put(token string, filename string, reader io.Reader, content |
|
|
|
|
|
|
|
// using goroutines because of retries when upload fails |
|
|
|
go func(multi *s3.Multi, buffer []byte, index int) { |
|
|
|
log.Printf("Uploading part %d %d", index, len(buffer)) |
|
|
|
s.logger.Printf("Uploading part %d %d", index, len(buffer)) |
|
|
|
|
|
|
|
defer func() { |
|
|
|
log.Printf("Finished part %d %d", index, len(buffer)) |
|
|
|
s.logger.Printf("Finished part %d %d", index, len(buffer)) |
|
|
|
|
|
|
|
wg.Done() |
|
|
|
|
|
|
@@ -272,12 +273,12 @@ func (s *S3Storage) Put(token string, filename string, reader io.Reader, content |
|
|
|
var part s3.Part |
|
|
|
|
|
|
|
if part, err = multi.PutPart(index, partReader); err != nil { |
|
|
|
log.Printf("Error while uploading part %d %d %s", index, len(buffer), err.Error()) |
|
|
|
s.logger.Printf("Error while uploading part %d %d %s", index, len(buffer), err.Error()) |
|
|
|
partsChan <- err |
|
|
|
return |
|
|
|
} |
|
|
|
|
|
|
|
log.Printf("Finished uploading part %d %d", index, len(buffer)) |
|
|
|
s.logger.Printf("Finished uploading part %d %d", index, len(buffer)) |
|
|
|
|
|
|
|
partsChan <- part |
|
|
|
|
|
|
@@ -294,7 +295,7 @@ func (s *S3Storage) Put(token string, filename string, reader io.Reader, content |
|
|
|
parts = append(parts, part.(s3.Part)) |
|
|
|
case error: |
|
|
|
// abort multi upload |
|
|
|
log.Printf("Error during upload, aborting %s.", part.(error).Error()) |
|
|
|
s.logger.Printf("Error during upload, aborting %s.", part.(error).Error()) |
|
|
|
err = part.(error) |
|
|
|
|
|
|
|
multi.Abort() |
|
|
@@ -303,14 +304,14 @@ func (s *S3Storage) Put(token string, filename string, reader io.Reader, content |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
log.Printf("Completing upload %d parts", len(parts)) |
|
|
|
s.logger.Printf("Completing upload %d parts", len(parts)) |
|
|
|
|
|
|
|
if err = multi.Complete(parts); err != nil { |
|
|
|
log.Printf("Error during completing upload %d parts %s", len(parts), err.Error()) |
|
|
|
s.logger.Printf("Error during completing upload %d parts %s", len(parts), err.Error()) |
|
|
|
return |
|
|
|
} |
|
|
|
|
|
|
|
log.Printf("Completed uploading %d", len(parts)) |
|
|
|
s.logger.Printf("Completed uploading %d", len(parts)) |
|
|
|
|
|
|
|
return |
|
|
|
} |
|
|
@@ -320,9 +321,10 @@ type GDrive struct { |
|
|
|
rootId string |
|
|
|
basedir string |
|
|
|
localConfigPath string |
|
|
|
logger *log.Logger |
|
|
|
} |
|
|
|
|
|
|
|
func NewGDriveStorage(clientJsonFilepath string, localConfigPath string, basedir string) (*GDrive, error) { |
|
|
|
func NewGDriveStorage(clientJsonFilepath string, localConfigPath string, basedir string, logger *log.Logger) (*GDrive, error) { |
|
|
|
b, err := ioutil.ReadFile(clientJsonFilepath) |
|
|
|
if err != nil { |
|
|
|
return nil, err |
|
|
@@ -334,12 +336,12 @@ func NewGDriveStorage(clientJsonFilepath string, localConfigPath string, basedir |
|
|
|
return nil, err |
|
|
|
} |
|
|
|
|
|
|
|
srv, err := drive.New(getGDriveClient(config, localConfigPath)) |
|
|
|
srv, err := drive.New(getGDriveClient(config, localConfigPath, logger)) |
|
|
|
if err != nil { |
|
|
|
return nil, err |
|
|
|
} |
|
|
|
|
|
|
|
storage := &GDrive{service: srv, basedir: basedir, rootId: "", localConfigPath: localConfigPath} |
|
|
|
storage := &GDrive{service: srv, basedir: basedir, rootId: "", localConfigPath: localConfigPath, logger: logger} |
|
|
|
err = storage.setupRoot() |
|
|
|
if err != nil { |
|
|
|
return nil, err |
|
|
@@ -570,31 +572,31 @@ func (s *GDrive) Put(token string, filename string, reader io.Reader, contentTyp |
|
|
|
} |
|
|
|
|
|
|
|
// Retrieve a token, saves the token, then returns the generated client. |
|
|
|
func getGDriveClient(config *oauth2.Config, localConfigPath string) *http.Client { |
|
|
|
func getGDriveClient(config *oauth2.Config, localConfigPath string, logger *log.Logger) *http.Client { |
|
|
|
tokenFile := filepath.Join(localConfigPath, GDriveTokenJsonFile) |
|
|
|
tok, err := gDriveTokenFromFile(tokenFile) |
|
|
|
if err != nil { |
|
|
|
tok = getGDriveTokenFromWeb(config) |
|
|
|
saveGDriveToken(tokenFile, tok) |
|
|
|
tok = getGDriveTokenFromWeb(config, logger) |
|
|
|
saveGDriveToken(tokenFile, tok, logger) |
|
|
|
} |
|
|
|
|
|
|
|
return config.Client(context.Background(), tok) |
|
|
|
} |
|
|
|
|
|
|
|
// Request a token from the web, then returns the retrieved token. |
|
|
|
func getGDriveTokenFromWeb(config *oauth2.Config) *oauth2.Token { |
|
|
|
func getGDriveTokenFromWeb(config *oauth2.Config, logger *log.Logger) *oauth2.Token { |
|
|
|
authURL := config.AuthCodeURL("state-token", oauth2.AccessTypeOffline) |
|
|
|
fmt.Printf("Go to the following link in your browser then type the "+ |
|
|
|
"authorization code: \n%v\n", authURL) |
|
|
|
|
|
|
|
var authCode string |
|
|
|
if _, err := fmt.Scan(&authCode); err != nil { |
|
|
|
log.Fatalf("Unable to read authorization code %v", err) |
|
|
|
logger.Fatalf("Unable to read authorization code %v", err) |
|
|
|
} |
|
|
|
|
|
|
|
tok, err := config.Exchange(context.TODO(), authCode) |
|
|
|
if err != nil { |
|
|
|
log.Fatalf("Unable to retrieve token from web %v", err) |
|
|
|
logger.Fatalf("Unable to retrieve token from web %v", err) |
|
|
|
} |
|
|
|
return tok |
|
|
|
} |
|
|
@@ -612,12 +614,13 @@ func gDriveTokenFromFile(file string) (*oauth2.Token, error) { |
|
|
|
} |
|
|
|
|
|
|
|
// Saves a token to a file path. |
|
|
|
func saveGDriveToken(path string, token *oauth2.Token) { |
|
|
|
fmt.Printf("Saving credential file to: %s\n", path) |
|
|
|
func saveGDriveToken(path string, token *oauth2.Token, logger *log.Logger) { |
|
|
|
logger.Printf("Saving credential file to: %s\n", path) |
|
|
|
f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600) |
|
|
|
defer f.Close() |
|
|
|
if err != nil { |
|
|
|
log.Fatalf("Unable to cache oauth token: %v", err) |
|
|
|
logger.Fatalf("Unable to cache oauth token: %v", err) |
|
|
|
} |
|
|
|
|
|
|
|
json.NewEncoder(f).Encode(token) |
|
|
|
} |