Treat invalid Pod caused by network error as PodCreationUnknownError (#61)
This commit is contained in:
Родитель
a220bd321f
Коммит
959722c429
|
@ -26,8 +26,11 @@ import (
|
|||
"fmt"
|
||||
"github.com/microsoft/frameworkcontroller/pkg/common"
|
||||
core "k8s.io/api/core/v1"
|
||||
apiErrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/util/net"
|
||||
"reflect"
|
||||
"regexp"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
|
@ -63,17 +66,19 @@ const (
|
|||
|
||||
// [-999, -1]: Predefined Framework Error
|
||||
// -1XX: Transient Error
|
||||
CompletionCodeConfigMapExternalDeleted CompletionCode = -100
|
||||
CompletionCodePodExternalDeleted CompletionCode = -101
|
||||
CompletionCodeConfigMapCreationTimeout CompletionCode = -110
|
||||
CompletionCodePodCreationTimeout CompletionCode = -111
|
||||
CompletionCodeConfigMapExternalDeleted CompletionCode = -100
|
||||
CompletionCodePodExternalDeleted CompletionCode = -101
|
||||
CompletionCodeConfigMapLocalCacheCreationTimeout CompletionCode = -110
|
||||
CompletionCodePodLocalCacheCreationTimeout CompletionCode = -111
|
||||
CompletionCodePodCreationTransientError CompletionCode = -120
|
||||
// -2XX: Permanent Error
|
||||
CompletionCodePodSpecPermanentError CompletionCode = -200
|
||||
CompletionCodePodCreationPermanentError CompletionCode = -200
|
||||
CompletionCodeStopFrameworkRequested CompletionCode = -210
|
||||
CompletionCodeFrameworkAttemptCompletion CompletionCode = -220
|
||||
CompletionCodeDeleteTaskRequested CompletionCode = -230
|
||||
// -3XX: Unknown Error
|
||||
CompletionCodePodFailedWithoutFailedContainer CompletionCode = -300
|
||||
CompletionCodePodCreationUnknownError CompletionCode = -310
|
||||
)
|
||||
|
||||
var completionCodeInfoList = []*CompletionCodeInfo{}
|
||||
|
@ -152,20 +157,28 @@ func initCompletionCodeInfos() {
|
|||
[]CompletionTypeAttribute{CompletionTypeAttributeTransient}},
|
||||
},
|
||||
{
|
||||
Code: CompletionCodeConfigMapCreationTimeout.Ptr(),
|
||||
Phrase: "ConfigMapCreationTimeout",
|
||||
Code: CompletionCodeConfigMapLocalCacheCreationTimeout.Ptr(),
|
||||
Phrase: "ConfigMapLocalCacheCreationTimeout",
|
||||
Type: CompletionType{CompletionTypeNameFailed,
|
||||
[]CompletionTypeAttribute{CompletionTypeAttributeTransient}},
|
||||
},
|
||||
{
|
||||
Code: CompletionCodePodCreationTimeout.Ptr(),
|
||||
Phrase: "PodCreationTimeout",
|
||||
Code: CompletionCodePodLocalCacheCreationTimeout.Ptr(),
|
||||
Phrase: "PodLocalCacheCreationTimeout",
|
||||
Type: CompletionType{CompletionTypeNameFailed,
|
||||
[]CompletionTypeAttribute{CompletionTypeAttributeTransient}},
|
||||
},
|
||||
{
|
||||
Code: CompletionCodePodSpecPermanentError.Ptr(),
|
||||
Phrase: "PodSpecPermanentError",
|
||||
// Only used to distinguish with others, and will never be used to complete
|
||||
// a TaskAttempt.
|
||||
Code: CompletionCodePodCreationTransientError.Ptr(),
|
||||
Phrase: "PodCreationTransientError",
|
||||
Type: CompletionType{CompletionTypeNameFailed,
|
||||
[]CompletionTypeAttribute{CompletionTypeAttributeTransient}},
|
||||
},
|
||||
{
|
||||
Code: CompletionCodePodCreationPermanentError.Ptr(),
|
||||
Phrase: "PodCreationPermanentError",
|
||||
Type: CompletionType{CompletionTypeNameFailed,
|
||||
[]CompletionTypeAttribute{CompletionTypeAttributePermanent}},
|
||||
},
|
||||
|
@ -193,6 +206,12 @@ func initCompletionCodeInfos() {
|
|||
Type: CompletionType{CompletionTypeNameFailed,
|
||||
[]CompletionTypeAttribute{}},
|
||||
},
|
||||
{
|
||||
Code: CompletionCodePodCreationUnknownError.Ptr(),
|
||||
Phrase: "PodCreationUnknownError",
|
||||
Type: CompletionType{CompletionTypeNameFailed,
|
||||
[]CompletionTypeAttribute{}},
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -238,6 +257,9 @@ type MatchedContainer struct {
|
|||
}
|
||||
|
||||
// Match ANY CompletionCodeInfo
|
||||
// The returned CompletionCode may not within CompletionCodeInfos, such as for
|
||||
// the ContainerUnrecognizedFailed, so it should not be used to
|
||||
// NewTaskAttemptCompletionStatus or NewFrameworkAttemptCompletionStatus later.
|
||||
func MatchCompletionCodeInfos(pod *core.Pod) PodMatchResult {
|
||||
for _, codeInfo := range completionCodeInfoList {
|
||||
for _, podPattern := range codeInfo.PodPatterns {
|
||||
|
@ -404,6 +426,55 @@ func generatePodUnmatchedResult(pod *core.Pod) PodMatchResult {
|
|||
}
|
||||
}
|
||||
|
||||
// The returned CompletionCode must be within CompletionCodeInfos.
|
||||
func ClassifyPodCreationError(apiErr error) PodMatchResult {
|
||||
diag := fmt.Sprintf("Failed to create Pod: %v", common.ToJson(apiErr))
|
||||
|
||||
// Treat Platform Error as Transient Error, such as Pod decoding error.
|
||||
if strings.Contains(apiErr.Error(), "object provided is unrecognized") ||
|
||||
strings.Contains(apiErr.Error(), "exceeded quota") {
|
||||
return PodMatchResult{
|
||||
CodeInfo: completionCodeInfoMap[CompletionCodePodCreationTransientError],
|
||||
Diagnostics: diag,
|
||||
}
|
||||
}
|
||||
|
||||
// Treat General Framework Error as Unknown Error for safety.
|
||||
if apiErrors.IsBadRequest(apiErr) ||
|
||||
apiErrors.IsForbidden(apiErr) {
|
||||
return PodMatchResult{
|
||||
CodeInfo: completionCodeInfoMap[CompletionCodePodCreationUnknownError],
|
||||
Diagnostics: diag,
|
||||
}
|
||||
}
|
||||
|
||||
// Treat Permanent Framework Error as Permanent Error only if it must be
|
||||
// Permanent Error.
|
||||
if apiErrors.IsInvalid(apiErr) ||
|
||||
apiErrors.IsRequestEntityTooLargeError(apiErr) {
|
||||
// TODO: Also check net.IsConnectionRefused
|
||||
if net.IsConnectionReset(apiErr) || net.IsProbableEOF(apiErr) {
|
||||
// The ApiServer Permanent Error may be caused by Network Transient Error,
|
||||
// so treat it as Unknown Error for safety.
|
||||
return PodMatchResult{
|
||||
CodeInfo: completionCodeInfoMap[CompletionCodePodCreationUnknownError],
|
||||
Diagnostics: diag,
|
||||
}
|
||||
} else {
|
||||
return PodMatchResult{
|
||||
CodeInfo: completionCodeInfoMap[CompletionCodePodCreationPermanentError],
|
||||
Diagnostics: diag,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Treat all other errors as Transient Error, including all non-APIStatus errors.
|
||||
return PodMatchResult{
|
||||
CodeInfo: completionCodeInfoMap[CompletionCodePodCreationTransientError],
|
||||
Diagnostics: diag,
|
||||
}
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////
|
||||
// Completion Utils
|
||||
///////////////////////////////////////////////////////////////////////////////////////
|
||||
|
|
|
@ -60,7 +60,7 @@ type Config struct {
|
|||
// Generally, it should be proportional to the cluster Framework workload, and within the ApiServer
|
||||
// serving capacity/limit such as the --max-mutating-requests-inflight.
|
||||
KubeClientQps *float32 `yaml:"kubeClientQps"`
|
||||
KubeClientBurst *int32 `yaml:"kubeClientBurst"`
|
||||
KubeClientBurst *int32 `yaml:"kubeClientBurst"`
|
||||
|
||||
// Number of concurrent workers to process each different Frameworks.
|
||||
// Generally, it should be proportional to the above rate limits of requests.
|
||||
|
|
|
@ -195,15 +195,17 @@ func NewCompletedTaskTriggeredCompletionStatus(
|
|||
"conditions in FrameworkAttemptCompletionPolicy have ever been triggered",
|
||||
completedTaskCount, totalTaskCount)
|
||||
if triggerTaskStatus == nil {
|
||||
return CompletionCodeSucceeded.NewFrameworkAttemptCompletionStatus(diag, nil)
|
||||
return CompletionCodeSucceeded.
|
||||
NewFrameworkAttemptCompletionStatus(diag, nil)
|
||||
} else {
|
||||
return CompletionCodeSucceeded.NewFrameworkAttemptCompletionStatus(diag,
|
||||
&CompletionPolicyTriggerStatus{
|
||||
Message: diag,
|
||||
TaskRoleName: triggerTaskRoleName,
|
||||
TaskIndex: triggerTaskStatus.Index,
|
||||
},
|
||||
)
|
||||
return CompletionCodeSucceeded.
|
||||
NewFrameworkAttemptCompletionStatus(diag,
|
||||
&CompletionPolicyTriggerStatus{
|
||||
Message: diag,
|
||||
TaskRoleName: triggerTaskRoleName,
|
||||
TaskIndex: triggerTaskStatus.Index,
|
||||
},
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -28,6 +28,7 @@ import (
|
|||
"encoding/json"
|
||||
"flag"
|
||||
"fmt"
|
||||
errorWrap "github.com/pkg/errors"
|
||||
"gopkg.in/yaml.v2"
|
||||
"io/ioutil"
|
||||
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
|
@ -287,3 +288,12 @@ func Decompress(compressedBytes []byte) (string, error) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
func GetErrorCause(err error) error {
|
||||
causeErr := errorWrap.Cause(err)
|
||||
if causeErr == nil {
|
||||
return err
|
||||
} else {
|
||||
return causeErr
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1053,7 +1053,7 @@ func (c *FrameworkController) syncFrameworkState(f *ci.Framework) (err error) {
|
|||
"ConfigMap does not appear in the local cache within timeout %v, "+
|
||||
"so consider it was deleted and explicitly delete it",
|
||||
common.SecToDuration(c.cConfig.ObjectLocalCacheCreationTimeoutSec))
|
||||
code = ci.CompletionCodeConfigMapCreationTimeout
|
||||
code = ci.CompletionCodeConfigMapLocalCacheCreationTimeout
|
||||
klog.Warning(logPfx + diag)
|
||||
}
|
||||
|
||||
|
@ -1670,7 +1670,7 @@ func (c *FrameworkController) syncTaskState(
|
|||
"Pod does not appear in the local cache within timeout %v, "+
|
||||
"so consider it was deleted and explicitly delete it",
|
||||
common.SecToDuration(c.cConfig.ObjectLocalCacheCreationTimeoutSec))
|
||||
code = ci.CompletionCodePodCreationTimeout
|
||||
code = ci.CompletionCodePodLocalCacheCreationTimeout
|
||||
klog.Warning(logPfx + diag)
|
||||
}
|
||||
|
||||
|
@ -1752,8 +1752,9 @@ func (c *FrameworkController) syncTaskState(
|
|||
diag := fmt.Sprintf("Pod succeeded")
|
||||
klog.Info(logPfx + diag)
|
||||
c.completeTaskAttempt(f, taskRoleName, taskIndex, false,
|
||||
ci.CompletionCodeSucceeded.NewTaskAttemptCompletionStatus(
|
||||
diag, ci.ExtractPodCompletionStatus(pod)))
|
||||
ci.CompletionCodeSucceeded.
|
||||
NewTaskAttemptCompletionStatus(
|
||||
diag, ci.ExtractPodCompletionStatus(pod)))
|
||||
return nil
|
||||
} else if pod.Status.Phase == core.PodFailed {
|
||||
result := ci.MatchCompletionCodeInfos(pod)
|
||||
|
@ -1910,26 +1911,26 @@ func (c *FrameworkController) syncTaskState(
|
|||
// createTaskAttempt
|
||||
pod, err = c.createPod(f, cm, taskRoleName, taskIndex)
|
||||
if err != nil {
|
||||
apiErr := errorWrap.Cause(err)
|
||||
if internal.IsPodSpecPermanentError(apiErr) {
|
||||
// Should be Framework Error instead of Platform Transient Error.
|
||||
diag := fmt.Sprintf("Failed to create Pod: %v", common.ToJson(apiErr))
|
||||
klog.Info(logPfx + diag)
|
||||
|
||||
// Ensure pod is deleted in remote to avoid managed pod leak after
|
||||
// TaskAttemptCompleted.
|
||||
_, err = c.getOrCleanupPod(f, cm, taskRoleName, taskIndex, true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.completeTaskAttempt(f, taskRoleName, taskIndex, true,
|
||||
ci.CompletionCodePodSpecPermanentError.
|
||||
NewTaskAttemptCompletionStatus(diag, nil))
|
||||
return nil
|
||||
} else {
|
||||
apiErr := common.GetErrorCause(err)
|
||||
result := ci.ClassifyPodCreationError(apiErr)
|
||||
if *result.CodeInfo.Code == ci.CompletionCodePodCreationTransientError {
|
||||
// Do not complete the TaskAttempt, as generally, user does not need to
|
||||
// aware the Transient Error during Pod creation.
|
||||
return err
|
||||
}
|
||||
|
||||
klog.Info(logPfx + result.Diagnostics)
|
||||
// Ensure pod is deleted in remote to avoid managed pod leak after
|
||||
// TaskAttemptCompleted.
|
||||
_, err = c.getOrCleanupPod(f, cm, taskRoleName, taskIndex, true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.completeTaskAttempt(f, taskRoleName, taskIndex, true,
|
||||
result.CodeInfo.Code.
|
||||
NewTaskAttemptCompletionStatus(result.Diagnostics, nil))
|
||||
return nil
|
||||
}
|
||||
|
||||
taskStatus.AttemptStatus.PodUID = &pod.UID
|
||||
|
|
|
@ -38,7 +38,6 @@ import (
|
|||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/klog"
|
||||
"reflect"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
|
@ -222,11 +221,3 @@ func GetPodDeletionStartTime(pod *core.Pod) *meta.Time {
|
|||
}
|
||||
return common.PtrTime(meta.NewTime(pod.DeletionTimestamp.Add(-gracePeriod)))
|
||||
}
|
||||
|
||||
func IsPodSpecPermanentError(apiErr error) bool {
|
||||
return apiErrors.IsBadRequest(apiErr) ||
|
||||
apiErrors.IsInvalid(apiErr) ||
|
||||
apiErrors.IsRequestEntityTooLargeError(apiErr) ||
|
||||
(apiErrors.IsForbidden(apiErr) &&
|
||||
!strings.Contains(apiErr.Error(), "exceeded quota"))
|
||||
}
|
||||
|
|
Загрузка…
Ссылка в новой задаче