From fe5b92504cfb65dd9fe8f72a942924c42f402fdb Mon Sep 17 00:00:00 2001 From: Hongchao Deng Date: Tue, 23 Aug 2016 14:24:21 -0700 Subject: [PATCH 1/3] refactor; create TPR on init --- cluster.go | 7 +++ controller.go | 116 ++++++++++++++++++++++++++++++++++++++++++++++++++ main.go | 107 ---------------------------------------------- util.go | 55 ++++++++++++++++++++++-- 4 files changed, 174 insertions(+), 111 deletions(-) create mode 100644 controller.go diff --git a/cluster.go b/cluster.go index 07c336f..8b7554b 100644 --- a/cluster.go +++ b/cluster.go @@ -15,6 +15,13 @@ import ( "k8s.io/kubernetes/pkg/labels" ) +type EtcdCluster struct { + Kind string `json:"kind"` + ApiVersion string `json:"apiVersion"` + Metadata map[string]string `json:"metadata"` + Spec Spec `json: "spec"` +} + type clusterEventType string const ( diff --git a/controller.go b/controller.go new file mode 100644 index 0000000..5535f0c --- /dev/null +++ b/controller.go @@ -0,0 +1,116 @@ +package main + +import ( + "encoding/json" + "errors" + "log" + "net/http" + "time" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/apis/extensions" + "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/util/wait" +) + +type Event struct { + Type string + Object EtcdCluster +} + +type etcdClusterController struct { + kclient *unversioned.Client + clusters map[string]*Cluster +} + +func (c *etcdClusterController) Run() { + if err := c.createTPR(); err != nil { + panic(err) + } + log.Println("etcd cluster controller starts running...") + + eventCh, errCh := monitorEtcdCluster(c.kclient.RESTClient.Client) + for { + select { + case event := <-eventCh: + clusterName := event.Object.Metadata["name"] + switch event.Type { + case "ADDED": + c.clusters[clusterName] = newCluster(c.kclient, clusterName, event.Object.Spec) + case "DELETED": + c.clusters[clusterName].Delete() + delete(c.clusters, clusterName) + } + case err := <-errCh: + panic(err) + } + } +} + +func (c *etcdClusterController) createTPR() error { + tpr := &extensions.ThirdPartyResource{ + ObjectMeta: api.ObjectMeta{ + Name: "etcd-cluster.coreos.com", + }, + Versions: []extensions.APIVersion{ + {Name: "v1"}, + }, + Description: "Managed etcd clusters", + } + _, err := c.kclient.ThirdPartyResources().Create(tpr) + if err != nil { + if isKubernetesResourceAlreadyExistError(err) { + return nil + } + return err + } + + err = wait.Poll(5*time.Second, 100*time.Second, + func() (done bool, err error) { + resp, err := watchETCDCluster(c.kclient.RESTClient.Client) + if err != nil { + return false, err + } + if resp.StatusCode == 200 { + return true, nil + } + if resp.StatusCode == 404 { + return false, nil + } + return false, errors.New("Invalid status code: " + resp.Status) + }) + return err +} + +func watchETCDCluster(httpClient *http.Client) (*http.Response, error) { + return httpClient.Get(masterHost + "/apis/coreos.com/v1/namespaces/default/etcdclusters?watch=true") +} + +func monitorEtcdCluster(httpClient *http.Client) (<-chan *Event, <-chan error) { + events := make(chan *Event) + errc := make(chan error, 1) + go func() { + resp, err := watchETCDCluster(httpClient) + if err != nil { + errc <- err + return + } + if resp.StatusCode != 200 { + errc <- errors.New("Invalid status code: " + resp.Status) + return + } + log.Println("start watching...") + for { + decoder := json.NewDecoder(resp.Body) + ev := new(Event) + err = decoder.Decode(ev) + if err != nil { + errc <- err + } + log.Printf("etcd cluster event: %v %#v\n", ev.Type, ev.Object) + events <- ev + } + }() + + return events, errc +} diff --git a/main.go b/main.go index 152b46c..c7b8225 100644 --- a/main.go +++ b/main.go @@ -1,16 +1,9 @@ package main import ( - "encoding/json" - "errors" "flag" - "fmt" - "log" - "net/http" - "net/url" "k8s.io/kubernetes/pkg/client/restclient" - "k8s.io/kubernetes/pkg/client/unversioned" ) var ( @@ -28,110 +21,10 @@ func init() { flag.Parse() } -type EtcdCluster struct { - Kind string `json:"kind"` - ApiVersion string `json:"apiVersion"` - Metadata map[string]string `json:"metadata"` - Spec Spec `json: "spec"` -} - -type Event struct { - Type string - Object EtcdCluster -} - -type etcdClusterController struct { - kclient *unversioned.Client - clusters map[string]*Cluster -} - -func (c *etcdClusterController) Run() { - eventCh, errCh := monitorEtcdCluster(c.kclient.RESTClient.Client) - for { - select { - case event := <-eventCh: - clusterName := event.Object.Metadata["name"] - switch event.Type { - case "ADDED": - c.clusters[clusterName] = newCluster(c.kclient, clusterName, event.Object.Spec) - case "DELETED": - c.clusters[clusterName].Delete() - delete(c.clusters, clusterName) - } - case err := <-errCh: - panic(err) - } - } -} - -func monitorEtcdCluster(httpClient *http.Client) (<-chan *Event, <-chan error) { - events := make(chan *Event) - errc := make(chan error, 1) - go func() { - resp, err := httpClient.Get(masterHost + "/apis/coreos.com/v1/namespaces/default/etcdclusters?watch=true") - if err != nil { - errc <- err - return - } - if resp.StatusCode != 200 { - errc <- errors.New("Invalid status code: " + resp.Status) - return - } - log.Println("start watching...") - for { - decoder := json.NewDecoder(resp.Body) - ev := new(Event) - err = decoder.Decode(ev) - if err != nil { - errc <- err - } - log.Printf("etcd cluster event: %v %#v\n", ev.Type, ev.Object) - events <- ev - } - }() - - return events, errc -} - func main() { c := &etcdClusterController{ kclient: mustCreateClient(masterHost), clusters: make(map[string]*Cluster), } - log.Println("etcd cluster controller starts running...") c.Run() } - -func mustCreateClient(host string) *unversioned.Client { - if len(host) == 0 { - cfg, err := restclient.InClusterConfig() - if err != nil { - panic(err) - } - c, err := unversioned.NewInCluster() - if err != nil { - panic(err) - } - masterHost = cfg.Host - return c - } - - hostUrl, err := url.Parse(host) - if err != nil { - panic(fmt.Sprintf("error parsing host url %s : %v", host, err)) - } - cfg := &restclient.Config{ - Host: host, - QPS: 100, - Burst: 100, - } - if hostUrl.Scheme == "https" { - cfg.TLSClientConfig = tlsConfig - cfg.Insecure = tlsInsecure - } - c, err := unversioned.New(cfg) - if err != nil { - panic(err) - } - return c -} diff --git a/util.go b/util.go index 19e4a7f..c4aa3aa 100644 --- a/util.go +++ b/util.go @@ -2,6 +2,9 @@ package main import ( "encoding/json" + "fmt" + "net/http" + "net/url" "strings" "time" @@ -9,7 +12,9 @@ import ( "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" "golang.org/x/net/context" "k8s.io/kubernetes/pkg/api" + apierrors "k8s.io/kubernetes/pkg/api/errors" unversionedAPI "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/client/restclient" "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/util/intstr" "k8s.io/kubernetes/pkg/watch" @@ -34,10 +39,7 @@ func createEtcdPod(kclient *unversioned.Client, initialCluster []string, m *Memb return err } _, err = watch.Until(100*time.Second, w, unversioned.PodRunningAndReady) - if err != nil { - return err - } - return nil + return err } // TODO: converge the port logic with member ClientAddr() and PeerAddr() @@ -147,6 +149,51 @@ func makeEtcdPod(m *Member, initialCluster []string, clusterName, state string, return pod } +func mustCreateClient(host string) *unversioned.Client { + if len(host) == 0 { + cfg, err := restclient.InClusterConfig() + if err != nil { + panic(err) + } + c, err := unversioned.NewInCluster() + if err != nil { + panic(err) + } + masterHost = cfg.Host + return c + } + + hostUrl, err := url.Parse(host) + if err != nil { + panic(fmt.Sprintf("error parsing host url %s : %v", host, err)) + } + cfg := &restclient.Config{ + Host: host, + QPS: 100, + Burst: 100, + } + if hostUrl.Scheme == "https" { + cfg.TLSClientConfig = tlsConfig + cfg.Insecure = tlsInsecure + } + c, err := unversioned.New(cfg) + if err != nil { + panic(err) + } + return c +} + +func isKubernetesResourceAlreadyExistError(err error) bool { + se, ok := err.(*apierrors.StatusError) + if !ok { + return false + } + if se.Status().Code == http.StatusConflict && se.Status().Reason == unversionedAPI.StatusReasonAlreadyExists { + return true + } + return false +} + func waitMemberReady(cli *clientv3.Client) error { for { _, err := cli.Get(context.TODO(), "/") From e72866dd02e4bd52c5ccf6d610e50761d218b2c6 Mon Sep 17 00:00:00 2001 From: Hongchao Deng Date: Tue, 23 Aug 2016 14:26:04 -0700 Subject: [PATCH 2/3] update readme --- README.md | 36 ++++++++---------------------------- 1 file changed, 8 insertions(+), 28 deletions(-) diff --git a/README.md b/README.md index 4a513d9..59681ee 100644 --- a/README.md +++ b/README.md @@ -18,34 +18,6 @@ Managed etcd clusters on Kubernetes: - Backup only works for data in etcd3 storage, not etcd2 storage. -## Initialize the TPR - -(TODO: auto create TPR when deploy the controller) - -```bash -$ cat example/etcd-clusters-tpr.yaml -``` - -```yaml -apiVersion: extensions/v1beta1 -kind: ThirdPartyResource -description: "Managed etcd clusters" -metadata: - name: "etcd-cluster.coreos.com" -versions: - - name: v1 - - name: v2 -``` - -```bash -$ kubectl create -f example/etcd-clusters-tpr.yaml - -$ kubectl get thirdpartyresources -NAME DESCRIPTION VERSION(S) -etcd-cluster.coreos.com Managed etcd clusters v1,v2 -``` - - ## Deploy kube-etcd-controller ```bash @@ -53,6 +25,14 @@ $ kubectl create -f example/etcd-controller.yaml pod "kubeetcdctrl" created ``` +kube-etcd-controller will create a TPR automatically. + +```bash +$ kubectl get thirdpartyresources +NAME DESCRIPTION VERSION(S) +etcd-cluster.coreos.com Managed etcd clusters v1 +``` + ## Create an etcd cluster ```bash From ee750d3c56d4897b6de9da2a16e2f73579e08cc6 Mon Sep 17 00:00:00 2001 From: Hongchao Deng Date: Tue, 23 Aug 2016 14:26:19 -0700 Subject: [PATCH 3/3] remove tpr example --- example/etcd-clusters-tpr.yaml | 8 -------- 1 file changed, 8 deletions(-) delete mode 100644 example/etcd-clusters-tpr.yaml diff --git a/example/etcd-clusters-tpr.yaml b/example/etcd-clusters-tpr.yaml deleted file mode 100644 index b5b3826..0000000 --- a/example/etcd-clusters-tpr.yaml +++ /dev/null @@ -1,8 +0,0 @@ -apiVersion: extensions/v1beta1 -kind: ThirdPartyResource -description: "Managed etcd clusters" -metadata: - name: "etcd-cluster.coreos.com" -versions: - - name: v1 - - name: v2