Merge pull request #36 from colhom/use-member-api-retry

Use Cluster for member api retry
This commit is contained in:
colhom 2016-08-21 16:17:49 -07:00 коммит произвёл GitHub
Родитель 8a298ca2d4 c6a450991f
Коммит dbf5f03465
3 изменённых файлов: 24 добавлений и 9 удалений

Просмотреть файл

@ -220,8 +220,7 @@ func (c *Cluster) monitorMembers() {
panic("TODO: All pods removed. Impossible. Anyway, we can't create etcd client.") panic("TODO: All pods removed. Impossible. Anyway, we can't create etcd client.")
} }
c.updateMembers([]string{makeClientAddr(running.PickOne().Name)}) c.updateMembers(running.ClientURLs())
if err := c.reconcile(running); err != nil { if err := c.reconcile(running); err != nil {
panic(err) panic(err)
} }
@ -231,7 +230,8 @@ func (c *Cluster) monitorMembers() {
func (c *Cluster) updateMembers(endpoints []string) { func (c *Cluster) updateMembers(endpoints []string) {
// TODO: put this into central event handling // TODO: put this into central event handling
cfg := clientv3.Config{ cfg := clientv3.Config{
Endpoints: endpoints, Endpoints: endpoints,
DialTimeout: 5 * time.Second,
} }
etcdcli, err := clientv3.New(cfg) etcdcli, err := clientv3.New(cfg)
if err != nil { if err != nil {

Просмотреть файл

@ -61,3 +61,13 @@ func (ms MemberSet) Add(m Member) {
func (ms MemberSet) Remove(name string) { func (ms MemberSet) Remove(name string) {
delete(ms, name) delete(ms, name)
} }
func (ms MemberSet) ClientURLs() []string {
endpoints := make([]string, len(ms))
i := 0
for _, m := range ms {
endpoints[i] = makeClientAddr(m.Name)
i++
}
return endpoints
}

Просмотреть файл

@ -3,6 +3,7 @@ package main
import ( import (
"fmt" "fmt"
"log" "log"
"time"
"github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/clientv3"
"golang.org/x/net/context" "golang.org/x/net/context"
@ -57,14 +58,15 @@ func (c *Cluster) reconcile(running MemberSet) error {
func (c *Cluster) recoverOneMember(toRecover Member) error { func (c *Cluster) recoverOneMember(toRecover Member) error {
// Remove toRecover membership first since it's gone // Remove toRecover membership first since it's gone
cfg := clientv3.Config{ cfg := clientv3.Config{
Endpoints: []string{makeClientAddr(c.members.PickOne().Name)}, Endpoints: c.members.ClientURLs(),
DialTimeout: 5 * time.Second,
} }
etcdcli, err := clientv3.New(cfg) etcdcli, err := clientv3.New(cfg)
if err != nil { if err != nil {
return err return err
} }
clustercli := clientv3.NewCluster(etcdcli)
_, err = etcdcli.MemberRemove(context.TODO(), toRecover.ID) _, err = clustercli.MemberRemove(context.TODO(), toRecover.ID)
if err != nil { if err != nil {
return err return err
} }
@ -73,17 +75,20 @@ func (c *Cluster) recoverOneMember(toRecover Member) error {
// Add a new member // Add a new member
newMember := fmt.Sprintf("%s-%04d", c.name, c.idCounter) newMember := fmt.Sprintf("%s-%04d", c.name, c.idCounter)
resp, err := etcdcli.MemberAdd(context.TODO(), []string{makeEtcdPeerAddr(newMember)})
resp, err := clustercli.MemberAdd(context.TODO(), []string{makeEtcdPeerAddr(newMember)})
if err != nil { if err != nil {
panic(err) panic(err)
} }
c.idCounter++
c.members.Add(Member{Name: resp.Member.Name, ID: resp.Member.ID}) c.members.Add(Member{Name: newMember, ID: resp.Member.ID})
initialCluster := c.members.PeerURLPairs() initialCluster := c.members.PeerURLPairs()
if err := c.createPodAndService(c.idCounter, initialCluster, "existing"); err != nil { if err := c.createPodAndService(c.idCounter, initialCluster, "existing"); err != nil {
return err return err
} }
c.idCounter++
log.Printf("added member, cluster: %s", initialCluster) log.Printf("added member, cluster: %s", initialCluster)
return nil return nil
} }