all: improve Kubernetes API client

* pod creation returns an *api.Pod
* WatchPod replaces WatchPodStatus

Updates golang/go#12546

Change-Id: I34bb6e0d994e552b41a8082cc4672a663ce961a3
Reviewed-on: https://go-review.googlesource.com/17100
Reviewed-by: Brad Fitzpatrick <bradfitz@golang.org>
This commit is contained in:
Evan Brown 2015-11-20 09:43:56 -08:00 коммит произвёл Brad Fitzpatrick
Родитель b8e783ac6b
Коммит 2e452e1be8
2 изменённых файлов: 21 добавлений и 20 удалений

Просмотреть файл

@ -136,7 +136,7 @@ func StartPod(ctx context.Context, kubeClient *kubernetes.Client, podName, build
pod.ObjectMeta.Annotations["delete-at"] = fmt.Sprint(time.Now().Add(opts.DeleteIn).Unix())
}
status, err := kubeClient.RunPod(ctx, pod)
newPod, err := kubeClient.RunPod(ctx, pod)
if err != nil {
return nil, fmt.Errorf("pod could not be created: %v", err)
}
@ -144,19 +144,19 @@ func StartPod(ctx context.Context, kubeClient *kubernetes.Client, podName, build
// The new pod must be in Running phase. Possible phases are described at
// http://releases.k8s.io/HEAD/docs/user-guide/pod-states.md#pod-phase
if status.Phase != api.PodRunning {
return nil, fmt.Errorf("pod is in invalid state %q: %v", status.Phase, status.Message)
if newPod.Status.Phase != api.PodRunning {
return nil, fmt.Errorf("pod is in invalid state %q: %v", newPod.Status.Phase, newPod.Status.Message)
}
// Wait for the pod to boot and its buildlet to come up.
var buildletURL string
var ipPort string
if !opts.TLS.IsZero() {
buildletURL = "https://" + status.PodIP
ipPort = status.PodIP + ":443"
buildletURL = "https://" + newPod.Status.PodIP
ipPort = newPod.Status.PodIP + ":443"
} else {
buildletURL = "http://" + status.PodIP
ipPort = status.PodIP + ":80"
buildletURL = "http://" + newPod.Status.PodIP
ipPort = newPod.Status.PodIP + ":80"
}
condRun(opts.OnGotPodInfo)

Просмотреть файл

@ -57,7 +57,7 @@ func NewClient(baseURL string, client *http.Client) (*Client, error) {
// An error is returned if the pod can not be created, if it does
// does not enter the running phase within 2 minutes, or if ctx.Done
// is closed.
func (c *Client) RunPod(ctx context.Context, pod *api.Pod) (*api.PodStatus, error) {
func (c *Client) RunPod(ctx context.Context, pod *api.Pod) (*api.Pod, error) {
var podJSON bytes.Buffer
if err := json.NewEncoder(&podJSON).Encode(pod); err != nil {
return nil, fmt.Errorf("failed to encode pod in json: %v", err)
@ -86,15 +86,15 @@ func (c *Client) RunPod(ctx context.Context, pod *api.Pod) (*api.PodStatus, erro
ctx, cancel := context.WithTimeout(ctx, 2*time.Minute)
defer cancel()
status, err := c.AwaitPodNotPending(ctx, podResult.Name, podResult.ObjectMeta.ResourceVersion)
createdPod, err := c.AwaitPodNotPending(ctx, podResult.Name, podResult.ObjectMeta.ResourceVersion)
if err != nil {
log.Printf("Timed out waiting for pod to leave pending state. Pod will be deleted.")
// The pod did not leave the pending state. We should try to manually delete it before
// returning an error.
c.DeletePod(context.Background(), podResult.Name)
c.DeletePod(context.Background(), createdPod.Name)
return nil, err
}
return status, nil
return createdPod, nil
}
// GetPods returns all pods in the cluster, regardless of status.
@ -156,14 +156,14 @@ func (c *Client) DeletePod(ctx context.Context, podName string) error {
// history from being retrieved when the watch is initiated.
// If there is an error polling for the pod's status, or if
// ctx.Done is closed, podStatusResult will contain an error.
func (c *Client) AwaitPodNotPending(ctx context.Context, podName, podResourceVersion string) (*api.PodStatus, error) {
func (c *Client) AwaitPodNotPending(ctx context.Context, podName, podResourceVersion string) (*api.Pod, error) {
if podResourceVersion == "" {
return nil, fmt.Errorf("resourceVersion for pod %v must be provided", podName)
}
ctx, cancel := context.WithCancel(ctx)
defer cancel()
podStatusResult, err := c.WatchPodStatus(ctx, podName, podResourceVersion)
podStatusResult, err := c.WatchPod(ctx, podName, podResourceVersion)
if err != nil {
return nil, err
}
@ -174,8 +174,8 @@ func (c *Client) AwaitPodNotPending(ctx context.Context, podName, podResourceVer
if psr.Err != nil {
return nil, psr.Err
}
if psr.Status.Phase != api.PodPending {
return psr.Status, nil
if psr.Pod.Status.Phase != api.PodPending {
return psr.Pod, nil
}
}
}
@ -183,8 +183,9 @@ func (c *Client) AwaitPodNotPending(ctx context.Context, podName, podResourceVer
// PodStatusResult wraps a api.PodStatus and error
type PodStatusResult struct {
Status *api.PodStatus
Err error
Pod *api.Pod
Type string
Err error
}
type watchPodStatus struct {
@ -194,7 +195,7 @@ type watchPodStatus struct {
Object api.Pod `json:"object"`
}
// WatchPodStatus long-polls the Kubernetes watch API to be notified
// WatchPod long-polls the Kubernetes watch API to be notified
// of changes to the specified pod. Changes are sent on the returned
// PodStatusResult channel as they are received.
// The podResourceVersion is required to prevent a pod's entire
@ -203,7 +204,7 @@ type watchPodStatus struct {
// If any error occurs communicating with the Kubernetes API, the
// error will be sent on the returned PodStatusResult channel and
// it will be closed.
func (c *Client) WatchPodStatus(ctx context.Context, podName, podResourceVersion string) (<-chan PodStatusResult, error) {
func (c *Client) WatchPod(ctx context.Context, podName, podResourceVersion string) (<-chan PodStatusResult, error) {
if podResourceVersion == "" {
return nil, fmt.Errorf("resourceVersion for pod %v must be provided", podName)
}
@ -253,7 +254,7 @@ func (c *Client) WatchPodStatus(ctx context.Context, podName, podResourceVersion
statusChan <- PodStatusResult{Err: fmt.Errorf("failed to decode watch pod status: %v", err)}
return
}
statusChan <- PodStatusResult{Status: &wps.Object.Status}
statusChan <- PodStatusResult{Pod: &wps.Object, Type: wps.Type}
}
}()
return statusChan, nil