Merge pull request #1026 from hasbro17/haseeb/refactor-e2e-util
tpr_util,e2e/util: AtomicUpdateClusterTPR to retry safe update
This commit is contained in:
Коммит
7f1b3d8e3f
|
@ -16,7 +16,6 @@ package k8sutil
|
|||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"time"
|
||||
|
@ -30,6 +29,10 @@ import (
|
|||
|
||||
// TODO: replace this package with Operator client
|
||||
|
||||
// ClusterTPRUpdateFunc is a function to be used when atomically
|
||||
// updating a Cluster TPR.
|
||||
type ClusterTPRUpdateFunc func(*spec.Cluster)
|
||||
|
||||
func WatchClusters(host, ns string, httpClient *http.Client, resourceVersion string) (*http.Response, error) {
|
||||
return httpClient.Get(fmt.Sprintf("%s/apis/%s/%s/namespaces/%s/clusters?watch=true&resourceVersion=%s",
|
||||
host, spec.TPRGroup, spec.TPRVersion, ns, resourceVersion))
|
||||
|
@ -74,16 +77,32 @@ func GetClusterTPRObject(restcli rest.Interface, ns, name string) (*spec.Cluster
|
|||
return readOutCluster(b)
|
||||
}
|
||||
|
||||
// UpdateClusterTPRObject updates the given TPR object.
|
||||
// ResourceVersion of the object MUST be set or update will fail.
|
||||
func UpdateClusterTPRObject(restcli rest.Interface, ns string, c *spec.Cluster) (*spec.Cluster, error) {
|
||||
if len(c.Metadata.ResourceVersion) == 0 {
|
||||
return nil, errors.New("k8sutil: resource version is not provided")
|
||||
}
|
||||
return updateClusterTPRObject(restcli, ns, c)
|
||||
// AtomicUpdateClusterTPRObject will get the latest result of a cluster,
|
||||
// let user modify it, and update the cluster with modified result
|
||||
// The entire process would be retried if there is a conflict of resource version
|
||||
func AtomicUpdateClusterTPRObject(restcli rest.Interface, name, namespace string, maxRetries int, updateFunc ClusterTPRUpdateFunc) (*spec.Cluster, error) {
|
||||
var updatedCluster *spec.Cluster
|
||||
err := retryutil.Retry(1*time.Second, maxRetries, func() (done bool, err error) {
|
||||
currCluster, err := GetClusterTPRObject(restcli, namespace, name)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
updateFunc(currCluster)
|
||||
|
||||
updatedCluster, err = UpdateClusterTPRObject(restcli, namespace, currCluster)
|
||||
if err != nil {
|
||||
if apierrors.IsConflict(err) {
|
||||
return false, nil
|
||||
}
|
||||
return false, err
|
||||
}
|
||||
return true, nil
|
||||
})
|
||||
return updatedCluster, err
|
||||
}
|
||||
|
||||
func updateClusterTPRObject(restcli rest.Interface, ns string, c *spec.Cluster) (*spec.Cluster, error) {
|
||||
func UpdateClusterTPRObject(restcli rest.Interface, ns string, c *spec.Cluster) (*spec.Cluster, error) {
|
||||
uri := fmt.Sprintf("/apis/%s/%s/namespaces/%s/clusters/%s", spec.TPRGroup, spec.TPRVersion, ns, c.Metadata.Name)
|
||||
b, err := restcli.Put().RequestURI(uri).Body(c).DoRaw()
|
||||
if err != nil {
|
||||
|
|
|
@ -19,7 +19,8 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd-operator/pkg/util/k8sutil"
|
||||
"github.com/coreos/etcd-operator/pkg/spec"
|
||||
"github.com/coreos/etcd-operator/test/e2e/e2eutil"
|
||||
"github.com/coreos/etcd-operator/test/e2e/framework"
|
||||
)
|
||||
|
||||
|
@ -83,13 +84,10 @@ func testStopOperator(t *testing.T, kill bool) {
|
|||
}
|
||||
|
||||
if !kill {
|
||||
testEtcd, err = k8sutil.GetClusterTPRObject(f.KubeClient.CoreV1().RESTClient(), f.Namespace, testEtcd.Metadata.Name)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to get cluster TPR object:%v", err)
|
||||
updateFunc := func(cl *spec.Cluster) {
|
||||
cl.Spec.Paused = true
|
||||
}
|
||||
|
||||
testEtcd.Spec.Paused = true
|
||||
if testEtcd, err = updateEtcdCluster(f, testEtcd); err != nil {
|
||||
if testEtcd, err = e2eutil.UpdateEtcdCluster(f.KubeClient, testEtcd, 10, updateFunc); err != nil {
|
||||
t.Fatalf("failed to pause control: %v", err)
|
||||
}
|
||||
|
||||
|
@ -113,13 +111,10 @@ func testStopOperator(t *testing.T, kill bool) {
|
|||
}
|
||||
|
||||
if !kill {
|
||||
testEtcd, err = k8sutil.GetClusterTPRObject(f.KubeClient.CoreV1().RESTClient(), f.Namespace, testEtcd.Metadata.Name)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to get cluster TPR object:%v", err)
|
||||
updateFunc := func(cl *spec.Cluster) {
|
||||
cl.Spec.Paused = false
|
||||
}
|
||||
|
||||
testEtcd.Spec.Paused = false
|
||||
if _, err = updateEtcdCluster(f, testEtcd); err != nil {
|
||||
if _, err = e2eutil.UpdateEtcdCluster(f.KubeClient, testEtcd, 10, updateFunc); err != nil {
|
||||
t.Fatalf("failed to resume control: %v", err)
|
||||
}
|
||||
} else {
|
||||
|
@ -156,14 +151,10 @@ func testEtcdUpgrade(t *testing.T) {
|
|||
t.Fatalf("failed to create 3 members etcd cluster: %v", err)
|
||||
}
|
||||
|
||||
testEtcd, err = k8sutil.GetClusterTPRObject(f.KubeClient.CoreV1().RESTClient(), f.Namespace, testEtcd.Metadata.Name)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to get cluster TPR object:%v", err)
|
||||
updateFunc := func(cl *spec.Cluster) {
|
||||
cl = etcdClusterWithVersion(cl, "3.1.4")
|
||||
}
|
||||
|
||||
testEtcd = etcdClusterWithVersion(testEtcd, "3.1.4")
|
||||
|
||||
if _, err := updateEtcdCluster(f, testEtcd); err != nil {
|
||||
if _, err := e2eutil.UpdateEtcdCluster(f.KubeClient, testEtcd, 10, updateFunc); err != nil {
|
||||
t.Fatalf("fail to update cluster version: %v", err)
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,12 @@
|
|||
package e2eutil
|
||||
|
||||
import (
|
||||
"github.com/coreos/etcd-operator/pkg/spec"
|
||||
"github.com/coreos/etcd-operator/pkg/util/k8sutil"
|
||||
|
||||
"k8s.io/client-go/kubernetes"
|
||||
)
|
||||
|
||||
func UpdateEtcdCluster(kubeClient kubernetes.Interface, cl *spec.Cluster, maxRetries int, updateFunc k8sutil.ClusterTPRUpdateFunc) (*spec.Cluster, error) {
|
||||
return k8sutil.AtomicUpdateClusterTPRObject(kubeClient.CoreV1().RESTClient(), cl.Metadata.Name, cl.Metadata.Namespace, maxRetries, updateFunc)
|
||||
}
|
|
@ -20,7 +20,8 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd-operator/pkg/util/k8sutil"
|
||||
"github.com/coreos/etcd-operator/pkg/spec"
|
||||
"github.com/coreos/etcd-operator/test/e2e/e2eutil"
|
||||
"github.com/coreos/etcd-operator/test/e2e/framework"
|
||||
)
|
||||
|
||||
|
@ -52,13 +53,10 @@ func testResizeCluster3to5(t *testing.T) {
|
|||
}
|
||||
fmt.Println("reached to 3 members cluster")
|
||||
|
||||
testEtcd, err = k8sutil.GetClusterTPRObject(f.KubeClient.CoreV1().RESTClient(), f.Namespace, testEtcd.Metadata.Name)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to get cluster TPR object:%v", err)
|
||||
updateFunc := func(cl *spec.Cluster) {
|
||||
cl.Spec.Size = 5
|
||||
}
|
||||
|
||||
testEtcd.Spec.Size = 5
|
||||
if _, err := updateEtcdCluster(f, testEtcd); err != nil {
|
||||
if _, err := e2eutil.UpdateEtcdCluster(f.KubeClient, testEtcd, 10, updateFunc); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
|
@ -88,13 +86,10 @@ func testResizeCluster5to3(t *testing.T) {
|
|||
}
|
||||
fmt.Println("reached to 5 members cluster")
|
||||
|
||||
testEtcd, err = k8sutil.GetClusterTPRObject(f.KubeClient.CoreV1().RESTClient(), f.Namespace, testEtcd.Metadata.Name)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to get cluster TPR object:%v", err)
|
||||
updateFunc := func(cl *spec.Cluster) {
|
||||
cl.Spec.Size = 3
|
||||
}
|
||||
|
||||
testEtcd.Spec.Size = 3
|
||||
if _, err := updateEtcdCluster(f, testEtcd); err != nil {
|
||||
if _, err := e2eutil.UpdateEtcdCluster(f.KubeClient, testEtcd, 10, updateFunc); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
|
|
|
@ -24,6 +24,7 @@ import (
|
|||
"github.com/coreos/etcd-operator/pkg/spec"
|
||||
"github.com/coreos/etcd-operator/pkg/util/k8sutil"
|
||||
"github.com/coreos/etcd-operator/pkg/util/retryutil"
|
||||
"github.com/coreos/etcd-operator/test/e2e/e2eutil"
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/client-go/pkg/api/v1"
|
||||
|
@ -66,9 +67,11 @@ func TestResize(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
testClus.Spec.Size = 5
|
||||
_, err = updateEtcdCluster(testClus)
|
||||
if err != nil {
|
||||
|
||||
updateFunc := func(cl *spec.Cluster) {
|
||||
cl.Spec.Size = 5
|
||||
}
|
||||
if _, err := e2eutil.UpdateEtcdCluster(testF.KubeCli, testClus, 10, updateFunc); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
_, err = waitUntilSizeReached(testClus.Metadata.Name, 5, 60*time.Second)
|
||||
|
@ -102,10 +105,6 @@ func createCluster() (*spec.Cluster, error) {
|
|||
return res, nil
|
||||
}
|
||||
|
||||
func updateEtcdCluster(cl *spec.Cluster) (*spec.Cluster, error) {
|
||||
return k8sutil.UpdateClusterTPRObject(testF.KubeCli.CoreV1().RESTClient(), testF.KubeNS, cl)
|
||||
}
|
||||
|
||||
func deleteCluster() error {
|
||||
uri := fmt.Sprintf("/apis/%s/%s/namespaces/%s/clusters/%s", spec.TPRGroup, spec.TPRVersion, testF.KubeNS, "upgrade-test")
|
||||
_, err := testF.KubeCli.CoreV1().RESTClient().Delete().RequestURI(uri).DoRaw()
|
||||
|
|
|
@ -232,10 +232,6 @@ func createCluster(t *testing.T, f *framework.Framework, cl *spec.Cluster) (*spe
|
|||
return res, nil
|
||||
}
|
||||
|
||||
func updateEtcdCluster(f *framework.Framework, c *spec.Cluster) (*spec.Cluster, error) {
|
||||
return k8sutil.UpdateClusterTPRObject(f.KubeClient.CoreV1().RESTClient(), f.Namespace, c)
|
||||
}
|
||||
|
||||
func deleteEtcdCluster(t *testing.T, f *framework.Framework, c *spec.Cluster) error {
|
||||
podList, err := f.KubeClient.CoreV1().Pods(f.Namespace).List(k8sutil.ClusterListOpt(c.Metadata.Name))
|
||||
if err != nil {
|
||||
|
|
Загрузка…
Ссылка в новой задаче