Merge pull request #1348 from michael-berlin/fix_cloud_storage_backup

GCS backup plugin: Adapt to new API of the GCS Go client.
This commit is contained in:
Michael Berlin 2015-11-25 22:19:17 -08:00
Родитель 2341bd4101 aa9dfda4d5
Коммит b8e3ae42e7
1 изменённый файл: 31 добавление и 20 удалений

Просмотреть файл

@ -12,10 +12,9 @@ import (
"io"
"sort"
"strings"
"sync"
"golang.org/x/net/context"
"golang.org/x/oauth2/google"
"google.golang.org/cloud"
"google.golang.org/cloud/storage"
"github.com/youtube/vitess/go/vt/mysqlctl/backupstorage"
@ -23,6 +22,7 @@ import (
var (
// project is the Google Developers Console project ID.
// TODO(mberlin): Find out where to fill this in in the new API.
project = flag.String("gcs_backup_storage_project", "", "Google Developers Console project ID to use for backups")
// bucket is where the backups will go.
@ -34,7 +34,7 @@ var (
// GCSBackupHandle implements BackupHandle for Google Cloud Storage.
type GCSBackupHandle struct {
authCtx context.Context
client *storage.Client
bs *GCSBackupStorage
dir string
name string
@ -56,7 +56,8 @@ func (bh *GCSBackupHandle) AddFile(filename string) (io.WriteCloser, error) {
if bh.readOnly {
return nil, fmt.Errorf("AddFile cannot be called on read-only backup")
}
return storage.NewWriter(bh.authCtx, *bucket, objName(bh.dir, bh.name, filename)), nil
object := objName(bh.dir, bh.name, filename)
return bh.client.Bucket(*bucket).Object(object).NewWriter(context.TODO()), nil
}
// EndBackup implements BackupHandle.
@ -80,17 +81,22 @@ func (bh *GCSBackupHandle) ReadFile(filename string) (io.ReadCloser, error) {
if !bh.readOnly {
return nil, fmt.Errorf("ReadFile cannot be called on read-write backup")
}
return storage.NewReader(bh.authCtx, *bucket, objName(bh.dir, bh.name, filename))
object := objName(bh.dir, bh.name, filename)
return bh.client.Bucket(*bucket).Object(object).NewReader(context.TODO())
}
// GCSBackupStorage implements BackupStorage for local file system.
// GCSBackupStorage implements BackupStorage for Google Cloud Storage.
type GCSBackupStorage struct {
authCtx context.Context
// client is the instance of the Google Cloud Storage Go client.
// Once this field is set, it must not be written again/unset to nil.
client *storage.Client
// mu guards all fields.
mu sync.Mutex
}
// ListBackups implements BackupStorage.
func (bs *GCSBackupStorage) ListBackups(dir string) ([]backupstorage.BackupHandle, error) {
authCtx, err := bs.authContext()
c, err := bs.getClientOrCreate()
if err != nil {
return nil, err
}
@ -105,7 +111,7 @@ func (bs *GCSBackupStorage) ListBackups(dir string) ([]backupstorage.BackupHandl
// Loop in case results are returned in multiple batches.
for query != nil {
objs, err := storage.ListObjects(authCtx, *bucket, query)
objs, err := c.Bucket(*bucket).List(context.TODO(), query)
if err != nil {
return nil, err
}
@ -127,7 +133,7 @@ func (bs *GCSBackupStorage) ListBackups(dir string) ([]backupstorage.BackupHandl
result := make([]backupstorage.BackupHandle, 0, len(subdirs))
for _, subdir := range subdirs {
result = append(result, &GCSBackupHandle{
authCtx: authCtx,
client: c,
bs: bs,
dir: dir,
name: subdir,
@ -139,13 +145,13 @@ func (bs *GCSBackupStorage) ListBackups(dir string) ([]backupstorage.BackupHandl
// StartBackup implements BackupStorage.
func (bs *GCSBackupStorage) StartBackup(dir, name string) (backupstorage.BackupHandle, error) {
authCtx, err := bs.authContext()
c, err := bs.getClientOrCreate()
if err != nil {
return nil, err
}
return &GCSBackupHandle{
authCtx: authCtx,
client: c,
bs: bs,
dir: dir,
name: name,
@ -155,7 +161,7 @@ func (bs *GCSBackupStorage) StartBackup(dir, name string) (backupstorage.BackupH
// RemoveBackup implements BackupStorage.
func (bs *GCSBackupStorage) RemoveBackup(dir, name string) error {
authCtx, err := bs.authContext()
c, err := bs.getClientOrCreate()
if err != nil {
return err
}
@ -167,14 +173,14 @@ func (bs *GCSBackupStorage) RemoveBackup(dir, name string) error {
// Loop in case results are returned in multiple batches.
for query != nil {
objs, err := storage.ListObjects(authCtx, *bucket, query)
objs, err := c.Bucket(*bucket).List(context.TODO(), query)
if err != nil {
return err
}
// Delete all the found objects.
for _, obj := range objs.Results {
if err := storage.DeleteObject(authCtx, *bucket, obj.Name); err != nil {
if err := c.Bucket(*bucket).Object(obj.Name).Delete(context.TODO()); err != nil {
return fmt.Errorf("unable to delete %q from bucket %q: %v", obj.Name, *bucket, err)
}
}
@ -185,15 +191,20 @@ func (bs *GCSBackupStorage) RemoveBackup(dir, name string) error {
return nil
}
func (bs *GCSBackupStorage) authContext() (context.Context, error) {
if bs.authCtx == nil {
client, err := google.DefaultClient(context.TODO())
// getClientOrCreate returns the GCS Storage client instance.
// If there isn't one yet, it tries to create one.
func (bs *GCSBackupStorage) getClientOrCreate() (*storage.Client, error) {
bs.mu.Lock()
defer bs.mu.Unlock()
if bs.client == nil {
client, err := storage.NewClient(context.TODO())
if err != nil {
return nil, err
}
bs.authCtx = cloud.NewContext(*project, client)
bs.client = client
}
return bs.authCtx, nil
return bs.client, nil
}
// objName joins path parts into an object name.