Add etcdRecovery maintenance type for admin update - ARO-1534

In the event that a master node changes IP addresses (or NICs) the etcd
quorum will become degraded. The node with the change will then have
its etcd pod in a crashloop. This is due to the hardcoded etcd spec.

This PR adds the remediation type EtcdRecovery maintenance task to
remediate this issue.

How it works:
  1. Verify this is the issue by comparing etcd's env variables to the
     node's IP address. A degradedEtcd object is returned with relevant
     information.
  1. Create a batch job to backup etcd's data directory and move the
     etcd manifest to stop the pod from crash looping.
  1. A batch job is created to run a pod that ssh's into the peer etcd
     containers to remove the failing node from its member list.
  1. Secrets for the failing pod are deleted
  1. Etcd is patched

There is currently no endpoint to access this recovery task. An
endpoint will be added in a later PR.

Additional scenarios handled:

  - Sometimes the etcd deployment can remediate itself after an IP address change, but there is still data present from the previous IP address's member. This results in 4/5 containers running in the pod with the etcd container failing, but no IP address conflicts to use for remediation. Added code to find the failing member based on the conditions if no conflict is found.
  - Check for multiple etcd pods with IP mismatches
  - Wait for jobs to reach a succeeded state, when the shell script
    exits with code 0. If this never happens the context is cancelled.
  - Return container log files to user from jobs
This commit is contained in:
Steven Fairchild 2022-12-14 14:16:46 -05:00 коммит произвёл Caden Marchese
Родитель 18d60cd7ad
Коммит 6c945b07bf
11 изменённых файлов: 1900 добавлений и 9 удалений

Просмотреть файл

@ -0,0 +1,74 @@
package frontend
// Copyright (c) Microsoft Corporation.
// Licensed under the Apache License 2.0.
import (
"context"
"net/http"
"path/filepath"
"strings"
"github.com/go-chi/chi/v5"
operatorclient "github.com/openshift/client-go/operator/clientset/versioned"
"github.com/sirupsen/logrus"
"github.com/Azure/ARO-RP/pkg/api"
"github.com/Azure/ARO-RP/pkg/database/cosmosdb"
"github.com/Azure/ARO-RP/pkg/frontend/middleware"
"github.com/Azure/ARO-RP/pkg/util/restconfig"
)
// postAdminOpenShiftClusterEtcdRecovery is the HTTP handler for the admin etcd
// recovery action. It strips the last element from the request path, runs the
// recovery and passes the outcome to adminReply. The Content-Type header is
// set to text/plain only when the recovery returned no error.
func (f *frontend) postAdminOpenShiftClusterEtcdRecovery(w http.ResponseWriter, r *http.Request) {
	reqCtx := r.Context()
	entry := reqCtx.Value(middleware.ContextKeyLog).(*logrus.Entry)

	// Drop the trailing path element; the remainder is used downstream to
	// derive the cluster's resource ID.
	r.URL.Path = filepath.Dir(r.URL.Path)

	body, recoveryErr := f._postAdminOpenShiftClusterEtcdRecovery(reqCtx, r, entry)
	if recoveryErr == nil {
		w.Header().Set("Content-Type", "text/plain")
	}

	adminReply(entry, w, nil, body, recoveryErr)
}
// _postAdminOpenShiftClusterEtcdRecovery looks up the cluster document for the
// request path (with the "/admin" prefix removed), prepares the Kubernetes
// clients and then runs f.fixEtcd.
//
// A missing document yields a 404 CloudError. Failures while creating the kube
// actions, resolving the "Etcd" GVR, building the REST config or creating the
// operator client yield a 500 CloudError. Other database errors and errors
// from validateAdminKubernetesObjects are returned unchanged.
//
// TODO write integration test that skips f.fixEtcd
func (f *frontend) _postAdminOpenShiftClusterEtcdRecovery(ctx context.Context, r *http.Request, log *logrus.Entry) ([]byte, error) {
	// internalError wraps a failure as a 500 CloudError carrying the cause's message.
	internalError := func(cause error) error {
		return api.NewCloudError(http.StatusInternalServerError, api.CloudErrorCodeInternalServerError, "", cause.Error())
	}

	resType := chi.URLParam(r, "resourceType")
	resName := chi.URLParam(r, "resourceName")
	resGroupName := chi.URLParam(r, "resourceGroupName")

	doc, err := f.dbOpenShiftClusters.Get(ctx, strings.TrimPrefix(r.URL.Path, "/admin"))
	if cosmosdb.IsErrorStatusCode(err, http.StatusNotFound) {
		return []byte{}, api.NewCloudError(http.StatusNotFound, api.CloudErrorCodeResourceNotFound, "", "The Resource '%s/%s' under resource group '%s' was not found.", resType, resName, resGroupName)
	}
	if err != nil {
		return []byte{}, err
	}

	kubeActions, err := f.kubeActionsFactory(log, f.env, doc.OpenShiftCluster)
	if err != nil {
		return []byte{}, internalError(err)
	}

	gvr, err := kubeActions.ResolveGVR("Etcd")
	if err != nil {
		return []byte{}, internalError(err)
	}

	if err := validateAdminKubernetesObjects(r.Method, gvr, namespaceEtcds, "cluster"); err != nil {
		return []byte{}, err
	}

	restConfig, err := restconfig.RestConfig(f.env, doc.OpenShiftCluster)
	if err != nil {
		return []byte{}, internalError(err)
	}

	operatorcli, err := operatorclient.NewForConfig(restConfig)
	if err != nil {
		return []byte{}, internalError(err)
	}

	return f.fixEtcd(ctx, log, f.env, doc, kubeActions, operatorcli.OperatorV1().Etcds())
}

Просмотреть файл

@ -0,0 +1,198 @@
package frontend
// Copyright (c) Microsoft Corporation.
// Licensed under the Apache License 2.0.
import (
"context"
"errors"
"fmt"
"net/http"
"strings"
"testing"
operatorv1client "github.com/openshift/client-go/operator/clientset/versioned/typed/operator/v1"
operatorv1fake "github.com/openshift/client-go/operator/clientset/versioned/typed/operator/v1/fake"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
kschema "k8s.io/apimachinery/pkg/runtime/schema"
ktesting "k8s.io/client-go/testing"
"github.com/Azure/ARO-RP/pkg/api"
"github.com/Azure/ARO-RP/pkg/env"
"github.com/Azure/ARO-RP/pkg/frontend/adminactions"
"github.com/Azure/ARO-RP/pkg/metrics/noop"
mock_adminactions "github.com/Azure/ARO-RP/pkg/util/mocks/adminactions"
)
// fakeRecoveryDoc builds the cluster document fixture used by the etcd
// recovery tests. The document key is the lower-cased resourceID. When
// privateEndpoint is true the network profile's APIServerPrivateEndpointIP is
// set to "0.0.0.0"; otherwise the network profile is left at its zero value.
func fakeRecoveryDoc(privateEndpoint bool, resourceID, resourceName string) *api.OpenShiftClusterDocument {
	var network api.NetworkProfile
	if privateEndpoint {
		network.APIServerPrivateEndpointIP = "0.0.0.0"
	}

	props := api.OpenShiftClusterProperties{
		NetworkProfile: network,
		AdminKubeconfig: api.SecureBytes(`apiVersion: v1
kind: Config
clusters:
- cluster:
server: https://server
name: cluster
`),
		KubeadminPassword: api.SecureString("p"),
		InfraID:           "zfsbk",
	}

	return &api.OpenShiftClusterDocument{
		Key: strings.ToLower(resourceID),
		OpenShiftCluster: &api.OpenShiftCluster{
			ID:         resourceID,
			Name:       resourceName,
			Type:       "Microsoft.RedHatOpenShift/openshiftClusters",
			Properties: props,
		},
	}
}
// TestAdminEtcdRecovery sends a POST to <resourceID>/etcdrecovery on a frontend
// built from test fixtures and checks the status code, the error text and the
// Content-Type header. All cases listed here exercise failure paths.
func TestAdminEtcdRecovery(t *testing.T) {
const (
resourceName = "cluster"
mockSubID = "00000000-0000-0000-0000-000000000000"
mockTenantID = mockSubID
method = http.MethodPost
)
ctx := context.Background()
resourceID := fmt.Sprintf("/subscriptions/%s/resourcegroups/resourceGroup/providers/Microsoft.RedHatOpenShift/openShiftClusters/%s", mockSubID, resourceName)
// NOTE(review): named gvk, but the value is a GroupVersionResource.
gvk := &kschema.GroupVersionResource{
Group: "",
Version: "v1",
Resource: "Etcd",
}
// test describes one table entry: the fixture document, optional mock
// expectations, an optional replacement kubeActionsFactory and the expected
// response properties.
type test struct {
name string
mocks func(ctx context.Context, ti *testInfra, k *mock_adminactions.MockKubeActions, log *logrus.Entry, env env.Interface, doc *api.OpenShiftClusterDocument, pods *corev1.PodList, etcdcli operatorv1client.EtcdInterface)
wantStatusCode int
wantResponse []byte
wantResponseContentType string
wantError string
doc *api.OpenShiftClusterDocument
kubeActionsFactory func(*logrus.Entry, env.Interface, *api.OpenShiftCluster) (adminactions.KubeActions, error)
}
for _, tt := range []*test{
{
// ResolveGVR returns an error, which the handler reports as a 500.
name: "fail: parse group kind resource",
wantStatusCode: http.StatusInternalServerError,
wantResponseContentType: "application/json",
wantError: "500: InternalServerError: : failed to parse resource",
doc: fakeRecoveryDoc(true, resourceID, resourceName),
mocks: func(ctx context.Context, ti *testInfra, k *mock_adminactions.MockKubeActions, log *logrus.Entry, env env.Interface, doc *api.OpenShiftClusterDocument, pods *corev1.PodList, etcdcli operatorv1client.EtcdInterface) {
k.EXPECT().ResolveGVR("Etcd").Times(1).Return(gvk, errors.New("failed to parse resource"))
},
},
{
// ResolveGVR returns a nil GVR without an error; the expected result is a 400.
name: "fail: validate kubernetes objects",
wantStatusCode: http.StatusBadRequest,
wantResponseContentType: "application/json",
wantError: "400: InvalidParameter: : The provided resource is invalid.",
doc: fakeRecoveryDoc(true, resourceID, resourceName),
mocks: func(ctx context.Context, ti *testInfra, k *mock_adminactions.MockKubeActions, log *logrus.Entry, env env.Interface, doc *api.OpenShiftClusterDocument, pods *corev1.PodList, etcdcli operatorv1client.EtcdInterface) {
k.EXPECT().ResolveGVR("Etcd").Times(1).Return(nil, nil)
},
},
{
// The fixture is built with privateEndpoint=false, so no private endpoint IP is set.
name: "fail: privateEndpointIP cannot be empty",
wantStatusCode: http.StatusInternalServerError,
wantResponseContentType: "application/json",
wantError: "500: InternalServerError: : privateEndpointIP is empty",
doc: fakeRecoveryDoc(false, resourceID, resourceName),
mocks: func(ctx context.Context, ti *testInfra, k *mock_adminactions.MockKubeActions, log *logrus.Entry, env env.Interface, doc *api.OpenShiftClusterDocument, pods *corev1.PodList, etcdcli operatorv1client.EtcdInterface) {
k.EXPECT().ResolveGVR("Etcd").Times(1).Return(gvk, nil)
},
},
{
// The factory itself fails, so no mock expectations are registered.
name: "fail: kubeActionsFactory error",
wantStatusCode: http.StatusInternalServerError,
wantResponseContentType: "application/json",
wantError: "500: InternalServerError: : failed to create kubeactions",
doc: fakeRecoveryDoc(true, resourceID, resourceName),
kubeActionsFactory: func(*logrus.Entry, env.Interface, *api.OpenShiftCluster) (adminactions.KubeActions, error) {
return nil, errors.New("failed to create kubeactions")
},
},
} {
t.Run(fmt.Sprintf("%s: %s", method, tt.name), func(t *testing.T) {
// Seed the fake databases with the cluster document and a registered subscription.
ti := newTestInfra(t).WithOpenShiftClusters().WithSubscriptions()
defer ti.done()
ti.fixture.AddOpenShiftClusterDocuments(tt.doc)
ti.fixture.AddSubscriptionDocuments(&api.SubscriptionDocument{
ID: mockSubID,
Subscription: &api.Subscription{
State: api.SubscriptionStateRegistered,
Properties: &api.SubscriptionProperties{
TenantID: mockTenantID,
},
},
})
err := ti.buildFixtures(nil)
if err != nil {
t.Fatal(err)
}
k := mock_adminactions.NewMockKubeActions(ti.controller)
if tt.mocks != nil {
tt.mocks(ctx, ti, k, ti.log, ti.env, tt.doc, newEtcdPods(t, tt.doc, false, false, false), &operatorv1fake.FakeEtcds{
Fake: &operatorv1fake.FakeOperatorV1{
Fake: &ktesting.Fake{},
},
})
}
// By default the frontend receives the mock; a test case may substitute its own factory.
kubeActionsFactory := func(*logrus.Entry, env.Interface, *api.OpenShiftCluster) (adminactions.KubeActions, error) {
return k, nil
}
if tt.kubeActionsFactory != nil {
kubeActionsFactory = tt.kubeActionsFactory
}
f, err := NewFrontend(ctx,
ti.audit,
ti.log,
ti.env,
ti.asyncOperationsDatabase,
ti.clusterManagerDatabase,
ti.openShiftClustersDatabase,
ti.subscriptionsDatabase,
nil,
api.APIs,
&noop.Noop{},
nil,
nil,
kubeActionsFactory,
nil,
ti.enricher)
if err != nil {
t.Fatal(err)
}
// The frontend is started in the background; ctx is context.Background(), so
// nothing in this test cancels it.
go f.Run(ctx, nil, nil)
resp, b, err := ti.request(method,
fmt.Sprintf("https://server/admin%s/etcdrecovery?api-version=admin", resourceID),
nil, nil)
if err != nil {
t.Fatal(err)
}
err = validateResponse(resp, b, tt.wantStatusCode, tt.wantError, tt.wantResponse)
if err != nil {
t.Error(err)
}
if tt.wantResponseContentType != resp.Header.Get("Content-Type") {
t.Error(fmt.Errorf("unexpected \"Content-Type\" response header value \"%s\", wanted \"%s\"", resp.Header.Get("Content-Type"), tt.wantResponseContentType))
}
})
}
}

