Merge pull request #14 from hongchaodeng/master

code refactor
This commit is contained in:
Hongchao Deng 2016-08-10 15:55:50 -07:00 committed by GitHub
Parent fc5098f24d 151728185e
Commit 42e9bda906
4 changed files: 114 additions and 86 deletions

View file

@ -16,45 +16,47 @@ import (
"k8s.io/kubernetes/pkg/util/intstr"
)
type clusterEventType string
const (
eventNewCluster clusterEventType = "Add"
eventDeleteCluster clusterEventType = "Delete"
eventMemberDeleted clusterEventType = "MemberDeleted"
)
type clusterEvent struct {
typ clusterEventType
size int
}
type Cluster struct {
kclient *unversioned.Client
name string
idCounter int
eventCh chan *Event
eventCh chan *clusterEvent
stopCh chan struct{}
}
func newCluster(kclient *unversioned.Client, name string) *Cluster {
return &Cluster{
func newCluster(kclient *unversioned.Client, name string, size int) *Cluster {
c := &Cluster{
kclient: kclient,
name: name,
eventCh: make(chan *Event, 100),
eventCh: make(chan *clusterEvent, 100),
stopCh: make(chan struct{}),
}
go c.run()
c.send(&clusterEvent{
typ: eventNewCluster,
size: size,
})
return c
}
func (c *Cluster) Run() {
go c.monitorMembers()
for {
select {
case event := <-c.eventCh:
switch event.Type {
case "ADDED":
c.createCluster(event.Object)
case "DELETED":
c.deleteCluster(event.Object)
}
case <-c.stopCh:
}
}
func (c *Cluster) Delete() {
c.send(&clusterEvent{typ: eventDeleteCluster})
}
func (c *Cluster) Stop() {
close(c.stopCh)
}
func (c *Cluster) Handle(ev *Event) {
func (c *Cluster) send(ev *clusterEvent) {
select {
case c.eventCh <- ev:
case <-c.stopCh:
@ -63,40 +65,59 @@ func (c *Cluster) Handle(ev *Event) {
}
}
func (c *Cluster) createCluster(cluster EtcdCluster) {
size := cluster.Size
clusterName := cluster.Metadata["name"]
func (c *Cluster) run() {
go c.monitorMembers()
for {
select {
case event := <-c.eventCh:
switch event.typ {
case eventNewCluster:
c.create(event.size)
case eventMemberDeleted:
case eventDeleteCluster:
c.delete()
close(c.stopCh)
return
}
}
}
}
func (c *Cluster) create(size int) {
initialCluster := []string{}
for i := 0; i < size; i++ {
initialCluster = append(initialCluster, fmt.Sprintf("%s-%04d=http://%s-%04d:2380", clusterName, i, clusterName, i))
etcdName := fmt.Sprintf("%s-%04d", c.name, i)
initialCluster = append(initialCluster, fmt.Sprintf("%s=%s", etcdName, makeEtcdPeerAddr(etcdName)))
}
for i := 0; i < size; i++ {
c.launchMember(clusterName, i, initialCluster, "new")
if err := c.launchMember(i, initialCluster, "new"); err != nil {
// TODO: we need to clean up already created ones.
panic(err)
}
}
c.idCounter = size
}
func (c *Cluster) launchMember(clusterName string, id int, initialCluster []string, state string) {
etcdName := fmt.Sprintf("%s-%04d", clusterName, id)
svc := makeEtcdService(etcdName, clusterName)
_, err := c.kclient.Services("default").Create(svc)
if err != nil {
panic(err)
func (c *Cluster) launchMember(id int, initialCluster []string, state string) error {
etcdName := fmt.Sprintf("%s-%04d", c.name, id)
svc := makeEtcdService(etcdName, c.name)
if _, err := c.kclient.Services("default").Create(svc); err != nil {
return err
}
pod := makeEtcdPod(etcdName, clusterName, initialCluster, state)
_, err = c.kclient.Pods("default").Create(pod)
if err != nil {
panic(err)
pod := makeEtcdPod(etcdName, c.name, initialCluster, state)
if _, err := c.kclient.Pods("default").Create(pod); err != nil {
return err
}
return nil
}
func (c *Cluster) deleteCluster(cluster EtcdCluster) {
clusterName := cluster.Metadata["name"]
func (c *Cluster) delete() {
option := api.ListOptions{
LabelSelector: labels.SelectorFromSet(map[string]string{
"etcd_cluster": clusterName,
"etcd_cluster": c.name,
}),
}
@ -133,8 +154,7 @@ func (c *Cluster) monitorMembers() {
}
var prevPods []*api.Pod
var currPods []*api.Pod
// TODO: assuming delete one pod and add one pod. Handle more complex case later.
// TODO: What about unremoved service?
// TODO: Select "etcd_node" to remove left service.
for {
select {
case <-c.stopCh:
@ -151,30 +171,23 @@ func (c *Cluster) monitorMembers() {
currPods = append(currPods, &podList.Items[i])
}
// DEBUGGING..
fmt.Printf("previous pods: ")
for _, pod := range prevPods {
fmt.Printf("%s, ", pod.Name)
}
fmt.Println("")
fmt.Printf("current pods: ")
for _, pod := range currPods {
fmt.Printf("%s, ", pod.Name)
}
fmt.Println("")
deletedPod := findDeleted(prevPods, currPods)
prevPods = currPods
// We are recovering one member at a time now.
deletedPod, remainingPods := findDeletedOne(prevPods, currPods)
if deletedPod == nil {
// This will change prevPods if it keeps adding initially.
prevPods = currPods
continue
}
// currPods could be less than remainingPods.
prevPods = remainingPods
// Only using currPods is safe
if len(currPods) == 0 {
panic("unexpected")
panic("TODO: All removed. Impossible. Anyway, we can't use etcd client to change membership.")
}
// TODO: put this into central event handling
cfg := clientv3.Config{
Endpoints: []string{fmt.Sprintf("http://%s:2379", currPods[0].Name)},
Endpoints: []string{makeClientAddr(currPods[0].Name)},
}
etcdcli, err := clientv3.New(cfg)
if err != nil {
@ -185,7 +198,7 @@ func (c *Cluster) monitorMembers() {
panic(err)
}
member := findLostMember(resp.Members, deletedPod)
member := findLostMember(resp.Members, deletedPod.Name)
_, err = etcdcli.MemberRemove(context.TODO(), member.ID)
if err != nil {
panic(err)
@ -193,49 +206,58 @@ func (c *Cluster) monitorMembers() {
log.Printf("removed member %v with ID %d\n", member.Name, member.ID)
etcdName := fmt.Sprintf("%s-%04d", c.name, c.idCounter)
initialCluster := buildInitialClusters(resp.Members, member, etcdName)
_, err = etcdcli.MemberAdd(context.TODO(), []string{fmt.Sprintf("http://%s:2380", etcdName)})
initialCluster := buildInitialCluster(resp.Members, member, etcdName)
_, err = etcdcli.MemberAdd(context.TODO(), []string{makeEtcdPeerAddr(etcdName)})
if err != nil {
panic(err)
}
log.Printf("added member, cluster: %s", initialCluster)
c.launchMember(c.name, c.idCounter, initialCluster, "existing")
c.launchMember(c.idCounter, initialCluster, "existing")
c.idCounter++
}
}
func buildInitialClusters(members []*etcdserverpb.Member, removed *etcdserverpb.Member, newMember string) (res []string) {
func buildInitialCluster(members []*etcdserverpb.Member, removed *etcdserverpb.Member, newMember string) (res []string) {
for _, m := range members {
if m.Name == removed.Name {
continue
}
res = append(res, fmt.Sprintf("%s=http://%s:2380", m.Name, m.Name))
res = append(res, fmt.Sprintf("%s=%s", m.Name, makeEtcdPeerAddr(m.Name)))
}
res = append(res, fmt.Sprintf("%s=http://%s:2380", newMember, newMember))
res = append(res, fmt.Sprintf("%s=%s", newMember, makeEtcdPeerAddr(newMember)))
return res
}
func findLostMember(members []*etcdserverpb.Member, deletedPod *api.Pod) *etcdserverpb.Member {
func findLostMember(members []*etcdserverpb.Member, lostMemberName string) *etcdserverpb.Member {
for _, m := range members {
if m.Name == deletedPod.Name {
if m.Name == lostMemberName {
return m
}
}
return nil
}
func findDeleted(pl1, pl2 []*api.Pod) *api.Pod {
// Find one deleted pod in l2 from l1. Return the deleted pod and the remaining pods.
func findDeletedOne(l1, l2 []*api.Pod) (*api.Pod, []*api.Pod) {
exist := map[string]struct{}{}
for _, pod := range pl2 {
for _, pod := range l2 {
exist[pod.Name] = struct{}{}
}
for _, pod := range pl1 {
for i, pod := range l1 {
if _, ok := exist[pod.Name]; !ok {
return pod
return pod, append(l1[:i], l1[i+1:]...)
}
}
return nil
return nil, l2
}
func makeClientAddr(name string) string {
return fmt.Sprintf("http://%s:2379", name)
}
func makeEtcdPeerAddr(etcdName string) string {
return fmt.Sprintf("http://%s:2380", etcdName)
}
func makeEtcdService(etcdName, clusterName string) *api.Service {
@ -287,13 +309,13 @@ func makeEtcdPod(etcdName, clusterName string, initialCluster []string, state st
"--name",
etcdName,
"--initial-advertise-peer-urls",
fmt.Sprintf("http://%s:2380", etcdName),
makeEtcdPeerAddr(etcdName),
"--listen-peer-urls",
"http://0.0.0.0:2380",
"--listen-client-urls",
"http://0.0.0.0:2379",
"--advertise-client-urls",
fmt.Sprintf("http://%s:2379", etcdName),
makeClientAddr(etcdName),
"--initial-cluster",
strings.Join(initialCluster, ","),
"--initial-cluster-state",

View file

@ -6,5 +6,6 @@ metadata:
name: kubeetcdctrl
spec:
containers:
-
name: kubeetcdctrl
image: gcr.io/coreos-k8s-scale-testing/kubeetcdctrl:latest

View file

@ -4,6 +4,16 @@ set -o errexit
set -o nounset
set -o pipefail
if ! which go > /dev/null; then
echo "go needs to be installed"
exit 1
fi
if ! which docker > /dev/null; then
echo "docker needs to be installed"
exit 1
fi
go build .
docker build --tag gcr.io/coreos-k8s-scale-testing/kubeetcdctrl:latest .
gcloud docker push gcr.io/coreos-k8s-scale-testing/kubeetcdctrl:latest
gcloud docker push gcr.io/coreos-k8s-scale-testing/kubeetcdctrl:latest

11
main.go
View file

@ -43,14 +43,9 @@ func (c *etcdClusterController) Run() {
clusterName := event.Object.Metadata["name"]
switch event.Type {
case "ADDED":
clus := newCluster(c.kclient, clusterName)
c.clusters[clusterName] = clus
go clus.Run()
clus.Handle(event)
c.clusters[clusterName] = newCluster(c.kclient, clusterName, event.Object.Size)
case "DELETED":
clus := c.clusters[clusterName]
clus.Handle(event)
clus.Stop()
c.clusters[clusterName].Delete()
delete(c.clusters, clusterName)
}
case err := <-errCh:
@ -80,7 +75,7 @@ func monitorEtcdCluster(httpClient *http.Client) (<-chan *Event, <-chan error) {
if err != nil {
errc <- err
}
log.Println("etcd cluster event:", ev.Type, ev.Object.Size, ev.Object.Metadata)
log.Printf("etcd cluster event: %v %#v\n", ev.Type, ev.Object)
events <- ev
}
}()