all: display pool status and delete failed/old pods

* status page shows kube pool details
* pods created by the coordinator are tracked
* pods that fail to create are deleted
* pods older than delete-at are deleted
* pods created by a different coordinator are deleted

Updates golang/go#12546

Change-Id: I4c4f8ff906962b4a014a66d0a9d490ff17710d62
Reviewed-on: https://go-review.googlesource.com/16101
Reviewed-by: Brad Fitzpatrick <bradfitz@golang.org>
This commit is contained in:
Evan Brown 2015-10-17 20:35:55 -07:00 committed by Brad Fitzpatrick
Parent 4154d4f503
Commit 83f9748046
6 changed files: 330 additions and 28 deletions
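In outline, the cleanup described in the commit message rides on a single "delete-at" pod annotation: StartPod stamps it at creation, and a coordinator loop later deletes any pod whose stamp has passed. A minimal standalone sketch of that check (the expired helper is illustrative, not part of this commit):

package main

import (
	"fmt"
	"strconv"
	"time"
)

// expired reports whether a pod stamped with a "delete-at" unix
// timestamp has outlived its intended lifetime. The annotations map
// mirrors pod.ObjectMeta.Annotations from the Kubernetes API.
func expired(annotations map[string]string, now time.Time) bool {
	v, ok := annotations["delete-at"]
	if !ok || v == "" {
		return false
	}
	deadline, err := strconv.ParseInt(v, 10, 64)
	if err != nil {
		return false // malformed stamp; the real loop logs and skips it
	}
	return now.Unix() > deadline
}

func main() {
	ann := map[string]string{
		// Stamped at creation, as StartPod does with opts.DeleteIn.
		"delete-at": fmt.Sprint(time.Now().Add(-time.Hour).Unix()),
	}
	fmt.Println(expired(ann, time.Now())) // true: an hour past its deadline
}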

View file

@@ -19,6 +19,13 @@ import (
"golang.org/x/net/context/ctxhttp"
)
var (
// TODO(evanbrown): resource requirements should be
// defined per-builder in dashboard/builders.go
BuildletCPU = api.MustParse("2") // 2 Cores
BuildletMemory = api.MustParse("2000000Ki") // 2,000,000Ki RAM
)
// PodOpts control how new pods are started.
type PodOpts struct {
// ImageRegistry specifies the Docker registry Kubernetes
@@ -40,11 +47,6 @@ type PodOpts struct {
// to delete the pod.
DeleteIn time.Duration
// OnInstanceRequested optionally specifies a hook to run synchronously
// after the pod create call, but before
// waiting for its operation to proceed.
OnPodRequested func()
// OnPodCreated optionally specifies a hook to run synchronously
// after the pod operation succeeds.
OnPodCreated func()
@@ -73,6 +75,7 @@ func StartPod(ctx context.Context, kubeClient *kubernetes.Client, podName, build
"type": builderType,
"role": "buildlet",
},
Annotations: map[string]string{},
},
Spec: api.PodSpec{
RestartPolicy: api.RestartPolicyNever,
@@ -81,7 +84,13 @@ func StartPod(ctx context.Context, kubeClient *kubernetes.Client, podName, build
Name: "buildlet",
Image: imageID(opts.ImageRegistry, conf.KubeImage),
ImagePullPolicy: api.PullAlways,
Command: []string{"/usr/local/bin/stage0"},
Resources: api.ResourceRequirements{
Limits: api.ResourceList{
api.ResourceCPU: BuildletCPU,
api.ResourceMemory: BuildletMemory,
},
},
Command: []string{"/usr/local/bin/stage0"},
Ports: []api.ContainerPort{
{
ContainerPort: 80,
@@ -120,17 +129,19 @@ func StartPod(ctx context.Context, kubeClient *kubernetes.Client, podName, build
if opts.DeleteIn != 0 {
// In case the pod gets away from us (generally: if the
// coordinator dies while a build is running), then we
// set this attribute of when it should be killed so
// set this annotation of when it should be killed so
// we can kill it later when the coordinator is
// restarted. The cleanUpOldPods goroutine loop handles
// that killing.
addEnv("META_DELETE_AT", fmt.Sprint(time.Now().Add(opts.DeleteIn).Unix()))
pod.ObjectMeta.Annotations["delete-at"] = fmt.Sprint(time.Now().Add(opts.DeleteIn).Unix())
}
status, err := kubeClient.Run(ctx, pod)
status, err := kubeClient.RunPod(ctx, pod)
if err != nil {
return nil, fmt.Errorf("pod could not be created: %v", err)
}
condRun(opts.OnPodCreated)
// The new pod must be in Running phase. Possible phases are described at
// http://releases.k8s.io/HEAD/docs/user-guide/pod-states.md#pod-phase
if status.Phase != api.PodRunning {

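For context, the coordinator's GetBuildlet (later in this commit) drives these options roughly as sketched below. startBuildletPod is an illustrative wrapper, not from the commit, and StartPod returning a buildlet client is inferred from its use in GetBuildlet:

package coordinator

import (
	"log"
	"time"

	"golang.org/x/build/buildlet"
	"golang.org/x/build/kubernetes"
	"golang.org/x/net/context"
)

// startBuildletPod launches a buildlet pod that cleans itself up:
// DeleteIn becomes the pod's "delete-at" annotation.
func startBuildletPod(ctx context.Context, kubeClient *kubernetes.Client, podName, builderType, registry string) (*buildlet.Client, error) {
	return buildlet.StartPod(ctx, kubeClient, podName, builderType, buildlet.PodOpts{
		ImageRegistry: registry,
		Description:   "Go Builder for " + builderType,
		DeleteIn:      45 * time.Minute, // stamped into the "delete-at" annotation
		OnPodCreated:  func() { log.Printf("pod %q created", podName) },
	})
}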
View file

@@ -306,6 +306,7 @@ func main() {
http.HandleFunc("/dosomework/", handleDoSomeWork(workc))
} else {
go gcePool.cleanUpOldVMs()
go kubePool.cleanUpOldPods(context.Background())
if inStaging {
dashboard.BuildletBucket = "dev-go-builder-data"

View file

@@ -393,11 +393,11 @@ func (p *gceBuildletPool) instanceUsed(instName string) bool {
return ok
}
func (p *gceBuildletPool) instancesActive() (ret []instanceTime) {
func (p *gceBuildletPool) instancesActive() (ret []resourceTime) {
p.mu.Lock()
defer p.mu.Unlock()
for name, create := range p.inst {
ret = append(ret, instanceTime{
ret = append(ret, resourceTime{
name: name,
creation: create,
})
@@ -406,13 +406,13 @@ func (p *gceBuildletPool) instancesActive() (ret []instanceTime) {
return ret
}
// instanceTime is a GCE instance name and its creation time.
type instanceTime struct {
// resourceTime is a GCE instance or Kube pod name and its creation time.
type resourceTime struct {
name string
creation time.Time
}
type byCreationTime []instanceTime
type byCreationTime []resourceTime
func (s byCreationTime) Len() int { return len(s) }
func (s byCreationTime) Less(i, j int) bool { return s[i].creation.Before(s[j].creation) }

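The instanceTime to resourceTime rename lets GCE instances and Kube pods share one sorted listing. A standalone sketch (Swap, required by sort.Interface, lies outside the hunk above but is shown here so the program runs):

package main

import (
	"fmt"
	"sort"
	"time"
)

// resourceTime and byCreationTime mirror the coordinator's types.
type resourceTime struct {
	name     string
	creation time.Time
}

type byCreationTime []resourceTime

func (s byCreationTime) Len() int           { return len(s) }
func (s byCreationTime) Less(i, j int) bool { return s[i].creation.Before(s[j].creation) }
func (s byCreationTime) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }

func main() {
	now := time.Now()
	rs := []resourceTime{
		{"buildlet-linux-2", now},
		{"buildlet-linux-1", now.Add(-10 * time.Minute)},
	}
	sort.Sort(byCreationTime(rs))
	fmt.Println(rs[0].name) // buildlet-linux-1: oldest first
}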
View file

@@ -13,12 +13,16 @@ import (
"io"
"log"
"net/http"
"sort"
"strconv"
"strings"
"sync"
"time"
"golang.org/x/build/buildlet"
"golang.org/x/build/dashboard"
"golang.org/x/build/kubernetes"
"golang.org/x/build/kubernetes/api"
"golang.org/x/net/context"
"golang.org/x/oauth2"
container "google.golang.org/api/container/v1"
@@ -104,15 +108,55 @@ func initKube() error {
if err != nil {
return fmt.Errorf("kubernetes HTTP client could not be created: %v", err)
}
go kubePool.pollCapacityLoop()
return nil
}
var kubePool = &kubeBuildletPool{}
var kubePool = &kubeBuildletPool{
cpuCapacity: api.NewQuantity(0, api.DecimalSI),
cpuUsage: api.NewQuantity(0, api.DecimalSI),
memoryCapacity: api.NewQuantity(0, api.BinarySI),
memoryUsage: api.NewQuantity(0, api.BinarySI),
}
// kubeBuildletPool is the Kubernetes buildlet pool.
type kubeBuildletPool struct {
// ...
mu sync.Mutex
mu sync.Mutex // guards all following
pods map[string]time.Time // pod instance name -> creationTime
cpuCapacity *api.Quantity // cpu capacity as reported by the Kubernetes api
memoryCapacity *api.Quantity
cpuUsage *api.Quantity
memoryUsage *api.Quantity
}
func (p *kubeBuildletPool) pollCapacityLoop() {
ctx := context.Background()
for {
p.pollCapacity(ctx)
time.Sleep(30 * time.Second)
}
}
func (p *kubeBuildletPool) pollCapacity(ctx context.Context) {
nodes, err := kubeClient.GetNodes(ctx)
if err != nil {
log.Printf("Failed to get Kubernetes cluster capacity for %s/%s: %v", projectID, projectRegion, err)
return
}
p.mu.Lock()
defer p.mu.Unlock()
// Calculate the total CPU and memory capacity of the cluster
var sumCPU = api.NewQuantity(0, api.DecimalSI)
var sumMemory = api.NewQuantity(0, api.BinarySI)
for _, n := range nodes {
sumCPU.Add(n.Status.Capacity[api.ResourceCPU])
sumMemory.Add(n.Status.Capacity[api.ResourceMemory])
}
p.cpuCapacity = sumCPU
p.memoryCapacity = sumMemory
}
func (p *kubeBuildletPool) GetBuildlet(ctx context.Context, typ, rev string, el eventTimeLogger) (*buildlet.Client, error) {
@@ -153,13 +197,10 @@ func (p *kubeBuildletPool) GetBuildlet(ctx context.Context, typ, rev string, el
ImageRegistry: registryPrefix,
Description: fmt.Sprintf("Go Builder for %s at %s", typ, rev),
DeleteIn: deleteIn,
OnPodRequested: func() {
el.logEventTime("pod_create_requested", podName)
log.Printf("Pod %q starting", podName)
},
OnPodCreated: func() {
el.logEventTime("pod_created")
needDelete = true // redundant with OnPodRequested one, but fine.
p.setPodUsed(podName, true)
needDelete = true
},
OnGotPodInfo: func() {
el.logEventTime("got_pod_info", "waiting_for_buildlet...")
@@ -167,19 +208,95 @@
})
if err != nil {
el.logEventTime("kube_buildlet_create_failure", fmt.Sprintf("%s: %v", podName, err))
log.Printf("Failed to create kube pod for %s, %s: %v", typ, rev, err)
if needDelete {
//TODO(evanbrown): delete pod
log.Printf("Deleting failed pod %q", podName)
kubeClient.DeletePod(ctx, podName)
p.setPodUsed(podName, false)
}
//p.setInstanceUsed(instName, false)
return nil, err
}
bc.SetDescription("Kube Pod: " + podName)
// The build's context will be canceled when the build completes (successfully
// or not), or if the buildlet becomes unavailable. In any case, delete the pod
// running the buildlet.
go func() {
<-ctx.Done()
log.Printf("Deleting pod %q after build context cancel received ", podName)
// Giving DeletePod a new context here as the build ctx has been canceled
kubeClient.DeletePod(context.Background(), podName)
p.setPodUsed(podName, false)
}()
return bc, nil
}
func (p *kubeBuildletPool) WriteHTMLStatus(w io.Writer) {
io.WriteString(w, "<b>Kubernetes pool summary</b><ul><li>(TODO)</li></ul>")
fmt.Fprintf(w, "<b>Kubernetes pool</b> capacity: %s", p.capacityString())
const show = 6 // must be even
active := p.podsActive()
if len(active) > 0 {
fmt.Fprintf(w, "<ul>")
for i, pod := range active {
if i < show/2 || i >= len(active)-(show/2) {
fmt.Fprintf(w, "<li>%v, %v</li>\n", pod.name, time.Since(pod.creation))
} else if i == show/2 {
fmt.Fprintf(w, "<li>... %d of %d total omitted ...</li>\n", len(active)-show, len(active))
}
}
fmt.Fprintf(w, "</ul>")
}
}
func (p *kubeBuildletPool) capacityString() string {
p.mu.Lock()
defer p.mu.Unlock()
return fmt.Sprintf("%v/%v CPUs; %v/%v Memory",
p.cpuUsage, p.cpuCapacity,
p.memoryUsage, p.memoryCapacity)
}
func (p *kubeBuildletPool) setPodUsed(podName string, used bool) {
p.mu.Lock()
defer p.mu.Unlock()
if p.pods == nil {
p.pods = make(map[string]time.Time)
}
if used {
p.pods[podName] = time.Now()
// Track cpu and memory usage
p.cpuUsage.Add(buildlet.BuildletCPU)
p.memoryUsage.Add(buildlet.BuildletMemory)
} else {
delete(p.pods, podName)
// Track cpu and memory usage
p.cpuUsage.Sub(buildlet.BuildletCPU)
p.memoryUsage.Sub(buildlet.BuildletMemory)
}
}
func (p *kubeBuildletPool) podUsed(podName string) bool {
p.mu.Lock()
defer p.mu.Unlock()
_, ok := p.pods[podName]
return ok
}
func (p *kubeBuildletPool) podsActive() (ret []resourceTime) {
p.mu.Lock()
defer p.mu.Unlock()
for name, create := range p.pods {
ret = append(ret, resourceTime{
name: name,
creation: create,
})
}
sort.Sort(byCreationTime(ret))
return ret
}
func (p *kubeBuildletPool) String() string {
@@ -191,6 +308,69 @@ func (p *kubeBuildletPool) String() string {
return fmt.Sprintf("Kubernetes pool capacity: %d/%d", inUse, total)
}
// cleanUpOldPods loops forever and periodically enumerates pods
// and deletes those which have expired.
//
// A pod is considered expired if it has a "delete-at" annotation
// whose unix timestamp is before the current time.
//
// This is the safety mechanism to delete pods which stray from the
// normal deleting process. Pods are created to run a single build and
// should be shut down by a controlling process. Due to various types
// of failures, they might get stranded. To prevent a stranded pod
// from wasting resources forever, we instead set the "delete-at"
// annotation on it at creation to a time well beyond its
// expected lifetime.
func (p *kubeBuildletPool) cleanUpOldPods(ctx context.Context) {
if containerService == nil {
return
}
for {
pods, err := kubeClient.GetPods(ctx)
if err != nil {
log.Printf("Error cleaning pods: %v", err)
}
for _, pod := range pods {
if pod.ObjectMeta.Annotations == nil {
// Defensive. Not seen in practice.
continue
}
sawDeleteAt := false
for k, v := range pod.ObjectMeta.Annotations {
if k == "delete-at" {
sawDeleteAt = true
if v == "" {
log.Printf("missing delete-at value; ignoring")
continue
}
unixDeadline, err := strconv.ParseInt(v, 10, 64)
if err != nil {
log.Printf("invalid delete-at value %q seen; ignoring", v)
}
if err == nil && time.Now().Unix() > unixDeadline {
log.Printf("Deleting expired pod %q in zone %q ...", pod.Name)
err = kubeClient.DeletePod(ctx, pod.Name)
if err != nil {
log.Printf("problem deleting pod: %v", err)
}
}
}
}
// Delete buildlets (things we made) from previous
// generations. Only deleting things starting with "buildlet-"
// is a historical restriction, but still fine for paranoia.
if sawDeleteAt && strings.HasPrefix(pod.Name, "buildlet-") && !p.podUsed(pod.Name) {
log.Printf("Deleting pod %q from an earlier coordinator generation ...", pod.Name)
err = kubeClient.DeletePod(ctx, pod.Name)
if err != nil {
log.Printf("problem deleting pod: %v", err)
}
}
}
time.Sleep(time.Minute)
}
}
func hasCloudPlatformScope() bool {
return hasScope(container.CloudPlatformScope)
}

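The pool's usage counters lean on the vendored api.Quantity arithmetic visible above (NewQuantity, MustParse, Add, Sub). A sketch of the bookkeeping setPodUsed performs, assuming the vendored package behaves as the diff uses it:

package main

import (
	"fmt"

	"golang.org/x/build/kubernetes/api"
)

func main() {
	// Cluster-wide usage counter, as kubeBuildletPool keeps under p.mu.
	cpuUsage := api.NewQuantity(0, api.DecimalSI)
	perPod := api.MustParse("2") // same value as buildlet.BuildletCPU

	cpuUsage.Add(perPod) // a pod marked used
	cpuUsage.Add(perPod) // a second pod
	cpuUsage.Sub(perPod) // one pod deleted

	fmt.Println(cpuUsage) // 2
}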
View file

@@ -61,6 +61,11 @@ func handleStatus(w http.ResponseWriter, r *http.Request) {
gcePool.WriteHTMLStatus(&buf)
data.GCEPoolStatus = template.HTML(buf.String())
buf.Reset()
kubePool.WriteHTMLStatus(&buf)
data.KubePoolStatus = template.HTML(buf.String())
buf.Reset()
reversePool.WriteHTMLStatus(&buf)
data.ReversePoolStatus = template.HTML(buf.String())
@@ -86,6 +91,7 @@ type statusData struct {
TrybotsErr string
Trybots template.HTML
GCEPoolStatus template.HTML // TODO: embed template
KubePoolStatus template.HTML // TODO: embed template
ReversePoolStatus template.HTML // TODO: embed template
RemoteBuildlets template.HTML
DiskFree string
@@ -129,6 +135,7 @@ var statusTmpl = template.Must(template.New("status").Parse(`
<h2>Buildlet pools</h2>
<ul>
<li>{{.GCEPoolStatus}}</li>
<li>{{.KubePoolStatus}}</li>
<li>{{.ReversePoolStatus}}</li>
</ul>

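Each pool writes an HTML fragment into a shared buffer, and the handler wraps it in template.HTML so html/template treats it as pre-escaped markup instead of escaping it again. The pattern in isolation (pool is a stand-in for the real buildlet pools):

package main

import (
	"bytes"
	"fmt"
	"html/template"
	"io"
)

type pool struct{ name string }

// WriteHTMLStatus mimics the pools' status methods: it emits a
// trusted HTML fragment describing the pool.
func (p pool) WriteHTMLStatus(w io.Writer) {
	fmt.Fprintf(w, "<b>%s pool</b> capacity: 0/0 CPUs", p.name)
}

func main() {
	var buf bytes.Buffer
	pool{"Kubernetes"}.WriteHTMLStatus(&buf)
	status := template.HTML(buf.String()) // marked safe; not re-escaped
	fmt.Println(status)
}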
View file

@@ -11,6 +11,7 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"net/url"
"strings"
@@ -26,6 +27,7 @@ const (
APIEndpoint = "/api/v1"
defaultPod = "/namespaces/default/pods"
defaultWatchPod = "/watch/namespaces/default/pods"
nodes = "/nodes"
)
// Client is a client for the Kubernetes master.
@@ -55,7 +57,7 @@ func NewClient(baseURL string, client *http.Client) (*Client, error) {
// An error is returned if the pod cannot be created, if it
// does not enter the running phase within 2 minutes, or if ctx.Done
// is closed.
func (c *Client) Run(ctx context.Context, pod *api.Pod) (*api.PodStatus, error) {
func (c *Client) RunPod(ctx context.Context, pod *api.Pod) (*api.PodStatus, error) {
var podJSON bytes.Buffer
if err := json.NewEncoder(&podJSON).Encode(pod); err != nil {
return nil, fmt.Errorf("failed to encode pod in json: %v", err)
@@ -86,11 +88,67 @@ func (c *Client) Run(ctx context.Context, pod *api.Pod) (*api.PodStatus, error)
status, err := c.AwaitPodNotPending(ctx, podResult.Name, podResult.ObjectMeta.ResourceVersion)
if err != nil {
log.Printf("Timed out waiting for pod to leave pending state. Pod will be deleted.")
// The pod did not leave the pending state. We should try to manually delete it before
// returning an error.
c.DeletePod(context.Background(), podResult.Name)
return nil, err
}
return status, nil
}
// GetPods returns all pods in the cluster, regardless of status.
func (c *Client) GetPods(ctx context.Context) ([]api.Pod, error) {
getURL := c.endpointURL + defaultPod
// Make request to Kubernetes API
req, err := http.NewRequest("GET", getURL, nil)
if err != nil {
return nil, fmt.Errorf("failed to create request: GET %q : %v", getURL, err)
}
res, err := ctxhttp.Do(ctx, c.httpClient, req)
if err != nil {
return nil, fmt.Errorf("failed to make request: GET %q: %v", getURL, err)
}
body, err := ioutil.ReadAll(res.Body)
res.Body.Close()
if err != nil {
return nil, fmt.Errorf("failed to read request body for GET %q: %v", getURL, err)
}
if res.StatusCode != http.StatusOK {
return nil, fmt.Errorf("http error %d GET %q: %q: %v", res.StatusCode, getURL, string(body), err)
}
var podList api.PodList
if err := json.Unmarshal(body, &podList); err != nil {
return nil, fmt.Errorf("failed to decode list of pod resources: %v", err)
}
return podList.Items, nil
}
// DeletePod deletes the specified Kubernetes pod.
func (c *Client) DeletePod(ctx context.Context, podName string) error {
url := c.endpointURL + defaultPod + "/" + podName
req, err := http.NewRequest("DELETE", url, nil)
if err != nil {
return fmt.Errorf("failed to create request: DELETE %q : %v", url, err)
}
res, err := ctxhttp.Do(ctx, c.httpClient, req)
if err != nil {
return fmt.Errorf("failed to make request: DELETE %q: %v", url, err)
}
body, err := ioutil.ReadAll(res.Body)
res.Body.Close()
if err != nil {
return fmt.Errorf("failed to read response body: DELETE %q: %v, url, err")
}
if res.StatusCode != http.StatusOK {
return fmt.Errorf("http error: %d DELETE %q: %q: %v", res.StatusCode, url, string(body), err)
}
return nil
}
// AwaitPodNotPending will return a pod's status in a
// PodStatusResult when the pod is no longer in the pending
// state.
@@ -162,6 +220,7 @@ func (c *Client) WatchPodStatus(ctx context.Context, podName, podResourceVersion
return
}
res, err := ctxhttp.Do(ctx, c.httpClient, req)
if err != nil {
statusChan <- PodStatusResult{Err: fmt.Errorf("failed to make request: GET %q: %v", getURL, err)}
return
}
defer res.Body.Close()
@@ -169,8 +228,23 @@ func (c *Client) WatchPodStatus(ctx context.Context, podName, podResourceVersion
var wps watchPodStatus
reader := bufio.NewReader(res.Body)
// bufio.Reader.ReadBytes is blocking, so we watch for
// context timeout or cancellation in a goroutine
// and close the response body when we see it. The
// response body is also closed via defer when the
// request is made, but closing twice is OK.
go func() {
<-ctx.Done()
res.Body.Close()
}()
for {
line, err := reader.ReadBytes('\n')
if ctx.Err() != nil {
statusChan <- PodStatusResult{Err: ctx.Err()}
return
}
if err != nil {
statusChan <- PodStatusResult{Err: fmt.Errorf("error reading streaming response body: %v", err)}
return
@@ -230,9 +304,38 @@ func (c *Client) PodLog(ctx context.Context, podName string) (string, error) {
return "", fmt.Errorf("failed to make request: GET %q: %v", url, err)
}
body, err := ioutil.ReadAll(res.Body)
res.Body.Close()
if err != nil {
return "", fmt.Errorf("failed to read response body: GET %q: %v", url, err)
}
res.Body.Close()
if res.StatusCode != http.StatusOK {
return "", fmt.Errorf("http error %d GET %q: %q: %v", res.StatusCode, url, string(body), err)
}
return string(body), nil
}
// GetNodes returns the list of nodes that comprise the Kubernetes cluster.
func (c *Client) GetNodes(ctx context.Context) ([]api.Node, error) {
url := c.endpointURL + nodes
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, fmt.Errorf("failed to create request: GET %q : %v", url, err)
}
res, err := ctxhttp.Do(ctx, c.httpClient, req)
if err != nil {
return nil, fmt.Errorf("failed to make request: GET %q: %v", url, err)
}
body, err := ioutil.ReadAll(res.Body)
res.Body.Close()
if err != nil {
return nil, fmt.Errorf("failed to read response body: GET %q: %v, url, err")
}
if res.StatusCode != http.StatusOK {
return nil, fmt.Errorf("http error %d GET %q: %q: %v", res.StatusCode, url, string(body), err)
}
var nodeList *api.NodeList
if err := json.Unmarshal(body, &nodeList); err != nil {
return nil, fmt.Errorf("failed to decode node list: %v", err)
}
return nodeList.Items, nil
}
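Putting the new client methods together, a caller can reproduce the coordinator's capacity poll end to end. A sketch, with a placeholder endpoint and plain http.DefaultClient standing in for the OAuth2-authenticated client the coordinator actually builds:

package main

import (
	"fmt"
	"log"
	"net/http"

	"golang.org/x/build/kubernetes"
	"golang.org/x/build/kubernetes/api"
	"golang.org/x/net/context"
)

func main() {
	kc, err := kubernetes.NewClient("https://k8s.example.com", http.DefaultClient)
	if err != nil {
		log.Fatal(err)
	}
	nodes, err := kc.GetNodes(context.Background()) // GET /api/v1/nodes, decoded into api.Node values
	if err != nil {
		log.Fatal(err)
	}
	// Sum per-node capacity, as pollCapacity does for the status page.
	sumCPU := api.NewQuantity(0, api.DecimalSI)
	for _, n := range nodes {
		sumCPU.Add(n.Status.Capacity[api.ResourceCPU])
	}
	fmt.Printf("%d nodes, %v CPUs total\n", len(nodes), sumCPU)
}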