k8s-cronjob-prescaler/controllers/prescaledcronjob_controller...

package controllers

import (
    "context"
    "fmt"
    "strings"
    "time"

    pscv1alpha1 "cronprimer.local/api/v1alpha1"
    . "github.com/onsi/ginkgo"
    . "github.com/onsi/ginkgo/extensions/table"
    . "github.com/onsi/gomega"
    "github.com/robfig/cron/v3"
    batchv1beta1 "k8s.io/api/batch/v1beta1"
    v1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/types"
    "sigs.k8s.io/controller-runtime/pkg/client"
)
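
// testCase describes a single integration-test configuration: how many minutes apart
// the cron schedule fires, how long the warm-up window is, and the expected outcome.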
type testCase struct {
    minsApart   int
    warmUpMins  int
    shouldPass  bool
    shouldError bool
}

const (
    timeFormat = "15:04:05"
)
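
// These tests create a PreScaledCronJob, wait for the operator to generate the underlying
// CronJob, and then watch the resulting pod to verify that the warm-up (init) container runs
// before the schedule and the workload container starts on schedule.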
var _ = Describe("PrescaledCronJob Controller - Integration Tests", func() {

    DescribeTable("Integration test configurations",
        func(testCase testCase) {
            // generate a random name for the job so parallel jobs don't clash
            jobName := "psc-test-int-" + randString()

            // make sure we clean up this psc even if the assertions fail / something goes wrong
            defer deletePsc(jobName)

            result, err := runTest(testCase.minsApart, testCase.warmUpMins, jobName)
            Expect(result).To(Equal(testCase.shouldPass))
            Expect(err != nil).To(Equal(testCase.shouldError))
        },
        // add a line per test case here...
Entry("LONG TEST: 1 minute, 1 warmup", testCase{minsApart: 1, warmUpMins: 1, shouldPass: true, shouldError: false}),
Entry("LONG TEST: 3 minutes, 1 warmup", testCase{minsApart: 3, warmUpMins: 2, shouldPass: true, shouldError: false}),
Entry("LONG TEST: 4 minutes, 1 warmup", testCase{minsApart: 4, warmUpMins: 2, shouldPass: true, shouldError: false}),
Entry("LONG TEST: 4 minutes, 3 warmup", testCase{minsApart: 4, warmUpMins: 3, shouldPass: true, shouldError: false}),
Entry("LONG TEST: 5 minutes, 1 warmup", testCase{minsApart: 5, warmUpMins: 4, shouldPass: true, shouldError: false}),
Entry("LONG TEST: 5 minutes, 5 warmup", testCase{minsApart: 5, warmUpMins: 5, shouldPass: true, shouldError: false}),
Entry("LONG TEST: 10 minutes, 2 warmup", testCase{minsApart: 10, warmUpMins: 4, shouldPass: true, shouldError: false}),
)
})
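
// runTest creates a PreScaledCronJob with the given schedule and warm-up time, waits for the
// autogenerated CronJob to fire, and reports whether the workload container started as expected.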
func runTest(minsApart int, warmUpMins int, jobName string) (passed bool, errored error) {
    ctx := context.Background()

    // construct a prescaled cron in code + post to K8s
    toCreate := generatePSCSpec()
    toCreate.Name = jobName
    autoGenCronName := "autogen-" + jobName
    toCreate.Spec.CronJob.Spec.Schedule = fmt.Sprintf("*/%d * * * *", minsApart)
    toCreate.Spec.WarmUpTimeMins = warmUpMins

    if err := k8sClient.Create(ctx, &toCreate); err != nil {
        return false, err
    }
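
    // give the controller time to reconcile the PreScaledCronJob and create the autogenerated CronJob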
    time.Sleep(time.Second * 10)

    // get the autogenerated cron back
    fetchedAutogenCron := &batchv1beta1.CronJob{}
    if err := k8sClient.Get(ctx, types.NamespacedName{Name: autoGenCronName, Namespace: namespace}, fetchedAutogenCron); err != nil {
        return false, err
    }

    // get the next iteration of the prescaled cron. 'next' = the time the *workload* container should start
    schedule, schErr := cron.ParseStandard(toCreate.Spec.CronJob.Spec.Schedule)
    if schErr != nil {
        return false, schErr
    }
    next := schedule.Next(fetchedAutogenCron.GetCreationTimestamp().Time)

    // if we're within the 'warmup time' when we post this, the actual cron won't get triggered until the next iteration.
    if fetchedAutogenCron.GetCreationTimestamp().Time.After(next.Add(time.Duration(-toCreate.Spec.WarmUpTimeMins) * time.Minute)) {
        fmt.Println("\nWithin warmup zone, will wait for next iteration.")
        next = next.Add(time.Duration(minsApart) * time.Minute)
    }

    fmt.Println(fmt.Sprintf("\nWorkload container should start at: %v", next.Format(timeFormat)))
    hasSuspended := false

    // before the workload container should start - either the cluster is scaling or the init container is running here
    for next.After(time.Now()) {
        // try and get the pod
        if jobPod, err := getPod(ctx, autoGenCronName); jobPod.Name != "" && err == nil {
            if len(jobPod.Status.InitContainerStatuses) > 0 {
                if jobPod.Status.InitContainerStatuses[0].State.Waiting != nil {
                    fmt.Println(fmt.Sprintf("Time: %v :: Pod exists, phase --> Init container starting", time.Now().Format(timeFormat)))
                }
                if jobPod.Status.InitContainerStatuses[0].State.Running != nil {
                    fmt.Println(fmt.Sprintf("Time: %v :: Pod exists, phase --> Init container running", time.Now().Format(timeFormat)))
                }
            } else {
                fmt.Println(fmt.Sprintf("Time: %v :: Pod exists, phase --> %v", time.Now().Format(timeFormat), jobPod.Status.Phase))
            }

            // suspend the cron to prevent clogging up the cluster with pods we don't care about
            if !hasSuspended {
                if err := k8sClient.Get(ctx, types.NamespacedName{Name: autoGenCronName, Namespace: namespace}, fetchedAutogenCron); err != nil {
                    return false, err
                }

                suspend := true
                fetchedAutogenCron.Spec.Suspend = &suspend
                if err := k8sClient.Update(ctx, fetchedAutogenCron); err != nil {
                    fmt.Println("Failed to suspend cronjob. We need it suspended so multiple instances of the CRON job don't start - we want to test with just one execution")
                    return false, err
                }
                hasSuspended = true
            }
            // expect the workload container to *not* be running yet
            if len(jobPod.Status.ContainerStatuses) > 0 && jobPod.Status.ContainerStatuses[0].State.Running != nil {
                fmt.Println("Workload container should not be running before the schedule: FAIL")
                return false, nil
            }
        } else {
            // no pod there - not supposed to have started yet, or the cluster is scaling
            fmt.Println(fmt.Sprintf("Time: %v :: No pod --> waiting", time.Now().Format(timeFormat)))
        }

        time.Sleep(time.Second * 5)
    }
    // let the pod get scheduled
    time.Sleep(time.Second * 30)
    fmt.Println(fmt.Sprintf("\nTime: %v :: Cron iteration passed - job should now be running. Checking status ...", time.Now().Format(timeFormat)))

    // get the pod to check what's happening
    jobPod, err := getPod(ctx, autoGenCronName)
    if err != nil {
        return false, err
    }
    fmt.Println(fmt.Sprintf("Got job: %v :: Phase --> Workload container %v", jobPod.Name, jobPod.Status.Phase))

    // nicer check for the phase / status here
    if len(jobPod.Status.ContainerStatuses) == 0 || len(jobPod.Status.InitContainerStatuses) == 0 {
        fmt.Println("Pod has no status. Check your test setup, it's possible the pod couldn't be scheduled: FAIL")
        return false, nil
    }

    // expect the workload to be running and the init to be terminated. NOTE: This may fail if the workload image is being pulled - TBC
    if jobPod.Status.ContainerStatuses[0].State.Running == nil {
        fmt.Println("Workload container not running: FAIL")
        return false, nil
    }

    if jobPod.Status.ContainerStatuses[0].State.Terminated != nil {
        fmt.Println("Workload container terminated: FAIL")
        return false, nil
    }

    if jobPod.Status.InitContainerStatuses[0].State.Terminated == nil {
        fmt.Println("Init container still running: FAIL")
        return false, nil
    }

    fmt.Println(fmt.Sprintf("\n\nWorkload started at: %v", jobPod.Status.ContainerStatuses[0].State.Running.StartedAt))
    fmt.Println(fmt.Sprintf("Init container completed at: %v", jobPod.Status.InitContainerStatuses[0].State.Terminated.FinishedAt))
    fmt.Println(fmt.Sprintf("Workload started %v seconds after schedule\n\n", jobPod.Status.ContainerStatuses[0].State.Running.StartedAt.Sub(next).Seconds()))

    return true, nil
}
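
// getPod returns the first pod in the test namespace whose name starts with podPrefix,
// or an empty Pod if none is found.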
func getPod(ctx context.Context, podPrefix string) (v1.Pod, error) {
    podList := &v1.PodList{}
    opts := client.ListOptions{
        Namespace: namespace,
    }
    err := k8sClient.List(ctx, podList, &opts)

    fetchedPod := v1.Pod{}
    for _, pod := range podList.Items {
        if strings.HasPrefix(pod.Name, podPrefix) {
            fetchedPod = pod
            break
        }
    }
    return fetchedPod, err
}
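
// deletePsc removes the PreScaledCronJob created for a test, ignoring errors so that
// cleanup never fails the test itself.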
func deletePsc(jobName string) {
    // Delete this psc
    ctx := context.Background()
    psc := &pscv1alpha1.PreScaledCronJob{}
    if err := k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: namespace}, psc); err != nil {
        return
    }
    if err := k8sClient.Delete(ctx, psc); err != nil {
        return
    }
}