cluster: dynamically update backup policy
Parent: 1cc0d13ea9
Commit: 84ee7b587c
@@ -11,12 +11,13 @@

### Added

- Added support for the backup policy to be dynamically added or updated.

### Changed

- Backup sidecar deployment is now created with the `Recreate` strategy.
- `Spec.Backup.MaxBackups` meaning change: 0 means unlimited backups; values < 0 are rejected.

### Removed

### Fixed

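
For context on the `MaxBackups` change above, the new semantics amount to a check along these lines. This is a minimal illustrative sketch assuming the repository's `spec.BackupPolicy` type and a standard `fmt` import; the `validateBackupPolicy` helper is hypothetical and not part of this commit.

```go
// Sketch only: illustrates the MaxBackups semantics described in the changelog.
// The helper name and error wording are hypothetical.
func validateBackupPolicy(bp *spec.BackupPolicy) error {
	if bp == nil {
		// No backup policy configured; nothing to validate.
		return nil
	}
	if bp.MaxBackups < 0 {
		// Negative values are rejected.
		return fmt.Errorf("spec.backup.maxBackups (%d) cannot be negative", bp.MaxBackups)
	}
	// MaxBackups == 0 keeps an unlimited number of backups.
	return nil
}
```
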
@@ -31,7 +31,6 @@ import (
    "github.com/Sirupsen/logrus"
    "github.com/coreos/etcd-operator/pkg/util/constants"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/types"
    appsv1beta1 "k8s.io/client-go/pkg/apis/apps/v1beta1"
)

@@ -72,7 +71,6 @@ func newBackupManager(c Config, cl *spec.Cluster, l *logrus.Entry) (*backupManag

// setupStorage will only set up the necessary structs in order for the backup manager to
// use the storage. It doesn't create the actual storage here.
// We also need to check some restore logic in setup().
func (bm *backupManager) setupStorage() (s backupstorage.Storage, err error) {
    cl, c := bm.cluster, bm.config

@@ -128,8 +126,29 @@ func (bm *backupManager) runSidecar() error {
}

func (bm *backupManager) createSidecarDeployment() error {
    cl, c := bm.cluster, bm.config
    d := bm.makeSidecarDeployment()
    _, err := bm.config.KubeCli.AppsV1beta1().Deployments(bm.cluster.Metadata.Namespace).Create(d)
    return err
}

func (bm *backupManager) updateSidecar(cl *spec.Cluster) error {
    // change local structs
    bm.cluster = cl
    var err error
    bm.s, err = bm.setupStorage()
    if err != nil {
        return err
    }
    ns, n := cl.Metadata.Namespace, k8sutil.BackupSidecarName(cl.Metadata.Name)
    // change k8s objects
    uf := func(d *appsv1beta1.Deployment) {
        d.Spec = bm.makeSidecarDeployment().Spec
    }
    return k8sutil.PatchDeployment(bm.config.KubeCli, ns, n, uf)
}

func (bm *backupManager) makeSidecarDeployment() *appsv1beta1.Deployment {
    cl, c := bm.cluster, bm.config
    podTemplate := k8sutil.NewBackupPodTemplate(cl.Metadata.Name, bm.config.ServiceAccount, cl.Spec)
    switch cl.Spec.Backup.StorageType {
    case spec.BackupStorageTypeDefault, spec.BackupStorageTypePersistentVolume:

@@ -137,13 +156,9 @@ func (bm *backupManager) createSidecarDeployment() error {
    case spec.BackupStorageTypeS3:
        k8sutil.PodSpecWithS3(&podTemplate.Spec, c.S3Context)
    }

    name := k8sutil.BackupSidecarName(cl.Metadata.Name)
    dplSel := k8sutil.LabelsForCluster(cl.Metadata.Name)

    d := k8sutil.NewBackupDeploymentManifest(name, dplSel, podTemplate, bm.cluster.AsOwner())
    _, err := c.KubeCli.AppsV1beta1().Deployments(cl.Metadata.Namespace).Create(d)
    return err
    return k8sutil.NewBackupDeploymentManifest(name, dplSel, podTemplate, bm.cluster.AsOwner())
}

func (bm *backupManager) createBackupService() error {

@@ -199,27 +214,25 @@ func backupServiceStatusToTPRBackupServiceStatu(s *backupapi.ServiceStatus) *spe
}

func (bm *backupManager) upgradeIfNeeded() error {
    cl, c := bm.cluster, bm.config
    sidecarName := k8sutil.BackupSidecarName(cl.Metadata.Name)
    ns, n := bm.cluster.Metadata.Namespace, k8sutil.BackupSidecarName(bm.cluster.Metadata.Name)

    d, err := c.KubeCli.AppsV1beta1().Deployments(cl.Metadata.Namespace).Get(sidecarName, metav1.GetOptions{})
    d, err := bm.config.KubeCli.AppsV1beta1().Deployments(ns).Get(n, metav1.GetOptions{})
    if err != nil {
        return err
    }
    if d.Spec.Template.Spec.Containers[0].Image == k8sutil.BackupImage {
        return nil
    }

    bm.logger.Infof("upgrading backup sidecar from (%v) to (%v)",
        d.Spec.Template.Spec.Containers[0].Image, k8sutil.BackupImage)
    cd := k8sutil.CloneDeployment(d)

    // TODO: backward compatibility for v0.2.6 . Remove this after v0.2.7 .
    cd.Spec.Strategy = appsv1beta1.DeploymentStrategy{
        Type: appsv1beta1.RecreateDeploymentStrategyType,
    uf := func(d *appsv1beta1.Deployment) {
        d.Spec.Template.Spec.Containers[0].Image = k8sutil.BackupImage
        // TODO: backward compatibility for v0.2.6 . Remove this after v0.2.7 .
        d.Spec.Strategy = appsv1beta1.DeploymentStrategy{
            Type: appsv1beta1.RecreateDeploymentStrategyType,
        }
    }

    cd.Spec.Template.Spec.Containers[0].Image = k8sutil.BackupImage
    patchData, err := k8sutil.CreatePatch(d, cd, appsv1beta1.Deployment{})
    _, err = c.KubeCli.AppsV1beta1().Deployments(cl.Metadata.Namespace).Patch(d.Name, types.StrategicMergePatchType, patchData)
    return err
    return k8sutil.PatchDeployment(bm.config.KubeCli, ns, n, uf)
}

@@ -270,8 +270,20 @@ func (c *Cluster) run(stopC <-chan struct{}) {
            }
            // TODO: we can't handle another upgrade while an upgrade is in progress
            c.logger.Infof("spec update: from: %v to: %v", c.cluster.Spec, event.cluster.Spec)

            ob, nb := c.cluster.Spec.Backup, event.cluster.Spec.Backup
            c.cluster = event.cluster

            if !isBackupPolicyEqual(ob, nb) {
                err := c.updateBackupPolicy(ob, nb)
                if err != nil {
                    c.logger.Errorf("failed to update backup policy: %v", err)
                    clusterFailed = true
                    c.status.SetReason(err.Error())
                    return
                }
            }

        case eventDeleteCluster:
            c.logger.Infof("cluster is deleted by the user")
            clusterFailed = true

@@ -341,11 +353,34 @@ func (c *Cluster) run(stopC <-chan struct{}) {
    }
}

func (c *Cluster) updateBackupPolicy(ob, nb *spec.BackupPolicy) error {
    var err error
    switch {
    case ob == nil && nb != nil:
        c.bm, err = newBackupManager(c.config, c.cluster, c.logger)
        if err != nil {
            return err
        }
        return c.bm.setup()
    case ob != nil && nb == nil:
        // TODO: delete backup sidecar
    case ob != nil && nb != nil:
        return c.bm.updateSidecar(c.cluster)
    default:
        panic("unexpected backup spec comparison")
    }
    return nil
}

func isSpecEqual(s1, s2 spec.ClusterSpec) bool {
    if s1.Size != s2.Size || s1.Paused != s2.Paused || s1.Version != s2.Version {
        return false
    }
    return true
    return isBackupPolicyEqual(s1.Backup, s2.Backup)
}

func isBackupPolicyEqual(b1, b2 *spec.BackupPolicy) bool {
    return reflect.DeepEqual(b1, b2)
}

func (c *Cluster) startSeedMember(recoverFromBackup bool) error {

@@ -169,6 +169,7 @@ func (c *Controller) handleClusterEvent(event *Event) error {
        return fmt.Errorf("ignore failed cluster (%s). Please delete its TPR", clus.Metadata.Name)
    }

    // TODO: add validation to spec update.
    clus.Spec.Cleanup()

    switch event.Type {

@@ -30,7 +30,6 @@ import (
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/util/intstr"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/pkg/api"
    "k8s.io/client-go/pkg/api/v1"
    appsv1beta1 "k8s.io/client-go/pkg/apis/apps/v1beta1"
    v1beta1storage "k8s.io/client-go/pkg/apis/storage/v1beta1"

@@ -357,11 +356,3 @@ func BackupSidecarLabels(clusterName string) map[string]string {
        "etcd_cluster": clusterName,
    }
}

func CloneDeployment(d *appsv1beta1.Deployment) *appsv1beta1.Deployment {
    cd, err := api.Scheme.DeepCopy(d)
    if err != nil {
        panic("cannot deep copy pod")
    }
    return cd.(*appsv1beta1.Deployment)
}

@@ -33,11 +33,13 @@ import (
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/apimachinery/pkg/runtime/serializer"
    "k8s.io/apimachinery/pkg/types"
    "k8s.io/apimachinery/pkg/util/intstr"
    "k8s.io/apimachinery/pkg/util/strategicpatch"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/pkg/api"
    "k8s.io/client-go/pkg/api/v1"
    appsv1beta1 "k8s.io/client-go/pkg/apis/apps/v1beta1"
    _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" // for gcp auth
    "k8s.io/client-go/rest"
)

@@ -364,6 +366,29 @@ func ClonePod(p *v1.Pod) *v1.Pod {
    return np.(*v1.Pod)
}

func cloneDeployment(d *appsv1beta1.Deployment) *appsv1beta1.Deployment {
    cd, err := api.Scheme.DeepCopy(d)
    if err != nil {
        panic("cannot deep copy pod")
    }
    return cd.(*appsv1beta1.Deployment)
}

func PatchDeployment(kubecli kubernetes.Interface, namespace, name string, updateFunc func(*appsv1beta1.Deployment)) error {
    od, err := kubecli.AppsV1beta1().Deployments(namespace).Get(name, metav1.GetOptions{})
    if err != nil {
        return err
    }
    nd := cloneDeployment(od)
    updateFunc(nd)
    patchData, err := CreatePatch(od, nd, appsv1beta1.Deployment{})
    if err != nil {
        return err
    }
    _, err = kubecli.AppsV1beta1().Deployments(namespace).Patch(name, types.StrategicMergePatchType, patchData)
    return err
}

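For reference, a caller outside the `k8sutil` package would use the new `PatchDeployment` helper roughly as below, mirroring the calls added elsewhere in this commit. This is only a sketch: the client variable, namespace, deployment name, and image string are placeholders.

```go
// Sketch: bump a deployment's container image via k8sutil.PatchDeployment.
// kubecli is an existing kubernetes.Interface; names and image are placeholders.
uf := func(d *appsv1beta1.Deployment) {
	d.Spec.Template.Spec.Containers[0].Image = "example.com/backup-sidecar:v0.2.7"
}
if err := k8sutil.PatchDeployment(kubecli, "default", "example-backup-sidecar", uf); err != nil {
	// handle the patch failure
}
```
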
// mergeLabels merges l2 into l1. Conflicting labels will be skipped.
func mergeLabels(l1, l2 map[string]string) {
    for k, v := range l2 {

@@ -21,6 +21,7 @@ import (
    "time"

    "github.com/coreos/etcd-operator/pkg/spec"
    "github.com/coreos/etcd-operator/pkg/util/retryutil"
    "github.com/coreos/etcd-operator/test/e2e/e2eutil"
    "github.com/coreos/etcd-operator/test/e2e/framework"
)

@@ -33,14 +34,13 @@ func TestFailureRecovery(t *testing.T) {
    })
}

func TestS3DisasterRecovery(t *testing.T) {
func TestS3Backup(t *testing.T) {
    if os.Getenv("AWS_TEST_ENABLED") != "true" {
        t.Skip("skipping test since AWS_TEST_ENABLED is not set.")
    }
    t.Run("disaster recovery on S3", func(t *testing.T) {
        t.Run("2 members (majority) down", testS3MajorityDown)
        t.Run("3 members (all) down", testS3AllDown)
    })
    t.Run("2 members (majority) down", testS3MajorityDown)
    t.Run("3 members (all) down", testS3AllDown)
    t.Run("dynamically add backup policy", testDynamicAddBackupPolicy)
}

func testOneMemberRecovery(t *testing.T) {

@@ -64,7 +64,7 @@ func testOneMemberRecovery(t *testing.T) {
    if err != nil {
        t.Fatalf("failed to create 3 members etcd cluster: %v", err)
    }
    fmt.Println("reached to 3 members cluster")
    t.Log("reached to 3 members cluster")

    // The last pod could have not come up serving yet. If we are not killing the last pod, we should wait.
    if err := e2eutil.KillMembers(f.KubeClient, f.Namespace, names[2]); err != nil {

@@ -179,3 +179,40 @@ func testS3AllDown(t *testing.T) {

    testDisasterRecoveryWithBackupPolicy(t, 3, e2eutil.NewS3BackupPolicy())
}

func testDynamicAddBackupPolicy(t *testing.T) {
    if os.Getenv(envParallelTest) == envParallelTestTrue {
        t.Parallel()
    }

    f := framework.Global
    clus, err := e2eutil.CreateCluster(t, f.KubeClient, f.Namespace, e2eutil.NewCluster("test-etcd-", 3))
    if err != nil {
        t.Fatal(err)
    }
    defer func() {
        err := e2eutil.DeleteCluster(t, f.KubeClient, clus)
        if err != nil {
            t.Fatal(err)
        }
    }()

    err = e2eutil.WaitBackupPodUp(t, f.KubeClient, clus.Metadata.Namespace, clus.Metadata.Name, 60*time.Second)
    if !retryutil.IsRetryFailure(err) {
        t.Fatalf("expecting retry failure, but err = %v", err)
    }

    uf := func(cl *spec.Cluster) {
        bp := e2eutil.NewS3BackupPolicy()
        bp.CleanupBackupsOnClusterDelete = true
        cl.Spec.Backup = bp
    }
    clus, err = e2eutil.UpdateCluster(f.KubeClient, clus, 10, uf)
    if err != nil {
        t.Fatal(err)
    }
    err = e2eutil.WaitBackupPodUp(t, f.KubeClient, clus.Metadata.Namespace, clus.Metadata.Name, 60*time.Second)
    if err != nil {
        t.Fatal(err)
    }
}

@@ -25,7 +25,6 @@ import (
    "github.com/aws/aws-sdk-go/service/s3"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/labels"
    "k8s.io/apimachinery/pkg/types"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/pkg/api/v1"
    appsv1beta1 "k8s.io/client-go/pkg/apis/apps/v1beta1"

@@ -128,21 +127,14 @@ func (f *Framework) DeleteOperator() error {
}

func (f *Framework) UpgradeOperator() error {
    image := f.NewImage
    d, err := f.KubeCli.AppsV1beta1().Deployments(f.KubeNS).Get("etcd-operator", metav1.GetOptions{})
    if err != nil {
        return err
    }
    cd := k8sutil.CloneDeployment(d)
    cd.Spec.Template.Spec.Containers[0].Image = image
    patchData, err := k8sutil.CreatePatch(d, cd, appsv1beta1.Deployment{})
    if err != nil {
        return err
    }
    _, err = f.KubeCli.AppsV1beta1().Deployments(f.KubeNS).Patch(d.Name, types.StrategicMergePatchType, patchData)
    uf := func(d *appsv1beta1.Deployment) {
        d.Spec.Template.Spec.Containers[0].Image = f.NewImage
    }
    err := k8sutil.PatchDeployment(f.KubeCli, f.KubeNS, "etcd-operator", uf)
    if err != nil {
        return err
    }

    lo := metav1.ListOptions{
        LabelSelector: labels.SelectorFromSet(map[string]string{"name": "etcd-operator"}).String(),
    }