Apply `*topo.RemoteOperationTimeout` to all remote ops during validation

Signed-off-by: Andrew Mason <amason@slack-corp.com>
This commit is contained in:
Andrew Mason 2021-09-23 06:45:43 -04:00
Родитель 0362acc7d8
Коммит 4befb1a347
1 изменённых файлов: 43 добавлений и 10 удалений

Просмотреть файл

@ -2273,9 +2273,11 @@ func (s *VtctldServer) Validate(ctx context.Context, req *vtctldatapb.ValidateRe
span.Annotate("ping_tablets", req.PingTablets)
var resp vtctldatapb.ValidateResponse
resp := vtctldatapb.ValidateResponse{}
getKeyspacesCtx, getKeyspacesCancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
defer getKeyspacesCancel()
keyspaces, err := s.ts.GetKeyspaces(ctx)
keyspaces, err := s.ts.GetKeyspaces(getKeyspacesCtx)
if err != nil {
resp.Results = append(resp.Results, fmt.Sprintf("GetKeyspaces failed: %v", err))
return &resp, nil
@ -2295,7 +2297,10 @@ func (s *VtctldServer) Validate(ctx context.Context, req *vtctldatapb.ValidateRe
cellSet := sets.NewString()
for _, keyspace := range keyspaces {
shards, err := s.ts.GetShardNames(ctx, keyspace)
getShardNamesCtx, getShardNamesCancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
shards, err := s.ts.GetShardNames(getShardNamesCtx, keyspace)
getShardNamesCancel() // don't defer in a loop
if err != nil {
m.Lock()
resp.Results = append(resp.Results, fmt.Sprintf("TopologyServer.GetShardNames(%v) failed: %v", keyspace, err))
@ -2304,7 +2309,10 @@ func (s *VtctldServer) Validate(ctx context.Context, req *vtctldatapb.ValidateRe
}
for _, shard := range shards {
aliases, err := s.ts.FindAllTabletAliasesInShard(ctx, keyspace, shard)
findAllTabletAliasesCtx, findAllTabletAliasesCancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
aliases, err := s.ts.FindAllTabletAliasesInShard(findAllTabletAliasesCtx, keyspace, shard)
findAllTabletAliasesCancel() // don't defer in a loop
if err != nil {
m.Lock()
resp.Results = append(resp.Results, fmt.Sprintf("TopologyServer.FindAllTabletAliasesInShard(%v/%v) failed: %v", keyspace, shard, err))
@ -2319,7 +2327,10 @@ func (s *VtctldServer) Validate(ctx context.Context, req *vtctldatapb.ValidateRe
}
for _, cell := range cellSet.List() {
aliases, err := s.ts.GetTabletsByCell(ctx, cell)
getTabletsByCellCtx, getTabletsByCellCancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
aliases, err := s.ts.GetTabletsByCell(getTabletsByCellCtx, cell)
getTabletsByCellCancel() // don't defer in a loop
if err != nil {
m.Lock()
resp.Results = append(resp.Results, fmt.Sprintf("TopologyServer.GetTabletsByCell(%v) failed: %v", cell, err))
@ -2338,6 +2349,9 @@ func (s *VtctldServer) Validate(ctx context.Context, req *vtctldatapb.ValidateRe
key := topoproto.TabletAliasString(alias)
span.Annotate("tablet_alias", key)
ctx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
defer cancel()
if err := topo.Validate(ctx, s.ts, alias); err != nil {
m.Lock()
defer m.Unlock()
@ -2392,9 +2406,11 @@ func (s *VtctldServer) ValidateKeyspace(ctx context.Context, req *vtctldatapb.Va
span.Annotate("keyspace", req.Keyspace)
span.Annotate("ping_tablets", req.PingTablets)
var resp vtctldatapb.ValidateKeyspaceResponse
resp := vtctldatapb.ValidateKeyspaceResponse{}
getShardNamesCtx, getShardNamesCancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
defer getShardNamesCancel()
shards, err := s.ts.GetShardNames(ctx, req.Keyspace)
shards, err := s.ts.GetShardNames(getShardNamesCtx, req.Keyspace)
if err != nil {
resp.Results = append(resp.Results, fmt.Sprintf("TopologyServer.GetShardNames(%v) failed: %v", req.Keyspace, err))
return &resp, nil
@ -2442,20 +2458,27 @@ func (s *VtctldServer) ValidateShard(ctx context.Context, req *vtctldatapb.Valid
span.Annotate("ping_tablets", req.PingTablets)
resp := vtctldatapb.ValidateShardResponse{}
getShardCtx, getShardCancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
defer getShardCancel()
si, err := s.ts.GetShard(ctx, req.Keyspace, req.Shard)
si, err := s.ts.GetShard(getShardCtx, req.Keyspace, req.Shard)
if err != nil {
resp.Results = append(resp.Results, fmt.Sprintf("TopologyServer.GetShard(%v, %v) failed: %v", req.Keyspace, req.Shard, err))
return &resp, nil
}
aliases, err := s.ts.FindAllTabletAliasesInShard(ctx, req.Keyspace, req.Shard)
findAllTabletAliasesCtx, findAllTabletAliasesCancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
defer findAllTabletAliasesCancel()
aliases, err := s.ts.FindAllTabletAliasesInShard(findAllTabletAliasesCtx, req.Keyspace, req.Shard)
if err != nil {
resp.Results = append(resp.Results, fmt.Sprintf("TopologyServer.FindAllTabletAliasesInShard(%v, %v) failed: %v", req.Keyspace, req.Shard, err))
return &resp, nil
}
tabletMap, _ := s.ts.GetTabletMap(ctx, aliases)
getTabletMapCtx, getTabletMapCancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
defer getTabletMapCancel()
tabletMap, _ := s.ts.GetTabletMap(getTabletMapCtx, aliases)
var primaryAlias *topodatapb.TabletAlias
for _, alias := range aliases {
@ -2491,6 +2514,10 @@ func (s *VtctldServer) ValidateShard(ctx context.Context, req *vtctldatapb.Valid
wg.Add(1)
go func(alias *topodatapb.TabletAlias) {
defer wg.Done()
ctx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
defer cancel()
if err := topo.Validate(ctx, s.ts, alias); err != nil {
results <- fmt.Sprintf("topo.Validate(%v) failed: %v", topoproto.TabletAliasString(alias), err)
return
@ -2514,6 +2541,9 @@ func (s *VtctldServer) ValidateShard(ctx context.Context, req *vtctldatapb.Valid
return
}
ctx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
defer cancel()
replicaList, err := s.tmc.GetReplicas(ctx, primaryTabletInfo.Tablet)
if err != nil {
results <- fmt.Sprintf("GetReplicas(%v) failed: %v", primaryTabletInfo, err)
@ -2569,6 +2599,9 @@ func (s *VtctldServer) ValidateShard(ctx context.Context, req *vtctldatapb.Valid
go func(alias string, ti *topo.TabletInfo) {
defer wg.Done()
ctx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
defer cancel()
if err := s.tmc.Ping(ctx, ti.Tablet); err != nil {
results <- fmt.Sprintf("Ping(%v) failed: %v tablet hostname: %v", alias, err, ti.Hostname)
}