Still sync Task after FrameworkAttemptCompleted (#27)

This commit is contained in:
Yuqi Wang 2019-07-17 15:35:32 +08:00 коммит произвёл GitHub
Родитель 20f38add58
Коммит 157c3bfe0f
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
3 изменённых файлов: 111 добавлений и 87 удалений

Просмотреть файл

@ -192,6 +192,18 @@ func (ts *TaskStatus) IsRunning() bool {
return ts.State == TaskAttemptRunning
}
func (f *Framework) IsCompleting() bool {
return f.Status.State == FrameworkAttemptDeletionPending ||
f.Status.State == FrameworkAttemptDeletionRequested ||
f.Status.State == FrameworkAttemptDeleting
}
func (ts *TaskStatus) IsCompleting() bool {
return ts.State == TaskAttemptDeletionPending ||
ts.State == TaskAttemptDeletionRequested ||
ts.State == TaskAttemptDeleting
}
func (ct CompletionType) IsSucceeded() bool {
return ct.Name == CompletionTypeNameSucceeded
}
@ -507,7 +519,8 @@ func (rp RetryPolicySpec) ShouldRetry(
// 0. Built-in Always-on RetryPolicy
if cs.Code == CompletionCodePodSpecInvalid ||
cs.Code == CompletionCodeStopFrameworkRequested {
cs.Code == CompletionCodeStopFrameworkRequested ||
cs.Code == CompletionCodeFrameworkAttemptCompletion {
return RetryDecision{false, true, 0, cs.Diagnostics}
}

Просмотреть файл

@ -132,7 +132,9 @@ const (
// complete a single Task in the TaskRole.
//
// Usage:
// If the Pod Spec is invalid or the ExecutionType is ExecutionStop,
// If the Pod Spec is invalid or
// the ExecutionType is ExecutionStop or
// the Task's FrameworkAttempt is completing,
// will not retry.
//
// If the FancyRetryPolicy is enabled,
@ -506,7 +508,6 @@ const (
// must have been creation requested successfully and is expected to exist.
// [AssociatedState]
// -> TaskAttemptPreparing
// -> TaskAttemptRunning
// -> TaskAttemptDeleting
// -> TaskAttemptCompleted
TaskAttemptCreationRequested TaskState = "AttemptCreationRequested"

Просмотреть файл

@ -775,13 +775,15 @@ func (c *FrameworkController) syncFrameworkState(f *ci.Framework) (err error) {
// so always just wait.
log.Infof(logPfx +
"Waiting ConfigMap to disappearing or disappear in the local cache")
return nil
}
} else {
// At this point, f.Status.State must be in:
// {FrameworkAttemptCreationRequested, FrameworkAttemptPreparing,
// FrameworkAttemptRunning}
if f.Status.State != ci.FrameworkAttemptPreparing &&
f.Status.State != ci.FrameworkAttemptRunning {
if f.Status.State == ci.FrameworkAttemptCreationRequested {
f.TransitionFrameworkState(ci.FrameworkAttemptPreparing)
}
}
} else {
if f.Status.AttemptStatus.CompletionStatus == nil {
diag := fmt.Sprintf("ConfigMap is being deleted by others")
@ -792,13 +794,13 @@ func (c *FrameworkController) syncFrameworkState(f *ci.Framework) (err error) {
f.TransitionFrameworkState(ci.FrameworkAttemptDeleting)
log.Infof(logPfx + "Waiting ConfigMap to be deleted")
return nil
}
}
}
// At this point, f.Status.State must be in:
// {FrameworkAttemptCreationPending, FrameworkAttemptCompleted,
// FrameworkAttemptPreparing, FrameworkAttemptRunning}
// {FrameworkAttemptCreationPending, FrameworkAttemptPreparing,
// FrameworkAttemptRunning, FrameworkAttemptDeletionRequested,
// FrameworkAttemptDeleting, FrameworkAttemptCompleted}
if f.Status.State == ci.FrameworkAttemptCompleted {
// attemptToRetryFramework
@ -857,7 +859,8 @@ func (c *FrameworkController) syncFrameworkState(f *ci.Framework) (err error) {
}
// At this point, f.Status.State must be in:
// {FrameworkAttemptCreationPending, FrameworkAttemptPreparing,
// FrameworkAttemptRunning}
// FrameworkAttemptRunning, FrameworkAttemptDeletionRequested,
// FrameworkAttemptDeleting}
if f.Status.State == ci.FrameworkAttemptCreationPending {
if f.Spec.ExecutionType == ci.ExecutionStop {
@ -899,20 +902,26 @@ func (c *FrameworkController) syncFrameworkState(f *ci.Framework) (err error) {
return nil
}
// At this point, f.Status.State must be in:
// {FrameworkAttemptPreparing, FrameworkAttemptRunning}
// {FrameworkAttemptPreparing, FrameworkAttemptRunning,
// FrameworkAttemptDeletionRequested, FrameworkAttemptDeleting}
if f.Status.State == ci.FrameworkAttemptPreparing ||
f.Status.State == ci.FrameworkAttemptRunning {
f.Status.State == ci.FrameworkAttemptRunning ||
f.Status.State == ci.FrameworkAttemptDeletionRequested ||
f.Status.State == ci.FrameworkAttemptDeleting {
if !f.IsCompleting() {
if f.Spec.ExecutionType == ci.ExecutionStop {
diag := fmt.Sprintf("User has requested to stop the Framework")
log.Infof(logPfx + diag)
c.completeFrameworkAttempt(f, false,
ci.CompletionCodeStopFrameworkRequested.NewCompletionStatus(diag))
return nil
} else {
cancelled, err := c.syncTaskRoleStatuses(f, cm)
}
}
if !cancelled {
err := c.syncTaskRoleStatuses(f, cm)
if f.Status.State == ci.FrameworkAttemptPreparing ||
f.Status.State == ci.FrameworkAttemptRunning {
if !f.IsAnyTaskRunning() {
f.TransitionFrameworkState(ci.FrameworkAttemptPreparing)
} else {
@ -921,12 +930,14 @@ func (c *FrameworkController) syncFrameworkState(f *ci.Framework) (err error) {
}
return err
}
} else {
// Unreachable
panic(fmt.Errorf(logPfx+
"Failed: At this point, FrameworkState should be in {%v, %v} instead of %v",
ci.FrameworkAttemptPreparing, ci.FrameworkAttemptRunning, f.Status.State))
"Failed: At this point, FrameworkState should be in "+
"{%v, %v, %v, %v} instead of %v",
ci.FrameworkAttemptPreparing, ci.FrameworkAttemptRunning,
ci.FrameworkAttemptDeletionRequested, ci.FrameworkAttemptDeleting,
f.Status.State))
}
}
@ -1052,7 +1063,7 @@ func (c *FrameworkController) createConfigMap(
}
func (c *FrameworkController) syncTaskRoleStatuses(
f *ci.Framework, cm *core.ConfigMap) (syncFrameworkCancelled bool, err error) {
f *ci.Framework, cm *core.ConfigMap) (err error) {
logPfx := fmt.Sprintf("[%v]: syncTaskRoleStatuses: ", f.Key())
log.Infof(logPfx + "Started")
defer func() { log.Infof(logPfx + "Completed") }()
@ -1061,36 +1072,23 @@ func (c *FrameworkController) syncTaskRoleStatuses(
for _, taskRoleStatus := range f.TaskRoleStatuses() {
log.Infof("[%v][%v]: syncTaskRoleStatus", f.Key(), taskRoleStatus.Name)
for _, taskStatus := range taskRoleStatus.TaskStatuses {
cancelled, err := c.syncTaskState(f, cm, taskRoleStatus.Name, taskStatus.Index)
// At this point, f.Status.State must be in:
// {FrameworkAttemptPreparing, FrameworkAttemptRunning,
// FrameworkAttemptDeletionPending, FrameworkAttemptDeletionRequested,
// FrameworkAttemptDeleting}
err := c.syncTaskState(f, cm, taskRoleStatus.Name, taskStatus.Index)
if err != nil {
errs = append(errs, err)
}
if cancelled {
log.Infof(
"[%v][%v][%v]: syncFramework is cancelled",
f.Key(), taskRoleStatus.Name, taskStatus.Index)
return true, errorAgg.NewAggregate(errs)
}
if err != nil {
// The Tasks in the TaskRole have the same Spec except for the PodName,
// so in most cases, same Platform Transient Error will return.
log.Warnf(
"[%v][%v][%v]: Failed to sync Task, "+
"skip to sync the Tasks behind it in the TaskRole: %v",
f.Key(), taskRoleStatus.Name, taskStatus.Index, err)
break
}
}
}
return false, errorAgg.NewAggregate(errs)
return errorAgg.NewAggregate(errs)
}
func (c *FrameworkController) syncTaskState(
f *ci.Framework, cm *core.ConfigMap,
taskRoleName string, taskIndex int32) (syncFrameworkCancelled bool, err error) {
taskRoleName string, taskIndex int32) (err error) {
logPfx := fmt.Sprintf("[%v][%v][%v]: syncTaskState: ",
f.Key(), taskRoleName, taskIndex)
log.Infof(logPfx + "Started")
@ -1109,7 +1107,7 @@ func (c *FrameworkController) syncTaskState(
// should have already triggered and persisted FrameworkAttemptDeletionPending
// in previous sync, so current sync should have already been skipped but not.
log.Infof(logPfx + "Skipped: Task is already completed")
return false, nil
return nil
}
var pod *core.Pod
@ -1118,7 +1116,7 @@ func (c *FrameworkController) syncTaskState(
// so need to sync against it.
pod, err = c.getOrCleanupPod(f, cm, taskRoleName, taskIndex, false)
if err != nil {
return false, err
return err
}
if pod == nil {
@ -1128,7 +1126,7 @@ func (c *FrameworkController) syncTaskState(
if c.enqueueTaskAttemptCreationTimeoutCheck(f, taskRoleName, taskIndex, true) {
log.Infof(logPfx +
"Waiting Pod to appear in the local cache or timeout")
return false, nil
return nil
}
diag := fmt.Sprintf(
@ -1141,12 +1139,12 @@ func (c *FrameworkController) syncTaskState(
// TaskAttemptCompleted.
err := c.deletePod(f, taskRoleName, taskIndex, *taskStatus.PodUID(), true)
if err != nil {
return false, err
return err
}
c.completeTaskAttempt(f, taskRoleName, taskIndex, true,
ci.CompletionCodePodCreationTimeout.NewCompletionStatus(diag))
return false, nil
return nil
}
if taskStatus.State != ci.TaskAttemptCreationPending {
@ -1159,7 +1157,7 @@ func (c *FrameworkController) syncTaskState(
c.completeTaskAttempt(f, taskRoleName, taskIndex, true, nil)
}
return false, nil
return nil
}
} else {
if pod.DeletionTimestamp == nil {
@ -1168,7 +1166,7 @@ func (c *FrameworkController) syncTaskState(
// pod now.
err := c.deletePod(f, taskRoleName, taskIndex, *taskStatus.PodUID(), false)
if err != nil {
return false, err
return err
}
f.TransitionTaskState(taskRoleName, taskIndex, ci.TaskAttemptDeletionRequested)
}
@ -1181,7 +1179,13 @@ func (c *FrameworkController) syncTaskState(
// so always just wait.
log.Infof(logPfx +
"Waiting Pod to disappearing or disappear in the local cache")
return false, nil
return nil
}
// At this point, taskStatus.State must be in:
// {TaskAttemptCreationRequested, TaskAttemptPreparing, TaskAttemptRunning}
if taskStatus.State == ci.TaskAttemptCreationRequested {
f.TransitionTaskState(taskRoleName, taskIndex, ci.TaskAttemptPreparing)
}
// Possibly due to the NodeController has not heard from the kubelet who
@ -1194,7 +1198,7 @@ func (c *FrameworkController) syncTaskState(
log.Infof(logPfx+
"Waiting Pod to be deleted or deleting or transitioned from %v",
pod.Status.Phase)
return false, nil
return nil
}
// Below Pod fields may be available even when PodPending, such as the Pod
@ -1204,16 +1208,16 @@ func (c *FrameworkController) syncTaskState(
if pod.Status.Phase == core.PodPending {
f.TransitionTaskState(taskRoleName, taskIndex, ci.TaskAttemptPreparing)
return false, nil
return nil
} else if pod.Status.Phase == core.PodRunning {
f.TransitionTaskState(taskRoleName, taskIndex, ci.TaskAttemptRunning)
return false, nil
return nil
} else if pod.Status.Phase == core.PodSucceeded {
diag := fmt.Sprintf("Pod succeeded")
log.Infof(logPfx + diag)
c.completeTaskAttempt(f, taskRoleName, taskIndex, false,
ci.CompletionCodeSucceeded.NewCompletionStatus(diag))
return false, nil
return nil
} else if pod.Status.Phase == core.PodFailed {
// All Container names in a Pod must be different, so we can still identify
// a Container even after the InitContainers is merged with the AppContainers.
@ -1260,9 +1264,9 @@ func (c *FrameworkController) syncTaskState(
ci.CompletionCode(*lastContainerExitCode).NewCompletionStatus(diag))
}
}
return false, nil
return nil
} else {
return false, fmt.Errorf(logPfx+
return fmt.Errorf(logPfx+
"Failed: Got unrecognized Pod Phase: %v", pod.Status.Phase)
}
} else {
@ -1275,7 +1279,7 @@ func (c *FrameworkController) syncTaskState(
f.TransitionTaskState(taskRoleName, taskIndex, ci.TaskAttemptDeleting)
log.Infof(logPfx + "Waiting Pod to be deleted")
return false, nil
return nil
}
}
}
@ -1314,7 +1318,7 @@ func (c *FrameworkController) syncTaskState(
// should be executed now.
if c.enqueueTaskRetryDelayTimeoutCheck(f, taskRoleName, taskIndex, true) {
log.Infof(logPfx + "Waiting Task to retry after delay")
return false, nil
return nil
}
// retryTask
@ -1333,6 +1337,12 @@ func (c *FrameworkController) syncTaskState(
// {TaskAttemptCreationPending, TaskCompleted}
if taskStatus.State == ci.TaskAttemptCreationPending {
if f.IsCompleting() {
log.Infof(logPfx + "Skip to createTaskAttempt: " +
"FrameworkAttempt is completing")
return nil
}
// createTaskAttempt
pod, err = c.createPod(f, cm, taskRoleName, taskIndex)
if err != nil {
@ -1349,14 +1359,14 @@ func (c *FrameworkController) syncTaskState(
// TaskAttemptCompleted.
_, err = c.getOrCleanupPod(f, cm, taskRoleName, taskIndex, true)
if err != nil {
return false, err
return err
}
c.completeTaskAttempt(f, taskRoleName, taskIndex, true,
ci.CompletionCodePodSpecInvalid.NewCompletionStatus(diag))
return false, nil
return nil
} else {
return false, err
return err
}
}
@ -1374,12 +1384,18 @@ func (c *FrameworkController) syncTaskState(
// so need to wait before continue the sync.
log.Infof(logPfx +
"Waiting Pod to appear in the local cache or timeout")
return false, nil
return nil
}
// At this point, taskStatus.State must be in:
// {TaskCompleted}
if taskStatus.State == ci.TaskCompleted {
if f.IsCompleting() {
log.Infof(logPfx + "Skip to attemptToCompleteFrameworkAttempt: " +
"FrameworkAttempt is completing")
return nil
}
// attemptToCompleteFrameworkAttempt
completionPolicy := taskRoleSpec.FrameworkAttemptCompletionPolicy
minFailedTaskCount := completionPolicy.MinFailedTaskCount
@ -1396,7 +1412,7 @@ func (c *FrameworkController) syncTaskState(
log.Infof(logPfx + diag)
c.completeFrameworkAttempt(f, false,
taskStatus.AttemptStatus.CompletionStatus.Code.NewCompletionStatus(diag))
return true, nil
return nil
}
}
@ -1411,7 +1427,7 @@ func (c *FrameworkController) syncTaskState(
log.Infof(logPfx + diag)
c.completeFrameworkAttempt(f, false,
ci.CompletionCodeSucceeded.NewCompletionStatus(diag))
return true, nil
return nil
}
}
@ -1428,10 +1444,10 @@ func (c *FrameworkController) syncTaskState(
log.Infof(logPfx + diag)
c.completeFrameworkAttempt(f, false,
ci.CompletionCodeSucceeded.NewCompletionStatus(diag))
return true, nil
return nil
}
return false, nil
return nil
}
// At this point, taskStatus.State must be in:
// {}
@ -1577,12 +1593,6 @@ func (c *FrameworkController) completeTaskAttempt(
// CompletionStatus should be immutable after set.
if taskStatus.AttemptStatus.CompletionStatus == nil {
taskStatus.AttemptStatus.CompletionStatus = completionStatus
} else if taskStatus.AttemptStatus.CompletionStatus.Code ==
ci.CompletionCodeFrameworkAttemptCompletion &&
completionStatus != nil {
// Just append to diagnostics
taskStatus.AttemptStatus.CompletionStatus.Diagnostics +=
": More Recent CompletionStatus: " + completionStatus.String()
}
if force {
@ -1625,14 +1635,14 @@ func (c *FrameworkController) completeFrameworkAttempt(
f.Status.AttemptStatus.CompletionStatus = completionStatus
}
taskCompletionStatus := ci.CompletionCodeFrameworkAttemptCompletion.
NewCompletionStatus("Stop to complete FrameworkAttempt")
for i := range f.TaskRoleStatuses() {
taskRoleStatus := &f.TaskRoleStatuses()[i]
for j := range taskRoleStatus.TaskStatuses {
taskStatus := &taskRoleStatus.TaskStatuses[j]
if taskStatus.AttemptStatus.CompletionStatus == nil {
taskStatus.AttemptStatus.CompletionStatus = taskCompletionStatus
taskStatus.AttemptStatus.CompletionStatus =
ci.CompletionCodeFrameworkAttemptCompletion.
NewCompletionStatus("Stop to complete current FrameworkAttempt")
}
}
}