recover clusters if created before
This commit is contained in:
Родитель
f4cc1567ba
Коммит
dce7ce6ad9
|
@ -15,13 +15,6 @@ import (
|
|||
"k8s.io/kubernetes/pkg/labels"
|
||||
)
|
||||
|
||||
type EtcdCluster struct {
|
||||
Kind string `json:"kind"`
|
||||
ApiVersion string `json:"apiVersion"`
|
||||
Metadata map[string]string `json:"metadata"`
|
||||
Spec Spec `json: "spec"`
|
||||
}
|
||||
|
||||
type clusterEventType string
|
||||
|
||||
const (
|
||||
|
|
|
@ -3,6 +3,7 @@ package main
|
|||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"time"
|
||||
|
@ -13,6 +14,10 @@ import (
|
|||
"k8s.io/kubernetes/pkg/util/wait"
|
||||
)
|
||||
|
||||
const (
|
||||
tprName = "etcd-cluster.coreos.com"
|
||||
)
|
||||
|
||||
type Event struct {
|
||||
Type string
|
||||
Object EtcdCluster
|
||||
|
@ -24,16 +29,25 @@ type etcdClusterController struct {
|
|||
}
|
||||
|
||||
func (c *etcdClusterController) Run() {
|
||||
watchVersion := "0"
|
||||
if err := c.createTPR(); err != nil {
|
||||
panic(err)
|
||||
switch {
|
||||
case isKubernetesResourceAlreadyExistError(err):
|
||||
watchVersion, err = c.recoverAllClusters()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
default:
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
log.Println("etcd cluster controller starts running...")
|
||||
|
||||
eventCh, errCh := monitorEtcdCluster(c.kclient.RESTClient.Client)
|
||||
eventCh, errCh := monitorEtcdCluster(c.kclient.RESTClient.Client, watchVersion)
|
||||
for {
|
||||
select {
|
||||
case event := <-eventCh:
|
||||
clusterName := event.Object.Metadata["name"]
|
||||
clusterName := event.Object.ObjectMeta.Name
|
||||
switch event.Type {
|
||||
case "ADDED":
|
||||
c.clusters[clusterName] = newCluster(c.kclient, clusterName, event.Object.Spec)
|
||||
|
@ -47,10 +61,27 @@ func (c *etcdClusterController) Run() {
|
|||
}
|
||||
}
|
||||
|
||||
func (c *etcdClusterController) recoverAllClusters() (string, error) {
|
||||
log.Println("recovering clusters...")
|
||||
resp, err := listETCDCluster(c.kclient.RESTClient.Client)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
d := json.NewDecoder(resp.Body)
|
||||
list := &EtcdClusterList{}
|
||||
if err := d.Decode(list); err != nil {
|
||||
return "", err
|
||||
}
|
||||
for _, cluster := range list.Items {
|
||||
fmt.Println("cluster:", cluster.Name)
|
||||
}
|
||||
return list.ListMeta.ResourceVersion, nil
|
||||
}
|
||||
|
||||
func (c *etcdClusterController) createTPR() error {
|
||||
tpr := &extensions.ThirdPartyResource{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: "etcd-cluster.coreos.com",
|
||||
Name: tprName,
|
||||
},
|
||||
Versions: []extensions.APIVersion{
|
||||
{Name: "v1"},
|
||||
|
@ -59,15 +90,12 @@ func (c *etcdClusterController) createTPR() error {
|
|||
}
|
||||
_, err := c.kclient.ThirdPartyResources().Create(tpr)
|
||||
if err != nil {
|
||||
if isKubernetesResourceAlreadyExistError(err) {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
err = wait.Poll(5*time.Second, 100*time.Second,
|
||||
func() (done bool, err error) {
|
||||
resp, err := watchETCDCluster(c.kclient.RESTClient.Client)
|
||||
resp, err := watchETCDCluster(c.kclient.RESTClient.Client, "0")
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
@ -82,15 +110,11 @@ func (c *etcdClusterController) createTPR() error {
|
|||
return err
|
||||
}
|
||||
|
||||
func watchETCDCluster(httpClient *http.Client) (*http.Response, error) {
|
||||
return httpClient.Get(masterHost + "/apis/coreos.com/v1/namespaces/default/etcdclusters?watch=true")
|
||||
}
|
||||
|
||||
func monitorEtcdCluster(httpClient *http.Client) (<-chan *Event, <-chan error) {
|
||||
func monitorEtcdCluster(httpClient *http.Client, watchVersion string) (<-chan *Event, <-chan error) {
|
||||
events := make(chan *Event)
|
||||
errc := make(chan error, 1)
|
||||
go func() {
|
||||
resp, err := watchETCDCluster(httpClient)
|
||||
resp, err := watchETCDCluster(httpClient, watchVersion)
|
||||
if err != nil {
|
||||
errc <- err
|
||||
return
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
# Controller recovery
|
||||
|
||||
- Create TPR
|
||||
- Create TPR
|
||||
- If the creation succeed, then the controller is a new one and does not require recovery. END.
|
||||
- Find all existing clusters
|
||||
- loop over the third part resource items to get all created clusters
|
||||
|
|
20
spec.go
20
spec.go
|
@ -1,5 +1,25 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
)
|
||||
|
||||
type EtcdClusterList struct {
|
||||
unversioned.TypeMeta `json:",inline"`
|
||||
// Standard list metadata
|
||||
// More info: http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#metadata
|
||||
unversioned.ListMeta `json:"metadata,omitempty"`
|
||||
// Items is a list of third party objects
|
||||
Items []EtcdCluster `json:"items"`
|
||||
}
|
||||
|
||||
type EtcdCluster struct {
|
||||
unversioned.TypeMeta `json:",inline"`
|
||||
api.ObjectMeta `json:"metadata,omitempty"`
|
||||
Spec Spec `json: "spec"`
|
||||
}
|
||||
|
||||
type Spec struct {
|
||||
// Size is the expected size of the etcd cluster.
|
||||
// The controller will eventually make the size of the running
|
||||
|
|
8
util.go
8
util.go
|
@ -203,3 +203,11 @@ func waitMemberReady(cli *clientv3.Client) error {
|
|||
return err
|
||||
}
|
||||
}
|
||||
|
||||
func listETCDCluster(httpClient *http.Client) (*http.Response, error) {
|
||||
return httpClient.Get(masterHost + "/apis/coreos.com/v1/namespaces/default/etcdclusters")
|
||||
}
|
||||
|
||||
func watchETCDCluster(httpClient *http.Client, resourceVersion string) (*http.Response, error) {
|
||||
return httpClient.Get(masterHost + "/apis/coreos.com/v1/namespaces/default/etcdclusters?watch=true&resourceVersion=" + resourceVersion)
|
||||
}
|
||||
|
|
Загрузка…
Ссылка в новой задаче