зеркало из https://github.com/Azure/draft-classic.git
Merge pull request #583 from radu-matei/connect-wait-pods
Wait for pod to be ready before connecting
This commit is contained in:
Коммит
7020b2e9fb
|
@ -8,7 +8,6 @@ import (
|
|||
|
||||
"github.com/BurntSushi/toml"
|
||||
"k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
restclient "k8s.io/client-go/rest"
|
||||
|
@ -70,11 +69,12 @@ func DeployedApplication(draftTomlPath, draftEnvironment string) (*App, error) {
|
|||
// Connect tunnels to a Kubernetes pod running the application and returns the connection information
|
||||
func (a *App) Connect(clientset kubernetes.Interface, clientConfig *restclient.Config, targetContainer string, overridePorts []string) (*Connection, error) {
|
||||
var cc []*ContainerConnection
|
||||
pod, err := getPod(a.Namespace, a.Name, clientset)
|
||||
label := labels.Set{DraftLabelKey: a.Name}
|
||||
|
||||
pod, err := podutil.GetPod(a.Namespace, label, clientset)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
m, err := getPortMapping(overridePorts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -178,23 +178,6 @@ func (c *Connection) RequestLogStream(namespace string, containerName string, lo
|
|||
|
||||
}
|
||||
|
||||
func getPod(namespace, label string, clientset kubernetes.Interface) (*v1.Pod, error) {
|
||||
options := metav1.ListOptions{LabelSelector: labels.Set{DraftLabelKey: label}.AsSelector().String()}
|
||||
pods, err := clientset.CoreV1().Pods(namespace).List(options)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(pods.Items) < 1 {
|
||||
return nil, fmt.Errorf("could not find ready pod")
|
||||
}
|
||||
for _, p := range pods.Items {
|
||||
if podutil.IsPodReady(&p) {
|
||||
return &p, nil
|
||||
}
|
||||
}
|
||||
return nil, fmt.Errorf("could not find a ready pod")
|
||||
}
|
||||
|
||||
func getTargetContainerPorts(containers []v1.Container, targetContainer string) ([]int, error) {
|
||||
var ports []int
|
||||
containerFound := false
|
||||
|
|
|
@ -1,8 +1,17 @@
|
|||
// package podutil exists for functions that exist in k8s.io/kubernetes but not in k8s.io/client-go. Everything here should be contributed upstream.
|
||||
// package podutil exists for functions that exist in k8s.io/kubernetes but not in k8s.io/client-go. Most of the things here should be contributed upstream.
|
||||
|
||||
package podutil
|
||||
|
||||
import "k8s.io/api/core/v1"
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/fields"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
)
|
||||
|
||||
// IsPodReady returns true if a pod is ready; false otherwise.
|
||||
func IsPodReady(pod *v1.Pod) bool {
|
||||
|
@ -35,3 +44,47 @@ func GetPodCondition(status *v1.PodStatus, conditionType v1.PodConditionType) (i
|
|||
}
|
||||
return -1, nil
|
||||
}
|
||||
|
||||
// GetPod waits for a pod with the specified label to be ready, then returns it
|
||||
// if no pod is ready, it checks every second until a pod is ready until timeout is reached
|
||||
func GetPod(namespace string, l labels.Set, clientset kubernetes.Interface) (*v1.Pod, error) {
|
||||
var targetPod *v1.Pod
|
||||
s := newStopChan()
|
||||
|
||||
listwatch := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", namespace, fields.Everything())
|
||||
_, controller := cache.NewInformer(listwatch, &v1.Pod{}, time.Second, cache.ResourceEventHandlerFuncs{
|
||||
UpdateFunc: func(o, n interface{}) {
|
||||
newPod := n.(*v1.Pod)
|
||||
|
||||
// check the pod label and if pod is in terminating state
|
||||
if (!hasLabels(l, newPod.Labels)) || (newPod.ObjectMeta.DeletionTimestamp != nil) {
|
||||
return
|
||||
}
|
||||
|
||||
if IsPodReady(newPod) {
|
||||
targetPod = newPod
|
||||
s.closeOnce()
|
||||
}
|
||||
},
|
||||
})
|
||||
|
||||
go func() {
|
||||
controller.Run(s.c)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-s.c:
|
||||
return targetPod, nil
|
||||
case <-time.After(5 * time.Minute):
|
||||
return nil, fmt.Errorf("cannot get pod with labels %v: timed out", l)
|
||||
}
|
||||
}
|
||||
|
||||
func hasLabels(draftLabels, podLabels map[string]string) bool {
|
||||
for k, v := range draftLabels {
|
||||
if podLabels[k] != v {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
|
|
@ -0,0 +1,20 @@
|
|||
package podutil
|
||||
|
||||
import "sync"
|
||||
|
||||
// needed so that we only attempt to close the channel once
|
||||
// used when watching for pods to be ready
|
||||
type stopChan struct {
|
||||
c chan struct{}
|
||||
sync.Once
|
||||
}
|
||||
|
||||
func newStopChan() *stopChan {
|
||||
return &stopChan{c: make(chan struct{})}
|
||||
}
|
||||
|
||||
func (s *stopChan) closeOnce() {
|
||||
s.Do(func() {
|
||||
close(s.c)
|
||||
})
|
||||
}
|
Загрузка…
Ссылка в новой задаче