use persistent storage in backup pod

This commit is contained in:
Hongchao Deng 2016-09-28 16:38:02 -07:00
Родитель 24239300c1
Коммит bed78cc116
8 изменённых файлов: 167 добавлений и 60 удалений

Просмотреть файл

@ -20,6 +20,8 @@ import (
"k8s.io/kubernetes/pkg/labels"
)
const BackupDir = "/home/backup/"
type Backup struct {
kclient *unversioned.Client
@ -27,7 +29,6 @@ type Backup struct {
namespace string
policy Policy
listenAddr string
backupDir string
backupNow chan chan error
}
@ -39,7 +40,6 @@ func New(kclient *unversioned.Client, clusterName, ns string, policy Policy, lis
namespace: ns,
policy: policy,
listenAddr: listenAddr,
backupDir: "/home/backup/",
backupNow: make(chan chan error),
}
@ -47,7 +47,7 @@ func New(kclient *unversioned.Client, clusterName, ns string, policy Policy, lis
func (b *Backup) Run() {
// It will be no-op if backup dir existed.
if err := os.MkdirAll(b.backupDir, 0700); err != nil {
if err := os.MkdirAll(BackupDir, 0700); err != nil {
panic(err)
}
@ -106,7 +106,7 @@ func (b *Backup) saveSnap(lastSnapRev int64) (int64, error) {
}
log.Printf("saving backup for cluster (%s)", b.clusterName)
if err := writeSnap(member, b.backupDir, rev); err != nil {
if err := writeSnap(member, BackupDir, rev); err != nil {
err = fmt.Errorf("write snapshot failed: %v", err)
return lastSnapRev, err
}

Просмотреть файл

@ -42,9 +42,9 @@ func (b *Backup) serveBackupNow(w http.ResponseWriter, r *http.Request) {
}
func (b *Backup) serveSnap(w http.ResponseWriter, r *http.Request) {
files, err := ioutil.ReadDir(b.backupDir)
files, err := ioutil.ReadDir(BackupDir)
if err != nil {
logrus.Errorf("failed to list dir (%s): error (%v)", b.backupDir, err)
logrus.Errorf("failed to list dir (%s): error (%v)", BackupDir, err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
@ -54,7 +54,7 @@ func (b *Backup) serveSnap(w http.ResponseWriter, r *http.Request) {
http.NotFound(w, r)
return
}
http.ServeFile(w, r, path.Join(b.backupDir, fname))
http.ServeFile(w, r, path.Join(BackupDir, fname))
}
func getLatestSnapshotName(files []os.FileInfo) string {

Просмотреть файл

@ -6,6 +6,13 @@ var (
defaultSnapshotInterval = 1800 * time.Second
)
// TODO: supports object store like s3
type StorageType string
const (
StorageTypePersistentVolume = "PersistentVolume"
)
type Policy struct {
// SnapshotIntervalInSecond specifies the interval between two snapshots.
// The default interval is 1800 seconds.
@ -20,4 +27,6 @@ type Policy struct {
// purpose.
// If the snapshot size is larger than the size specified, backup fails.
VolumeSizeInMB int `json:"volumeSizeInMB"`
// StorageType specifies the type of storage device to store backup files.
StorageType StorageType `json:"storageType,omitempty"`
}

Просмотреть файл

@ -54,17 +54,9 @@ func New(cfg Config) *Controller {
}
func (c *Controller) Run() {
watchVersion := "0"
if err := c.createTPR(); err != nil {
switch {
case k8sutil.IsKubernetesResourceAlreadyExistError(err):
watchVersion, err = c.findAllClusters()
if err != nil {
panic(err)
}
default:
panic(err)
}
watchVersion, err := c.initResource()
if err != nil {
panic(err)
}
log.Println("etcd cluster controller starts running...")
@ -75,10 +67,11 @@ func (c *Controller) Run() {
clusterName := event.Object.ObjectMeta.Name
switch event.Type {
case "ADDED":
nc := cluster.New(c.kclient, clusterName, c.namespace, &event.Object.Spec)
clusterSpec := &event.Object.Spec
nc := cluster.New(c.kclient, clusterName, c.namespace, clusterSpec)
c.clusters[clusterName] = nc
backup := event.Object.Spec.Backup
backup := clusterSpec.Backup
if backup != nil && backup.MaxSnapshot != 0 {
err := k8sutil.CreateBackupReplicaSetAndService(c.kclient, clusterName, c.namespace, *backup)
if err != nil {
@ -123,6 +116,31 @@ func (c *Controller) findAllClusters() (string, error) {
return list.ListMeta.ResourceVersion, nil
}
func (c *Controller) initResource() (string, error) {
err := c.createTPR()
if err != nil {
switch {
// etcd controller has been initialized before. We don't need to
// repeat the init process but recover cluster.
case k8sutil.IsKubernetesResourceAlreadyExistError(err):
watchVersion, err := c.findAllClusters()
if err != nil {
return "", err
}
return watchVersion, nil
default:
log.Errorf("fail to create TPR: %v", err)
return "", err
}
}
err = k8sutil.CreateStorageClass(c.kclient)
if err != nil {
log.Errorf("fail to create storage class: %v", err)
return "", err
}
return "0", nil
}
func (c *Controller) createTPR() error {
tpr := &extensions.ThirdPartyResource{
ObjectMeta: k8sapi.ObjectMeta{

Просмотреть файл

@ -8,13 +8,16 @@ import (
"strings"
"time"
"github.com/Sirupsen/logrus"
"github.com/coreos/kube-etcd-controller/pkg/backup"
"github.com/coreos/kube-etcd-controller/pkg/util/etcdutil"
"k8s.io/kubernetes/pkg/api"
apierrors "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/resource"
unversionedAPI "k8s.io/kubernetes/pkg/api/unversioned"
k8sv1api "k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/apis/storage"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/util/intstr"
@ -27,6 +30,8 @@ const (
etcdDir = "/var/etcd"
dataDir = etcdDir + "/data"
backupFile = "/var/etcd/latest.backup"
storageClassName = "etcd-controller-backup"
)
func makeRestoreInitContainerSpec(backupAddr, name, token string) string {
@ -67,12 +72,17 @@ func makeRestoreInitContainerSpec(backupAddr, name, token string) string {
}
func CreateBackupReplicaSetAndService(kclient *unversioned.Client, clusterName, ns string, policy backup.Policy) error {
claim, err := CreateAndWaitPVC(kclient, clusterName, ns, policy.VolumeSizeInMB)
if err != nil {
return err
}
labels := map[string]string{
"app": "etcd_backup_tool",
"etcd_cluster": clusterName,
}
name := makeBackupName(clusterName)
_, err := kclient.ReplicaSets(ns).Create(&extensions.ReplicaSet{
_, err = kclient.ReplicaSets(ns).Create(&extensions.ReplicaSet{
ObjectMeta: api.ObjectMeta{
Name: name,
},
@ -93,14 +103,24 @@ func CreateBackupReplicaSetAndService(kclient *unversioned.Client, clusterName,
"--etcd-cluster",
clusterName,
},
Env: []api.EnvVar{
{
Name: "MY_POD_NAMESPACE",
ValueFrom: &api.EnvVarSource{FieldRef: &api.ObjectFieldSelector{FieldPath: "metadata.namespace"}},
},
},
Env: []api.EnvVar{{
Name: "MY_POD_NAMESPACE",
ValueFrom: &api.EnvVarSource{FieldRef: &api.ObjectFieldSelector{FieldPath: "metadata.namespace"}},
}},
VolumeMounts: []api.VolumeMount{{
Name: "etcd-backup-storage",
MountPath: backup.BackupDir,
}},
},
},
Volumes: []api.Volume{{
Name: "etcd-backup-storage",
VolumeSource: api.VolumeSource{
PersistentVolumeClaim: &api.PersistentVolumeClaimVolumeSource{
ClaimName: claim.Name,
},
},
}},
},
},
},
@ -132,6 +152,65 @@ func CreateBackupReplicaSetAndService(kclient *unversioned.Client, clusterName,
return nil
}
func CreateStorageClass(kubecli *unversioned.Client) error {
class := &storage.StorageClass{
ObjectMeta: api.ObjectMeta{
Name: storageClassName,
},
// TODO: add aws
Provisioner: "kubernetes.io/gce-pd",
}
_, err := kubecli.StorageClasses().Create(class)
return err
}
func CreateAndWaitPVC(kubecli *unversioned.Client, clusterName, ns string, volumeSizeInMB int) (*api.PersistentVolumeClaim, error) {
claim := &api.PersistentVolumeClaim{
ObjectMeta: api.ObjectMeta{
Name: fmt.Sprintf("pvc-%s", clusterName),
Labels: map[string]string{
"etcd_cluster": clusterName,
},
Annotations: map[string]string{
"volume.beta.kubernetes.io/storage-class": storageClassName,
},
},
Spec: api.PersistentVolumeClaimSpec{
AccessModes: []api.PersistentVolumeAccessMode{
api.ReadWriteOnce,
},
Resources: api.ResourceRequirements{
Requests: api.ResourceList{
api.ResourceStorage: resource.MustParse(fmt.Sprintf("%dMi", volumeSizeInMB)),
},
},
},
}
retClaim, err := kubecli.PersistentVolumeClaims(ns).Create(claim)
if err != nil {
return nil, err
}
err = wait.Poll(2*time.Second, 20*time.Second, func() (bool, error) {
claim, err := kubecli.PersistentVolumeClaims(ns).Get(retClaim.Name)
if err != nil {
return false, err
}
logrus.Infof("PV claim (%s) status.phase: %v", claim.Name, claim.Status.Phase)
if claim.Status.Phase != api.ClaimBound {
return false, nil
}
return true, nil
})
if err != nil {
// TODO: remove retClaim
logrus.Errorf("fail to poll PVC (%s): %v", retClaim.Name, err)
return nil, err
}
return retClaim, nil
}
func MakeBackupHostPort(clusterName string) string {
return fmt.Sprintf("%s:19999", makeBackupName(clusterName))
}
@ -158,6 +237,7 @@ func CreateAndWaitPod(kclient *unversioned.Client, pod *api.Pod, m *etcdutil.Mem
return err
}
_, err = watch.Until(100*time.Second, w, unversioned.PodRunningAndReady)
// TODO: cleanup pod on failure
return err
}

Просмотреть файл

@ -127,6 +127,7 @@ func TestDisasterRecovery(t *testing.T) {
SnapshotIntervalInSecond: 120,
MaxSnapshot: 5,
VolumeSizeInMB: 512,
StorageType: backup.StorageTypePersistentVolume,
}
testEtcd, err := createEtcdCluster(f, makeEtcdCluster("test-etcd-", 3, backupPolicy))
if err != nil {

Просмотреть файл

@ -1,6 +1,7 @@
package framework
import (
"flag"
"fmt"
"time"
@ -19,32 +20,49 @@ type Framework struct {
Namespace *api.Namespace
}
func New(kubeconfig string, baseName string) (*Framework, error) {
config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
// Setup setups a test framework and points "Global" to it.
func Setup() error {
kubeconfig := flag.String("kubeconfig", "", "kube config path, e.g. $HOME/.kube/config")
ctrlImage := flag.String("controller-image", "", "controller image, e.g. gcr.io/coreos-k8s-scale-testing/kube-etcd-controller")
flag.Parse()
config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
if err != nil {
return nil, err
return err
}
cli, err := unversioned.New(config)
if err != nil {
return nil, err
return err
}
namespace, err := cli.Namespaces().Create(&api.Namespace{
ObjectMeta: api.ObjectMeta{
GenerateName: fmt.Sprintf("e2e-test-%v-", baseName),
GenerateName: fmt.Sprintf("e2e-test-"),
},
})
if err != nil {
return nil, err
return err
}
return &Framework{
Global = &Framework{
MasterHost: config.Host,
KubeClient: cli,
Namespace: namespace,
}, nil
}
return Global.setup(*ctrlImage)
}
func (f *Framework) Setup(ctrlImage string) error {
func Teardown() error {
// TODO: delete TPR
if err := Global.KubeClient.Namespaces().Delete(Global.Namespace.Name); err != nil {
return err
}
// TODO: check all deleted and wait
Global = nil
logrus.Info("e2e teardown successfully")
return nil
}
func (f *Framework) setup(ctrlImage string) error {
if err := f.setupEtcdController(ctrlImage); err != nil {
logrus.Errorf("fail to setup etcd controller: %v", err)
return err
@ -53,15 +71,6 @@ func (f *Framework) Setup(ctrlImage string) error {
return nil
}
func (f *Framework) Teardown() error {
// TODO: delete TPR
if err := f.KubeClient.Namespaces().Delete(f.Namespace.Name); err != nil {
return err
}
logrus.Info("e2e teardown successfully")
return nil
}
func (f *Framework) setupEtcdController(ctrlImage string) error {
// TODO: unify this and the yaml file in example/
pod := &api.Pod{
@ -82,6 +91,7 @@ func (f *Framework) setupEtcdController(ctrlImage string) error {
},
},
},
RestartPolicy: api.RestartPolicyNever,
},
}

Просмотреть файл

@ -1,7 +1,6 @@
package e2e
import (
"flag"
"os"
"testing"
@ -10,25 +9,15 @@ import (
)
func TestMain(m *testing.M) {
kubeconfig := flag.String("kubeconfig", "", "kube config path, e.g. $HOME/.kube/config")
ctrlImage := flag.String("controller-image", "", "controller image, e.g. gcr.io/coreos-k8s-scale-testing/kube-etcd-controller")
flag.Parse()
f, err := framework.New(*kubeconfig, "top-level")
if err != nil {
logrus.Errorf("fail to create new framework: %v", err)
os.Exit(1)
}
if err := f.Setup(*ctrlImage); err != nil {
logrus.Errorf("fail to setup test environment: %v", err)
if err := framework.Setup(); err != nil {
logrus.Errorf("fail to setup framework: %v", err)
os.Exit(1)
}
framework.Global = f
code := m.Run()
if err := f.Teardown(); err != nil {
logrus.Errorf("fail to teardown test environment: %v", err)
if err := framework.Teardown(); err != nil {
logrus.Errorf("fail to teardown framework: %v", err)
os.Exit(1)
}
os.Exit(code)