Merge pull request #959 from michael-berlin/auto_threadsafe

automation: Fix data race.
Michael Berlin 2015-08-05 14:58:18 -07:00
Parent 813e4f1de9 e419ae9669
Commit f866d3c29a
5 changed files with 188 additions and 126 deletions
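For context, "fix data race" here means the scheduler stops sharing *ClusterOperationInstance pointers across goroutines: ClusterOperationInstance is now passed by value, the mutex-guarded activeClusterOperations map only ever holds deep copies published through the new Checkpoint method, and findClusterOp returns clusterOp.Clone() instead of the live object. Below is a minimal, hypothetical Go sketch of that ownership pattern; the op and registry types are invented for illustration, and the real code deep-copies the embedded protobuf with proto.Clone rather than copying slices by hand.

package main

import (
	"fmt"
	"sync"
)

// op stands in for ClusterOperationInstance; it is illustrative only.
type op struct {
	ID    string
	State string
	Tasks []string
}

// clone returns a deep copy. The real Clone() uses proto.Clone for the
// embedded protobuf message.
func (o op) clone() op {
	c := o
	c.Tasks = append([]string(nil), o.Tasks...)
	return c
}

// registry stands in for the Scheduler's muOpList/activeClusterOperations pair.
type registry struct {
	mu     sync.Mutex
	active map[string]op
}

// checkpoint publishes a snapshot of the worker's private copy, mirroring
// Scheduler.Checkpoint in the diff below.
func (r *registry) checkpoint(o op) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.active[o.ID] = o.clone()
}

// get hands out a copy so callers can never mutate shared state, mirroring
// findClusterOp returning clusterOp.Clone().
func (r *registry) get(id string) (op, bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	o, ok := r.active[id]
	return o.clone(), ok
}

func main() {
	r := &registry{active: make(map[string]op)}

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		// The worker mutates only its own value copy...
		mine := op{ID: "op-1", State: "RUNNING", Tasks: []string{"echo"}}
		r.checkpoint(mine)
		mine.State = "DONE"
		// ...and publishes each state change explicitly.
		r.checkpoint(mine)
	}()
	wg.Wait()

	if snapshot, ok := r.get("op-1"); ok {
		fmt.Println(snapshot.ID, snapshot.State) // op-1 DONE
	}
}

Copies cost an extra deep copy per checkpoint, but they make it impossible for a reader such as GetClusterOperationDetails to observe a half-updated operation while processClusterOperation is still mutating its own instance.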

View File

@@ -5,19 +5,21 @@
package automation
import (
"github.com/golang/protobuf/proto"
pb "github.com/youtube/vitess/go/vt/proto/automation"
)
// ClusterOperationInstance is a runtime type which enhances the protobuf message "ClusterOperation" with runtime specific data.
// Unlike the protobuf message, the additional runtime data will not be part of a checkpoint.
// Methods of this struct are not thread-safe.
type ClusterOperationInstance struct {
pb.ClusterOperation
taskIDGenerator *IDGenerator
}
// NewClusterOperationInstance creates a new cluster operation instance with one initial task.
func NewClusterOperationInstance(clusterOpID string, initialTask *pb.TaskContainer, taskIDGenerator *IDGenerator) *ClusterOperationInstance {
c := &ClusterOperationInstance{
func NewClusterOperationInstance(clusterOpID string, initialTask *pb.TaskContainer, taskIDGenerator *IDGenerator) ClusterOperationInstance {
c := ClusterOperationInstance{
pb.ClusterOperation{
Id: clusterOpID,
SerialTasks: []*pb.TaskContainer{},
@@ -29,20 +31,9 @@ func NewClusterOperationInstance(clusterOpID string, initialTask *pb.TaskContain
return c
}
// addMissingTaskID assigns a task id to each task in "tc".
func (c *ClusterOperationInstance) addMissingTaskID(tc []*pb.TaskContainer) {
for _, taskContainer := range tc {
for _, task := range taskContainer.ParallelTasks {
if task.Id == "" {
task.Id = c.taskIDGenerator.GetNextID()
}
}
}
}
// InsertTaskContainers inserts "newTaskContainers" at pos in the current list of task containers. Existing task containers will be moved after the new task containers.
func (c *ClusterOperationInstance) InsertTaskContainers(newTaskContainers []*pb.TaskContainer, pos int) {
c.addMissingTaskID(newTaskContainers)
AddMissingTaskID(newTaskContainers, c.taskIDGenerator)
newSerialTasks := make([]*pb.TaskContainer, len(c.SerialTasks)+len(newTaskContainers))
copy(newSerialTasks, c.SerialTasks[:pos])
@@ -50,3 +41,11 @@ func (c *ClusterOperationInstance) InsertTaskContainers(newTaskContainers []*pb.
copy(newSerialTasks[pos+len(newTaskContainers):], c.SerialTasks[pos:])
c.SerialTasks = newSerialTasks
}
// Clone creates a deep copy of the inner protobuf.
// Other elements e.g. taskIDGenerator are not deep-copied.
func (c ClusterOperationInstance) Clone() ClusterOperationInstance {
var clone = c
clone.ClusterOperation = *(proto.Clone(&c.ClusterOperation).(*pb.ClusterOperation))
return clone
}

View File

@@ -10,6 +10,7 @@ Package automation contains code to execute high-level cluster operations
package automation
import (
"errors"
"fmt"
"sync"
@@ -37,7 +38,7 @@ type Scheduler struct {
// Guarded by "mu".
registeredClusterOperations map[string]bool
// Guarded by "mu".
toBeScheduledClusterOperations chan *ClusterOperationInstance
toBeScheduledClusterOperations chan ClusterOperationInstance
// Guarded by "mu".
state schedulerState
@@ -49,9 +50,13 @@ type Scheduler struct {
muOpList sync.Mutex
// Guarded by "muOpList".
activeClusterOperations map[string]*ClusterOperationInstance
// The key of the map is ClusterOperationInstance.ID.
// This map contains a copy of the ClusterOperationInstance which is currently processed.
// The scheduler may update the copy with the latest status.
activeClusterOperations map[string]ClusterOperationInstance
// Guarded by "muOpList".
finishedClusterOperations map[string]*ClusterOperationInstance
// The key of the map is ClusterOperationInstance.ID.
finishedClusterOperations map[string]ClusterOperationInstance
}
// NewScheduler creates a new instance.
@@ -63,12 +68,12 @@ func NewScheduler() (*Scheduler, error) {
s := &Scheduler{
registeredClusterOperations: defaultClusterOperations,
idGenerator: IDGenerator{},
toBeScheduledClusterOperations: make(chan *ClusterOperationInstance, 10),
toBeScheduledClusterOperations: make(chan ClusterOperationInstance, 10),
state: stateNotRunning,
taskCreator: defaultTaskCreator,
pendingOpsWg: &sync.WaitGroup{},
activeClusterOperations: make(map[string]*ClusterOperationInstance),
finishedClusterOperations: make(map[string]*ClusterOperationInstance),
activeClusterOperations: make(map[string]ClusterOperationInstance),
finishedClusterOperations: make(map[string]ClusterOperationInstance),
}
return s, nil
@@ -106,7 +111,7 @@ func (s *Scheduler) processRequestsLoop() {
log.Infof("Stopped processing loop for ClusterOperations.")
}
func (s *Scheduler) processClusterOperation(clusterOp *ClusterOperationInstance) {
func (s *Scheduler) processClusterOperation(clusterOp ClusterOperationInstance) {
if clusterOp.State == pb.ClusterOperationState_CLUSTER_OPERATION_DONE {
log.Infof("ClusterOperation: %v skipping because it is already done. Details: %v", clusterOp.Id, clusterOp)
return
@@ -114,77 +119,86 @@ func (s *Scheduler) processClusterOperation(clusterOp *ClusterOperationInstance)
log.Infof("ClusterOperation: %v running. Details: %v", clusterOp.Id, clusterOp)
var lastTaskError string
clusterOpLoop:
for i := 0; i < len(clusterOp.SerialTasks); i++ {
taskContainer := clusterOp.SerialTasks[i]
for _, taskProto := range taskContainer.ParallelTasks {
if taskProto.State == pb.TaskState_DONE {
if taskProto.Error != "" {
log.Errorf("Task: %v (%v/%v) failed before. Aborting the ClusterOperation. Error: %v Details: %v", taskProto.Name, clusterOp.Id, taskProto.Id, taskProto.Error, taskProto)
lastTaskError = taskProto.Error
break
} else {
log.Infof("Task: %v (%v/%v) skipped because it is already done. Full Details: %v", taskProto.Name, clusterOp.Id, taskProto.Id, taskProto)
}
}
task, err := s.createTaskInstance(taskProto.Name)
newTaskContainers, output, err := s.runTask(taskProto, clusterOp.Id)
if err != nil {
log.Errorf("Task: %v (%v/%v) could not be instantiated. Error: %v Details: %v", taskProto.Name, clusterOp.Id, taskProto.Id, err, taskProto)
MarkTaskFailed(taskProto, "", err)
lastTaskError = err.Error()
break
MarkTaskFailed(taskProto, output, err)
clusterOp.Error = err.Error()
break clusterOpLoop
} else {
MarkTaskSucceeded(taskProto, output)
}
taskProto.State = pb.TaskState_RUNNING
log.Infof("Task: %v (%v/%v) running. Details: %v", taskProto.Name, clusterOp.Id, taskProto.Id, taskProto)
newTaskContainers, output, errRun := task.Run(taskProto.Parameters)
log.Infof("Task: %v (%v/%v) finished. newTaskContainers: %v, output: %v, error: %v", taskProto.Name, clusterOp.Id, taskProto.Id, newTaskContainers, output, errRun)
if errRun != nil {
MarkTaskFailed(taskProto, output, errRun)
lastTaskError = errRun.Error()
break
}
MarkTaskSucceeded(taskProto, output)
if newTaskContainers != nil {
// Make sure all new tasks do not miss any required parameters.
for _, newTaskContainer := range newTaskContainers {
for _, newTaskProto := range newTaskContainer.ParallelTasks {
err := s.validateTaskSpecification(newTaskProto.Name, newTaskProto.Parameters)
if err != nil {
log.Errorf("Task: %v (%v/%v) emitted a new task which is not valid. Error: %v Details: %v", taskProto.Name, clusterOp.Id, taskProto.Id, err, newTaskProto)
MarkTaskFailed(taskProto, output, err)
lastTaskError = err.Error()
break
}
}
err := s.validateTaskContainers(newTaskContainers)
if err != nil {
log.Errorf("Task: %v (%v/%v) emitted a new task which is not valid. Error: %v", taskProto.Name, clusterOp.Id, taskProto.Id, err)
MarkTaskFailed(taskProto, output, err)
clusterOp.Error = err.Error()
break clusterOpLoop
}
if lastTaskError == "" {
clusterOp.InsertTaskContainers(newTaskContainers, i+1)
log.Infof("ClusterOperation: %v %d new task containers added by %v (%v/%v). Updated ClusterOperation: %v",
clusterOp.Id, len(newTaskContainers), taskProto.Name, clusterOp.Id, taskProto.Id, clusterOp)
}
clusterOp.InsertTaskContainers(newTaskContainers, i+1)
log.Infof("ClusterOperation: %v %d new task containers added by %v (%v/%v). Updated ClusterOperation: %v",
clusterOp.Id, len(newTaskContainers), taskProto.Name, clusterOp.Id, taskProto.Id, clusterOp)
}
s.Checkpoint(clusterOp)
}
}
clusterOp.State = pb.ClusterOperationState_CLUSTER_OPERATION_DONE
if lastTaskError != "" {
clusterOp.Error = lastTaskError
}
log.Infof("ClusterOperation: %v finished. Details: %v", clusterOp.Id, clusterOp)
s.Checkpoint(clusterOp)
// Move operation from active to finished.
s.muOpList.Lock()
if s.activeClusterOperations[clusterOp.Id] != clusterOp {
defer s.muOpList.Unlock()
if _, ok := s.activeClusterOperations[clusterOp.Id]; !ok {
panic("Pending ClusterOperation was not recorded as active, but should have.")
}
delete(s.activeClusterOperations, clusterOp.Id)
s.finishedClusterOperations[clusterOp.Id] = clusterOp
s.muOpList.Unlock()
}
func (s *Scheduler) runTask(taskProto *pb.Task, clusterOpID string) ([]*pb.TaskContainer, string, error) {
if taskProto.State == pb.TaskState_DONE {
// Task is already done (e.g. because we resume from a checkpoint).
if taskProto.Error != "" {
log.Errorf("Task: %v (%v/%v) failed before. Aborting the ClusterOperation. Error: %v Details: %v", taskProto.Name, clusterOpID, taskProto.Id, taskProto.Error, taskProto)
return nil, "", errors.New(taskProto.Error)
}
log.Infof("Task: %v (%v/%v) skipped because it is already done. Full Details: %v", taskProto.Name, clusterOpID, taskProto.Id, taskProto)
return nil, taskProto.Output, nil
}
task, err := s.createTaskInstance(taskProto.Name)
if err != nil {
log.Errorf("Task: %v (%v/%v) could not be instantiated. Error: %v Details: %v", taskProto.Name, clusterOpID, taskProto.Id, err, taskProto)
return nil, "", err
}
taskProto.State = pb.TaskState_RUNNING
log.Infof("Task: %v (%v/%v) running. Details: %v", taskProto.Name, clusterOpID, taskProto.Id, taskProto)
newTaskContainers, output, err := task.Run(taskProto.Parameters)
log.Infof("Task: %v (%v/%v) finished. newTaskContainers: %v, output: %v, error: %v", taskProto.Name, clusterOpID, taskProto.Id, newTaskContainers, output, err)
return newTaskContainers, output, err
}
func (s *Scheduler) validateTaskContainers(newTaskContainers []*pb.TaskContainer) error {
for _, newTaskContainer := range newTaskContainers {
for _, newTaskProto := range newTaskContainer.ParallelTasks {
err := s.validateTaskSpecification(newTaskProto.Name, newTaskProto.Parameters)
if err != nil {
return fmt.Errorf("Error: %v Task: %v", err, newTaskProto)
}
}
}
return nil
}
func defaultTaskCreator(taskName string) Task {
@@ -267,9 +281,9 @@ func (s *Scheduler) EnqueueClusterOperation(ctx context.Context, req *pb.Enqueue
clusterOp := NewClusterOperationInstance(clusterOpID, initialTask, &taskIDGenerator)
s.muOpList.Lock()
s.toBeScheduledClusterOperations <- clusterOp
s.activeClusterOperations[clusterOpID] = clusterOp
s.muOpList.Unlock()
s.toBeScheduledClusterOperations <- clusterOp
return &pb.EnqueueClusterOperationResponse{
Id: clusterOp.Id,
@@ -277,9 +291,9 @@ func (s *Scheduler) EnqueueClusterOperation(ctx context.Context, req *pb.Enqueue
}
// findClusterOp checks for a given ClusterOperation ID if it's in the list of active or finished operations.
func (s *Scheduler) findClusterOp(id string) (*ClusterOperationInstance, error) {
func (s *Scheduler) findClusterOp(id string) (ClusterOperationInstance, error) {
var ok bool
var clusterOp *ClusterOperationInstance
var clusterOp ClusterOperationInstance
s.muOpList.Lock()
defer s.muOpList.Unlock()
@@ -288,9 +302,18 @@ func (s *Scheduler) findClusterOp(id string) (*ClusterOperationInstance, error)
clusterOp, ok = s.finishedClusterOperations[id]
}
if !ok {
return nil, fmt.Errorf("ClusterOperation with id: %v not found", id)
return clusterOp, fmt.Errorf("ClusterOperation with id: %v not found", id)
}
return clusterOp, nil
return clusterOp.Clone(), nil
}
// Checkpoint should be called every time the state of the cluster op changes.
// It is used to update the copy of the state in activeClusterOperations.
func (s *Scheduler) Checkpoint(clusterOp ClusterOperationInstance) {
// TODO(mberlin): Add here support for persistent checkpoints.
s.muOpList.Lock()
defer s.muOpList.Unlock()
s.activeClusterOperations[clusterOp.Id] = clusterOp.Clone()
}
// GetClusterOperationDetails can be used to query the full details of active or finished operations.

View File

@@ -5,6 +5,7 @@
package automation
import (
"strings"
"testing"
"time"
@@ -25,6 +26,30 @@ func newTestScheduler(t *testing.T) *Scheduler {
return scheduler
}
func enqueueClusterOperationAndCheckOutput(t *testing.T, taskName string, expectedOutput string, expectedError string) *pb.ClusterOperation {
scheduler := newTestScheduler(t)
defer scheduler.ShutdownAndWait()
scheduler.registerClusterOperation("TestingEchoTask")
scheduler.registerClusterOperation("TestingFailTask")
scheduler.registerClusterOperation("TestingEmitEchoTask")
scheduler.registerClusterOperation("TestingEmitEchoFailEchoTask")
scheduler.Run()
enqueueRequest := &pb.EnqueueClusterOperationRequest{
Name: taskName,
Parameters: map[string]string{
"echo_text": expectedOutput,
},
}
enqueueResponse, err := scheduler.EnqueueClusterOperation(context.TODO(), enqueueRequest)
if err != nil {
t.Fatalf("Failed to start cluster operation. Request: %v Error: %v", enqueueRequest, err)
}
return waitForClusterOperation(t, scheduler, enqueueResponse.Id, expectedOutput, expectedError)
}
// waitForClusterOperation is a helper function which blocks until the Cluster Operation has finished.
func waitForClusterOperation(t *testing.T, scheduler *Scheduler, id string, expectedOutputLastTask string, expectedErrorLastTask string) *pb.ClusterOperation {
if expectedOutputLastTask == "" && expectedErrorLastTask == "" {
@@ -42,15 +67,22 @@ func waitForClusterOperation(t *testing.T, scheduler *Scheduler, id string, expe
}
if getDetailsResponse.ClusterOp.State == pb.ClusterOperationState_CLUSTER_OPERATION_DONE {
tc := getDetailsResponse.ClusterOp.SerialTasks
lastTc := tc[len(tc)-1]
// Check the last task which has finished. (It may not be the last one because tasks can fail.)
var lastTc *pb.TaskContainer
for i := len(tc) - 1; i >= 0; i-- {
if tc[i].ParallelTasks[len(tc[i].ParallelTasks)-1].State == pb.TaskState_DONE {
lastTc = tc[i]
break
}
}
if expectedOutputLastTask != "" {
if lastTc.ParallelTasks[len(lastTc.ParallelTasks)-1].Output != expectedOutputLastTask {
t.Fatalf("ClusterOperation finished but did not return expected output. want: %v Full ClusterOperation details: %v", expectedOutputLastTask, proto.MarshalTextString(getDetailsResponse.ClusterOp))
}
}
if expectedErrorLastTask != "" {
if lastTc.ParallelTasks[len(lastTc.ParallelTasks)-1].Error != expectedErrorLastTask {
t.Fatalf("ClusterOperation finished but did not return expected error. Full ClusterOperation details: %v", getDetailsResponse.ClusterOp)
if lastError := lastTc.ParallelTasks[len(lastTc.ParallelTasks)-1].Error; !strings.Contains(lastError, expectedErrorLastTask) {
t.Fatalf("ClusterOperation finished last error does not contain expected error. got: '%v' want: '%v' Full ClusterOperation details: %v", lastError, expectedErrorLastTask, getDetailsResponse.ClusterOp)
}
}
return getDetailsResponse.ClusterOp
@@ -70,34 +102,29 @@ func TestSchedulerImmediateShutdown(t *testing.T) {
scheduler.ShutdownAndWait()
}
func enqueueClusterOperationAndCheckOutput(t *testing.T, taskName string, expectedOutput string) {
scheduler := newTestScheduler(t)
defer scheduler.ShutdownAndWait()
scheduler.registerClusterOperation("TestingEchoTask")
scheduler.registerClusterOperation("TestingEmitEchoTask")
scheduler.Run()
enqueueRequest := &pb.EnqueueClusterOperationRequest{
Name: taskName,
Parameters: map[string]string{
"echo_text": expectedOutput,
},
}
enqueueResponse, err := scheduler.EnqueueClusterOperation(context.TODO(), enqueueRequest)
if err != nil {
t.Fatalf("Failed to start cluster operation. Request: %v Error: %v", enqueueRequest, err)
}
waitForClusterOperation(t, scheduler, enqueueResponse.Id, expectedOutput, "")
}
func TestEnqueueSingleTask(t *testing.T) {
enqueueClusterOperationAndCheckOutput(t, "TestingEchoTask", "echoed text")
enqueueClusterOperationAndCheckOutput(t, "TestingEchoTask", "echoed text", "")
}
func TestEnqueueEmittingTask(t *testing.T) {
enqueueClusterOperationAndCheckOutput(t, "TestingEmitEchoTask", "echoed text from emitted task")
enqueueClusterOperationAndCheckOutput(t, "TestingEmitEchoTask", "echoed text from emitted task", "")
}
func TestFailedTaskFailsClusterOperation(t *testing.T) {
enqueueClusterOperationAndCheckOutput(t, "TestingFailTask", "something went wrong", "full error message")
}
func TestFailedTaskFailsWholeClusterOperationEarly(t *testing.T) {
// If a task fails in the middle of a cluster operation, the remaining tasks must not be executed.
details := enqueueClusterOperationAndCheckOutput(t, "TestingEmitEchoFailEchoTask", "", "full error message")
got := details.SerialTasks[2].ParallelTasks[0].Error
want := "full error message"
if got != want {
t.Errorf("TestFailedTaskFailsWholeClusterOperationEarly: got error: '%v' want error: '%v'", got, want)
}
if details.SerialTasks[3].ParallelTasks[0].State != pb.TaskState_NOT_STARTED {
t.Errorf("TestFailedTaskFailsWholeClusterOperationEarly: Task after a failing task must not have been started.")
}
}
func TestEnqueueFailsDueToMissingParameter(t *testing.T) {
@@ -124,24 +151,6 @@ func TestEnqueueFailsDueToMissingParameter(t *testing.T) {
}
}
func TestFailedTaskFailsClusterOperation(t *testing.T) {
scheduler := newTestScheduler(t)
defer scheduler.ShutdownAndWait()
scheduler.registerClusterOperation("TestingFailTask")
scheduler.Run()
enqueueRequest := &pb.EnqueueClusterOperationRequest{
Name: "TestingFailTask",
}
enqueueResponse, err := scheduler.EnqueueClusterOperation(context.TODO(), enqueueRequest)
if err != nil {
t.Fatalf("Failed to start cluster operation. Request: %v Error: %v", enqueueRequest, err)
}
waitForClusterOperation(t, scheduler, enqueueResponse.Id, "something went wrong", "full error message")
}
func TestEnqueueFailsDueToUnregisteredClusterOperation(t *testing.T) {
scheduler := newTestScheduler(t)
defer scheduler.ShutdownAndWait()

View File

@@ -30,3 +30,14 @@ func NewTaskContainer() *pb.TaskContainer {
func AddTask(t *pb.TaskContainer, taskName string, parameters map[string]string) {
t.ParallelTasks = append(t.ParallelTasks, NewTask(taskName, parameters))
}
// AddMissingTaskID assigns a task id to each task in "tc".
func AddMissingTaskID(tc []*pb.TaskContainer, taskIDGenerator *IDGenerator) {
for _, taskContainer := range tc {
for _, task := range taskContainer.ParallelTasks {
if task.Id == "" {
task.Id = taskIDGenerator.GetNextID()
}
}
}
}

View File

@@ -15,10 +15,12 @@ func testingTaskCreator(taskName string) Task {
// Tasks for testing only.
case "TestingEchoTask":
return &TestingEchoTask{}
case "TestingEmitEchoTask":
return &TestingEmitEchoTask{}
case "TestingFailTask":
return &TestingFailTask{}
case "TestingEmitEchoTask":
return &TestingEmitEchoTask{}
case "TestingEmitEchoFailEchoTask":
return &TestingEmitEchoFailEchoTask{}
default:
return nil
}
@@ -39,6 +41,18 @@ func (t *TestingEchoTask) RequiredParameters() []string {
return []string{"echo_text"}
}
// TestingFailTask is used only for testing. It always fails.
type TestingFailTask struct {
}
func (t *TestingFailTask) Run(parameters map[string]string) (newTasks []*pb.TaskContainer, output string, err error) {
return nil, "something went wrong", errors.New("full error message")
}
func (t *TestingFailTask) RequiredParameters() []string {
return []string{}
}
// TestingEmitEchoTask is used only for testing. It emits a TestingEchoTask.
type TestingEmitEchoTask struct {
}
@@ -53,14 +67,20 @@ func (t *TestingEmitEchoTask) RequiredParameters() []string {
return []string{}
}
// TestingFailTask is used only for testing. It always fails.
type TestingFailTask struct {
// TestingEmitEchoFailEchoTask is used only for testing.
// It emits three sequential tasks: Echo, Fail, Echo.
type TestingEmitEchoFailEchoTask struct {
}
func (t *TestingFailTask) Run(parameters map[string]string) (newTasks []*pb.TaskContainer, output string, err error) {
return nil, "something went wrong", errors.New("full error message")
func (t *TestingEmitEchoFailEchoTask) Run(parameters map[string]string) (newTasks []*pb.TaskContainer, output string, err error) {
newTasks = []*pb.TaskContainer{
NewTaskContainerWithSingleTask("TestingEchoTask", parameters),
NewTaskContainerWithSingleTask("TestingFailTask", parameters),
NewTaskContainerWithSingleTask("TestingEchoTask", parameters),
}
return newTasks, "emitted tasks: Echo, Fail, Echo", nil
}
func (t *TestingFailTask) RequiredParameters() []string {
func (t *TestingEmitEchoFailEchoTask) RequiredParameters() []string {
return []string{}
}