зеркало из https://github.com/github/vitess-gh.git
Refactoring backup interface.
This commit is contained in:
Родитель
0dc6e4bdd7
Коммит
f71c09363e
|
@ -179,18 +179,18 @@ func (mysqld *Mysqld) Backup(logger logutil.Logger, bucket, name string, backupC
|
|||
return fmt.Errorf("StartBackup failed: %v", err)
|
||||
}
|
||||
|
||||
if err = mysqld.backup(logger, bs, bh, backupConcurrency, hookExtraEnv); err != nil {
|
||||
if err := bs.AbortBackup(bh); err != nil {
|
||||
if err = mysqld.backup(logger, bh, backupConcurrency, hookExtraEnv); err != nil {
|
||||
if err := bh.AbortBackup(); err != nil {
|
||||
logger.Errorf("failed to abort backup: %v", err)
|
||||
}
|
||||
} else {
|
||||
err = bs.EndBackup(bh)
|
||||
err = bh.EndBackup()
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (mysqld *Mysqld) backup(logger logutil.Logger, bs backupstorage.BackupStorage, bh backupstorage.BackupHandle, backupConcurrency int, hookExtraEnv map[string]string) error {
|
||||
func (mysqld *Mysqld) backup(logger logutil.Logger, bh backupstorage.BackupHandle, backupConcurrency int, hookExtraEnv map[string]string) error {
|
||||
|
||||
// save initial state so we can restore
|
||||
slaveStartRequired := false
|
||||
|
@ -240,7 +240,7 @@ func (mysqld *Mysqld) backup(logger logutil.Logger, bs backupstorage.BackupStora
|
|||
}
|
||||
replicationPosition = slaveStatus.Position
|
||||
}
|
||||
logger.Infof("using replication position: %#v", replicationPosition)
|
||||
logger.Infof("using replication position: %v", replicationPosition)
|
||||
|
||||
// shutdown mysqld
|
||||
if err = mysqld.Shutdown(true, MysqlWaitTime); err != nil {
|
||||
|
@ -255,7 +255,7 @@ func (mysqld *Mysqld) backup(logger logutil.Logger, bs backupstorage.BackupStora
|
|||
logger.Infof("found %v files to backup", len(fes))
|
||||
|
||||
// backup everything
|
||||
if err := mysqld.backupFiles(logger, bs, bh, fes, replicationPosition, backupConcurrency); err != nil {
|
||||
if err := mysqld.backupFiles(logger, bh, fes, replicationPosition, backupConcurrency); err != nil {
|
||||
return fmt.Errorf("cannot backup files: %v", err)
|
||||
}
|
||||
|
||||
|
@ -286,7 +286,7 @@ func (mysqld *Mysqld) backup(logger logutil.Logger, bs backupstorage.BackupStora
|
|||
return nil
|
||||
}
|
||||
|
||||
func (mysqld *Mysqld) backupFiles(logger logutil.Logger, bs backupstorage.BackupStorage, bh backupstorage.BackupHandle, fes []FileEntry, replicationPosition proto.ReplicationPosition, backupConcurrency int) error {
|
||||
func (mysqld *Mysqld) backupFiles(logger logutil.Logger, bh backupstorage.BackupHandle, fes []FileEntry, replicationPosition proto.ReplicationPosition, backupConcurrency int) error {
|
||||
|
||||
sema := sync2.NewSemaphore(backupConcurrency, 0)
|
||||
rec := concurrency.AllErrorRecorder{}
|
||||
|
@ -314,7 +314,7 @@ func (mysqld *Mysqld) backupFiles(logger logutil.Logger, bs backupstorage.Backup
|
|||
|
||||
// open the destination file for writing, and a buffer
|
||||
name := fmt.Sprintf("%v", i)
|
||||
wc, err := bs.AddFile(bh, name)
|
||||
wc, err := bh.AddFile(name)
|
||||
if err != nil {
|
||||
rec.RecordError(fmt.Errorf("cannot add file: %v", err))
|
||||
return
|
||||
|
@ -358,7 +358,7 @@ func (mysqld *Mysqld) backupFiles(logger logutil.Logger, bs backupstorage.Backup
|
|||
}
|
||||
|
||||
// open the MANIFEST
|
||||
wc, err := bs.AddFile(bh, backupManifest)
|
||||
wc, err := bh.AddFile(backupManifest)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot add %v to backup: %v", backupManifest, err)
|
||||
}
|
||||
|
|
|
@ -22,8 +22,10 @@ var (
|
|||
|
||||
// FileBackupHandle implements BackupHandle for local file system.
|
||||
type FileBackupHandle struct {
|
||||
bucket string
|
||||
name string
|
||||
fbs *FileBackupStorage
|
||||
bucket string
|
||||
name string
|
||||
readOnly bool
|
||||
}
|
||||
|
||||
// Bucket is part of the BackupHandle interface
|
||||
|
@ -36,6 +38,40 @@ func (fbh *FileBackupHandle) Name() string {
|
|||
return fbh.name
|
||||
}
|
||||
|
||||
// AddFile is part of the BackupHandle interface
|
||||
func (fbh *FileBackupHandle) AddFile(filename string) (io.WriteCloser, error) {
|
||||
if fbh.readOnly {
|
||||
return nil, fmt.Errorf("AddFile cannot be called on read-only backup")
|
||||
}
|
||||
p := path.Join(fbh.fbs.root, fbh.bucket, fbh.name, filename)
|
||||
return os.Create(p)
|
||||
}
|
||||
|
||||
// EndBackup is part of the BackupHandle interface
|
||||
func (fbh *FileBackupHandle) EndBackup() error {
|
||||
if fbh.readOnly {
|
||||
return fmt.Errorf("EndBackup cannot be called on read-only backup")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// AbortBackup is part of the BackupHandle interface
|
||||
func (fbh *FileBackupHandle) AbortBackup() error {
|
||||
if fbh.readOnly {
|
||||
return fmt.Errorf("AbortBackup cannot be called on read-only backup")
|
||||
}
|
||||
return fbh.fbs.RemoveBackup(fbh.bucket, fbh.name)
|
||||
}
|
||||
|
||||
// ReadFile is part of the BackupHandle interface
|
||||
func (fbh *FileBackupHandle) ReadFile(filename string) (io.ReadCloser, error) {
|
||||
if !fbh.readOnly {
|
||||
return nil, fmt.Errorf("ReadFile cannot be called on read-write backup")
|
||||
}
|
||||
p := path.Join(fbh.fbs.root, fbh.bucket, fbh.name, filename)
|
||||
return os.Open(p)
|
||||
}
|
||||
|
||||
// FileBackupStorage implements BackupStorage for local file system.
|
||||
type FileBackupStorage struct {
|
||||
root string
|
||||
|
@ -62,8 +98,10 @@ func (fbs *FileBackupStorage) ListBackups(bucket string) ([]BackupHandle, error)
|
|||
continue
|
||||
}
|
||||
result = append(result, &FileBackupHandle{
|
||||
bucket: bucket,
|
||||
name: info.Name(),
|
||||
fbs: fbs,
|
||||
bucket: bucket,
|
||||
name: info.Name(),
|
||||
readOnly: true,
|
||||
})
|
||||
}
|
||||
return result, nil
|
||||
|
@ -84,45 +122,13 @@ func (fbs *FileBackupStorage) StartBackup(bucket, name string) (BackupHandle, er
|
|||
}
|
||||
|
||||
return &FileBackupHandle{
|
||||
bucket: bucket,
|
||||
name: name,
|
||||
fbs: fbs,
|
||||
bucket: bucket,
|
||||
name: name,
|
||||
readOnly: false,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// AddFile is part of the BackupStorage interface
|
||||
func (fbs *FileBackupStorage) AddFile(handle BackupHandle, filename string) (io.WriteCloser, error) {
|
||||
fbh, ok := handle.(*FileBackupHandle)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("FileBackupStorage only accepts FileBackupHandle")
|
||||
}
|
||||
p := path.Join(fbs.root, fbh.bucket, fbh.name, filename)
|
||||
return os.Create(p)
|
||||
}
|
||||
|
||||
// EndBackup is part of the BackupStorage interface
|
||||
func (fbs *FileBackupStorage) EndBackup(handle BackupHandle) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// AbortBackup is part of the BackupStorage interface
|
||||
func (fbs *FileBackupStorage) AbortBackup(handle BackupHandle) error {
|
||||
fbh, ok := handle.(*FileBackupHandle)
|
||||
if !ok {
|
||||
return fmt.Errorf("FileBackupStorage only accepts FileBackupHandle")
|
||||
}
|
||||
return fbs.RemoveBackup(fbh.bucket, fbh.name)
|
||||
}
|
||||
|
||||
// ReadFile is part of the BackupStorage interface
|
||||
func (fbs *FileBackupStorage) ReadFile(handle BackupHandle, filename string) (io.ReadCloser, error) {
|
||||
fbh, ok := handle.(*FileBackupHandle)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("FileBackupStorage only accepts FileBackupHandle")
|
||||
}
|
||||
p := path.Join(fbs.root, fbh.bucket, fbh.name, filename)
|
||||
return os.Open(p)
|
||||
}
|
||||
|
||||
// RemoveBackup is part of the BackupStorage interface
|
||||
func (fbs *FileBackupStorage) RemoveBackup(bucket, name string) error {
|
||||
p := path.Join(fbs.root, bucket, name)
|
||||
|
|
|
@ -55,8 +55,8 @@ func TestListBackups(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatalf("fbs.StartBackup failed: %v", err)
|
||||
}
|
||||
if err := fbs.EndBackup(bh); err != nil {
|
||||
t.Fatalf("fbs.EndBackup failed: %v", err)
|
||||
if err := bh.EndBackup(); err != nil {
|
||||
t.Fatalf("bh.EndBackup failed: %v", err)
|
||||
}
|
||||
|
||||
// verify we have one entry now
|
||||
|
@ -76,8 +76,8 @@ func TestListBackups(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatalf("fbs.StartBackup failed: %v", err)
|
||||
}
|
||||
if err := fbs.EndBackup(bh); err != nil {
|
||||
t.Fatalf("fbs.EndBackup failed: %v", err)
|
||||
if err := bh.EndBackup(); err != nil {
|
||||
t.Fatalf("bh.EndBackup failed: %v", err)
|
||||
}
|
||||
|
||||
// verify we have two sorted entries now
|
||||
|
@ -112,8 +112,8 @@ func TestListBackups(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatalf("fbs.StartBackup failed: %v", err)
|
||||
}
|
||||
if err := fbs.AbortBackup(bh); err != nil {
|
||||
t.Fatalf("fbs.AbortBackup failed: %v", err)
|
||||
if err := bh.AbortBackup(); err != nil {
|
||||
t.Fatalf("bh.AbortBackup failed: %v", err)
|
||||
}
|
||||
bhs, err = fbs.ListBackups(bucket)
|
||||
if err != nil {
|
||||
|
@ -124,6 +124,17 @@ func TestListBackups(t *testing.T) {
|
|||
bhs[0].Name() != firstBackup {
|
||||
t.Fatalf("ListBackups after abort returned wrong results: %#v", bhs)
|
||||
}
|
||||
|
||||
// check we cannot chaneg a backup we listed
|
||||
if _, err := bhs[0].AddFile("test"); err == nil {
|
||||
t.Fatalf("was able to AddFile to read-only backup")
|
||||
}
|
||||
if err := bhs[0].EndBackup(); err == nil {
|
||||
t.Fatalf("was able to EndBackup a read-only backup")
|
||||
}
|
||||
if err := bhs[0].AbortBackup(); err == nil {
|
||||
t.Fatalf("was able to AbortBackup a read-only backup")
|
||||
}
|
||||
}
|
||||
|
||||
func TestFileContents(t *testing.T) {
|
||||
|
@ -140,9 +151,9 @@ func TestFileContents(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatalf("fbs.StartBackup failed: %v", err)
|
||||
}
|
||||
wc, err := fbs.AddFile(bh, filename1)
|
||||
wc, err := bh.AddFile(filename1)
|
||||
if err != nil {
|
||||
t.Fatalf("fbs.AddFile failed: %v", err)
|
||||
t.Fatalf("bh.AddFile failed: %v", err)
|
||||
}
|
||||
if _, err := wc.Write([]byte(contents1)); err != nil {
|
||||
t.Fatalf("wc.Write failed: %v", err)
|
||||
|
@ -150,8 +161,15 @@ func TestFileContents(t *testing.T) {
|
|||
if err := wc.Close(); err != nil {
|
||||
t.Fatalf("wc.Close failed: %v", err)
|
||||
}
|
||||
if err := fbs.EndBackup(bh); err != nil {
|
||||
t.Fatalf("fbs.EndBackup failed: %v", err)
|
||||
|
||||
// test we can't read back on read-write backup
|
||||
if _, err := bh.ReadFile(filename1); err == nil {
|
||||
t.Fatalf("was able to ReadFile to read-write backup")
|
||||
}
|
||||
|
||||
// and close
|
||||
if err := bh.EndBackup(); err != nil {
|
||||
t.Fatalf("bh.EndBackup failed: %v", err)
|
||||
}
|
||||
|
||||
// re-read the file
|
||||
|
@ -159,9 +177,9 @@ func TestFileContents(t *testing.T) {
|
|||
if err != nil || len(bhs) != 1 {
|
||||
t.Fatalf("ListBackups after abort returned wrong return: %v %v", err, bhs)
|
||||
}
|
||||
rc, err := fbs.ReadFile(bhs[0], filename1)
|
||||
rc, err := bhs[0].ReadFile(filename1)
|
||||
if err != nil {
|
||||
t.Fatalf("fbs.ReadFile failed: %v", err)
|
||||
t.Fatalf("bhs[0].ReadFile failed: %v", err)
|
||||
}
|
||||
buf := make([]byte, len(contents1)+10)
|
||||
if n, err := rc.Read(buf); (err != nil && err != io.EOF) || n != len(contents1) {
|
||||
|
|
|
@ -25,34 +25,43 @@ type BackupHandle interface {
|
|||
// Name is the individual name of the backup. Will contain
|
||||
// tabletAlias-timestamp.
|
||||
Name() string
|
||||
}
|
||||
|
||||
// BackupStorage is the interface to the storage system
|
||||
type BackupStorage interface {
|
||||
// ListBackups returns all the backups in a bucket.
|
||||
ListBackups(bucket string) ([]BackupHandle, error)
|
||||
|
||||
// StartBackup creates a new backup with the given name.
|
||||
// If a backup with the same name already exists, it's an error.
|
||||
StartBackup(bucket, name string) (BackupHandle, error)
|
||||
|
||||
// AddFile opens a new file to be added to the backup.
|
||||
// Only works for read-write backups (created by StartBackup).
|
||||
// filename is guaranteed to only contain alphanumerical
|
||||
// characters and hyphens.
|
||||
// It should be thread safe, it is possible to call AddFile in
|
||||
// multiple go routines once a backup has been started.
|
||||
AddFile(handle BackupHandle, filename string) (io.WriteCloser, error)
|
||||
AddFile(filename string) (io.WriteCloser, error)
|
||||
|
||||
// EndBackup stops and closes a backup. The contents should be kept.
|
||||
EndBackup(handle BackupHandle) error
|
||||
// Only works for read-write backups (created by StartBackup).
|
||||
EndBackup() error
|
||||
|
||||
// AbortBackup stops a backup, and removes the contents that
|
||||
// have been copied already. It is called if an error occurs
|
||||
// while the backup is being taken, and the backup cannot be finished.
|
||||
AbortBackup(handle BackupHandle) error
|
||||
// Only works for read-write backups (created by StartBackup).
|
||||
AbortBackup() error
|
||||
|
||||
// ReadFile starts reading a file from a backup.
|
||||
ReadFile(handle BackupHandle, filename string) (io.ReadCloser, error)
|
||||
// Only works for read-only backups (created by ListBackups).
|
||||
ReadFile(filename string) (io.ReadCloser, error)
|
||||
}
|
||||
|
||||
// BackupStorage is the interface to the storage system
|
||||
type BackupStorage interface {
|
||||
// ListBackups returns all the backups in a bucket. The
|
||||
// returned backups are read-only (ReadFile can be called, but
|
||||
// AddFile/EndBackup/AbortBackup cannot)
|
||||
ListBackups(bucket string) ([]BackupHandle, error)
|
||||
|
||||
// StartBackup creates a new backup with the given name. If a
|
||||
// backup with the same name already exists, it's an error.
|
||||
// The returned backup is read-write
|
||||
// (AddFile/EndBackup/AbortBackup cann all be called, not
|
||||
// ReadFile)
|
||||
StartBackup(bucket, name string) (BackupHandle, error)
|
||||
|
||||
// RemoveBackup removes all the data associated with a backup.
|
||||
// It will not appear in ListBackups after RemoveBackup succeeds.
|
||||
|
|
Загрузка…
Ссылка в новой задаче