controller: seed member migration

This commit is contained in:
Hongchao Deng 2016-10-01 11:39:39 -07:00
Parent b05f2f6843
Commit 0d0556cc05
4 changed files: 242 additions and 5 deletions

View file

@ -8,6 +8,7 @@ import (
log "github.com/Sirupsen/logrus"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
"github.com/coreos/kube-etcd-controller/pkg/util/constants"
"github.com/coreos/kube-etcd-controller/pkg/util/etcdutil"
"github.com/coreos/kube-etcd-controller/pkg/util/k8sutil"
@ -16,6 +17,7 @@ import (
k8sapi "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/util/wait"
)
type clusterEventType string
@ -150,16 +152,136 @@ func (c *Cluster) restoreSeedMember() error {
}
// migrateSeedMember adds the first controller-managed member to the
// single-member seed cluster running outside Kubernetes, waits for the
// configured RemoveDelay, then removes the original seed member so the
// controller fully owns the cluster. Returns an error if any step of the
// migration fails; the seed etcd client is always closed before returning.
func (c *Cluster) migrateSeedMember() error {
	log.Infof("migrating seed member (%s)", c.spec.Seed.MemberClientEndpoints)
	cfg := clientv3.Config{
		Endpoints:   c.spec.Seed.MemberClientEndpoints,
		DialTimeout: constants.DefaultDialTimeout,
	}
	etcdcli, err := clientv3.New(cfg)
	if err != nil {
		return err
	}

	ctx, cancel := context.WithTimeout(context.Background(), constants.DefaultRequestTimeout)
	resp, err := etcdcli.MemberList(ctx)
	cancel()
	if err != nil {
		etcdcli.Close()
		return err
	}
	if len(resp.Members) != 1 {
		etcdcli.Close()
		return fmt.Errorf("seed cluster contains more than one member")
	}
	seedMember := resp.Members[0]
	seedID := resp.Members[0].ID

	log.Infof("adding a new member")
	// Create a service with a node port on the peer port so the seed member,
	// which runs outside the Kubernetes cluster, can reach the new member.
	etcdName := fmt.Sprintf("%s-%04d", c.name, c.idCounter)
	npsrv, nperr := k8sutil.CreateEtcdNodePortService(c.kclient, etcdName, c.name, c.namespace)
	if nperr != nil {
		etcdcli.Close()
		return nperr
	}

	// Create the first member inside Kubernetes for the migration. It
	// advertises both the internal service URL and the external node-port URL.
	m := &etcdutil.Member{Name: etcdName, AdditionalPeerURL: "http://127.0.0.1:" + k8sutil.GetNodePortString(npsrv)}
	mpurls := []string{fmt.Sprintf("http://%s:2380", m.Name), m.AdditionalPeerURL}

	// MemberAdd can transiently fail with ErrUnhealthy while the cluster
	// settles, so poll until it succeeds or the 20s budget runs out.
	err = wait.Poll(1*time.Second, 20*time.Second, func() (done bool, err error) {
		ctx, cancel := context.WithTimeout(context.Background(), constants.DefaultRequestTimeout)
		defer cancel()
		_, err = etcdcli.MemberAdd(ctx, mpurls)
		if err != nil {
			if err == rpctypes.ErrUnhealthy {
				return false, nil
			}
			return false, fmt.Errorf("etcdcli failed to add member: %v", err)
		}
		return true, nil
	})
	if err != nil {
		etcdcli.Close()
		return err
	}
	if err := k8sutil.CreateEtcdService(c.kclient, m.Name, c.name, c.namespace); err != nil {
		etcdcli.Close()
		return err
	}

	// Build the initial-cluster list: all peer URLs of the seed member plus
	// both peer URLs of the new member.
	initialCluster := make([]string, 0, len(seedMember.PeerURLs)+len(mpurls))
	for _, purl := range seedMember.PeerURLs {
		initialCluster = append(initialCluster, fmt.Sprintf("%s=%s", seedMember.Name, purl))
	}
	for _, purl := range mpurls {
		initialCluster = append(initialCluster, fmt.Sprintf("%s=%s", m.Name, purl))
	}

	pod := k8sutil.MakeEtcdPod(m, initialCluster, c.name, "existing", "", c.spec.AntiAffinity, c.spec.HostNetwork)
	if err := k8sutil.CreateAndWaitPod(c.kclient, pod, m, c.namespace); err != nil {
		etcdcli.Close()
		return err
	}
	c.idCounter++
	etcdcli.Close()
	log.Infof("added the new member")

	// Give the new member time to catch up before removing the seed member.
	delay := time.Duration(c.spec.Seed.RemoveDelay) * time.Second
	log.Infof("wait %v before remove the seed member", delay)
	time.Sleep(delay)

	log.Infof("removing the seed member")
	cfg = clientv3.Config{
		Endpoints:   []string{m.ClientAddr()},
		DialTimeout: constants.DefaultDialTimeout,
	}
	etcdcli, err = clientv3.New(cfg)
	if err != nil {
		return err
	}
	defer etcdcli.Close()

	// Delete the original seed member from the etcd cluster. The seed member
	// has now been migrated into Kubernetes and the controller takes full
	// control over the cluster.
	ctx, cancel = context.WithTimeout(context.Background(), constants.DefaultRequestTimeout)
	_, err = etcdcli.Cluster.MemberRemove(ctx, seedID)
	cancel()
	if err != nil {
		return fmt.Errorf("etcdcli failed to remove seed member: %v", err)
	}
	log.Infof("removed the seed member")

	// Remove the external nodeport service and change the peer URL so the
	// member only advertises the internal service.
	err = c.kclient.Services(c.namespace).Delete(npsrv.ObjectMeta.Name)
	if err != nil && !k8sutil.IsKubernetesResourceNotFoundError(err) {
		return err
	}

	ctx, cancel = context.WithTimeout(context.Background(), constants.DefaultRequestTimeout)
	resp, err = etcdcli.MemberList(ctx)
	cancel()
	if err != nil {
		return err
	}
	log.Infof("updating the peer urls (%v) for the member %x", resp.Members[0].PeerURLs, resp.Members[0].ID)

	// Clearing AdditionalPeerURL makes PeerAddr() return only the internal
	// service URL. Retry the update on request timeouts.
	m.AdditionalPeerURL = ""
	for {
		ctx, cancel = context.WithTimeout(context.Background(), constants.DefaultRequestTimeout)
		_, err = etcdcli.MemberUpdate(ctx, resp.Members[0].ID, []string{m.PeerAddr()})
		cancel()
		if err == nil {
			break
		}
		if err != context.DeadlineExceeded {
			return err
		}
	}
	log.Infof("finished the member migration")
	return nil
}

View file

@ -11,6 +11,9 @@ type Member struct {
// We know the ID of a member when we get the member information from etcd,
// but not from Kubernetes pod list.
ID uint64
// AdditionalPeerURL is only used for bootstrapping a member for seed cluster migration.
AdditionalPeerURL string
}
func (m *Member) ClientAddr() string {
@ -18,7 +21,11 @@ func (m *Member) ClientAddr() string {
}
// PeerAddr returns the member's peer URL(s) as a comma-separated string:
// the internal service URL, plus AdditionalPeerURL when it is set (which
// only happens during seed-cluster migration bootstrap).
func (m *Member) PeerAddr() string {
	pa := fmt.Sprintf("http://%s:2380", m.Name)
	if len(m.AdditionalPeerURL) == 0 {
		return pa
	}
	return strings.Join([]string{pa, m.AdditionalPeerURL}, ",")
}
type MemberSet map[string]*Member

View file

@ -26,6 +26,8 @@ import (
)
const (
etcdImage = "quay.io/coreos/etcd:v3.1.0-alpha.1"
// TODO: This is constant for current purpose. We might make it configurable later.
etcdDir = "/var/etcd"
dataDir = etcdDir + "/data"
@ -49,7 +51,7 @@ func makeRestoreInitContainerSpec(backupAddr, name, token string) string {
},
{
Name: "restore-datadir",
Image: "quay.io/coreos/etcd:latest",
Image: etcdImage,
Command: []string{
"/bin/sh", "-c",
fmt.Sprintf("ETCDCTL_API=3 etcdctl snapshot restore %[1]s"+
@ -152,6 +154,10 @@ func CreateBackupReplicaSetAndService(kclient *unversioned.Client, clusterName,
return nil
}
// GetNodePortString returns the node port of the service's first port,
// formatted as a decimal string.
func GetNodePortString(srv *api.Service) string {
	return fmt.Sprintf("%d", srv.Spec.Ports[0].NodePort)
}
func CreateStorageClass(kubecli *unversioned.Client) error {
class := &storage.StorageClass{
ObjectMeta: api.ObjectMeta{
@ -227,6 +233,11 @@ func CreateEtcdService(kclient *unversioned.Client, etcdName, clusterName, ns st
return nil
}
// CreateEtcdNodePortService creates a NodePort service in the given namespace
// that exposes the named etcd member's peer port outside the cluster.
func CreateEtcdNodePortService(kclient *unversioned.Client, etcdName, clusterName, ns string) (*api.Service, error) {
	return kclient.Services(ns).Create(makeEtcdNodePortService(etcdName, clusterName))
}
// TODO: use a struct to replace the huge arg list.
func CreateAndWaitPod(kclient *unversioned.Client, pod *api.Pod, m *etcdutil.Member, ns string) error {
if _, err := kclient.Pods(ns).Create(pod); err != nil {
@ -273,6 +284,32 @@ func makeEtcdService(etcdName, clusterName string) *api.Service {
return svc
}
// makeEtcdNodePortService builds (without creating) a NodePort service that
// exposes the member's peer port 2380 to clients outside the Kubernetes
// cluster, labeled and selected by etcd node and cluster name.
func makeEtcdNodePortService(etcdName, clusterName string) *api.Service {
	selector := map[string]string{
		"etcd_node":    etcdName,
		"etcd_cluster": clusterName,
	}
	peerPort := api.ServicePort{
		Name:       "server",
		Port:       2380,
		TargetPort: intstr.FromInt(2380),
		Protocol:   api.ProtocolTCP,
	}
	return &api.Service{
		ObjectMeta: api.ObjectMeta{
			Name:   etcdName + "-nodeport",
			Labels: selector,
		},
		Spec: api.ServiceSpec{
			Ports:    []api.ServicePort{peerPort},
			Type:     api.ServiceTypeNodePort,
			Selector: selector,
		},
	}
}
func AddRecoveryToPod(pod *api.Pod, clusterName, name, token string) {
if pod.Annotations == nil {
pod.Annotations = map[string]string{}
@ -318,7 +355,7 @@ func MakeEtcdPod(m *etcdutil.Member, initialCluster []string, clusterName, state
{
Command: commands,
Name: m.Name,
Image: "quay.io/coreos/etcd:latest",
Image: etcdImage,
Ports: []api.ContainerPort{
{
Name: "server",

View file

@ -0,0 +1,71 @@
package e2e
import (
"fmt"
"net/url"
"os"
"path"
"testing"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"github.com/coreos/etcd/embed"
"github.com/coreos/kube-etcd-controller/pkg/cluster"
"github.com/coreos/kube-etcd-controller/test/e2e/framework"
)
// TestCreateClusterWithSeedMember starts a single-member etcd server embedded
// in the test process, then creates an EtcdCluster resource that seeds from
// it and verifies the controller grows the cluster to 3 members.
func TestCreateClusterWithSeedMember(t *testing.T) {
	dir := path.Join(os.TempDir(), "embed-etcd")
	os.RemoveAll(dir)
	defer os.RemoveAll(dir)

	embedCfg := embed.NewConfig()
	embedCfg.Dir = dir
	// Constant, known-valid URLs: parse errors cannot occur here.
	lpurl, _ := url.Parse("http://0.0.0.0:2380")
	lcurl, _ := url.Parse("http://0.0.0.0:2379")
	embedCfg.LCUrls = []url.URL{*lcurl}
	embedCfg.LPUrls = []url.URL{*lpurl}

	e, err := embed.StartEtcd(embedCfg)
	if err != nil {
		t.Fatal(err)
	}
	<-e.Server.ReadyNotify()
	fmt.Println("etcdserver is ready")

	f := framework.Global
	c := &cluster.EtcdCluster{
		TypeMeta: unversioned.TypeMeta{
			Kind:       "EtcdCluster",
			APIVersion: "coreos.com/v1",
		},
		ObjectMeta: api.ObjectMeta{
			GenerateName: "etcd-test-seed-",
		},
		Spec: cluster.Spec{
			Size: 3,
			Seed: &cluster.SeedPolicy{
				MemberClientEndpoints: []string{embedCfg.ACUrls[0].String()},
				RemoveDelay:           30,
			},
		},
	}

	testEtcd, err := createEtcdCluster(f, c)
	if err != nil {
		t.Fatal(err)
	}
	defer func() {
		if err := deleteEtcdCluster(f, testEtcd.Name); err != nil {
			t.Fatal(err)
		}
	}()

	if _, err := waitUntilSizeReached(f, testEtcd.Name, 3, 60); err != nil {
		t.Fatalf("failed to create 3 members etcd cluster: %v", err)
	}
}