Merge pull request #183 from hongchaodeng/r2
refactor controller config
This commit is contained in:
Коммит
12cef04b3f
|
@ -27,6 +27,6 @@ func init() {
|
|||
}
|
||||
|
||||
func main() {
|
||||
c := controller.New(cfg)
|
||||
c := controller.New(&cfg)
|
||||
c.Run()
|
||||
}
|
||||
|
|
|
@ -34,11 +34,9 @@ type Event struct {
|
|||
}
|
||||
|
||||
type Controller struct {
|
||||
masterHost string
|
||||
namespace string
|
||||
*Config
|
||||
kclient *unversioned.Client
|
||||
clusters map[string]*cluster.Cluster
|
||||
pvProvisioner string
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
|
@ -59,21 +57,18 @@ func (c *Config) validate() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func New(cfg Config) *Controller {
|
||||
func New(cfg *Config) *Controller {
|
||||
if err := cfg.validate(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
kclient := k8sutil.MustCreateClient(cfg.MasterHost, cfg.TLSInsecure, &cfg.TLSConfig)
|
||||
host := cfg.MasterHost
|
||||
if len(host) == 0 {
|
||||
host = k8sutil.MustGetInClusterMasterHost()
|
||||
if len(cfg.MasterHost) == 0 {
|
||||
cfg.MasterHost = k8sutil.MustGetInClusterMasterHost()
|
||||
}
|
||||
return &Controller{
|
||||
masterHost: host,
|
||||
Config: cfg,
|
||||
kclient: kclient,
|
||||
clusters: make(map[string]*cluster.Cluster),
|
||||
namespace: cfg.Namespace,
|
||||
pvProvisioner: cfg.PVProvisioner,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -84,7 +79,7 @@ func (c *Controller) Run() {
|
|||
}
|
||||
log.Println("etcd cluster controller starts running...")
|
||||
|
||||
eventCh, errCh := monitorEtcdCluster(c.masterHost, c.namespace, c.kclient.RESTClient.Client, watchVersion)
|
||||
eventCh, errCh := monitorEtcdCluster(c.MasterHost, c.Namespace, c.kclient.RESTClient.Client, watchVersion)
|
||||
for {
|
||||
select {
|
||||
case event := <-eventCh:
|
||||
|
@ -92,12 +87,12 @@ func (c *Controller) Run() {
|
|||
switch event.Type {
|
||||
case "ADDED":
|
||||
clusterSpec := &event.Object.Spec
|
||||
nc := cluster.New(c.kclient, clusterName, c.namespace, clusterSpec)
|
||||
nc := cluster.New(c.kclient, clusterName, c.Namespace, clusterSpec)
|
||||
c.clusters[clusterName] = nc
|
||||
|
||||
backup := clusterSpec.Backup
|
||||
if backup != nil && backup.MaxSnapshot != 0 {
|
||||
err := k8sutil.CreateBackupReplicaSetAndService(c.kclient, clusterName, c.namespace, *backup)
|
||||
err := k8sutil.CreateBackupReplicaSetAndService(c.kclient, clusterName, c.Namespace, *backup)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
@ -116,7 +111,7 @@ func (c *Controller) Run() {
|
|||
|
||||
func (c *Controller) findAllClusters() (string, error) {
|
||||
log.Println("finding existing clusters...")
|
||||
resp, err := k8sutil.ListETCDCluster(c.masterHost, c.namespace, c.kclient.RESTClient.Client)
|
||||
resp, err := k8sutil.ListETCDCluster(c.MasterHost, c.Namespace, c.kclient.RESTClient.Client)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
@ -126,12 +121,12 @@ func (c *Controller) findAllClusters() (string, error) {
|
|||
return "", err
|
||||
}
|
||||
for _, item := range list.Items {
|
||||
nc := cluster.Restore(c.kclient, item.Name, c.namespace, &item.Spec)
|
||||
nc := cluster.Restore(c.kclient, item.Name, c.Namespace, &item.Spec)
|
||||
c.clusters[item.Name] = nc
|
||||
|
||||
backup := item.Spec.Backup
|
||||
if backup != nil && backup.MaxSnapshot != 0 {
|
||||
err := k8sutil.CreateBackupReplicaSetAndService(c.kclient, item.Name, c.namespace, *backup)
|
||||
err := k8sutil.CreateBackupReplicaSetAndService(c.kclient, item.Name, c.Namespace, *backup)
|
||||
if !k8sutil.IsKubernetesResourceAlreadyExistError(err) {
|
||||
panic(err)
|
||||
}
|
||||
|
@ -157,7 +152,7 @@ func (c *Controller) initResource() (string, error) {
|
|||
return "", err
|
||||
}
|
||||
}
|
||||
err = k8sutil.CreateStorageClass(c.kclient, c.pvProvisioner)
|
||||
err = k8sutil.CreateStorageClass(c.kclient, c.PVProvisioner)
|
||||
if err != nil {
|
||||
log.Errorf("fail to create storage class: %v", err)
|
||||
return "", err
|
||||
|
@ -180,7 +175,7 @@ func (c *Controller) createTPR() error {
|
|||
return err
|
||||
}
|
||||
|
||||
return k8sutil.WaitEtcdTPRReady(c.kclient.Client, 3*time.Second, 90*time.Second, c.masterHost, c.namespace)
|
||||
return k8sutil.WaitEtcdTPRReady(c.kclient.Client, 3*time.Second, 90*time.Second, c.MasterHost, c.Namespace)
|
||||
}
|
||||
|
||||
func monitorEtcdCluster(host, ns string, httpClient *http.Client, watchVersion string) (<-chan *Event, <-chan error) {
|
||||
|
|
Загрузка…
Ссылка в новой задаче