Merge pull request #46 from hongchaodeng/master

Create TPR on init
This commit is contained in:
Xiang Li 2016-08-23 14:29:17 -07:00 коммит произвёл GitHub
Родитель c38bea8141 ee750d3c56
Коммит f4cc1567ba
6 изменённых файлов: 182 добавлений и 147 удалений

Просмотреть файл

@ -18,34 +18,6 @@ Managed etcd clusters on Kubernetes:
- Backup only works for data in etcd3 storage, not etcd2 storage.
## Initialize the TPR
(TODO: auto create TPR when deploy the controller)
```bash
$ cat example/etcd-clusters-tpr.yaml
```
```yaml
apiVersion: extensions/v1beta1
kind: ThirdPartyResource
description: "Managed etcd clusters"
metadata:
name: "etcd-cluster.coreos.com"
versions:
- name: v1
- name: v2
```
```bash
$ kubectl create -f example/etcd-clusters-tpr.yaml
$ kubectl get thirdpartyresources
NAME DESCRIPTION VERSION(S)
etcd-cluster.coreos.com Managed etcd clusters v1,v2
```
## Deploy kube-etcd-controller
```bash
@ -53,6 +25,14 @@ $ kubectl create -f example/etcd-controller.yaml
pod "kubeetcdctrl" created
```
kube-etcd-controller will create a TPR automatically.
```bash
$ kubectl get thirdpartyresources
NAME DESCRIPTION VERSION(S)
etcd-cluster.coreos.com Managed etcd clusters v1
```
## Create an etcd cluster
```bash

Просмотреть файл

@ -15,6 +15,13 @@ import (
"k8s.io/kubernetes/pkg/labels"
)
type EtcdCluster struct {
Kind string `json:"kind"`
ApiVersion string `json:"apiVersion"`
Metadata map[string]string `json:"metadata"`
Spec Spec `json: "spec"`
}
type clusterEventType string
const (

116
controller.go Normal file
Просмотреть файл

@ -0,0 +1,116 @@
package main
import (
"encoding/json"
"errors"
"log"
"net/http"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/util/wait"
)
type Event struct {
Type string
Object EtcdCluster
}
type etcdClusterController struct {
kclient *unversioned.Client
clusters map[string]*Cluster
}
func (c *etcdClusterController) Run() {
if err := c.createTPR(); err != nil {
panic(err)
}
log.Println("etcd cluster controller starts running...")
eventCh, errCh := monitorEtcdCluster(c.kclient.RESTClient.Client)
for {
select {
case event := <-eventCh:
clusterName := event.Object.Metadata["name"]
switch event.Type {
case "ADDED":
c.clusters[clusterName] = newCluster(c.kclient, clusterName, event.Object.Spec)
case "DELETED":
c.clusters[clusterName].Delete()
delete(c.clusters, clusterName)
}
case err := <-errCh:
panic(err)
}
}
}
func (c *etcdClusterController) createTPR() error {
tpr := &extensions.ThirdPartyResource{
ObjectMeta: api.ObjectMeta{
Name: "etcd-cluster.coreos.com",
},
Versions: []extensions.APIVersion{
{Name: "v1"},
},
Description: "Managed etcd clusters",
}
_, err := c.kclient.ThirdPartyResources().Create(tpr)
if err != nil {
if isKubernetesResourceAlreadyExistError(err) {
return nil
}
return err
}
err = wait.Poll(5*time.Second, 100*time.Second,
func() (done bool, err error) {
resp, err := watchETCDCluster(c.kclient.RESTClient.Client)
if err != nil {
return false, err
}
if resp.StatusCode == 200 {
return true, nil
}
if resp.StatusCode == 404 {
return false, nil
}
return false, errors.New("Invalid status code: " + resp.Status)
})
return err
}
func watchETCDCluster(httpClient *http.Client) (*http.Response, error) {
return httpClient.Get(masterHost + "/apis/coreos.com/v1/namespaces/default/etcdclusters?watch=true")
}
func monitorEtcdCluster(httpClient *http.Client) (<-chan *Event, <-chan error) {
events := make(chan *Event)
errc := make(chan error, 1)
go func() {
resp, err := watchETCDCluster(httpClient)
if err != nil {
errc <- err
return
}
if resp.StatusCode != 200 {
errc <- errors.New("Invalid status code: " + resp.Status)
return
}
log.Println("start watching...")
for {
decoder := json.NewDecoder(resp.Body)
ev := new(Event)
err = decoder.Decode(ev)
if err != nil {
errc <- err
}
log.Printf("etcd cluster event: %v %#v\n", ev.Type, ev.Object)
events <- ev
}
}()
return events, errc
}

Просмотреть файл

@ -1,8 +0,0 @@
apiVersion: extensions/v1beta1
kind: ThirdPartyResource
description: "Managed etcd clusters"
metadata:
name: "etcd-cluster.coreos.com"
versions:
- name: v1
- name: v2

107
main.go
Просмотреть файл

@ -1,16 +1,9 @@
package main
import (
"encoding/json"
"errors"
"flag"
"fmt"
"log"
"net/http"
"net/url"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/client/unversioned"
)
var (
@ -28,110 +21,10 @@ func init() {
flag.Parse()
}
type EtcdCluster struct {
Kind string `json:"kind"`
ApiVersion string `json:"apiVersion"`
Metadata map[string]string `json:"metadata"`
Spec Spec `json: "spec"`
}
type Event struct {
Type string
Object EtcdCluster
}
type etcdClusterController struct {
kclient *unversioned.Client
clusters map[string]*Cluster
}
func (c *etcdClusterController) Run() {
eventCh, errCh := monitorEtcdCluster(c.kclient.RESTClient.Client)
for {
select {
case event := <-eventCh:
clusterName := event.Object.Metadata["name"]
switch event.Type {
case "ADDED":
c.clusters[clusterName] = newCluster(c.kclient, clusterName, event.Object.Spec)
case "DELETED":
c.clusters[clusterName].Delete()
delete(c.clusters, clusterName)
}
case err := <-errCh:
panic(err)
}
}
}
func monitorEtcdCluster(httpClient *http.Client) (<-chan *Event, <-chan error) {
events := make(chan *Event)
errc := make(chan error, 1)
go func() {
resp, err := httpClient.Get(masterHost + "/apis/coreos.com/v1/namespaces/default/etcdclusters?watch=true")
if err != nil {
errc <- err
return
}
if resp.StatusCode != 200 {
errc <- errors.New("Invalid status code: " + resp.Status)
return
}
log.Println("start watching...")
for {
decoder := json.NewDecoder(resp.Body)
ev := new(Event)
err = decoder.Decode(ev)
if err != nil {
errc <- err
}
log.Printf("etcd cluster event: %v %#v\n", ev.Type, ev.Object)
events <- ev
}
}()
return events, errc
}
func main() {
c := &etcdClusterController{
kclient: mustCreateClient(masterHost),
clusters: make(map[string]*Cluster),
}
log.Println("etcd cluster controller starts running...")
c.Run()
}
func mustCreateClient(host string) *unversioned.Client {
if len(host) == 0 {
cfg, err := restclient.InClusterConfig()
if err != nil {
panic(err)
}
c, err := unversioned.NewInCluster()
if err != nil {
panic(err)
}
masterHost = cfg.Host
return c
}
hostUrl, err := url.Parse(host)
if err != nil {
panic(fmt.Sprintf("error parsing host url %s : %v", host, err))
}
cfg := &restclient.Config{
Host: host,
QPS: 100,
Burst: 100,
}
if hostUrl.Scheme == "https" {
cfg.TLSClientConfig = tlsConfig
cfg.Insecure = tlsInsecure
}
c, err := unversioned.New(cfg)
if err != nil {
panic(err)
}
return c
}

55
util.go
Просмотреть файл

@ -2,6 +2,9 @@ package main
import (
"encoding/json"
"fmt"
"net/http"
"net/url"
"strings"
"time"
@ -9,7 +12,9 @@ import (
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
"golang.org/x/net/context"
"k8s.io/kubernetes/pkg/api"
apierrors "k8s.io/kubernetes/pkg/api/errors"
unversionedAPI "k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/util/intstr"
"k8s.io/kubernetes/pkg/watch"
@ -34,10 +39,7 @@ func createEtcdPod(kclient *unversioned.Client, initialCluster []string, m *Memb
return err
}
_, err = watch.Until(100*time.Second, w, unversioned.PodRunningAndReady)
if err != nil {
return err
}
return nil
return err
}
// TODO: converge the port logic with member ClientAddr() and PeerAddr()
@ -147,6 +149,51 @@ func makeEtcdPod(m *Member, initialCluster []string, clusterName, state string,
return pod
}
func mustCreateClient(host string) *unversioned.Client {
if len(host) == 0 {
cfg, err := restclient.InClusterConfig()
if err != nil {
panic(err)
}
c, err := unversioned.NewInCluster()
if err != nil {
panic(err)
}
masterHost = cfg.Host
return c
}
hostUrl, err := url.Parse(host)
if err != nil {
panic(fmt.Sprintf("error parsing host url %s : %v", host, err))
}
cfg := &restclient.Config{
Host: host,
QPS: 100,
Burst: 100,
}
if hostUrl.Scheme == "https" {
cfg.TLSClientConfig = tlsConfig
cfg.Insecure = tlsInsecure
}
c, err := unversioned.New(cfg)
if err != nil {
panic(err)
}
return c
}
func isKubernetesResourceAlreadyExistError(err error) bool {
se, ok := err.(*apierrors.StatusError)
if !ok {
return false
}
if se.Status().Code == http.StatusConflict && se.Status().Reason == unversionedAPI.StatusReasonAlreadyExists {
return true
}
return false
}
func waitMemberReady(cli *clientv3.Client) error {
for {
_, err := cli.Get(context.TODO(), "/")