setup backup workflow
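Add an initial backup workflow: a standalone backup tool under cmd/backup, a backup.Policy carried on the cluster Spec (exposed through the backup section of the example spec), a build-and-push script for the backup image, and controller hooks that create a <cluster-name>-backup-tool ReplicaSet for any cluster whose policy sets MaxSnapshot.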
Parent: 021dff6fe3
Commit: d5abd2c1b4

@@ -1,13 +1,28 @@
 package main
 
 import (
-	"fmt"
-	"time"
+	"flag"
+
+	"github.com/coreos/kube-etcd-controller/pkg/backup"
+	"github.com/coreos/kube-etcd-controller/pkg/util/k8sutil"
 )
 
-func main() {
-	fmt.Println("ok...")
-	for {
-		time.Sleep(1 * time.Minute)
-	}
-}
+var (
+	masterHost  string
+	clusterName string
+)
+
+func init() {
+	flag.StringVar(&masterHost, "master", "", "API Server addr, e.g. ' - NOT RECOMMENDED FOR PRODUCTION - http://127.0.0.1:8080'. Omit parameter to run in on-cluster mode and utilize the service account token.")
+	flag.StringVar(&clusterName, "etcd-cluster", "", "")
+	// TODO: parse policy
+	flag.Parse()
+}
+
+func main() {
+	if len(clusterName) == 0 {
+		panic("clusterName not set")
+	}
+	kclient := k8sutil.MustCreateClient(masterHost, false, nil)
+	backup.New(kclient, clusterName, backup.Policy{}).Run()
+}

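Run outside the cluster, the tool would be started along the lines of `backup --master 'http://127.0.0.1:8080' --etcd-cluster etcd-cluster` (the binary name and addresses here are illustrative, not from this commit). With --master omitted, it uses the in-cluster service-account path of k8sutil.MustCreateClient shown further down, which is how the backup ReplicaSet created below runs it.
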
@@ -4,3 +4,6 @@ metadata:
   name: "etcd-cluster"
 spec:
   size: 3
+  backup:
+    maxSnapshot: 5
+    volumeSizeInMB: 512

@@ -0,0 +1,7 @@
+#!/usr/bin/env bash
+
+pushd "./cmd/backup"
+go build .
+docker build --tag gcr.io/coreos-k8s-scale-testing/kubeetcdbackup:latest .
+gcloud docker push gcr.io/coreos-k8s-scale-testing/kubeetcdbackup:latest
+popd

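Note: `go build .` compiles for the host platform, while the image produced by `docker build` runs on Linux, so building from a non-Linux machine would need a cross-compile such as setting GOOS=linux. The push step assumes gcloud credentials with access to the gcr.io/coreos-k8s-scale-testing registry.
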
@@ -0,0 +1,63 @@
+package backup
+
+import (
+	"time"
+
+	"golang.org/x/net/context"
+
+	"github.com/Sirupsen/logrus"
+	"github.com/coreos/etcd/clientv3"
+	"github.com/coreos/kube-etcd-controller/pkg/util/etcdutil"
+	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/client/unversioned"
+	"k8s.io/kubernetes/pkg/labels"
+)
+
+type Backup struct {
+	kclient     *unversioned.Client
+	clusterName string
+	policy      Policy
+}
+
+func New(kclient *unversioned.Client, clusterName string, policy Policy) *Backup {
+	return &Backup{
+		kclient:     kclient,
+		clusterName: clusterName,
+		policy:      policy,
+	}
+}
+
+func (b *Backup) Run() {
+	for {
+		// TODO: add interval to backup policy
+		time.Sleep(20 * time.Second)
+
+		pods, err := b.kclient.Pods("default").List(api.ListOptions{
+			LabelSelector: labels.SelectorFromSet(map[string]string{
+				"app":          "etcd",
+				"etcd_cluster": b.clusterName,
+			}),
+		})
+		if err != nil {
+			panic(err)
+		}
+		for i := range pods.Items {
+			m := etcdutil.Member{Name: pods.Items[i].Name}
+			cfg := clientv3.Config{
+				Endpoints:   []string{m.ClientAddr()},
+				DialTimeout: 5 * time.Second,
+			}
+			etcdcli, err := clientv3.New(cfg)
+			if err != nil {
+				logrus.Errorf("clientv3.New failed: %v", err)
+				continue
+			}
+			resp, err := etcdcli.Get(context.TODO(), "/", clientv3.WithSerializable())
+			if err != nil {
+				logrus.Errorf("etcdcli.Get failed: %v", err)
+				continue
+			}
+			logrus.Infof("member: %s, revision: %d", m.Name, resp.Header.Revision)
+		}
+	}
+}

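One thing to watch in Run(): the clientv3 clients created in the loop are never closed, so each iteration leaks a connection per member. A minimal sketch of the loop body factored into a helper that always closes the client (the probeMember name is ours, not from this commit; imports match the file above):

	package backup

	import (
		"time"

		"golang.org/x/net/context"

		"github.com/Sirupsen/logrus"
		"github.com/coreos/etcd/clientv3"
		"github.com/coreos/kube-etcd-controller/pkg/util/etcdutil"
	)

	// probeMember reports one member's current store revision and
	// closes the client on every path, including the error returns.
	func probeMember(m etcdutil.Member) {
		etcdcli, err := clientv3.New(clientv3.Config{
			Endpoints:   []string{m.ClientAddr()},
			DialTimeout: 5 * time.Second,
		})
		if err != nil {
			logrus.Errorf("clientv3.New failed: %v", err)
			return
		}
		defer etcdcli.Close()

		resp, err := etcdcli.Get(context.TODO(), "/", clientv3.WithSerializable())
		if err != nil {
			logrus.Errorf("etcdcli.Get failed: %v", err)
			return
		}
		logrus.Infof("member: %s, revision: %d", m.Name, resp.Header.Revision)
	}
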
@@ -0,0 +1,14 @@
+package backup
+
+type Policy struct {
+	// MaxSnapshot is the maximum number of snapshot files to retain. 0 disables backup.
+	// If backup is disabled, the etcd cluster cannot recover from a
+	// disastrous failure (losing more than half of its members at the
+	// same time).
+	MaxSnapshot int `json:"maxSnapshot"`
+	// VolumeSizeInMB specifies the required volume size to perform backups.
+	// The controller will claim the required size before creating the etcd
+	// cluster for backup purposes.
+	// If the snapshot size is larger than the size specified, backup fails.
+	VolumeSizeInMB int `json:"volumeSizeInMB"`
+}

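The json tags on Policy line up with the maxSnapshot/volumeSizeInMB keys in the example spec above. The "// TODO: parse policy" left in cmd/backup could be resolved by accepting the policy as a JSON flag and decoding it into this struct; a minimal sketch under that assumption (the --policy flag is hypothetical, not part of this commit):

	package main

	import (
		"encoding/json"
		"flag"
		"fmt"

		"github.com/coreos/kube-etcd-controller/pkg/backup"
	)

	// policyJSON is a hypothetical --policy flag standing in for the
	// commit's "TODO: parse policy".
	var policyJSON = flag.String("policy", "{}", "backup policy as JSON")

	func main() {
		flag.Parse()
		var p backup.Policy
		// The json tags on backup.Policy (maxSnapshot, volumeSizeInMB)
		// match the keys used in the example cluster spec.
		if err := json.Unmarshal([]byte(*policyJSON), &p); err != nil {
			panic(err)
		}
		fmt.Printf("parsed policy: %+v\n", p)
	}
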
@@ -1,6 +1,7 @@
 package cluster
 
 import (
+	"github.com/coreos/kube-etcd-controller/pkg/backup"
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/api/unversioned"
 )

@@ -24,20 +25,7 @@ type Spec struct
 	// The controller will eventually make the etcd cluster version
 	// equal to the expected version.
 	Version string `json:"version"`
-	// Backup is the backup strategy for the etcd cluster.
-	Backup Backup `json:"backup"`
-}
-
-type Backup struct {
-	// MaxSnapshot is the maximum number of snapshot files to retain. 0 disables backup.
-	// If backup is disabled, the etcd cluster cannot recover from a
-	// disastrous failure (losing more than half of its members at the
-	// same time).
-	MaxSnapshot int
-	// RequiredVolumeSizeInMB specifies the required volume size to perform backups.
-	// The controller will claim the required size before creating the etcd
-	// cluster for backup purposes.
-	// If the snapshot size is larger than the size specified, backup fails.
-	RequiredVolumeSizeInMB int
-}
+	// Backup is the backup policy for the etcd cluster.
+	// There is no backup by default.
+	Backup *backup.Policy `json:"backup,omitempty"`
 }

@@ -39,29 +39,18 @@ type Config struct
 }
 
 func New(cfg Config) *Controller {
-	host, c := getClient(cfg)
+	kclient := k8sutil.MustCreateClient(cfg.MasterHost, cfg.TLSInsecure, &cfg.TLSConfig)
+	host := cfg.MasterHost
+	if len(host) == 0 {
+		host = k8sutil.MustGetInClusterMasterHost()
+	}
 	return &Controller{
 		masterHost: host,
-		kclient:    c,
+		kclient:    kclient,
 		clusters:   make(map[string]*cluster.Cluster),
 	}
 }
 
-func getClient(cfg Config) (string, *unversioned.Client) {
-	if len(cfg.MasterHost) == 0 {
-		inCfg, err := restclient.InClusterConfig()
-		if err != nil {
-			panic(err)
-		}
-		client, err := unversioned.NewInCluster()
-		if err != nil {
-			panic(err)
-		}
-		return inCfg.Host, client
-	}
-	return cfg.MasterHost, k8sutil.MustCreateClient(cfg.MasterHost, cfg.TLSInsecure, cfg.TLSConfig)
-}
-
 func (c *Controller) Run() {
 	watchVersion := "0"
 	if err := c.createTPR(); err != nil {

@@ -88,6 +77,11 @@ func (c *Controller) Run() {
 			// TODO: combine init into New. Different fresh new and recovered new.
 			nc.Init(&event.Object.Spec)
 			c.clusters[clusterName] = nc
+
+			backup := event.Object.Spec.Backup
+			if backup != nil && backup.MaxSnapshot != 0 {
+				k8sutil.CreateBackupReplicaSet(c.kclient, clusterName, *backup)
+			}
 		case "MODIFIED":
 			c.clusters[clusterName].Update(&event.Object.Spec)
 		case "DELETED":

@@ -114,6 +108,11 @@ func (c *Controller) findAllClusters() (string, error) {
 	for _, item := range list.Items {
 		nc := cluster.New(c.kclient, item.Name, &item.Spec)
 		c.clusters[item.Name] = nc
+
+		backup := item.Spec.Backup
+		if backup != nil && backup.MaxSnapshot != 0 {
+			k8sutil.CreateBackupReplicaSet(c.kclient, item.Name, *backup)
+		}
 	}
 	return list.ListMeta.ResourceVersion, nil
 }

@@ -8,6 +8,7 @@ import (
 	"strings"
 	"time"
 
+	"github.com/coreos/kube-etcd-controller/pkg/backup"
 	"github.com/coreos/kube-etcd-controller/pkg/util/etcdutil"
 	"github.com/pborman/uuid"
 	"k8s.io/kubernetes/pkg/api"

@@ -20,7 +21,7 @@
 	"k8s.io/kubernetes/pkg/watch"
 )
 
-func CreateBackupReplicaSet(kclient *unversioned.Client, clusterName string) {
+func CreateBackupReplicaSet(kclient *unversioned.Client, clusterName string, policy backup.Policy) {
 	_, err := kclient.ReplicaSets("default").Create(&extensions.ReplicaSet{
 		ObjectMeta: api.ObjectMeta{
 			Name: fmt.Sprintf("%s-backup-tool", clusterName),

@@ -43,7 +44,12 @@ func CreateBackupReplicaSet(kclient *unversioned.Client, clusterName string) {
 				Containers: []api.Container{
 					{
 						Name:  "backup",
-						Image: "gcr.io/coreos-k8s-scale-testing/test:latest",
+						Image: "gcr.io/coreos-k8s-scale-testing/kubeetcdbackup:latest",
+						Command: []string{
+							"backup",
+							"--etcd-cluster",
+							clusterName,
+						},
 					},
 				},
 			},

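The container passes only --etcd-cluster, not --master, so the backup pod authenticates through the in-cluster service-account path of MustCreateClient. The policy is not forwarded on the command line yet, matching the "// TODO: parse policy" in cmd/backup, and the image is assumed to provide the `backup` binary on its PATH.
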
@@ -186,7 +192,22 @@ func makeEtcdPod(m *etcdutil.Member, initialCluster []string, clusterName, state
 	return pod
 }
 
-func MustCreateClient(host string, tlsInsecure bool, tlsConfig restclient.TLSClientConfig) *unversioned.Client {
+func MustGetInClusterMasterHost() string {
+	cfg, err := restclient.InClusterConfig()
+	if err != nil {
+		panic(err)
+	}
+	return cfg.Host
+}
+
+func MustCreateClient(host string, tlsInsecure bool, tlsConfig *restclient.TLSClientConfig) *unversioned.Client {
+	if len(host) == 0 {
+		c, err := unversioned.NewInCluster()
+		if err != nil {
+			panic(err)
+		}
+		return c
+	}
 	cfg := &restclient.Config{
 		Host: host,
 		QPS:  100,

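MustCreateClient now takes a *restclient.TLSClientConfig so that cmd/backup can pass nil. That is fine on the empty-host (in-cluster) path, but a non-empty https host combined with a nil config would dereference nil in the hunk below. A possible guard for that branch, as one hardening sketch (not part of this commit):

	if hostUrl.Scheme == "https" {
		// Only copy the TLS settings when the caller supplied them.
		if tlsConfig != nil {
			cfg.TLSClientConfig = *tlsConfig
		}
		cfg.Insecure = tlsInsecure
	}
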
@@ -197,7 +218,7 @@ func MustCreateClient(host string, tlsInsecure bool, tlsConfig restclient.TLSCli
 		panic(fmt.Sprintf("error parsing host url %s : %v", host, err))
 	}
 	if hostUrl.Scheme == "https" {
-		cfg.TLSClientConfig = tlsConfig
+		cfg.TLSClientConfig = *tlsConfig
 		cfg.Insecure = tlsInsecure
 	}
 	c, err := unversioned.New(cfg)