Support LogObjectSnapshot: Expose Framework and Pod History (#31)
This commit is contained in:
Родитель
48f601bb39
Коммит
1aa6e612e1
|
@ -76,6 +76,35 @@ type Config struct {
|
|||
// all-or-nothing fashion in order to perform any useful work.
|
||||
FrameworkMinRetryDelaySecForTransientConflictFailed *int64 `yaml:"frameworkMinRetryDelaySecForTransientConflictFailed"`
|
||||
FrameworkMaxRetryDelaySecForTransientConflictFailed *int64 `yaml:"frameworkMaxRetryDelaySecForTransientConflictFailed"`
|
||||
|
||||
// Specify when to log the snapshot of which managed object.
|
||||
// This enables external systems to collect and process the history snapshots,
|
||||
// such as persistence, metrics conversion, visualization, alerting, acting,
|
||||
// analysis, etc.
|
||||
// Notes:
|
||||
// 1. The snapshot is logged to stderr.
|
||||
// 2. Check GetFrameworkSnapshotLogTail and GetPodSnapshotLogTail to see how
|
||||
// to extract the snapshot from stderr.
|
||||
// 3. The same snapshot may be logged more than once in some rare cases, so
|
||||
// external systems may need to deduplicate them by object.ResourceVersion.
|
||||
// 4. The snapshot triggered by deletion may be missed to log during the
|
||||
// FrameworkController downtime.
|
||||
LogObjectSnapshot LogObjectSnapshot `yaml:"logObjectSnapshot"`
|
||||
}
|
||||
|
||||
type LogObjectSnapshot struct {
|
||||
Framework LogFrameworkSnapshot `yaml:"framework"`
|
||||
Pod LogPodSnapshot `yaml:"pod"`
|
||||
}
|
||||
|
||||
type LogFrameworkSnapshot struct {
|
||||
OnTaskRetry *bool `yaml:"onTaskRetry"`
|
||||
OnFrameworkRetry *bool `yaml:"onFrameworkRetry"`
|
||||
OnFrameworkDeletion *bool `yaml:"onFrameworkDeletion"`
|
||||
}
|
||||
|
||||
type LogPodSnapshot struct {
|
||||
OnPodDeletion *bool `yaml:"onPodDeletion"`
|
||||
}
|
||||
|
||||
func NewConfig() *Config {
|
||||
|
@ -107,6 +136,18 @@ func NewConfig() *Config {
|
|||
if c.FrameworkMaxRetryDelaySecForTransientConflictFailed == nil {
|
||||
c.FrameworkMaxRetryDelaySecForTransientConflictFailed = common.PtrInt64(15 * 60)
|
||||
}
|
||||
if c.LogObjectSnapshot.Framework.OnTaskRetry == nil {
|
||||
c.LogObjectSnapshot.Framework.OnTaskRetry = common.PtrBool(true)
|
||||
}
|
||||
if c.LogObjectSnapshot.Framework.OnFrameworkRetry == nil {
|
||||
c.LogObjectSnapshot.Framework.OnFrameworkRetry = common.PtrBool(true)
|
||||
}
|
||||
if c.LogObjectSnapshot.Framework.OnFrameworkDeletion == nil {
|
||||
c.LogObjectSnapshot.Framework.OnFrameworkDeletion = common.PtrBool(true)
|
||||
}
|
||||
if c.LogObjectSnapshot.Pod.OnPodDeletion == nil {
|
||||
c.LogObjectSnapshot.Pod.OnPodDeletion = common.PtrBool(true)
|
||||
}
|
||||
|
||||
// Validation
|
||||
errPrefix := "Config Validation Failed: "
|
||||
|
|
|
@ -90,6 +90,14 @@ func SplitTaskAttemptInstanceUID(taskAttemptInstanceUID *types.UID) (
|
|||
return int32(i), common.PtrUIDStr(parts[1])
|
||||
}
|
||||
|
||||
func GetFrameworkSnapshotLogTail(podPtr interface{}) string {
|
||||
return "FrameworkSnapshot: " + common.ToJson(podPtr)
|
||||
}
|
||||
|
||||
func GetPodSnapshotLogTail(frameworkPtr interface{}) string {
|
||||
return "PodSnapshot: " + common.ToJson(frameworkPtr)
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////
|
||||
// Interfaces
|
||||
///////////////////////////////////////////////////////////////////////////////////////
|
||||
|
|
|
@ -245,42 +245,54 @@ func NewFrameworkController() *FrameworkController {
|
|||
|
||||
fInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
c.enqueueFrameworkObj(obj, "Framework Added")
|
||||
c.enqueueFrameworkObj(obj, "Framework Added", nil)
|
||||
},
|
||||
UpdateFunc: func(oldObj, newObj interface{}) {
|
||||
// FrameworkController only cares about Framework.Spec update
|
||||
oldF := oldObj.(*ci.Framework)
|
||||
newF := newObj.(*ci.Framework)
|
||||
if !reflect.DeepEqual(oldF.Spec, newF.Spec) {
|
||||
c.enqueueFrameworkObj(newObj, "Framework.Spec Updated")
|
||||
c.enqueueFrameworkObj(newObj, "Framework.Spec Updated", nil)
|
||||
}
|
||||
},
|
||||
DeleteFunc: func(obj interface{}) {
|
||||
c.enqueueFrameworkObj(obj, "Framework Deleted")
|
||||
c.enqueueFrameworkObj(obj, "Framework Deleted", func() string {
|
||||
if *c.cConfig.LogObjectSnapshot.Framework.OnFrameworkDeletion {
|
||||
return ": " + ci.GetFrameworkSnapshotLogTail(obj)
|
||||
} else {
|
||||
return ""
|
||||
}
|
||||
})
|
||||
},
|
||||
})
|
||||
|
||||
cmInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
c.enqueueFrameworkConfigMapObj(obj, "Framework ConfigMap Added")
|
||||
c.enqueueFrameworkConfigMapObj(obj, "Framework ConfigMap Added", nil)
|
||||
},
|
||||
UpdateFunc: func(oldObj, newObj interface{}) {
|
||||
c.enqueueFrameworkConfigMapObj(newObj, "Framework ConfigMap Updated")
|
||||
c.enqueueFrameworkConfigMapObj(newObj, "Framework ConfigMap Updated", nil)
|
||||
},
|
||||
DeleteFunc: func(obj interface{}) {
|
||||
c.enqueueFrameworkConfigMapObj(obj, "Framework ConfigMap Deleted")
|
||||
c.enqueueFrameworkConfigMapObj(obj, "Framework ConfigMap Deleted", nil)
|
||||
},
|
||||
})
|
||||
|
||||
podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
c.enqueueFrameworkPodObj(obj, "Framework Pod Added")
|
||||
c.enqueueFrameworkPodObj(obj, "Framework Pod Added", nil)
|
||||
},
|
||||
UpdateFunc: func(oldObj, newObj interface{}) {
|
||||
c.enqueueFrameworkPodObj(newObj, "Framework Pod Updated")
|
||||
c.enqueueFrameworkPodObj(newObj, "Framework Pod Updated", nil)
|
||||
},
|
||||
DeleteFunc: func(obj interface{}) {
|
||||
c.enqueueFrameworkPodObj(obj, "Framework Pod Deleted")
|
||||
c.enqueueFrameworkPodObj(obj, "Framework Pod Deleted", func() string {
|
||||
if *c.cConfig.LogObjectSnapshot.Pod.OnPodDeletion {
|
||||
return ": " + ci.GetPodSnapshotLogTail(obj)
|
||||
} else {
|
||||
return ""
|
||||
}
|
||||
})
|
||||
},
|
||||
})
|
||||
|
||||
|
@ -288,7 +300,8 @@ func NewFrameworkController() *FrameworkController {
|
|||
}
|
||||
|
||||
// obj could be *ci.Framework or cache.DeletedFinalStateUnknown.
|
||||
func (c *FrameworkController) enqueueFrameworkObj(obj interface{}, msg string) {
|
||||
func (c *FrameworkController) enqueueFrameworkObj(
|
||||
obj interface{}, logSfx string, logTailFunc func() string) {
|
||||
key, err := internal.GetKey(obj)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to get key for obj %#v, skip to enqueue: %v", obj, err)
|
||||
|
@ -302,23 +315,29 @@ func (c *FrameworkController) enqueueFrameworkObj(obj interface{}, msg string) {
|
|||
}
|
||||
|
||||
c.fQueue.Add(key)
|
||||
klog.Infof("[%v]: enqueueFrameworkObj: %v", key, msg)
|
||||
|
||||
if logTailFunc != nil {
|
||||
logSfx += logTailFunc()
|
||||
}
|
||||
klog.Infof("[%v]: enqueueFrameworkObj: %v", key, logSfx)
|
||||
}
|
||||
|
||||
// obj could be *core.ConfigMap or cache.DeletedFinalStateUnknown.
|
||||
func (c *FrameworkController) enqueueFrameworkConfigMapObj(obj interface{}, msg string) {
|
||||
func (c *FrameworkController) enqueueFrameworkConfigMapObj(
|
||||
obj interface{}, logSfx string, logTailFunc func() string) {
|
||||
if cm := internal.ToConfigMap(obj); cm != nil {
|
||||
if f := c.getConfigMapOwner(cm); f != nil {
|
||||
c.enqueueFrameworkObj(f, msg+": "+cm.Name)
|
||||
c.enqueueFrameworkObj(f, logSfx+": "+cm.Name, logTailFunc)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// obj could be *core.Pod or cache.DeletedFinalStateUnknown.
|
||||
func (c *FrameworkController) enqueueFrameworkPodObj(obj interface{}, msg string) {
|
||||
func (c *FrameworkController) enqueueFrameworkPodObj(
|
||||
obj interface{}, logSfx string, logTailFunc func() string) {
|
||||
if pod := internal.ToPod(obj); pod != nil {
|
||||
if cm := c.getPodOwner(pod); cm != nil {
|
||||
c.enqueueFrameworkConfigMapObj(cm, msg+": "+pod.Name)
|
||||
c.enqueueFrameworkConfigMapObj(cm, logSfx+": "+pod.Name, logTailFunc)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -393,6 +412,8 @@ func (c *FrameworkController) Run(stopCh <-chan struct{}) {
|
|||
c.cConfig.CRDEstablishedCheckIntervalSec,
|
||||
c.cConfig.CRDEstablishedCheckTimeoutSec)
|
||||
|
||||
// The recovery order is not important, since all Frameworks will be enqueued
|
||||
// to sync in any case.
|
||||
go c.fInformer.Run(stopCh)
|
||||
go c.cmInformer.Run(stopCh)
|
||||
go c.podInformer.Run(stopCh)
|
||||
|
@ -669,9 +690,9 @@ func (c *FrameworkController) enqueueTaskRetryDelayTimeoutCheck(
|
|||
return true
|
||||
}
|
||||
|
||||
func (c *FrameworkController) enqueueFramework(f *ci.Framework, msg string) {
|
||||
func (c *FrameworkController) enqueueFramework(f *ci.Framework, logSfx string) {
|
||||
c.fQueue.Add(f.Key())
|
||||
klog.Infof("[%v]: enqueueFramework: %v", f.Key(), msg)
|
||||
klog.Infof("[%v]: enqueueFramework: %v", f.Key(), logSfx)
|
||||
}
|
||||
|
||||
func (c *FrameworkController) syncFrameworkStatus(f *ci.Framework) error {
|
||||
|
@ -847,6 +868,14 @@ func (c *FrameworkController) syncFrameworkState(f *ci.Framework) (err error) {
|
|||
|
||||
// retryFramework
|
||||
klog.Infof(logPfx + "Retry Framework")
|
||||
|
||||
// The completed FrameworkAttempt has been persisted, so it is safe to also
|
||||
// expose it as one history snapshot.
|
||||
if *c.cConfig.LogObjectSnapshot.Framework.OnFrameworkRetry {
|
||||
klog.Infof(logPfx+
|
||||
"Framework will be retried: %v", ci.GetFrameworkSnapshotLogTail(f))
|
||||
}
|
||||
|
||||
f.Status.RetryPolicyStatus.TotalRetriedCount++
|
||||
if retryDecision.IsAccountable {
|
||||
f.Status.RetryPolicyStatus.AccountableRetriedCount++
|
||||
|
@ -1232,9 +1261,9 @@ func (c *FrameworkController) syncTaskState(
|
|||
terminated := containerStatus.State.Terminated
|
||||
if terminated != nil && terminated.ExitCode != 0 {
|
||||
allContainerDiags = append(allContainerDiags, fmt.Sprintf(
|
||||
"[Container %v, ExitCode: %v, Reason: %v, Message: %v]",
|
||||
containerStatus.Name, terminated.ExitCode, terminated.Reason,
|
||||
terminated.Message))
|
||||
"[Container: %v, ExitCode: %v, Signal: %v, Reason: %v, Message: %v]",
|
||||
containerStatus.Name, terminated.ExitCode, terminated.Signal,
|
||||
terminated.Reason, common.ToJson(terminated.Message)))
|
||||
|
||||
if lastContainerExitCode == nil ||
|
||||
lastContainerCompletionTime.Before(terminated.FinishedAt.Time) {
|
||||
|
@ -1323,6 +1352,14 @@ func (c *FrameworkController) syncTaskState(
|
|||
|
||||
// retryTask
|
||||
klog.Infof(logPfx + "Retry Task")
|
||||
|
||||
// The completed TaskAttempt has been persisted, so it is safe to also
|
||||
// expose it as one history snapshot.
|
||||
if *c.cConfig.LogObjectSnapshot.Framework.OnTaskRetry {
|
||||
klog.Infof(logPfx+
|
||||
"Task will be retried: %v", ci.GetFrameworkSnapshotLogTail(f))
|
||||
}
|
||||
|
||||
taskStatus.RetryPolicyStatus.TotalRetriedCount++
|
||||
if retryDecision.IsAccountable {
|
||||
taskStatus.RetryPolicyStatus.AccountableRetriedCount++
|
||||
|
|
Загрузка…
Ссылка в новой задаче