diff --git a/cmd/backup/main.go b/cmd/backup/main.go index 5954027..430a91c 100644 --- a/cmd/backup/main.go +++ b/cmd/backup/main.go @@ -1,13 +1,28 @@ package main import ( - "fmt" - "time" + "flag" + + "github.com/coreos/kube-etcd-controller/pkg/backup" + "github.com/coreos/kube-etcd-controller/pkg/util/k8sutil" ) -func main() { - fmt.Println("ok...") - for { - time.Sleep(1 * time.Minute) - } +var ( + masterHost string + clusterName string +) + +func init() { + flag.StringVar(&cfg.MasterHost, "master", "", "API Server addr, e.g. ' - NOT RECOMMENDED FOR PRODUCTION - http://127.0.0.1:8080'. Omit parameter to run in on-cluster mode and utilize the service account token.") + flag.StringVar(&clusterName, "etcd-cluster", "", "") + // TODO: parse policy + flag.Parse() +} + +func main() { + if len(clusterName) == 0 { + panic("clusterName not set") + } + kclient := k8sutil.MustCreateClient(masterHost, false, nil) + backup.New(kclient, clusterName, backup.Policy{}).Run() } diff --git a/example/example-etcd-cluster.yaml b/example/example-etcd-cluster.yaml index 3438da2..b460cd5 100644 --- a/example/example-etcd-cluster.yaml +++ b/example/example-etcd-cluster.yaml @@ -4,3 +4,6 @@ metadata: name: "etcd-cluster" spec: size: 3 + backup: + maxSnapshot: 5 + volumeSizeInMB: 512 diff --git a/hack/build_backup.sh b/hack/build_backup.sh new file mode 100755 index 0000000..b07b408 --- /dev/null +++ b/hack/build_backup.sh @@ -0,0 +1,7 @@ +#!/usr/bin/env bash + +pushd "./cmd/backup" +go build . +docker build --tag gcr.io/coreos-k8s-scale-testing/kubeetcdbackup:latest . +gcloud docker push gcr.io/coreos-k8s-scale-testing/kubeetcdbackup:latest +popd diff --git a/pkg/backup/backup.go b/pkg/backup/backup.go new file mode 100644 index 0000000..1351181 --- /dev/null +++ b/pkg/backup/backup.go @@ -0,0 +1,63 @@ +package backup + +import ( + "time" + + "golang.org/x/net/context" + + "github.com/Sirupsen/logrus" + "github.com/coreos/etcd/clientv3" + "github.com/coreos/kube-etcd-controller/pkg/util/etcdutil" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/labels" +) + +type Backup struct { + kclient *unversioned.Client + clusterName string + policy Policy +} + +func New(kclient *unversioned.Client, clusterName string, policy Policy) *Backup { + return &Backup{ + kclient: kclient, + clusterName: clusterName, + policy: policy, + } +} + +func (b *Backup) Run() { + for { + // TODO: add interval to backup policy + time.Sleep(20 * time.Second) + + pods, err := b.kclient.Pods("default").List(api.ListOptions{ + LabelSelector: labels.SelectorFromSet(map[string]string{ + "app": "etcd", + "etcd_cluster": b.clusterName, + }), + }) + if err != nil { + panic(err) + } + for i := range pods.Items { + m := etcdutil.Member{Name: pods.Items[i].Name} + cfg := clientv3.Config{ + Endpoints: []string{m.ClientAddr()}, + DialTimeout: 5 * time.Second, + } + etcdcli, err := clientv3.New(cfg) + if err != nil { + logrus.Errorf("clientv3.New failed: %v", err) + continue + } + resp, err := etcdcli.Get(context.TODO(), "/", clientv3.WithSerializable()) + if err != nil { + logrus.Errorf("etcdcli.Get failed: %v", err) + continue + } + logrus.Infof("member: %s, revision: %d", m.Name, resp.Header.Revision) + } + } +} diff --git a/pkg/backup/policy.go b/pkg/backup/policy.go new file mode 100644 index 0000000..7c0c510 --- /dev/null +++ b/pkg/backup/policy.go @@ -0,0 +1,14 @@ +package backup + +type Policy struct { + // MaxSnapshot is the maximum number of snapshot files to retain. 0 is disable backup. + // If backup is disabled, the etcd cluster cannot recover from a + // disaster failure (lose more than half of its members at the same + // time). + MaxSnapshot int `json:"maxSnapshot"` + // VolumeSizeInMB specifies the required volume size to perform backups. + // Controller will claim the required size before creating the etcd cluster for backup + // purpose. + // If the snapshot size is larger than the size specified, backup fails. + VolumeSizeInMB int `json:"volumeSizeInMB"` +} diff --git a/pkg/cluster/spec.go b/pkg/cluster/spec.go index a355901..10a93bb 100644 --- a/pkg/cluster/spec.go +++ b/pkg/cluster/spec.go @@ -1,6 +1,7 @@ package cluster import ( + "github.com/coreos/kube-etcd-controller/pkg/backup" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/unversioned" ) @@ -24,20 +25,7 @@ type Spec struct { // The controller will eventually make the etcd cluster version // equal to the expected version. Version string `json:"version"` - // Backup is the backup strategy for the etcd cluster. + // Backup is the backup policy for the etcd cluster. // There is no backup by default. - Backup Backup `json:"backup"` -} - -type Backup struct { - // MaxSnapshot is the maximum number of snapshot files to retain. 0 is disable backup. - // If backup is disabled, the etcd cluster cannot recover from a - // disaster failure (lose more than half of its members at the same - // time). - MaxSnapshot int - // RequiredVolumeSizeInMB specifies the required volume size to perform backups. - // Controller will claim the required size before creating the etcd cluster for backup - // purpose. - // If the snapshot size is larger than the size specified, backup fails. - RequiredVolumeSizeInMB int + Backup *backup.Policy `json:"backup,omitempty"` } diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index b607afd..dd73327 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -39,29 +39,18 @@ type Config struct { } func New(cfg Config) *Controller { - host, c := getClient(cfg) + kclient := k8sutil.MustCreateClient(cfg.MasterHost, cfg.TLSInsecure, &cfg.TLSConfig) + host := cfg.MasterHost + if len(host) == 0 { + host = k8sutil.MustGetInClusterMasterHost() + } return &Controller{ masterHost: host, - kclient: c, + kclient: kclient, clusters: make(map[string]*cluster.Cluster), } } -func getClient(cfg Config) (string, *unversioned.Client) { - if len(cfg.MasterHost) == 0 { - inCfg, err := restclient.InClusterConfig() - if err != nil { - panic(err) - } - client, err := unversioned.NewInCluster() - if err != nil { - panic(err) - } - return inCfg.Host, client - } - return cfg.MasterHost, k8sutil.MustCreateClient(cfg.MasterHost, cfg.TLSInsecure, cfg.TLSConfig) -} - func (c *Controller) Run() { watchVersion := "0" if err := c.createTPR(); err != nil { @@ -88,6 +77,11 @@ func (c *Controller) Run() { // TODO: combine init into New. Different fresh new and recovered new. nc.Init(&event.Object.Spec) c.clusters[clusterName] = nc + + backup := event.Object.Spec.Backup + if backup != nil && backup.MaxSnapshot != 0 { + k8sutil.CreateBackupReplicaSet(c.kclient, clusterName, *backup) + } case "MODIFIED": c.clusters[clusterName].Update(&event.Object.Spec) case "DELETED": @@ -114,6 +108,11 @@ func (c *Controller) findAllClusters() (string, error) { for _, item := range list.Items { nc := cluster.New(c.kclient, item.Name, &item.Spec) c.clusters[item.Name] = nc + + backup := item.Spec.Backup + if backup != nil && backup.MaxSnapshot != 0 { + k8sutil.CreateBackupReplicaSet(c.kclient, item.Name, *backup) + } } return list.ListMeta.ResourceVersion, nil } diff --git a/pkg/util/k8sutil/k8sutil.go b/pkg/util/k8sutil/k8sutil.go index c1666a6..bc04df5 100644 --- a/pkg/util/k8sutil/k8sutil.go +++ b/pkg/util/k8sutil/k8sutil.go @@ -8,6 +8,7 @@ import ( "strings" "time" + "github.com/coreos/kube-etcd-controller/pkg/backup" "github.com/coreos/kube-etcd-controller/pkg/util/etcdutil" "github.com/pborman/uuid" "k8s.io/kubernetes/pkg/api" @@ -20,7 +21,7 @@ import ( "k8s.io/kubernetes/pkg/watch" ) -func CreateBackupReplicaSet(kclient *unversioned.Client, clusterName string) { +func CreateBackupReplicaSet(kclient *unversioned.Client, clusterName string, policy backup.Policy) { _, err := kclient.ReplicaSets("default").Create(&extensions.ReplicaSet{ ObjectMeta: api.ObjectMeta{ Name: fmt.Sprintf("%s-backup-tool", clusterName), @@ -43,7 +44,12 @@ func CreateBackupReplicaSet(kclient *unversioned.Client, clusterName string) { Containers: []api.Container{ { Name: "backup", - Image: "gcr.io/coreos-k8s-scale-testing/test:latest", + Image: "gcr.io/coreos-k8s-scale-testing/kubeetcdbackup:latest", + Command: []string{ + "backup", + "--etcd-cluster", + clusterName, + }, }, }, }, @@ -186,7 +192,22 @@ func makeEtcdPod(m *etcdutil.Member, initialCluster []string, clusterName, state return pod } -func MustCreateClient(host string, tlsInsecure bool, tlsConfig restclient.TLSClientConfig) *unversioned.Client { +func MustGetInClusterMasterHost() string { + cfg, err := restclient.InClusterConfig() + if err != nil { + panic(err) + } + return cfg.Host +} + +func MustCreateClient(host string, tlsInsecure bool, tlsConfig *restclient.TLSClientConfig) *unversioned.Client { + if len(host) == 0 { + c, err := unversioned.NewInCluster() + if err != nil { + panic(err) + } + return c + } cfg := &restclient.Config{ Host: host, QPS: 100, @@ -197,7 +218,7 @@ func MustCreateClient(host string, tlsInsecure bool, tlsConfig restclient.TLSCli panic(fmt.Sprintf("error parsing host url %s : %v", host, err)) } if hostUrl.Scheme == "https" { - cfg.TLSClientConfig = tlsConfig + cfg.TLSClientConfig = *tlsConfig cfg.Insecure = tlsInsecure } c, err := unversioned.New(cfg)