зеркало из https://github.com/github/vitess-gh.git
Refactor: Unexplode Backup() function, pass BackupRequest as argument (#10904)
* Unexplode Backup() function, pass BackupRequest as argument Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> * use tabletmanagerdata.BackupRequest Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> * make proto Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> * removed duplicate tabletmanagerdata imports Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> * tabletmanagerdatapb Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> * vschemapb Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> * require.NoError Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
This commit is contained in:
Родитель
ee19530960
Коммит
bb334264ce
|
@ -52,7 +52,7 @@ func TestVtctldListAllTablets(t *testing.T) {
|
|||
func testListAllTablets(t *testing.T) {
|
||||
// first w/o any filters, aside from cell
|
||||
result, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("ListAllTablets")
|
||||
require.Nil(t, err)
|
||||
require.NoError(t, err)
|
||||
|
||||
tablets := getAllTablets()
|
||||
tabletsFromCMD := strings.Split(result, "\n")
|
||||
|
@ -76,7 +76,7 @@ func deleteCell(t *testing.T) {
|
|||
// Delete cell2 info from topo
|
||||
res, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("DeleteCellInfo", "--", "--force", cell2)
|
||||
t.Log(res)
|
||||
require.Nil(t, err)
|
||||
require.NoError(t, err)
|
||||
|
||||
// update clusterInstance to remaining vttablets and shards
|
||||
shard1.Vttablets = []*cluster.Vttablet{shard1Primary}
|
||||
|
@ -85,7 +85,7 @@ func deleteCell(t *testing.T) {
|
|||
|
||||
// Now list all tablets
|
||||
result, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("ListAllTablets")
|
||||
require.Nil(t, err)
|
||||
require.NoError(t, err)
|
||||
|
||||
tablets := getAllTablets()
|
||||
tabletsFromCMD := strings.Split(result, "\n")
|
||||
|
@ -112,7 +112,7 @@ func deleteTablet(t *testing.T, tablet *cluster.Vttablet) {
|
|||
wg.Wait()
|
||||
|
||||
err := clusterInstance.VtctlclientProcess.ExecuteCommand("DeleteTablet", tablet.Alias)
|
||||
require.Nil(t, err)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func addCellback(t *testing.T) {
|
||||
|
@ -120,7 +120,7 @@ func addCellback(t *testing.T) {
|
|||
clusterInstance.VtctlProcess.TopoRootPath = "/org1/obj1/"
|
||||
|
||||
err := clusterInstance.VtctlProcess.AddCellInfo(cell2)
|
||||
require.Nil(t, err)
|
||||
require.NoError(t, err)
|
||||
|
||||
// create new vttablets
|
||||
shard1Replica = clusterInstance.NewVttabletInstance("replica", 0, cell2)
|
||||
|
@ -154,29 +154,24 @@ func addCellback(t *testing.T) {
|
|||
)
|
||||
tablet.VttabletProcess.SupportsBackup = true
|
||||
proc, err := tablet.MysqlctlProcess.StartProcess()
|
||||
if err != nil {
|
||||
require.Nil(t, err)
|
||||
}
|
||||
require.NoError(t, err)
|
||||
mysqlProcs = append(mysqlProcs, proc)
|
||||
}
|
||||
for _, proc := range mysqlProcs {
|
||||
if err := proc.Wait(); err != nil {
|
||||
require.Nil(t, err)
|
||||
}
|
||||
err := proc.Wait()
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
for _, tablet := range []*cluster.Vttablet{shard1Replica, shard1Rdonly} {
|
||||
tablet.VttabletProcess.Shard = shard1.Name
|
||||
if err := tablet.VttabletProcess.Setup(); err != nil {
|
||||
require.Nil(t, err)
|
||||
}
|
||||
err := tablet.VttabletProcess.Setup()
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
for _, tablet := range []*cluster.Vttablet{shard2Replica, shard2Rdonly} {
|
||||
tablet.VttabletProcess.Shard = shard2.Name
|
||||
if err := tablet.VttabletProcess.Setup(); err != nil {
|
||||
require.Nil(t, err)
|
||||
}
|
||||
err := tablet.VttabletProcess.Setup()
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
shard1.Vttablets = append(shard1.Vttablets, shard1Replica)
|
||||
|
@ -185,7 +180,7 @@ func addCellback(t *testing.T) {
|
|||
shard2.Vttablets = append(shard2.Vttablets, shard1Rdonly)
|
||||
|
||||
result, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("ListAllTablets")
|
||||
require.Nil(t, err)
|
||||
require.NoError(t, err)
|
||||
|
||||
tablets := getAllTablets()
|
||||
tabletsFromCMD := strings.Split(result, "\n")
|
||||
|
|
|
@ -23,8 +23,6 @@ import (
|
|||
"path"
|
||||
"time"
|
||||
|
||||
"vitess.io/vitess/go/vt/proto/vschema"
|
||||
|
||||
"vitess.io/vitess/go/sqltypes"
|
||||
"vitess.io/vitess/go/vt/dbconfigs"
|
||||
"vitess.io/vitess/go/vt/grpcclient"
|
||||
|
@ -50,6 +48,7 @@ import (
|
|||
replicationdatapb "vitess.io/vitess/go/vt/proto/replicationdata"
|
||||
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
|
||||
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
|
||||
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
|
||||
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
|
||||
vttestpb "vitess.io/vitess/go/vt/proto/vttest"
|
||||
)
|
||||
|
@ -148,7 +147,7 @@ func CreateTablet(
|
|||
func InitRoutingRules(
|
||||
ctx context.Context,
|
||||
ts *topo.Server,
|
||||
rr *vschema.RoutingRules,
|
||||
rr *vschemapb.RoutingRules,
|
||||
) error {
|
||||
if rr == nil {
|
||||
return nil
|
||||
|
@ -899,7 +898,7 @@ func (itmc *internalTabletManagerClient) PromoteReplica(context.Context, *topoda
|
|||
return "", fmt.Errorf("not implemented in vtcombo")
|
||||
}
|
||||
|
||||
func (itmc *internalTabletManagerClient) Backup(context.Context, *topodatapb.Tablet, int, bool) (logutil.EventStream, error) {
|
||||
func (itmc *internalTabletManagerClient) Backup(context.Context, *topodatapb.Tablet, *tabletmanagerdatapb.BackupRequest) (logutil.EventStream, error) {
|
||||
return nil, fmt.Errorf("not implemented in vtcombo")
|
||||
}
|
||||
|
||||
|
|
|
@ -333,7 +333,7 @@ func (s *VtctldServer) Backup(req *vtctldatapb.BackupRequest, stream vtctlservic
|
|||
span.Annotate("keyspace", ti.Keyspace)
|
||||
span.Annotate("shard", ti.Shard)
|
||||
|
||||
return s.backupTablet(ctx, ti.Tablet, int(req.Concurrency), req.AllowPrimary, stream)
|
||||
return s.backupTablet(ctx, ti.Tablet, req, stream)
|
||||
}
|
||||
|
||||
// BackupShard is part of the vtctlservicepb.VtctldServer interface.
|
||||
|
@ -386,13 +386,15 @@ func (s *VtctldServer) BackupShard(req *vtctldatapb.BackupShardRequest, stream v
|
|||
|
||||
span.Annotate("tablet_alias", topoproto.TabletAliasString(backupTablet.Alias))
|
||||
|
||||
return s.backupTablet(ctx, backupTablet, int(req.Concurrency), req.AllowPrimary, stream)
|
||||
r := &vtctldatapb.BackupRequest{Concurrency: req.Concurrency, AllowPrimary: req.AllowPrimary}
|
||||
return s.backupTablet(ctx, backupTablet, r, stream)
|
||||
}
|
||||
|
||||
func (s *VtctldServer) backupTablet(ctx context.Context, tablet *topodatapb.Tablet, concurrency int, allowPrimary bool, stream interface {
|
||||
func (s *VtctldServer) backupTablet(ctx context.Context, tablet *topodatapb.Tablet, req *vtctldatapb.BackupRequest, stream interface {
|
||||
Send(resp *vtctldatapb.BackupResponse) error
|
||||
}) error {
|
||||
logStream, err := s.tmc.Backup(ctx, tablet, concurrency, allowPrimary)
|
||||
r := &tabletmanagerdatapb.BackupRequest{Concurrency: int64(req.Concurrency), AllowPrimary: req.AllowPrimary}
|
||||
logStream, err := s.tmc.Backup(ctx, tablet, r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -329,8 +329,8 @@ func (stream *backupStreamAdapter) Send(msg *logutilpb.Event) error {
|
|||
}
|
||||
|
||||
// Backup is part of the tmclient.TabletManagerClient interface.
|
||||
func (fake *TabletManagerClient) Backup(ctx context.Context, tablet *topodatapb.Tablet, concurrency int, allowPrimary bool) (logutil.EventStream, error) {
|
||||
if tablet.Type == topodatapb.TabletType_PRIMARY && !allowPrimary {
|
||||
func (fake *TabletManagerClient) Backup(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.BackupRequest) (logutil.EventStream, error) {
|
||||
if tablet.Type == topodatapb.TabletType_PRIMARY && !req.AllowPrimary {
|
||||
return nil, fmt.Errorf("cannot backup primary with allowPrimary=false")
|
||||
}
|
||||
|
||||
|
|
|
@ -325,7 +325,7 @@ func (e *eofEventStream) Recv() (*logutilpb.Event, error) {
|
|||
}
|
||||
|
||||
// Backup is part of the tmclient.TabletManagerClient interface.
|
||||
func (client *FakeTabletManagerClient) Backup(ctx context.Context, tablet *topodatapb.Tablet, concurrency int, allowPrimary bool) (logutil.EventStream, error) {
|
||||
func (client *FakeTabletManagerClient) Backup(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.BackupRequest) (logutil.EventStream, error) {
|
||||
return &eofEventStream{}, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -904,16 +904,13 @@ func (e *backupStreamAdapter) Recv() (*logutilpb.Event, error) {
|
|||
}
|
||||
|
||||
// Backup is part of the tmclient.TabletManagerClient interface.
|
||||
func (client *Client) Backup(ctx context.Context, tablet *topodatapb.Tablet, concurrency int, allowPrimary bool) (logutil.EventStream, error) {
|
||||
func (client *Client) Backup(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.BackupRequest) (logutil.EventStream, error) {
|
||||
c, closer, err := client.dialer.dial(ctx, tablet)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
stream, err := c.Backup(ctx, &tabletmanagerdatapb.BackupRequest{
|
||||
Concurrency: int64(concurrency),
|
||||
AllowPrimary: allowPrimary,
|
||||
})
|
||||
stream, err := c.Backup(ctx, req)
|
||||
if err != nil {
|
||||
closer.Close()
|
||||
return nil, err
|
||||
|
|
|
@ -494,7 +494,7 @@ func (s *server) Backup(request *tabletmanagerdatapb.BackupRequest, stream table
|
|||
})
|
||||
})
|
||||
|
||||
return s.tm.Backup(ctx, int(request.Concurrency), logger, request.AllowPrimary)
|
||||
return s.tm.Backup(ctx, logger, request)
|
||||
}
|
||||
|
||||
func (s *server) RestoreFromBackup(request *tabletmanagerdatapb.RestoreFromBackupRequest, stream tabletmanagerservicepb.TabletManager_RestoreFromBackupServer) (err error) {
|
||||
|
|
|
@ -135,7 +135,7 @@ type RPCTM interface {
|
|||
|
||||
// Backup / restore related methods
|
||||
|
||||
Backup(ctx context.Context, concurrency int, logger logutil.Logger, allowPrimary bool) error
|
||||
Backup(ctx context.Context, logger logutil.Logger, request *tabletmanagerdatapb.BackupRequest) error
|
||||
|
||||
RestoreFromBackup(ctx context.Context, logger logutil.Logger, backupTime time.Time) error
|
||||
|
||||
|
|
|
@ -27,6 +27,7 @@ import (
|
|||
"vitess.io/vitess/go/vt/topo/topoproto"
|
||||
"vitess.io/vitess/go/vt/vterrors"
|
||||
|
||||
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
|
||||
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
|
||||
)
|
||||
|
||||
|
@ -36,7 +37,7 @@ const (
|
|||
)
|
||||
|
||||
// Backup takes a db backup and sends it to the BackupStorage
|
||||
func (tm *TabletManager) Backup(ctx context.Context, concurrency int, logger logutil.Logger, allowPrimary bool) error {
|
||||
func (tm *TabletManager) Backup(ctx context.Context, logger logutil.Logger, req *tabletmanagerdatapb.BackupRequest) error {
|
||||
if tm.Cnf == nil {
|
||||
return fmt.Errorf("cannot perform backup without my.cnf, please restart vttablet with a my.cnf file specified")
|
||||
}
|
||||
|
@ -46,7 +47,7 @@ func (tm *TabletManager) Backup(ctx context.Context, concurrency int, logger log
|
|||
// but the process didn't find out about this.
|
||||
// It is not safe to take backups from tablet in this state
|
||||
currentTablet := tm.Tablet()
|
||||
if !allowPrimary && currentTablet.Type == topodatapb.TabletType_PRIMARY {
|
||||
if !req.AllowPrimary && currentTablet.Type == topodatapb.TabletType_PRIMARY {
|
||||
return fmt.Errorf("type PRIMARY cannot take backup. if you really need to do this, rerun the backup command with --allow_primary")
|
||||
}
|
||||
engine, err := mysqlctl.GetBackupEngine()
|
||||
|
@ -58,7 +59,7 @@ func (tm *TabletManager) Backup(ctx context.Context, concurrency int, logger log
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !allowPrimary && tablet.Type == topodatapb.TabletType_PRIMARY {
|
||||
if !req.AllowPrimary && tablet.Type == topodatapb.TabletType_PRIMARY {
|
||||
return fmt.Errorf("type PRIMARY cannot take backup. if you really need to do this, rerun the backup command with --allow_primary")
|
||||
}
|
||||
|
||||
|
@ -107,7 +108,7 @@ func (tm *TabletManager) Backup(ctx context.Context, concurrency int, logger log
|
|||
Cnf: tm.Cnf,
|
||||
Mysqld: tm.MysqlDaemon,
|
||||
Logger: l,
|
||||
Concurrency: concurrency,
|
||||
Concurrency: int(req.Concurrency),
|
||||
HookExtraEnv: tm.hookExtraEnv(),
|
||||
TopoServer: tm.TopoServer,
|
||||
Keyspace: tablet.Keyspace,
|
||||
|
|
|
@ -206,7 +206,7 @@ type TabletManagerClient interface {
|
|||
//
|
||||
|
||||
// Backup creates a database backup
|
||||
Backup(ctx context.Context, tablet *topodatapb.Tablet, concurrency int, allowPrimary bool) (logutil.EventStream, error)
|
||||
Backup(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.BackupRequest) (logutil.EventStream, error)
|
||||
|
||||
// RestoreFromBackup deletes local data and restores database from backup
|
||||
RestoreFromBackup(ctx context.Context, tablet *topodatapb.Tablet, backupTime time.Time) (logutil.EventStream, error)
|
||||
|
|
|
@ -1191,19 +1191,20 @@ var testBackupAllowPrimary = false
|
|||
var testBackupCalled = false
|
||||
var testRestoreFromBackupCalled = false
|
||||
|
||||
func (fra *fakeRPCTM) Backup(ctx context.Context, concurrency int, logger logutil.Logger, allowPrimary bool) error {
|
||||
func (fra *fakeRPCTM) Backup(ctx context.Context, logger logutil.Logger, request *tabletmanagerdatapb.BackupRequest) error {
|
||||
if fra.panics {
|
||||
panic(fmt.Errorf("test-triggered panic"))
|
||||
}
|
||||
compare(fra.t, "Backup args", concurrency, testBackupConcurrency)
|
||||
compare(fra.t, "Backup args", allowPrimary, testBackupAllowPrimary)
|
||||
compare(fra.t, "Backup args", request.Concurrency, testBackupConcurrency)
|
||||
compare(fra.t, "Backup args", request.AllowPrimary, testBackupAllowPrimary)
|
||||
logStuff(logger, 10)
|
||||
testBackupCalled = true
|
||||
return nil
|
||||
}
|
||||
|
||||
func tmRPCTestBackup(ctx context.Context, t *testing.T, client tmclient.TabletManagerClient, tablet *topodatapb.Tablet) {
|
||||
stream, err := client.Backup(ctx, tablet, testBackupConcurrency, testBackupAllowPrimary)
|
||||
req := &tabletmanagerdatapb.BackupRequest{Concurrency: int64(testBackupConcurrency), AllowPrimary: testBackupAllowPrimary}
|
||||
stream, err := client.Backup(ctx, tablet, req)
|
||||
if err != nil {
|
||||
t.Fatalf("Backup failed: %v", err)
|
||||
}
|
||||
|
@ -1212,7 +1213,8 @@ func tmRPCTestBackup(ctx context.Context, t *testing.T, client tmclient.TabletMa
|
|||
}
|
||||
|
||||
func tmRPCTestBackupPanic(ctx context.Context, t *testing.T, client tmclient.TabletManagerClient, tablet *topodatapb.Tablet) {
|
||||
stream, err := client.Backup(ctx, tablet, testBackupConcurrency, testBackupAllowPrimary)
|
||||
req := &tabletmanagerdatapb.BackupRequest{Concurrency: int64(testBackupConcurrency), AllowPrimary: testBackupAllowPrimary}
|
||||
stream, err := client.Backup(ctx, tablet, req)
|
||||
if err != nil {
|
||||
t.Fatalf("Backup failed: %v", err)
|
||||
}
|
||||
|
|
Загрузка…
Ссылка в новой задаче