save snapshot of max revision periodically

This commit is contained in:
Hongchao Deng 2016-08-30 14:49:04 -07:00
Parent 66fa2e8a8e
Commit f07324ee10
4 changed files with 101 additions and 64 deletions

View file

@ -13,7 +13,7 @@ var (
) )
func init() { func init() {
flag.StringVar(&cfg.MasterHost, "master", "", "API Server addr, e.g. ' - NOT RECOMMENDED FOR PRODUCTION - http://127.0.0.1:8080'. Omit parameter to run in on-cluster mode and utilize the service account token.") flag.StringVar(&masterHost, "master", "", "API Server addr, e.g. ' - NOT RECOMMENDED FOR PRODUCTION - http://127.0.0.1:8080'. Omit parameter to run in on-cluster mode and utilize the service account token.")
flag.StringVar(&clusterName, "etcd-cluster", "", "") flag.StringVar(&clusterName, "etcd-cluster", "", "")
// TODO: parse policy // TODO: parse policy
flag.Parse() flag.Parse()

View file

@ -1,5 +1,9 @@
#!/usr/bin/env bash #!/usr/bin/env bash
set -o errexit
set -o nounset
set -o pipefail
pushd "./cmd/backup" pushd "./cmd/backup"
go build . go build .
docker build --tag gcr.io/coreos-k8s-scale-testing/kubeetcdbackup:latest . docker build --tag gcr.io/coreos-k8s-scale-testing/kubeetcdbackup:latest .

View file

@ -1,6 +1,11 @@
package backup package backup
import ( import (
"fmt"
"io"
"io/ioutil"
"log"
"os"
"time" "time"
"golang.org/x/net/context" "golang.org/x/net/context"
@ -28,6 +33,7 @@ func New(kclient *unversioned.Client, clusterName string, policy Policy) *Backup
} }
func (b *Backup) Run() { func (b *Backup) Run() {
lastSnapRev := int64(0)
for { for {
// TODO: add interval to backup policy // TODO: add interval to backup policy
time.Sleep(20 * time.Second) time.Sleep(20 * time.Second)
@ -41,23 +47,96 @@ func (b *Backup) Run() {
if err != nil { if err != nil {
panic(err) panic(err)
} }
for i := range pods.Items { if len(pods.Items) == 0 {
m := etcdutil.Member{Name: pods.Items[i].Name} logrus.Warning("no running pods found.")
cfg := clientv3.Config{ continue
Endpoints: []string{m.ClientAddr()},
DialTimeout: 5 * time.Second,
}
etcdcli, err := clientv3.New(cfg)
if err != nil {
logrus.Errorf("clientv3.New failed: %v", err)
continue
}
resp, err := etcdcli.Get(context.TODO(), "/", clientv3.WithSerializable())
if err != nil {
logrus.Errorf("etcdcli.Get failed: %v", err)
continue
}
logrus.Infof("member: %s, revision: %d", m.Name, resp.Header.Revision)
} }
member, rev, err := getMemberWithMaxRev(pods)
if err != nil {
logrus.Error(err)
}
if member == nil {
logrus.Warning("no reachable member")
continue
}
if rev == lastSnapRev {
logrus.Info("skipped creating new backup: no change since last time")
continue
}
log.Printf("saving backup for cluster (%s)", b.clusterName)
if err := writeSnap(member, rev); err != nil {
logrus.Errorf("write snapshot failed: %v", err)
continue
}
lastSnapRev = rev
} }
} }
// writeSnap streams a snapshot of the given etcd member's backend and
// installs it as the backup file for rev. The data is written to a temp
// file first and then renamed, so a half-written snapshot is never
// visible under the final name.
func writeSnap(m *etcdutil.Member, rev int64) error {
	cfg := clientv3.Config{
		Endpoints:   []string{m.ClientAddr()},
		DialTimeout: 5 * time.Second,
	}
	etcdcli, err := clientv3.New(cfg)
	if err != nil {
		return fmt.Errorf("failed to create etcd client (%v)", err)
	}
	// Close the client so its connection is not leaked on any return path.
	defer etcdcli.Close()
	// Bound the whole transfer; defer cancel so the context is released on
	// every path (the original only called cancel() on the success path).
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	rc, err := etcdcli.Maintenance.Snapshot(ctx)
	if err != nil {
		return fmt.Errorf("failed to receive snapshot (%v)", err)
	}
	defer rc.Close()
	// TODO: custom backup dir
	tmpfile, err := ioutil.TempFile("", "snapshot")
	if err != nil {
		// Previously unchecked: a TempFile failure left tmpfile nil and the
		// io.Copy below would dereference it.
		return fmt.Errorf("failed to create snapshot tempfile: %v", err)
	}
	n, err := io.Copy(tmpfile, rc)
	if err != nil {
		tmpfile.Close()
		os.Remove(tmpfile.Name())
		return fmt.Errorf("failed to save snapshot: %v", err)
	}
	tmpfile.Close()
	nextSnapshotName := makeFilename(rev)
	if err := os.Rename(tmpfile.Name(), nextSnapshotName); err != nil {
		os.Remove(tmpfile.Name())
		return fmt.Errorf("rename snapshot from %s to %s failed: %v", tmpfile.Name(), nextSnapshotName, err)
	}
	log.Printf("saved snapshot %s (size: %d) successfully", nextSnapshotName, n)
	return nil
}
// makeFilename builds the backup file name for the given revision.
// The revision is rendered as 16 zero-padded hex digits so that file
// names sort lexicographically in revision order.
func makeFilename(rev int64) string {
	const ext = ".backup"
	return fmt.Sprintf("%016x", rev) + ext
}
// getMemberWithMaxRev queries every etcd pod in the list and returns the
// member holding the highest store revision, together with that revision.
// Unreachable members are logged and skipped so a single bad pod does not
// prevent backing up from the healthy ones; the returned member is nil
// only when no member was reachable (the caller already handles that case).
func getMemberWithMaxRev(pods *api.PodList) (*etcdutil.Member, int64, error) {
	var member *etcdutil.Member
	maxRev := int64(0)
	for i := range pods.Items {
		m := &etcdutil.Member{Name: pods.Items[i].Name}
		cfg := clientv3.Config{
			Endpoints:   []string{m.ClientAddr()},
			DialTimeout: 5 * time.Second,
		}
		etcdcli, err := clientv3.New(cfg)
		if err != nil {
			logrus.Errorf("failed to create etcd client for member (%s): %v", m.Name, err)
			continue
		}
		// Serializable read: we only need this member's local revision,
		// not a quorum-confirmed one. Bound it so a hung member cannot
		// stall the whole scan (the original used an unbounded context).
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		resp, err := etcdcli.Get(ctx, "/", clientv3.WithSerializable())
		cancel()
		// Close eagerly inside the loop; a defer here would accumulate one
		// open connection per pod until the function returns.
		etcdcli.Close()
		if err != nil {
			logrus.Errorf("etcdcli.Get failed for member (%s): %v", m.Name, err)
			continue
		}
		logrus.Infof("member: %s, revision: %d", m.Name, resp.Header.Revision)
		if resp.Header.Revision > maxRev {
			maxRev = resp.Header.Revision
			member = m
		}
	}
	return member, maxRev, nil
}

View file

@ -2,9 +2,6 @@ package cluster
import ( import (
"fmt" "fmt"
"io"
"io/ioutil"
"os"
"strconv" "strconv"
"strings" "strings"
"time" "time"
@ -205,49 +202,6 @@ func (c *Cluster) removePodAndService(name string) error {
return nil return nil
} }
// backup streams a snapshot from the cluster and saves it under
// nextSnapshotName, writing to a temp file first and renaming so a partial
// snapshot is never left under the final name.
//
// NOTE(review): clientAddr and nextSnapshotName are still "todo"
// placeholders — this cannot work until they are wired up.
func (c *Cluster) backup() error {
	clientAddr := "todo"
	nextSnapshotName := "todo"
	cfg := clientv3.Config{
		Endpoints: []string{clientAddr},
	}
	etcdcli, err := clientv3.New(cfg)
	if err != nil {
		// Return instead of panicking: an unreachable endpoint is an
		// expected runtime failure, not a programmer bug.
		return err
	}
	defer etcdcli.Close()
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	// Defer the cancel: the original called cancel() before io.Copy below,
	// which tears down the context while the snapshot stream is still
	// being read.
	defer cancel()
	log.Println("saving snapshot from cluster", c.name)
	rc, err := etcdcli.Maintenance.Snapshot(ctx)
	if err != nil {
		return err
	}
	defer rc.Close()
	tmpfile, err := ioutil.TempFile(c.backupDir, "snapshot")
	if err != nil {
		// Previously unchecked: a TempFile failure left tmpfile nil.
		return err
	}
	n, err := io.Copy(tmpfile, rc)
	if err != nil {
		tmpfile.Close()
		os.Remove(tmpfile.Name())
		log.Printf("saving snapshot from cluster %s error: %v", c.name, err)
		return err
	}
	tmpfile.Close()
	err = os.Rename(tmpfile.Name(), nextSnapshotName)
	if err != nil {
		os.Remove(tmpfile.Name())
		log.Printf("renaming snapshot from cluster %s error: %v", c.name, err)
		return err
	}
	// The original had the name and size arguments swapped against the
	// format verbs ("%v (size: %d)" received n, then the name).
	log.Printf("saved snapshot %s (size: %d) from cluster %s", nextSnapshotName, n, c.name)
	return nil
}
func (c *Cluster) monitorPods() { func (c *Cluster) monitorPods() {
opts := k8sapi.ListOptions{ opts := k8sapi.ListOptions{
LabelSelector: labels.SelectorFromSet(map[string]string{ LabelSelector: labels.SelectorFromSet(map[string]string{