make namespace configurable
This commit is contained in:
Родитель
2a076f5948
Коммит
d3f9a815f1
|
@ -2,6 +2,7 @@ package main
|
|||
|
||||
import (
|
||||
"flag"
|
||||
"os"
|
||||
|
||||
"github.com/coreos/kube-etcd-controller/pkg/backup"
|
||||
"github.com/coreos/kube-etcd-controller/pkg/util/k8sutil"
|
||||
|
@ -11,6 +12,7 @@ var (
|
|||
masterHost string
|
||||
clusterName string
|
||||
listenAddr string
|
||||
namespace string
|
||||
)
|
||||
|
||||
func init() {
|
||||
|
@ -19,6 +21,11 @@ func init() {
|
|||
flag.StringVar(&listenAddr, "listen", "0.0.0.0:19999", "")
|
||||
// TODO: parse policy
|
||||
flag.Parse()
|
||||
|
||||
namespace = os.Getenv("MY_POD_NAMESPACE")
|
||||
if len(namespace) == 0 {
|
||||
namespace = "default"
|
||||
}
|
||||
}
|
||||
|
||||
func main() {
|
||||
|
@ -26,5 +33,5 @@ func main() {
|
|||
panic("clusterName not set")
|
||||
}
|
||||
kclient := k8sutil.MustCreateClient(masterHost, false, nil)
|
||||
backup.New(kclient, clusterName, backup.Policy{}, listenAddr).Run()
|
||||
backup.New(kclient, clusterName, namespace, backup.Policy{}, listenAddr).Run()
|
||||
}
|
||||
|
|
|
@ -2,6 +2,7 @@ package main
|
|||
|
||||
import (
|
||||
"flag"
|
||||
"os"
|
||||
|
||||
"github.com/coreos/kube-etcd-controller/pkg/controller"
|
||||
)
|
||||
|
@ -17,6 +18,11 @@ func init() {
|
|||
flag.StringVar(&cfg.TLSConfig.CAFile, "ca-file", "", "- NOT RECOMMENDED FOR PRODUCTION - Path to TLS CA file.")
|
||||
flag.BoolVar(&cfg.TLSInsecure, "tls-insecure", false, "- NOT RECOMMENDED FOR PRODUCTION - Don't verify API server's CA certificate.")
|
||||
flag.Parse()
|
||||
|
||||
cfg.Namespace = os.Getenv("MY_POD_NAMESPACE")
|
||||
if len(cfg.Namespace) == 0 {
|
||||
cfg.Namespace = "default"
|
||||
}
|
||||
}
|
||||
|
||||
func main() {
|
||||
|
|
|
@ -24,6 +24,7 @@ type Backup struct {
|
|||
kclient *unversioned.Client
|
||||
|
||||
clusterName string
|
||||
namespace string
|
||||
policy Policy
|
||||
listenAddr string
|
||||
backupDir string
|
||||
|
@ -31,10 +32,11 @@ type Backup struct {
|
|||
backupNow chan chan error
|
||||
}
|
||||
|
||||
func New(kclient *unversioned.Client, clusterName string, policy Policy, listenAddr string) *Backup {
|
||||
func New(kclient *unversioned.Client, clusterName, ns string, policy Policy, listenAddr string) *Backup {
|
||||
return &Backup{
|
||||
kclient: kclient,
|
||||
clusterName: clusterName,
|
||||
namespace: ns,
|
||||
policy: policy,
|
||||
listenAddr: listenAddr,
|
||||
backupDir: "/home/backup/",
|
||||
|
@ -77,7 +79,7 @@ func (b *Backup) Run() {
|
|||
}
|
||||
|
||||
func (b *Backup) saveSnap(lastSnapRev int64) (int64, error) {
|
||||
pods, err := b.kclient.Pods("default").List(api.ListOptions{
|
||||
pods, err := b.kclient.Pods(b.namespace).List(api.ListOptions{
|
||||
LabelSelector: labels.SelectorFromSet(map[string]string{
|
||||
"app": "etcd",
|
||||
"etcd_cluster": b.clusterName,
|
||||
|
|
|
@ -35,7 +35,8 @@ type Cluster struct {
|
|||
|
||||
spec *Spec
|
||||
|
||||
name string
|
||||
name string
|
||||
namespace string
|
||||
|
||||
idCounter int
|
||||
eventCh chan *clusterEvent
|
||||
|
@ -49,21 +50,22 @@ type Cluster struct {
|
|||
backupDir string
|
||||
}
|
||||
|
||||
func New(c *unversioned.Client, name string, spec *Spec) *Cluster {
|
||||
return new(c, name, spec, true)
|
||||
func New(c *unversioned.Client, name, ns string, spec *Spec) *Cluster {
|
||||
return new(c, name, ns, spec, true)
|
||||
}
|
||||
|
||||
func Restore(c *unversioned.Client, name string, spec *Spec) *Cluster {
|
||||
return new(c, name, spec, false)
|
||||
func Restore(c *unversioned.Client, name, ns string, spec *Spec) *Cluster {
|
||||
return new(c, name, ns, spec, false)
|
||||
}
|
||||
|
||||
func new(kclient *unversioned.Client, name string, spec *Spec, isNewCluster bool) *Cluster {
|
||||
func new(kclient *unversioned.Client, name, ns string, spec *Spec, isNewCluster bool) *Cluster {
|
||||
c := &Cluster{
|
||||
kclient: kclient,
|
||||
name: name,
|
||||
eventCh: make(chan *clusterEvent, 100),
|
||||
stopCh: make(chan struct{}),
|
||||
spec: spec,
|
||||
kclient: kclient,
|
||||
name: name,
|
||||
namespace: ns,
|
||||
eventCh: make(chan *clusterEvent, 100),
|
||||
stopCh: make(chan struct{}),
|
||||
spec: spec,
|
||||
}
|
||||
if isNewCluster {
|
||||
if err := c.newSeedMember(); err != nil {
|
||||
|
@ -188,7 +190,7 @@ func (c *Cluster) delete() {
|
|||
}),
|
||||
}
|
||||
|
||||
pods, err := c.kclient.Pods("default").List(option)
|
||||
pods, err := c.kclient.Pods(c.namespace).List(option)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
@ -200,7 +202,7 @@ func (c *Cluster) delete() {
|
|||
}
|
||||
|
||||
func (c *Cluster) createPodAndService(members etcdutil.MemberSet, m *etcdutil.Member, state string, needRecovery bool) error {
|
||||
if err := k8sutil.CreateEtcdService(c.kclient, m.Name, c.name); err != nil {
|
||||
if err := k8sutil.CreateEtcdService(c.kclient, m.Name, c.name, c.namespace); err != nil {
|
||||
return err
|
||||
}
|
||||
token := ""
|
||||
|
@ -211,17 +213,17 @@ func (c *Cluster) createPodAndService(members etcdutil.MemberSet, m *etcdutil.Me
|
|||
if needRecovery {
|
||||
k8sutil.AddRecoveryToPod(pod, c.name, m.Name, token)
|
||||
}
|
||||
return k8sutil.CreateAndWaitPod(c.kclient, pod, m)
|
||||
return k8sutil.CreateAndWaitPod(c.kclient, pod, m, c.namespace)
|
||||
}
|
||||
|
||||
func (c *Cluster) removePodAndService(name string) error {
|
||||
err := c.kclient.Pods("default").Delete(name, nil)
|
||||
err := c.kclient.Pods(c.namespace).Delete(name, nil)
|
||||
if err != nil {
|
||||
if !k8sutil.IsKubernetesResourceNotFoundError(err) {
|
||||
return err
|
||||
}
|
||||
}
|
||||
err = c.kclient.Services("default").Delete(name)
|
||||
err = c.kclient.Services(c.namespace).Delete(name)
|
||||
if err != nil {
|
||||
if !k8sutil.IsKubernetesResourceNotFoundError(err) {
|
||||
return err
|
||||
|
@ -238,7 +240,7 @@ func (c *Cluster) getRunning() (etcdutil.MemberSet, error) {
|
|||
}),
|
||||
}
|
||||
|
||||
podList, err := c.kclient.Pods("default").List(opts)
|
||||
podList, err := c.kclient.Pods(c.namespace).List(opts)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to list running pods: %v", err)
|
||||
}
|
||||
|
|
|
@ -28,11 +28,13 @@ type Event struct {
|
|||
|
||||
type Controller struct {
|
||||
masterHost string
|
||||
namespace string
|
||||
kclient *unversioned.Client
|
||||
clusters map[string]*cluster.Cluster
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
Namespace string
|
||||
MasterHost string
|
||||
TLSInsecure bool
|
||||
TLSConfig restclient.TLSClientConfig
|
||||
|
@ -48,6 +50,7 @@ func New(cfg Config) *Controller {
|
|||
masterHost: host,
|
||||
kclient: kclient,
|
||||
clusters: make(map[string]*cluster.Cluster),
|
||||
namespace: cfg.Namespace,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -66,19 +69,19 @@ func (c *Controller) Run() {
|
|||
}
|
||||
log.Println("etcd cluster controller starts running...")
|
||||
|
||||
eventCh, errCh := monitorEtcdCluster(c.masterHost, c.kclient.RESTClient.Client, watchVersion)
|
||||
eventCh, errCh := monitorEtcdCluster(c.masterHost, c.namespace, c.kclient.RESTClient.Client, watchVersion)
|
||||
for {
|
||||
select {
|
||||
case event := <-eventCh:
|
||||
clusterName := event.Object.ObjectMeta.Name
|
||||
switch event.Type {
|
||||
case "ADDED":
|
||||
nc := cluster.New(c.kclient, clusterName, &event.Object.Spec)
|
||||
nc := cluster.New(c.kclient, clusterName, c.namespace, &event.Object.Spec)
|
||||
c.clusters[clusterName] = nc
|
||||
|
||||
backup := event.Object.Spec.Backup
|
||||
if backup != nil && backup.MaxSnapshot != 0 {
|
||||
err := k8sutil.CreateBackupReplicaSetAndService(c.kclient, clusterName, *backup)
|
||||
err := k8sutil.CreateBackupReplicaSetAndService(c.kclient, clusterName, c.namespace, *backup)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
@ -97,7 +100,7 @@ func (c *Controller) Run() {
|
|||
|
||||
func (c *Controller) findAllClusters() (string, error) {
|
||||
log.Println("finding existing clusters...")
|
||||
resp, err := k8sutil.ListETCDCluster(c.masterHost, c.kclient.RESTClient.Client)
|
||||
resp, err := k8sutil.ListETCDCluster(c.masterHost, c.namespace, c.kclient.RESTClient.Client)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
@ -107,12 +110,12 @@ func (c *Controller) findAllClusters() (string, error) {
|
|||
return "", err
|
||||
}
|
||||
for _, item := range list.Items {
|
||||
nc := cluster.Restore(c.kclient, item.Name, &item.Spec)
|
||||
nc := cluster.Restore(c.kclient, item.Name, c.namespace, &item.Spec)
|
||||
c.clusters[item.Name] = nc
|
||||
|
||||
backup := item.Spec.Backup
|
||||
if backup != nil && backup.MaxSnapshot != 0 {
|
||||
err := k8sutil.CreateBackupReplicaSetAndService(c.kclient, item.Name, *backup)
|
||||
err := k8sutil.CreateBackupReplicaSetAndService(c.kclient, item.Name, c.namespace, *backup)
|
||||
if !k8sutil.IsKubernetesResourceAlreadyExistError(err) {
|
||||
panic(err)
|
||||
}
|
||||
|
@ -138,7 +141,7 @@ func (c *Controller) createTPR() error {
|
|||
|
||||
err = wait.Poll(3*time.Second, 100*time.Second,
|
||||
func() (done bool, err error) {
|
||||
resp, err := k8sutil.WatchETCDCluster(c.masterHost, c.kclient.RESTClient.Client, "0")
|
||||
resp, err := k8sutil.WatchETCDCluster(c.masterHost, c.namespace, c.kclient.RESTClient.Client, "0")
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
@ -153,12 +156,12 @@ func (c *Controller) createTPR() error {
|
|||
return err
|
||||
}
|
||||
|
||||
func monitorEtcdCluster(host string, httpClient *http.Client, watchVersion string) (<-chan *Event, <-chan error) {
|
||||
func monitorEtcdCluster(host, ns string, httpClient *http.Client, watchVersion string) (<-chan *Event, <-chan error) {
|
||||
events := make(chan *Event)
|
||||
errc := make(chan error, 1)
|
||||
go func() {
|
||||
for {
|
||||
resp, err := k8sutil.WatchETCDCluster(host, httpClient, watchVersion)
|
||||
resp, err := k8sutil.WatchETCDCluster(host, ns, httpClient, watchVersion)
|
||||
if err != nil {
|
||||
errc <- err
|
||||
return
|
||||
|
|
|
@ -65,13 +65,13 @@ func makeRestoreInitContainerSpec(backupAddr, name, token string) string {
|
|||
return string(b)
|
||||
}
|
||||
|
||||
func CreateBackupReplicaSetAndService(kclient *unversioned.Client, clusterName string, policy backup.Policy) error {
|
||||
func CreateBackupReplicaSetAndService(kclient *unversioned.Client, clusterName, ns string, policy backup.Policy) error {
|
||||
labels := map[string]string{
|
||||
"app": "etcd_backup_tool",
|
||||
"etcd_cluster": clusterName,
|
||||
}
|
||||
name := makeBackupName(clusterName)
|
||||
_, err := kclient.ReplicaSets("default").Create(&extensions.ReplicaSet{
|
||||
_, err := kclient.ReplicaSets(ns).Create(&extensions.ReplicaSet{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: name,
|
||||
},
|
||||
|
@ -119,7 +119,7 @@ func CreateBackupReplicaSetAndService(kclient *unversioned.Client, clusterName s
|
|||
Selector: labels,
|
||||
},
|
||||
}
|
||||
if _, err := kclient.Services("default").Create(svc); err != nil {
|
||||
if _, err := kclient.Services(ns).Create(svc); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
|
@ -133,20 +133,20 @@ func makeBackupName(clusterName string) string {
|
|||
return fmt.Sprintf("%s-backup-tool", clusterName)
|
||||
}
|
||||
|
||||
func CreateEtcdService(kclient *unversioned.Client, etcdName, clusterName string) error {
|
||||
func CreateEtcdService(kclient *unversioned.Client, etcdName, clusterName, ns string) error {
|
||||
svc := makeEtcdService(etcdName, clusterName)
|
||||
if _, err := kclient.Services("default").Create(svc); err != nil {
|
||||
if _, err := kclient.Services(ns).Create(svc); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// TODO: use a struct to replace the huge arg list.
|
||||
func CreateAndWaitPod(kclient *unversioned.Client, pod *api.Pod, m *etcdutil.Member) error {
|
||||
if _, err := kclient.Pods("default").Create(pod); err != nil {
|
||||
func CreateAndWaitPod(kclient *unversioned.Client, pod *api.Pod, m *etcdutil.Member, ns string) error {
|
||||
if _, err := kclient.Pods(ns).Create(pod); err != nil {
|
||||
return err
|
||||
}
|
||||
w, err := kclient.Pods("default").Watch(api.SingleObject(api.ObjectMeta{Name: m.Name}))
|
||||
w, err := kclient.Pods(ns).Watch(api.SingleObject(api.ObjectMeta{Name: m.Name}))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -341,10 +341,12 @@ func IsKubernetesResourceNotFoundError(err error) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
func ListETCDCluster(host string, httpClient *http.Client) (*http.Response, error) {
|
||||
return httpClient.Get(host + "/apis/coreos.com/v1/namespaces/default/etcdclusters")
|
||||
func ListETCDCluster(host, ns string, httpClient *http.Client) (*http.Response, error) {
|
||||
return httpClient.Get(fmt.Sprintf("%s/apis/coreos.com/v1/namespaces/%s/etcdclusters",
|
||||
host, ns))
|
||||
}
|
||||
|
||||
func WatchETCDCluster(host string, httpClient *http.Client, resourceVersion string) (*http.Response, error) {
|
||||
return httpClient.Get(host + "/apis/coreos.com/v1/namespaces/default/etcdclusters?watch=true&resourceVersion=" + resourceVersion)
|
||||
func WatchETCDCluster(host, ns string, httpClient *http.Client, resourceVersion string) (*http.Response, error) {
|
||||
return httpClient.Get(fmt.Sprintf("%s/apis/coreos.com/v1/namespaces/%s/etcdclusters?watch=true&resourceVersion=%s",
|
||||
host, ns, resourceVersion))
|
||||
}
|
||||
|
|
Загрузка…
Ссылка в новой задаче