Merge pull request #1030 from hongchaodeng/bu
backup: use Deployment to manage sidecar instead of ReplicaSet
This commit is contained in:
Commit 8de838dbc1
@@ -1,5 +1,12 @@
## [Unreleased 0.2.6]

### Upgrade Notice

- Once the operator is upgraded, all backup-enabled clusters will go through an upgrade process that
  deletes the backup sidecar's ReplicaSet and creates a new Deployment for the sidecar.
  If the upgrade fails for any reason, the cluster TPR's `status.phase` will be FAILED.
  In that failure case the cluster TPR must be recreated.

### Added

- PodPolicy provides an `EtcdEnv` option to add custom env to the etcd process.
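The upgrade notice above boils down to two API calls per backup-enabled cluster: delete the old sidecar ReplicaSet, then create the replacement Deployment. The sketch below condenses the flow that `upgradeIfNeeded` implements later in this commit; `migrateSidecar` and its arguments are illustrative placeholders, not part of the operator's API.

package example

import (
    apierrors "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    appsv1beta1 "k8s.io/client-go/pkg/apis/apps/v1beta1"
)

// migrateSidecar deletes the pre-0.2.6 backup sidecar ReplicaSet and recreates
// the sidecar as a Deployment (names here are placeholders, not operator API).
func migrateSidecar(kubecli kubernetes.Interface, ns, name string, d *appsv1beta1.Deployment) error {
    zero := int64(0)
    foreground := metav1.DeletePropagationForeground
    // Foreground propagation makes Kubernetes remove the ReplicaSet's pods
    // before the ReplicaSet itself is considered deleted.
    err := kubecli.ExtensionsV1beta1().ReplicaSets(ns).Delete(name, &metav1.DeleteOptions{
        GracePeriodSeconds: &zero,
        PropagationPolicy:  &foreground,
    })
    if err != nil && !apierrors.IsNotFound(err) {
        return err
    }
    // If this create fails after the ReplicaSet is gone, the operator marks the
    // cluster TPR's status.phase FAILED, as the notice above says.
    _, err = kubecli.AppsV1beta1().Deployments(ns).Create(d)
    return err
}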
@@ -9,7 +16,7 @@
- Self-hosted etcd pod's anti-affinity label selector is changed to select `{"app": "etcd"}`.
  That is, no two etcd pods should sit on the same node, even if they belong to different clusters.

- Using a Deployment to manage the backup sidecar instead of a ReplicaSet.
- S3 backup path is changed to `${BUCKET_NAME}/v1/${NAMESPACE}/${CLUSTER_NAME}/`.

### Removed
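The anti-affinity bullet above means the scheduler is asked to keep any two pods labeled `app=etcd` off the same node, regardless of which etcd cluster they belong to. A minimal sketch of such a term, using the client-go v1 types this commit already imports elsewhere (the helper name is illustrative, not the operator's actual function):

package example

import (
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/pkg/api/v1"
)

// etcdAntiAffinity returns a required anti-affinity term matching any pod
// labeled app=etcd, so no two etcd pods are scheduled onto one node.
func etcdAntiAffinity() *v1.Affinity {
    return &v1.Affinity{
        PodAntiAffinity: &v1.PodAntiAffinity{
            RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{{
                LabelSelector: &metav1.LabelSelector{
                    MatchLabels: map[string]string{"app": "etcd"},
                },
                TopologyKey: "kubernetes.io/hostname",
            }},
        },
    }
}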
@@ -86,9 +86,9 @@ rules:
  verbs:
  - "*"
- apiGroups:
  - extensions
  - apps
  resources:
  - replicasets
  - deployments
  verbs:
  - "*"
EOF
@@ -43,9 +43,9 @@ rules:
  verbs:
  - "*"
- apiGroups:
  - extensions
  - apps
  resources:
  - replicasets
  - deployments
  verbs:
  - "*"
- apiGroups:
@@ -30,7 +30,10 @@ import (
    "github.com/Sirupsen/logrus"
    "github.com/coreos/etcd-operator/pkg/util/constants"
    "k8s.io/client-go/pkg/api/v1"
    apierrors "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/types"
    appsv1beta1 "k8s.io/client-go/pkg/apis/apps/v1beta1"
)

const (
@@ -115,36 +118,34 @@ func (bm *backupManager) setup() error {
}

func (bm *backupManager) runSidecar() error {
    cl, c := bm.cluster, bm.config
    podSpec, err := k8sutil.NewBackupPodSpec(cl.Metadata.Name, bm.config.ServiceAccount, cl.Spec)
    if err != nil {
        return err
    if err := bm.createSidecarDeployment(); err != nil {
        return fmt.Errorf("failed to create backup sidecar Deployment: %v", err)
    }
    if err := bm.createBackupService(); err != nil {
        return fmt.Errorf("failed to create backup sidecar service: %v", err)
    }
    bm.logger.Info("backup sidecar deployment and service created")
    return nil
}

func (bm *backupManager) createSidecarDeployment() error {
    cl, c := bm.cluster, bm.config

    podSpec := k8sutil.NewBackupPodSpec(cl.Metadata.Name, bm.config.ServiceAccount, cl.Spec)
    switch cl.Spec.Backup.StorageType {
    case spec.BackupStorageTypeDefault, spec.BackupStorageTypePersistentVolume:
        podSpec = k8sutil.PodSpecWithPV(podSpec, cl.Metadata.Name)
    case spec.BackupStorageTypeS3:
        podSpec = k8sutil.PodSpecWithS3(podSpec, c.S3Context)
    }
    if err = bm.createBackupReplicaSet(*podSpec); err != nil {
        return fmt.Errorf("failed to create backup replica set: %v", err)
    }
    if err = bm.createBackupService(); err != nil {
        return fmt.Errorf("failed to create backup service: %v", err)
    }
    bm.logger.Info("backup replica set and service created")
    return nil
}

func (bm *backupManager) createBackupReplicaSet(podSpec v1.PodSpec) error {
    rs := k8sutil.NewBackupReplicaSetManifest(bm.cluster.Metadata.Name, podSpec, bm.cluster.AsOwner())
    _, err := bm.config.KubeCli.ExtensionsV1beta1().ReplicaSets(bm.cluster.Metadata.Namespace).Create(rs)
    if err != nil {
        if !k8sutil.IsKubernetesResourceAlreadyExistError(err) {
            return err
        }
    }
    return nil
    name := k8sutil.BackupSidecarName(cl.Metadata.Name)
    podSel := k8sutil.BackupSidecarLabels(cl.Metadata.Name)
    dplSel := k8sutil.LabelsForCluster(cl.Metadata.Name)

    d := k8sutil.NewBackupDeploymentManifest(name, dplSel, podSel, *podSpec, bm.cluster.AsOwner())
    _, err := c.KubeCli.AppsV1beta1().Deployments(cl.Metadata.Namespace).Create(d)
    return err
}

func (bm *backupManager) createBackupService() error {
@@ -198,3 +199,40 @@ func backupServiceStatusToTPRBackupServiceStatu(s *backupapi.ServiceStatus) *spe
    }
    return &bs
}

func (bm *backupManager) upgradeIfNeeded() error {
    cl, c := bm.cluster, bm.config
    sidecarName := k8sutil.BackupSidecarName(cl.Metadata.Name)

    // backward compatibility: backup sidecar replica set existed and we need change that to deployment.
    // TODO: remove this after v0.2.6 .
    _, err := c.KubeCli.ExtensionsV1beta1().ReplicaSets(cl.Metadata.Namespace).Get(sidecarName, metav1.GetOptions{})
    if err == nil {
        err = c.KubeCli.ExtensionsV1beta1().ReplicaSets(cl.Metadata.Namespace).Delete(sidecarName, &metav1.DeleteOptions{
            GracePeriodSeconds: func(t int64) *int64 { return &t }(0),
            PropagationPolicy: func() *metav1.DeletionPropagation {
                foreground := metav1.DeletePropagationForeground
                return &foreground
            }(),
        })
        if err != nil {
            return err
        }
        // Note: If RS was deleted but deployment failed to be created, then the cluster's phase would be switched to FAILED.
        return bm.createSidecarDeployment()
    }
    if !apierrors.IsNotFound(err) {
        return err
    }
    // The following path should be run only if no previous backup RS existed.

    d, err := c.KubeCli.AppsV1beta1().Deployments(cl.Metadata.Namespace).Get(sidecarName, metav1.GetOptions{})
    if err != nil {
        return err
    }
    cd := k8sutil.CloneDeployment(d)
    cd.Spec.Template.Spec.Containers[0].Image = k8sutil.BackupImage
    patchData, err := k8sutil.CreatePatch(d, cd, appsv1beta1.Deployment{})
    _, err = c.KubeCli.AppsV1beta1().Deployments(cl.Metadata.Namespace).Patch(d.Name, types.StrategicMergePatchType, patchData)
    return err
}
@@ -157,6 +157,12 @@ func (c *Cluster) setup() error {
        if err != nil {
            return err
        }
        if !shouldCreateCluster {
            err := c.bm.upgradeIfNeeded()
            if err != nil {
                return err
            }
        }
    }

    if shouldCreateCluster {
@@ -82,8 +82,8 @@ func (gc *GC) collectResources(option metav1.ListOptions, runningSet map[types.U
    if err := gc.collectServices(option, runningSet); err != nil {
        gc.logger.Errorf("gc services failed: %v", err)
    }
    if err := gc.collectReplicaSet(option, runningSet); err != nil {
        gc.logger.Errorf("gc replica set failed: %v", err)
    if err := gc.collectDeployment(option, runningSet); err != nil {
        gc.logger.Errorf("gc deployments failed: %v", err)
    }
}
@@ -134,34 +134,31 @@ func (gc *GC) collectServices(option metav1.ListOptions, runningSet map[types.UI
    return nil
}

func (gc *GC) collectReplicaSet(option metav1.ListOptions, runningSet map[types.UID]bool) error {
    rss, err := gc.kubecli.ExtensionsV1beta1().ReplicaSets(gc.ns).List(option)
func (gc *GC) collectDeployment(option metav1.ListOptions, runningSet map[types.UID]bool) error {
    ds, err := gc.kubecli.AppsV1beta1().Deployments(gc.ns).List(option)
    if err != nil {
        return err
    }

    for _, rs := range rss.Items {
        if len(rs.OwnerReferences) == 0 {
            gc.logger.Warningf("failed to check replica set %s: no owner", rs.GetName())
    for _, d := range ds.Items {
        if len(d.OwnerReferences) == 0 {
            gc.logger.Warningf("failed to GC deployment (%s): no owner", d.GetName())
            continue
        }
        if !runningSet[rs.OwnerReferences[0].UID] {
            // set orphanOption to false to enable Kubernetes GC to remove the objects that
            // depends on this replica set.
            // See https://kubernetes.io/docs/user-guide/garbage-collection/ for more details.
            orphanOption := false
            // set gracePeriod to delete the replica set immediately
            gracePeriod := int64(0)
            err = gc.kubecli.ExtensionsV1beta1().ReplicaSets(gc.ns).Delete(rs.GetName(), &metav1.DeleteOptions{
                OrphanDependents: &orphanOption,
                GracePeriodSeconds: &gracePeriod,
        if !runningSet[d.OwnerReferences[0].UID] {
            err = gc.kubecli.AppsV1beta1().Deployments(gc.ns).Delete(d.GetName(), &metav1.DeleteOptions{
                GracePeriodSeconds: func(t int64) *int64 { return &t }(0),
                PropagationPolicy: func() *metav1.DeletionPropagation {
                    foreground := metav1.DeletePropagationForeground
                    return &foreground
                }(),
            })
            if err != nil {
                if !k8sutil.IsKubernetesResourceNotFoundError(err) {
                    return err
                }
            }
            gc.logger.Infof("deleted replica set (%s)", rs.GetName())
            gc.logger.Infof("deleted deployment (%s)", d.GetName())
        }
    }
@@ -30,8 +30,9 @@ import (
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/util/intstr"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/pkg/api"
    "k8s.io/client-go/pkg/api/v1"
    v1beta1extensions "k8s.io/client-go/pkg/apis/extensions/v1beta1"
    appsv1beta1 "k8s.io/client-go/pkg/apis/apps/v1beta1"
    v1beta1storage "k8s.io/client-go/pkg/apis/storage/v1beta1"
)
@@ -165,10 +166,10 @@ func PodSpecWithS3(ps *v1.PodSpec, s3Ctx s3config.S3Context) *v1.PodSpec {
    return ps
}

func NewBackupPodSpec(clusterName, account string, sp spec.ClusterSpec) (*v1.PodSpec, error) {
func NewBackupPodSpec(clusterName, account string, sp spec.ClusterSpec) *v1.PodSpec {
    b, err := json.Marshal(sp)
    if err != nil {
        return nil, err
        panic("unexpected json error " + err.Error())
    }

    var nsel map[string]string
@@ -197,45 +198,37 @@ func NewBackupPodSpec(clusterName, account string, sp spec.ClusterSpec) (*v1.Pod
            },
        },
    }
    return ps, nil
    return ps
}

func backupNameAndLabel(clusterName string) (string, map[string]string) {
    labels := map[string]string{
        "app": BackupPodSelectorAppField,
        "etcd_cluster": clusterName,
    }
    name := BackupServiceName(clusterName)
    return name, labels
}

func NewBackupReplicaSetManifest(clusterName string, ps v1.PodSpec, owner metav1.OwnerReference) *v1beta1extensions.ReplicaSet {
    name, labels := backupNameAndLabel(clusterName)
    rs := &v1beta1extensions.ReplicaSet{
func NewBackupDeploymentManifest(name string, dplSel, podSel map[string]string, ps v1.PodSpec, owner metav1.OwnerReference) *appsv1beta1.Deployment {
    d := &appsv1beta1.Deployment{
        ObjectMeta: metav1.ObjectMeta{
            Name: name,
            Labels: newLablesForCluster(clusterName),
            Labels: dplSel,
        },
        Spec: v1beta1extensions.ReplicaSetSpec{
            Selector: &metav1.LabelSelector{MatchLabels: labels},
        Spec: appsv1beta1.DeploymentSpec{
            Selector: &metav1.LabelSelector{MatchLabels: podSel},
            Template: v1.PodTemplateSpec{
                ObjectMeta: metav1.ObjectMeta{
                    Labels: labels,
                    Labels: podSel,
                },
                Spec: ps,
            },
        },
    }
    addOwnerRefToObject(rs.GetObjectMeta(), owner)
    return rs
    addOwnerRefToObject(d.GetObjectMeta(), owner)
    return d
}

func NewBackupServiceManifest(clusterName string, owner metav1.OwnerReference) *v1.Service {
    name, labels := backupNameAndLabel(clusterName)
    selector := BackupSidecarLabels(clusterName)
    name := BackupSidecarName(clusterName)

    svc := &v1.Service{
        ObjectMeta: metav1.ObjectMeta{
            Name: name,
            Labels: newLablesForCluster(clusterName),
            Labels: LabelsForCluster(clusterName),
        },
        Spec: v1.ServiceSpec{
            Ports: []v1.ServicePort{
@@ -246,7 +239,7 @@ func NewBackupServiceManifest(clusterName string, owner metav1.OwnerReference) *
                    Protocol: v1.ProtocolTCP,
                },
            },
            Selector: labels,
            Selector: selector,
        },
    }
    addOwnerRefToObject(svc.GetObjectMeta(), owner)
@@ -348,3 +341,26 @@ func copyVolumePodName(clusterName string) string {
func makePVCName(clusterName string) string {
    return fmt.Sprintf("%s-pvc", clusterName)
}

func BackupServiceAddr(clusterName string) string {
    return fmt.Sprintf("%s:%d", BackupSidecarName(clusterName), constants.DefaultBackupPodHTTPPort)
}

func BackupSidecarName(clusterName string) string {
    return fmt.Sprintf("%s-backup-sidecar", clusterName)
}

func BackupSidecarLabels(clusterName string) map[string]string {
    return map[string]string{
        "app": BackupPodSelectorAppField,
        "etcd_cluster": clusterName,
    }
}

func CloneDeployment(d *appsv1beta1.Deployment) *appsv1beta1.Deployment {
    cd, err := api.Scheme.DeepCopy(d)
    if err != nil {
        panic("cannot deep copy pod")
    }
    return cd.(*appsv1beta1.Deployment)
}
@@ -24,7 +24,6 @@ import (
    "github.com/coreos/etcd-operator/pkg/backup/backupapi"
    "github.com/coreos/etcd-operator/pkg/spec"
    "github.com/coreos/etcd-operator/pkg/util/constants"
    "github.com/coreos/etcd-operator/pkg/util/etcdutil"
    "github.com/coreos/etcd-operator/pkg/util/retryutil"
@@ -116,14 +115,6 @@ func PodWithNodeSelector(p *v1.Pod, ns map[string]string) *v1.Pod {
    return p
}

func BackupServiceAddr(clusterName string) string {
    return fmt.Sprintf("%s:%d", BackupServiceName(clusterName), constants.DefaultBackupPodHTTPPort)
}

func BackupServiceName(clusterName string) string {
    return fmt.Sprintf("%s-backup-sidecar", clusterName)
}

func CreateClientService(kubecli kubernetes.Interface, clusterName, ns string, owner metav1.OwnerReference) error {
    return createService(kubecli, ClientServiceName(clusterName), clusterName, ns, "", 2379, owner)
}
@@ -342,11 +333,11 @@ func IsKubernetesResourceNotFoundError(err error) bool {
// We are using internal api types for cluster related.
func ClusterListOpt(clusterName string) metav1.ListOptions {
    return metav1.ListOptions{
        LabelSelector: labels.SelectorFromSet(newLablesForCluster(clusterName)).String(),
        LabelSelector: labels.SelectorFromSet(LabelsForCluster(clusterName)).String(),
    }
}

func newLablesForCluster(clusterName string) map[string]string {
func LabelsForCluster(clusterName string) map[string]string {
    return map[string]string{
        "etcd_cluster": clusterName,
        "app": "etcd",
@@ -20,7 +20,6 @@ import (
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/types"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/pkg/api"
    "k8s.io/client-go/pkg/api/v1"
    appsv1beta1 "k8s.io/client-go/pkg/apis/apps/v1beta1"
    "k8s.io/client-go/tools/clientcmd"
@@ -109,7 +108,7 @@ func (f *Framework) UpgradeOperator() error {
    if err != nil {
        return err
    }
    cd := cloneDeployment(d)
    cd := k8sutil.CloneDeployment(d)
    cd.Spec.Template.Spec.Containers[0].Image = image
    patchData, err := k8sutil.CreatePatch(d, cd, appsv1beta1.Deployment{})
    if err != nil {
@@ -118,11 +117,3 @@ func (f *Framework) UpgradeOperator() error {
    _, err = f.KubeCli.AppsV1beta1().Deployments(f.KubeNS).Patch(d.Name, types.StrategicMergePatchType, patchData)
    return err
}

func cloneDeployment(d *appsv1beta1.Deployment) *appsv1beta1.Deployment {
    cd, err := api.Scheme.DeepCopy(d)
    if err != nil {
        panic("cannot deep copy pod")
    }
    return cd.(*appsv1beta1.Deployment)
}
@@ -33,6 +33,7 @@ import (
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/service/s3"
    "github.com/coreos/etcd/clientv3"
    apierrors "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/labels"
    "k8s.io/client-go/pkg/api/v1"
@@ -284,18 +285,18 @@ func waitResourcesDeleted(t *testing.T, f *framework.Framework, c *spec.Cluster)
}

func waitBackupDeleted(f *framework.Framework, c *spec.Cluster) error {
    err := retryutil.Retry(5*time.Second, 5, func() (done bool, err error) {
        rl, err := f.KubeClient.Extensions().ReplicaSets(f.Namespace).List(k8sutil.ClusterListOpt(c.Metadata.Name))
        if err != nil {
            return false, err
        }
        if len(rl.Items) > 0 {
    err := retryutil.Retry(5*time.Second, 5, func() (bool, error) {
        _, err := f.KubeClient.AppsV1beta1().Deployments(f.Namespace).Get(k8sutil.BackupSidecarName(c.Metadata.Name), metav1.GetOptions{})
        if err == nil {
            return false, nil
        }
        return true, nil
        if apierrors.IsNotFound(err) {
            return true, nil
        }
        return false, err
    })
    if err != nil {
        return fmt.Errorf("failed to wait backup RS deleted: %v", err)
        return fmt.Errorf("failed to wait backup Deployment deleted: %v", err)
    }
    err = retryutil.Retry(5*time.Second, 2, func() (done bool, err error) {
        ls := labels.SelectorFromSet(map[string]string{