From 157c3bfe0ff01ee9e50355767cce23223e43cfaf Mon Sep 17 00:00:00 2001 From: Yuqi Wang <32826762+yqwang-ms@users.noreply.github.com> Date: Wed, 17 Jul 2019 15:35:32 +0800 Subject: [PATCH] Still sync Task after FrameworkAttemptCompleted (#27) --- pkg/apis/frameworkcontroller/v1/funcs.go | 15 +- pkg/apis/frameworkcontroller/v1/types.go | 5 +- pkg/controller/controller.go | 178 ++++++++++++----------- 3 files changed, 111 insertions(+), 87 deletions(-) diff --git a/pkg/apis/frameworkcontroller/v1/funcs.go b/pkg/apis/frameworkcontroller/v1/funcs.go index fb3e9af..399433f 100644 --- a/pkg/apis/frameworkcontroller/v1/funcs.go +++ b/pkg/apis/frameworkcontroller/v1/funcs.go @@ -192,6 +192,18 @@ func (ts *TaskStatus) IsRunning() bool { return ts.State == TaskAttemptRunning } +func (f *Framework) IsCompleting() bool { + return f.Status.State == FrameworkAttemptDeletionPending || + f.Status.State == FrameworkAttemptDeletionRequested || + f.Status.State == FrameworkAttemptDeleting +} + +func (ts *TaskStatus) IsCompleting() bool { + return ts.State == TaskAttemptDeletionPending || + ts.State == TaskAttemptDeletionRequested || + ts.State == TaskAttemptDeleting +} + func (ct CompletionType) IsSucceeded() bool { return ct.Name == CompletionTypeNameSucceeded } @@ -507,7 +519,8 @@ func (rp RetryPolicySpec) ShouldRetry( // 0. Built-in Always-on RetryPolicy if cs.Code == CompletionCodePodSpecInvalid || - cs.Code == CompletionCodeStopFrameworkRequested { + cs.Code == CompletionCodeStopFrameworkRequested || + cs.Code == CompletionCodeFrameworkAttemptCompletion { return RetryDecision{false, true, 0, cs.Diagnostics} } diff --git a/pkg/apis/frameworkcontroller/v1/types.go b/pkg/apis/frameworkcontroller/v1/types.go index 6590499..75483c5 100644 --- a/pkg/apis/frameworkcontroller/v1/types.go +++ b/pkg/apis/frameworkcontroller/v1/types.go @@ -132,7 +132,9 @@ const ( // complete a single Task in the TaskRole. // // Usage: -// If the Pod Spec is invalid or the ExecutionType is ExecutionStop, +// If the Pod Spec is invalid or +// the ExecutionType is ExecutionStop or +// the Task's FrameworkAttempt is completing, // will not retry. // // If the FancyRetryPolicy is enabled, @@ -506,7 +508,6 @@ const ( // must have been creation requested successfully and is expected to exist. // [AssociatedState] // -> TaskAttemptPreparing - // -> TaskAttemptRunning // -> TaskAttemptDeleting // -> TaskAttemptCompleted TaskAttemptCreationRequested TaskState = "AttemptCreationRequested" diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index cd4d14a..cb0a619 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -775,12 +775,14 @@ func (c *FrameworkController) syncFrameworkState(f *ci.Framework) (err error) { // so always just wait. log.Infof(logPfx + "Waiting ConfigMap to disappearing or disappear in the local cache") - return nil - } + } else { + // At this point, f.Status.State must be in: + // {FrameworkAttemptCreationRequested, FrameworkAttemptPreparing, + // FrameworkAttemptRunning} - if f.Status.State != ci.FrameworkAttemptPreparing && - f.Status.State != ci.FrameworkAttemptRunning { - f.TransitionFrameworkState(ci.FrameworkAttemptPreparing) + if f.Status.State == ci.FrameworkAttemptCreationRequested { + f.TransitionFrameworkState(ci.FrameworkAttemptPreparing) + } } } else { if f.Status.AttemptStatus.CompletionStatus == nil { @@ -792,13 +794,13 @@ func (c *FrameworkController) syncFrameworkState(f *ci.Framework) (err error) { f.TransitionFrameworkState(ci.FrameworkAttemptDeleting) log.Infof(logPfx + "Waiting ConfigMap to be deleted") - return nil } } } // At this point, f.Status.State must be in: - // {FrameworkAttemptCreationPending, FrameworkAttemptCompleted, - // FrameworkAttemptPreparing, FrameworkAttemptRunning} + // {FrameworkAttemptCreationPending, FrameworkAttemptPreparing, + // FrameworkAttemptRunning, FrameworkAttemptDeletionRequested, + // FrameworkAttemptDeleting, FrameworkAttemptCompleted} if f.Status.State == ci.FrameworkAttemptCompleted { // attemptToRetryFramework @@ -857,7 +859,8 @@ func (c *FrameworkController) syncFrameworkState(f *ci.Framework) (err error) { } // At this point, f.Status.State must be in: // {FrameworkAttemptCreationPending, FrameworkAttemptPreparing, - // FrameworkAttemptRunning} + // FrameworkAttemptRunning, FrameworkAttemptDeletionRequested, + // FrameworkAttemptDeleting} if f.Status.State == ci.FrameworkAttemptCreationPending { if f.Spec.ExecutionType == ci.ExecutionStop { @@ -899,34 +902,42 @@ func (c *FrameworkController) syncFrameworkState(f *ci.Framework) (err error) { return nil } // At this point, f.Status.State must be in: - // {FrameworkAttemptPreparing, FrameworkAttemptRunning} + // {FrameworkAttemptPreparing, FrameworkAttemptRunning, + // FrameworkAttemptDeletionRequested, FrameworkAttemptDeleting} if f.Status.State == ci.FrameworkAttemptPreparing || - f.Status.State == ci.FrameworkAttemptRunning { - if f.Spec.ExecutionType == ci.ExecutionStop { - diag := fmt.Sprintf("User has requested to stop the Framework") - log.Infof(logPfx + diag) - c.completeFrameworkAttempt(f, false, - ci.CompletionCodeStopFrameworkRequested.NewCompletionStatus(diag)) - return nil - } else { - cancelled, err := c.syncTaskRoleStatuses(f, cm) - - if !cancelled { - if !f.IsAnyTaskRunning() { - f.TransitionFrameworkState(ci.FrameworkAttemptPreparing) - } else { - f.TransitionFrameworkState(ci.FrameworkAttemptRunning) - } + f.Status.State == ci.FrameworkAttemptRunning || + f.Status.State == ci.FrameworkAttemptDeletionRequested || + f.Status.State == ci.FrameworkAttemptDeleting { + if !f.IsCompleting() { + if f.Spec.ExecutionType == ci.ExecutionStop { + diag := fmt.Sprintf("User has requested to stop the Framework") + log.Infof(logPfx + diag) + c.completeFrameworkAttempt(f, false, + ci.CompletionCodeStopFrameworkRequested.NewCompletionStatus(diag)) } - - return err } + + err := c.syncTaskRoleStatuses(f, cm) + + if f.Status.State == ci.FrameworkAttemptPreparing || + f.Status.State == ci.FrameworkAttemptRunning { + if !f.IsAnyTaskRunning() { + f.TransitionFrameworkState(ci.FrameworkAttemptPreparing) + } else { + f.TransitionFrameworkState(ci.FrameworkAttemptRunning) + } + } + + return err } else { // Unreachable panic(fmt.Errorf(logPfx+ - "Failed: At this point, FrameworkState should be in {%v, %v} instead of %v", - ci.FrameworkAttemptPreparing, ci.FrameworkAttemptRunning, f.Status.State)) + "Failed: At this point, FrameworkState should be in "+ + "{%v, %v, %v, %v} instead of %v", + ci.FrameworkAttemptPreparing, ci.FrameworkAttemptRunning, + ci.FrameworkAttemptDeletionRequested, ci.FrameworkAttemptDeleting, + f.Status.State)) } } @@ -1052,7 +1063,7 @@ func (c *FrameworkController) createConfigMap( } func (c *FrameworkController) syncTaskRoleStatuses( - f *ci.Framework, cm *core.ConfigMap) (syncFrameworkCancelled bool, err error) { + f *ci.Framework, cm *core.ConfigMap) (err error) { logPfx := fmt.Sprintf("[%v]: syncTaskRoleStatuses: ", f.Key()) log.Infof(logPfx + "Started") defer func() { log.Infof(logPfx + "Completed") }() @@ -1061,36 +1072,23 @@ func (c *FrameworkController) syncTaskRoleStatuses( for _, taskRoleStatus := range f.TaskRoleStatuses() { log.Infof("[%v][%v]: syncTaskRoleStatus", f.Key(), taskRoleStatus.Name) for _, taskStatus := range taskRoleStatus.TaskStatuses { - cancelled, err := c.syncTaskState(f, cm, taskRoleStatus.Name, taskStatus.Index) + // At this point, f.Status.State must be in: + // {FrameworkAttemptPreparing, FrameworkAttemptRunning, + // FrameworkAttemptDeletionPending, FrameworkAttemptDeletionRequested, + // FrameworkAttemptDeleting} + err := c.syncTaskState(f, cm, taskRoleStatus.Name, taskStatus.Index) if err != nil { errs = append(errs, err) } - - if cancelled { - log.Infof( - "[%v][%v][%v]: syncFramework is cancelled", - f.Key(), taskRoleStatus.Name, taskStatus.Index) - return true, errorAgg.NewAggregate(errs) - } - - if err != nil { - // The Tasks in the TaskRole have the same Spec except for the PodName, - // so in most cases, same Platform Transient Error will return. - log.Warnf( - "[%v][%v][%v]: Failed to sync Task, "+ - "skip to sync the Tasks behind it in the TaskRole: %v", - f.Key(), taskRoleStatus.Name, taskStatus.Index, err) - break - } } } - return false, errorAgg.NewAggregate(errs) + return errorAgg.NewAggregate(errs) } func (c *FrameworkController) syncTaskState( f *ci.Framework, cm *core.ConfigMap, - taskRoleName string, taskIndex int32) (syncFrameworkCancelled bool, err error) { + taskRoleName string, taskIndex int32) (err error) { logPfx := fmt.Sprintf("[%v][%v][%v]: syncTaskState: ", f.Key(), taskRoleName, taskIndex) log.Infof(logPfx + "Started") @@ -1109,7 +1107,7 @@ func (c *FrameworkController) syncTaskState( // should have already triggered and persisted FrameworkAttemptDeletionPending // in previous sync, so current sync should have already been skipped but not. log.Infof(logPfx + "Skipped: Task is already completed") - return false, nil + return nil } var pod *core.Pod @@ -1118,7 +1116,7 @@ func (c *FrameworkController) syncTaskState( // so need to sync against it. pod, err = c.getOrCleanupPod(f, cm, taskRoleName, taskIndex, false) if err != nil { - return false, err + return err } if pod == nil { @@ -1128,7 +1126,7 @@ func (c *FrameworkController) syncTaskState( if c.enqueueTaskAttemptCreationTimeoutCheck(f, taskRoleName, taskIndex, true) { log.Infof(logPfx + "Waiting Pod to appear in the local cache or timeout") - return false, nil + return nil } diag := fmt.Sprintf( @@ -1141,12 +1139,12 @@ func (c *FrameworkController) syncTaskState( // TaskAttemptCompleted. err := c.deletePod(f, taskRoleName, taskIndex, *taskStatus.PodUID(), true) if err != nil { - return false, err + return err } c.completeTaskAttempt(f, taskRoleName, taskIndex, true, ci.CompletionCodePodCreationTimeout.NewCompletionStatus(diag)) - return false, nil + return nil } if taskStatus.State != ci.TaskAttemptCreationPending { @@ -1159,7 +1157,7 @@ func (c *FrameworkController) syncTaskState( c.completeTaskAttempt(f, taskRoleName, taskIndex, true, nil) } - return false, nil + return nil } } else { if pod.DeletionTimestamp == nil { @@ -1168,7 +1166,7 @@ func (c *FrameworkController) syncTaskState( // pod now. err := c.deletePod(f, taskRoleName, taskIndex, *taskStatus.PodUID(), false) if err != nil { - return false, err + return err } f.TransitionTaskState(taskRoleName, taskIndex, ci.TaskAttemptDeletionRequested) } @@ -1181,7 +1179,13 @@ func (c *FrameworkController) syncTaskState( // so always just wait. log.Infof(logPfx + "Waiting Pod to disappearing or disappear in the local cache") - return false, nil + return nil + } + + // At this point, taskStatus.State must be in: + // {TaskAttemptCreationRequested, TaskAttemptPreparing, TaskAttemptRunning} + if taskStatus.State == ci.TaskAttemptCreationRequested { + f.TransitionTaskState(taskRoleName, taskIndex, ci.TaskAttemptPreparing) } // Possibly due to the NodeController has not heard from the kubelet who @@ -1194,7 +1198,7 @@ func (c *FrameworkController) syncTaskState( log.Infof(logPfx+ "Waiting Pod to be deleted or deleting or transitioned from %v", pod.Status.Phase) - return false, nil + return nil } // Below Pod fields may be available even when PodPending, such as the Pod @@ -1204,16 +1208,16 @@ func (c *FrameworkController) syncTaskState( if pod.Status.Phase == core.PodPending { f.TransitionTaskState(taskRoleName, taskIndex, ci.TaskAttemptPreparing) - return false, nil + return nil } else if pod.Status.Phase == core.PodRunning { f.TransitionTaskState(taskRoleName, taskIndex, ci.TaskAttemptRunning) - return false, nil + return nil } else if pod.Status.Phase == core.PodSucceeded { diag := fmt.Sprintf("Pod succeeded") log.Infof(logPfx + diag) c.completeTaskAttempt(f, taskRoleName, taskIndex, false, ci.CompletionCodeSucceeded.NewCompletionStatus(diag)) - return false, nil + return nil } else if pod.Status.Phase == core.PodFailed { // All Container names in a Pod must be different, so we can still identify // a Container even after the InitContainers is merged with the AppContainers. @@ -1260,9 +1264,9 @@ func (c *FrameworkController) syncTaskState( ci.CompletionCode(*lastContainerExitCode).NewCompletionStatus(diag)) } } - return false, nil + return nil } else { - return false, fmt.Errorf(logPfx+ + return fmt.Errorf(logPfx+ "Failed: Got unrecognized Pod Phase: %v", pod.Status.Phase) } } else { @@ -1275,7 +1279,7 @@ func (c *FrameworkController) syncTaskState( f.TransitionTaskState(taskRoleName, taskIndex, ci.TaskAttemptDeleting) log.Infof(logPfx + "Waiting Pod to be deleted") - return false, nil + return nil } } } @@ -1314,7 +1318,7 @@ func (c *FrameworkController) syncTaskState( // should be executed now. if c.enqueueTaskRetryDelayTimeoutCheck(f, taskRoleName, taskIndex, true) { log.Infof(logPfx + "Waiting Task to retry after delay") - return false, nil + return nil } // retryTask @@ -1333,6 +1337,12 @@ func (c *FrameworkController) syncTaskState( // {TaskAttemptCreationPending, TaskCompleted} if taskStatus.State == ci.TaskAttemptCreationPending { + if f.IsCompleting() { + log.Infof(logPfx + "Skip to createTaskAttempt: " + + "FrameworkAttempt is completing") + return nil + } + // createTaskAttempt pod, err = c.createPod(f, cm, taskRoleName, taskIndex) if err != nil { @@ -1349,14 +1359,14 @@ func (c *FrameworkController) syncTaskState( // TaskAttemptCompleted. _, err = c.getOrCleanupPod(f, cm, taskRoleName, taskIndex, true) if err != nil { - return false, err + return err } c.completeTaskAttempt(f, taskRoleName, taskIndex, true, ci.CompletionCodePodSpecInvalid.NewCompletionStatus(diag)) - return false, nil + return nil } else { - return false, err + return err } } @@ -1374,12 +1384,18 @@ func (c *FrameworkController) syncTaskState( // so need to wait before continue the sync. log.Infof(logPfx + "Waiting Pod to appear in the local cache or timeout") - return false, nil + return nil } // At this point, taskStatus.State must be in: // {TaskCompleted} if taskStatus.State == ci.TaskCompleted { + if f.IsCompleting() { + log.Infof(logPfx + "Skip to attemptToCompleteFrameworkAttempt: " + + "FrameworkAttempt is completing") + return nil + } + // attemptToCompleteFrameworkAttempt completionPolicy := taskRoleSpec.FrameworkAttemptCompletionPolicy minFailedTaskCount := completionPolicy.MinFailedTaskCount @@ -1396,7 +1412,7 @@ func (c *FrameworkController) syncTaskState( log.Infof(logPfx + diag) c.completeFrameworkAttempt(f, false, taskStatus.AttemptStatus.CompletionStatus.Code.NewCompletionStatus(diag)) - return true, nil + return nil } } @@ -1411,7 +1427,7 @@ func (c *FrameworkController) syncTaskState( log.Infof(logPfx + diag) c.completeFrameworkAttempt(f, false, ci.CompletionCodeSucceeded.NewCompletionStatus(diag)) - return true, nil + return nil } } @@ -1428,10 +1444,10 @@ func (c *FrameworkController) syncTaskState( log.Infof(logPfx + diag) c.completeFrameworkAttempt(f, false, ci.CompletionCodeSucceeded.NewCompletionStatus(diag)) - return true, nil + return nil } - return false, nil + return nil } // At this point, taskStatus.State must be in: // {} @@ -1577,12 +1593,6 @@ func (c *FrameworkController) completeTaskAttempt( // CompletionStatus should be immutable after set. if taskStatus.AttemptStatus.CompletionStatus == nil { taskStatus.AttemptStatus.CompletionStatus = completionStatus - } else if taskStatus.AttemptStatus.CompletionStatus.Code == - ci.CompletionCodeFrameworkAttemptCompletion && - completionStatus != nil { - // Just append to diagnostics - taskStatus.AttemptStatus.CompletionStatus.Diagnostics += - ": More Recent CompletionStatus: " + completionStatus.String() } if force { @@ -1625,14 +1635,14 @@ func (c *FrameworkController) completeFrameworkAttempt( f.Status.AttemptStatus.CompletionStatus = completionStatus } - taskCompletionStatus := ci.CompletionCodeFrameworkAttemptCompletion. - NewCompletionStatus("Stop to complete FrameworkAttempt") for i := range f.TaskRoleStatuses() { taskRoleStatus := &f.TaskRoleStatuses()[i] for j := range taskRoleStatus.TaskStatuses { taskStatus := &taskRoleStatus.TaskStatuses[j] if taskStatus.AttemptStatus.CompletionStatus == nil { - taskStatus.AttemptStatus.CompletionStatus = taskCompletionStatus + taskStatus.AttemptStatus.CompletionStatus = + ci.CompletionCodeFrameworkAttemptCompletion. + NewCompletionStatus("Stop to complete current FrameworkAttempt") } } }