This commit is contained in:
Yuqi Wang 2019-07-18 15:58:23 +08:00 коммит произвёл GitHub
Родитель 243996c2c0
Коммит 2caad5b969
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
16 изменённых файлов: 210 добавлений и 211 удалений

Просмотреть файл

@ -20,7 +20,7 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE # SOFTWARE
FROM golang:alpine as builder FROM golang:1.12.6-alpine as builder
ENV PROJECT_DIR=${GOPATH}/src/github.com/microsoft/frameworkcontroller ENV PROJECT_DIR=${GOPATH}/src/github.com/microsoft/frameworkcontroller
ENV INSTALL_DIR=/opt/frameworkcontroller/frameworkbarrier ENV INSTALL_DIR=/opt/frameworkcontroller/frameworkbarrier
@ -32,7 +32,7 @@ RUN ${PROJECT_DIR}/build/frameworkbarrier/go-build.sh && \
mv ${PROJECT_DIR}/dist/frameworkbarrier/* ${INSTALL_DIR} mv ${PROJECT_DIR}/dist/frameworkbarrier/* ${INSTALL_DIR}
FROM alpine:latest FROM alpine:3.10.1
ENV INSTALL_DIR=/opt/frameworkcontroller/frameworkbarrier ENV INSTALL_DIR=/opt/frameworkcontroller/frameworkbarrier

Просмотреть файл

@ -20,7 +20,7 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE # SOFTWARE
FROM golang:alpine as builder FROM golang:1.12.6-alpine as builder
ENV PROJECT_DIR=${GOPATH}/src/github.com/microsoft/frameworkcontroller ENV PROJECT_DIR=${GOPATH}/src/github.com/microsoft/frameworkcontroller
ENV INSTALL_DIR=/opt/frameworkcontroller/frameworkcontroller ENV INSTALL_DIR=/opt/frameworkcontroller/frameworkcontroller
@ -32,7 +32,7 @@ RUN ${PROJECT_DIR}/build/frameworkcontroller/go-build.sh && \
mv ${PROJECT_DIR}/dist/frameworkcontroller/* ${INSTALL_DIR} mv ${PROJECT_DIR}/dist/frameworkcontroller/* ${INSTALL_DIR}
FROM alpine:latest FROM alpine:3.10.1
ENV INSTALL_DIR=/opt/frameworkcontroller/frameworkcontroller ENV INSTALL_DIR=/opt/frameworkcontroller/frameworkcontroller

Просмотреть файл

@ -23,8 +23,8 @@
package main package main
import ( import (
"github.com/microsoft/frameworkcontroller/pkg/common"
"github.com/microsoft/frameworkcontroller/pkg/barrier" "github.com/microsoft/frameworkcontroller/pkg/barrier"
"github.com/microsoft/frameworkcontroller/pkg/common"
) )
func init() { func init() {

Просмотреть файл

@ -23,11 +23,11 @@
package main package main
import ( import (
"github.com/microsoft/frameworkcontroller/pkg/common"
"github.com/microsoft/frameworkcontroller/pkg/controller"
"os" "os"
"os/signal" "os/signal"
"syscall" "syscall"
"github.com/microsoft/frameworkcontroller/pkg/common"
"github.com/microsoft/frameworkcontroller/pkg/controller"
) )
func init() { func init() {

Просмотреть файл

@ -97,7 +97,7 @@ docker run -e KUBECONFIG=/mnt/.kube/config \
**Prerequisite** **Prerequisite**
Ensure you have installed [Golang 1.10 or above](https://golang.org/doc/install#install) and the [${GOPATH}](https://golang.org/doc/code.html#GOPATH) is valid. Ensure you have installed [Golang 1.12.6 or above](https://golang.org/doc/install#install) and the [${GOPATH}](https://golang.org/doc/code.html#GOPATH) is valid.
Then build the FrameworkController binary distribution: Then build the FrameworkController binary distribution:
```shell ```shell

Просмотреть файл

@ -23,12 +23,12 @@
package v1 package v1
import ( import (
"os"
"fmt" "fmt"
"github.com/microsoft/frameworkcontroller/pkg/common"
"io/ioutil" "io/ioutil"
"k8s.io/client-go/rest" "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/clientcmd"
"github.com/microsoft/frameworkcontroller/pkg/common" "os"
) )
type Config struct { type Config struct {
@ -112,34 +112,34 @@ func NewConfig() *Config {
errPrefix := "Config Validation Failed: " errPrefix := "Config Validation Failed: "
if *c.WorkerNumber <= 0 { if *c.WorkerNumber <= 0 {
panic(fmt.Errorf(errPrefix+ panic(fmt.Errorf(errPrefix+
"WorkerNumber %v should be positive", "WorkerNumber %v should be positive",
*c.WorkerNumber)) *c.WorkerNumber))
} }
if *c.CRDEstablishedCheckIntervalSec < 1 { if *c.CRDEstablishedCheckIntervalSec < 1 {
panic(fmt.Errorf(errPrefix+ panic(fmt.Errorf(errPrefix+
"CRDEstablishedCheckIntervalSec %v should not be less than 1", "CRDEstablishedCheckIntervalSec %v should not be less than 1",
*c.CRDEstablishedCheckIntervalSec)) *c.CRDEstablishedCheckIntervalSec))
} }
if *c.CRDEstablishedCheckTimeoutSec < 10 { if *c.CRDEstablishedCheckTimeoutSec < 10 {
panic(fmt.Errorf(errPrefix+ panic(fmt.Errorf(errPrefix+
"CRDEstablishedCheckTimeoutSec %v should not be less than 10", "CRDEstablishedCheckTimeoutSec %v should not be less than 10",
*c.CRDEstablishedCheckTimeoutSec)) *c.CRDEstablishedCheckTimeoutSec))
} }
if *c.ObjectLocalCacheCreationTimeoutSec < 60 { if *c.ObjectLocalCacheCreationTimeoutSec < 60 {
panic(fmt.Errorf(errPrefix+ panic(fmt.Errorf(errPrefix+
"ObjectLocalCacheCreationTimeoutSec %v should not be less than 60", "ObjectLocalCacheCreationTimeoutSec %v should not be less than 60",
*c.ObjectLocalCacheCreationTimeoutSec)) *c.ObjectLocalCacheCreationTimeoutSec))
} }
if *c.FrameworkMinRetryDelaySecForTransientConflictFailed < 0 { if *c.FrameworkMinRetryDelaySecForTransientConflictFailed < 0 {
panic(fmt.Errorf(errPrefix+ panic(fmt.Errorf(errPrefix+
"FrameworkMinRetryDelaySecForTransientConflictFailed %v should not be negative", "FrameworkMinRetryDelaySecForTransientConflictFailed %v should not be negative",
*c.FrameworkMinRetryDelaySecForTransientConflictFailed)) *c.FrameworkMinRetryDelaySecForTransientConflictFailed))
} }
if *c.FrameworkMaxRetryDelaySecForTransientConflictFailed < if *c.FrameworkMaxRetryDelaySecForTransientConflictFailed <
*c.FrameworkMinRetryDelaySecForTransientConflictFailed { *c.FrameworkMinRetryDelaySecForTransientConflictFailed {
panic(fmt.Errorf(errPrefix+ panic(fmt.Errorf(errPrefix+
"FrameworkMaxRetryDelaySecForTransientConflictFailed %v should not be less than "+ "FrameworkMaxRetryDelaySecForTransientConflictFailed %v should not be less than "+
"FrameworkMinRetryDelaySecForTransientConflictFailed %v", "FrameworkMinRetryDelaySecForTransientConflictFailed %v",
*c.FrameworkMaxRetryDelaySecForTransientConflictFailed, *c.FrameworkMaxRetryDelaySecForTransientConflictFailed,
*c.FrameworkMinRetryDelaySecForTransientConflictFailed)) *c.FrameworkMinRetryDelaySecForTransientConflictFailed))
} }
@ -177,15 +177,15 @@ func initConfig() *Config {
return &c return &c
} }
func BuildKubeConfig(cConfig *Config) (*rest.Config) { func BuildKubeConfig(cConfig *Config) *rest.Config {
kConfig, err := clientcmd.BuildConfigFromFlags( kConfig, err := clientcmd.BuildConfigFromFlags(
*cConfig.KubeApiServerAddress, *cConfig.KubeConfigFilePath) *cConfig.KubeApiServerAddress, *cConfig.KubeConfigFilePath)
if err != nil { if err != nil {
panic(fmt.Errorf("Failed to build KubeConfig, please ensure "+ panic(fmt.Errorf("Failed to build KubeConfig, please ensure "+
"config kubeApiServerAddress or config kubeConfigFilePath or "+ "config kubeApiServerAddress or config kubeConfigFilePath or "+
"${KUBE_APISERVER_ADDRESS} or ${KUBECONFIG} or ${HOME}/.kube/config or "+ "${KUBE_APISERVER_ADDRESS} or ${KUBECONFIG} or ${HOME}/.kube/config or "+
"${KUBERNETES_SERVICE_HOST}:${KUBERNETES_SERVICE_PORT} is valid: "+ "${KUBERNETES_SERVICE_HOST}:${KUBERNETES_SERVICE_PORT} is valid: "+
"Error: %v", err)) "Error: %v", err))
} }
return kConfig return kConfig
} }

Просмотреть файл

@ -23,8 +23,8 @@
package v1 package v1
import ( import (
"os"
core "k8s.io/api/core/v1" core "k8s.io/api/core/v1"
"os"
) )
/////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////

Просмотреть файл

@ -23,9 +23,9 @@
package v1 package v1
import ( import (
"github.com/microsoft/frameworkcontroller/pkg/common"
apiExtensions "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" apiExtensions "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1" meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/microsoft/frameworkcontroller/pkg/common"
) )
const ( const (

Просмотреть файл

@ -24,13 +24,13 @@ package v1
import ( import (
"fmt" "fmt"
"strings" "github.com/microsoft/frameworkcontroller/pkg/common"
"strconv"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
core "k8s.io/api/core/v1" core "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
meta "k8s.io/apimachinery/pkg/apis/meta/v1" meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/microsoft/frameworkcontroller/pkg/common" "k8s.io/apimachinery/pkg/types"
"strconv"
"strings"
) )
/////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////
@ -63,7 +63,7 @@ func GetFrameworkAttemptInstanceUID(frameworkAttemptID int32, configMapUID *type
} }
func SplitFrameworkAttemptInstanceUID(frameworkAttemptInstanceUID *types.UID) ( func SplitFrameworkAttemptInstanceUID(frameworkAttemptInstanceUID *types.UID) (
frameworkAttemptID int32, configMapUID *types.UID) { frameworkAttemptID int32, configMapUID *types.UID) {
parts := strings.Split(string(*frameworkAttemptInstanceUID), "_") parts := strings.Split(string(*frameworkAttemptInstanceUID), "_")
i, err := strconv.ParseInt(parts[0], 10, 32) i, err := strconv.ParseInt(parts[0], 10, 32)
if err != nil { if err != nil {
@ -79,7 +79,7 @@ func GetTaskAttemptInstanceUID(taskAttemptID int32, podUID *types.UID) *types.UI
} }
func SplitTaskAttemptInstanceUID(taskAttemptInstanceUID *types.UID) ( func SplitTaskAttemptInstanceUID(taskAttemptInstanceUID *types.UID) (
taskAttemptID int32, podUID *types.UID) { taskAttemptID int32, podUID *types.UID) {
parts := strings.Split(string(*taskAttemptInstanceUID), "_") parts := strings.Split(string(*taskAttemptInstanceUID), "_")
i, err := strconv.ParseInt(parts[0], 10, 32) i, err := strconv.ParseInt(parts[0], 10, 32)
if err != nil { if err != nil {
@ -193,14 +193,14 @@ func (ts *TaskStatus) IsRunning() bool {
func (f *Framework) IsCompleting() bool { func (f *Framework) IsCompleting() bool {
return f.Status.State == FrameworkAttemptDeletionPending || return f.Status.State == FrameworkAttemptDeletionPending ||
f.Status.State == FrameworkAttemptDeletionRequested || f.Status.State == FrameworkAttemptDeletionRequested ||
f.Status.State == FrameworkAttemptDeleting f.Status.State == FrameworkAttemptDeleting
} }
func (ts *TaskStatus) IsCompleting() bool { func (ts *TaskStatus) IsCompleting() bool {
return ts.State == TaskAttemptDeletionPending || return ts.State == TaskAttemptDeletionPending ||
ts.State == TaskAttemptDeletionRequested || ts.State == TaskAttemptDeletionRequested ||
ts.State == TaskAttemptDeleting ts.State == TaskAttemptDeleting
} }
func (ct CompletionType) IsSucceeded() bool { func (ct CompletionType) IsSucceeded() bool {
@ -419,7 +419,7 @@ func (f *Framework) NewFrameworkStatus() *FrameworkStatus {
} }
func (f *Framework) NewFrameworkAttemptStatus( func (f *Framework) NewFrameworkAttemptStatus(
frameworkAttemptID int32) FrameworkAttemptStatus { frameworkAttemptID int32) FrameworkAttemptStatus {
return FrameworkAttemptStatus{ return FrameworkAttemptStatus{
ID: frameworkAttemptID, ID: frameworkAttemptID,
StartTime: meta.Now(), StartTime: meta.Now(),
@ -461,7 +461,7 @@ func (f *Framework) NewTaskStatus(taskRoleName string, taskIndex int32) *TaskSta
} }
func (f *Framework) NewTaskAttemptStatus( func (f *Framework) NewTaskAttemptStatus(
taskRoleName string, taskIndex int32, taskAttemptID int32) TaskAttemptStatus { taskRoleName string, taskIndex int32, taskAttemptID int32) TaskAttemptStatus {
return TaskAttemptStatus{ return TaskAttemptStatus{
ID: taskAttemptID, ID: taskAttemptID,
StartTime: meta.Now(), StartTime: meta.Now(),
@ -510,16 +510,16 @@ func (rd RetryDecision) String() string {
} }
func (rp RetryPolicySpec) ShouldRetry( func (rp RetryPolicySpec) ShouldRetry(
rps RetryPolicyStatus, rps RetryPolicyStatus,
cs *CompletionStatus, cs *CompletionStatus,
minDelaySecForTransientConflictFailed int64, minDelaySecForTransientConflictFailed int64,
maxDelaySecForTransientConflictFailed int64) RetryDecision { maxDelaySecForTransientConflictFailed int64) RetryDecision {
ct := cs.Type ct := cs.Type
// 0. Built-in Always-on RetryPolicy // 0. Built-in Always-on RetryPolicy
if cs.Code == CompletionCodePodSpecInvalid || if cs.Code == CompletionCodePodSpecInvalid ||
cs.Code == CompletionCodeStopFrameworkRequested || cs.Code == CompletionCodeStopFrameworkRequested ||
cs.Code == CompletionCodeFrameworkAttemptCompletion { cs.Code == CompletionCodeFrameworkAttemptCompletion {
return RetryDecision{false, true, 0, cs.Diagnostics} return RetryDecision{false, true, 0, cs.Diagnostics}
} }
@ -547,8 +547,8 @@ func (rp RetryPolicySpec) ShouldRetry(
// 2. NormalRetryPolicy // 2. NormalRetryPolicy
if (rp.MaxRetryCount == ExtendedUnlimitedValue) || if (rp.MaxRetryCount == ExtendedUnlimitedValue) ||
(ct.IsFailed() && rp.MaxRetryCount == UnlimitedValue) || (ct.IsFailed() && rp.MaxRetryCount == UnlimitedValue) ||
(ct.IsFailed() && rps.AccountableRetriedCount < rp.MaxRetryCount) { (ct.IsFailed() && rps.AccountableRetriedCount < rp.MaxRetryCount) {
return RetryDecision{true, true, 0, fmt.Sprintf( return RetryDecision{true, true, 0, fmt.Sprintf(
"AccountableRetriedCount %v has not reached MaxRetryCount %v", "AccountableRetriedCount %v has not reached MaxRetryCount %v",
rps.AccountableRetriedCount, rp.MaxRetryCount)} rps.AccountableRetriedCount, rp.MaxRetryCount)}
@ -584,7 +584,7 @@ func (f *Framework) TransitionFrameworkState(dstState FrameworkState) {
// This is the only interface to modify TaskState // This is the only interface to modify TaskState
func (f *Framework) TransitionTaskState( func (f *Framework) TransitionTaskState(
taskRoleName string, taskIndex int32, dstState TaskState) { taskRoleName string, taskIndex int32, dstState TaskState) {
taskStatus := f.TaskStatus(taskRoleName, taskIndex) taskStatus := f.TaskStatus(taskRoleName, taskIndex)
srcState := taskStatus.State srcState := taskStatus.State
if srcState == dstState { if srcState == dstState {

Просмотреть файл

@ -24,15 +24,15 @@ package v1
import ( import (
core "k8s.io/api/core/v1" core "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
meta "k8s.io/apimachinery/pkg/apis/meta/v1" meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
) )
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
type FrameworkList struct { type FrameworkList struct {
meta.TypeMeta `json:",inline"` meta.TypeMeta `json:",inline"`
meta.ListMeta `json:"metadata"` meta.ListMeta `json:"metadata"`
Items []Framework `json:"items"` Items []Framework `json:"items"`
} }
// +genclient // +genclient
@ -79,10 +79,10 @@ type FrameworkList struct {
// 2. Do not change the OwnerReferences of the managed ConfigMap and Pods. // 2. Do not change the OwnerReferences of the managed ConfigMap and Pods.
////////////////////////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////////////////////////
type Framework struct { type Framework struct {
meta.TypeMeta `json:",inline"` meta.TypeMeta `json:",inline"`
meta.ObjectMeta `json:"metadata"` meta.ObjectMeta `json:"metadata"`
Spec FrameworkSpec `json:"spec"` Spec FrameworkSpec `json:"spec"`
Status *FrameworkStatus `json:"status"` Status *FrameworkStatus `json:"status"`
} }
////////////////////////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////////////////////////

Просмотреть файл

@ -24,21 +24,21 @@ package barrier
import ( import (
"fmt" "fmt"
ci "github.com/microsoft/frameworkcontroller/pkg/apis/frameworkcontroller/v1"
frameworkClient "github.com/microsoft/frameworkcontroller/pkg/client/clientset/versioned"
"github.com/microsoft/frameworkcontroller/pkg/common"
"github.com/microsoft/frameworkcontroller/pkg/util"
log "github.com/sirupsen/logrus"
"io/ioutil"
apiErrors "k8s.io/apimachinery/pkg/api/errors"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
kubeClient "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"os" "os"
"strconv" "strconv"
"strings" "strings"
"io/ioutil"
log "github.com/sirupsen/logrus"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
apiErrors "k8s.io/apimachinery/pkg/api/errors"
kubeClient "k8s.io/client-go/kubernetes"
frameworkClient "github.com/microsoft/frameworkcontroller/pkg/client/clientset/versioned"
ci "github.com/microsoft/frameworkcontroller/pkg/apis/frameworkcontroller/v1"
"github.com/microsoft/frameworkcontroller/pkg/util"
"github.com/microsoft/frameworkcontroller/pkg/common"
) )
// FrameworkController Extension: FrameworkBarrier // FrameworkController Extension: FrameworkBarrier
@ -160,19 +160,19 @@ func newConfig() *Config {
errPrefix := "Validation Failed: " errPrefix := "Validation Failed: "
if c.FrameworkName == "" { if c.FrameworkName == "" {
log.Errorf(errPrefix+ log.Errorf(errPrefix+
"${%v} should not be empty", "${%v} should not be empty",
ci.EnvNameFrameworkName) ci.EnvNameFrameworkName)
exit(ci.CompletionCodeContainerPermanentFailed) exit(ci.CompletionCodeContainerPermanentFailed)
} }
if c.BarrierCheckIntervalSec < 5 { if c.BarrierCheckIntervalSec < 5 {
log.Errorf(errPrefix+ log.Errorf(errPrefix+
"${%v} %v should not be less than 5", "${%v} %v should not be less than 5",
EnvNameBarrierCheckIntervalSec, c.BarrierCheckIntervalSec) EnvNameBarrierCheckIntervalSec, c.BarrierCheckIntervalSec)
exit(ci.CompletionCodeContainerPermanentFailed) exit(ci.CompletionCodeContainerPermanentFailed)
} }
if c.BarrierCheckTimeoutSec < 60 || c.BarrierCheckTimeoutSec > 20*60 { if c.BarrierCheckTimeoutSec < 60 || c.BarrierCheckTimeoutSec > 20*60 {
log.Errorf(errPrefix+ log.Errorf(errPrefix+
"${%v} %v should not be less than 60 or greater than 20 * 60", "${%v} %v should not be less than 60 or greater than 20 * 60",
EnvNameBarrierCheckTimeoutSec, c.BarrierCheckTimeoutSec) EnvNameBarrierCheckTimeoutSec, c.BarrierCheckTimeoutSec)
exit(ci.CompletionCodeContainerPermanentFailed) exit(ci.CompletionCodeContainerPermanentFailed)
} }
@ -191,14 +191,14 @@ func defaultKubeConfigFilePath() *string {
return &configPath return &configPath
} }
func buildKubeConfig(bConfig *Config) (*rest.Config) { func buildKubeConfig(bConfig *Config) *rest.Config {
kConfig, err := clientcmd.BuildConfigFromFlags( kConfig, err := clientcmd.BuildConfigFromFlags(
bConfig.KubeApiServerAddress, bConfig.KubeConfigFilePath) bConfig.KubeApiServerAddress, bConfig.KubeConfigFilePath)
if err != nil { if err != nil {
log.Errorf("Failed to build KubeConfig, please ensure "+ log.Errorf("Failed to build KubeConfig, please ensure "+
"${KUBE_APISERVER_ADDRESS} or ${KUBECONFIG} or ${HOME}/.kube/config or "+ "${KUBE_APISERVER_ADDRESS} or ${KUBECONFIG} or ${HOME}/.kube/config or "+
"${KUBERNETES_SERVICE_HOST}:${KUBERNETES_SERVICE_PORT} is valid: "+ "${KUBERNETES_SERVICE_HOST}:${KUBERNETES_SERVICE_PORT} is valid: "+
"Error: %v", err) "Error: %v", err)
exit(ci.CompletionCode(1)) exit(ci.CompletionCode(1))
} }
return kConfig return kConfig
@ -257,14 +257,14 @@ func (b *FrameworkBarrier) Run() {
if isPassed { if isPassed {
log.Infof("BarrierPassed: " + log.Infof("BarrierPassed: " +
"All Tasks are ready with not nil PodIP.") "All Tasks are ready with not nil PodIP.")
dumpFramework(f) dumpFramework(f)
generateInjector(f) generateInjector(f)
exit(ci.CompletionCodeSucceeded) exit(ci.CompletionCodeSucceeded)
} else { } else {
if err == nil { if err == nil {
log.Errorf("BarrierNotPassed: " + log.Errorf("BarrierNotPassed: " +
"Timeout to wait all Tasks are ready with not nil PodIP.") "Timeout to wait all Tasks are ready with not nil PodIP.")
exit(ci.CompletionCodeContainerTransientConflictFailed) exit(ci.CompletionCodeContainerTransientConflictFailed)
} else { } else {
log.Errorf("Failed to get Framework object from ApiServer: %v", err) log.Errorf("Failed to get Framework object from ApiServer: %v", err)
@ -297,12 +297,12 @@ func isBarrierPassed(f *ci.Framework) bool {
// Wait until readyTaskCount is consistent with totalTaskCount. // Wait until readyTaskCount is consistent with totalTaskCount.
if readyTaskCount == totalTaskCount { if readyTaskCount == totalTaskCount {
log.Infof("BarrierPassed: "+ log.Infof("BarrierPassed: "+
"%v/%v Tasks are ready with not nil PodIP.", "%v/%v Tasks are ready with not nil PodIP.",
readyTaskCount, totalTaskCount) readyTaskCount, totalTaskCount)
return true return true
} else { } else {
log.Warnf("BarrierNotPassed: "+ log.Warnf("BarrierNotPassed: "+
"%v/%v Tasks are ready with not nil PodIP.", "%v/%v Tasks are ready with not nil PodIP.",
readyTaskCount, totalTaskCount) readyTaskCount, totalTaskCount)
return false return false
} }
@ -310,7 +310,7 @@ func isBarrierPassed(f *ci.Framework) bool {
func isTaskReady(taskStatus *ci.TaskStatus) bool { func isTaskReady(taskStatus *ci.TaskStatus) bool {
return taskStatus.AttemptStatus.PodIP != nil && return taskStatus.AttemptStatus.PodIP != nil &&
*taskStatus.AttemptStatus.PodIP != "" *taskStatus.AttemptStatus.PodIP != ""
} }
func dumpFramework(f *ci.Framework) { func dumpFramework(f *ci.Framework) {
@ -410,16 +410,16 @@ func exit(cc ci.CompletionCode) {
log.Infof(logPfx + "success.") log.Infof(logPfx + "success.")
} else if cc == ci.CompletionCodeContainerTransientFailed { } else if cc == ci.CompletionCodeContainerTransientFailed {
log.Errorf(logPfx + log.Errorf(logPfx +
"transient failure to tell controller to retry.") "transient failure to tell controller to retry.")
} else if cc == ci.CompletionCodeContainerTransientConflictFailed { } else if cc == ci.CompletionCodeContainerTransientConflictFailed {
log.Errorf(logPfx + log.Errorf(logPfx +
"transient conflict failure to tell controller to back off retry.") "transient conflict failure to tell controller to back off retry.")
} else if cc == ci.CompletionCodeContainerPermanentFailed { } else if cc == ci.CompletionCodeContainerPermanentFailed {
log.Errorf(logPfx + log.Errorf(logPfx +
"permanent failure to tell controller not to retry.") "permanent failure to tell controller not to retry.")
} else { } else {
log.Errorf(logPfx + log.Errorf(logPfx +
"unknown failure to tell controller to retry within maxRetryCount.") "unknown failure to tell controller to retry within maxRetryCount.")
} }
os.Exit(int(cc)) os.Exit(int(cc))

Просмотреть файл

@ -23,8 +23,8 @@
package common package common
import ( import (
"io"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"io"
) )
type Empty struct{} type Empty struct{}

Просмотреть файл

@ -23,19 +23,19 @@
package common package common
import ( import (
"encoding/json"
"flag"
"fmt" "fmt"
log "github.com/sirupsen/logrus"
"gopkg.in/yaml.v2"
"io/ioutil"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog"
"math/rand"
"os"
"strings" "strings"
"time" "time"
"os"
"flag"
"math/rand"
"io/ioutil"
"gopkg.in/yaml.v2"
"encoding/json"
log "github.com/sirupsen/logrus"
"k8s.io/klog"
"k8s.io/apimachinery/pkg/types"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
) )
func Quote(s string) string { func Quote(s string) string {

Просмотреть файл

@ -24,32 +24,32 @@ package controller
import ( import (
"fmt" "fmt"
"time" ci "github.com/microsoft/frameworkcontroller/pkg/apis/frameworkcontroller/v1"
"reflect"
"strings"
"sync"
log "github.com/sirupsen/logrus"
errorWrap "github.com/pkg/errors"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/types"
core "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/client-go/util/retry"
"k8s.io/client-go/rest"
apiErrors "k8s.io/apimachinery/pkg/api/errors"
errorAgg "k8s.io/apimachinery/pkg/util/errors"
kubeClient "k8s.io/client-go/kubernetes"
kubeInformer "k8s.io/client-go/informers"
coreLister "k8s.io/client-go/listers/core/v1"
frameworkClient "github.com/microsoft/frameworkcontroller/pkg/client/clientset/versioned" frameworkClient "github.com/microsoft/frameworkcontroller/pkg/client/clientset/versioned"
frameworkInformer "github.com/microsoft/frameworkcontroller/pkg/client/informers/externalversions" frameworkInformer "github.com/microsoft/frameworkcontroller/pkg/client/informers/externalversions"
frameworkLister "github.com/microsoft/frameworkcontroller/pkg/client/listers/frameworkcontroller/v1" frameworkLister "github.com/microsoft/frameworkcontroller/pkg/client/listers/frameworkcontroller/v1"
ci "github.com/microsoft/frameworkcontroller/pkg/apis/frameworkcontroller/v1"
"github.com/microsoft/frameworkcontroller/pkg/util"
"github.com/microsoft/frameworkcontroller/pkg/common" "github.com/microsoft/frameworkcontroller/pkg/common"
"github.com/microsoft/frameworkcontroller/pkg/util"
errorWrap "github.com/pkg/errors"
log "github.com/sirupsen/logrus"
core "k8s.io/api/core/v1"
apiErrors "k8s.io/apimachinery/pkg/api/errors"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
errorAgg "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
kubeInformer "k8s.io/client-go/informers"
kubeClient "k8s.io/client-go/kubernetes"
coreLister "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/retry"
"k8s.io/client-go/util/workqueue"
"reflect"
"strings"
"sync"
"time"
) )
// FrameworkController maintains the lifecycle for all Frameworks in the cluster. // FrameworkController maintains the lifecycle for all Frameworks in the cluster.
@ -463,8 +463,8 @@ func (c *FrameworkController) syncFramework(key string) (returnedErr error) {
// returnedErr is already prefixed with logPfx // returnedErr is already prefixed with logPfx
log.Warnf(returnedErr.Error()) log.Warnf(returnedErr.Error())
log.Warnf(logPfx + log.Warnf(logPfx +
"Failed to due to Platform Transient Error. " + "Failed to due to Platform Transient Error. " +
"Will enqueue it again after rate limited delay") "Will enqueue it again after rate limited delay")
} }
log.Infof(logPfx+"Completed: Duration %v", time.Since(startTime)) log.Infof(logPfx+"Completed: Duration %v", time.Since(startTime))
}() }()
@ -473,8 +473,8 @@ func (c *FrameworkController) syncFramework(key string) (returnedErr error) {
if err != nil { if err != nil {
// Unreachable // Unreachable
panic(fmt.Errorf(logPfx+ panic(fmt.Errorf(logPfx+
"Failed: Got invalid key from queue, but the queue should only contain "+ "Failed: Got invalid key from queue, but the queue should only contain "+
"valid keys: %v", err)) "valid keys: %v", err))
} }
localF, err := c.fLister.Frameworks(namespace).Get(name) localF, err := c.fLister.Frameworks(namespace).Get(name)
@ -483,19 +483,19 @@ func (c *FrameworkController) syncFramework(key string) (returnedErr error) {
// GarbageCollectionController will handle the dependent object // GarbageCollectionController will handle the dependent object
// deletion according to the ownerReferences. // deletion according to the ownerReferences.
log.Infof(logPfx+ log.Infof(logPfx+
"Skipped: Framework cannot be found in local cache: %v", err) "Skipped: Framework cannot be found in local cache: %v", err)
c.deleteExpectedFrameworkStatusInfo(key) c.deleteExpectedFrameworkStatusInfo(key)
return nil return nil
} else { } else {
return fmt.Errorf(logPfx+ return fmt.Errorf(logPfx+
"Failed: Framework cannot be got from local cache: %v", err) "Failed: Framework cannot be got from local cache: %v", err)
} }
} else { } else {
if localF.DeletionTimestamp != nil { if localF.DeletionTimestamp != nil {
// Skip syncFramework to avoid fighting with GarbageCollectionController, // Skip syncFramework to avoid fighting with GarbageCollectionController,
// because GarbageCollectionController may be deleting the dependent object. // because GarbageCollectionController may be deleting the dependent object.
log.Infof(logPfx+ log.Infof(logPfx+
"Skipped: Framework is deleting: Will be deleted at %v", "Skipped: Framework is deleting: Will be deleted at %v",
localF.DeletionTimestamp) localF.DeletionTimestamp)
return nil return nil
} else { } else {
@ -550,8 +550,8 @@ func (c *FrameworkController) syncFramework(key string) (returnedErr error) {
c.updateExpectedFrameworkStatusInfo(f.Key(), f.Status, updateErr == nil) c.updateExpectedFrameworkStatusInfo(f.Key(), f.Status, updateErr == nil)
} else { } else {
log.Infof(logPfx + log.Infof(logPfx +
"Skip to update the expected and remote Framework.Status since " + "Skip to update the expected and remote Framework.Status since " +
"they are unchanged") "they are unchanged")
} }
return errorAgg.NewAggregate(errs) return errorAgg.NewAggregate(errs)
@ -590,7 +590,7 @@ func (c *FrameworkController) recoverTimeoutChecks(f *ci.Framework) {
} }
func (c *FrameworkController) enqueueFrameworkAttemptCreationTimeoutCheck( func (c *FrameworkController) enqueueFrameworkAttemptCreationTimeoutCheck(
f *ci.Framework, failIfTimeout bool) bool { f *ci.Framework, failIfTimeout bool) bool {
if f.Status.State != ci.FrameworkAttemptCreationRequested { if f.Status.State != ci.FrameworkAttemptCreationRequested {
return false return false
} }
@ -609,8 +609,8 @@ func (c *FrameworkController) enqueueFrameworkAttemptCreationTimeoutCheck(
} }
func (c *FrameworkController) enqueueTaskAttemptCreationTimeoutCheck( func (c *FrameworkController) enqueueTaskAttemptCreationTimeoutCheck(
f *ci.Framework, taskRoleName string, taskIndex int32, f *ci.Framework, taskRoleName string, taskIndex int32,
failIfTimeout bool) bool { failIfTimeout bool) bool {
taskStatus := f.TaskStatus(taskRoleName, taskIndex) taskStatus := f.TaskStatus(taskRoleName, taskIndex)
if taskStatus.State != ci.TaskAttemptCreationRequested { if taskStatus.State != ci.TaskAttemptCreationRequested {
return false return false
@ -630,7 +630,7 @@ func (c *FrameworkController) enqueueTaskAttemptCreationTimeoutCheck(
} }
func (c *FrameworkController) enqueueFrameworkRetryDelayTimeoutCheck( func (c *FrameworkController) enqueueFrameworkRetryDelayTimeoutCheck(
f *ci.Framework, failIfTimeout bool) bool { f *ci.Framework, failIfTimeout bool) bool {
if f.Status.State != ci.FrameworkAttemptCompleted { if f.Status.State != ci.FrameworkAttemptCompleted {
return false return false
} }
@ -649,8 +649,8 @@ func (c *FrameworkController) enqueueFrameworkRetryDelayTimeoutCheck(
} }
func (c *FrameworkController) enqueueTaskRetryDelayTimeoutCheck( func (c *FrameworkController) enqueueTaskRetryDelayTimeoutCheck(
f *ci.Framework, taskRoleName string, taskIndex int32, f *ci.Framework, taskRoleName string, taskIndex int32,
failIfTimeout bool) bool { failIfTimeout bool) bool {
taskStatus := f.TaskStatus(taskRoleName, taskIndex) taskStatus := f.TaskStatus(taskRoleName, taskIndex)
if taskStatus.State != ci.TaskAttemptCompleted { if taskStatus.State != ci.TaskAttemptCompleted {
return false return false
@ -720,13 +720,13 @@ func (c *FrameworkController) syncFrameworkState(f *ci.Framework) (err error) {
} else { } else {
if c.enqueueFrameworkAttemptCreationTimeoutCheck(f, true) { if c.enqueueFrameworkAttemptCreationTimeoutCheck(f, true) {
log.Infof(logPfx + log.Infof(logPfx +
"Waiting ConfigMap to appear in the local cache or timeout") "Waiting ConfigMap to appear in the local cache or timeout")
return nil return nil
} }
diag = fmt.Sprintf( diag = fmt.Sprintf(
"ConfigMap does not appear in the local cache within timeout %v, "+ "ConfigMap does not appear in the local cache within timeout %v, "+
"so consider it was deleted and force delete it", "so consider it was deleted and force delete it",
common.SecToDuration(c.cConfig.ObjectLocalCacheCreationTimeoutSec)) common.SecToDuration(c.cConfig.ObjectLocalCacheCreationTimeoutSec))
code = ci.CompletionCodeConfigMapCreationTimeout code = ci.CompletionCodeConfigMapCreationTimeout
log.Warnf(logPfx + diag) log.Warnf(logPfx + diag)
@ -774,7 +774,7 @@ func (c *FrameworkController) syncFrameworkState(f *ci.Framework) (err error) {
// The deletion requested object will never appear again with the same UID, // The deletion requested object will never appear again with the same UID,
// so always just wait. // so always just wait.
log.Infof(logPfx + log.Infof(logPfx +
"Waiting ConfigMap to disappearing or disappear in the local cache") "Waiting ConfigMap to disappearing or disappear in the local cache")
} else { } else {
// At this point, f.Status.State must be in: // At this point, f.Status.State must be in:
// {FrameworkAttemptCreationRequested, FrameworkAttemptPreparing, // {FrameworkAttemptCreationRequested, FrameworkAttemptPreparing,
@ -789,7 +789,7 @@ func (c *FrameworkController) syncFrameworkState(f *ci.Framework) (err error) {
diag := fmt.Sprintf("ConfigMap is being deleted by others") diag := fmt.Sprintf("ConfigMap is being deleted by others")
log.Warnf(logPfx + diag) log.Warnf(logPfx + diag)
f.Status.AttemptStatus.CompletionStatus = f.Status.AttemptStatus.CompletionStatus =
ci.CompletionCodeConfigMapExternalDeleted.NewCompletionStatus(diag) ci.CompletionCodeConfigMapExternalDeleted.NewCompletionStatus(diag)
} }
f.TransitionFrameworkState(ci.FrameworkAttemptDeleting) f.TransitionFrameworkState(ci.FrameworkAttemptDeleting)
@ -815,14 +815,14 @@ func (c *FrameworkController) syncFrameworkState(f *ci.Framework) (err error) {
if retryDecision.ShouldRetry { if retryDecision.ShouldRetry {
// scheduleToRetryFramework // scheduleToRetryFramework
log.Infof(logPfx+ log.Infof(logPfx+
"Will retry Framework with new FrameworkAttempt: RetryDecision: %v", "Will retry Framework with new FrameworkAttempt: RetryDecision: %v",
retryDecision) retryDecision)
f.Status.RetryPolicyStatus.RetryDelaySec = &retryDecision.DelaySec f.Status.RetryPolicyStatus.RetryDelaySec = &retryDecision.DelaySec
} else { } else {
// completeFramework // completeFramework
log.Infof(logPfx+ log.Infof(logPfx+
"Will complete Framework: RetryDecision: %v", "Will complete Framework: RetryDecision: %v",
retryDecision) retryDecision)
f.Status.CompletionTime = common.PtrNow() f.Status.CompletionTime = common.PtrNow()
@ -836,8 +836,8 @@ func (c *FrameworkController) syncFrameworkState(f *ci.Framework) (err error) {
// should be executed now. // should be executed now.
if f.Spec.ExecutionType == ci.ExecutionStop { if f.Spec.ExecutionType == ci.ExecutionStop {
log.Infof(logPfx + log.Infof(logPfx +
"User has requested to stop the Framework, " + "User has requested to stop the Framework, " +
"so immediately retry without delay") "so immediately retry without delay")
} else { } else {
if c.enqueueFrameworkRetryDelayTimeoutCheck(f, true) { if c.enqueueFrameworkRetryDelayTimeoutCheck(f, true) {
log.Infof(logPfx + "Waiting Framework to retry after delay") log.Infof(logPfx + "Waiting Framework to retry after delay")
@ -898,7 +898,7 @@ func (c *FrameworkController) syncFrameworkState(f *ci.Framework) (err error) {
// The ground truth cm is the local cached one instead of the remote one, // The ground truth cm is the local cached one instead of the remote one,
// so need to wait before continue the sync. // so need to wait before continue the sync.
log.Infof(logPfx + log.Infof(logPfx +
"Waiting ConfigMap to appear in the local cache or timeout") "Waiting ConfigMap to appear in the local cache or timeout")
return nil return nil
} }
// At this point, f.Status.State must be in: // At this point, f.Status.State must be in:
@ -906,9 +906,9 @@ func (c *FrameworkController) syncFrameworkState(f *ci.Framework) (err error) {
// FrameworkAttemptDeletionRequested, FrameworkAttemptDeleting} // FrameworkAttemptDeletionRequested, FrameworkAttemptDeleting}
if f.Status.State == ci.FrameworkAttemptPreparing || if f.Status.State == ci.FrameworkAttemptPreparing ||
f.Status.State == ci.FrameworkAttemptRunning || f.Status.State == ci.FrameworkAttemptRunning ||
f.Status.State == ci.FrameworkAttemptDeletionRequested || f.Status.State == ci.FrameworkAttemptDeletionRequested ||
f.Status.State == ci.FrameworkAttemptDeleting { f.Status.State == ci.FrameworkAttemptDeleting {
if !f.IsCompleting() { if !f.IsCompleting() {
if f.Spec.ExecutionType == ci.ExecutionStop { if f.Spec.ExecutionType == ci.ExecutionStop {
diag := fmt.Sprintf("User has requested to stop the Framework") diag := fmt.Sprintf("User has requested to stop the Framework")
@ -921,7 +921,7 @@ func (c *FrameworkController) syncFrameworkState(f *ci.Framework) (err error) {
err := c.syncTaskRoleStatuses(f, cm) err := c.syncTaskRoleStatuses(f, cm)
if f.Status.State == ci.FrameworkAttemptPreparing || if f.Status.State == ci.FrameworkAttemptPreparing ||
f.Status.State == ci.FrameworkAttemptRunning { f.Status.State == ci.FrameworkAttemptRunning {
if !f.IsAnyTaskRunning() { if !f.IsAnyTaskRunning() {
f.TransitionFrameworkState(ci.FrameworkAttemptPreparing) f.TransitionFrameworkState(ci.FrameworkAttemptPreparing)
} else { } else {
@ -933,8 +933,8 @@ func (c *FrameworkController) syncFrameworkState(f *ci.Framework) (err error) {
} else { } else {
// Unreachable // Unreachable
panic(fmt.Errorf(logPfx+ panic(fmt.Errorf(logPfx+
"Failed: At this point, FrameworkState should be in "+ "Failed: At this point, FrameworkState should be in "+
"{%v, %v, %v, %v} instead of %v", "{%v, %v, %v, %v} instead of %v",
ci.FrameworkAttemptPreparing, ci.FrameworkAttemptRunning, ci.FrameworkAttemptPreparing, ci.FrameworkAttemptRunning,
ci.FrameworkAttemptDeletionRequested, ci.FrameworkAttemptDeleting, ci.FrameworkAttemptDeletionRequested, ci.FrameworkAttemptDeleting,
f.Status.State)) f.Status.State))
@ -948,7 +948,7 @@ func (c *FrameworkController) syncFrameworkState(f *ci.Framework) (err error) {
// Clean up instead of recovery is because the ConfigMapUID is always the ground // Clean up instead of recovery is because the ConfigMapUID is always the ground
// truth. // truth.
func (c *FrameworkController) getOrCleanupConfigMap( func (c *FrameworkController) getOrCleanupConfigMap(
f *ci.Framework, force bool) (cm *core.ConfigMap, err error) { f *ci.Framework, force bool) (cm *core.ConfigMap, err error) {
cmName := f.ConfigMapName() cmName := f.ConfigMapName()
if force { if force {
@ -991,7 +991,7 @@ func (c *FrameworkController) getOrCleanupConfigMap(
// Using UID to ensure we delete the right object. // Using UID to ensure we delete the right object.
// The cmUID should be controlled by f. // The cmUID should be controlled by f.
func (c *FrameworkController) deleteConfigMap( func (c *FrameworkController) deleteConfigMap(
f *ci.Framework, cmUID types.UID, force bool) error { f *ci.Framework, cmUID types.UID, force bool) error {
cmName := f.ConfigMapName() cmName := f.ConfigMapName()
errPfx := fmt.Sprintf( errPfx := fmt.Sprintf(
"[%v]: Failed to delete ConfigMap %v, %v: force: %v: ", "[%v]: Failed to delete ConfigMap %v, %v: force: %v: ",
@ -1014,12 +1014,12 @@ func (c *FrameworkController) deleteConfigMap(
if getErr != nil { if getErr != nil {
if !apiErrors.IsNotFound(getErr) { if !apiErrors.IsNotFound(getErr) {
return fmt.Errorf(errPfx+ return fmt.Errorf(errPfx+
"ConfigMap cannot be got from remote: %v", getErr) "ConfigMap cannot be got from remote: %v", getErr)
} }
} else { } else {
if cmUID == cm.UID { if cmUID == cm.UID {
return fmt.Errorf(errPfx+ return fmt.Errorf(errPfx+
"ConfigMap with DeletionTimestamp %v still exist after deletion", "ConfigMap with DeletionTimestamp %v still exist after deletion",
cm.DeletionTimestamp) cm.DeletionTimestamp)
} }
} }
@ -1033,7 +1033,7 @@ func (c *FrameworkController) deleteConfigMap(
} }
func (c *FrameworkController) createConfigMap( func (c *FrameworkController) createConfigMap(
f *ci.Framework) (*core.ConfigMap, error) { f *ci.Framework) (*core.ConfigMap, error) {
cm := f.NewConfigMap() cm := f.NewConfigMap()
errPfx := fmt.Sprintf( errPfx := fmt.Sprintf(
"[%v]: Failed to create ConfigMap %v: ", "[%v]: Failed to create ConfigMap %v: ",
@ -1046,9 +1046,9 @@ func (c *FrameworkController) createConfigMap(
localCM, getErr := c.cmLister.ConfigMaps(f.Namespace).Get(cm.Name) localCM, getErr := c.cmLister.ConfigMaps(f.Namespace).Get(cm.Name)
if getErr == nil && !meta.IsControlledBy(localCM, f) { if getErr == nil && !meta.IsControlledBy(localCM, f) {
return nil, fmt.Errorf(errPfx+ return nil, fmt.Errorf(errPfx+
"ConfigMap naming conflicts with others: "+ "ConfigMap naming conflicts with others: "+
"Existing ConfigMap %v with DeletionTimestamp %v is not "+ "Existing ConfigMap %v with DeletionTimestamp %v is not "+
"controlled by current Framework %v, %v: %v", "controlled by current Framework %v, %v: %v",
localCM.UID, localCM.DeletionTimestamp, f.Name, f.UID, createErr) localCM.UID, localCM.DeletionTimestamp, f.Name, f.UID, createErr)
} }
} }
@ -1063,7 +1063,7 @@ func (c *FrameworkController) createConfigMap(
} }
func (c *FrameworkController) syncTaskRoleStatuses( func (c *FrameworkController) syncTaskRoleStatuses(
f *ci.Framework, cm *core.ConfigMap) (err error) { f *ci.Framework, cm *core.ConfigMap) (err error) {
logPfx := fmt.Sprintf("[%v]: syncTaskRoleStatuses: ", f.Key()) logPfx := fmt.Sprintf("[%v]: syncTaskRoleStatuses: ", f.Key())
log.Infof(logPfx + "Started") log.Infof(logPfx + "Started")
defer func() { log.Infof(logPfx + "Completed") }() defer func() { log.Infof(logPfx + "Completed") }()
@ -1087,8 +1087,8 @@ func (c *FrameworkController) syncTaskRoleStatuses(
} }
func (c *FrameworkController) syncTaskState( func (c *FrameworkController) syncTaskState(
f *ci.Framework, cm *core.ConfigMap, f *ci.Framework, cm *core.ConfigMap,
taskRoleName string, taskIndex int32) (err error) { taskRoleName string, taskIndex int32) (err error) {
logPfx := fmt.Sprintf("[%v][%v][%v]: syncTaskState: ", logPfx := fmt.Sprintf("[%v][%v][%v]: syncTaskState: ",
f.Key(), taskRoleName, taskIndex) f.Key(), taskRoleName, taskIndex)
log.Infof(logPfx + "Started") log.Infof(logPfx + "Started")
@ -1125,13 +1125,13 @@ func (c *FrameworkController) syncTaskState(
if taskStatus.State == ci.TaskAttemptCreationRequested { if taskStatus.State == ci.TaskAttemptCreationRequested {
if c.enqueueTaskAttemptCreationTimeoutCheck(f, taskRoleName, taskIndex, true) { if c.enqueueTaskAttemptCreationTimeoutCheck(f, taskRoleName, taskIndex, true) {
log.Infof(logPfx + log.Infof(logPfx +
"Waiting Pod to appear in the local cache or timeout") "Waiting Pod to appear in the local cache or timeout")
return nil return nil
} }
diag := fmt.Sprintf( diag := fmt.Sprintf(
"Pod does not appear in the local cache within timeout %v, "+ "Pod does not appear in the local cache within timeout %v, "+
"so consider it was deleted and force delete it", "so consider it was deleted and force delete it",
common.SecToDuration(c.cConfig.ObjectLocalCacheCreationTimeoutSec)) common.SecToDuration(c.cConfig.ObjectLocalCacheCreationTimeoutSec))
log.Warnf(logPfx + diag) log.Warnf(logPfx + diag)
@ -1178,7 +1178,7 @@ func (c *FrameworkController) syncTaskState(
// The deletion requested object will never appear again with the same UID, // The deletion requested object will never appear again with the same UID,
// so always just wait. // so always just wait.
log.Infof(logPfx + log.Infof(logPfx +
"Waiting Pod to disappearing or disappear in the local cache") "Waiting Pod to disappearing or disappear in the local cache")
return nil return nil
} }
@ -1196,7 +1196,7 @@ func (c *FrameworkController) syncTaskState(
// kills the Pod. // kills the Pod.
if pod.Status.Phase == core.PodUnknown { if pod.Status.Phase == core.PodUnknown {
log.Infof(logPfx+ log.Infof(logPfx+
"Waiting Pod to be deleted or deleting or transitioned from %v", "Waiting Pod to be deleted or deleting or transitioned from %v",
pod.Status.Phase) pod.Status.Phase)
return nil return nil
} }
@ -1237,7 +1237,7 @@ func (c *FrameworkController) syncTaskState(
terminated.Message)) terminated.Message))
if lastContainerExitCode == nil || if lastContainerExitCode == nil ||
lastContainerCompletionTime.Before(terminated.FinishedAt.Time) { lastContainerCompletionTime.Before(terminated.FinishedAt.Time) {
lastContainerExitCode = &terminated.ExitCode lastContainerExitCode = &terminated.ExitCode
lastContainerCompletionTime = terminated.FinishedAt.Time lastContainerCompletionTime = terminated.FinishedAt.Time
} }
@ -1247,7 +1247,7 @@ func (c *FrameworkController) syncTaskState(
if lastContainerExitCode == nil { if lastContainerExitCode == nil {
diag := fmt.Sprintf( diag := fmt.Sprintf(
"Pod failed without any non-zero container exit code, maybe " + "Pod failed without any non-zero container exit code, maybe " +
"stopped by the system") "stopped by the system")
log.Warnf(logPfx + diag) log.Warnf(logPfx + diag)
c.completeTaskAttempt(f, taskRoleName, taskIndex, false, c.completeTaskAttempt(f, taskRoleName, taskIndex, false,
ci.CompletionCodePodFailedWithoutFailedContainer.NewCompletionStatus(diag)) ci.CompletionCodePodFailedWithoutFailedContainer.NewCompletionStatus(diag))
@ -1267,14 +1267,14 @@ func (c *FrameworkController) syncTaskState(
return nil return nil
} else { } else {
return fmt.Errorf(logPfx+ return fmt.Errorf(logPfx+
"Failed: Got unrecognized Pod Phase: %v", pod.Status.Phase) "Failed: Got unrecognized Pod Phase: %v", pod.Status.Phase)
} }
} else { } else {
if taskStatus.AttemptStatus.CompletionStatus == nil { if taskStatus.AttemptStatus.CompletionStatus == nil {
diag := fmt.Sprintf("Pod is being deleted by others") diag := fmt.Sprintf("Pod is being deleted by others")
log.Warnf(logPfx + diag) log.Warnf(logPfx + diag)
taskStatus.AttemptStatus.CompletionStatus = taskStatus.AttemptStatus.CompletionStatus =
ci.CompletionCodePodExternalDeleted.NewCompletionStatus(diag) ci.CompletionCodePodExternalDeleted.NewCompletionStatus(diag)
} }
f.TransitionTaskState(taskRoleName, taskIndex, ci.TaskAttemptDeleting) f.TransitionTaskState(taskRoleName, taskIndex, ci.TaskAttemptDeleting)
@ -1298,14 +1298,14 @@ func (c *FrameworkController) syncTaskState(
if retryDecision.ShouldRetry { if retryDecision.ShouldRetry {
// scheduleToRetryTask // scheduleToRetryTask
log.Infof(logPfx+ log.Infof(logPfx+
"Will retry Task with new TaskAttempt: RetryDecision: %v", "Will retry Task with new TaskAttempt: RetryDecision: %v",
retryDecision) retryDecision)
taskStatus.RetryPolicyStatus.RetryDelaySec = &retryDecision.DelaySec taskStatus.RetryPolicyStatus.RetryDelaySec = &retryDecision.DelaySec
} else { } else {
// completeTask // completeTask
log.Infof(logPfx+ log.Infof(logPfx+
"Will complete Task: RetryDecision: %v", "Will complete Task: RetryDecision: %v",
retryDecision) retryDecision)
taskStatus.CompletionTime = common.PtrNow() taskStatus.CompletionTime = common.PtrNow()
@ -1339,7 +1339,7 @@ func (c *FrameworkController) syncTaskState(
if taskStatus.State == ci.TaskAttemptCreationPending { if taskStatus.State == ci.TaskAttemptCreationPending {
if f.IsCompleting() { if f.IsCompleting() {
log.Infof(logPfx + "Skip to createTaskAttempt: " + log.Infof(logPfx + "Skip to createTaskAttempt: " +
"FrameworkAttempt is completing") "FrameworkAttempt is completing")
return nil return nil
} }
@ -1351,7 +1351,7 @@ func (c *FrameworkController) syncTaskState(
// Should be Framework Error instead of Platform Transient Error. // Should be Framework Error instead of Platform Transient Error.
diag := fmt.Sprintf( diag := fmt.Sprintf(
"Pod Spec is invalid in TaskRole [%v]: "+ "Pod Spec is invalid in TaskRole [%v]: "+
"Triggered by Task [%v][%v]: Diagnostics: %v", "Triggered by Task [%v][%v]: Diagnostics: %v",
taskRoleName, taskRoleName, taskIndex, apiErr) taskRoleName, taskRoleName, taskIndex, apiErr)
log.Infof(logPfx + diag) log.Infof(logPfx + diag)
@ -1383,7 +1383,7 @@ func (c *FrameworkController) syncTaskState(
// The ground truth pod is the local cached one instead of the remote one, // The ground truth pod is the local cached one instead of the remote one,
// so need to wait before continue the sync. // so need to wait before continue the sync.
log.Infof(logPfx + log.Infof(logPfx +
"Waiting Pod to appear in the local cache or timeout") "Waiting Pod to appear in the local cache or timeout")
return nil return nil
} }
// At this point, taskStatus.State must be in: // At this point, taskStatus.State must be in:
@ -1392,7 +1392,7 @@ func (c *FrameworkController) syncTaskState(
if taskStatus.State == ci.TaskCompleted { if taskStatus.State == ci.TaskCompleted {
if f.IsCompleting() { if f.IsCompleting() {
log.Infof(logPfx + "Skip to attemptToCompleteFrameworkAttempt: " + log.Infof(logPfx + "Skip to attemptToCompleteFrameworkAttempt: " +
"FrameworkAttempt is completing") "FrameworkAttempt is completing")
return nil return nil
} }
@ -1406,7 +1406,7 @@ func (c *FrameworkController) syncTaskState(
if failedTaskCount >= minFailedTaskCount { if failedTaskCount >= minFailedTaskCount {
diag := fmt.Sprintf( diag := fmt.Sprintf(
"FailedTaskCount %v has reached MinFailedTaskCount %v in TaskRole [%v]: "+ "FailedTaskCount %v has reached MinFailedTaskCount %v in TaskRole [%v]: "+
"Triggered by Task [%v][%v]: Diagnostics: %v", "Triggered by Task [%v][%v]: Diagnostics: %v",
failedTaskCount, minFailedTaskCount, taskRoleName, failedTaskCount, minFailedTaskCount, taskRoleName,
taskRoleName, taskIndex, taskStatus.AttemptStatus.CompletionStatus.Diagnostics) taskRoleName, taskIndex, taskStatus.AttemptStatus.CompletionStatus.Diagnostics)
log.Infof(logPfx + diag) log.Infof(logPfx + diag)
@ -1421,7 +1421,7 @@ func (c *FrameworkController) syncTaskState(
if succeededTaskCount >= minSucceededTaskCount { if succeededTaskCount >= minSucceededTaskCount {
diag := fmt.Sprintf( diag := fmt.Sprintf(
"SucceededTaskCount %v has reached MinSucceededTaskCount %v in TaskRole [%v]: "+ "SucceededTaskCount %v has reached MinSucceededTaskCount %v in TaskRole [%v]: "+
"Triggered by Task [%v][%v]: Diagnostics: %v", "Triggered by Task [%v][%v]: Diagnostics: %v",
succeededTaskCount, minSucceededTaskCount, taskRoleName, succeededTaskCount, minSucceededTaskCount, taskRoleName,
taskRoleName, taskIndex, taskStatus.AttemptStatus.CompletionStatus.Diagnostics) taskRoleName, taskIndex, taskStatus.AttemptStatus.CompletionStatus.Diagnostics)
log.Infof(logPfx + diag) log.Infof(logPfx + diag)
@ -1436,9 +1436,9 @@ func (c *FrameworkController) syncTaskState(
failedTaskCount := f.GetTaskCount((*ci.TaskStatus).IsFailed) failedTaskCount := f.GetTaskCount((*ci.TaskStatus).IsFailed)
diag := fmt.Sprintf( diag := fmt.Sprintf(
"All Tasks are completed and no user specified conditions in "+ "All Tasks are completed and no user specified conditions in "+
"FrameworkAttemptCompletionPolicy have ever been triggered: "+ "FrameworkAttemptCompletionPolicy have ever been triggered: "+
"TotalTaskCount: %v, FailedTaskCount: %v: "+ "TotalTaskCount: %v, FailedTaskCount: %v: "+
"Triggered by Task [%v][%v]: Diagnostics: %v", "Triggered by Task [%v][%v]: Diagnostics: %v",
totalTaskCount, failedTaskCount, totalTaskCount, failedTaskCount,
taskRoleName, taskIndex, taskStatus.AttemptStatus.CompletionStatus.Diagnostics) taskRoleName, taskIndex, taskStatus.AttemptStatus.CompletionStatus.Diagnostics)
log.Infof(logPfx + diag) log.Infof(logPfx + diag)
@ -1454,7 +1454,7 @@ func (c *FrameworkController) syncTaskState(
// Unreachable // Unreachable
panic(fmt.Errorf(logPfx+ panic(fmt.Errorf(logPfx+
"Failed: At this point, TaskState should be in {} instead of %v", "Failed: At this point, TaskState should be in {} instead of %v",
taskStatus.State)) taskStatus.State))
} }
@ -1464,8 +1464,8 @@ func (c *FrameworkController) syncTaskState(
// writable and may be outdated even if no error. // writable and may be outdated even if no error.
// Clean up instead of recovery is because the PodUID is always the ground truth. // Clean up instead of recovery is because the PodUID is always the ground truth.
func (c *FrameworkController) getOrCleanupPod( func (c *FrameworkController) getOrCleanupPod(
f *ci.Framework, cm *core.ConfigMap, f *ci.Framework, cm *core.ConfigMap,
taskRoleName string, taskIndex int32, force bool) (pod *core.Pod, err error) { taskRoleName string, taskIndex int32, force bool) (pod *core.Pod, err error) {
taskStatus := f.TaskStatus(taskRoleName, taskIndex) taskStatus := f.TaskStatus(taskRoleName, taskIndex)
podName := taskStatus.PodName() podName := taskStatus.PodName()
@ -1508,8 +1508,8 @@ func (c *FrameworkController) getOrCleanupPod(
// Using UID to ensure we delete the right object. // Using UID to ensure we delete the right object.
// The podUID should be controlled by cm. // The podUID should be controlled by cm.
func (c *FrameworkController) deletePod( func (c *FrameworkController) deletePod(
f *ci.Framework, taskRoleName string, taskIndex int32, f *ci.Framework, taskRoleName string, taskIndex int32,
podUID types.UID, force bool) error { podUID types.UID, force bool) error {
taskStatus := f.TaskStatus(taskRoleName, taskIndex) taskStatus := f.TaskStatus(taskRoleName, taskIndex)
podName := taskStatus.PodName() podName := taskStatus.PodName()
errPfx := fmt.Sprintf( errPfx := fmt.Sprintf(
@ -1533,12 +1533,12 @@ func (c *FrameworkController) deletePod(
if getErr != nil { if getErr != nil {
if !apiErrors.IsNotFound(getErr) { if !apiErrors.IsNotFound(getErr) {
return fmt.Errorf(errPfx+ return fmt.Errorf(errPfx+
"Pod cannot be got from remote: %v", getErr) "Pod cannot be got from remote: %v", getErr)
} }
} else { } else {
if podUID == pod.UID { if podUID == pod.UID {
return fmt.Errorf(errPfx+ return fmt.Errorf(errPfx+
"Pod with DeletionTimestamp %v still exist after deletion", "Pod with DeletionTimestamp %v still exist after deletion",
pod.DeletionTimestamp) pod.DeletionTimestamp)
} }
} }
@ -1552,8 +1552,8 @@ func (c *FrameworkController) deletePod(
} }
func (c *FrameworkController) createPod( func (c *FrameworkController) createPod(
f *ci.Framework, cm *core.ConfigMap, f *ci.Framework, cm *core.ConfigMap,
taskRoleName string, taskIndex int32) (*core.Pod, error) { taskRoleName string, taskIndex int32) (*core.Pod, error) {
pod := f.NewPod(cm, taskRoleName, taskIndex) pod := f.NewPod(cm, taskRoleName, taskIndex)
errPfx := fmt.Sprintf( errPfx := fmt.Sprintf(
"[%v][%v][%v]: Failed to create Pod %v", "[%v][%v][%v]: Failed to create Pod %v",
@ -1565,10 +1565,10 @@ func (c *FrameworkController) createPod(
// Best effort to judge if conflict with a not controlled object. // Best effort to judge if conflict with a not controlled object.
localPod, getErr := c.podLister.Pods(f.Namespace).Get(pod.Name) localPod, getErr := c.podLister.Pods(f.Namespace).Get(pod.Name)
if getErr == nil && !meta.IsControlledBy(localPod, cm) { if getErr == nil && !meta.IsControlledBy(localPod, cm) {
return nil, errorWrap.Wrapf(createErr, errPfx + ": "+ return nil, errorWrap.Wrapf(createErr, errPfx+": "+
"Pod naming conflicts with others: "+ "Pod naming conflicts with others: "+
"Existing Pod %v with DeletionTimestamp %v is not "+ "Existing Pod %v with DeletionTimestamp %v is not "+
"controlled by current ConfigMap %v, %v", "controlled by current ConfigMap %v, %v",
localPod.UID, localPod.DeletionTimestamp, cm.Name, cm.UID) localPod.UID, localPod.DeletionTimestamp, cm.Name, cm.UID)
} }
} }
@ -1583,8 +1583,8 @@ func (c *FrameworkController) createPod(
} }
func (c *FrameworkController) completeTaskAttempt( func (c *FrameworkController) completeTaskAttempt(
f *ci.Framework, taskRoleName string, taskIndex int32, f *ci.Framework, taskRoleName string, taskIndex int32,
force bool, completionStatus *ci.CompletionStatus) { force bool, completionStatus *ci.CompletionStatus) {
logPfx := fmt.Sprintf( logPfx := fmt.Sprintf(
"[%v][%v][%v]: completeTaskAttempt: force: %v: ", "[%v][%v][%v]: completeTaskAttempt: force: %v: ",
f.Key(), taskRoleName, taskIndex, force) f.Key(), taskRoleName, taskIndex, force)
@ -1601,11 +1601,11 @@ func (c *FrameworkController) completeTaskAttempt(
if taskStatus.TaskAttemptInstanceUID() == nil { if taskStatus.TaskAttemptInstanceUID() == nil {
log.Infof(logPfx+ log.Infof(logPfx+
"TaskAttempt %v is completed with CompletionStatus: %v", "TaskAttempt %v is completed with CompletionStatus: %v",
taskStatus.TaskAttemptID(), taskStatus.AttemptStatus.CompletionStatus) taskStatus.TaskAttemptID(), taskStatus.AttemptStatus.CompletionStatus)
} else { } else {
log.Infof(logPfx+ log.Infof(logPfx+
"TaskAttemptInstance %v is completed with CompletionStatus: %v", "TaskAttemptInstance %v is completed with CompletionStatus: %v",
*taskStatus.TaskAttemptInstanceUID(), taskStatus.AttemptStatus.CompletionStatus) *taskStatus.TaskAttemptInstanceUID(), taskStatus.AttemptStatus.CompletionStatus)
} }
@ -1625,7 +1625,7 @@ func (c *FrameworkController) completeTaskAttempt(
} }
func (c *FrameworkController) completeFrameworkAttempt( func (c *FrameworkController) completeFrameworkAttempt(
f *ci.Framework, force bool, completionStatus *ci.CompletionStatus) { f *ci.Framework, force bool, completionStatus *ci.CompletionStatus) {
logPfx := fmt.Sprintf( logPfx := fmt.Sprintf(
"[%v]: completeFrameworkAttempt: force: %v: ", "[%v]: completeFrameworkAttempt: force: %v: ",
f.Key(), force) f.Key(), force)
@ -1639,8 +1639,8 @@ func (c *FrameworkController) completeFrameworkAttempt(
for _, taskStatus := range taskRoleStatus.TaskStatuses { for _, taskStatus := range taskRoleStatus.TaskStatuses {
if taskStatus.AttemptStatus.CompletionStatus == nil { if taskStatus.AttemptStatus.CompletionStatus == nil {
taskStatus.AttemptStatus.CompletionStatus = taskStatus.AttemptStatus.CompletionStatus =
ci.CompletionCodeFrameworkAttemptCompletion. ci.CompletionCodeFrameworkAttemptCompletion.
NewCompletionStatus("Stop to complete current FrameworkAttempt") NewCompletionStatus("Stop to complete current FrameworkAttempt")
} }
} }
} }
@ -1665,11 +1665,11 @@ func (c *FrameworkController) completeFrameworkAttempt(
if f.FrameworkAttemptInstanceUID() == nil { if f.FrameworkAttemptInstanceUID() == nil {
log.Infof(logPfx+ log.Infof(logPfx+
"FrameworkAttempt %v is completed with CompletionStatus: %v", "FrameworkAttempt %v is completed with CompletionStatus: %v",
f.FrameworkAttemptID(), f.Status.AttemptStatus.CompletionStatus) f.FrameworkAttemptID(), f.Status.AttemptStatus.CompletionStatus)
} else { } else {
log.Infof(logPfx+ log.Infof(logPfx+
"FrameworkAttemptInstance %v is completed with CompletionStatus: %v", "FrameworkAttemptInstance %v is completed with CompletionStatus: %v",
*f.FrameworkAttemptInstanceUID(), f.Status.AttemptStatus.CompletionStatus) *f.FrameworkAttemptInstanceUID(), f.Status.AttemptStatus.CompletionStatus)
} }
@ -1736,8 +1736,7 @@ func (c *FrameworkController) updateRemoteFrameworkStatus(f *ci.Framework) error
} }
} }
func (c *FrameworkController) getExpectedFrameworkStatusInfo(key string) ( func (c *FrameworkController) getExpectedFrameworkStatusInfo(key string) *ExpectedFrameworkStatusInfo {
*ExpectedFrameworkStatusInfo) {
if value, ok := c.fExpectedStatusInfos.Load(key); ok { if value, ok := c.fExpectedStatusInfos.Load(key); ok {
return value.(*ExpectedFrameworkStatusInfo) return value.(*ExpectedFrameworkStatusInfo)
} else { } else {
@ -1751,7 +1750,7 @@ func (c *FrameworkController) deleteExpectedFrameworkStatusInfo(key string) {
} }
func (c *FrameworkController) updateExpectedFrameworkStatusInfo(key string, func (c *FrameworkController) updateExpectedFrameworkStatusInfo(key string,
status *ci.FrameworkStatus, remoteSynced bool) { status *ci.FrameworkStatus, remoteSynced bool) {
log.Infof("[%v]: updateExpectedFrameworkStatusInfo", key) log.Infof("[%v]: updateExpectedFrameworkStatusInfo", key)
c.fExpectedStatusInfos.Store(key, &ExpectedFrameworkStatusInfo{ c.fExpectedStatusInfos.Store(key, &ExpectedFrameworkStatusInfo{
status: status, status: status,

Просмотреть файл

@ -24,7 +24,7 @@ package util
import ( import (
"fmt" "fmt"
"reflect" "github.com/microsoft/frameworkcontroller/pkg/common"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
apiExtensions "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" apiExtensions "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
apiClient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" apiClient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
@ -32,12 +32,12 @@ import (
meta "k8s.io/apimachinery/pkg/apis/meta/v1" meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/rest" "k8s.io/client-go/rest"
"github.com/microsoft/frameworkcontroller/pkg/common" "reflect"
) )
func PutCRD( func PutCRD(
config *rest.Config, crd *apiExtensions.CustomResourceDefinition, config *rest.Config, crd *apiExtensions.CustomResourceDefinition,
establishedCheckIntervalSec *int64, establishedCheckTimeoutSec *int64) { establishedCheckIntervalSec *int64, establishedCheckTimeoutSec *int64) {
client := createCRDClient(config) client := createCRDClient(config)
err := putCRDInternal(client, crd, establishedCheckIntervalSec, establishedCheckTimeoutSec) err := putCRDInternal(client, crd, establishedCheckIntervalSec, establishedCheckTimeoutSec)
@ -69,8 +69,8 @@ func createCRDClient(config *rest.Config) apiClient.Interface {
} }
func putCRDInternal( func putCRDInternal(
client apiClient.Interface, newCRD *apiExtensions.CustomResourceDefinition, client apiClient.Interface, newCRD *apiExtensions.CustomResourceDefinition,
establishedCheckIntervalSec *int64, establishedCheckTimeoutSec *int64) error { establishedCheckIntervalSec *int64, establishedCheckTimeoutSec *int64) error {
remoteCRD, err := client.ApiextensionsV1beta1().CustomResourceDefinitions().Get(newCRD.Name, meta.GetOptions{}) remoteCRD, err := client.ApiextensionsV1beta1().CustomResourceDefinitions().Get(newCRD.Name, meta.GetOptions{})
if err == nil { if err == nil {
@ -111,7 +111,7 @@ func putCRDInternal(
func isCRDEstablished(crd *apiExtensions.CustomResourceDefinition) bool { func isCRDEstablished(crd *apiExtensions.CustomResourceDefinition) bool {
for _, cond := range crd.Status.Conditions { for _, cond := range crd.Status.Conditions {
if cond.Status == apiExtensions.ConditionTrue && if cond.Status == apiExtensions.ConditionTrue &&
cond.Type == apiExtensions.Established { cond.Type == apiExtensions.Established {
return true return true
} }
} }

Просмотреть файл

@ -24,16 +24,16 @@ package util
import ( import (
"fmt" "fmt"
frameworkClient "github.com/microsoft/frameworkcontroller/pkg/client/clientset/versioned"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/rest"
core "k8s.io/api/core/v1" core "k8s.io/api/core/v1"
kubeClient "k8s.io/client-go/kubernetes" kubeClient "k8s.io/client-go/kubernetes"
frameworkClient "github.com/microsoft/frameworkcontroller/pkg/client/clientset/versioned" "k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
) )
func CreateClients(kConfig *rest.Config) ( func CreateClients(kConfig *rest.Config) (
kubeClient.Interface, frameworkClient.Interface) { kubeClient.Interface, frameworkClient.Interface) {
kClient, err := kubeClient.NewForConfig(kConfig) kClient, err := kubeClient.NewForConfig(kConfig)
if err != nil { if err != nil {
panic(fmt.Errorf("Failed to create KubeClient: %v", err)) panic(fmt.Errorf("Failed to create KubeClient: %v", err))