Просмотреть файл

@ -129,7 +129,7 @@ func (f *frontend) _deleteAdminKubernetesObjects(ctx context.Context, r *http.Re
return err
}
return k.KubeDelete(ctx, groupKind, namespace, name, force)
return k.KubeDelete(ctx, groupKind, namespace, name, force, nil)
}
func (f *frontend) postAdminKubernetesObjects(w http.ResponseWriter, r *http.Request) {

Просмотреть файл

@ -107,7 +107,7 @@ func TestAdminKubernetesObjectsGetAndDelete(t *testing.T) {
objName: "config",
mocks: func(tt *test, k *mock_adminactions.MockKubeActions) {
k.EXPECT().
KubeDelete(gomock.Any(), tt.objKind, tt.objNamespace, tt.objName, false).
KubeDelete(gomock.Any(), tt.objKind, tt.objNamespace, tt.objName, false, nil).
Return(nil)
k.EXPECT().ResolveGVR(tt.objKind).Return(&schema.GroupVersionResource{Resource: "configmaps"}, nil)
},
@ -123,7 +123,7 @@ func TestAdminKubernetesObjectsGetAndDelete(t *testing.T) {
force: "true",
mocks: func(tt *test, k *mock_adminactions.MockKubeActions) {
k.EXPECT().
KubeDelete(gomock.Any(), tt.objKind, tt.objNamespace, tt.objName, true).
KubeDelete(gomock.Any(), tt.objKind, tt.objNamespace, tt.objName, true, nil).
Return(nil)
k.EXPECT().ResolveGVR(tt.objKind).Return(&schema.GroupVersionResource{Resource: "pods"}, nil)
},

Просмотреть файл

@ -15,6 +15,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
@ -29,7 +30,7 @@ type KubeActions interface {
KubeGet(ctx context.Context, groupKind, namespace, name string) ([]byte, error)
KubeList(ctx context.Context, groupKind, namespace string) ([]byte, error)
KubeCreateOrUpdate(ctx context.Context, obj *unstructured.Unstructured) error
KubeDelete(ctx context.Context, groupKind, namespace, name string, force bool) error
KubeDelete(ctx context.Context, groupKind, namespace, name string, force bool, propagationPolicy *metav1.DeletionPropagation) error
ResolveGVR(groupKind string) (*schema.GroupVersionResource, error)
CordonNode(ctx context.Context, nodeName string, unschedulable bool) error
DrainNode(ctx context.Context, nodeName string) error
@ -37,6 +38,8 @@ type KubeActions interface {
ApproveAllCsrs(ctx context.Context) error
Upgrade(ctx context.Context, upgradeY bool) error
KubeGetPodLogs(ctx context.Context, namespace, name, containerName string) ([]byte, error)
// kubeWatch returns a watch object for the provided label selector key
KubeWatch(ctx context.Context, o *unstructured.Unstructured, label string) (watch.Interface, error)
}
type kubeActions struct {
@ -149,7 +152,26 @@ func (k *kubeActions) KubeCreateOrUpdate(ctx context.Context, o *unstructured.Un
return err
}
func (k *kubeActions) KubeDelete(ctx context.Context, groupKind, namespace, name string, force bool) error {
// KubeWatch starts a watch on the resource type resolved from o's
// group/version/kind, scoped to o's namespace.
//
// When o carries a label named labelKey, the watch is restricted to objects
// with that same label key and value. When o has no such label, no label
// selector is applied.
//
// It returns the error from GVR resolution or from starting the watch.
func (k *kubeActions) KubeWatch(ctx context.Context, o *unstructured.Unstructured, labelKey string) (watch.Interface, error) {
	gvk := o.GroupVersionKind()
	gvr, err := k.gvrResolver.Resolve(gvk.GroupKind().String(), gvk.Version)
	if err != nil {
		return nil, err
	}

	listOpts := metav1.ListOptions{
		Limit: 1000, // just in case
	}
	// A label selector is an expression such as "key=value". Passing only the
	// label's value would be interpreted as "a label with this *key* exists".
	if value, ok := o.GetLabels()[labelKey]; ok {
		listOpts.LabelSelector = labelKey + "=" + value
	}

	w, err := k.dyn.Resource(*gvr).Namespace(o.GetNamespace()).Watch(ctx, listOpts)
	if err != nil {
		return nil, err
	}

	return w, nil
}
func (k *kubeActions) KubeDelete(ctx context.Context, groupKind, namespace, name string, force bool, propagationPolicy *metav1.DeletionPropagation) error {
gvr, err := k.gvrResolver.Resolve(groupKind, "")
if err != nil {
return err
@ -160,5 +182,9 @@ func (k *kubeActions) KubeDelete(ctx context.Context, groupKind, namespace, name
resourceDeleteOptions.GracePeriodSeconds = to.Int64Ptr(0)
}
if propagationPolicy != nil {
resourceDeleteOptions.PropagationPolicy = propagationPolicy
}
return k.dyn.Resource(*gvr).Namespace(namespace).Delete(ctx, name, resourceDeleteOptions)
}

739
pkg/frontend/fixetcd.go Normal file
Просмотреть файл

@ -0,0 +1,739 @@
package frontend
// Copyright (c) Microsoft Corporation.
// Licensed under the Apache License 2.0.
import (
"bytes"
"context"
_ "embed"
"errors"
"fmt"
"net/http"
"regexp"
"strings"
"time"
"github.com/Azure/go-autorest/autorest/to"
operatorv1 "github.com/openshift/api/operator/v1"
securityv1 "github.com/openshift/api/security/v1"
operatorv1client "github.com/openshift/client-go/operator/clientset/versioned/typed/operator/v1"
"github.com/sirupsen/logrus"
"github.com/ugorji/go/codec"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
kruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
"github.com/Azure/ARO-RP/pkg/api"
"github.com/Azure/ARO-RP/pkg/env"
"github.com/Azure/ARO-RP/pkg/frontend/adminactions"
)
// degradedEtcd describes the etcd member selected for recovery.
type degradedEtcd struct {
	// Node is used to target the backup job and to build the names of the
	// secrets to delete; Pod is excluded when the peer pods are collected.
	Node, Pod string
	// NOTE(review): presumably the address currently reported for the pod and
	// the stale address found in its etcd environment — confirm against the
	// code that fills these in.
	NewIP, OldIP string
}
const (
	// serviceAccountName is the ServiceAccount the fix-peers job runs as.
	serviceAccountName = "etcd-recovery-privileged"
	// kubeServiceAccount is the user name Kubernetes gives that service
	// account: system:serviceaccount:<namespace>:<name>. The colon after
	// "system:serviceaccount" is required; without it the name does not refer
	// to the service account.
	kubeServiceAccount = "system:serviceaccount:" + namespaceEtcds + ":" + serviceAccountName
	// namespaceEtcds is the namespace all recovery objects are created in.
	namespaceEtcds = "openshift-etcd"
	// image is the container image used by the recovery jobs.
	image = "ubi8/ubi-minimal"
	// jobName is the common prefix of the recovery job names.
	jobName = "etcd-recovery-"
	// patchOverides is only used as a prefix in the log line describing the
	// final patch.
	patchOverides = "unsupportedConfigOverrides:"
	// patchDisableOverrides is the raw override document applied to the Etcd
	// object before the failing member's secrets are deleted.
	patchDisableOverrides = `{"useUnsupportedUnsafeNonHANonProductionUnstableEtcd": true}`
)
// fixEtcd performs a single master node etcd recovery based on these steps and scenarios:
// https://docs.openshift.com/container-platform/4.10/backup_and_restore/control_plane_backup_and_restore/replacing-unhealthy-etcd-member.html
//
// The sequence implemented below is:
//  1. list the pods in namespaceEtcds and select the degraded member (findDegradedEtcd)
//  2. run the backup job on the degraded node (backupEtcdData)
//  3. run the fix-peers job (fixPeers)
//  4. fetch the Etcd object named "cluster" and patch it with patchDisableOverrides
//  5. delete the degraded node's etcd secrets (deleteSecrets)
//  6. patch again with a fresh ForceRedeploymentReason
//  7. patch a third time to put back the overrides that were set before step 4
//
// The returned bytes are the job container logs gathered up to the point of
// return. Every failure is returned as a 500 CloudError.
func (f *frontend) fixEtcd(ctx context.Context, log *logrus.Entry, env env.Interface, doc *api.OpenShiftClusterDocument, kubeActions adminactions.KubeActions, etcdcli operatorv1client.EtcdInterface) ([]byte, error) {
log.Info("Starting Etcd Recovery now")
log.Infof("Listing etcd pods now")
rawPods, err := kubeActions.KubeList(ctx, "Pod", namespaceEtcds)
if err != nil {
return []byte{}, api.NewCloudError(http.StatusInternalServerError, api.CloudErrorCodeInternalServerError, "", err.Error())
}
pods := &corev1.PodList{}
err = codec.NewDecoderBytes(rawPods, &codec.JsonHandle{}).Decode(pods)
if err != nil {
return []byte{}, api.NewCloudError(http.StatusInternalServerError, api.CloudErrorCodeInternalServerError, "", fmt.Sprintf("failed to decode pods, %s", err.Error()))
}
de, err := findDegradedEtcd(log, pods)
if err != nil {
return []byte{}, api.NewCloudError(http.StatusInternalServerError, api.CloudErrorCodeInternalServerError, "", err.Error())
}
log.Infof("Found degraded endpoint: %v", de)
backupContainerLogs, err := backupEtcdData(ctx, log, doc.OpenShiftCluster.Name, de.Node, kubeActions)
if err != nil {
return backupContainerLogs, api.NewCloudError(http.StatusInternalServerError, api.CloudErrorCodeInternalServerError, "", err.Error())
}
fixPeersContainerLogs, err := fixPeers(ctx, log, de, pods, kubeActions, doc.OpenShiftCluster.Name)
// The logs are combined before err is checked so that they are returned on the failure path too.
// NOTE(review): the error returned by logSeperator is discarded here.
allLogs, _ := logSeperator(backupContainerLogs, fixPeersContainerLogs)
if err != nil {
return allLogs, api.NewCloudError(http.StatusInternalServerError, api.CloudErrorCodeInternalServerError, "", err.Error())
}
rawEtcd, err := kubeActions.KubeGet(ctx, "Etcd", "", "cluster")
if err != nil {
return allLogs, api.NewCloudError(http.StatusInternalServerError, api.CloudErrorCodeInternalServerError, "", err.Error())
}
log.Info("Getting etcd operating now")
etcd := &operatorv1.Etcd{}
err = codec.NewDecoderBytes(rawEtcd, &codec.JsonHandle{}).Decode(etcd)
if err != nil {
return allLogs, api.NewCloudError(http.StatusInternalServerError, api.CloudErrorCodeInternalServerError, "", fmt.Sprintf("failed to decode etcd operator, %s", err.Error()))
}
// Keep the overrides that are currently set so they can be written back by the final patch.
existingOverrides := etcd.Spec.UnsupportedConfigOverrides.Raw
etcd.Spec.UnsupportedConfigOverrides = kruntime.RawExtension{
Raw: []byte(patchDisableOverrides),
}
err = patchEtcd(ctx, log, etcdcli, etcd, patchDisableOverrides)
if err != nil {
return allLogs, api.NewCloudError(http.StatusInternalServerError, api.CloudErrorCodeInternalServerError, "", err.Error())
}
err = deleteSecrets(ctx, log, kubeActions, de, doc.OpenShiftCluster.Properties.InfraID)
if err != nil {
return allLogs, api.NewCloudError(http.StatusInternalServerError, api.CloudErrorCodeInternalServerError, "", err.Error())
}
// A new reason string (it embeds the current time) is set before patching again.
etcd.Spec.ForceRedeploymentReason = fmt.Sprintf("single-master-recovery-%s", time.Now())
err = patchEtcd(ctx, log, etcdcli, etcd, etcd.Spec.ForceRedeploymentReason)
if err != nil {
return allLogs, api.NewCloudError(http.StatusInternalServerError, api.CloudErrorCodeInternalServerError, "", err.Error())
}
// NOTE(review): this writes through etcd.Spec.OperatorSpec while the earlier
// assignment used etcd.Spec directly — presumably the same promoted field; confirm.
etcd.Spec.OperatorSpec.UnsupportedConfigOverrides.Raw = existingOverrides
err = patchEtcd(ctx, log, etcdcli, etcd, patchOverides+string(etcd.Spec.OperatorSpec.UnsupportedConfigOverrides.Raw))
if err != nil {
return allLogs, api.NewCloudError(http.StatusInternalServerError, api.CloudErrorCodeInternalServerError, "", err.Error())
}
return allLogs, nil
}
// logSeperator joins two container logs with a line of 150 '#' characters
// between them and returns the combined log JSON-encoded.
//
// On an encoding failure it returns an empty slice and the encoder's error.
//
// NOTE(review): the JSON encoding of a []byte is presumably a base64 string —
// confirm that this is the format the caller wants to send back.
func logSeperator(log1, log2 []byte) ([]byte, error) {
	logSeperator := "\n" + strings.Repeat("#", 150) + "\n"

	// Assemble into a fresh slice so that log1's backing array is never
	// written to, even when it has spare capacity.
	allLogs := make([]byte, 0, len(log1)+len(logSeperator)+len(log2))
	allLogs = append(allLogs, log1...)
	allLogs = append(allLogs, logSeperator...)
	allLogs = append(allLogs, log2...)

	// Encode first and read the buffer afterwards. In a single return
	// statement buf.Bytes() would be evaluated before Encode has written
	// anything, so the result would always be empty.
	buf := &bytes.Buffer{}
	if err := codec.NewEncoder(buf, &codec.JsonHandle{}).Encode(allLogs); err != nil {
		return []byte{}, err
	}

	return buf.Bytes(), nil
}
// patchEtcd sends e to the cluster as a merge patch. The patch argument is
// only used in the log lines that describe the change.
//
// Before encoding, e is modified in place: ResourceVersion, SelfLink and UID
// are cleared and CreationTimestamp is set to the current time. Encoding and
// patch errors are returned unchanged.
func patchEtcd(ctx context.Context, log *logrus.Entry, etcdcli operatorv1client.EtcdInterface, e *operatorv1.Etcd, patch string) error {
	log.Infof("Preparing to patch etcd %s with %s", e.Name, patch)

	// must be removed to force redeployment
	e.ResourceVersion, e.SelfLink, e.UID = "", "", ""
	e.CreationTimestamp = metav1.Time{Time: time.Now()}

	var payload bytes.Buffer
	if err := codec.NewEncoder(&payload, &codec.JsonHandle{}).Encode(e); err != nil {
		return err
	}

	if _, err := etcdcli.Patch(ctx, e.Name, types.MergePatchType, payload.Bytes(), metav1.PatchOptions{}); err != nil {
		return err
	}

	log.Infof("Patched etcd %s with %s", e.Name, patch)
	return nil
}
// deleteSecrets deletes the three etcd secrets of the degraded node from
// namespaceEtcds: etcd-peer-<node>, etcd-serving-<node> and
// etcd-serving-metrics-<node>. It stops at, and returns, the first delete
// error. The infraID parameter is not used.
func deleteSecrets(ctx context.Context, log *logrus.Entry, kubeActions adminactions.KubeActions, de *degradedEtcd, infraID string) error {
	secretPrefixes := [...]string{"etcd-peer-", "etcd-serving-", "etcd-serving-metrics-"}

	for i := range secretPrefixes {
		name := secretPrefixes[i] + de.Node
		log.Infof("Deleting secret %s", name)

		if err := kubeActions.KubeDelete(ctx, "Secret", namespaceEtcds, name, false, nil); err != nil {
			return err
		}
	}

	return nil
}
// getPeerPods returns the names of the etcd peers of the degraded member. Each
// name is followed by a single space.
//
// A pod counts as a peer when its node name matches ".master-[0-9]$", its own
// name matches "etcd-<cluster>-[0-9A-Za-z]*-master-[0-9]$" and it is not
// de.Pod. The cluster name is inserted into the pattern verbatim; a pattern
// that fails to compile is returned as an error.
func getPeerPods(pods []corev1.Pod, de *degradedEtcd, cluster string) (string, error) {
	nodePattern, err := regexp.Compile(".master-[0-9]$")
	if err != nil {
		return "", err
	}

	podPattern, err := regexp.Compile("etcd-" + cluster + "-[0-9A-Za-z]*-master-[0-9]$")
	if err != nil {
		return "", err
	}

	var names strings.Builder
	for i := range pods {
		candidate := &pods[i]
		if candidate.Name == de.Pod {
			continue
		}
		if !nodePattern.MatchString(candidate.Spec.NodeName) || !podPattern.MatchString(candidate.Name) {
			continue
		}
		names.WriteString(candidate.Name)
		names.WriteString(" ")
	}

	return names.String(), nil
}
// newJobFixPeers builds the unstructured batch/v1 Job named jobName+"fix-peers"
// in namespaceEtcds. Its single privileged container runs the backupOrFixEtcd
// script via "/bin/bash -cx" with PEER_PODS, DEGRADED_NODE and FIX_PEERS=true
// in its environment, under the serviceAccountName service account, with the
// host's "/" mounted at /host.
func newJobFixPeers(cluster, peerPods, deNode string) *unstructured.Unstructured {
const jobNameFixPeers = jobName + "fix-peers"
// Frontend kubeactions expects an unstructured type
jobFixPeers := &unstructured.Unstructured{
Object: map[string]interface{}{
// NOTE(review): this is stored under the key "objectMeta", not "metadata";
// the name and namespace are set again through the helper methods below.
"objectMeta": map[string]interface{}{
"name": jobNameFixPeers,
"namespace": namespaceEtcds,
"labels": map[string]string{"app": jobNameFixPeers},
},
"spec": map[string]interface{}{
"template": map[string]interface{}{
"objectMeta": map[string]interface{}{
"name": jobNameFixPeers,
"namespace": namespaceEtcds,
"labels": map[string]string{"app": jobNameFixPeers},
},
// NOTE(review): these three settings sit inside "template" here; in batch/v1
// they are presumably fields of the job spec itself — confirm the placement.
"activeDeadlineSeconds": to.Int64Ptr(10),
"completions": to.Int32Ptr(1),
"ttlSecondsAfterFinished": to.Int32Ptr(300),
"spec": map[string]interface{}{
"restartPolicy": corev1.RestartPolicyOnFailure,
"serviceAccountName": serviceAccountName,
"containers": []corev1.Container{
{
Name: jobNameFixPeers,
Image: image,
Command: []string{
"/bin/bash",
"-cx",
backupOrFixEtcd,
},
SecurityContext: &corev1.SecurityContext{
Privileged: to.BoolPtr(true),
},
// Inputs for the script: the peer pod names, the degraded node and the mode flag.
Env: []corev1.EnvVar{
{
Name: "PEER_PODS",
Value: peerPods,
},
{
Name: "DEGRADED_NODE",
Value: deNode,
},
{
Name: "FIX_PEERS",
Value: "true",
},
},
VolumeMounts: []corev1.VolumeMount{
{
Name: "host",
MountPath: "/host",
ReadOnly: false,
},
},
},
},
// The host's root filesystem is exposed to the container as the "host" volume.
"volumes": []corev1.Volume{
{
Name: "host",
VolumeSource: corev1.VolumeSource{
HostPath: &corev1.HostPathVolumeSource{
Path: "/",
},
},
},
},
},
},
},
},
}
// This creates an embedded "metadata" map[string]string{} in the unstructured object
// For an unknown reason, creating "metadata" directly in the object doesn't work
// and the helper functions must be used
jobFixPeers.SetKind("Job")
jobFixPeers.SetAPIVersion("batch/v1")
jobFixPeers.SetName(jobNameFixPeers)
jobFixPeers.SetNamespace(namespaceEtcds)
jobFixPeers.SetClusterName(cluster)
return jobFixPeers
}
// fixPeers creates a job that ssh's into the failing pod's peer pods, and
// deletes the failing pod from its member's list.
//
// It creates the privileged service account objects the job needs, creates
// the job, waits for it through a watch, and deletes the job once it has
// succeeded. The service account objects are removed again when fixPeers
// returns.
//
// The returned bytes are the job's container logs, which may be non-empty
// even when an error is returned. A failure while removing the service
// account objects is logged but does not change the returned error.
func fixPeers(ctx context.Context, log *logrus.Entry, de *degradedEtcd, pods *corev1.PodList, kubeActions adminactions.KubeActions, cluster string) ([]byte, error) {
	peerPods, err := getPeerPods(pods.Items, de, cluster)
	if err != nil {
		return []byte{}, err
	}

	jobFixPeers := newJobFixPeers(cluster, peerPods, de.Node)

	cleanup, err, nestedCleanupErr := createPrivilegedServiceAccount(ctx, log, serviceAccountName, cluster, kubeServiceAccount, kubeActions)
	// Only format the errors that are actually set; formatting a nil error
	// with %s would put "%!s(<nil>)" into the message.
	switch {
	case err != nil && nestedCleanupErr != nil:
		return []byte{}, fmt.Errorf("%w (cleanup also failed: %s)", err, nestedCleanupErr)
	case err != nil:
		return []byte{}, err
	case nestedCleanupErr != nil:
		return []byte{}, nestedCleanupErr
	}
	defer func() {
		// Best effort: report a failed cleanup instead of dropping it silently.
		if cleanupErr := cleanup(); cleanupErr != nil {
			log.Errorf("Failed to clean up privileged service account %s: %s", serviceAccountName, cleanupErr)
		}
	}()

	log.Infof("Creating job %s", jobFixPeers.GetName())
	err = kubeActions.KubeCreateOrUpdate(ctx, jobFixPeers)
	if err != nil {
		return []byte{}, err
	}

	watcher, err := kubeActions.KubeWatch(ctx, jobFixPeers, "app")
	if err != nil {
		return []byte{}, err
	}

	containerLogs, err := waitForJobSucceed(ctx, log, watcher, jobFixPeers, kubeActions)
	if err != nil {
		return containerLogs, err
	}

	log.Infof("Deleting %s now", jobFixPeers.GetName())
	propPolicy := metav1.DeletePropagationBackground
	err = kubeActions.KubeDelete(ctx, "Job", namespaceEtcds, jobFixPeers.GetName(), true, &propPolicy)
	if err != nil {
		return containerLogs, err
	}

	return containerLogs, nil
}
// newServiceAccount builds an unstructured v1 ServiceAccount called name in
// namespaceEtcds with automountServiceAccountToken enabled and the given
// cluster name set.
func newServiceAccount(name, cluster string) *unstructured.Unstructured {
	sa := &unstructured.Unstructured{Object: map[string]interface{}{}}
	sa.Object["automountServiceAccountToken"] = to.BoolPtr(true)

	sa.SetKind("ServiceAccount")
	sa.SetAPIVersion("v1")
	sa.SetNamespace(namespaceEtcds)
	sa.SetName(name)
	sa.SetClusterName(cluster)

	return sa
}
// newClusterRole builds an unstructured ClusterRole called usersAccount that
// grants "get" and "create" on pods and pods/exec in the core API group.
func newClusterRole(usersAccount, cluster string) *unstructured.Unstructured {
	podExecRule := rbacv1.PolicyRule{
		Verbs:     []string{"get", "create"},
		Resources: []string{"pods", "pods/exec"},
		APIGroups: []string{""},
	}

	role := &unstructured.Unstructured{
		Object: map[string]interface{}{
			"rules": []rbacv1.PolicyRule{podExecRule},
		},
	}

	// Cluster Role isn't scoped to a namespace
	role.SetKind("ClusterRole")
	role.SetAPIVersion("rbac.authorization.k8s.io/v1")
	role.SetName(usersAccount)
	role.SetClusterName(cluster)

	return role
}
// newClusterRoleBinding builds an unstructured ClusterRoleBinding called name.
// It binds the ServiceAccount called name in namespaceEtcds to the ClusterRole
// called kubeServiceAccount.
func newClusterRoleBinding(name, cluster string) *unstructured.Unstructured {
	crb := &unstructured.Unstructured{
		Object: map[string]interface{}{
			"roleRef": map[string]interface{}{
				"kind": "ClusterRole",
				"name": kubeServiceAccount,
				// The roleRef field is the singular "apiGroup" and names the
				// RBAC API group that owns the referenced ClusterRole.
				"apiGroup": rbacv1.GroupName,
			},
			"subjects": []rbacv1.Subject{
				{
					Kind:      "ServiceAccount",
					Name:      name,
					Namespace: namespaceEtcds,
				},
			},
		},
	}

	// A ClusterRoleBinding is cluster-scoped, so no namespace is set.
	crb.SetAPIVersion("rbac.authorization.k8s.io/v1")
	crb.SetKind("ClusterRoleBinding")
	crb.SetName(name)
	crb.SetClusterName(cluster)

	return crb
}
// newSecurityContextConstraint builds an unstructured
// security.openshift.io/v1 SecurityContextConstraints object called name. It
// lists usersAccount as its only user and allows privileged containers,
// privilege escalation, all capabilities, and the RunAsAny strategies for
// both runAsUser and seLinuxContext.
func newSecurityContextConstraint(name, cluster, usersAccount string) *unstructured.Unstructured {
	fields := map[string]interface{}{
		"groups":                   []string{},
		"users":                    []string{usersAccount},
		"allowPrivilegedContainer": true,
		"allowPrivilegeEscalation": to.BoolPtr(true),
		"allowedCapabilities":      []corev1.Capability{"*"},
	}
	fields["runAsUser"] = map[string]securityv1.RunAsUserStrategyType{
		"type": securityv1.RunAsUserStrategyRunAsAny,
	}
	fields["seLinuxContext"] = map[string]securityv1.SELinuxContextStrategyType{
		"type": securityv1.SELinuxStrategyRunAsAny,
	}

	constraint := &unstructured.Unstructured{Object: fields}
	constraint.SetKind("SecurityContextConstraints")
	constraint.SetAPIVersion("security.openshift.io/v1")
	constraint.SetName(name)
	constraint.SetClusterName(cluster)

	return constraint
}
// createPrivilegedServiceAccount creates the following objects and returns a cleanup function to delete them all after use
//
//   - ServiceAccount
//
//   - ClusterRole
//
//   - ClusterRoleBinding
//
//   - SecurityContextConstraint
//
// Return values, in order: the cleanup function (nil when creation failed),
// the creation error, and the error from the cleanup that is run immediately
// after a failed creation.
//
// The cleanup function always attempts to delete all four objects, even when
// an earlier delete fails, so that no privileged object is left behind just
// because another one could not be removed. A single failure is returned
// unchanged; several failures are combined into one error.
func createPrivilegedServiceAccount(ctx context.Context, log *logrus.Entry, name, cluster, usersAccount string, kubeActions adminactions.KubeActions) (func() error, error, error) {
	serviceAcc := newServiceAccount(name, cluster)
	clusterRole := newClusterRole(usersAccount, cluster)
	crb := newClusterRoleBinding(name, cluster)
	scc := newSecurityContextConstraint(name, cluster, usersAccount)

	// cleanup is created here incase an error occurs while creating permissions
	cleanup := func() error {
		targets := []struct {
			what string
			obj  *unstructured.Unstructured
		}{
			{"service account", serviceAcc},
			{"security context contstraint", scc},
			{"cluster role", clusterRole},
			{"cluster role binding", crb},
		}

		var failures []error
		for _, target := range targets {
			log.Infof("Deleting %s %s now", target.what, target.obj.GetName())
			err := kubeActions.KubeDelete(ctx, target.obj.GetKind(), target.obj.GetNamespace(), target.obj.GetName(), true, nil)
			if err != nil {
				// Remember the failure and carry on with the remaining objects.
				failures = append(failures, err)
			}
		}

		switch len(failures) {
		case 0:
			return nil
		case 1:
			return failures[0]
		}
		messages := make([]string, 0, len(failures))
		for _, failure := range failures {
			messages = append(messages, failure.Error())
		}
		return errors.New(strings.Join(messages, "; "))
	}

	log.Infof("Creating Service Account %s now", serviceAcc.GetName())
	err := kubeActions.KubeCreateOrUpdate(ctx, serviceAcc)
	if err != nil {
		return nil, err, cleanup()
	}

	log.Infof("Creating Cluster Role %s now", clusterRole.GetName())
	err = kubeActions.KubeCreateOrUpdate(ctx, clusterRole)
	if err != nil {
		return nil, err, cleanup()
	}

	log.Infof("Creating Cluster Role Binding %s now", crb.GetName())
	err = kubeActions.KubeCreateOrUpdate(ctx, crb)
	if err != nil {
		return nil, err, cleanup()
	}

	log.Infof("Creating Security Context Constraint %s now", name)
	err = kubeActions.KubeCreateOrUpdate(ctx, scc)
	if err != nil {
		return nil, err, cleanup()
	}

	return cleanup, nil, nil
}
// backupEtcdData runs the backup job on the given node, waits for it and
// deletes it again once it has succeeded.
//
// According to the original notes for this job (the script itself is not part
// of this function), it makes two backups on the node:
//
//   - /etc/kubernetes/manifests/etcd-pod.yaml is moved to
//     /var/lib/etcd-backup/etcd-pod.yaml, which stops the failing etcd pod
//     from crashlooping by removing its manifest
//   - /var/lib/etcd is moved to /tmp
//
// If backups already exist the job refuses to overwrite them.
//
// The returned bytes are the job's container logs; they are also returned
// when waiting for the job or deleting it fails.
func backupEtcdData(ctx context.Context, log *logrus.Entry, cluster, node string, kubeActions adminactions.KubeActions) ([]byte, error) {
	job := createBackupEtcdDataJob(cluster, node)

	log.Infof("Creating job %s", job.GetName())
	if err := kubeActions.KubeCreateOrUpdate(ctx, job); err != nil {
		return []byte{}, err
	}
	log.Infof("Job %s has been created", job.GetName())

	jobWatch, err := kubeActions.KubeWatch(ctx, job, "app")
	if err != nil {
		return []byte{}, err
	}

	logs, err := waitForJobSucceed(ctx, log, jobWatch, job, kubeActions)
	if err != nil {
		return logs, err
	}

	log.Infof("Deleting job %s now", job.GetName())
	background := metav1.DeletePropagationBackground
	deleteErr := kubeActions.KubeDelete(ctx, "Job", namespaceEtcds, job.GetName(), true, &background)

	return logs, deleteErr
}
// waitForJobSucceed blocks until the watcher reports a pod in the Succeeded or
// Failed phase, the watch ends, or ctx is done. It then collects the logs of
// the job's first container.
//
// Events for pods in any other phase are skipped, so the function really
// waits for a terminal phase instead of returning on the first event. A
// closed result channel, or an event whose object is not a *corev1.Pod, ends
// the wait with an error instead of a panic.
//
// The container logs are returned together with the wait error, if any. When
// log collection itself fails, that error is returned instead.
//
// NOTE(review): a watch started through a dynamic client presumably delivers
// *unstructured.Unstructured objects rather than *corev1.Pod — confirm what
// the KubeWatch implementation actually sends.
func waitForJobSucceed(ctx context.Context, log *logrus.Entry, watcher watch.Interface, o *unstructured.Unstructured, k adminactions.KubeActions) ([]byte, error) {
	var waitErr error
	log.Infof("Waiting for %s to reach %s phase", o.GetName(), corev1.PodSucceeded)

waitLoop:
	for {
		select {
		case event, open := <-watcher.ResultChan():
			if !open {
				waitErr = fmt.Errorf("watch for %s closed before a terminal phase was observed", o.GetName())
				break waitLoop
			}

			pod, isPod := event.Object.(*corev1.Pod)
			if !isPod {
				waitErr = fmt.Errorf("unexpected %s event with object of type %T while waiting for %s", event.Type, event.Object, o.GetName())
				break waitLoop
			}

			switch pod.Status.Phase {
			case corev1.PodSucceeded:
				log.Infof("Job %s completed with %s", pod.GetName(), pod.Status.Message)
				break waitLoop
			case corev1.PodFailed:
				log.Infof("Job %s reached phase %s with message: %s", pod.GetName(), pod.Status.Phase, pod.Status.Message)
				waitErr = fmt.Errorf("pod %s event %s received with message %s", pod.Name, pod.Status.Phase, pod.Status.Message)
				break waitLoop
			}
			// Any other phase is not terminal yet: keep waiting.
		case <-ctx.Done():
			waitErr = fmt.Errorf("context was cancelled while waiting for %s because %s", o.GetName(), ctx.Err())
			break waitLoop
		}
	}

	// get container name
	// This relies on the exact shape produced by the job constructors in this
	// file: spec.template.spec.containers is a []corev1.Container.
	cxName := o.UnstructuredContent()["spec"].(map[string]interface{})["template"].(map[string]interface{})["spec"].(map[string]interface{})["containers"].([]corev1.Container)[0].Name
	log.Infof("Collecting container logs for Pod %s, container %s, in namespace %s", o.GetName(), cxName, o.GetNamespace())

	cxLogs, err := k.KubeGetPodLogs(ctx, o.GetNamespace(), o.GetName(), cxName)
	if err != nil {
		return cxLogs, err
	}
	log.Infof("Successfully collected logs for %s", o.GetName())

	return cxLogs, waitErr
}
// createBackupEtcdDataJob builds the unstructured batch/v1 Job named
// jobName+"data-backup" in namespaceEtcds. The job is pinned to node and runs
// the backupOrFixEtcd script through "chroot /host /bin/bash -c" in a
// privileged container with BACKUP=true, with the host's "/" mounted at /host.
// The cluster parameter is not used.
func createBackupEtcdDataJob(cluster, node string) *unstructured.Unstructured {
const jobNameDataBackup = jobName + "data-backup"
j := &unstructured.Unstructured{
Object: map[string]interface{}{
// NOTE(review): this is stored under the key "objectMeta", not "metadata";
// kind, name and namespace are set again through the helper methods below.
"objectMeta": map[string]interface{}{
"name": jobNameDataBackup,
"kind": "Job",
"namespace": namespaceEtcds,
"labels": map[string]string{"app": jobNameDataBackup},
},
"spec": map[string]interface{}{
"template": map[string]interface{}{
"objectMeta": map[string]interface{}{
"name": jobNameDataBackup,
"namespace": namespaceEtcds,
"labels": map[string]string{"app": jobNameDataBackup},
},
// NOTE(review): these three settings sit inside "template" here; in batch/v1
// they are presumably fields of the job spec itself — confirm the placement.
"activeDeadlineSeconds": to.Int64Ptr(10),
"completions": to.Int32Ptr(1),
"ttlSecondsAfterFinished": to.Int32Ptr(300),
"spec": map[string]interface{}{
"restartPolicy": corev1.RestartPolicyOnFailure,
// Run on the node whose etcd data is to be backed up.
"nodeName": node,
"containers": []corev1.Container{
{
Name: jobNameDataBackup,
Image: image,
// The script is executed inside the host filesystem mounted at /host.
Command: []string{
"chroot",
"/host",
"/bin/bash",
"-c",
backupOrFixEtcd,
},
VolumeMounts: []corev1.VolumeMount{
{
Name: "host",
MountPath: "/host",
ReadOnly: false,
},
},
SecurityContext: &corev1.SecurityContext{
Capabilities: &corev1.Capabilities{
Add: []corev1.Capability{"SYS_CHROOT"},
},
Privileged: to.BoolPtr(true),
},
Env: []corev1.EnvVar{
{
Name: "BACKUP",
Value: "true",
},
},
},
},
// The host's root filesystem is exposed to the container as the "host" volume.
"volumes": []corev1.Volume{
{
Name: "host",
VolumeSource: corev1.VolumeSource{
HostPath: &corev1.HostPathVolumeSource{
Path: "/",
},
},
},
},
},
},
},
},
}
// This creates an embedded "metadata" map[string]string{} in the unstructured object
// For an unknown reason, creating "metadata" directly in the object doesn't work
// and the helper functions must be used
j.SetKind("Job")
j.SetAPIVersion("batch/v1")
j.SetName(jobNameDataBackup)
j.SetNamespace(namespaceEtcds)
return j
}
// comparePodEnvToIp compares the IP address recorded in each etcd pod's
// environment (see ipFromEnv) with the pod's actual IPs from its status.
//
// Returns:
//   - a populated degradedEtcd when exactly one pod has a mismatch,
//   - an empty degradedEtcd when no pod has a mismatch, which includes pods
//     whose environment yields no IP at all,
//   - an error when more than one pod has a mismatch, since only a single
//     degraded member is supported.
func comparePodEnvToIp(log *logrus.Entry, pods *corev1.PodList) (*degradedEtcd, error) {
	var degradedEtcds []degradedEtcd
	for _, p := range pods.Items {
		envIP := ipFromEnv(p.Spec.Containers, p.Name)
		if envIP == "" {
			// Nothing to compare against for this pod.
			continue
		}
		for _, podIP := range p.Status.PodIPs {
			if podIP.IP != envIP {
				log.Infof("Found conflicting IPs for etcd Pod %s: Pod IP: %s != ENV IP %s", p.Name, podIP.IP, envIP)
				degradedEtcds = append(degradedEtcds, degradedEtcd{
					// Only the leading "etcd-" is pod-name decoration; any later
					// occurrence is part of the node name and must be kept.
					Node:  strings.TrimPrefix(p.Name, "etcd-"),
					Pod:   p.Name,
					NewIP: podIP.IP,
					OldIP: envIP,
				})
				// One record per pod is enough.
				break
			}
		}
	}

	switch len(degradedEtcds) {
	case 0:
		// No conflict found; the caller falls back to checking pod statuses.
		return &degradedEtcd{}, nil
	case 1:
		return &degradedEtcds[0], nil
	default:
		return nil, fmt.Errorf("found multiple etcd pods with conflicting IP addresses, only one degraded etcd is supported, unable to recover. Conflicting IPs found: %v", degradedEtcds)
	}
}
// findDegradedEtcd identifies the single degraded etcd member among pods by
// combining two searches: an IP mismatch between a pod's environment and its
// status (comparePodEnvToIp) and a not-ready etcd container
// (findCrashloopingPods).
//
// If either search fails, an empty degradedEtcd and that error are returned.
// If both searches name a pod and the names differ, the IP-mismatch result is
// returned together with a sanity-check error. If no IP mismatch was found,
// the result of the status search is returned instead.
func findDegradedEtcd(log *logrus.Entry, pods *corev1.PodList) (*degradedEtcd, error) {
	de, err := comparePodEnvToIp(log, pods)
	if err != nil {
		return &degradedEtcd{}, err
	}

	crashingPodSearchDe, err := findCrashloopingPods(log, pods)
	if err != nil {
		return &degradedEtcd{}, err
	}
	// Logged only after the error check so a failed search is not reported as a find.
	log.Infof("Found degraded etcd while searching by Pod statuses: %v", crashingPodSearchDe)

	// Sanity check
	// Since we are checking for both an etcd Pod with an IP mismatch and the
	// statuses of all etcd pods, make sure both searches point at the same Pod.
	if de.Pod != "" && de.Pod != crashingPodSearchDe.Pod {
		return de, fmt.Errorf("etcd Pod found in crashlooping state %s is not equal to etcd Pod with IP ENV mis match %s... failed sanity check", crashingPodSearchDe.Pod, de.Pod)
	}

	// If no conflict is found a recent IP change may still be causing an issue.
	// Sometimes etcd can recover the deployment itself, however a data directory
	// with the previous member's IP address is still present and causes a failure.
	// This can still be remediated by relying on the pod statuses.
	if de.Node == "" {
		log.Info("Unable to find an IP address conflict, using etcd Pod found during search by statuses")
		return crashingPodSearchDe, nil
	}

	return de, nil
}
// ipFromEnv returns the value of the IP environment variable that the
// container named "etcd" carries for the given pod, or "" when there is no
// such container or variable.
//
// The environment variable that contains etcd's IP address has the following
// naming convention
//
//	NODE_cluster_name_infra_ID_master_0_IP
//
// while the pod looks like this
//
//	etcd-cluster-name-infra-id-master-0
//
// so the variable name is derived from the pod name: dashes become
// underscores and the leading "etcd_" becomes "NODE_". Only the leading
// occurrence is rewritten, so a node name that itself contains "etcd-" still
// maps to the right variable.
func ipFromEnv(containers []corev1.Container, podName string) string {
	// The wanted name depends only on podName, so compute it once.
	envName := strings.ReplaceAll(podName, "-", "_")
	if strings.HasPrefix(envName, "etcd_") {
		envName = "NODE_" + strings.TrimPrefix(envName, "etcd_")
	}
	wanted := fmt.Sprintf("%s_IP", envName)

	for _, c := range containers {
		if c.Name != "etcd" {
			continue
		}
		for _, e := range c.Env {
			if e.Name == wanted {
				return e.Value
			}
		}
	}
	return ""
}
// findCrashloopingPods returns the single pod in pods whose container named
// "etcd" reports Ready == false. Note that the check is on the readiness flag
// only; the container's waiting reason is not inspected.
//
// An error is returned when no such pod exists, or when more than one does,
// because recovering multiple failed etcd members is not supported. The IP
// fields of the result are set to "unknown" since this search does not look
// at addresses.
func findCrashloopingPods(log *logrus.Entry, pods *corev1.PodList) (*degradedEtcd, error) {
	// pods are collected in a list to check for multiple crashing etcd instances
	// multiple etcd failures aren't supported so an error will be returned, rather than assuming the first found is the only one
	crashingPods := &corev1.PodList{}
	for _, p := range pods.Items {
		for _, c := range p.Status.ContainerStatuses {
			if !c.Ready && c.Name == "etcd" {
				log.Infof("Found etcd container with status: %v", c)
				crashingPods.Items = append(crashingPods.Items, p)
				// Count each pod at most once.
				break
			}
		}
	}

	switch len(crashingPods.Items) {
	case 0:
		return nil, errors.New("no etcd pod's were found in a CrashLoopBackOff state, unable to remediate etcd deployment")
	case 1:
		crashingPod := &crashingPods.Items[0]
		return &degradedEtcd{
			// Strip only the leading "etcd-"; later occurrences belong to the node name.
			Node:  strings.TrimPrefix(crashingPod.Name, "etcd-"),
			Pod:   crashingPod.Name,
			OldIP: "unknown",
			NewIP: "unknown",
		}, nil
	default:
		// log multiple names in a readable way
		names := make([]string, 0, len(crashingPods.Items))
		for _, c := range crashingPods.Items {
			names = append(names, c.Name)
		}
		return nil, fmt.Errorf("only a single degraded etcd pod can can be recovered from, more than one NotReady etcd pods were found: %v", names)
	}
}

Просмотреть файл

@ -0,0 +1,751 @@
package frontend
// Copyright (c) Microsoft Corporation.
// Licensed under the Apache License 2.0.
import (
"bytes"
"context"
"errors"
"strings"
"testing"
"github.com/Azure/go-autorest/autorest/to"
"github.com/golang/mock/gomock"
operatorv1fake "github.com/openshift/client-go/operator/clientset/versioned/typed/operator/v1/fake"
"github.com/ugorji/go/codec"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/watch"
ktesting "k8s.io/client-go/testing"
"github.com/Azure/ARO-RP/pkg/api"
"github.com/Azure/ARO-RP/pkg/metrics/noop"
mock_adminactions "github.com/Azure/ARO-RP/pkg/util/mocks/adminactions"
testdatabase "github.com/Azure/ARO-RP/test/database"
)
// degradedNode is the node suffix the tests pass to buildNodeName to name the
// master node whose etcd backup job and secrets the mocks expect to be handled.
const degradedNode = "master-2"
// TestFixEtcd exercises f.fixEtcd against a mocked KubeActions for the
// successful recovery paths and for a failure at each step: listing and
// decoding pods, finding the degraded member, creating the backup and
// fix-peers jobs, creating the privileged service account resources, a failed
// job pod, and a cancelled context.
//
// Each case registers its gomock expectations in mocks and states the exact
// error string it expects in wantErr ("" meaning no error). All cases share
// one context; the final case cancels it.
func TestFixEtcd(t *testing.T) {
	// Context leak is intentional to make use of cancel function, and make it to our error check
	ctx, ctxCancel := context.WithCancel(context.Background())

	const (
		mockSubID    = "00000000-0000-0000-0000-000000000000"
		mockTenantID = mockSubID
	)
	resourceID := testdatabase.GetResourcePath(mockSubID, "cluster")
	doc := &api.OpenShiftClusterDocument{
		Key: strings.ToLower(resourceID),
		OpenShiftCluster: &api.OpenShiftCluster{
			Name: "cluster",
			ID:   resourceID,
			Type: "Microsoft.RedHatOpenShift/openshiftClusters",
			Properties: api.OpenShiftClusterProperties{
				InfraID: "zfsbk",
			},
		},
	}

	type test struct {
		name      string
		mocks     func(tt *test, t *testing.T, ti *testInfra, k *mock_adminactions.MockKubeActions, pods *corev1.PodList, ctxCancel context.CancelFunc)
		wantErr   string
		pods      *corev1.PodList
		ctxCancel context.CancelFunc
		cancel    bool
	}

	for _, tt := range []*test{
		{
			name:    "fail: list pods",
			wantErr: "500: InternalServerError: : oh no, can't list pods",
			mocks: func(tt *test, t *testing.T, ti *testInfra, k *mock_adminactions.MockKubeActions, pods *corev1.PodList, ctxCancel context.CancelFunc) {
				k.EXPECT().KubeList(ctx, "Pod", namespaceEtcds).MaxTimes(1).Return(nil, errors.New("oh no, can't list pods"))
			},
		},
		{
			name:    "fail: invalid json, can't decode pods",
			wantErr: "500: InternalServerError: : failed to decode pods, json decode error [pos 1]: only encoded map or array can decode into struct",
			mocks: func(tt *test, t *testing.T, ti *testInfra, k *mock_adminactions.MockKubeActions, pods *corev1.PodList, ctxCancel context.CancelFunc) {
				buf := &bytes.Buffer{}
				err := codec.NewEncoder(buf, &codec.JsonHandle{}).Encode(`{`)
				if err != nil {
					t.Fatalf("failed to encode pods, %s", err.Error())
				}
				k.EXPECT().KubeList(ctx, "Pod", namespaceEtcds).MaxTimes(1).Return(buf.Bytes(), nil)
			},
		},
		{
			name: "pass: Expected degraded etcd scenario",
			pods: newEtcdPods(t, doc, false, false, false),
			mocks: func(tt *test, t *testing.T, ti *testInfra, k *mock_adminactions.MockKubeActions, pods *corev1.PodList, ctxCancel context.CancelFunc) {
				buf := &bytes.Buffer{}
				err := codec.NewEncoder(buf, &codec.JsonHandle{}).Encode(pods)
				if err != nil {
					t.Fatalf("%s failed to encode pods, %s", t.Name(), err.Error())
				}
				k.EXPECT().KubeList(ctx, "Pod", namespaceEtcds).MaxTimes(1).Return(buf.Bytes(), nil)

				// backupEtcd
				jobBackupEtcd := createBackupEtcdDataJob(doc.OpenShiftCluster.Name, buildNodeName(doc, degradedNode))
				k.EXPECT().KubeCreateOrUpdate(ctx, jobBackupEtcd).MaxTimes(1).Return(nil)
				expectWatchEvent(gomock.Any(), jobBackupEtcd, k, "app", corev1.PodSucceeded, false)()

				k.EXPECT().KubeGetPodLogs(ctx, jobBackupEtcd.GetNamespace(), jobBackupEtcd.GetName(), jobBackupEtcd.GetName()).Times(1).Return([]byte("Backup job doing backup things..."), nil)
				propPolicy := metav1.DeletePropagationBackground
				k.EXPECT().KubeDelete(ctx, "Job", namespaceEtcds, jobBackupEtcd.GetName(), true, &propPolicy).Times(1).Return(nil)

				// fixPeers
				// createPrivilegedServiceAccount
				serviceAcc := newServiceAccount(serviceAccountName, doc.OpenShiftCluster.Name)
				clusterRole := newClusterRole(kubeServiceAccount, doc.OpenShiftCluster.Name)
				crb := newClusterRoleBinding(serviceAccountName, doc.OpenShiftCluster.Name)
				scc := newSecurityContextConstraint(serviceAccountName, doc.OpenShiftCluster.Name, kubeServiceAccount)

				k.EXPECT().KubeCreateOrUpdate(ctx, serviceAcc).Times(1).Return(nil)
				k.EXPECT().KubeCreateOrUpdate(ctx, clusterRole).Times(1).Return(nil)
				k.EXPECT().KubeCreateOrUpdate(ctx, crb).Times(1).Return(nil)
				k.EXPECT().KubeCreateOrUpdate(ctx, scc).Times(1).Return(nil)

				de, err := findDegradedEtcd(ti.log, pods)
				if err != nil {
					t.Fatal(err)
				}
				peerPods, err := getPeerPods(pods.Items, de, doc.OpenShiftCluster.Name)
				if err != nil {
					t.Fatal(err)
				}

				jobFixPeers := newJobFixPeers(doc.OpenShiftCluster.Name, peerPods, de.Node)
				k.EXPECT().KubeCreateOrUpdate(ctx, jobFixPeers).Times(1).Return(nil)
				expectWatchEvent(gomock.Any(), jobFixPeers, k, "app", corev1.PodSucceeded, false)()

				k.EXPECT().KubeGetPodLogs(ctx, jobFixPeers.GetNamespace(), jobFixPeers.GetName(), jobFixPeers.GetName()).Times(1).Return([]byte("Fix peer job fixing peers..."), nil)
				k.EXPECT().KubeDelete(ctx, "Job", namespaceEtcds, jobFixPeers.GetName(), true, &propPolicy).Times(1).Return(nil)

				// cleanup
				k.EXPECT().KubeDelete(ctx, serviceAcc.GetKind(), serviceAcc.GetNamespace(), serviceAcc.GetName(), true, nil).Times(1).Return(nil)
				k.EXPECT().KubeDelete(ctx, scc.GetKind(), scc.GetNamespace(), scc.GetName(), true, nil).Times(1).Return(nil)
				k.EXPECT().KubeDelete(ctx, clusterRole.GetKind(), clusterRole.GetNamespace(), clusterRole.GetName(), true, nil).Times(1).Return(nil)
				k.EXPECT().KubeDelete(ctx, crb.GetKind(), crb.GetNamespace(), crb.GetName(), true, nil).Times(1).Return(nil)

				// NOTE(review): buf is not reset, so it still holds the encoded pod
				// list and the KubeGet payload below is that list followed by the
				// Etcd document — confirm this is intended.
				err = codec.NewEncoder(buf, &codec.JsonHandle{}).Encode(&operatorv1fake.FakeEtcds{})
				if err != nil {
					t.Fatal(err)
				}
				k.EXPECT().KubeGet(ctx, "Etcd", "", doc.OpenShiftCluster.Name).MaxTimes(1).Return(buf.Bytes(), nil)

				// delete secrets
				for _, prefix := range []string{"etcd-peer-", "etcd-serving-", "etcd-serving-metrics-"} {
					k.EXPECT().KubeDelete(ctx, "Secret", namespaceEtcds, prefix+buildNodeName(doc, degradedNode), false, nil)
				}
			},
		},
		{
			name: "pass: Empty env vars scenario",
			pods: newEtcdPods(t, doc, false, false, true),
			mocks: func(tt *test, t *testing.T, ti *testInfra, k *mock_adminactions.MockKubeActions, pods *corev1.PodList, ctxCancel context.CancelFunc) {
				buf := &bytes.Buffer{}
				err := codec.NewEncoder(buf, &codec.JsonHandle{}).Encode(pods)
				if err != nil {
					t.Fatalf("%s failed to encode pods, %s", t.Name(), err.Error())
				}
				k.EXPECT().KubeList(ctx, "Pod", namespaceEtcds).MaxTimes(1).Return(buf.Bytes(), nil)

				// backupEtcd
				jobBackupEtcd := createBackupEtcdDataJob(doc.OpenShiftCluster.Name, buildNodeName(doc, degradedNode))
				k.EXPECT().KubeCreateOrUpdate(ctx, jobBackupEtcd).MaxTimes(1).Return(nil)
				expectWatchEvent(gomock.Any(), jobBackupEtcd, k, "app", corev1.PodSucceeded, false)()

				k.EXPECT().KubeGetPodLogs(ctx, jobBackupEtcd.GetNamespace(), jobBackupEtcd.GetName(), jobBackupEtcd.GetName()).MaxTimes(1).Return([]byte("Backup job doing backup things..."), nil)
				propPolicy := metav1.DeletePropagationBackground
				k.EXPECT().KubeDelete(ctx, "Job", namespaceEtcds, jobBackupEtcd.GetName(), true, &propPolicy).MaxTimes(1).Return(nil)

				// fixPeers
				// createPrivilegedServiceAccount
				serviceAcc := newServiceAccount(serviceAccountName, doc.OpenShiftCluster.Name)
				clusterRole := newClusterRole(kubeServiceAccount, doc.OpenShiftCluster.Name)
				crb := newClusterRoleBinding(serviceAccountName, doc.OpenShiftCluster.Name)
				scc := newSecurityContextConstraint(serviceAccountName, doc.OpenShiftCluster.Name, kubeServiceAccount)

				k.EXPECT().KubeCreateOrUpdate(ctx, serviceAcc).MaxTimes(1).Return(nil)
				k.EXPECT().KubeCreateOrUpdate(ctx, clusterRole).MaxTimes(1).Return(nil)
				k.EXPECT().KubeCreateOrUpdate(ctx, crb).MaxTimes(1).Return(nil)
				k.EXPECT().KubeCreateOrUpdate(ctx, scc).MaxTimes(1).Return(nil)

				de, err := findDegradedEtcd(ti.log, pods)
				if err != nil {
					t.Fatal(err)
				}
				peerPods, err := getPeerPods(pods.Items, de, doc.OpenShiftCluster.Name)
				if err != nil {
					t.Fatal(err)
				}

				jobFixPeers := newJobFixPeers(doc.OpenShiftCluster.Name, peerPods, de.Node)
				k.EXPECT().KubeCreateOrUpdate(ctx, jobFixPeers).MaxTimes(1).Return(nil)
				expectWatchEvent(gomock.Any(), jobFixPeers, k, "app", corev1.PodSucceeded, false)()

				k.EXPECT().KubeGetPodLogs(ctx, jobFixPeers.GetNamespace(), jobFixPeers.GetName(), jobFixPeers.GetName()).MaxTimes(1).Return([]byte("Fix peer job fixing peers..."), nil)
				k.EXPECT().KubeDelete(ctx, "Job", namespaceEtcds, jobFixPeers.GetName(), true, &propPolicy).MaxTimes(1).Return(nil)

				// cleanup
				k.EXPECT().KubeDelete(ctx, serviceAcc.GetKind(), serviceAcc.GetNamespace(), serviceAcc.GetName(), true, nil).MaxTimes(1).Return(nil)
				k.EXPECT().KubeDelete(ctx, scc.GetKind(), scc.GetNamespace(), scc.GetName(), true, nil).MaxTimes(1).Return(nil)
				k.EXPECT().KubeDelete(ctx, clusterRole.GetKind(), clusterRole.GetNamespace(), clusterRole.GetName(), true, nil).MaxTimes(1).Return(nil)
				k.EXPECT().KubeDelete(ctx, crb.GetKind(), crb.GetNamespace(), crb.GetName(), true, nil).MaxTimes(1).Return(nil)

				// NOTE(review): buf is not reset, so it still holds the encoded pod
				// list and the KubeGet payload below is that list followed by the
				// Etcd document — confirm this is intended.
				err = codec.NewEncoder(buf, &codec.JsonHandle{}).Encode(&operatorv1fake.FakeEtcds{})
				if err != nil {
					t.Fatal(err)
				}
				k.EXPECT().KubeGet(ctx, "Etcd", "", doc.OpenShiftCluster.Name).MaxTimes(1).Return(buf.Bytes(), nil)

				// delete secrets
				for _, prefix := range []string{"etcd-peer-", "etcd-serving-", "etcd-serving-metrics-"} {
					k.EXPECT().KubeDelete(ctx, "Secret", namespaceEtcds, prefix+buildNodeName(doc, degradedNode), false, nil)
				}
			},
		},
		{
			name:    "fail: Multiple degraded etcd instances scenario",
			wantErr: "500: InternalServerError: : only a single degraded etcd pod can can be recovered from, more than one NotReady etcd pods were found: [etcd-cluster-zfsbk-master-0 etcd-cluster-zfsbk-master-1 etcd-cluster-zfsbk-master-2]",
			pods:    newEtcdPods(t, doc, false, true, true),
			mocks: func(tt *test, t *testing.T, ti *testInfra, k *mock_adminactions.MockKubeActions, pods *corev1.PodList, ctxCancel context.CancelFunc) {
				buf := &bytes.Buffer{}
				err := codec.NewEncoder(buf, &codec.JsonHandle{}).Encode(pods)
				if err != nil {
					t.Fatalf("%s failed to encode pods, %s", t.Name(), err.Error())
				}
				k.EXPECT().KubeList(ctx, "Pod", namespaceEtcds).MaxTimes(1).Return(buf.Bytes(), nil)
			},
		},
		{
			name:    "fail: empty/correct pod env and no bad container statuses",
			wantErr: "500: InternalServerError: : no etcd pod's were found in a CrashLoopBackOff state, unable to remediate etcd deployment",
			pods:    newEtcdPods(t, doc, true, false, false),
			mocks: func(tt *test, t *testing.T, ti *testInfra, k *mock_adminactions.MockKubeActions, pods *corev1.PodList, ctxCancel context.CancelFunc) {
				buf := &bytes.Buffer{}
				err := codec.NewEncoder(buf, &codec.JsonHandle{}).Encode(pods)
				if err != nil {
					t.Fatalf("%s failed to encode pods, %s", t.Name(), err.Error())
				}
				k.EXPECT().KubeList(ctx, "Pod", namespaceEtcds).MaxTimes(1).Return(buf.Bytes(), nil)
			},
		},
		{
			name:    "fail: create job data backup",
			wantErr: "500: InternalServerError: : oh no, can't create job data backup",
			pods:    newEtcdPods(t, doc, false, false, false),
			mocks: func(tt *test, t *testing.T, ti *testInfra, k *mock_adminactions.MockKubeActions, pods *corev1.PodList, ctxCancel context.CancelFunc) {
				buf := &bytes.Buffer{}
				err := codec.NewEncoder(buf, &codec.JsonHandle{}).Encode(pods)
				if err != nil {
					t.Fatalf("%s failed to encode pods, %s", t.Name(), err.Error())
				}
				k.EXPECT().KubeList(ctx, "Pod", namespaceEtcds).MaxTimes(1).Return(buf.Bytes(), nil)

				// backupEtcd
				jobBackupEtcd := createBackupEtcdDataJob(doc.OpenShiftCluster.Name, buildNodeName(doc, degradedNode))
				k.EXPECT().KubeCreateOrUpdate(ctx, jobBackupEtcd).MaxTimes(1).Return(errors.New("oh no, can't create job data backup"))
			},
		},
		{
			name:    "fail: create job fix peers",
			wantErr: "500: InternalServerError: : oh no, can't create job fix peers",
			pods:    newEtcdPods(t, doc, false, false, false),
			mocks: func(tt *test, t *testing.T, ti *testInfra, k *mock_adminactions.MockKubeActions, pods *corev1.PodList, ctxCancel context.CancelFunc) {
				buf := &bytes.Buffer{}
				err := codec.NewEncoder(buf, &codec.JsonHandle{}).Encode(pods)
				if err != nil {
					t.Fatalf("%s failed to encode pods, %s", t.Name(), err.Error())
				}
				k.EXPECT().KubeList(ctx, "Pod", namespaceEtcds).MaxTimes(1).Return(buf.Bytes(), nil)

				// backupEtcd
				jobBackupEtcd := createBackupEtcdDataJob(doc.OpenShiftCluster.Name, buildNodeName(doc, degradedNode))
				k.EXPECT().KubeCreateOrUpdate(ctx, jobBackupEtcd).MaxTimes(1).Return(nil)
				expectWatchEvent(gomock.Any(), jobBackupEtcd, k, "app", corev1.PodSucceeded, false)()

				k.EXPECT().KubeGetPodLogs(ctx, jobBackupEtcd.GetNamespace(), jobBackupEtcd.GetName(), jobBackupEtcd.GetName()).MaxTimes(1).Return([]byte("Backup job doing backup things..."), nil)
				propPolicy := metav1.DeletePropagationBackground
				k.EXPECT().KubeDelete(ctx, "Job", namespaceEtcds, jobBackupEtcd.GetName(), true, &propPolicy).MaxTimes(1).Return(nil)

				// fixPeers
				// createPrivilegedServiceAccount
				serviceAcc := newServiceAccount(serviceAccountName, doc.OpenShiftCluster.Name)
				clusterRole := newClusterRole(kubeServiceAccount, doc.OpenShiftCluster.Name)
				crb := newClusterRoleBinding(serviceAccountName, doc.OpenShiftCluster.Name)
				scc := newSecurityContextConstraint(serviceAccountName, doc.OpenShiftCluster.Name, kubeServiceAccount)

				k.EXPECT().KubeCreateOrUpdate(ctx, serviceAcc).MaxTimes(1).Return(nil)
				k.EXPECT().KubeCreateOrUpdate(ctx, clusterRole).MaxTimes(1).Return(nil)
				k.EXPECT().KubeCreateOrUpdate(ctx, crb).MaxTimes(1).Return(nil)
				k.EXPECT().KubeCreateOrUpdate(ctx, scc).MaxTimes(1).Return(nil)

				de, err := findDegradedEtcd(ti.log, pods)
				if err != nil {
					t.Fatal(err)
				}
				peerPods, err := getPeerPods(pods.Items, de, doc.OpenShiftCluster.Name)
				if err != nil {
					t.Fatal(err)
				}

				jobFixPeers := newJobFixPeers(doc.OpenShiftCluster.Name, peerPods, de.Node)
				k.EXPECT().KubeCreateOrUpdate(ctx, jobFixPeers).MaxTimes(1).Return(errors.New("oh no, can't create job fix peers"))
				expectWatchEvent(gomock.Any(), jobFixPeers, k, "app", corev1.PodSucceeded, false)()

				// cleanup
				k.EXPECT().KubeDelete(ctx, serviceAcc.GetKind(), serviceAcc.GetNamespace(), serviceAcc.GetName(), true, nil).MaxTimes(1).Return(nil)
				k.EXPECT().KubeDelete(ctx, scc.GetKind(), scc.GetNamespace(), scc.GetName(), true, nil).MaxTimes(1).Return(nil)
				k.EXPECT().KubeDelete(ctx, clusterRole.GetKind(), clusterRole.GetNamespace(), clusterRole.GetName(), true, nil).MaxTimes(1).Return(nil)
				k.EXPECT().KubeDelete(ctx, crb.GetKind(), crb.GetNamespace(), crb.GetName(), true, nil).MaxTimes(1).Return(nil)
			},
		},
		{
			name:    "fail: create service account",
			wantErr: "500: InternalServerError: : oh no, can't create service account %!!(MISSING)s(<nil>)",
			pods:    newEtcdPods(t, doc, false, false, false),
			mocks: func(tt *test, t *testing.T, ti *testInfra, k *mock_adminactions.MockKubeActions, pods *corev1.PodList, ctxCancel context.CancelFunc) {
				buf := &bytes.Buffer{}
				err := codec.NewEncoder(buf, &codec.JsonHandle{}).Encode(pods)
				if err != nil {
					t.Fatalf("%s failed to encode pods, %s", t.Name(), err.Error())
				}
				k.EXPECT().KubeList(ctx, "Pod", namespaceEtcds).MaxTimes(1).Return(buf.Bytes(), nil)

				// backupEtcd
				jobBackupEtcd := createBackupEtcdDataJob(doc.OpenShiftCluster.Name, buildNodeName(doc, degradedNode))
				k.EXPECT().KubeCreateOrUpdate(ctx, jobBackupEtcd).MaxTimes(1).Return(nil)
				expectWatchEvent(gomock.Any(), jobBackupEtcd, k, "app", corev1.PodSucceeded, false)()
				k.EXPECT().KubeGetPodLogs(ctx, jobBackupEtcd.GetNamespace(), jobBackupEtcd.GetName(), jobBackupEtcd.GetName()).MaxTimes(1).Return([]byte("Backup job doing backup things..."), nil)

				// fixPeers
				serviceAcc := newServiceAccount(serviceAccountName, doc.OpenShiftCluster.Name)
				// k.EXPECT().KubeCreateOrUpdate(ctx, serviceAcc).MaxTimes(1).Return(errors.New(tt.wantErr))
				k.EXPECT().KubeCreateOrUpdate(ctx, serviceAcc).MaxTimes(1).Return(errors.New("oh no, can't create service account"))

				// nested cleanup
				propPolicy := metav1.DeletePropagationBackground
				k.EXPECT().KubeDelete(ctx, "ServiceAccount", namespaceEtcds, serviceAcc.GetName(), true, nil).MaxTimes(1).Return(nil)
				k.EXPECT().KubeDelete(ctx, "SecurityContextConstraints", "", serviceAcc.GetName(), true, nil).MaxTimes(1).Return(nil)
				k.EXPECT().KubeDelete(ctx, "ClusterRole", "", "system:serviceaccountopenshift-etcd:etcd-recovery-privileged", true, nil).MaxTimes(1).Return(nil)
				k.EXPECT().KubeDelete(ctx, "ClusterRoleBinding", "", serviceAcc.GetName(), true, nil).MaxTimes(1).Return(nil)
				k.EXPECT().KubeDelete(ctx, "Job", namespaceEtcds, jobBackupEtcd.GetName(), true, &propPolicy).MaxTimes(1).Return(nil)
			},
		},
		{
			name:    "fail: create cluster role",
			wantErr: "500: InternalServerError: : oh no, can't create job fix peers %!!(MISSING)s(<nil>)",
			pods:    newEtcdPods(t, doc, false, false, false),
			mocks: func(tt *test, t *testing.T, ti *testInfra, k *mock_adminactions.MockKubeActions, pods *corev1.PodList, ctxCancel context.CancelFunc) {
				buf := &bytes.Buffer{}
				err := codec.NewEncoder(buf, &codec.JsonHandle{}).Encode(pods)
				if err != nil {
					t.Fatalf("%s failed to encode pods, %s", t.Name(), err.Error())
				}
				k.EXPECT().KubeList(ctx, "Pod", namespaceEtcds).MaxTimes(1).Return(buf.Bytes(), nil)

				// backupEtcd
				jobBackupEtcd := createBackupEtcdDataJob(doc.OpenShiftCluster.Name, buildNodeName(doc, degradedNode))
				k.EXPECT().KubeCreateOrUpdate(ctx, jobBackupEtcd).MaxTimes(1).Return(nil)
				expectWatchEvent(gomock.Any(), jobBackupEtcd, k, "app", corev1.PodSucceeded, false)()

				k.EXPECT().KubeGetPodLogs(ctx, jobBackupEtcd.GetNamespace(), jobBackupEtcd.GetName(), jobBackupEtcd.GetName()).MaxTimes(1).Return([]byte("Backup job doing backup things..."), nil)
				propPolicy := metav1.DeletePropagationBackground
				k.EXPECT().KubeDelete(ctx, "Job", namespaceEtcds, jobBackupEtcd.GetName(), true, &propPolicy).MaxTimes(1).Return(nil)

				// fixPeers
				// createPrivilegedServiceAccount
				serviceAcc := newServiceAccount(serviceAccountName, doc.OpenShiftCluster.Name)
				clusterRole := newClusterRole(kubeServiceAccount, doc.OpenShiftCluster.Name)

				k.EXPECT().KubeCreateOrUpdate(ctx, serviceAcc).MaxTimes(1).Return(nil)
				k.EXPECT().KubeCreateOrUpdate(ctx, clusterRole).MaxTimes(1).Return(errors.New("oh no, can't create job fix peers"))

				k.EXPECT().KubeDelete(ctx, "ServiceAccount", namespaceEtcds, serviceAcc.GetName(), true, nil).MaxTimes(1).Return(nil)
				k.EXPECT().KubeDelete(ctx, "SecurityContextConstraints", "", serviceAcc.GetName(), true, nil).MaxTimes(1).Return(nil)
				k.EXPECT().KubeDelete(ctx, "ClusterRole", "", "system:serviceaccountopenshift-etcd:etcd-recovery-privileged", true, nil).MaxTimes(1).Return(nil)
				k.EXPECT().KubeDelete(ctx, "ClusterRoleBinding", "", serviceAcc.GetName(), true, nil).MaxTimes(1).Return(nil)
				k.EXPECT().KubeDelete(ctx, "Job", namespaceEtcds, jobBackupEtcd.GetName(), true, &propPolicy).MaxTimes(1).Return(nil)
			},
		},
		{
			name:    "fail: create cluster role binding",
			wantErr: "500: InternalServerError: : oh no, can't create cluster role binding %!!(MISSING)s(<nil>)",
			pods:    newEtcdPods(t, doc, false, false, false),
			mocks: func(tt *test, t *testing.T, ti *testInfra, k *mock_adminactions.MockKubeActions, pods *corev1.PodList, ctxCancel context.CancelFunc) {
				buf := &bytes.Buffer{}
				err := codec.NewEncoder(buf, &codec.JsonHandle{}).Encode(pods)
				if err != nil {
					t.Fatalf("%s failed to encode pods, %s", t.Name(), err.Error())
				}
				k.EXPECT().KubeList(ctx, "Pod", namespaceEtcds).MaxTimes(1).Return(buf.Bytes(), nil)

				// backupEtcd
				jobBackupEtcd := createBackupEtcdDataJob(doc.OpenShiftCluster.Name, buildNodeName(doc, degradedNode))
				k.EXPECT().KubeCreateOrUpdate(ctx, jobBackupEtcd).MaxTimes(1).Return(nil)
				expectWatchEvent(gomock.Any(), jobBackupEtcd, k, "app", corev1.PodSucceeded, false)()

				k.EXPECT().KubeGetPodLogs(ctx, jobBackupEtcd.GetNamespace(), jobBackupEtcd.GetName(), jobBackupEtcd.GetName()).MaxTimes(1).Return([]byte("Backup job doing backup things..."), nil)
				propPolicy := metav1.DeletePropagationBackground
				k.EXPECT().KubeDelete(ctx, "Job", namespaceEtcds, jobBackupEtcd.GetName(), true, &propPolicy).MaxTimes(1).Return(nil)

				// fixPeers
				// createPrivilegedServiceAccount
				serviceAcc := newServiceAccount(serviceAccountName, doc.OpenShiftCluster.Name)
				clusterRole := newClusterRole(kubeServiceAccount, doc.OpenShiftCluster.Name)
				crb := newClusterRoleBinding(serviceAccountName, doc.OpenShiftCluster.Name)

				// cleanup
				k.EXPECT().KubeCreateOrUpdate(ctx, serviceAcc).MaxTimes(1).Return(nil)
				k.EXPECT().KubeCreateOrUpdate(ctx, clusterRole).MaxTimes(1).Return(nil)
				k.EXPECT().KubeCreateOrUpdate(ctx, crb).MaxTimes(1).Return(errors.New("oh no, can't create cluster role binding"))

				k.EXPECT().KubeDelete(ctx, "ServiceAccount", namespaceEtcds, serviceAcc.GetName(), true, nil).MaxTimes(1).Return(nil)
				k.EXPECT().KubeDelete(ctx, "SecurityContextConstraints", "", serviceAcc.GetName(), true, nil).MaxTimes(1).Return(nil)
				k.EXPECT().KubeDelete(ctx, "ClusterRole", "", "system:serviceaccountopenshift-etcd:etcd-recovery-privileged", true, nil).MaxTimes(1).Return(nil)
				k.EXPECT().KubeDelete(ctx, "ClusterRoleBinding", "", serviceAcc.GetName(), true, nil).MaxTimes(1).Return(nil)
				k.EXPECT().KubeDelete(ctx, "Job", namespaceEtcds, jobBackupEtcd.GetName(), true, &propPolicy).MaxTimes(1).Return(nil)
			},
		},
		{
			name:    "fail: create security context constraint",
			wantErr: "500: InternalServerError: : oh no, can't create security context constraint %!!(MISSING)s(<nil>)",
			pods:    newEtcdPods(t, doc, false, false, false),
			mocks: func(tt *test, t *testing.T, ti *testInfra, k *mock_adminactions.MockKubeActions, pods *corev1.PodList, ctxCancel context.CancelFunc) {
				buf := &bytes.Buffer{}
				err := codec.NewEncoder(buf, &codec.JsonHandle{}).Encode(pods)
				if err != nil {
					t.Fatalf("%s failed to encode pods, %s", t.Name(), err.Error())
				}
				k.EXPECT().KubeList(ctx, "Pod", namespaceEtcds).MaxTimes(1).Return(buf.Bytes(), nil)

				// backupEtcd
				jobBackupEtcd := createBackupEtcdDataJob(doc.OpenShiftCluster.Name, buildNodeName(doc, degradedNode))
				k.EXPECT().KubeCreateOrUpdate(ctx, jobBackupEtcd).MaxTimes(1).Return(nil)
				expectWatchEvent(gomock.Any(), jobBackupEtcd, k, "app", corev1.PodSucceeded, false)()

				k.EXPECT().KubeGetPodLogs(ctx, jobBackupEtcd.GetNamespace(), jobBackupEtcd.GetName(), jobBackupEtcd.GetName()).MaxTimes(1).Return([]byte("Backup job doing backup things..."), nil)
				propPolicy := metav1.DeletePropagationBackground
				k.EXPECT().KubeDelete(ctx, "Job", namespaceEtcds, jobBackupEtcd.GetName(), true, &propPolicy).MaxTimes(1).Return(nil)

				// fixPeers
				// createPrivilegedServiceAccount
				serviceAcc := newServiceAccount(serviceAccountName, doc.OpenShiftCluster.Name)
				clusterRole := newClusterRole(kubeServiceAccount, doc.OpenShiftCluster.Name)
				crb := newClusterRoleBinding(serviceAccountName, doc.OpenShiftCluster.Name)
				scc := newSecurityContextConstraint(serviceAccountName, doc.OpenShiftCluster.Name, kubeServiceAccount)

				k.EXPECT().KubeCreateOrUpdate(ctx, serviceAcc).MaxTimes(1).Return(nil)
				k.EXPECT().KubeCreateOrUpdate(ctx, clusterRole).MaxTimes(1).Return(nil)
				k.EXPECT().KubeCreateOrUpdate(ctx, crb).MaxTimes(1).Return(nil)
				k.EXPECT().KubeCreateOrUpdate(ctx, scc).MaxTimes(1).Return(errors.New("oh no, can't create security context constraint"))

				// cleanup
				k.EXPECT().KubeDelete(ctx, "ServiceAccount", namespaceEtcds, serviceAcc.GetName(), true, nil).MaxTimes(1).Return(nil)
				k.EXPECT().KubeDelete(ctx, "SecurityContextConstraints", "", serviceAcc.GetName(), true, nil).MaxTimes(1).Return(nil)
				k.EXPECT().KubeDelete(ctx, "ClusterRole", "", "system:serviceaccountopenshift-etcd:etcd-recovery-privileged", true, nil).MaxTimes(1).Return(nil)
				k.EXPECT().KubeDelete(ctx, "ClusterRoleBinding", "", serviceAcc.GetName(), true, nil).MaxTimes(1).Return(nil)
				k.EXPECT().KubeDelete(ctx, "Job", namespaceEtcds, jobBackupEtcd.GetName(), true, &propPolicy).MaxTimes(1).Return(nil)
			},
		},
		{
			name:    "fail: Backup job Pod failed",
			wantErr: "500: InternalServerError: : pod etcd-recovery-data-backup event Failed received with message Pod Failed for reasons XYZ...",
			pods:    newEtcdPods(t, doc, false, false, false),
			mocks: func(tt *test, t *testing.T, ti *testInfra, k *mock_adminactions.MockKubeActions, pods *corev1.PodList, ctxCancel context.CancelFunc) {
				buf := &bytes.Buffer{}
				err := codec.NewEncoder(buf, &codec.JsonHandle{}).Encode(pods)
				if err != nil {
					t.Fatalf("%s failed to encode pods, %s", t.Name(), err.Error())
				}
				k.EXPECT().KubeList(ctx, "Pod", namespaceEtcds).MaxTimes(1).Return(buf.Bytes(), nil)

				// backupEtcd
				jobBackupEtcd := createBackupEtcdDataJob(doc.OpenShiftCluster.Name, buildNodeName(doc, degradedNode))
				k.EXPECT().KubeCreateOrUpdate(ctx, jobBackupEtcd).MaxTimes(1).Return(nil).MaxTimes(1)
				expectWatchEvent(gomock.Any(), jobBackupEtcd, k, "app", corev1.PodFailed, false)()

				k.EXPECT().KubeGetPodLogs(ctx, jobBackupEtcd.GetNamespace(), jobBackupEtcd.GetName(), jobBackupEtcd.GetName()).MaxTimes(1).Return([]byte("oh no, Pod is in a failed state"), nil)
				propPolicy := metav1.DeletePropagationBackground
				k.EXPECT().KubeDelete(ctx, "Job", namespaceEtcds, jobBackupEtcd.GetName(), true, &propPolicy).MaxTimes(1).Return(nil)
			},
		},
		{
			name:      "fail: Context cancelled",
			wantErr:   "500: InternalServerError: : context was cancelled while waiting for etcd-recovery-data-backup because context canceled",
			pods:      newEtcdPods(t, doc, false, false, false),
			cancel:    true,
			ctxCancel: ctxCancel,
			mocks: func(tt *test, t *testing.T, ti *testInfra, k *mock_adminactions.MockKubeActions, pods *corev1.PodList, ctxCancel context.CancelFunc) {
				buf := &bytes.Buffer{}
				err := codec.NewEncoder(buf, &codec.JsonHandle{}).Encode(pods)
				if err != nil {
					t.Fatalf("%s failed to encode pods, %s", t.Name(), err.Error())
				}
				k.EXPECT().KubeList(ctx, "Pod", namespaceEtcds).MaxTimes(1).Return(buf.Bytes(), nil)

				// backupEtcd
				jobBackupEtcd := createBackupEtcdDataJob(doc.OpenShiftCluster.Name, buildNodeName(doc, degradedNode))
				k.EXPECT().KubeCreateOrUpdate(ctx, jobBackupEtcd).MaxTimes(1).Return(nil).MaxTimes(1)
				// NOTE(review): unlike the other cases, the func returned by
				// expectWatchEvent is not invoked here — presumably so that no
				// watch event is delivered and the cancelled context ends the
				// wait; confirm.
				expectWatchEvent(gomock.Any(), jobBackupEtcd, k, "app", corev1.PodPending, true)

				// The shared context is cancelled while the mocks are being set up,
				// i.e. before fixEtcd is called.
				if tt.cancel {
					tt.ctxCancel()
				}
				k.EXPECT().KubeGetPodLogs(ctx, jobBackupEtcd.GetNamespace(), jobBackupEtcd.GetName(), jobBackupEtcd.GetName()).MaxTimes(1).Return([]byte(tt.wantErr), nil)
				propPolicy := metav1.DeletePropagationBackground
				k.EXPECT().KubeDelete(ctx, "Job", namespaceEtcds, jobBackupEtcd.GetName(), true, &propPolicy).MaxTimes(1).Return(nil)
			},
		},
	} {
		t.Run(tt.name, func(t *testing.T) {
			ti := newTestInfra(t).WithOpenShiftClusters().WithSubscriptions()
			defer ti.done()

			k := mock_adminactions.NewMockKubeActions(ti.controller)
			tt.mocks(tt, t, ti, k, tt.pods, ctxCancel)

			ti.fixture.AddOpenShiftClusterDocuments(doc)
			ti.fixture.AddSubscriptionDocuments(&api.SubscriptionDocument{
				ID: mockSubID,
				Subscription: &api.Subscription{
					State: api.SubscriptionStateRegistered,
					Properties: &api.SubscriptionProperties{
						TenantID: mockTenantID,
					},
				},
			})

			f, err := NewFrontend(ctx,
				ti.audit,
				ti.log,
				ti.env,
				ti.asyncOperationsDatabase,
				ti.clusterManagerDatabase,
				ti.openShiftClustersDatabase,
				ti.subscriptionsDatabase,
				nil,
				api.APIs,
				&noop.Noop{},
				nil,
				nil,
				nil,
				nil,
				ti.enricher)
			if err != nil {
				t.Fatal(err)
			}

			containerLogs, err := f.fixEtcd(ctx, ti.log, ti.env, doc, k, &operatorv1fake.FakeEtcds{
				Fake: &operatorv1fake.FakeOperatorV1{
					Fake: &ktesting.Fake{},
				},
			})
			ti.log.Infof("Container logs: \n%s", containerLogs)

			// Compare on the message text, treating a nil error as "". This
			// avoids calling Error() on a nil error when an expected failure
			// did not happen, which would panic instead of failing the test.
			gotErr := ""
			if err != nil {
				gotErr = err.Error()
			}
			if gotErr != tt.wantErr {
				t.Errorf("\n%s\n !=\n%s", gotErr, tt.wantErr)
			}
		})
	}
}
// expectWatchEvent registers an expectation (at most one call) for
// KubeWatch(ctx, o, labelKey) on k, answered with a fake watcher and a nil
// error.
//
// The returned function starts a goroutine that adds a single Pod to the fake
// watcher and then resets it. The Pod carries o's name and namespace, the
// requested podPhase and a canned status message for that phase. Callers that
// never invoke the returned function get a watcher that stays silent.
//
// noUpdates is not read by this helper.
func expectWatchEvent(ctx gomock.Matcher, o *unstructured.Unstructured, k *mock_adminactions.MockKubeActions, labelKey string, podPhase corev1.PodPhase, noUpdates bool) func() {
	// Phases without an entry resolve to the empty string.
	phaseMessages := map[corev1.PodPhase]string{
		corev1.PodSucceeded: "Pod succeeded Successfully",
		corev1.PodFailed:    "Pod Failed for reasons XYZ...",
		corev1.PodPending:   "Pod is pending...",
		corev1.PodUnknown:   "Pod status is unknown...",
	}
	statusMessage := phaseMessages[podPhase]

	fakeWatcher := watch.NewFake()
	k.EXPECT().KubeWatch(ctx, o, labelKey).MaxTimes(1).Return(watch.Interface(fakeWatcher), nil)

	publish := func() {
		pod := &corev1.Pod{
			TypeMeta: metav1.TypeMeta{
				Kind:       "Pod",
				APIVersion: "v1",
			},
			ObjectMeta: metav1.ObjectMeta{
				Name:      o.GetName(),
				Namespace: o.GetNamespace(),
			},
			Status: corev1.PodStatus{
				Phase:   podPhase,
				Message: statusMessage,
			},
		}
		fakeWatcher.Add(pod)
		fakeWatcher.Reset()
	}

	return func() {
		go publish()
	}
}
// buildClusterName returns the cluster's name and its infra ID joined by a
// hyphen.
func buildClusterName(doc *api.OpenShiftClusterDocument) string {
	name, infraID := doc.OpenShiftCluster.Name, doc.OpenShiftCluster.Properties.InfraID
	return name + "-" + infraID
}
// buildNodeName returns the result of buildClusterName for doc followed by a
// hyphen and node.
func buildNodeName(doc *api.OpenShiftClusterDocument, node string) string {
	return buildClusterName(doc) + "-" + node
}
// newEtcdPods builds the PodList fixture used by the etcd recovery tests. The
// list always holds three etcd pods, one per master node (master-0, master-1
// and the node named by degradedNode), all sharing the same etcd container env.
//
// The flags shape the fixture as follows:
//   - healthy: the third pod reports the same container statuses as the other
//     two and the IP that its env var advertises, instead of the crash-looping
//     status and the changed IP.
//   - multiDegraded: the first two pods also report the crash-looping status.
//   - emptyEnv: the etcd containers carry an empty (non-nil) env list.
//
// healthy and multiDegraded are mutually exclusive; passing both fails the
// test immediately via t.Fatalf.
func newEtcdPods(t *testing.T, doc *api.OpenShiftClusterDocument, healthy, multiDegraded, emptyEnv bool) *corev1.PodList {
	var (
		degradedNodeMaster2 = buildNodeName(doc, degradedNode)
		nodeMaster0         = buildNodeName(doc, "master-0")
		nodeMaster1         = buildNodeName(doc, "master-1")
	)
	const (
		master0IP        = "10.0.0.1"
		master1IP        = "10.0.0.2"
		master2IP        = "10.0.0.3"
		master2ChangedIP = "10.0.0.9"
	)
	if healthy && multiDegraded {
		t.Fatalf("TEST %s: healthy (value %t) and multiDegraded (value %t) cannot both be true, failed sanity check", t.Name(), healthy, multiDegraded)
	}

	// Every master gets a NODE_<cluster>_<infraID>_master_<n>_IP variable.
	// All three names share the same prefix; the original fixture had a stray
	// space after "NODE_" for master-1, which made that name inconsistent with
	// its siblings.
	envPrefix := "NODE_" + doc.OpenShiftCluster.Name + "_" + doc.OpenShiftCluster.Properties.InfraID
	envs := []corev1.EnvVar{
		{Name: envPrefix + "_master_0_IP", Value: master0IP},
		{Name: envPrefix + "_master_1_IP", Value: master1IP},
		{Name: envPrefix + "_master_2_IP", Value: master2IP},
	}
	// Used to test the scenario where etcd's env vars are empty, or no
	// conflict is found; in that case the container statuses will be tested.
	if emptyEnv {
		envs = []corev1.EnvVar{}
	}

	containerID := "quay://etcd-container-id"
	badStatus := []corev1.ContainerStatus{
		{
			Name:         "etcd",
			Ready:        false,
			Started:      to.BoolPtr(false),
			RestartCount: 50,
			State: corev1.ContainerState{
				Waiting: &corev1.ContainerStateWaiting{
					Reason:  "Container is in a crashloop backoff",
					Message: "Container crashloop backoff",
				},
			},
			ContainerID: containerID,
		},
	}
	statuses := []corev1.ContainerStatus{
		{
			State:       corev1.ContainerState{Running: &corev1.ContainerStateRunning{}},
			ContainerID: containerID,
		},
	}
	if multiDegraded {
		statuses = badStatus
	}

	// The third pod is degraded (crash-looping, changed IP) unless the caller
	// asked for a fully healthy cluster.
	master2Statuses, master2PodIP := badStatus, master2ChangedIP
	if healthy {
		master2Statuses, master2PodIP = statuses, master2IP
	}

	// etcdPod assembles one etcd pod scheduled on nodeName with the shared env.
	etcdPod := func(nodeName, podIP string, containerStatuses []corev1.ContainerStatus) corev1.Pod {
		return corev1.Pod{
			ObjectMeta: metav1.ObjectMeta{
				Name:      "etcd-" + nodeName,
				Namespace: namespaceEtcds,
			},
			Status: corev1.PodStatus{
				ContainerStatuses: containerStatuses,
				PodIPs:            []corev1.PodIP{{IP: podIP}},
			},
			Spec: corev1.PodSpec{
				NodeName: nodeName,
				Containers: []corev1.Container{
					{
						Name: "etcd",
						Env:  envs,
					},
				},
			},
		}
	}

	return &corev1.PodList{
		TypeMeta: metav1.TypeMeta{
			Kind: "Etcd",
		},
		Items: []corev1.Pod{
			etcdPod(nodeMaster0, master0IP, statuses),
			etcdPod(nodeMaster1, master1IP, statuses),
			etcdPod(degradedNodeMaster2, master2PodIP, master2Statuses),
		},
	}
}

Просмотреть файл

@ -294,6 +294,11 @@ func (f *frontend) chiAuthenticatedRoutes(router chi.Router) {
})
r.Get("/supportedvmsizes", f.supportedvmsizes)
r.Route("/subscriptions/{subscriptionId}/resourcegroups/{resourceGroupName}/providers/{resourceProviderNamespace}/{resourceType}/{resourceName}/etcdrecovery",
func(r chi.Router) {
r.Post("/", f.postAdminOpenShiftClusterEtcdRecovery)
})
r.Route("/subscriptions/{subscriptionId}/resourcegroups/{resourceGroupName}/providers/{resourceProviderNamespace}/{resourceType}/{resourceName}/kubernetesobjects",
func(r chi.Router) {
r.Get("/", f.getAdminKubernetesObjects)

9
pkg/frontend/scripts.go Normal file
Просмотреть файл

@ -0,0 +1,9 @@
package frontend
// Copyright (c) Microsoft Corporation.
// Licensed under the Apache License 2.0.
import _ "embed"
// backupOrFixEtcd holds the contents of scripts/backupandfixetcd.sh, embedded
// at build time through the embed directive attached to this declaration.
//go:embed scripts/backupandfixetcd.sh
var backupOrFixEtcd string

Просмотреть файл

@ -0,0 +1,72 @@
#!/bin/bash
#
# See for more information: https://docs.openshift.com/container-platform/4.10/backup_and_restore/control_plane_backup_and_restore/replacing-unhealthy-etcd-member.html
# Remove the degraded node from the etcd member list, going through each peer pod.
#
# Environment read:
#   PEER_PODS      pod names, split on whitespace by the unquoted expansion
#   DEGRADED_NODE  member name to look up in each pod's member list
#
# For every peer pod the member list is fetched as JSON via `oc rsh`, the ID of
# the member named $DEGRADED_NODE is extracted with jq, and, when an ID was
# found, `etcdctl member remove` is run in that pod.
remove_peer_members() {
    echo "${PEER_PODS}"
    for p in ${PEER_PODS}; do
        echo "Attempting to get ID for pod/${p}"
        members="$(oc rsh -n openshift-etcd -c etcdctl "pod/${p}" etcdctl member list -w json --hex true)"
        id="$(jq -r --arg node "$DEGRADED_NODE" '.members[] | select( .name == $node).ID' <<< "$members")"
        echo "id: ${id:-Not Found}"

        # Nothing to remove through this pod; move on to the next peer.
        if [[ -z $id ]]; then
            echo "${DEGRADED_NODE} id not found in etcd member list for pod ${p}"
            continue
        fi

        echo "rshing into pod/${p} now to remove member id $id"
        oc rsh -n openshift-etcd -c etcdctl "pod/${p}" etcdctl member remove "$id"
    done
}
# jq expects its required shared libraries to be present in /usr/lib64, not /host/usr/lib64.
# Because we use the jq binary mounted under /host and have not installed jq, those
# libraries exist under /host/usr/lib64 rather than /usr/lib64.
# Creating the symbolic links lets jq resolve its libraries without installing anything.
create_sym_links() {
    local lib
    jq_lib1="/usr/lib64/libjq.so.1"
    jq_lib2="/usr/lib64/libonig.so.5"

    # Link each library from the /host mount unless a regular file is already there.
    for lib in "$jq_lib1" "$jq_lib2"; do
        [[ -f $lib ]] || ln -s "/host${lib}" "$lib"
    done
}
# Move the etcd static pod manifest and the etcd data directory out of the way.
#
# When both /var/lib/etcd (directory) and /etc/kubernetes/manifests/etcd-pod.yaml
# (file) exist, the manifest is moved into /var/lib/etcd-backup and the data
# directory is moved into /tmp. Any failing step aborts the script.
# When either is missing, nothing is changed and a notice is printed.
backup_etcd() {
    local bdir etcd_yaml etcd_dir
    etcd_dir=/var/lib/etcd
    etcd_yaml=/etc/kubernetes/manifests/etcd-pod.yaml
    bdir=/var/lib/etcd-backup

    if [[ ! -d $etcd_dir || ! -f $etcd_yaml ]]; then
        echo "$etcd_dir doesn't exist or $etcd_yaml has already been moved"
        echo "Not taking host etcd backup"
        return
    fi

    echo "Creating $bdir"
    mkdir -p "$bdir" || abort "failed to make backup directory"

    echo "Moving $etcd_yaml to $bdir"
    mv "$etcd_yaml" "$bdir" || abort "failed to move $etcd_yaml to $bdir"

    # NOTE(review): the message says /host/tmp while the command targets /tmp;
    # presumably the same location seen from outside a chroot. Confirm.
    echo "Moving $etcd_dir to /host/tmp"
    mv "$etcd_dir" /tmp || abort "failed to move $etcd_dir to /tmp"
}
# Print the given message followed by ", Aborting." and exit the script with status 1.
#   $1  message describing what failed
abort() {
    printf '%s, Aborting.\n' "${1}"
    exit 1
}
# Entry point: the action is selected through environment variables.
#   FIX_PEERS non-empty  -> remove the degraded member via the peer etcd pods
#   BACKUP non-empty     -> back up the etcd manifest and data directory
#   neither              -> abort with exit status 1
# FIX_PEERS takes precedence when both are set.
if [[ -n $FIX_PEERS ]]; then
    # Append the /host bin directory to the search path. A plain assignment is
    # used on purpose: `PATH+="${PATH}:..."` would append PATH to itself and
    # fuse its last and first entries into one bogus component.
    PATH="${PATH}:/host/usr/bin"
    create_sym_links
    echo "Starting peer etcd member removal"
    remove_peer_members
elif [[ -n $BACKUP ]]; then
    echo "Starting etcd data backup"
    backup_etcd
else
    abort "BACKUP and FIX_PEERS are unset, no actions taken."
fi

Просмотреть файл

@ -14,8 +14,10 @@ import (
features "github.com/Azure/azure-sdk-for-go/services/resources/mgmt/2019-07-01/features"
gomock "github.com/golang/mock/gomock"
logrus "github.com/sirupsen/logrus"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
unstructured "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
schema "k8s.io/apimachinery/pkg/runtime/schema"
watch "k8s.io/apimachinery/pkg/watch"
)
// MockKubeActions is a mock of KubeActions interface.
@ -112,17 +114,17 @@ func (mr *MockKubeActionsMockRecorder) KubeCreateOrUpdate(arg0, arg1 interface{}
}
// KubeDelete mocks base method.
func (m *MockKubeActions) KubeDelete(arg0 context.Context, arg1, arg2, arg3 string, arg4 bool) error {
func (m *MockKubeActions) KubeDelete(arg0 context.Context, arg1, arg2, arg3 string, arg4 bool, arg5 *v1.DeletionPropagation) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "KubeDelete", arg0, arg1, arg2, arg3, arg4)
ret := m.ctrl.Call(m, "KubeDelete", arg0, arg1, arg2, arg3, arg4, arg5)
ret0, _ := ret[0].(error)
return ret0
}
// KubeDelete indicates an expected call of KubeDelete.
func (mr *MockKubeActionsMockRecorder) KubeDelete(arg0, arg1, arg2, arg3, arg4 interface{}) *gomock.Call {
func (mr *MockKubeActionsMockRecorder) KubeDelete(arg0, arg1, arg2, arg3, arg4, arg5 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "KubeDelete", reflect.TypeOf((*MockKubeActions)(nil).KubeDelete), arg0, arg1, arg2, arg3, arg4)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "KubeDelete", reflect.TypeOf((*MockKubeActions)(nil).KubeDelete), arg0, arg1, arg2, arg3, arg4, arg5)
}
// KubeGet mocks base method.
@ -170,6 +172,21 @@ func (mr *MockKubeActionsMockRecorder) KubeList(arg0, arg1, arg2 interface{}) *g
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "KubeList", reflect.TypeOf((*MockKubeActions)(nil).KubeList), arg0, arg1, arg2)
}
// KubeWatch mocks base method.
//
// The call is forwarded to the gomock controller under the method name
// "KubeWatch" with all three arguments. The two recorded return values are
// type-asserted to watch.Interface and error; a value that fails its
// assertion is returned as the zero value of that type.
func (m *MockKubeActions) KubeWatch(arg0 context.Context, arg1 *unstructured.Unstructured, arg2 string) (watch.Interface, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "KubeWatch", arg0, arg1, arg2)
ret0, _ := ret[0].(watch.Interface)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// KubeWatch indicates an expected call of KubeWatch.
//
// The arguments may be concrete values or gomock matchers; they are recorded
// against the mock's KubeWatch method type and the resulting *gomock.Call is
// returned so the caller can chain further expectations onto it.
func (mr *MockKubeActionsMockRecorder) KubeWatch(arg0, arg1, arg2 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "KubeWatch", reflect.TypeOf((*MockKubeActions)(nil).KubeWatch), arg0, arg1, arg2)
}
// ResolveGVR mocks base method.
func (m *MockKubeActions) ResolveGVR(arg0 string) (*schema.GroupVersionResource, error) {
m.ctrl.T.Helper()