Aaron Brown 2017-05-18 15:18:42 -04:00
Parent d9b35960ea
Commit 86ae75d143
10 changed files with 106 additions and 49 deletions

View file

@@ -23,6 +23,7 @@ import (
"github.com/coreos/etcd-operator/pkg/backup"
"github.com/coreos/etcd-operator/pkg/backup/env"
"github.com/coreos/etcd-operator/pkg/spec"
"github.com/coreos/etcd-operator/pkg/util/constants"
"github.com/coreos/etcd-operator/pkg/util/k8sutil"
"github.com/coreos/etcd-operator/version"
@@ -30,10 +31,11 @@ import (
)
var (
masterHost string
clusterName string
listenAddr string
namespace string
masterHost string
clusterName string
clusterDomain string
listenAddr string
namespace string
printVersion bool
)
@@ -41,6 +43,7 @@ var (
func init() {
flag.StringVar(&masterHost, "master", "", "API Server addr, e.g. ' - NOT RECOMMENDED FOR PRODUCTION - http://127.0.0.1:8080'. Omit parameter to run in on-cluster mode and utilize the service account token.")
flag.StringVar(&clusterName, "etcd-cluster", "", "")
flag.StringVar(&clusterDomain, "cluster-domain", constants.DefaultClusterDomain, "Domain for this cluster.")
flag.StringVar(&listenAddr, "listen", "0.0.0.0:19999", "")
flag.BoolVar(&printVersion, "version", false, "Show version and quit")
@@ -69,7 +72,7 @@ func main() {
}
kclient := k8sutil.MustNewKubeClient()
bk, err := backup.New(kclient, clusterName, namespace, cs, listenAddr)
bk, err := backup.New(kclient, clusterName, namespace, clusterDomain, cs, listenAddr)
if err != nil {
logrus.Fatalf("failed to create backup sidecar: %v", err)
}

View file

@@ -53,6 +53,7 @@ var (
awsConfig string
s3Bucket string
gcInterval time.Duration
clusterDomain string
chaosLevel int
@@ -78,6 +79,7 @@ func init() {
flag.IntVar(&chaosLevel, "chaos-level", -1, "DO NOT USE IN PRODUCTION - level of chaos injected into the etcd clusters created by the operator.")
flag.BoolVar(&printVersion, "version", false, "Show version and quit")
flag.DurationVar(&gcInterval, "gc-interval", 10*time.Minute, "GC interval")
flag.StringVar(&clusterDomain, "cluster-domain", constants.DefaultClusterDomain, "Domain for this cluster.")
flag.Parse()
// Workaround for watching TPR resource.
@@ -196,6 +198,7 @@ func newControllerConfig() controller.Config {
Namespace: namespace,
ServiceAccount: serviceAccount,
PVProvisioner: pvProvisioner,
ClusterDomain: clusterDomain,
S3Context: s3config.S3Context{
AWSSecret: awsSecret,
AWSConfig: awsConfig,

View file

@@ -48,6 +48,7 @@ type Backup struct {
clusterName string
namespace string
clusterDomain string
policy spec.BackupPolicy
listenAddr string
etcdTLSConfig *tls.Config
@@ -60,7 +61,7 @@ type Backup struct {
recentBackupsStatus []backupapi.BackupStatus
}
func New(kclient kubernetes.Interface, clusterName, ns string, sp spec.ClusterSpec, listenAddr string) (*Backup, error) {
func New(kclient kubernetes.Interface, clusterName, ns, clusterDomain string, sp spec.ClusterSpec, listenAddr string) (*Backup, error) {
bdir := path.Join(constants.BackupMountDir, PVBackupV1, clusterName)
// We create not only the backup dir but also a tmp dir under it.
// tmp dir is used to store intermediate snapshot files.
@@ -105,6 +106,7 @@ func New(kclient kubernetes.Interface, clusterName, ns string, sp spec.ClusterSp
kclient: kclient,
clusterName: clusterName,
namespace: ns,
clusterDomain: clusterDomain,
policy: *sp.Backup,
listenAddr: listenAddr,
be: be,
@@ -179,7 +181,7 @@ func (b *Backup) saveSnap(lastSnapRev int64) (int64, error) {
logrus.Warning(msg)
return lastSnapRev, fmt.Errorf(msg)
}
member, rev := getMemberWithMaxRev(pods, b.etcdTLSConfig)
member, rev := getMemberWithMaxRev(pods, b.etcdTLSConfig, b.clusterDomain)
if member == nil {
logrus.Warning("no reachable member")
return lastSnapRev, fmt.Errorf("no reachable member")
@@ -246,14 +248,15 @@ func (b *Backup) writeSnap(m *etcdutil.Member, rev int64) error {
return nil
}
func getMemberWithMaxRev(pods []*v1.Pod, tc *tls.Config) (*etcdutil.Member, int64) {
func getMemberWithMaxRev(pods []*v1.Pod, tc *tls.Config, clusterDomain string) (*etcdutil.Member, int64) {
var member *etcdutil.Member
maxRev := int64(0)
for _, pod := range pods {
m := &etcdutil.Member{
Name: pod.Name,
Namespace: pod.Namespace,
SecureClient: tc != nil,
Name: pod.Name,
Namespace: pod.Namespace,
ClusterDomain: clusterDomain,
SecureClient: tc != nil,
}
cfg := clientv3.Config{
Endpoints: []string{m.ClientAddr()},

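getMemberWithMaxRev now receives the cluster domain so that each candidate Member resolves to a client endpoint under the configured DNS domain; its job is unchanged: query every running pod and snapshot from the member reporting the highest revision. Below is a simplified, library-free sketch of that selection loop; it assumes the per-member revisions have already been fetched (the operator obtains them via etcd clientv3 status calls, omitted here), and the candidate type and pickMaxRev name are illustrative only, not part of the operator's code.

package main

import "fmt"

// candidate pairs a member's client endpoint with the revision its status
// call reported; in the operator the endpoint comes from Member.ClientAddr(),
// which now embeds the configured cluster domain.
type candidate struct {
	endpoint string
	rev      int64
	healthy  bool
}

// pickMaxRev mirrors the shape of getMemberWithMaxRev: skip unreachable
// members and keep the one reporting the highest revision.
func pickMaxRev(cands []candidate) (string, int64) {
	best, maxRev := "", int64(0)
	for _, c := range cands {
		if !c.healthy {
			continue
		}
		if c.rev > maxRev {
			best, maxRev = c.endpoint, c.rev
		}
	}
	return best, maxRev
}

func main() {
	ep, rev := pickMaxRev([]candidate{
		{endpoint: "cluster-000.cluster.foo.svc.abc.local.:2379", rev: 10, healthy: true},
		{endpoint: "cluster-001.cluster.foo.svc.abc.local.:2379", rev: 12, healthy: true},
	})
	fmt.Println(ep, rev) // cluster-001.cluster.foo.svc.abc.local.:2379 12
}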
View file

@@ -57,6 +57,7 @@ type clusterEvent struct {
type Config struct {
PVProvisioner string
ServiceAccount string
ClusterDomain string
s3config.S3Context
KubeCli kubernetes.Interface
@@ -326,7 +327,7 @@ func (c *Cluster) run(stopC <-chan struct{}) {
// On controller restore, we could have "members == nil"
if rerr != nil || c.members == nil {
rerr = c.updateMembers(podsToMemberSet(running, c.cluster.Spec.SelfHosted, c.isSecureClient()))
rerr = c.updateMembers(podsToMemberSet(running, c.cluster.Spec.SelfHosted, c.isSecureClient(), c.config.ClusterDomain))
if rerr != nil {
c.logger.Errorf("failed to update members: %v", rerr)
break
@@ -395,10 +396,11 @@ func isBackupPolicyEqual(b1, b2 *spec.BackupPolicy) bool {
func (c *Cluster) startSeedMember(recoverFromBackup bool) error {
m := &etcdutil.Member{
Name: etcdutil.CreateMemberName(c.cluster.Metadata.Name, c.memberCounter),
Namespace: c.cluster.Metadata.Namespace,
SecurePeer: c.isSecurePeer(),
SecureClient: c.isSecureClient(),
Name: etcdutil.CreateMemberName(c.cluster.Metadata.Name, c.memberCounter),
Namespace: c.cluster.Metadata.Namespace,
SecurePeer: c.isSecurePeer(),
SecureClient: c.isSecureClient(),
ClusterDomain: c.config.ClusterDomain,
}
ms := etcdutil.NewMemberSet(m)
if err := c.createPod(ms, m, "new", recoverFromBackup); err != nil {
@@ -524,7 +526,10 @@ func (c *Cluster) updateMemberStatus(pods []*v1.Pod) {
if c.cluster.Spec.SelfHosted != nil {
url = fmt.Sprintf("http://%s:2379", pod.Status.PodIP)
} else {
m := &etcdutil.Member{Name: pod.Name, Namespace: pod.Namespace, SecureClient: c.isSecureClient()}
m := &etcdutil.Member{Name: pod.Name,
Namespace: pod.Namespace,
ClusterDomain: c.config.ClusterDomain,
SecureClient: c.isSecureClient()}
url = m.ClientAddr()
}
healthy, err := etcdutil.CheckHealth(url, c.tlsConfig)

View file

@@ -66,23 +66,27 @@ func (c *Cluster) updateMembers(known etcdutil.MemberSet) error {
}
members[name] = &etcdutil.Member{
Name: name,
Namespace: c.cluster.Metadata.Namespace,
ID: m.ID,
ClientURLs: m.ClientURLs,
PeerURLs: m.PeerURLs,
SecurePeer: c.isSecurePeer(),
SecureClient: c.isSecureClient(),
Name: name,
Namespace: c.cluster.Metadata.Namespace,
ClusterDomain: c.config.ClusterDomain,
ID: m.ID,
ClientURLs: m.ClientURLs,
PeerURLs: m.PeerURLs,
SecurePeer: c.isSecurePeer(),
SecureClient: c.isSecureClient(),
}
}
c.members = members
return nil
}
func podsToMemberSet(pods []*v1.Pod, selfHosted *spec.SelfHostedPolicy, sc bool) etcdutil.MemberSet {
func podsToMemberSet(pods []*v1.Pod, selfHosted *spec.SelfHostedPolicy, sc bool, clusterDomain string) etcdutil.MemberSet {
members := etcdutil.MemberSet{}
for _, pod := range pods {
m := &etcdutil.Member{Name: pod.Name, Namespace: pod.Namespace, SecureClient: sc}
m := &etcdutil.Member{Name: pod.Name,
Namespace: pod.Namespace,
ClusterDomain: clusterDomain,
SecureClient: sc}
if selfHosted != nil {
m.ClientURLs = []string{"http://" + pod.Status.PodIP + ":2379"}
m.PeerURLs = []string{"http://" + pod.Status.PodIP + ":2380"}

View file

@@ -17,7 +17,6 @@ package cluster
import (
"errors"
"github.com/coreos/etcd-operator/pkg/spec"
"github.com/coreos/etcd-operator/pkg/util/constants"
"github.com/coreos/etcd-operator/pkg/util/etcdutil"
"github.com/coreos/etcd-operator/pkg/util/k8sutil"
@@ -40,15 +39,15 @@ func (c *Cluster) reconcile(pods []*v1.Pod) error {
}()
sp := c.cluster.Spec
running := podsToMemberSet(pods, c.cluster.Spec.SelfHosted, c.isSecureClient())
running := podsToMemberSet(pods, c.cluster.Spec.SelfHosted, c.isSecureClient(), c.config.ClusterDomain)
if !running.IsEqual(c.members) || c.members.Size() != sp.Size {
return c.reconcileMembers(running)
}
if needUpgrade(pods, sp) {
if c.needUpgrade(pods) {
c.status.UpgradeVersionTo(sp.Version)
m := pickOneOldMember(pods, sp.Version)
m := c.pickOneOldMember(pods)
return c.upgradeOneMember(m.Name)
}
@@ -58,6 +57,20 @@ func (c *Cluster) reconcile(pods []*v1.Pod) error {
return nil
}
func (c *Cluster) needUpgrade(pods []*v1.Pod) bool {
return len(pods) == c.cluster.Spec.Size && c.pickOneOldMember(pods) != nil
}
func (c *Cluster) pickOneOldMember(pods []*v1.Pod) *etcdutil.Member {
for _, pod := range pods {
if k8sutil.GetEtcdVersion(pod) == c.cluster.Spec.Version {
continue
}
return &etcdutil.Member{Name: pod.Name, Namespace: pod.Namespace, ClusterDomain: c.config.ClusterDomain}
}
return nil
}
// reconcileMembers reconciles
// - running pods on k8s and cluster membership
// - cluster membership and expected size of etcd cluster
@@ -128,10 +141,11 @@ func (c *Cluster) addOneMember() error {
newMemberName := etcdutil.CreateMemberName(c.cluster.Metadata.Name, c.memberCounter)
newMember := &etcdutil.Member{
Name: newMemberName,
Namespace: c.cluster.Metadata.Namespace,
SecurePeer: c.isSecurePeer(),
SecureClient: c.isSecureClient(),
Name: newMemberName,
Namespace: c.cluster.Metadata.Namespace,
ClusterDomain: c.config.ClusterDomain,
SecurePeer: c.isSecurePeer(),
SecureClient: c.isSecureClient(),
}
ctx, _ := context.WithTimeout(context.Background(), constants.DefaultRequestTimeout)
resp, err := etcdcli.MemberAdd(ctx, []string{newMember.PeerURL()})
@@ -228,17 +242,3 @@ func (c *Cluster) disasterRecovery(left etcdutil.MemberSet) error {
}
return c.recover()
}
func needUpgrade(pods []*v1.Pod, cs spec.ClusterSpec) bool {
return len(pods) == cs.Size && pickOneOldMember(pods, cs.Version) != nil
}
func pickOneOldMember(pods []*v1.Pod, newVersion string) *etcdutil.Member {
for _, pod := range pods {
if k8sutil.GetEtcdVersion(pod) == newVersion {
continue
}
return &etcdutil.Member{Name: pod.Name, Namespace: pod.Namespace}
}
return nil
}

View file

@@ -76,6 +76,8 @@ type Config struct {
Namespace string
ServiceAccount string
PVProvisioner string
ClusterDomain string
s3config.S3Context
KubeCli kubernetes.Interface
}
@@ -240,6 +242,7 @@ func (c *Controller) makeClusterConfig() cluster.Config {
PVProvisioner: c.PVProvisioner,
ServiceAccount: c.Config.ServiceAccount,
S3Context: c.S3Context,
ClusterDomain: c.ClusterDomain,
KubeCli: c.KubeCli,
}

View file

@@ -20,6 +20,7 @@ const (
DefaultDialTimeout = 5 * time.Second
DefaultRequestTimeout = 5 * time.Second
DefaultSnapshotInterval = 1800 * time.Second
DefaultClusterDomain = "cluster.local."
DefaultBackupPodHTTPPort = 19999

View file

@@ -21,6 +21,8 @@ import (
"regexp"
"strconv"
"strings"
"github.com/coreos/etcd-operator/pkg/util/constants"
)
type Member struct {
@@ -39,10 +41,23 @@ type Member struct {
SecurePeer bool
SecureClient bool
// ClusterDomain is the domain under which DNS entries will be added
ClusterDomain string
}
func (m *Member) clusterDomain() string {
if len(m.ClusterDomain) < 1 {
return constants.DefaultClusterDomain
}
if !strings.HasSuffix(m.ClusterDomain, ".") {
return fmt.Sprintf("%s.", m.ClusterDomain)
}
return m.ClusterDomain
}
func (m *Member) fqdn() string {
return fmt.Sprintf("%s.%s.%s.svc.cluster.local", m.Name, clusterNameFromMemberName(m.Name), m.Namespace)
return fmt.Sprintf("%s.%s.%s.svc.%s", m.Name, clusterNameFromMemberName(m.Name), m.Namespace, m.clusterDomain())
}
func (m *Member) ClientAddr() string {

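The new clusterDomain() helper falls back to constants.DefaultClusterDomain when the field is empty and guarantees a trailing dot, so fqdn() always produces a fully qualified name under the configured domain. A minimal standalone sketch of that normalization follows; the normalizeDomain helper name is illustrative rather than part of the operator's API, but its output matches the expectations in the test file below.

package main

import (
	"fmt"
	"strings"
)

// normalizeDomain mirrors Member.clusterDomain() above: fall back to the
// default domain and ensure a trailing dot so the FQDN is fully qualified.
func normalizeDomain(d string) string {
	if d == "" {
		d = "cluster.local." // constants.DefaultClusterDomain in the operator
	}
	if !strings.HasSuffix(d, ".") {
		d += "."
	}
	return d
}

func main() {
	// Member "cluster-000" of cluster "cluster" in namespace "foo".
	fmt.Printf("%s.%s.%s.svc.%s\n", "cluster-000", "cluster", "foo", normalizeDomain("abc.local"))
	// prints: cluster-000.cluster.foo.svc.abc.local.
}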
View file

@@ -54,3 +54,23 @@ func TestMemberSetIsEqual(t *testing.T) {
}
}
}
func TestMemberFqdn(t *testing.T) {
tests := []struct {
m *Member
want string
}{{
m: &Member{Name: "cluster-000", Namespace: "foo"},
want: "cluster-000.cluster.foo.svc.cluster.local.",
}, {
m: &Member{Name: "cluster-000", Namespace: "foo", ClusterDomain: "abc.local"},
want: "cluster-000.cluster.foo.svc.abc.local.",
}}
for i, tt := range tests {
val := tt.m.fqdn()
if val != tt.want {
t.Errorf("#%d: get=%s, want=%s", i, val, tt.want)
}
}
}