e2e/util.go: Move waitUntilSize functions to e2eutil for common use
This commit is contained in:
Родитель
2361781f9f
Коммит
235640b724
|
@ -52,7 +52,7 @@ func testCreateCluster(t *testing.T) {
|
|||
}
|
||||
}()
|
||||
|
||||
if _, err := waitUntilSizeReached(t, f, testEtcd.Metadata.Name, 3, 60*time.Second); err != nil {
|
||||
if _, err := e2eutil.WaitUntilSizeReached(t, f.KubeClient, 3, 60*time.Second, testEtcd); err != nil {
|
||||
t.Fatalf("failed to create 3 members etcd cluster: %v", err)
|
||||
}
|
||||
}
|
||||
|
@ -78,7 +78,7 @@ func testStopOperator(t *testing.T, kill bool) {
|
|||
}
|
||||
}()
|
||||
|
||||
names, err := waitUntilSizeReached(t, f, testEtcd.Metadata.Name, 3, 60*time.Second)
|
||||
names, err := e2eutil.WaitUntilSizeReached(t, f.KubeClient, 3, 60*time.Second, testEtcd)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create 3 members etcd cluster: %v", err)
|
||||
}
|
||||
|
@ -103,10 +103,10 @@ func testStopOperator(t *testing.T, kill bool) {
|
|||
if err := killMembers(f, names[0]); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, err := waitUntilSizeReached(t, f, testEtcd.Metadata.Name, 2, 10*time.Second); err != nil {
|
||||
if _, err := e2eutil.WaitUntilSizeReached(t, f.KubeClient, 2, 10*time.Second, testEtcd); err != nil {
|
||||
t.Fatalf("failed to wait for killed member to die: %v", err)
|
||||
}
|
||||
if _, err := waitUntilSizeReached(t, f, testEtcd.Metadata.Name, 3, 10*time.Second); err == nil {
|
||||
if _, err := e2eutil.WaitUntilSizeReached(t, f.KubeClient, 3, 10*time.Second, testEtcd); err == nil {
|
||||
t.Fatalf("cluster should not be recovered: control is paused")
|
||||
}
|
||||
|
||||
|
@ -114,7 +114,7 @@ func testStopOperator(t *testing.T, kill bool) {
|
|||
updateFunc := func(cl *spec.Cluster) {
|
||||
cl.Spec.Paused = false
|
||||
}
|
||||
if _, err = e2eutil.UpdateCluster(f.KubeClient, testEtcd, 10, updateFunc); err != nil {
|
||||
if testEtcd, err = e2eutil.UpdateCluster(f.KubeClient, testEtcd, 10, updateFunc); err != nil {
|
||||
t.Fatalf("failed to resume control: %v", err)
|
||||
}
|
||||
} else {
|
||||
|
@ -123,7 +123,7 @@ func testStopOperator(t *testing.T, kill bool) {
|
|||
}
|
||||
}
|
||||
|
||||
if _, err := waitUntilSizeReached(t, f, testEtcd.Metadata.Name, 3, 60*time.Second); err != nil {
|
||||
if _, err := e2eutil.WaitUntilSizeReached(t, f.KubeClient, 3, 60*time.Second, testEtcd); err != nil {
|
||||
t.Fatalf("failed to resize to 3 members etcd cluster: %v", err)
|
||||
}
|
||||
}
|
||||
|
@ -146,7 +146,7 @@ func testEtcdUpgrade(t *testing.T) {
|
|||
}
|
||||
}()
|
||||
|
||||
err = waitSizeAndVersionReached(t, f, testEtcd.Metadata.Name, "3.0.16", 3, 60*time.Second)
|
||||
err = e2eutil.WaitSizeAndVersionReached(t, f.KubeClient, "3.0.16", 3, 60*time.Second, testEtcd)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create 3 members etcd cluster: %v", err)
|
||||
}
|
||||
|
@ -158,7 +158,7 @@ func testEtcdUpgrade(t *testing.T) {
|
|||
t.Fatalf("fail to update cluster version: %v", err)
|
||||
}
|
||||
|
||||
err = waitSizeAndVersionReached(t, f, testEtcd.Metadata.Name, "3.1.4", 3, 60*time.Second)
|
||||
err = e2eutil.WaitSizeAndVersionReached(t, f.KubeClient, "3.1.4", 3, 60*time.Second, testEtcd)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to wait new version etcd cluster: %v", err)
|
||||
}
|
||||
|
|
|
@ -51,18 +51,18 @@ func testReadyMembersStatus(t *testing.T) {
|
|||
}
|
||||
}()
|
||||
|
||||
if _, err := waitUntilSizeReached(t, f, testEtcd.Metadata.Name, size, 30*time.Second); err != nil {
|
||||
if _, err := e2eutil.WaitUntilSizeReached(t, f.KubeClient, size, 30*time.Second, testEtcd); err != nil {
|
||||
t.Fatalf("failed to create %d members etcd cluster: %v", size, err)
|
||||
}
|
||||
|
||||
err = retryutil.Retry(5*time.Second, 3, func() (done bool, err error) {
|
||||
currEtcd, err := k8sutil.GetClusterTPRObject(f.KubeClient.CoreV1().RESTClient(), f.Namespace, testEtcd.Metadata.Name)
|
||||
if err != nil {
|
||||
logfWithTimestamp(t, "failed to get updated cluster object: %v", err)
|
||||
e2eutil.LogfWithTimestamp(t, "failed to get updated cluster object: %v", err)
|
||||
return false, nil
|
||||
}
|
||||
if len(currEtcd.Status.Members.Ready) != size {
|
||||
logfWithTimestamp(t, "size of ready members want = %d, get = %d ReadyMembers(%v) UnreadyMembers(%v). Will retry checking ReadyMembers", size, len(currEtcd.Status.Members.Ready), currEtcd.Status.Members.Ready, currEtcd.Status.Members.Unready)
|
||||
e2eutil.LogfWithTimestamp(t, "size of ready members want = %d, get = %d ReadyMembers(%v) UnreadyMembers(%v). Will retry checking ReadyMembers", size, len(currEtcd.Status.Members.Ready), currEtcd.Status.Members.Ready, currEtcd.Status.Members.Unready)
|
||||
return false, nil
|
||||
}
|
||||
return true, nil
|
||||
|
@ -101,7 +101,7 @@ func testBackupStatus(t *testing.T) {
|
|||
}
|
||||
}()
|
||||
|
||||
_, err = waitUntilSizeReached(t, f, testEtcd.Metadata.Name, 1, 60*time.Second)
|
||||
_, err = e2eutil.WaitUntilSizeReached(t, f.KubeClient, 1, 60*time.Second, testEtcd)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create 1 members etcd cluster: %v", err)
|
||||
}
|
||||
|
@ -119,12 +119,12 @@ func testBackupStatus(t *testing.T) {
|
|||
}
|
||||
bs := c.Status.BackupServiceStatus
|
||||
if bs == nil {
|
||||
logfWithTimestamp(t, "backup status is nil")
|
||||
e2eutil.LogfWithTimestamp(t, "backup status is nil")
|
||||
return false, nil
|
||||
}
|
||||
// We expect it will make one backup eventually.
|
||||
if bs.Backups < 1 {
|
||||
logfWithTimestamp(t, "backup number is %v", bs.Backups)
|
||||
e2eutil.LogfWithTimestamp(t, "backup number is %v", bs.Backups)
|
||||
return false, nil
|
||||
}
|
||||
if bs.BackupSize == 0 {
|
||||
|
|
|
@ -18,7 +18,6 @@ import (
|
|||
"encoding/json"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd-operator/pkg/spec"
|
||||
"github.com/coreos/etcd-operator/pkg/util/k8sutil"
|
||||
|
@ -42,7 +41,7 @@ func CreateCluster(t *testing.T, kubeClient kubernetes.Interface, namespace stri
|
|||
if err := json.Unmarshal(b, res); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
logfWithTimestamp(t, "created etcd cluster: %v", res.Metadata.Name)
|
||||
LogfWithTimestamp(t, "created etcd cluster: %v", res.Metadata.Name)
|
||||
return res, nil
|
||||
}
|
||||
|
||||
|
@ -66,7 +65,3 @@ func DeleteCluster(t *testing.T, kubeClient kubernetes.Interface, cl *spec.Clust
|
|||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func logfWithTimestamp(t *testing.T, format string, args ...interface{}) {
|
||||
t.Log(time.Now(), fmt.Sprintf(format, args...))
|
||||
}
|
||||
|
|
|
@ -20,132 +20,9 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd-operator/pkg/spec"
|
||||
"github.com/coreos/etcd-operator/pkg/util/k8sutil"
|
||||
"github.com/coreos/etcd-operator/pkg/util/retryutil"
|
||||
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/service/s3"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/pkg/api/v1"
|
||||
)
|
||||
|
||||
func waitResourcesDeleted(t *testing.T, kubeClient kubernetes.Interface, cl *spec.Cluster) error {
|
||||
err := retryutil.Retry(5*time.Second, 5, func() (done bool, err error) {
|
||||
list, err := kubeClient.CoreV1().Pods(cl.Metadata.Namespace).List(k8sutil.ClusterListOpt(cl.Metadata.Name))
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if len(list.Items) > 0 {
|
||||
p := list.Items[0]
|
||||
logfWithTimestamp(t, "waiting pod (%s) to be deleted.", p.Name)
|
||||
|
||||
buf := bytes.NewBuffer(nil)
|
||||
buf.WriteString("init container status:\n")
|
||||
printContainerStatus(buf, p.Status.InitContainerStatuses)
|
||||
buf.WriteString("container status:\n")
|
||||
printContainerStatus(buf, p.Status.ContainerStatuses)
|
||||
t.Logf("pod (%s) status.phase is (%s): %v", p.Name, p.Status.Phase, buf.String())
|
||||
|
||||
return false, nil
|
||||
}
|
||||
return true, nil
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("fail to wait pods deleted: %v", err)
|
||||
}
|
||||
|
||||
err = retryutil.Retry(5*time.Second, 5, func() (done bool, err error) {
|
||||
list, err := kubeClient.CoreV1().Services(cl.Metadata.Namespace).List(k8sutil.ClusterListOpt(cl.Metadata.Name))
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if len(list.Items) > 0 {
|
||||
logfWithTimestamp(t, "waiting service (%s) to be deleted", list.Items[0].Name)
|
||||
return false, nil
|
||||
}
|
||||
return true, nil
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("fail to wait services deleted: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func waitBackupDeleted(kubeClient kubernetes.Interface, cl *spec.Cluster, storageCheckerOptions *StorageCheckerOptions) error {
|
||||
err := retryutil.Retry(5*time.Second, 5, func() (bool, error) {
|
||||
_, err := kubeClient.AppsV1beta1().Deployments(cl.Metadata.Namespace).Get(k8sutil.BackupSidecarName(cl.Metadata.Name), metav1.GetOptions{})
|
||||
if err == nil {
|
||||
return false, nil
|
||||
}
|
||||
if apierrors.IsNotFound(err) {
|
||||
return true, nil
|
||||
}
|
||||
return false, err
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to wait backup Deployment deleted: %v", err)
|
||||
}
|
||||
err = retryutil.Retry(5*time.Second, 2, func() (done bool, err error) {
|
||||
ls := labels.SelectorFromSet(map[string]string{
|
||||
"app": k8sutil.BackupPodSelectorAppField,
|
||||
"etcd_cluster": cl.Metadata.Name,
|
||||
}).String()
|
||||
pl, err := kubeClient.CoreV1().Pods(cl.Metadata.Namespace).List(metav1.ListOptions{
|
||||
LabelSelector: ls,
|
||||
})
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if len(pl.Items) == 0 {
|
||||
return true, nil
|
||||
}
|
||||
if pl.Items[0].DeletionTimestamp != nil {
|
||||
return true, nil
|
||||
}
|
||||
return false, nil
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to wait backup pod terminated: %v", err)
|
||||
}
|
||||
// The rest is to track backup storage, e.g. PV or S3 "dir" deleted.
|
||||
// If CleanupBackupsOnClusterDelete=false, we don't delete them and thus don't check them.
|
||||
if !cl.Spec.Backup.CleanupBackupsOnClusterDelete {
|
||||
return nil
|
||||
}
|
||||
err = retryutil.Retry(5*time.Second, 5, func() (done bool, err error) {
|
||||
switch cl.Spec.Backup.StorageType {
|
||||
case spec.BackupStorageTypePersistentVolume, spec.BackupStorageTypeDefault:
|
||||
pl, err := kubeClient.CoreV1().PersistentVolumeClaims(cl.Metadata.Namespace).List(k8sutil.ClusterListOpt(cl.Metadata.Name))
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if len(pl.Items) > 0 {
|
||||
return false, nil
|
||||
}
|
||||
case spec.BackupStorageTypeS3:
|
||||
resp, err := storageCheckerOptions.S3Cli.ListObjects(&s3.ListObjectsInput{
|
||||
Bucket: aws.String(storageCheckerOptions.S3Bucket),
|
||||
Prefix: aws.String(cl.Metadata.Name + "/"),
|
||||
})
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if len(resp.Contents) > 0 {
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
return true, nil
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to wait storage (%s) to be deleted: %v", cl.Spec.Backup.StorageType, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func printContainerStatus(buf *bytes.Buffer, ss []v1.ContainerStatus) {
|
||||
for _, s := range ss {
|
||||
if s.State.Waiting != nil {
|
||||
|
@ -156,3 +33,7 @@ func printContainerStatus(buf *bytes.Buffer, ss []v1.ContainerStatus) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
func LogfWithTimestamp(t *testing.T, format string, args ...interface{}) {
|
||||
t.Log(time.Now(), fmt.Sprintf(format, args...))
|
||||
}
|
||||
|
|
|
@ -0,0 +1,200 @@
|
|||
// Copyright 2017 The etcd-operator Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package e2eutil
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd-operator/pkg/spec"
|
||||
"github.com/coreos/etcd-operator/pkg/util/k8sutil"
|
||||
"github.com/coreos/etcd-operator/pkg/util/retryutil"
|
||||
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/service/s3"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/pkg/api/v1"
|
||||
)
|
||||
|
||||
type acceptFunc func(*v1.Pod) bool
|
||||
|
||||
func WaitUntilSizeReached(t *testing.T, kubeClient kubernetes.Interface, size int, timeout time.Duration, cl *spec.Cluster) ([]string, error) {
|
||||
return waitSizeReachedWithAccept(t, kubeClient, size, timeout, cl)
|
||||
}
|
||||
|
||||
func WaitSizeAndVersionReached(t *testing.T, kubeClient kubernetes.Interface, version string, size int, timeout time.Duration, cl *spec.Cluster) error {
|
||||
_, err := waitSizeReachedWithAccept(t, kubeClient, size, timeout, cl, func(pod *v1.Pod) bool {
|
||||
return k8sutil.GetEtcdVersion(pod) == version
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
func waitSizeReachedWithAccept(t *testing.T, kubeClient kubernetes.Interface, size int, timeout time.Duration, cl *spec.Cluster, accepts ...acceptFunc) ([]string, error) {
|
||||
var names []string
|
||||
err := retryutil.Retry(10*time.Second, int(timeout/(10*time.Second)), func() (done bool, err error) {
|
||||
podList, err := kubeClient.Core().Pods(cl.Metadata.Namespace).List(k8sutil.ClusterListOpt(cl.Metadata.Name))
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
names = nil
|
||||
var nodeNames []string
|
||||
for i := range podList.Items {
|
||||
pod := &podList.Items[i]
|
||||
if pod.Status.Phase != v1.PodRunning {
|
||||
continue
|
||||
}
|
||||
accepted := true
|
||||
for _, acceptPod := range accepts {
|
||||
if !acceptPod(pod) {
|
||||
accepted = false
|
||||
break
|
||||
}
|
||||
}
|
||||
if !accepted {
|
||||
continue
|
||||
}
|
||||
names = append(names, pod.Name)
|
||||
nodeNames = append(nodeNames, pod.Spec.NodeName)
|
||||
}
|
||||
LogfWithTimestamp(t, "waiting size (%d), etcd pods: names (%v), nodes (%v)", size, names, nodeNames)
|
||||
// TODO: Change this to check for ready members and not just cluster pods
|
||||
if len(names) != size {
|
||||
return false, nil
|
||||
}
|
||||
return true, nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return names, nil
|
||||
}
|
||||
|
||||
func waitResourcesDeleted(t *testing.T, kubeClient kubernetes.Interface, cl *spec.Cluster) error {
|
||||
err := retryutil.Retry(5*time.Second, 5, func() (done bool, err error) {
|
||||
list, err := kubeClient.CoreV1().Pods(cl.Metadata.Namespace).List(k8sutil.ClusterListOpt(cl.Metadata.Name))
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if len(list.Items) > 0 {
|
||||
p := list.Items[0]
|
||||
LogfWithTimestamp(t, "waiting pod (%s) to be deleted.", p.Name)
|
||||
|
||||
buf := bytes.NewBuffer(nil)
|
||||
buf.WriteString("init container status:\n")
|
||||
printContainerStatus(buf, p.Status.InitContainerStatuses)
|
||||
buf.WriteString("container status:\n")
|
||||
printContainerStatus(buf, p.Status.ContainerStatuses)
|
||||
t.Logf("pod (%s) status.phase is (%s): %v", p.Name, p.Status.Phase, buf.String())
|
||||
|
||||
return false, nil
|
||||
}
|
||||
return true, nil
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("fail to wait pods deleted: %v", err)
|
||||
}
|
||||
|
||||
err = retryutil.Retry(5*time.Second, 5, func() (done bool, err error) {
|
||||
list, err := kubeClient.CoreV1().Services(cl.Metadata.Namespace).List(k8sutil.ClusterListOpt(cl.Metadata.Name))
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if len(list.Items) > 0 {
|
||||
LogfWithTimestamp(t, "waiting service (%s) to be deleted", list.Items[0].Name)
|
||||
return false, nil
|
||||
}
|
||||
return true, nil
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("fail to wait services deleted: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func waitBackupDeleted(kubeClient kubernetes.Interface, cl *spec.Cluster, storageCheckerOptions *StorageCheckerOptions) error {
|
||||
err := retryutil.Retry(5*time.Second, 5, func() (bool, error) {
|
||||
_, err := kubeClient.AppsV1beta1().Deployments(cl.Metadata.Namespace).Get(k8sutil.BackupSidecarName(cl.Metadata.Name), metav1.GetOptions{})
|
||||
if err == nil {
|
||||
return false, nil
|
||||
}
|
||||
if apierrors.IsNotFound(err) {
|
||||
return true, nil
|
||||
}
|
||||
return false, err
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to wait backup Deployment deleted: %v", err)
|
||||
}
|
||||
err = retryutil.Retry(5*time.Second, 2, func() (done bool, err error) {
|
||||
ls := labels.SelectorFromSet(map[string]string{
|
||||
"app": k8sutil.BackupPodSelectorAppField,
|
||||
"etcd_cluster": cl.Metadata.Name,
|
||||
}).String()
|
||||
pl, err := kubeClient.CoreV1().Pods(cl.Metadata.Namespace).List(metav1.ListOptions{
|
||||
LabelSelector: ls,
|
||||
})
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if len(pl.Items) == 0 {
|
||||
return true, nil
|
||||
}
|
||||
if pl.Items[0].DeletionTimestamp != nil {
|
||||
return true, nil
|
||||
}
|
||||
return false, nil
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to wait backup pod terminated: %v", err)
|
||||
}
|
||||
// The rest is to track backup storage, e.g. PV or S3 "dir" deleted.
|
||||
// If CleanupBackupsOnClusterDelete=false, we don't delete them and thus don't check them.
|
||||
if !cl.Spec.Backup.CleanupBackupsOnClusterDelete {
|
||||
return nil
|
||||
}
|
||||
err = retryutil.Retry(5*time.Second, 5, func() (done bool, err error) {
|
||||
switch cl.Spec.Backup.StorageType {
|
||||
case spec.BackupStorageTypePersistentVolume, spec.BackupStorageTypeDefault:
|
||||
pl, err := kubeClient.CoreV1().PersistentVolumeClaims(cl.Metadata.Namespace).List(k8sutil.ClusterListOpt(cl.Metadata.Name))
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if len(pl.Items) > 0 {
|
||||
return false, nil
|
||||
}
|
||||
case spec.BackupStorageTypeS3:
|
||||
resp, err := storageCheckerOptions.S3Cli.ListObjects(&s3.ListObjectsInput{
|
||||
Bucket: aws.String(storageCheckerOptions.S3Bucket),
|
||||
Prefix: aws.String(cl.Metadata.Name + "/"),
|
||||
})
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if len(resp.Contents) > 0 {
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
return true, nil
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to wait storage (%s) to be deleted: %v", cl.Spec.Backup.StorageType, err)
|
||||
}
|
||||
return nil
|
||||
}
|
|
@ -59,7 +59,7 @@ func testOneMemberRecovery(t *testing.T) {
|
|||
}
|
||||
}()
|
||||
|
||||
names, err := waitUntilSizeReached(t, f, testEtcd.Metadata.Name, 3, 60*time.Second)
|
||||
names, err := e2eutil.WaitUntilSizeReached(t, f.KubeClient, 3, 60*time.Second, testEtcd)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create 3 members etcd cluster: %v", err)
|
||||
}
|
||||
|
@ -69,7 +69,7 @@ func testOneMemberRecovery(t *testing.T) {
|
|||
if err := killMembers(f, names[2]); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, err := waitUntilSizeReached(t, f, testEtcd.Metadata.Name, 3, 60*time.Second); err != nil {
|
||||
if _, err := e2eutil.WaitUntilSizeReached(t, f.KubeClient, 3, 60*time.Second, testEtcd); err != nil {
|
||||
t.Fatalf("failed to resize to 3 members etcd cluster: %v", err)
|
||||
}
|
||||
}
|
||||
|
@ -125,7 +125,7 @@ func testDisasterRecoveryWithBackupPolicy(t *testing.T, numToKill int, backupPol
|
|||
}
|
||||
}()
|
||||
|
||||
names, err := waitUntilSizeReached(t, f, testEtcd.Metadata.Name, 3, 60*time.Second)
|
||||
names, err := e2eutil.WaitUntilSizeReached(t, f.KubeClient, 3, 60*time.Second, testEtcd)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create 3 members etcd cluster: %v", err)
|
||||
}
|
||||
|
@ -146,12 +146,12 @@ func testDisasterRecoveryWithBackupPolicy(t *testing.T, numToKill int, backupPol
|
|||
time.Sleep(10 * time.Second)
|
||||
}
|
||||
toKill := names[:numToKill]
|
||||
logfWithTimestamp(t, "killing pods: %v", toKill)
|
||||
e2eutil.LogfWithTimestamp(t, "killing pods: %v", toKill)
|
||||
// TODO: race: members could be recovered between being deleted one by one.
|
||||
if err := killMembers(f, toKill...); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, err := waitUntilSizeReached(t, f, testEtcd.Metadata.Name, 3, 120*time.Second); err != nil {
|
||||
if _, err := e2eutil.WaitUntilSizeReached(t, f.KubeClient, 3, 120*time.Second, testEtcd); err != nil {
|
||||
t.Fatalf("failed to resize to 3 members etcd cluster: %v", err)
|
||||
}
|
||||
// TODO: add checking of data in etcd
|
||||
|
|
|
@ -48,7 +48,7 @@ func testResizeCluster3to5(t *testing.T) {
|
|||
}
|
||||
}()
|
||||
|
||||
if _, err := waitUntilSizeReached(t, f, testEtcd.Metadata.Name, 3, 60*time.Second); err != nil {
|
||||
if _, err := e2eutil.WaitUntilSizeReached(t, f.KubeClient, 3, 60*time.Second, testEtcd); err != nil {
|
||||
t.Fatalf("failed to create 3 members etcd cluster: %v", err)
|
||||
}
|
||||
fmt.Println("reached to 3 members cluster")
|
||||
|
@ -60,7 +60,7 @@ func testResizeCluster3to5(t *testing.T) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if _, err := waitUntilSizeReached(t, f, testEtcd.Metadata.Name, 5, 60*time.Second); err != nil {
|
||||
if _, err := e2eutil.WaitUntilSizeReached(t, f.KubeClient, 5, 60*time.Second, testEtcd); err != nil {
|
||||
t.Fatalf("failed to resize to 5 members etcd cluster: %v", err)
|
||||
}
|
||||
}
|
||||
|
@ -81,7 +81,7 @@ func testResizeCluster5to3(t *testing.T) {
|
|||
}
|
||||
}()
|
||||
|
||||
if _, err := waitUntilSizeReached(t, f, testEtcd.Metadata.Name, 5, 90*time.Second); err != nil {
|
||||
if _, err := e2eutil.WaitUntilSizeReached(t, f.KubeClient, 5, 90*time.Second, testEtcd); err != nil {
|
||||
t.Fatalf("failed to create 5 members etcd cluster: %v", err)
|
||||
}
|
||||
fmt.Println("reached to 5 members cluster")
|
||||
|
@ -93,7 +93,7 @@ func testResizeCluster5to3(t *testing.T) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if _, err := waitUntilSizeReached(t, f, testEtcd.Metadata.Name, 3, 60*time.Second); err != nil {
|
||||
if _, err := e2eutil.WaitUntilSizeReached(t, f.KubeClient, 3, 60*time.Second, testEtcd); err != nil {
|
||||
t.Fatalf("failed to resize to 3 members etcd cluster: %v", err)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -82,7 +82,7 @@ func testClusterRestoreWithBackupPolicy(t *testing.T, needDataClone bool, backup
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
names, err := waitUntilSizeReached(t, f, testEtcd.Metadata.Name, 3, 60*time.Second)
|
||||
names, err := e2eutil.WaitUntilSizeReached(t, f.KubeClient, 3, 60*time.Second, testEtcd)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create 3 members etcd cluster: %v", err)
|
||||
}
|
||||
|
@ -156,7 +156,7 @@ func testClusterRestoreWithBackupPolicy(t *testing.T, needDataClone bool, backup
|
|||
}
|
||||
}()
|
||||
|
||||
names, err = waitUntilSizeReached(t, f, testEtcd.Metadata.Name, 3, waitRestoreTimeout)
|
||||
names, err = e2eutil.WaitUntilSizeReached(t, f.KubeClient, 3, waitRestoreTimeout, testEtcd)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create 3 members etcd cluster: %v", err)
|
||||
}
|
||||
|
|
|
@ -62,7 +62,7 @@ func testCreateSelfHostedCluster(t *testing.T) {
|
|||
}
|
||||
}()
|
||||
|
||||
if _, err := waitUntilSizeReached(t, f, testEtcd.Metadata.Name, 3, 240*time.Second); err != nil {
|
||||
if _, err := e2eutil.WaitUntilSizeReached(t, f.KubeClient, 3, 240*time.Second, testEtcd); err != nil {
|
||||
t.Fatalf("failed to create 3 members self-hosted etcd cluster: %v", err)
|
||||
}
|
||||
}
|
||||
|
@ -127,7 +127,7 @@ func testCreateSelfHostedClusterWithBootMember(t *testing.T) {
|
|||
}
|
||||
}()
|
||||
|
||||
if _, err := waitUntilSizeReached(t, f, testEtcd.Metadata.Name, 3, 120*time.Second); err != nil {
|
||||
if _, err := e2eutil.WaitUntilSizeReached(t, f.KubeClient, 3, 120*time.Second, testEtcd); err != nil {
|
||||
t.Fatalf("failed to create 3 members etcd cluster: %v", err)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -55,7 +55,7 @@ func TestPeerTLS(t *testing.T) {
|
|||
}
|
||||
}()
|
||||
|
||||
members, err := waitUntilSizeReached(t, f, c.Metadata.Name, 3, 60*time.Second)
|
||||
members, err := e2eutil.WaitUntilSizeReached(t, f.KubeClient, 3, 60*time.Second, c)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create 3 members etcd cluster: %v", err)
|
||||
}
|
||||
|
|
|
@ -15,16 +15,12 @@
|
|||
package upgradetest
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd-operator/pkg/spec"
|
||||
"github.com/coreos/etcd-operator/pkg/util/k8sutil"
|
||||
"github.com/coreos/etcd-operator/pkg/util/retryutil"
|
||||
"github.com/coreos/etcd-operator/test/e2e/e2eutil"
|
||||
|
||||
"k8s.io/client-go/pkg/api/v1"
|
||||
)
|
||||
|
||||
func TestResize(t *testing.T) {
|
||||
|
@ -49,7 +45,7 @@ func TestResize(t *testing.T) {
|
|||
}
|
||||
time.Sleep(10 * time.Second)
|
||||
}()
|
||||
_, err = waitUntilSizeReached(testClus.Metadata.Name, 3, 60*time.Second)
|
||||
_, err = e2eutil.WaitUntilSizeReached(t, testF.KubeCli, 3, 60*time.Second, testClus)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -70,37 +66,8 @@ func TestResize(t *testing.T) {
|
|||
if _, err := e2eutil.UpdateCluster(testF.KubeCli, testClus, 10, updateFunc); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
_, err = waitUntilSizeReached(testClus.Metadata.Name, 5, 60*time.Second)
|
||||
_, err = e2eutil.WaitUntilSizeReached(t, testF.KubeCli, 5, 60*time.Second, testClus)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func waitUntilSizeReached(clusterName string, size int, timeout time.Duration) ([]string, error) {
|
||||
var names []string
|
||||
err := retryutil.Retry(10*time.Second, int(timeout/(10*time.Second)), func() (bool, error) {
|
||||
podList, err := testF.KubeCli.CoreV1().Pods(testF.KubeNS).List(k8sutil.ClusterListOpt(clusterName))
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
names = nil
|
||||
var nodeNames []string
|
||||
for i := range podList.Items {
|
||||
pod := &podList.Items[i]
|
||||
if pod.Status.Phase != v1.PodRunning {
|
||||
continue
|
||||
}
|
||||
names = append(names, pod.Name)
|
||||
nodeNames = append(nodeNames, pod.Spec.NodeName)
|
||||
}
|
||||
fmt.Println("names:", names)
|
||||
if len(names) != size {
|
||||
return false, nil
|
||||
}
|
||||
return true, nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return names, nil
|
||||
}
|
||||
|
|
|
@ -18,7 +18,6 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd-operator/client/experimentalclient"
|
||||
|
@ -41,8 +40,6 @@ const (
|
|||
etcdValBar = "bar"
|
||||
)
|
||||
|
||||
type acceptFunc func(*v1.Pod) bool
|
||||
|
||||
func waitBackupPodUp(f *framework.Framework, clusterName string, timeout time.Duration) error {
|
||||
ls := labels.SelectorFromSet(map[string]string{
|
||||
"app": k8sutil.BackupPodSelectorAppField,
|
||||
|
@ -93,56 +90,6 @@ func makeBackup(f *framework.Framework, clusterName string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func waitUntilSizeReached(t *testing.T, f *framework.Framework, clusterName string, size int, timeout time.Duration) ([]string, error) {
|
||||
return waitSizeReachedWithAccept(t, f, clusterName, size, timeout)
|
||||
}
|
||||
|
||||
func waitSizeAndVersionReached(t *testing.T, f *framework.Framework, clusterName, version string, size int, timeout time.Duration) error {
|
||||
_, err := waitSizeReachedWithAccept(t, f, clusterName, size, timeout, func(pod *v1.Pod) bool {
|
||||
return k8sutil.GetEtcdVersion(pod) == version
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
func waitSizeReachedWithAccept(t *testing.T, f *framework.Framework, clusterName string, size int, timeout time.Duration, accepts ...acceptFunc) ([]string, error) {
|
||||
var names []string
|
||||
err := retryutil.Retry(10*time.Second, int(timeout/(10*time.Second)), func() (done bool, err error) {
|
||||
podList, err := f.KubeClient.Core().Pods(f.Namespace).List(k8sutil.ClusterListOpt(clusterName))
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
names = nil
|
||||
var nodeNames []string
|
||||
for i := range podList.Items {
|
||||
pod := &podList.Items[i]
|
||||
if pod.Status.Phase != v1.PodRunning {
|
||||
continue
|
||||
}
|
||||
accepted := true
|
||||
for _, acceptPod := range accepts {
|
||||
if !acceptPod(pod) {
|
||||
accepted = false
|
||||
break
|
||||
}
|
||||
}
|
||||
if !accepted {
|
||||
continue
|
||||
}
|
||||
names = append(names, pod.Name)
|
||||
nodeNames = append(nodeNames, pod.Spec.NodeName)
|
||||
}
|
||||
logfWithTimestamp(t, "waiting size (%d), etcd pods: names (%v), nodes (%v)", size, names, nodeNames)
|
||||
if len(names) != size {
|
||||
return false, nil
|
||||
}
|
||||
return true, nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return names, nil
|
||||
}
|
||||
|
||||
func killMembers(f *framework.Framework, names ...string) error {
|
||||
for _, name := range names {
|
||||
err := f.KubeClient.CoreV1().Pods(f.Namespace).Delete(name, metav1.NewDeleteOptions(0))
|
||||
|
@ -153,10 +100,6 @@ func killMembers(f *framework.Framework, names ...string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func logfWithTimestamp(t *testing.T, format string, args ...interface{}) {
|
||||
t.Log(time.Now(), fmt.Sprintf(format, args...))
|
||||
}
|
||||
|
||||
func createEtcdClient(addr string) (*clientv3.Client, error) {
|
||||
cfg := clientv3.Config{
|
||||
Endpoints: []string{addr},
|
||||
|
|
Загрузка…
Ссылка в новой задаче