cmd/coordinator, buildlet: reliability fixes

Add background heartbeats to detect dead GCE VMs (the OpenBSD
buildlets seem to hang a lot).
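
A minimal self-contained sketch of the heartbeat pattern the buildlet
client changes below add (the peer type, probe hook, and short interval
are illustrative stand-ins for buildlet.Client, Client.Status, and the
real 10-second period):

	package main

	import (
		"errors"
		"fmt"
		"sync"
		"time"
	)

	type peer struct {
		dead     chan struct{} // closed on peer death
		deadOnce sync.Once
		deadErr  error
	}

	// markDead records err and closes p.dead, at most once.
	func (p *peer) markDead(err error) {
		p.deadOnce.Do(func() {
			p.deadErr = err
			close(p.dead)
		})
	}

	// heartbeatLoop probes the peer every interval and marks it dead
	// on the first failed probe.
	func (p *peer) heartbeatLoop(interval time.Duration, probe func() error) {
		for {
			select {
			case <-p.dead: // already dead by something else (e.g. Close)
				return
			case <-time.After(interval):
				if err := probe(); err != nil {
					p.markDead(fmt.Errorf("failed heartbeat: %v", err))
					return
				}
			}
		}
	}

	func main() {
		p := &peer{dead: make(chan struct{})}
		go p.heartbeatLoop(10*time.Millisecond, func() error {
			return errors.New("no reply") // simulate a hung VM
		})
		<-p.dead
		fmt.Println("peer marked dead:", p.deadErr)
	}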

Add timeouts to test executions.
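
A test execution now races against a timer and the peer-death channel;
a condensed, self-contained sketch of the select used in Exec below
(runWithTimeout and its arguments are illustrative stand-ins for the
real Exec plumbing):

	package main

	import (
		"errors"
		"fmt"
		"time"
	)

	var errTimeout = errors.New("timeout waiting for command to complete")

	// runWithTimeout runs work in a goroutine and returns its error,
	// errTimeout if it takes longer than max, or an error if peerDead
	// is closed first.
	func runWithTimeout(work func() error, max time.Duration, peerDead <-chan struct{}) error {
		resc := make(chan error, 1) // buffered so the goroutine never blocks forever
		go func() { resc <- work() }()

		timer := time.NewTimer(max)
		defer timer.Stop()

		select {
		case err := <-resc:
			return err
		case <-timer.C:
			return errTimeout
		case <-peerDead:
			return errors.New("peer died during command")
		}
	}

	func main() {
		err := runWithTimeout(func() error {
			time.Sleep(time.Second) // simulate a hung test
			return nil
		}, 50*time.Millisecond, make(chan struct{}))
		fmt.Println(err) // timeout waiting for command to complete
	}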

Take helper buildlets out of service once they're marked bad.

Keep the in-order buildlet running forever when sharding tests, in
case all the helpers die. (observed once)
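
This is a retry loop that exits only when the whole build is done (a
sketch under assumed names; donec, nextTests, and run stand in for the
real buildStatus plumbing in coordinator.go):

	package coordsketch

	import "time"

	// keepRunningInOrder keeps the main buildlet pulling tests until
	// the build finishes, even if sharded helpers die and their tests
	// get requeued later.
	func keepRunningInOrder(donec <-chan struct{}, nextTests func() ([]string, bool), run func([]string)) {
		for {
			tests, ok := nextTests()
			if !ok {
				select {
				case <-donec:
					return // build finished
				case <-time.After(5 * time.Second):
					continue // a helper may have died; poll again
				}
			}
			run(tests)
		}
	}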

Keep a cache of recently deleted VMs and don't try to delete a VM
again if we've recently deleted it. (they're slow to delete)
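
The cache is consulted before issuing the (slow) GCE delete; gce.go
below does this with golang.org/x/build/internal/lru, and here is a
self-contained approximation (boundedSet is a simplified stand-in for
the real LRU cache, deleteVMOnce for the real deleteVM):

	package main

	import "fmt"

	// boundedSet remembers the last max keys added, in insertion order.
	type boundedSet struct {
		max   int
		order []string
		seen  map[string]bool
	}

	func newBoundedSet(max int) *boundedSet {
		return &boundedSet{max: max, seen: make(map[string]bool)}
	}

	func (s *boundedSet) Add(key string) {
		if s.seen[key] {
			return
		}
		if len(s.order) == s.max { // evict oldest
			delete(s.seen, s.order[0])
			s.order = s.order[1:]
		}
		s.order = append(s.order, key)
		s.seen[key] = true
	}

	func (s *boundedSet) Has(key string) bool { return s.seen[key] }

	var deletedVMs = newBoundedSet(100) // keyed by instance name

	// deleteVMOnce skips instances we've already asked GCE to delete.
	func deleteVMOnce(instName string, deleteVM func(string)) {
		if deletedVMs.Has(instName) {
			return // delete already in flight; GCE deletes are slow
		}
		deletedVMs.Add(instName)
		deleteVM(instName)
	}

	func main() {
		del := func(n string) { fmt.Println("deleting", n) }
		deleteVMOnce("buildlet-linux-1", del)
		deleteVMOnce("buildlet-linux-1", del) // skipped: prints only once
	}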

Make reverse buildlets more paranoid in their health checking and
in closing the connection.

Make status page link to /try set URLs.

Also, better logging (more in some places, less in others), and misc
bug fixes.

Change-Id: I57a5e8e39381234006cac4dd799b655d64be71bb
Reviewed-on: https://go-review.googlesource.com/10981
Reviewed-by: Andrew Gerrand <adg@golang.org>
Author: Brad Fitzpatrick 2015-06-12 10:31:58 -07:00
Parent: 5e8cfd4b8c
Commit: d4ea014ceb
5 changed files with 235 additions and 86 deletions

View file

@@ -33,6 +33,7 @@ func NewClient(ipPort string, kp KeyPair) *Client {
ipPort: ipPort,
tls: kp,
password: kp.Password(),
peerDead: make(chan struct{}),
httpClient: &http.Client{
Transport: &http.Transport{
Dial: defaultDialer(),
@@ -49,6 +50,7 @@ func (c *Client) SetCloseFunc(fn func() error) {
}
func (c *Client) Close() error {
c.setPeerDead(errors.New("Close called"))
var err error
if c.closeFunc != nil {
err = c.closeFunc()
@@ -57,6 +59,14 @@ func (c *Client) Close() error {
return err
}
// setPeerDead marks the peer dead, recording err and closing c.peerDead at most once.
func (c *Client) setPeerDead(err error) {
c.setPeerDeadOnce.Do(func() {
c.deadErr = err
close(c.peerDead)
})
}
// SetDescription sets a short description of where the buildlet
// connection came from. This is used by the build coordinator status
// page, mostly for debugging.
@@ -65,10 +75,21 @@ func (c *Client) SetDescription(v string) {
}
// SetHTTPClient replaces the underlying HTTP client.
// It should only be called before the Client is used.
func (c *Client) SetHTTPClient(httpClient *http.Client) {
c.httpClient = httpClient
}
// EnableHeartbeats enables background heartbeating
// against the peer.
// It should only be called before the Client is used.
func (c *Client) EnableHeartbeats() {
// TODO(bradfitz): make this always enabled, once the
// reverse buildlet connection model supports
// multiple connections at once.
c.heartbeat = true
}
// defaultDialer returns the net/http package's default Dial function.
// Notably, this sets TCP keep-alive values, so when we kill VMs
// (whose TCP stacks stop replying, forever), we don't leak file
@@ -86,10 +107,16 @@ type Client struct {
tls KeyPair
password string // basic auth password or empty for none
httpClient *http.Client
heartbeat bool // whether to heartbeat in the background
closeFunc func() error
desc string
initHeartbeatOnce sync.Once
setPeerDeadOnce sync.Once
peerDead chan struct{} // closed on peer death
deadErr error // guarded by peerDead's close
mu sync.Mutex
broken bool // client is broken in some way
}
@@ -126,12 +153,41 @@ func (c *Client) IsBroken() bool {
}
func (c *Client) do(req *http.Request) (*http.Response, error) {
c.initHeartbeatOnce.Do(c.initHeartbeats)
if c.password != "" {
req.SetBasicAuth("gomote", c.password)
}
return c.httpClient.Do(req)
}
func (c *Client) initHeartbeats() {
if !c.heartbeat {
// TODO(bradfitz): make this always enabled later, once
// reverse buildlets are fixed.
return
}
go c.heartbeatLoop()
}
func (c *Client) heartbeatLoop() {
for {
select {
case <-c.peerDead:
// Already dead by something else.
// Most likely: c.Close was called.
return
case <-time.After(10 * time.Second):
t0 := time.Now()
if _, err := c.Status(); err != nil {
err := fmt.Errorf("Buildlet %v failed heartbeat after %v; marking dead; err=%v", c, time.Since(t0), err)
c.MarkBroken()
c.setPeerDead(err)
return
}
}
}
}
var errHeaderTimeout = errors.New("timeout waiting for headers")
// doHeaderTimeout calls c.do(req) and returns its results, or
@@ -150,15 +206,20 @@ func (c *Client) doHeaderTimeout(req *http.Request, max time.Duration) (res *htt
timer := time.NewTimer(max)
defer timer.Stop()
cleanup := func() {
if re := <-resErrc; re.res != nil {
re.res.Body.Close()
}
}
select {
case re := <-resErrc:
return re.res, re.err
case <-c.peerDead:
go cleanup()
return nil, c.deadErr
case <-timer.C:
go func() {
if re := <-resErrc; re.res != nil {
res.Body.Close()
}
}()
go cleanup()
return nil, errHeaderTimeout
}
}
@@ -279,8 +340,13 @@ type ExecOpts struct {
// response from the buildlet, but before the output begins
// writing to Output.
OnStartExec func()
// Timeout is an optional duration before ErrTimeout is returned.
Timeout time.Duration
}
var ErrTimeout = errors.New("buildlet: timeout waiting for command to complete")
// Exec runs cmd on the buildlet.
//
// Two errors are returned: one is whether the command succeeded
@@ -315,9 +381,9 @@ func (c *Client) Exec(cmd string, opts ExecOpts) (remoteErr, execErr error) {
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
// The first thing the buildlet's exec handler does is flush the headers, so
// 5 seconds should be plenty of time, regardless of where on the planet
// 10 seconds should be plenty of time, regardless of where on the planet
// (Atlanta, Paris, etc) the reverse buildlet is:
res, err := c.doHeaderTimeout(req, 5*time.Second)
res, err := c.doHeaderTimeout(req, 10*time.Second)
if err == errHeaderTimeout {
c.MarkBroken()
return nil, errors.New("buildlet: timeout waiting for exec header response")
@@ -332,28 +398,52 @@ func (c *Client) Exec(cmd string, opts ExecOpts) (remoteErr, execErr error) {
}
condRun(opts.OnStartExec)
// Stream the output:
out := opts.Output
if out == nil {
out = ioutil.Discard
}
if _, err := io.Copy(out, res.Body); err != nil {
return nil, fmt.Errorf("error copying response: %v", err)
type errs struct {
remoteErr, execErr error
}
resc := make(chan errs, 1)
go func() {
// Stream the output:
out := opts.Output
if out == nil {
out = ioutil.Discard
}
if _, err := io.Copy(out, res.Body); err != nil {
resc <- errs{execErr: fmt.Errorf("error copying response: %v", err)}
return
}
// Don't record to the dashboard unless we heard the trailer from
// the buildlet, otherwise it was probably some unrelated error
// (like the VM being killed, or the buildlet crashing due to
// e.g. https://golang.org/issue/9309, since we require a tip
// build of the buildlet to get Trailers support)
state := res.Trailer.Get("Process-State")
if state == "" {
return nil, errors.New("missing Process-State trailer from HTTP response; buildlet built with old (<= 1.4) Go?")
// Don't record to the dashboard unless we heard the trailer from
// the buildlet, otherwise it was probably some unrelated error
// (like the VM being killed, or the buildlet crashing due to
// e.g. https://golang.org/issue/9309, since we require a tip
// build of the buildlet to get Trailers support)
state := res.Trailer.Get("Process-State")
if state == "" {
resc <- errs{execErr: errors.New("missing Process-State trailer from HTTP response; buildlet built with old (<= 1.4) Go?")}
return
}
if state != "ok" {
resc <- errs{remoteErr: errors.New(state)}
} else {
resc <- errs{} // success
}
}()
var timer <-chan time.Time
if opts.Timeout > 0 {
t := time.NewTimer(opts.Timeout)
defer t.Stop()
timer = t.C
}
if state != "ok" {
return errors.New(state), nil
select {
case <-timer:
c.MarkBroken()
return nil, ErrTimeout
case res := <-resc:
return res.remoteErr, res.execErr
case <-c.peerDead:
return nil, c.deadErr
}
return nil, nil
}
// Destroy shuts down the buildlet, destroying all state immediately.
@@ -437,7 +527,7 @@ func (c *Client) Status() (Status, error) {
if err != nil {
return Status{}, err
}
resp, err := c.do(req)
resp, err := c.doHeaderTimeout(req, 10*time.Second) // plenty of time
if err != nil {
return Status{}, err
}
@@ -462,7 +552,7 @@ func (c *Client) WorkDir() (string, error) {
if err != nil {
return "", err
}
resp, err := c.do(req)
resp, err := c.doHeaderTimeout(req, 10*time.Second) // plenty of time
if err != nil {
return "", err
}
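
Putting this file's changes together, a coordinator-side caller wires
up the new knobs roughly like this (an illustrative sketch, not code
from this commit; the address and command are placeholders):

	package main

	import (
		"log"
		"time"

		"golang.org/x/build/buildlet"
	)

	func main() {
		// NoKeyPair skips TLS/auth, as for reverse buildlets.
		bc := buildlet.NewClient("10.0.0.1:80", buildlet.NoKeyPair)
		bc.EnableHeartbeats() // background Status probes; failure marks the peer dead

		remoteErr, execErr := bc.Exec("go/bin/go", buildlet.ExecOpts{
			Args:    []string{"version"},
			Timeout: 10 * time.Minute, // Exec returns buildlet.ErrTimeout on expiry
		})
		switch {
		case execErr == buildlet.ErrTimeout:
			log.Print("hung command; client was marked broken")
		case execErr != nil:
			log.Printf("communication error: %v", execErr)
		case remoteErr != nil:
			log.Printf("command failed remotely: %v", remoteErr)
		}
	}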

View file

@@ -115,12 +115,13 @@ func init() {
"plan9-386",
"nacl-386",
"nacl-amd64p32",
"linux-arm-shard_test",
"linux-arm-shard_std_am",
"linux-arm-shard_std_nz",
"linux-arm-shard_runtimecpu",
"linux-arm-shard_cgotest",
"linux-arm-shard_misc",
/* "linux-arm-shard_test",
"linux-arm-shard_std_am",
"linux-arm-shard_std_nz",
"linux-arm-shard_runtimecpu",
"linux-arm-shard_cgotest",
"linux-arm-shard_misc",
*/
}
for _, bname := range tryList {
conf, ok := dashboard.Builders[bname]
@@ -512,8 +513,11 @@ func handleLogs(w http.ResponseWriter, r *http.Request) {
w.Header().Set("X-Content-Type-Options", "nosniff")
writeStatusHeader(w, st)
if r.FormValue("nostream") != "" {
fmt.Fprintf(w, "\n\n(no live streaming. reload manually to see status)\n")
nostream := r.FormValue("nostream") != ""
if nostream || !st.isRunning() {
if nostream {
fmt.Fprintf(w, "\n\n(live streaming disabled; reload manually to see status)\n")
}
st.mu.Lock()
defer st.mu.Unlock()
w.Write(st.output.Bytes())
@@ -1268,6 +1272,13 @@ func (st *buildStatus) build() error {
doneMsg := "all tests passed"
if remoteErr != nil {
doneMsg = "with test failures"
} else if err != nil {
doneMsg = "comm error: " + err.Error()
}
if err != nil {
// Return the error *before* we create the magic
// "done" event. (which the try coordinator looks for)
return err
}
st.logEventTime("done", doneMsg) // "done" is a magic value
@@ -1857,7 +1868,7 @@ func (st *buildStatus) runSubrepoTests() (remoteErr, err error) {
// runTests is only called for builders which support a split make/run
// (should be everything, at least soon). Currently (2015-05-27) iOS
// and Android and Nacl may not. Untested.
// and Android and Nacl do not.
func (st *buildStatus) runTests(helpers <-chan *buildlet.Client) (remoteErr, err error) {
testNames, err := st.distTestList()
if err != nil {
@@ -1875,8 +1886,12 @@ func (st *buildStatus) runTests(helpers <-chan *buildlet.Client) (remoteErr, err
for {
tis, ok := set.testsToRunInOrder()
if !ok {
st.logEventTime("in_order_tests_complete")
return
select {
case <-st.donec:
return
case <-time.After(5 * time.Second):
}
continue
}
goroot := "" // no need to override; main buildlet's GOROOT is baked into binaries
st.runTestsOnBuildlet(st.bc, tis, goroot)
@@ -1892,7 +1907,7 @@ func (st *buildStatus) runTests(helpers <-chan *buildlet.Client) (remoteErr, err
defer time.Sleep(5 * time.Minute)
defer st.logEventTime("DEV_HELPER_SLEEP", bc.IPPort())
}
st.logEventTime("got_helper", bc.IPPort())
st.logEventTime("got_helper", bc.String())
if err := bc.PutTarFromURL(st.snapshotURL(), "go"); err != nil {
log.Printf("failed to extract snapshot for helper %s: %v", bc.IPPort(), err)
return
@@ -1902,9 +1917,9 @@ func (st *buildStatus) runTests(helpers <-chan *buildlet.Client) (remoteErr, err
log.Printf("error discovering workdir for helper %s: %v", bc.IPPort(), err)
return
}
st.logEventTime("setup_helper", bc.IPPort())
st.logEventTime("setup_helper", bc.String())
goroot := st.conf.FilePathJoin(workDir, "go")
for {
for !bc.IsBroken() {
tis, ok := set.testsToRunBiggestFirst()
if !ok {
st.logEventTime("biggest_tests_complete", bc.IPPort())
@@ -1950,8 +1965,14 @@ func (st *buildStatus) runTests(helpers <-chan *buildlet.Client) (remoteErr, err
return fmt.Errorf("dist test failed: %s: %v", ti.name, ti.remoteErr), nil
}
}
shardedDuration := time.Since(startTime)
st.logEventTime("tests_complete", fmt.Sprintf("took %v; aggregate %v; saved %v", shardedDuration, serialDuration, serialDuration-shardedDuration))
elapsed := time.Since(startTime)
var msg string
if st.conf.NumTestHelpers > 0 {
msg = fmt.Sprintf("took %v; aggregate %v; saved %v", elapsed, serialDuration, serialDuration-elapsed)
} else {
msg = fmt.Sprintf("took %v", elapsed)
}
st.logEventTime("tests_complete", msg)
fmt.Fprintf(st, "\nAll tests passed.\n")
return nil, nil
}
@@ -1982,6 +2003,11 @@ func parseOutputAndBanner(b []byte) (banner string, out []byte) {
// (A communication error)
const maxTestExecErrors = 3
func execTimeout(testNames []string) time.Duration {
// TODO(bradfitz): something smarter probably.
return 10 * time.Minute
}
// runTestsOnBuildlet runs tis on bc, using the optional goroot environment variable.
func (st *buildStatus) runTestsOnBuildlet(bc *buildlet.Client, tis []*testItem, goroot string) {
names := make([]string, len(tis))
@@ -1994,19 +2020,6 @@ func (st *buildStatus) runTestsOnBuildlet(bc *buildlet.Client, tis []*testItem,
which := fmt.Sprintf("%s: %v", bc.IPPort(), names)
st.logEventTime("start_tests", which)
// TODO(bradfitz,adg): a few weeks after
// https://go-review.googlesource.com/10688 is submitted,
// around Jun 18th 2015, remove this innerRx stuff and just
// pass a list of test names to dist instead. We don't want to
// do it right away, so people don't have to rebase their CLs
// to avoid trybot failures.
var innerRx string
if len(tis) > 1 {
innerRx = "(" + strings.Join(names, "|") + ")"
} else {
innerRx = names[0]
}
var buf bytes.Buffer
t0 := time.Now()
remoteErr, err := bc.Exec(path.Join("go", "bin", "go"), buildlet.ExecOpts{
@@ -2019,8 +2032,11 @@ func (st *buildStatus) runTestsOnBuildlet(bc *buildlet.Client, tis []*testItem,
Dir: ".",
Output: &buf, // see "maybe stream lines" TODO below
ExtraEnv: append(st.conf.Env(), "GOROOT="+goroot),
Timeout: execTimeout(names),
Path: []string{"$WORKDIR/go/bin", "$PATH"},
Args: []string{"tool", "dist", "test", "--no-rebuild", "--banner=" + banner, "--run=^" + innerRx + "$"},
Args: append([]string{
"tool", "dist", "test", "--no-rebuild", "--banner=" + banner,
}, names...),
})
summary := "ok"
if err != nil {
@@ -2029,7 +2045,7 @@ func (st *buildStatus) runTestsOnBuildlet(bc *buildlet.Client, tis []*testItem,
summary = "test failed remotely"
}
execDuration := time.Since(t0)
st.logEventTime("end_tests", fmt.Sprintf("%s; %s after %v", which, summary, execDuration))
st.logEventTime("end_tests", fmt.Sprintf("%s; %s (test exec = %v)", which, summary, execDuration))
if err != nil {
for _, ti := range tis {
ti.numFail++
@@ -2306,26 +2322,27 @@ func (st *buildStatus) HTMLStatusLine() template.HTML {
var buf bytes.Buffer
fmt.Fprintf(&buf, "<a href='https://github.com/golang/go/wiki/DashboardBuilders'>%s</a> rev <a href='%s%s'>%s</a>",
st.name, urlPrefix, st.rev, st.rev)
st.name, urlPrefix, st.rev, st.rev[:8])
if st.isSubrepo() {
fmt.Fprintf(&buf, " (sub-repo %s rev <a href='%s%s'>%s</a>)",
st.subName, urlPrefix, st.subRev, st.subRev)
st.subName, urlPrefix, st.subRev, st.subRev[:8])
}
if ts := st.trySet; ts != nil {
fmt.Fprintf(&buf, " (trying <a href='https://go-review.googlesource.com/#/q/%s'>%s</a>)",
fmt.Fprintf(&buf, " (<a href='/try?commit=%v'>trybot set</a> for <a href='https://go-review.googlesource.com/#/q/%s'>%s</a>)",
ts.Commit[:8],
ts.ChangeID, ts.ChangeID[:8])
}
if st.done.IsZero() {
buf.WriteString(", running")
fmt.Fprintf(&buf, "; <a href='%s'>build log</a>; %s", st.logsURLLocked(), html.EscapeString(st.bc.String()))
} else if st.succeeded {
buf.WriteString(", succeeded")
} else {
buf.WriteString(", failed")
fmt.Fprintf(&buf, "; <a href='%s'>build log</a>; %s", st.logsURLLocked(), html.EscapeString(st.bc.String()))
}
fmt.Fprintf(&buf, "; <a href='%s'>build log</a>; %s", st.logsURLLocked(), html.EscapeString(st.bc.String()))
t := st.done
if t.IsZero() {
t = st.startTime

View file

@@ -23,6 +23,7 @@ import (
"golang.org/x/build/buildlet"
"golang.org/x/build/dashboard"
"golang.org/x/build/gerrit"
"golang.org/x/build/internal/lru"
"golang.org/x/net/context"
"golang.org/x/oauth2"
"golang.org/x/oauth2/google"
@@ -252,6 +253,7 @@ func (p *gceBuildletPool) GetBuildlet(cancel Cancel, typ, rev string, el eventTi
p.setInstanceUsed(instName, false)
return nil, err
}
bc.EnableHeartbeats()
bc.SetDescription("GCE VM: " + instName)
bc.SetCloseFunc(func() error { return p.putBuildlet(bc, typ, instName) })
return bc, nil
@@ -461,18 +463,23 @@ func (p *gceBuildletPool) cleanZoneVMs(zone string) error {
// Gophers for interactive development & debugging
// (non-builder users); those are named "mote-*".
if sawDeleteAt && strings.HasPrefix(inst.Name, "buildlet-") && !p.instanceUsed(inst.Name) {
log.Printf("Deleting VM %q in zone %q from an earlier coordinator generation ...", inst.Name, zone)
deleteVM(zone, inst.Name)
if _, ok := deletedVMCache.Get(inst.Name); !ok {
log.Printf("Deleting VM %q in zone %q from an earlier coordinator generation ...", inst.Name, zone)
deleteVM(zone, inst.Name)
}
}
}
return nil
}
var deletedVMCache = lru.New(100) // keyed by instName
// deleteVM starts a delete of an instance in a given zone.
//
// It either returns an operation name (if delete is pending) or the
// empty string if the instance didn't exist.
func deleteVM(zone, instName string) (operation string, err error) {
deletedVMCache.Add(instName, token{})
gceAPIGate()
op, err := computeService.Instances.Delete(projectID, zone, instName).Do()
apiErr, ok := err.(*googleapi.Error)

View file

@@ -29,6 +29,7 @@ work, go to:
import (
"bufio"
"bytes"
"errors"
"fmt"
"io"
@@ -36,6 +37,7 @@ import (
"net"
"net/http"
"sort"
"strings"
"sync"
"time"
@@ -51,7 +53,7 @@ var reversePool = &reverseBuildletPool{
func init() {
go func() {
for {
time.Sleep(5 * time.Second)
time.Sleep(15 * time.Second)
reversePool.reverseHealthCheck()
}
}()
@@ -84,14 +86,13 @@ func (p *reverseBuildletPool) tryToGrab(machineType string) (*buildlet.Client, e
if usable && b.inUseAs == "" {
// Found an unused match.
b.inUseAs = machineType
b.inUseTime = time.Now()
b.client.SetCloseFunc(func() error {
p.mu.Lock()
b.inUseAs = ""
b.inUseTime = time.Now()
p.mu.Unlock()
select {
case p.buildletReturned <- token{}:
default:
}
p.noteBuildletReturned()
return nil
})
return b.client, nil
@@ -103,6 +104,13 @@ func (p *reverseBuildletPool) tryToGrab(machineType string) (*buildlet.Client, e
return nil, errInUse
}
func (p *reverseBuildletPool) noteBuildletReturned() {
select {
case p.buildletReturned <- token{}:
default:
}
}
// nukeBuildlet removes victim from the pool so it's never returned
// again, and closes its TCP connection in hopes that it will fix
// itself later.
@@ -131,6 +139,7 @@ func (p *reverseBuildletPool) reverseHealthCheck() {
continue // skip busy buildlets
}
b.inUseAs = "health"
b.inUseTime = time.Now()
res := make(chan error, 1)
responses[b] = res
client := b.client
@@ -152,6 +161,8 @@ func (p *reverseBuildletPool) reverseHealthCheck() {
continue
}
b.inUseAs = ""
b.inUseTime = time.Now()
p.noteBuildletReturned()
var err error
select {
case err = <-res:
@@ -220,8 +231,15 @@ func (p *reverseBuildletPool) WriteHTMLStatus(w io.Writer) {
inUse := make(map[string]int)
inUseOther := make(map[string]int)
var machineBuf bytes.Buffer
p.mu.Lock()
for _, b := range p.buildlets {
machStatus := "<i>idle</i>"
if b.inUseAs != "" {
machStatus = "working as <b>" + b.inUseAs + "</b>"
}
fmt.Fprintf(&machineBuf, "<li>%s, %s: %s for %v</li>\n",
b.conn.RemoteAddr(), strings.Join(b.modes, ", "), machStatus, time.Since(b.inUseTime))
for _, mode := range b.modes {
if b.inUseAs != "" && b.inUseAs != "health" {
if mode == b.inUseAs {
@@ -241,7 +259,7 @@ func (p *reverseBuildletPool) WriteHTMLStatus(w io.Writer) {
}
sort.Strings(modes)
io.WriteString(w, "<b>Reverse pool</b><ul>")
io.WriteString(w, "<b>Reverse pool summary</b><ul>")
if len(modes) == 0 {
io.WriteString(w, "<li>no connections</li>")
}
@@ -254,6 +272,8 @@ func (p *reverseBuildletPool) WriteHTMLStatus(w io.Writer) {
}
}
io.WriteString(w, "</ul>")
fmt.Fprintf(w, "<b>Reverse pool machine detail</b><ul>%s</ul>", machineBuf.Bytes())
}
func (p *reverseBuildletPool) String() string {
@@ -320,9 +340,10 @@ type reverseBuildlet struct {
modes []string
// inUseAs signifies that the buildlet is in use as the named mode.
// guarded by mutex on reverseBuildletPool.
inUseAs string
// TODO: inUseTime time.Time + more HTML status
// inUseTime is when it entered that state.
// Both are guarded by the mutex on reverseBuildletPool.
inUseAs string
inUseTime time.Time
}
func handleReverse(w http.ResponseWriter, r *http.Request) {
@@ -349,18 +370,20 @@ func handleReverse(w http.ResponseWriter, r *http.Request) {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
log.Printf("Registering reverse buildlet %s", r.RemoteAddr)
log.Printf("Registering reverse buildlet %s for modes %v", r.RemoteAddr, modes)
// The server becomes a (very simple) http client.
(&http.Response{StatusCode: 200, Proto: "HTTP/1.1"}).Write(conn)
client := buildlet.NewClient("none", buildlet.NoKeyPair)
client.SetHTTPClient(&http.Client{
Transport: newRoundTripper(bufrw),
Transport: newRoundTripper(conn, bufrw),
})
client.SetDescription(fmt.Sprintf("reverse peer %s for modes %v", r.RemoteAddr, modes))
tstatus := time.Now()
status, err := client.Status()
if err != nil {
log.Printf("Reverse connection did not answer status: %v", err)
log.Printf("Reverse connection %s for modes %v did not answer status after %v: %v", r.RemoteAddr, modes, time.Since(tstatus), err)
conn.Close()
return
}
@@ -376,9 +399,10 @@ func handleReverse(w http.ResponseWriter, r *http.Request) {
reversePool.mu.Lock()
defer reversePool.mu.Unlock()
b := &reverseBuildlet{
modes: modes,
client: client,
conn: conn,
modes: modes,
client: client,
conn: conn,
inUseTime: time.Now(),
}
reversePool.buildlets = append(reversePool.buildlets, b)
registerBuildlet(modes)
@@ -386,8 +410,9 @@ func handleReverse(w http.ResponseWriter, r *http.Request) {
var registerBuildlet = func(modes []string) {} // test hook
func newRoundTripper(bufrw *bufio.ReadWriter) *reverseRoundTripper {
func newRoundTripper(conn net.Conn, bufrw *bufio.ReadWriter) *reverseRoundTripper {
return &reverseRoundTripper{
conn: conn,
bufrw: bufrw,
sema: make(chan bool, 1),
}
@@ -398,6 +423,7 @@ func newRoundTripper(bufrw *bufio.ReadWriter) *reverseRoundTripper {
//
// Attempts at concurrent requests return an error.
type reverseRoundTripper struct {
conn net.Conn
bufrw *bufio.ReadWriter
sema chan bool
}
@@ -406,29 +432,37 @@ func (c *reverseRoundTripper) RoundTrip(req *http.Request) (resp *http.Response,
// Serialize trips. It is up to callers to avoid deadlocking.
c.sema <- true
if err := req.Write(c.bufrw); err != nil {
go c.conn.Close()
<-c.sema
return nil, err
}
if err := c.bufrw.Flush(); err != nil {
go c.conn.Close()
<-c.sema
return nil, err
}
resp, err = http.ReadResponse(c.bufrw.Reader, req)
if err != nil {
go c.conn.Close()
<-c.sema
return nil, err
}
resp.Body = &reverseLockedBody{resp.Body, c.sema}
resp.Body = &reverseLockedBody{c, resp.Body, c.sema}
return resp, err
}
type reverseLockedBody struct {
rt *reverseRoundTripper
body io.ReadCloser
sema chan bool
}
func (b *reverseLockedBody) Read(p []byte) (n int, err error) {
return b.body.Read(p)
n, err = b.body.Read(p)
if err != nil && err != io.EOF {
go b.rt.conn.Close()
}
return
}
func (b *reverseLockedBody) Close() error {
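
A note on the error paths above: a reverse buildlet multiplexes all
requests over a single TCP connection, so after any mid-request error
the HTTP framing is in an unknown state; the only safe recovery is to
drop the connection and let the buildlet redial and re-register. The
rule, distilled into a hypothetical helper:

	package reversesketch

	import (
		"io"
		"net"
	)

	// closeOnErr is the recovery rule used in RoundTrip and
	// reverseLockedBody.Read: after a non-EOF error on the shared
	// connection, kill it asynchronously (to avoid blocking the
	// caller) and surface the original error.
	func closeOnErr(conn net.Conn, err error) error {
		if err != nil && err != io.EOF {
			go conn.Close() // buildlet will redial and re-register
		}
		return err
	}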

View file

@@ -37,7 +37,8 @@ func handleStatus(w http.ResponseWriter, r *http.Request) {
for _, key := range tryList {
if ts := tries[key]; ts != nil {
state := ts.state()
fmt.Fprintf(&buf, "Change-ID: %v Commit: %v\n", key.ChangeID, key.Commit)
fmt.Fprintf(&buf, "Change-ID: %v Commit: %v (<a href='/try?commit=%v'>status</a>)\n",
key.ChangeID, key.Commit, key.Commit[:8])
fmt.Fprintf(&buf, " Remain: %d, fails: %v\n", state.remain, state.failed)
for _, bs := range ts.builds {
fmt.Fprintf(&buf, " %s: running=%v\n", bs.name, bs.isRunning())
@@ -51,7 +52,7 @@ func handleStatus(w http.ResponseWriter, r *http.Request) {
if errTryDeps != nil {
data.TrybotsErr = errTryDeps.Error()
} else {
data.Trybots = buf.String()
data.Trybots = template.HTML(buf.String())
}
buf.Reset()
@@ -81,7 +82,7 @@ type statusData struct {
Active []*buildStatus
Recent []*buildStatus
TrybotsErr string
Trybots string
Trybots template.HTML
GCEPoolStatus template.HTML // TODO: embed template
ReversePoolStatus template.HTML // TODO: embed template
DiskFree string
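
The Trybots field changes from string to template.HTML because
html/template escapes plain string values, which would render the new
status links as literal text; template.HTML marks the buffer as
pre-escaped. A minimal illustration of the difference:

	package main

	import (
		"html/template"
		"os"
	)

	func main() {
		t := template.Must(template.New("t").Parse("{{.S}}\n{{.H}}\n"))
		t.Execute(os.Stdout, struct {
			S string        // printed escaped: &lt;a href=...
			H template.HTML // printed as-is, so the link renders
		}{
			S: "<a href='/try?commit=abc'>status</a>",
			H: template.HTML("<a href='/try?commit=abc'>status</a>"),
		})
	}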