зеркало из https://github.com/Azure/etcd-operator.git
reorg directory
This commit is contained in:
Родитель
2a9fa47940
Коммит
7a50165a7c
|
@ -6,6 +6,7 @@
|
|||
# Folders
|
||||
_obj
|
||||
_test
|
||||
_output
|
||||
|
||||
# Architecture specific extensions/prefixes
|
||||
*.[568vq]
|
||||
|
|
|
@ -1,3 +1,3 @@
|
|||
FROM tutum/curl:latest
|
||||
ADD ./kube-etcd-controller /usr/local/bin/
|
||||
ADD ./_output/bin/kube-etcd-controller /usr/local/bin/
|
||||
ENTRYPOINT ["kube-etcd-controller"]
|
||||
|
|
|
@ -0,0 +1,25 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"flag"
|
||||
|
||||
"github.com/coreos/kube-etcd-controller/pkg/controller"
|
||||
)
|
||||
|
||||
var (
|
||||
cfg controller.Config
|
||||
)
|
||||
|
||||
func init() {
|
||||
flag.StringVar(&cfg.MasterHost, "master", "", "API Server addr, e.g. ' - NOT RECOMMENDED FOR PRODUCTION - http://127.0.0.1:8080'. Omit parameter to run in on-cluster mode and utilize the service account token.")
|
||||
flag.StringVar(&cfg.TLSConfig.CertFile, "cert-file", "", " - NOT RECOMMENDED FOR PRODUCTION - Path to public TLS certificate file.")
|
||||
flag.StringVar(&cfg.TLSConfig.KeyFile, "key-file", "", "- NOT RECOMMENDED FOR PRODUCTION - Path to private TLS certificate file.")
|
||||
flag.StringVar(&cfg.TLSConfig.CAFile, "ca-file", "", "- NOT RECOMMENDED FOR PRODUCTION - Path to TLS CA file.")
|
||||
flag.BoolVar(&cfg.TLSInsecure, "tls-insecure", false, "- NOT RECOMMENDED FOR PRODUCTION - Don't verify API server's CA certificate.")
|
||||
flag.Parse()
|
||||
}
|
||||
|
||||
func main() {
|
||||
c := controller.New(cfg)
|
||||
c.Run()
|
||||
}
|
|
@ -4,16 +4,18 @@ set -o errexit
|
|||
set -o nounset
|
||||
set -o pipefail
|
||||
|
||||
if ! which go > /dev/null; then
|
||||
echo "go needs to be installed"
|
||||
exit 1
|
||||
fi
|
||||
if ! which go > /dev/null; then
|
||||
echo "go needs to be installed"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
if ! which docker > /dev/null; then
|
||||
echo "docker needs to be installed"
|
||||
exit 1
|
||||
fi
|
||||
if ! which docker > /dev/null; then
|
||||
echo "docker needs to be installed"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
go build .
|
||||
mkdir -p _output/bin || true
|
||||
|
||||
go build -o _output/bin/kube-etcd-controller cmd/controller/main.go
|
||||
docker build --tag gcr.io/coreos-k8s-scale-testing/kubeetcdctrl:latest .
|
||||
gcloud docker push gcr.io/coreos-k8s-scale-testing/kubeetcdctrl:latest
|
||||
|
|
30
main.go
30
main.go
|
@ -1,30 +0,0 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"flag"
|
||||
|
||||
"k8s.io/kubernetes/pkg/client/restclient"
|
||||
)
|
||||
|
||||
var (
|
||||
masterHost string
|
||||
tlsInsecure bool
|
||||
tlsConfig restclient.TLSClientConfig
|
||||
)
|
||||
|
||||
func init() {
|
||||
flag.StringVar(&masterHost, "master", "", "API Server addr, e.g. ' - NOT RECOMMENDED FOR PRODUCTION - http://127.0.0.1:8080'. Omit parameter to run in on-cluster mode and utilize the service account token.")
|
||||
flag.StringVar(&tlsConfig.CertFile, "cert-file", "", " - NOT RECOMMENDED FOR PRODUCTION - Path to public TLS certificate file.")
|
||||
flag.StringVar(&tlsConfig.KeyFile, "key-file", "", "- NOT RECOMMENDED FOR PRODUCTION - Path to private TLS certificate file.")
|
||||
flag.StringVar(&tlsConfig.CAFile, "ca-file", "", "- NOT RECOMMENDED FOR PRODUCTION - Path to TLS CA file.")
|
||||
flag.BoolVar(&tlsInsecure, "tls-insecure", false, "- NOT RECOMMENDED FOR PRODUCTION - Don't verify API server's CA certificate.")
|
||||
flag.Parse()
|
||||
}
|
||||
|
||||
func main() {
|
||||
c := &etcdClusterController{
|
||||
kclient: mustCreateClient(masterHost),
|
||||
clusters: make(map[string]*Cluster),
|
||||
}
|
||||
c.Run()
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
package main
|
||||
package cluster
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
@ -11,8 +11,10 @@ import (
|
|||
|
||||
log "github.com/Sirupsen/logrus"
|
||||
"github.com/coreos/etcd/clientv3"
|
||||
"github.com/coreos/kube-etcd-controller/pkg/util/etcdutil"
|
||||
"github.com/coreos/kube-etcd-controller/pkg/util/k8sutil"
|
||||
"golang.org/x/net/context"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
k8sapi "k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/client/unversioned"
|
||||
"k8s.io/kubernetes/pkg/labels"
|
||||
)
|
||||
|
@ -30,7 +32,7 @@ type clusterEvent struct {
|
|||
typ clusterEventType
|
||||
spec Spec
|
||||
// currently running pods in kubernetes
|
||||
running MemberSet
|
||||
running etcdutil.MemberSet
|
||||
}
|
||||
|
||||
type Cluster struct {
|
||||
|
@ -47,12 +49,12 @@ type Cluster struct {
|
|||
// members repsersents the members in the etcd cluster.
|
||||
// the name of the member is the the name of the pod the member
|
||||
// process runs in.
|
||||
members MemberSet
|
||||
members etcdutil.MemberSet
|
||||
|
||||
backupDir string
|
||||
}
|
||||
|
||||
func newCluster(kclient *unversioned.Client, name string, spec *Spec) *Cluster {
|
||||
func New(kclient *unversioned.Client, name string, spec *Spec) *Cluster {
|
||||
c := &Cluster{
|
||||
kclient: kclient,
|
||||
name: name,
|
||||
|
@ -65,7 +67,7 @@ func newCluster(kclient *unversioned.Client, name string, spec *Spec) *Cluster {
|
|||
return c
|
||||
}
|
||||
|
||||
func (c *Cluster) init(spec *Spec) {
|
||||
func (c *Cluster) Init(spec *Spec) {
|
||||
c.send(&clusterEvent{
|
||||
typ: eventNewCluster,
|
||||
spec: *spec,
|
||||
|
@ -110,11 +112,11 @@ func (c *Cluster) run() {
|
|||
}
|
||||
|
||||
func (c *Cluster) create(spec *Spec) {
|
||||
members := MemberSet{}
|
||||
members := etcdutil.MemberSet{}
|
||||
c.spec = spec
|
||||
// we want to make use of member's utility methods.
|
||||
etcdName := fmt.Sprintf("%s-%04d", c.name, 0)
|
||||
members.Add(&Member{Name: etcdName})
|
||||
members.Add(&etcdutil.Member{Name: etcdName})
|
||||
if err := c.createPodAndService(members, members[etcdName], "new"); err != nil {
|
||||
panic(fmt.Sprintf("(TODO: we need to clean up already created ones.)\nError: %v", err))
|
||||
}
|
||||
|
@ -122,7 +124,7 @@ func (c *Cluster) create(spec *Spec) {
|
|||
fmt.Println("created cluster:", members)
|
||||
}
|
||||
|
||||
func (c *Cluster) update(spec *Spec) {
|
||||
func (c *Cluster) Update(spec *Spec) {
|
||||
// Only handles size change now. TODO: handle other updates.
|
||||
if spec.Size == c.spec.Size {
|
||||
return
|
||||
|
@ -138,14 +140,14 @@ func (c *Cluster) updateMembers(etcdcli *clientv3.Client) {
|
|||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
c.members = MemberSet{}
|
||||
c.members = etcdutil.MemberSet{}
|
||||
for _, m := range resp.Members {
|
||||
id := findID(m.Name)
|
||||
if id+1 > c.idCounter {
|
||||
c.idCounter = id + 1
|
||||
}
|
||||
|
||||
c.members[m.Name] = &Member{
|
||||
c.members[m.Name] = &etcdutil.Member{
|
||||
Name: m.Name,
|
||||
ID: m.ID,
|
||||
}
|
||||
|
@ -162,7 +164,7 @@ func findID(name string) int {
|
|||
}
|
||||
|
||||
func (c *Cluster) delete() {
|
||||
option := api.ListOptions{
|
||||
option := k8sapi.ListOptions{
|
||||
LabelSelector: labels.SelectorFromSet(map[string]string{
|
||||
"etcd_cluster": c.name,
|
||||
}),
|
||||
|
@ -180,23 +182,23 @@ func (c *Cluster) delete() {
|
|||
}
|
||||
|
||||
// todo: use a struct to replace the huge arg list.
|
||||
func (c *Cluster) createPodAndService(members MemberSet, m *Member, state string) error {
|
||||
if err := createEtcdService(c.kclient, m.Name, c.name); err != nil {
|
||||
func (c *Cluster) createPodAndService(members etcdutil.MemberSet, m *etcdutil.Member, state string) error {
|
||||
if err := k8sutil.CreateEtcdService(c.kclient, m.Name, c.name); err != nil {
|
||||
return err
|
||||
}
|
||||
return createEtcdPod(c.kclient, members.PeerURLPairs(), m, c.name, state, c.spec.AntiAffinity)
|
||||
return k8sutil.CreateEtcdPod(c.kclient, members.PeerURLPairs(), m, c.name, state, c.spec.AntiAffinity)
|
||||
}
|
||||
|
||||
func (c *Cluster) removePodAndService(name string) error {
|
||||
err := c.kclient.Pods("default").Delete(name, nil)
|
||||
if err != nil {
|
||||
if !isKubernetesResourceNotFoundError(err) {
|
||||
if !k8sutil.IsKubernetesResourceNotFoundError(err) {
|
||||
return err
|
||||
}
|
||||
}
|
||||
err = c.kclient.Services("default").Delete(name)
|
||||
if err != nil {
|
||||
if !isKubernetesResourceNotFoundError(err) {
|
||||
if !k8sutil.IsKubernetesResourceNotFoundError(err) {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -247,7 +249,7 @@ func (c *Cluster) backup() error {
|
|||
}
|
||||
|
||||
func (c *Cluster) monitorPods() {
|
||||
opts := api.ListOptions{
|
||||
opts := k8sapi.ListOptions{
|
||||
LabelSelector: labels.SelectorFromSet(map[string]string{
|
||||
"etcd_cluster": c.name,
|
||||
}),
|
||||
|
@ -264,9 +266,9 @@ func (c *Cluster) monitorPods() {
|
|||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
running := MemberSet{}
|
||||
running := etcdutil.MemberSet{}
|
||||
for i := range podList.Items {
|
||||
running.Add(&Member{Name: podList.Items[i].Name})
|
||||
running.Add(&etcdutil.Member{Name: podList.Items[i].Name})
|
||||
}
|
||||
|
||||
c.send(&clusterEvent{
|
|
@ -1,4 +1,4 @@
|
|||
package main
|
||||
package cluster
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
@ -6,6 +6,7 @@ import (
|
|||
|
||||
log "github.com/Sirupsen/logrus"
|
||||
"github.com/coreos/etcd/clientv3"
|
||||
"github.com/coreos/kube-etcd-controller/pkg/util/etcdutil"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
|
@ -22,7 +23,7 @@ import (
|
|||
// 3. If L = members, the current state matches the membership state. END.
|
||||
// 4. If len(L) < len(members)/2 + 1, quorum lost. Go to recovery process (TODO).
|
||||
// 5. Add one missing member. END.
|
||||
func (c *Cluster) reconcile(running MemberSet) error {
|
||||
func (c *Cluster) reconcile(running etcdutil.MemberSet) error {
|
||||
log.Println("Reconciling:")
|
||||
defer func() {
|
||||
log.Println("Finish Reconciling")
|
||||
|
@ -37,7 +38,7 @@ func (c *Cluster) reconcile(running MemberSet) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := waitMemberReady(etcdcli); err != nil {
|
||||
if err := etcdutil.WaitMemberReady(etcdcli); err != nil {
|
||||
return err
|
||||
}
|
||||
c.updateMembers(etcdcli)
|
||||
|
@ -97,7 +98,7 @@ func (c *Cluster) addOneMember() error {
|
|||
return err
|
||||
}
|
||||
newMemberName := fmt.Sprintf("%s-%04d", c.name, c.idCounter)
|
||||
newMember := &Member{Name: newMemberName}
|
||||
newMember := &etcdutil.Member{Name: newMemberName}
|
||||
resp, err := etcdcli.MemberAdd(context.TODO(), []string{newMember.PeerAddr()})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
|
@ -117,7 +118,7 @@ func (c *Cluster) removeOneMember() error {
|
|||
return c.removeMember(c.members.PickOne())
|
||||
}
|
||||
|
||||
func (c *Cluster) removeMember(toRemove *Member) error {
|
||||
func (c *Cluster) removeMember(toRemove *etcdutil.Member) error {
|
||||
cfg := clientv3.Config{
|
||||
Endpoints: c.members.ClientURLs(),
|
||||
DialTimeout: 5 * time.Second,
|
|
@ -1,19 +1,10 @@
|
|||
package main
|
||||
package cluster
|
||||
|
||||
import (
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
)
|
||||
|
||||
type EtcdClusterList struct {
|
||||
unversioned.TypeMeta `json:",inline"`
|
||||
// Standard list metadata
|
||||
// More info: http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#metadata
|
||||
unversioned.ListMeta `json:"metadata,omitempty"`
|
||||
// Items is a list of third party objects
|
||||
Items []EtcdCluster `json:"items"`
|
||||
}
|
||||
|
||||
type EtcdCluster struct {
|
||||
unversioned.TypeMeta `json:",inline"`
|
||||
api.ObjectMeta `json:"metadata,omitempty"`
|
|
@ -1,4 +1,4 @@
|
|||
package main
|
||||
package controller
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
|
@ -8,8 +8,11 @@ import (
|
|||
"time"
|
||||
|
||||
log "github.com/Sirupsen/logrus"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"github.com/coreos/kube-etcd-controller/pkg/cluster"
|
||||
"github.com/coreos/kube-etcd-controller/pkg/util/k8sutil"
|
||||
k8sapi "k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/apis/extensions"
|
||||
"k8s.io/kubernetes/pkg/client/restclient"
|
||||
"k8s.io/kubernetes/pkg/client/unversioned"
|
||||
"k8s.io/kubernetes/pkg/util/wait"
|
||||
)
|
||||
|
@ -20,19 +23,50 @@ const (
|
|||
|
||||
type Event struct {
|
||||
Type string
|
||||
Object EtcdCluster
|
||||
Object cluster.EtcdCluster
|
||||
}
|
||||
|
||||
type etcdClusterController struct {
|
||||
kclient *unversioned.Client
|
||||
clusters map[string]*Cluster
|
||||
type Controller struct {
|
||||
masterHost string
|
||||
kclient *unversioned.Client
|
||||
clusters map[string]*cluster.Cluster
|
||||
}
|
||||
|
||||
func (c *etcdClusterController) Run() {
|
||||
type Config struct {
|
||||
MasterHost string
|
||||
TLSInsecure bool
|
||||
TLSConfig restclient.TLSClientConfig
|
||||
}
|
||||
|
||||
func New(cfg Config) *Controller {
|
||||
host, c := getClient(cfg)
|
||||
return &Controller{
|
||||
masterHost: host,
|
||||
kclient: c,
|
||||
clusters: make(map[string]*cluster.Cluster),
|
||||
}
|
||||
}
|
||||
|
||||
func getClient(cfg Config) (string, *unversioned.Client) {
|
||||
if len(cfg.MasterHost) == 0 {
|
||||
inCfg, err := restclient.InClusterConfig()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
client, err := unversioned.NewInCluster()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return inCfg.Host, client
|
||||
}
|
||||
return cfg.MasterHost, k8sutil.MustCreateClient(cfg.MasterHost, cfg.TLSInsecure, cfg.TLSConfig)
|
||||
}
|
||||
|
||||
func (c *Controller) Run() {
|
||||
watchVersion := "0"
|
||||
if err := c.createTPR(); err != nil {
|
||||
switch {
|
||||
case isKubernetesResourceAlreadyExistError(err):
|
||||
case k8sutil.IsKubernetesResourceAlreadyExistError(err):
|
||||
watchVersion, err = c.findAllClusters()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
|
@ -43,18 +77,19 @@ func (c *etcdClusterController) Run() {
|
|||
}
|
||||
log.Println("etcd cluster controller starts running...")
|
||||
|
||||
eventCh, errCh := monitorEtcdCluster(c.kclient.RESTClient.Client, watchVersion)
|
||||
eventCh, errCh := monitorEtcdCluster(c.masterHost, c.kclient.RESTClient.Client, watchVersion)
|
||||
for {
|
||||
select {
|
||||
case event := <-eventCh:
|
||||
clusterName := event.Object.ObjectMeta.Name
|
||||
switch event.Type {
|
||||
case "ADDED":
|
||||
nc := newCluster(c.kclient, clusterName, &event.Object.Spec)
|
||||
nc.init(&event.Object.Spec)
|
||||
nc := cluster.New(c.kclient, clusterName, &event.Object.Spec)
|
||||
// TODO: combine init into New. Different fresh new and recovered new.
|
||||
nc.Init(&event.Object.Spec)
|
||||
c.clusters[clusterName] = nc
|
||||
case "MODIFIED":
|
||||
c.clusters[clusterName].update(&event.Object.Spec)
|
||||
c.clusters[clusterName].Update(&event.Object.Spec)
|
||||
case "DELETED":
|
||||
c.clusters[clusterName].Delete()
|
||||
delete(c.clusters, clusterName)
|
||||
|
@ -65,9 +100,9 @@ func (c *etcdClusterController) Run() {
|
|||
}
|
||||
}
|
||||
|
||||
func (c *etcdClusterController) findAllClusters() (string, error) {
|
||||
func (c *Controller) findAllClusters() (string, error) {
|
||||
log.Println("finding existing clusters...")
|
||||
resp, err := listETCDCluster(c.kclient.RESTClient.Client)
|
||||
resp, err := k8sutil.ListETCDCluster(c.masterHost, c.kclient.RESTClient.Client)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
@ -77,15 +112,15 @@ func (c *etcdClusterController) findAllClusters() (string, error) {
|
|||
return "", err
|
||||
}
|
||||
for _, item := range list.Items {
|
||||
nc := newCluster(c.kclient, item.Name, &item.Spec)
|
||||
nc := cluster.New(c.kclient, item.Name, &item.Spec)
|
||||
c.clusters[item.Name] = nc
|
||||
}
|
||||
return list.ListMeta.ResourceVersion, nil
|
||||
}
|
||||
|
||||
func (c *etcdClusterController) createTPR() error {
|
||||
func (c *Controller) createTPR() error {
|
||||
tpr := &extensions.ThirdPartyResource{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
ObjectMeta: k8sapi.ObjectMeta{
|
||||
Name: tprName,
|
||||
},
|
||||
Versions: []extensions.APIVersion{
|
||||
|
@ -98,9 +133,9 @@ func (c *etcdClusterController) createTPR() error {
|
|||
return err
|
||||
}
|
||||
|
||||
err = wait.Poll(5*time.Second, 100*time.Second,
|
||||
err = wait.Poll(3*time.Second, 100*time.Second,
|
||||
func() (done bool, err error) {
|
||||
resp, err := watchETCDCluster(c.kclient.RESTClient.Client, "0")
|
||||
resp, err := k8sutil.WatchETCDCluster(c.masterHost, c.kclient.RESTClient.Client, "0")
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
@ -115,12 +150,12 @@ func (c *etcdClusterController) createTPR() error {
|
|||
return err
|
||||
}
|
||||
|
||||
func monitorEtcdCluster(httpClient *http.Client, watchVersion string) (<-chan *Event, <-chan error) {
|
||||
func monitorEtcdCluster(host string, httpClient *http.Client, watchVersion string) (<-chan *Event, <-chan error) {
|
||||
events := make(chan *Event)
|
||||
errc := make(chan error, 1)
|
||||
go func() {
|
||||
for {
|
||||
resp, err := watchETCDCluster(httpClient, watchVersion)
|
||||
resp, err := k8sutil.WatchETCDCluster(host, httpClient, watchVersion)
|
||||
if err != nil {
|
||||
errc <- err
|
||||
return
|
|
@ -0,0 +1,15 @@
|
|||
package controller
|
||||
|
||||
import (
|
||||
"github.com/coreos/kube-etcd-controller/pkg/cluster"
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
)
|
||||
|
||||
type EtcdClusterList struct {
|
||||
unversioned.TypeMeta `json:",inline"`
|
||||
// Standard list metadata
|
||||
// More info: http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#metadata
|
||||
unversioned.ListMeta `json:"metadata,omitempty"`
|
||||
// Items is a list of third party objects
|
||||
Items []cluster.EtcdCluster `json:"items"`
|
||||
}
|
|
@ -0,0 +1,17 @@
|
|||
package etcdutil
|
||||
|
||||
import (
|
||||
"github.com/coreos/etcd/clientv3"
|
||||
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
func WaitMemberReady(cli *clientv3.Client) error {
|
||||
for {
|
||||
_, err := cli.Get(context.TODO(), "/")
|
||||
if err == rpctypes.ErrNotCapable {
|
||||
continue
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
package main
|
||||
package etcdutil
|
||||
|
||||
import (
|
||||
"fmt"
|
|
@ -1,4 +1,4 @@
|
|||
package main
|
||||
package k8sutil
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
|
@ -8,10 +8,9 @@ import (
|
|||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/clientv3"
|
||||
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
|
||||
"github.com/coreos/kube-etcd-controller/pkg/util/etcdutil"
|
||||
"github.com/pborman/uuid"
|
||||
"golang.org/x/net/context"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
apierrors "k8s.io/kubernetes/pkg/api/errors"
|
||||
unversionedAPI "k8s.io/kubernetes/pkg/api/unversioned"
|
||||
|
@ -21,7 +20,7 @@ import (
|
|||
"k8s.io/kubernetes/pkg/watch"
|
||||
)
|
||||
|
||||
func createEtcdService(kclient *unversioned.Client, etcdName, clusterName string) error {
|
||||
func CreateEtcdService(kclient *unversioned.Client, etcdName, clusterName string) error {
|
||||
svc := makeEtcdService(etcdName, clusterName)
|
||||
if _, err := kclient.Services("default").Create(svc); err != nil {
|
||||
return err
|
||||
|
@ -29,8 +28,8 @@ func createEtcdService(kclient *unversioned.Client, etcdName, clusterName string
|
|||
return nil
|
||||
}
|
||||
|
||||
// todo: use a struct to replace the huge arg list.
|
||||
func createEtcdPod(kclient *unversioned.Client, initialCluster []string, m *Member, clusterName, state string, antiAffinity bool) error {
|
||||
// TODO: use a struct to replace the huge arg list.
|
||||
func CreateEtcdPod(kclient *unversioned.Client, initialCluster []string, m *etcdutil.Member, clusterName, state string, antiAffinity bool) error {
|
||||
pod := makeEtcdPod(m, initialCluster, clusterName, state, antiAffinity)
|
||||
if _, err := kclient.Pods("default").Create(pod); err != nil {
|
||||
return err
|
||||
|
@ -76,7 +75,7 @@ func makeEtcdService(etcdName, clusterName string) *api.Service {
|
|||
}
|
||||
|
||||
// todo: use a struct to replace the huge arg list.
|
||||
func makeEtcdPod(m *Member, initialCluster []string, clusterName, state string, antiAffinity bool) *api.Pod {
|
||||
func makeEtcdPod(m *etcdutil.Member, initialCluster []string, clusterName, state string, antiAffinity bool) *api.Pod {
|
||||
pod := &api.Pod{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: m.Name,
|
||||
|
@ -152,29 +151,16 @@ func makeEtcdPod(m *Member, initialCluster []string, clusterName, state string,
|
|||
return pod
|
||||
}
|
||||
|
||||
func mustCreateClient(host string) *unversioned.Client {
|
||||
if len(host) == 0 {
|
||||
cfg, err := restclient.InClusterConfig()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
c, err := unversioned.NewInCluster()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
masterHost = cfg.Host
|
||||
return c
|
||||
}
|
||||
|
||||
hostUrl, err := url.Parse(host)
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("error parsing host url %s : %v", host, err))
|
||||
}
|
||||
func MustCreateClient(host string, tlsInsecure bool, tlsConfig restclient.TLSClientConfig) *unversioned.Client {
|
||||
cfg := &restclient.Config{
|
||||
Host: host,
|
||||
QPS: 100,
|
||||
Burst: 100,
|
||||
}
|
||||
hostUrl, err := url.Parse(host)
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("error parsing host url %s : %v", host, err))
|
||||
}
|
||||
if hostUrl.Scheme == "https" {
|
||||
cfg.TLSClientConfig = tlsConfig
|
||||
cfg.Insecure = tlsInsecure
|
||||
|
@ -186,7 +172,7 @@ func mustCreateClient(host string) *unversioned.Client {
|
|||
return c
|
||||
}
|
||||
|
||||
func isKubernetesResourceAlreadyExistError(err error) bool {
|
||||
func IsKubernetesResourceAlreadyExistError(err error) bool {
|
||||
se, ok := err.(*apierrors.StatusError)
|
||||
if !ok {
|
||||
return false
|
||||
|
@ -197,7 +183,7 @@ func isKubernetesResourceAlreadyExistError(err error) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
func isKubernetesResourceNotFoundError(err error) bool {
|
||||
func IsKubernetesResourceNotFoundError(err error) bool {
|
||||
se, ok := err.(*apierrors.StatusError)
|
||||
if !ok {
|
||||
return false
|
||||
|
@ -208,20 +194,10 @@ func isKubernetesResourceNotFoundError(err error) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
func waitMemberReady(cli *clientv3.Client) error {
|
||||
for {
|
||||
_, err := cli.Get(context.TODO(), "/")
|
||||
if err == rpctypes.ErrNotCapable {
|
||||
continue
|
||||
}
|
||||
return err
|
||||
}
|
||||
func ListETCDCluster(host string, httpClient *http.Client) (*http.Response, error) {
|
||||
return httpClient.Get(host + "/apis/coreos.com/v1/namespaces/default/etcdclusters")
|
||||
}
|
||||
|
||||
func listETCDCluster(httpClient *http.Client) (*http.Response, error) {
|
||||
return httpClient.Get(masterHost + "/apis/coreos.com/v1/namespaces/default/etcdclusters")
|
||||
}
|
||||
|
||||
func watchETCDCluster(httpClient *http.Client, resourceVersion string) (*http.Response, error) {
|
||||
return httpClient.Get(masterHost + "/apis/coreos.com/v1/namespaces/default/etcdclusters?watch=true&resourceVersion=" + resourceVersion)
|
||||
func WatchETCDCluster(host string, httpClient *http.Client, resourceVersion string) (*http.Response, error) {
|
||||
return httpClient.Get(host + "/apis/coreos.com/v1/namespaces/default/etcdclusters?watch=true&resourceVersion=" + resourceVersion)
|
||||
}
|
Загрузка…
Ссылка в новой задаче