Merge pull request #9 from hongchaodeng/master
implement delete etcd cluster logic
This commit is contained in:
Коммит
2632b22aae
82
main.go
82
main.go
|
@ -12,6 +12,7 @@ import (
|
|||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/client/restclient"
|
||||
"k8s.io/kubernetes/pkg/client/unversioned"
|
||||
"k8s.io/kubernetes/pkg/labels"
|
||||
"k8s.io/kubernetes/pkg/util/intstr"
|
||||
)
|
||||
|
||||
|
@ -22,25 +23,42 @@ func init() {
|
|||
flag.Parse()
|
||||
}
|
||||
|
||||
type EtcdCluster struct {
|
||||
Kind string `json:"kind"`
|
||||
ApiVersion string `json:"apiVersion"`
|
||||
Metadata map[string]string `json:"metadata"`
|
||||
Size int `json:"size"`
|
||||
}
|
||||
|
||||
type Event struct {
|
||||
Type string
|
||||
Object EtcdCluster
|
||||
}
|
||||
|
||||
type etcdClusterController struct {
|
||||
kclient *unversioned.Client
|
||||
}
|
||||
|
||||
func (c *etcdClusterController) Run() {
|
||||
eventCh, errCh := monitorNewCluster()
|
||||
eventCh, errCh := monitorEtcdCluster()
|
||||
for {
|
||||
select {
|
||||
case event := <-eventCh:
|
||||
c.createCluster(event)
|
||||
switch event.Type {
|
||||
case "ADDED":
|
||||
c.createCluster(event.Object)
|
||||
case "DELETED":
|
||||
c.deleteCluster(event.Object)
|
||||
}
|
||||
case err := <-errCh:
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *etcdClusterController) createCluster(event newCluster) {
|
||||
size := event.Size
|
||||
clusterName := event.Metadata["name"]
|
||||
func (c *etcdClusterController) createCluster(cluster EtcdCluster) {
|
||||
size := cluster.Size
|
||||
clusterName := cluster.Metadata["name"]
|
||||
|
||||
initialCluster := []string{}
|
||||
for i := 0; i < size; i++ {
|
||||
|
@ -64,20 +82,41 @@ func (c *etcdClusterController) createCluster(event newCluster) {
|
|||
}
|
||||
}
|
||||
|
||||
type newCluster struct {
|
||||
Kind string `json:"kind"`
|
||||
ApiVersion string `json:"apiVersion"`
|
||||
Metadata map[string]string `json:"metadata"`
|
||||
Size int `json:"size"`
|
||||
func (c *etcdClusterController) deleteCluster(cluster EtcdCluster) {
|
||||
clusterName := cluster.Metadata["name"]
|
||||
option := api.ListOptions{
|
||||
LabelSelector: labels.SelectorFromSet(map[string]string{
|
||||
"etcd_cluster": clusterName,
|
||||
}),
|
||||
}
|
||||
|
||||
pods, err := c.kclient.Pods("default").List(option)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
for i := range pods.Items {
|
||||
pod := &pods.Items[i]
|
||||
err = c.kclient.Pods("default").Delete(pod.Name, nil)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
services, err := c.kclient.Services("default").List(option)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
for i := range services.Items {
|
||||
service := &services.Items[i]
|
||||
err = c.kclient.Services("default").Delete(service.Name)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type Event struct {
|
||||
Type string
|
||||
Object newCluster
|
||||
}
|
||||
|
||||
func monitorNewCluster() (<-chan newCluster, <-chan error) {
|
||||
events := make(chan newCluster)
|
||||
func monitorEtcdCluster() (<-chan *Event, <-chan error) {
|
||||
events := make(chan *Event)
|
||||
errc := make(chan error, 1)
|
||||
go func() {
|
||||
resp, err := http.Get(masterHost + "/apis/coreos.com/v1/namespaces/default/etcdclusters?watch=true")
|
||||
|
@ -92,14 +131,13 @@ func monitorNewCluster() (<-chan newCluster, <-chan error) {
|
|||
log.Println("start watching...")
|
||||
for {
|
||||
decoder := json.NewDecoder(resp.Body)
|
||||
var ev Event
|
||||
err = decoder.Decode(&ev)
|
||||
ev := new(Event)
|
||||
err = decoder.Decode(ev)
|
||||
if err != nil {
|
||||
errc <- err
|
||||
}
|
||||
event := ev.Object
|
||||
log.Println("new cluster size:", event.Size, event.Metadata)
|
||||
events <- event
|
||||
log.Println("etcd cluster event:", ev.Type, ev.Object.Size, ev.Object.Metadata)
|
||||
events <- ev
|
||||
}
|
||||
}()
|
||||
|
||||
|
|
Загрузка…
Ссылка в новой задаче