diff --git a/buildlet/gce.go b/buildlet/gce.go index dfadf9a4..2c4a3e94 100644 --- a/buildlet/gce.go +++ b/buildlet/gce.go @@ -8,6 +8,7 @@ import ( "crypto/tls" "errors" "fmt" + "log" "net/http" "strings" "time" @@ -62,6 +63,12 @@ type VMOpts struct { // OnInstanceCreated optionally specifies a hook to run synchronously // after the computeService.Instances.Get call. OnGotInstanceInfo func() + + // FallbackToFullPrice optionally specifies a hook to return a new + // GCE instance name if the first one failed to launch + // as preemptible. (If you use the same name, GCE complains about + // resources already existing, even if it failed to be created) + FallbackToFullPrice func() (newInstname string) } // StartNewVM boots a new VM on GCE and returns a buildlet client @@ -85,7 +92,7 @@ func StartNewVM(ts oauth2.TokenSource, instName, builderType string, opts VMOpts return nil, errors.New("buildlet: missing required ProjectID option") } - usePreempt := true + usePreempt := false Try: prefix := "https://www.googleapis.com/compute/v1/projects/" + projectID machType := prefix + "/zones/" + zone + "/machineTypes/" + conf.MachineType() @@ -94,6 +101,19 @@ Try: diskType = "" // a spinning disk } + // Request an IP address if this is a world-facing buildlet. + var accessConfigs []*compute.AccessConfig + // TODO(bradfitz): remove the "true ||" part once we figure out why the buildlet + // never boots without an IP address. Userspace seems to hang before we get to the buildlet? + if true || !opts.TLS.IsZero() { + accessConfigs = []*compute.AccessConfig{ + &compute.AccessConfig{ + Type: "ONE_TO_ONE_NAT", + Name: "External NAT", + }, + } + } + instance := &compute.Instance{ Name: instName, Description: opts.Description, @@ -120,13 +140,8 @@ Try: Metadata: &compute.Metadata{}, NetworkInterfaces: []*compute.NetworkInterface{ &compute.NetworkInterface{ - AccessConfigs: []*compute.AccessConfig{ - &compute.AccessConfig{ - Type: "ONE_TO_ONE_NAT", - Name: "External NAT", - }, - }, - Network: prefix + "/global/networks/default", + AccessConfigs: accessConfigs, + Network: prefix + "/global/networks/default", }, }, Scheduling: &compute.Scheduling{ @@ -188,8 +203,12 @@ OpLoop: case "DONE": if op.Error != nil { for _, operr := range op.Error.Errors { - if operr.Code == "ZONE_RESOURCE_POOL_EXHAUSTED" && usePreempt { + log.Printf("failed to create instance %s in zone %s: %v", instName, zone, operr.Code) + if operr.Code == "ZONE_RESOURCE_POOL_EXHAUSTED" && usePreempt && opts.FallbackToFullPrice != nil { + oldName := instName usePreempt = false + instName = opts.FallbackToFullPrice() + log.Printf("buildlet/gce: retrying without preempt with name %q (previously: %q)", instName, oldName) goto Try } // TODO: catch Code=="QUOTA_EXCEEDED" and "Message" and return diff --git a/cmd/buildlet/buildlet.go b/cmd/buildlet/buildlet.go index e6364a1e..196a8a75 100644 --- a/cmd/buildlet/buildlet.go +++ b/cmd/buildlet/buildlet.go @@ -45,7 +45,7 @@ import ( var ( haltEntireOS = flag.Bool("halt", true, "halt OS in /halt handler. If false, the buildlet process just ends.") workDir = flag.String("workdir", "", "Temporary directory to use. The contents of this directory may be deleted at any time. If empty, TempDir is used to create one.") - listenAddr = flag.String("listen", defaultListenAddr(), "address to listen on. Unused in reverse mode. Warning: this service is inherently insecure and offers no protection of its own. Do not expose this port to the world.") + listenAddr = flag.String("listen", "AUTO", "address to listen on. 
Unused in reverse mode. Warning: this service is inherently insecure and offers no protection of its own. Do not expose this port to the world.") reverse = flag.String("reverse", "", "if non-empty, go into reverse mode where the buildlet dials the coordinator instead of listening for connections. The value is a comma-separated list of modes, e.g. 'darwin-arm,darwin-amd64-race'") coordinator = flag.String("coordinator", "localhost:8119", "address of coordinator, in production use farmer.golang.org. Only used in reverse mode.") ) @@ -79,8 +79,15 @@ func main() { log.SetOutput(w) } } + log.Printf("buildlet starting.") flag.Parse() + if *listenAddr == "AUTO" { + v := defaultListenAddr() + log.Printf("Will listen on %s", v) + *listenAddr = v + } + onGCE := metadata.OnGCE() if !onGCE && !strings.HasPrefix(*listenAddr, "localhost:") { log.Printf("** WARNING *** This server is unsafe and offers no security. Be careful.") diff --git a/cmd/coordinator/buildongce/create.go b/cmd/coordinator/buildongce/create.go index 8d73c4ba..55946e7b 100644 --- a/cmd/coordinator/buildongce/create.go +++ b/cmd/coordinator/buildongce/create.go @@ -22,7 +22,7 @@ import ( var ( proj = flag.String("project", "symbolic-datum-552", "name of Project") - zone = flag.String("zone", "us-central1-a", "GCE zone") + zone = flag.String("zone", "us-central1-b", "GCE zone") mach = flag.String("machinetype", "n1-highcpu-2", "Machine type") instName = flag.String("instance_name", "farmer", "Name of VM instance.") sshPub = flag.String("ssh_public_key", "", "ssh public key file to authorize. Can modify later in Google's web UI anyway.") diff --git a/cmd/coordinator/coordinator.go b/cmd/coordinator/coordinator.go index 0aac8868..d7c9ccfd 100644 --- a/cmd/coordinator/coordinator.go +++ b/cmd/coordinator/coordinator.go @@ -35,6 +35,8 @@ import ( "golang.org/x/build/buildlet" "golang.org/x/build/dashboard" "golang.org/x/build/gerrit" + "golang.org/x/build/internal/lru" + "golang.org/x/build/internal/singleflight" "golang.org/x/build/types" "google.golang.org/cloud/storage" ) @@ -90,7 +92,7 @@ var tryBuilders []dashboard.BuildConfig func init() { tryList := []string{ - "all-compile", + "misc-compile", "darwin-amd64-10_10", "linux-386", "linux-amd64", @@ -573,7 +575,8 @@ func workaroundFlush(w http.ResponseWriter) { func findWorkLoop(work chan<- builderRev) { // Useful for debugging a single run: if devCluster && false { - work <- builderRev{name: "linux-amd64-race", rev: "54789eff385780c54254f822e09505b6222918e2"} + work <- builderRev{name: "linux-amd64", rev: "54789eff385780c54254f822e09505b6222918e2"} + work <- builderRev{name: "windows-amd64-gce", rev: "54789eff385780c54254f822e09505b6222918e2"} return } ticker := time.NewTicker(15 * time.Second) @@ -780,12 +783,17 @@ func (ts *trySet) notifyStarting() { msg := "TryBots beginning. Status page: http://farmer.golang.org/try?commit=" + ts.Commit[:8] if ci, err := gerritClient.GetChangeDetail(ts.ChangeID); err == nil { + if len(ci.Messages) == 0 { + log.Printf("No Gerrit comments retrieved on %v", ts.ChangeID) + } for _, cmi := range ci.Messages { - if cmi.Message == msg { + if strings.Contains(cmi.Message, msg) { // Dup. Don't spam. return } } + } else { + log.Printf("Error getting Gerrit comments on %s: %v", ts.ChangeID, err) } // Ignore error. This isn't critical. 
@@ -1137,7 +1145,7 @@ func (st *buildStatus) onceInitHelpersFunc() { st.helpers = GetBuildlets(st.donec, pool, st.conf.NumTestHelpers, st.buildletType(), st.rev, st) } -func (st *buildStatus) build() (retErr error) { +func (st *buildStatus) build() error { pool, err := st.buildletPool() if err != nil { return err @@ -1153,20 +1161,6 @@ func (st *buildStatus) build() (retErr error) { st.mu.Unlock() st.logEventTime("got_buildlet", bc.IPPort()) - goodRes := func(res *http.Response, err error, what string) bool { - if err != nil { - retErr = fmt.Errorf("%s: %v", what, err) - return false - } - if res.StatusCode/100 != 2 { - slurp, _ := ioutil.ReadAll(io.LimitReader(res.Body, 4<<10)) - retErr = fmt.Errorf("%s: %v; body: %s", what, res.Status, slurp) - res.Body.Close() - return false - - } - return true - } // Write the VERSION file. st.logEventTime("start_write_version_tar") @@ -1174,19 +1168,15 @@ func (st *buildStatus) build() (retErr error) { return fmt.Errorf("writing VERSION tgz: %v", err) } - // Feed the buildlet a tar file for it to extract. - // TODO: cache these. - st.logEventTime("start_fetch_gerrit_tgz") - tarRes, err := http.Get("https://go.googlesource.com/go/+archive/" + st.rev + ".tar.gz") - if !goodRes(tarRes, err, "fetching tarball from Gerrit") { - return - } - var grp syncutil.Group grp.Go(func() error { + st.logEventTime("fetch_go_tar") + tarReader, err := getSourceTgz(st, "go", st.rev) + if err != nil { + return err + } st.logEventTime("start_write_go_tar") - if err := bc.PutTar(tarRes.Body, "go"); err != nil { - tarRes.Body.Close() + if err := bc.PutTar(tarReader, "go"); err != nil { return fmt.Errorf("writing tarball from Gerrit: %v", err) } st.logEventTime("end_write_go_tar") @@ -1356,8 +1346,7 @@ func (st *buildStatus) distTestList() (names []string, err error) { func (st *buildStatus) newTestSet(names []string) *testSet { set := &testSet{ - st: st, - retryc: make(chan *testItem, len(names)), + st: st, } for _, name := range names { set.items = append(set.items, &testItem{ @@ -1635,12 +1624,16 @@ func (st *buildStatus) runTests(helpers <-chan *buildlet.Client) (remoteErr, err // lumpy. The rest of the buildlets run the largest tests // first (critical path scheduling). 
go func() { - goroot := "" // no need to override; main buildlet's GOROOT is baked into binaries - for tis := range set.itemsInOrder() { + for { + tis, ok := set.testsToRunInOrder() + if !ok { + st.logEventTime("in_order_tests_complete") + return + } + goroot := "" // no need to override; main buildlet's GOROOT is baked into binaries st.runTestsOnBuildlet(st.bc, tis, goroot) } }() - helperWork := set.itemsBiggestFirst() go func() { for helper := range helpers { go func(bc *buildlet.Client) { @@ -1660,9 +1653,14 @@ func (st *buildStatus) runTests(helpers <-chan *buildlet.Client) (remoteErr, err log.Printf("error discovering workdir for helper %s: %v", bc.IPPort(), err) return } - goroot := st.conf.FilePathJoin(workDir, "go") st.logEventTime("setup_helper", bc.IPPort()) - for tis := range helperWork { + goroot := st.conf.FilePathJoin(workDir, "go") + for { + tis, ok := set.testsToRunBiggestFirst() + if !ok { + st.logEventTime("biggest_tests_complete", bc.IPPort()) + return + } st.runTestsOnBuildlet(bc, tis, goroot) } }(helper) @@ -1672,7 +1670,15 @@ func (st *buildStatus) runTests(helpers <-chan *buildlet.Client) (remoteErr, err var lastBanner string var serialDuration time.Duration for _, ti := range set.items { - <-ti.done // wait for success + AwaitDone: + for { + select { + case <-ti.done: // wait for success + break AwaitDone + case <-time.After(30 * time.Second): + st.logEventTime("still_waiting_on_test", ti.name) + } + } serialDuration += ti.execDuration if len(ti.output) > 0 { @@ -1818,11 +1824,9 @@ type testSet struct { st *buildStatus items []*testItem - // retryc communicates failures to watch a test. The channel is - // never closed. Sends should also select on reading st.donec - // to see if the things have stopped early due to another test - // failing and aborting the build. - retryc chan *testItem + mu sync.Mutex + inOrder [][]*testItem + biggestFirst [][]*testItem } // cancelAll cancels all pending tests. @@ -1832,76 +1836,72 @@ func (s *testSet) cancelAll() { } } -// itemsInOrder returns a channel of items mostly in their original order. -// The exception is that an item which fails to execute may happen later -// in a different order. -// Each item sent in the channel has been took. (ti.tryTake returned true) -// The returned channel is closed when no items remain. -func (s *testSet) itemsInOrder() <-chan []*testItem { - return s.itemChan(s.items) +func (s *testSet) testsToRunInOrder() (chunk []*testItem, ok bool) { + s.mu.Lock() + defer s.mu.Unlock() + if s.inOrder == nil { + s.initInOrder() + } + return s.testsFromSlice(s.inOrder) } -func (s *testSet) itemsBiggestFirst() <-chan []*testItem { - items := append([]*testItem(nil), s.items...) - sort.Sort(sort.Reverse(byTestDuration(items))) - return s.itemChan(items) +func (s *testSet) testsToRunBiggestFirst() (chunk []*testItem, ok bool) { + s.mu.Lock() + defer s.mu.Unlock() + if s.biggestFirst == nil { + s.initBiggestFirst() + } + return s.testsFromSlice(s.biggestFirst) } -// itemChan returns a channel which yields the provided items, usually -// in the same order given items, but grouped with others tests they -// should be run with. 
(only stdlib tests are are grouped) -func (s *testSet) itemChan(items []*testItem) <-chan []*testItem { - names := make([]string, len(items)) +func (s *testSet) testsFromSlice(chunkList [][]*testItem) (chunk []*testItem, ok bool) { + for _, candChunk := range chunkList { + for _, ti := range candChunk { + if ti.tryTake() { + chunk = append(chunk, ti) + } + } + if len(chunk) > 0 { + return chunk, true + } + } + return nil, false +} + +func (s *testSet) initInOrder() { + names := make([]string, len(s.items)) namedItem := map[string]*testItem{} - for i, ti := range items { + for i, ti := range s.items { names[i] = ti.name namedItem[ti.name] = ti } + + // First do the go_test:* ones. partitionGoTests + // only returns those, which are the ones we merge together. stdSets := partitionGoTests(names) - setForTest := map[string][]*testItem{} for _, set := range stdSets { tis := make([]*testItem, len(set)) for i, name := range set { tis[i] = namedItem[name] - setForTest[name] = tis } + s.inOrder = append(s.inOrder, tis) } - ch := make(chan []*testItem) - go func() { - defer close(ch) - for _, ti := range items { - if !ti.tryTake() { - continue - } - send := []*testItem{ti} - for _, other := range setForTest[ti.name] { - if other != ti && other.tryTake() { - send = append(send, other) - } - } - select { - case ch <- send: - case <-s.st.donec: - return - } + // Then do the misc tests, which are always by themselves. + // (No benefit to merging them) + for _, ti := range s.items { + if !strings.HasPrefix(ti.name, "go_test:") { + s.inOrder = append(s.inOrder, []*testItem{ti}) } - for { - select { - case ti := <-s.retryc: - if ti.tryTake() { - select { - case ch <- []*testItem{ti}: - case <-s.st.donec: - return - } - } - case <-s.st.donec: - return - } - } - }() - return ch + } +} + +func (s *testSet) initBiggestFirst() { + items := append([]*testItem(nil), s.items...) + sort.Sort(sort.Reverse(byTestDuration(items))) + for _, item := range items { + s.biggestFirst = append(s.biggestFirst, []*testItem{item}) + } } type testItem struct { @@ -1950,14 +1950,6 @@ func (ti *testItem) isDone() bool { func (ti *testItem) retry() { // release it to make it available for somebody else to try later: <-ti.take - - // Enqueue this test to retry, unless the build is - // only proceeding to the first failure and it's - // already failed. - select { - case ti.set.retryc <- ti: - case <-ti.set.st.donec: - } } type byTestDuration []*testItem @@ -2104,7 +2096,9 @@ func (st *buildStatus) writeEventsLocked(w io.Writer, htmlMode bool) { } fmt.Fprintf(w, " %7s %v %s %s\n", elapsed, evt.t.Format(time.RFC3339), e, text) } - fmt.Fprintf(w, " %7s (now)\n", fmt.Sprintf("+%0.1fs", time.Since(lastT).Seconds())) + if st.isRunningLocked() { + fmt.Fprintf(w, " %7s (now)\n", fmt.Sprintf("+%0.1fs", time.Since(lastT).Seconds())) + } } @@ -2214,4 +2208,75 @@ func versionTgz(rev string) io.Reader { return bytes.NewReader(buf.Bytes()) } +var sourceGroup singleflight.Group + +var sourceCache = lru.New(20) // git rev -> []byte + +// repo is go.googlesource.com repo ("go", "net", etc) +// rev is git revision. 
+func getSourceTgz(el eventTimeLogger, repo, rev string) (tgz io.Reader, err error) { + fromCache := false + vi, err, shared := sourceGroup.Do(rev, func() (interface{}, error) { + if tgzBytes, ok := sourceCache.Get(rev); ok { + fromCache = true + return tgzBytes, nil + } + + for i := 0; i < 10; i++ { + el.logEventTime("fetching_source", "from watcher") + tgzBytes, err := getSourceTgzFromWatcher(repo, rev) + if err == nil { + sourceCache.Add(rev, tgzBytes) + return tgzBytes, nil + } + log.Printf("Error fetching source %s/%s from watcher (after %v uptime): %v", + repo, rev, time.Since(processStartTime), err) + // Wait for watcher to start up. Give it a minute until + // we try Gerrit. + time.Sleep(6 * time.Second) + } + + el.logEventTime("fetching_source", "from gerrit") + tgzBytes, err := getSourceTgzFromGerrit(repo, rev) + if err == nil { + sourceCache.Add(rev, tgzBytes) + } + return tgzBytes, err + }) + if err != nil { + return nil, err + } + el.logEventTime("got_source", fmt.Sprintf("cache=%v shared=%v", fromCache, shared)) + return bytes.NewReader(vi.([]byte)), nil +} + +func getSourceTgzFromGerrit(repo, rev string) (tgz []byte, err error) { + return getSourceTgzFromURL("gerrit", repo, rev, "https://go.googlesource.com/"+repo+"/+archive/"+rev+".tar.gz") +} + +func getSourceTgzFromWatcher(repo, rev string) (tgz []byte, err error) { + return getSourceTgzFromURL("watcher", repo, rev, "http://"+gitArchiveAddr+"/"+repo+".tar.gz?rev="+rev) +} + +func getSourceTgzFromURL(source, repo, rev, urlStr string) (tgz []byte, err error) { + res, err := http.Get(urlStr) + if err != nil { + return nil, fmt.Errorf("fetching %s/%s from %s: %v", repo, rev, source, err) + } + defer res.Body.Close() + if res.StatusCode/100 != 2 { + slurp, _ := ioutil.ReadAll(io.LimitReader(res.Body, 4<<10)) + return nil, fmt.Errorf("fetching %s/%s from %s: %v; body: %s", repo, rev, source, res.Status, slurp) + } + const maxSize = 25 << 20 // seems unlikely; go source is 7.8MB on 2015-06-15 + slurp, err := ioutil.ReadAll(io.LimitReader(res.Body, maxSize+1)) + if len(slurp) > maxSize && err == nil { + err = fmt.Errorf("body over %d bytes", maxSize) + } + if err != nil { + return nil, fmt.Errorf("reading %s/%s from %s: %v", repo, rev, source, err) + } + return slurp, nil +} + var nl = []byte("\n") diff --git a/cmd/coordinator/gce.go b/cmd/coordinator/gce.go index 418024dc..6b73ad06 100644 --- a/cmd/coordinator/gce.go +++ b/cmd/coordinator/gce.go @@ -21,6 +21,7 @@ import ( "time" "golang.org/x/build/buildlet" + "golang.org/x/build/dashboard" "golang.org/x/build/gerrit" "golang.org/x/net/context" "golang.org/x/oauth2" @@ -39,7 +40,7 @@ func init() { // apiCallTicker ticks regularly, preventing us from accidentally making // GCE API calls too quickly. Our quota is 20 QPS, but we temporarily // limit ourselves to less than that. -var apiCallTicker = time.NewTicker(time.Second / 5) +var apiCallTicker = time.NewTicker(time.Second / 10) func gceAPIGate() { <-apiCallTicker.C @@ -49,6 +50,7 @@ func gceAPIGate() { var ( projectID string projectZone string + projectRegion string computeService *compute.Service externalIP string tokenSource oauth2.TokenSource @@ -88,12 +90,15 @@ func initGCE() error { if err != nil || projectZone == "" { return fmt.Errorf("failed to get current GCE zone: %v", err) } + // Convert the zone from "projects/1234/zones/us-central1-a" to "us-central1-a". projectZone = path.Base(projectZone) if !hasComputeScope() { return errors.New("The coordinator is not running with access to read and write Compute resources. 
VM support disabled.") } + projectRegion = projectZone[:strings.LastIndex(projectZone, "-")] // "us-central1" + externalIP, err = metadata.ExternalIP() if err != nil { return fmt.Errorf("ExternalIP: %v", err) @@ -109,9 +114,8 @@ func initGCE() error { devCluster = projectID == "go-dashboard-dev" if devCluster { log.Printf("Running in dev cluster") - gcePool.vmCap = make(chan bool, 5) } - + go gcePool.pollQuotaLoop() return nil } @@ -134,38 +138,72 @@ func checkTryBuildDeps() error { return nil } -// We artifically limit ourselves to 60 VMs right now, assuming that -// each takes 2 CPU, and we have a current quota of 200 CPUs. That -// gives us headroom, but also doesn't account for SSD or memory -// quota. -// TODO(bradfitz): better quota system. -const maxVMs = 60 +var gcePool = &gceBuildletPool{} -var gcePool = &gceBuildletPool{ - vmCap: make(chan bool, maxVMs), -} var _ BuildletPool = (*gceBuildletPool)(nil) -type gceBuildletPool struct { - // vmCap is a semaphore used to limit the number of VMs in - // use. - vmCap chan bool +// maxInstances is a temporary hack because we can't get buildlets to boot +// without IPs, and we only have 200 IP addresses. +// TODO(bradfitz): remove this once fixed. +const maxInstances = 190 - mu sync.Mutex - instUsed map[string]time.Time // GCE VM instance name -> creationTime +type gceBuildletPool struct { + mu sync.Mutex // guards all following + + disabled bool + + // CPU quota usage & limits. + cpuLeft int // dead-reckoning CPUs remain + instLeft int // dead-reckoning instances remain + instUsage int + cpuUsage int + addrUsage int + inst map[string]time.Time // GCE VM instance name -> creationTime +} + +func (p *gceBuildletPool) pollQuotaLoop() { + for { + p.pollQuota() + time.Sleep(5 * time.Second) + } +} + +func (p *gceBuildletPool) pollQuota() { + gceAPIGate() + reg, err := computeService.Regions.Get(projectID, projectRegion).Do() + if err != nil { + log.Printf("Failed to get quota for %s/%s: %v", projectID, projectRegion, err) + return + } + p.mu.Lock() + defer p.mu.Unlock() + for _, quota := range reg.Quotas { + switch quota.Metric { + case "CPUS": + p.cpuLeft = int(quota.Limit) - int(quota.Usage) + p.cpuUsage = int(quota.Usage) + case "INSTANCES": + p.instLeft = int(quota.Limit) - int(quota.Usage) + p.instUsage = int(quota.Usage) + case "IN_USE_ADDRESSES": + p.addrUsage = int(quota.Usage) + } + } } func (p *gceBuildletPool) SetEnabled(enabled bool) { - if enabled { - p.vmCap = make(chan bool, maxVMs) - } else { - p.vmCap = make(chan bool) - } + p.mu.Lock() + defer p.mu.Unlock() + p.disabled = !enabled } func (p *gceBuildletPool) GetBuildlet(cancel Cancel, typ, rev string, el eventTimeLogger) (*buildlet.Client, error) { el.logEventTime("awaiting_gce_quota") - if err := p.awaitVMCountQuota(cancel); err != nil { + conf, ok := dashboard.Builders[typ] + if !ok { + return nil, fmt.Errorf("gcepool: unknown buildlet type %q", typ) + } + if err := p.awaitVMCountQuota(cancel, conf.GCENumCPU()); err != nil { return nil, err } @@ -184,10 +222,18 @@ func (p *gceBuildletPool) GetBuildlet(cancel Cancel, typ, rev string, el eventTi Description: fmt.Sprintf("Go Builder for %s at %s", typ, rev), DeleteIn: vmDeleteTimeout, OnInstanceRequested: func() { - needDelete = true - el.logEventTime("instance_create_requested") + el.logEventTime("instance_create_requested", instName) log.Printf("GCE VM %q now booting", instName) }, + FallbackToFullPrice: func() string { + el.logEventTime("gce_fallback_to_full_price", "for "+instName) + p.setInstanceUsed(instName, false) + 
newName := instName + "-f" + log.Printf("Gave up on preemptible %q; now booting %q", instName, newName) + instName = newName + p.setInstanceUsed(instName, true) + return newName + }, OnInstanceCreated: func() { el.logEventTime("instance_created") needDelete = true // redundant with OnInstanceRequested one, but fine. @@ -197,26 +243,39 @@ func (p *gceBuildletPool) GetBuildlet(cancel Cancel, typ, rev string, el eventTi }, }) if err != nil { + el.logEventTime("gce_buildlet_create_failure", fmt.Sprintf("%s: %v", instName, err)) log.Printf("Failed to create VM for %s, %s: %v", typ, rev, err) if needDelete { deleteVM(projectZone, instName) + p.putVMCountQuota(conf.GCENumCPU()) } p.setInstanceUsed(instName, false) - p.putVMCountQuota() return nil, err } bc.SetDescription("GCE VM: " + instName) - bc.SetCloseFunc(func() error { - deleteVM(projectZone, instName) - p.setInstanceUsed(instName, false) - p.putVMCountQuota() - return nil - }) + bc.SetCloseFunc(func() error { return p.putBuildlet(bc, typ, instName) }) return bc, nil } +func (p *gceBuildletPool) putBuildlet(bc *buildlet.Client, typ, instName string) error { + // TODO(bradfitz): add the buildlet to a freelist (of max N + // items) for up to 10 minutes since when it got started if + // it's never seen a command execution failure, and we can + // wipe all its disk content. (perhaps wipe its disk content when + // it's retrieved, not put back on the freelist) + deleteVM(projectZone, instName) + p.setInstanceUsed(instName, false) + + conf, ok := dashboard.Builders[typ] + if !ok { + panic("failed to lookup conf") // should've worked if we did it before + } + p.putVMCountQuota(conf.GCENumCPU()) + return nil +} + func (p *gceBuildletPool) WriteHTMLStatus(w io.Writer) { - fmt.Fprintf(w, "GCE pool capacity: %d/%d", len(p.vmCap), cap(p.vmCap)) + fmt.Fprintf(w, "GCE pool capacity: %s", p.capacityString()) const show = 6 // must be even active := p.instancesActive() if len(active) > 0 { @@ -233,44 +292,84 @@ func (p *gceBuildletPool) WriteHTMLStatus(w io.Writer) { } func (p *gceBuildletPool) String() string { - return fmt.Sprintf("GCE pool capacity: %d/%d", len(p.vmCap), cap(p.vmCap)) + return fmt.Sprintf("GCE pool capacity: %s", p.capacityString()) } -// awaitVMCountQuota waits for quota. -func (p *gceBuildletPool) awaitVMCountQuota(cancel Cancel) error { - select { - case p.vmCap <- true: - return nil - case <-cancel: - return ErrCanceled +func (p *gceBuildletPool) capacityString() string { + p.mu.Lock() + defer p.mu.Unlock() + return fmt.Sprintf("%d/%d instances; %d/%d CPUs", + len(p.inst), p.instUsage+p.instLeft, + p.cpuUsage, p.cpuUsage+p.cpuLeft) +} + +// awaitVMCountQuota waits for numCPU CPUs of quota to become available, +// or returns ErrCanceled. +func (p *gceBuildletPool) awaitVMCountQuota(cancel Cancel, numCPU int) error { + // Poll every 2 seconds, which could be better, but works and + // is simple. 
+ for { + if p.tryAllocateQuota(numCPU) { + return nil + } + select { + case <-time.After(2 * time.Second): + case <-cancel: + return ErrCanceled + } } } -func (p *gceBuildletPool) putVMCountQuota() { <-p.vmCap } + +func (p *gceBuildletPool) tryAllocateQuota(numCPU int) bool { + p.mu.Lock() + defer p.mu.Unlock() + if p.disabled { + return false + } + if p.cpuLeft >= numCPU && p.instLeft >= 1 && len(p.inst) < maxInstances && p.addrUsage < maxInstances { + p.cpuUsage += numCPU + p.cpuLeft -= numCPU + p.instLeft-- + p.addrUsage++ + return true + } + return false +} + +// putVMCountQuota adjusts the dead-reckoning of our quota usage by +// one instance and cpu CPUs. +func (p *gceBuildletPool) putVMCountQuota(cpu int) { + p.mu.Lock() + defer p.mu.Unlock() + p.cpuUsage -= cpu + p.cpuLeft += cpu + p.instLeft++ +} func (p *gceBuildletPool) setInstanceUsed(instName string, used bool) { p.mu.Lock() defer p.mu.Unlock() - if p.instUsed == nil { - p.instUsed = make(map[string]time.Time) + if p.inst == nil { + p.inst = make(map[string]time.Time) } if used { - p.instUsed[instName] = time.Now() + p.inst[instName] = time.Now() } else { - delete(p.instUsed, instName) + delete(p.inst, instName) } } func (p *gceBuildletPool) instanceUsed(instName string) bool { p.mu.Lock() defer p.mu.Unlock() - _, ok := p.instUsed[instName] + _, ok := p.inst[instName] return ok } func (p *gceBuildletPool) instancesActive() (ret []instanceTime) { p.mu.Lock() defer p.mu.Unlock() - for name, create := range p.instUsed { + for name, create := range p.inst { ret = append(ret, instanceTime{ name: name, creation: create, diff --git a/cmd/coordinator/watcher.go b/cmd/coordinator/watcher.go index b5f9fd9b..9fdea4c3 100644 --- a/cmd/coordinator/watcher.go +++ b/cmd/coordinator/watcher.go @@ -32,6 +32,8 @@ type watchConfig struct { dash string // "https://build.golang.org/" (must end in /) interval time.Duration // Polling interval mirrorBase string // "https://github.com/golang/" or empty to disable mirroring + netHost bool // run docker container in the host's network namespace + httpAddr string } type imageInfo struct { @@ -45,12 +47,20 @@ var images = map[string]*imageInfo{ "go-commit-watcher": {url: "https://storage.googleapis.com/go-builder-data/docker-commit-watcher.tar.gz"}, } +const gitArchiveAddr = "127.0.0.1:21536" // 21536 == keys above WATCH + func startWatchers() { mirrorBase := "https://github.com/golang/" if devCluster { mirrorBase = "" // don't mirror from dev cluster } - addWatcher(watchConfig{repo: "https://go.googlesource.com/go", dash: dashBase(), mirrorBase: mirrorBase}) + addWatcher(watchConfig{ + repo: "https://go.googlesource.com/go", + dash: dashBase(), + mirrorBase: mirrorBase, + netHost: true, + httpAddr: gitArchiveAddr, + }) addWatcher(watchConfig{repo: "https://go.googlesource.com/gofrontend", dash: dashBase() + "gccgo/"}) go cleanUpOldContainers() @@ -94,12 +104,16 @@ func (conf watchConfig) dockerRunArgs() (args []string) { args = append(args, "-v", tmpKey+":/.gobuildkey") args = append(args, "-v", tmpKey+":/root/.gobuildkey") } + if conf.netHost { + args = append(args, "--net=host") + } args = append(args, "go-commit-watcher", "/usr/local/bin/watcher", "-repo="+conf.repo, "-dash="+conf.dash, "-poll="+conf.interval.String(), + "-http="+conf.httpAddr, ) if conf.mirrorBase != "" { dst, err := url.Parse(conf.mirrorBase) @@ -191,8 +205,13 @@ func startWatching(conf watchConfig) (err error) { // Start a goroutine to wait for the watcher to die. 
go func() { exec.Command("docker", "wait", container).Run() + out, _ := exec.Command("docker", "logs", container).CombinedOutput() exec.Command("docker", "rm", "-v", container).Run() - log.Printf("Watcher %v crashed. Restarting soon.", conf.repo) + const maxLogBytes = 1 << 10 + if len(out) > maxLogBytes { + out = out[len(out)-maxLogBytes:] + } + log.Printf("Watcher %v crashed. Restarting soon. Logs: %s", conf.repo, out) restartWatcherSoon(conf) }() return nil diff --git a/cmd/watcher/watcher.go b/cmd/watcher/watcher.go index 50954d07..ace1b367 100644 --- a/cmd/watcher/watcher.go +++ b/cmd/watcher/watcher.go @@ -16,6 +16,7 @@ import ( "io" "io/ioutil" "log" + "net" "net/http" "net/url" "os" @@ -24,6 +25,7 @@ import ( "path/filepath" "runtime" "sort" + "strconv" "strings" "sync" "time" @@ -44,6 +46,7 @@ var ( network = flag.Bool("network", true, "Enable network calls (disable for testing)") mirrorBase = flag.String("mirror", "", `Mirror repository base URL (eg "https://github.com/golang/")`) filter = flag.String("filter", "", "Comma-separated list of directories or files to watch for new commits (only works on main repo)") + httpAddr = flag.String("http", "", "If non-empty, the listen address to run an HTTP server on") ) var ( @@ -80,6 +83,14 @@ func run() error { } defer os.RemoveAll(dir) + if *httpAddr != "" { + ln, err := net.Listen("tcp", *httpAddr) + if err != nil { + return err + } + go http.Serve(ln, nil) + } + errc := make(chan error) go func() { @@ -88,11 +99,13 @@ func run() error { name := (*repoURL)[strings.LastIndex(*repoURL, "/")+1:] dst = *mirrorBase + name } + name := strings.TrimPrefix(*repoURL, goBase) r, err := NewRepo(dir, *repoURL, dst, "") if err != nil { errc <- err return } + http.Handle("/"+name+".tar.gz", r) errc <- r.Watch() }() @@ -113,6 +126,7 @@ func run() error { errc <- err return } + http.Handle("/"+name+".tar.gz", r) errc <- r.Watch() }(path) } @@ -276,6 +290,9 @@ func (r *Repo) postChildren(b *Branch, parent *Commit) error { continue } if err := r.postCommit(c); err != nil { + if strings.Contains(err.Error(), "this package already has a first commit; aborting") { + return nil + } return err } } @@ -653,6 +670,28 @@ func (r *Repo) push() error { }) } +func (r *Repo) ServeHTTP(w http.ResponseWriter, req *http.Request) { + if req.Method != "GET" && req.Method != "HEAD" { + w.WriteHeader(http.StatusBadRequest) + return + } + rev := req.FormValue("rev") + if rev == "" { + w.WriteHeader(http.StatusBadRequest) + return + } + cmd := exec.Command("git", "archive", "--format=tgz", rev) + cmd.Dir = r.root + tgz, err := cmd.Output() + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.Header().Set("Content-Length", strconv.Itoa(len(tgz))) + w.Header().Set("Content-Type", "application/x-compressed") + w.Write(tgz) +} + func try(n int, fn func() error) error { var err error for tries := 0; tries < n; tries++ { diff --git a/dashboard/builders.go b/dashboard/builders.go index 792a9513..41a484ea 100644 --- a/dashboard/builders.go +++ b/dashboard/builders.go @@ -6,7 +6,10 @@ // pieces of the Go continuous build system. package dashboard -import "strings" +import ( + "strconv" + "strings" +) // Builders are the different build configurations. // The keys are like "darwin-amd64" or "linux-386-387". @@ -38,16 +41,18 @@ type BuildConfig struct { // request from the buildlet pool. If empty, it defaults to // the value of Name. 
// - // Note: we should probably start using this mechanism for - // more builder types, which combined with buildlet reuse - // could reduce latency. (e.g. "linux-386-387", "linux-amd64", - // and "linux-amd64-race" all sharing same buildlet and - // machine type, and able to jump onto each others - // buidlets... they vary only in env/args). For now we're - // only using this for ARM trybots. + // These should be used to minimize builder types, so the buildlet pool + // implementations can reuse buildlets from similar-enough builds. + // (e.g. a shared linux-386 trybot can be reused for some linux-amd64 + // or linux-amd64-race tests, etc) + // + // TODO(bradfitz): break BuildConfig up into BuildConfig and + // BuildletConfig and have a BuildConfig refer to a + // BuildletConfig. There's too much confusion now. BuildletType string - env []string // extra environment ("key=value") pairs + env []string // extra environment ("key=value") pairs + allScriptArgs []string } func (c *BuildConfig) Env() []string { return append([]string(nil), c.env...) } @@ -123,7 +128,7 @@ func (c *BuildConfig) AllScript() string { if strings.HasPrefix(c.Name, "darwin-arm") { return "src/iostest.bash" } - if c.Name == "all-compile" { + if c.Name == "misc-compile" { return "src/buildall.bash" } return "src/all.bash" @@ -136,6 +141,19 @@ func (c *BuildConfig) AllScript() string { // but for now we've only set up the scripts and verified that the main // configurations work. func (c *BuildConfig) SplitMakeRun() bool { + if strings.HasPrefix(c.Name, "linux-arm") { + // On Scaleway, we don't want to snapshot these to GCS + // yet. That might be a lot of bandwidth and we + // haven't measured their speed yet. We might want to + // store snapshots within Scaleway instead. For now: + // use the old way. + return false + } + if strings.HasPrefix(c.Name, "darwin-") { + // TODO(bradfitz,crawshaw,adg): this is failing for some reason: + // Error: runTests: distTestList: Remote error: fork/exec /var/folders/5h/sqs3zkxd12zclcslj67vccqh0000gp/T/buildlet-scatch190808745/go/bin/go: no such file or directory + return false + } switch c.AllScript() { case "src/all.bash", "src/all.bat", "src/all.rc": // These we've verified to work. @@ -151,7 +169,7 @@ func (c *BuildConfig) AllScriptArgs() []string { if strings.HasPrefix(c.Name, "darwin-arm") { return []string{"-restart"} } - return nil + return append([]string(nil), c.allScriptArgs...) } // MakeScript returns the relative path to the operating system's script to @@ -217,6 +235,13 @@ func (c BuildConfig) ShortOwner() string { return strings.TrimSuffix(c.Owner, "@golang.org") } +// GCENumCPU reports the number of GCE CPUs this buildlet requires. 
+func (c *BuildConfig) GCENumCPU() int { + t := c.MachineType() + n, _ := strconv.Atoi(t[strings.LastIndex(t, "-")+1:]) + return n +} + func init() { addBuilder(BuildConfig{ Name: "freebsd-amd64-gce93", @@ -242,8 +267,9 @@ func init() { env: []string{"CC=clang"}, }) addBuilder(BuildConfig{ - Name: "freebsd-386-gce101", - VMImage: "freebsd-amd64-gce101", + Name: "freebsd-386-gce101", + VMImage: "freebsd-amd64-gce101", + //BuildletType: "freebsd-amd64-gce101", machineType: "n1-highcpu-2", buildletURL: "http://storage.googleapis.com/go-builder-data/buildlet.freebsd-amd64", Go14URL: "https://storage.googleapis.com/go-builder-data/go1.4-freebsd-amd64.tar.gz", @@ -256,16 +282,18 @@ func init() { env: []string{"GOARCH=386", "CC=clang"}, }) addBuilder(BuildConfig{ - Name: "linux-386", - VMImage: "linux-buildlet-std", + Name: "linux-386", + VMImage: "linux-buildlet-std", + //BuildletType: "linux-amd64", buildletURL: "http://storage.googleapis.com/go-builder-data/buildlet.linux-amd64", env: []string{"GOROOT_BOOTSTRAP=/go1.4", "GOARCH=386", "GOHOSTARCH=386"}, NumTestHelpers: 3, }) addBuilder(BuildConfig{ - Name: "linux-386-387", - Notes: "GO386=387", - VMImage: "linux-buildlet-std", + Name: "linux-386-387", + Notes: "GO386=387", + VMImage: "linux-buildlet-std", + //BuildletType: "linux-amd64", buildletURL: "http://storage.googleapis.com/go-builder-data/buildlet.linux-amd64", env: []string{"GOROOT_BOOTSTRAP=/go1.4", "GOARCH=386", "GOHOSTARCH=386", "GO386=387"}, }) @@ -276,13 +304,18 @@ func init() { NumTestHelpers: 3, }) addBuilder(BuildConfig{ - Name: "all-compile", - TryOnly: true, // TODO: for speed, restrict this to builds not covered by other trybots + Name: "misc-compile", + TryOnly: true, VMImage: "linux-buildlet-std", machineType: "n1-highcpu-16", // CPU-bound, uses it well. 
- Notes: "Runs buildall.sh to compile stdlib for all GOOS/GOARCH, but doesn't run any tests.", + Notes: "Runs buildall.sh to compile stdlib for GOOS/GOARCH pairs not otherwise covered by trybots, but doesn't run any tests.", buildletURL: "http://storage.googleapis.com/go-builder-data/buildlet.linux-amd64", env: []string{"GOROOT_BOOTSTRAP=/go1.4"}, + allScriptArgs: []string{ + // Filtering pattern to buildall.bash: + // TODO: add darwin-386 and + "^(linux-arm64|linux-ppc64|linux-ppc64le|nacl-arm|plan9-amd64|solaris-amd64|netbsd-386|netbsd-amd64|netbsd-arm|freebsd-arm|darwin-386)$", + }, }) addBuilder(BuildConfig{ Name: "linux-amd64-nocgo", @@ -301,7 +334,8 @@ func init() { Name: "linux-amd64-noopt", Notes: "optimizations and inlining disabled", VMImage: "linux-buildlet-std", - env: []string{"GOROOT_BOOTSTRAP=/go1.4", "GO_GCFLAGS=-N -l"}, + //BuildletType: "linux-amd64", + env: []string{"GOROOT_BOOTSTRAP=/go1.4", "GO_GCFLAGS=-N -l"}, }) addBuilder(BuildConfig{ Name: "linux-amd64-race", @@ -311,8 +345,9 @@ func init() { // TODO(bradfitz): make race.bash shardable, then: NumTestHelpers: 3 }) addBuilder(BuildConfig{ - Name: "linux-386-clang", - VMImage: "linux-buildlet-clang", + Name: "linux-386-clang", + VMImage: "linux-buildlet-clang", + //BuildletType: "linux-amd64-clang", buildletURL: "http://storage.googleapis.com/go-builder-data/buildlet.linux-amd64", env: []string{"GOROOT_BOOTSTRAP=/go1.4", "CC=/usr/bin/clang", "GOHOSTARCH=386"}, }) @@ -420,6 +455,7 @@ func init() { VMImage: "linux-buildlet-nacl-v2", buildletURL: "http://storage.googleapis.com/go-builder-data/buildlet.linux-amd64", env: []string{"GOROOT_BOOTSTRAP=/go1.4", "GOOS=nacl", "GOARCH=386", "GOHOSTOS=linux", "GOHOSTARCH=amd64"}, + //BuildletType: "nacl-amd64p32", }) addBuilder(BuildConfig{ Name: "nacl-amd64p32", @@ -494,9 +530,10 @@ func init() { env: []string{"GOARCH=amd64", "GOHOSTARCH=amd64"}, }) addBuilder(BuildConfig{ - Name: "windows-386-gce", - VMImage: "windows-buildlet-v2", - machineType: "n1-highcpu-2", + Name: "windows-386-gce", + VMImage: "windows-buildlet-v2", + machineType: "n1-highcpu-2", + // TODO(bradfitz): once buildlet type vs. config type is split: BuildletType: "windows-amd64-gce", buildletURL: "http://storage.googleapis.com/go-builder-data/buildlet.windows-amd64", Go14URL: "https://storage.googleapis.com/go-builder-data/go1.4-windows-386.tar.gz", RegularDisk: true, diff --git a/env/commit-watcher/Dockerfile b/env/commit-watcher/Dockerfile index ceb20242..6d9e4d60 100644 --- a/env/commit-watcher/Dockerfile +++ b/env/commit-watcher/Dockerfile @@ -13,4 +13,5 @@ ADD /scripts/install-apt-deps.sh /scripts/ RUN /scripts/install-apt-deps.sh ADD /scripts/build-commit-watcher.sh /scripts/ -RUN GO_REV=go1.4 TOOLS_REV=a54d0066172 WATCHER_REV=f7dc6a2 /scripts/build-commit-watcher.sh && test -f /usr/local/bin/watcher +# Note that WATCHER_REV must be full for "git fetch origin $REV" later: +RUN GO_REV=go1.4 TOOLS_REV=a54d0066172 WATCHER_REV=89ace0b562ee0e4424941a285df09002d72a6a9a /scripts/build-commit-watcher.sh && test -f /usr/local/bin/watcher diff --git a/env/commit-watcher/scripts/build-commit-watcher.sh b/env/commit-watcher/scripts/build-commit-watcher.sh index 1a61eafa..70b5d6e2 100755 --- a/env/commit-watcher/scripts/build-commit-watcher.sh +++ b/env/commit-watcher/scripts/build-commit-watcher.sh @@ -20,6 +20,11 @@ GO_BUILD=$GOPATH/src/golang.org/x/build mkdir -p $GO_BUILD git clone https://go.googlesource.com/build $GO_BUILD +# Um, this didn't seem to work? Old git version in wheezy? 
+#git fetch https://go.googlesource.com/build $WATCHER_REV:origin/dummy-commit # in case it's a pending CL +# Hack, instead: +cd $GO_BUILD && git fetch https://go.googlesource.com/build refs/changes/50/10750/5 + mkdir -p $PREFIX/bin (cd $GO_BUILD && git reset --hard $WATCHER_REV && GOBIN=$PREFIX/bin /goroot/bin/go install golang.org/x/build/cmd/watcher) diff --git a/internal/lru/cache.go b/internal/lru/cache.go new file mode 100644 index 00000000..c19c0e5a --- /dev/null +++ b/internal/lru/cache.go @@ -0,0 +1,96 @@ +// Copyright 2011 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package lru implements an LRU cache. +package lru + +import ( + "container/list" + "sync" +) + +// Cache is an LRU cache, safe for concurrent access. +type Cache struct { + maxEntries int + + mu sync.Mutex + ll *list.List + cache map[interface{}]*list.Element +} + +// *entry is the type stored in each *list.Element. +type entry struct { + key, value interface{} +} + +// New returns a new cache with the provided maximum items. +func New(maxEntries int) *Cache { + return &Cache{ + maxEntries: maxEntries, + ll: list.New(), + cache: make(map[interface{}]*list.Element), + } +} + +// Add adds the provided key and value to the cache, evicting +// an old item if necessary. +func (c *Cache) Add(key, value interface{}) { + c.mu.Lock() + defer c.mu.Unlock() + + // Already in cache? + if ee, ok := c.cache[key]; ok { + c.ll.MoveToFront(ee) + ee.Value.(*entry).value = value + return + } + + // Add to cache if not present + ele := c.ll.PushFront(&entry{key, value}) + c.cache[key] = ele + + if c.ll.Len() > c.maxEntries { + c.removeOldest() + } +} + +// Get fetches the key's value from the cache. +// The ok result will be true if the item was found. +func (c *Cache) Get(key interface{}) (value interface{}, ok bool) { + c.mu.Lock() + defer c.mu.Unlock() + if ele, hit := c.cache[key]; hit { + c.ll.MoveToFront(ele) + return ele.Value.(*entry).value, true + } + return +} + +// RemoveOldest removes the oldest item in the cache and returns its key and value. +// If the cache is empty, the empty string and nil are returned. +func (c *Cache) RemoveOldest() (key, value interface{}) { + c.mu.Lock() + defer c.mu.Unlock() + return c.removeOldest() +} + +// note: must hold c.mu +func (c *Cache) removeOldest() (key, value interface{}) { + ele := c.ll.Back() + if ele == nil { + return + } + c.ll.Remove(ele) + ent := ele.Value.(*entry) + delete(c.cache, ent.key) + return ent.key, ent.value + +} + +// Len returns the number of items in the cache. +func (c *Cache) Len() int { + c.mu.Lock() + defer c.mu.Unlock() + return c.ll.Len() +} diff --git a/internal/lru/cache_test.go b/internal/lru/cache_test.go new file mode 100644 index 00000000..c8d5a590 --- /dev/null +++ b/internal/lru/cache_test.go @@ -0,0 +1,59 @@ +// Copyright 2011 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. 
+ +package lru + +import ( + "reflect" + "testing" +) + +func TestLRU(t *testing.T) { + c := New(2) + + expectMiss := func(k string) { + v, ok := c.Get(k) + if ok { + t.Fatalf("expected cache miss on key %q but hit value %v", k, v) + } + } + + expectHit := func(k string, ev interface{}) { + v, ok := c.Get(k) + if !ok { + t.Fatalf("expected cache(%q)=%v; but missed", k, ev) + } + if !reflect.DeepEqual(v, ev) { + t.Fatalf("expected cache(%q)=%v; but got %v", k, ev, v) + } + } + + expectMiss("1") + c.Add("1", "one") + expectHit("1", "one") + + c.Add("2", "two") + expectHit("1", "one") + expectHit("2", "two") + + c.Add("3", "three") + expectHit("3", "three") + expectHit("2", "two") + expectMiss("1") +} + +func TestRemoveOldest(t *testing.T) { + c := New(2) + c.Add("1", "one") + c.Add("2", "two") + if k, v := c.RemoveOldest(); k != "1" || v != "one" { + t.Fatalf("oldest = %q, %q; want 1, one", k, v) + } + if k, v := c.RemoveOldest(); k != "2" || v != "two" { + t.Fatalf("oldest = %q, %q; want 2, two", k, v) + } + if k, v := c.RemoveOldest(); k != nil || v != nil { + t.Fatalf("oldest = %v, %v; want \"\", nil", k, v) + } +} diff --git a/internal/singleflight/singleflight.go b/internal/singleflight/singleflight.go new file mode 100644 index 00000000..f4cb2d67 --- /dev/null +++ b/internal/singleflight/singleflight.go @@ -0,0 +1,111 @@ +// Copyright 2013 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package singleflight provides a duplicate function call suppression +// mechanism. +package singleflight + +import "sync" + +// call is an in-flight or completed singleflight.Do call +type call struct { + wg sync.WaitGroup + + // These fields are written once before the WaitGroup is done + // and are only read after the WaitGroup is done. + val interface{} + err error + + // These fields are read and written with the singleflight + // mutex held before the WaitGroup is done, and are read but + // not written after the WaitGroup is done. + dups int + chans []chan<- Result +} + +// Group represents a class of work and forms a namespace in +// which units of work can be executed with duplicate suppression. +type Group struct { + mu sync.Mutex // protects m + m map[string]*call // lazily initialized +} + +// Result holds the results of Do, so they can be passed +// on a channel. +type Result struct { + Val interface{} + Err error + Shared bool +} + +// Do executes and returns the results of the given function, making +// sure that only one execution is in-flight for a given key at a +// time. If a duplicate comes in, the duplicate caller waits for the +// original to complete and receives the same results. +// The return value shared indicates whether v was given to multiple callers. +func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) { + g.mu.Lock() + if g.m == nil { + g.m = make(map[string]*call) + } + if c, ok := g.m[key]; ok { + c.dups++ + g.mu.Unlock() + c.wg.Wait() + return c.val, c.err, true + } + c := new(call) + c.wg.Add(1) + g.m[key] = c + g.mu.Unlock() + + g.doCall(c, key, fn) + return c.val, c.err, c.dups > 0 +} + +// DoChan is like Do but returns a channel that will receive the +// results when they are ready. 
+func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result { + ch := make(chan Result, 1) + g.mu.Lock() + if g.m == nil { + g.m = make(map[string]*call) + } + if c, ok := g.m[key]; ok { + c.dups++ + c.chans = append(c.chans, ch) + g.mu.Unlock() + return ch + } + c := &call{chans: []chan<- Result{ch}} + c.wg.Add(1) + g.m[key] = c + g.mu.Unlock() + + go g.doCall(c, key, fn) + + return ch +} + +// doCall handles the single call for a key. +func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) { + c.val, c.err = fn() + c.wg.Done() + + g.mu.Lock() + delete(g.m, key) + for _, ch := range c.chans { + ch <- Result{c.val, c.err, c.dups > 0} + } + g.mu.Unlock() +} + +// Forget tells the singleflight to forget about a key. Future calls +// to Do for this key will call the function rather than waiting for +// an earlier call to complete. +func (g *Group) Forget(key string) { + g.mu.Lock() + delete(g.m, key) + g.mu.Unlock() +} diff --git a/internal/singleflight/singleflight_test.go b/internal/singleflight/singleflight_test.go new file mode 100644 index 00000000..30ba7f7a --- /dev/null +++ b/internal/singleflight/singleflight_test.go @@ -0,0 +1,73 @@ +// Copyright 2013 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package singleflight + +import ( + "errors" + "fmt" + "sync" + "sync/atomic" + "testing" + "time" +) + +func TestDo(t *testing.T) { + var g Group + v, err, _ := g.Do("key", func() (interface{}, error) { + return "bar", nil + }) + if got, want := fmt.Sprintf("%v (%T)", v, v), "bar (string)"; got != want { + t.Errorf("Do = %v; want %v", got, want) + } + if err != nil { + t.Errorf("Do error = %v", err) + } +} + +func TestDoErr(t *testing.T) { + var g Group + someErr := errors.New("Some error") + v, err, _ := g.Do("key", func() (interface{}, error) { + return nil, someErr + }) + if err != someErr { + t.Errorf("Do error = %v; want someErr %v", err, someErr) + } + if v != nil { + t.Errorf("unexpected non-nil value %#v", v) + } +} + +func TestDoDupSuppress(t *testing.T) { + var g Group + c := make(chan string) + var calls int32 + fn := func() (interface{}, error) { + atomic.AddInt32(&calls, 1) + return <-c, nil + } + + const n = 10 + var wg sync.WaitGroup + for i := 0; i < n; i++ { + wg.Add(1) + go func() { + v, err, _ := g.Do("key", fn) + if err != nil { + t.Errorf("Do error: %v", err) + } + if v.(string) != "bar" { + t.Errorf("got %q; want %q", v, "bar") + } + wg.Done() + }() + } + time.Sleep(100 * time.Millisecond) // let goroutines above block + c <- "bar" + wg.Wait() + if got := atomic.LoadInt32(&calls); got != 1 { + t.Errorf("number of calls = %d; want 1", got) + } +}
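Illustrative note (not part of the patch): the coordinator's new getSourceTgz composes the two internal packages added by this change, golang.org/x/build/internal/singleflight and golang.org/x/build/internal/lru, so concurrent builds of the same revision share one tarball download and later builds reuse the cached bytes. Below is a minimal sketch of that pattern; it assumes the file lives somewhere under golang.org/x/build (both packages are internal), and fetchSlow and the key are made-up stand-ins for the real HTTP fetch and git revision.

package main

import (
	"fmt"
	"sync"

	"golang.org/x/build/internal/lru"
	"golang.org/x/build/internal/singleflight"
)

var (
	group singleflight.Group
	cache = lru.New(20) // key -> []byte, like the coordinator's sourceCache
)

// fetchSlow is a hypothetical expensive fetch (e.g. an HTTP GET of a tarball).
func fetchSlow(key string) ([]byte, error) {
	return []byte("data for " + key), nil
}

// fetch returns the bytes for key, performing the slow fetch at most once
// even when called concurrently, and caching the result for later callers.
func fetch(key string) ([]byte, error) {
	v, err, _ := group.Do(key, func() (interface{}, error) {
		if b, ok := cache.Get(key); ok {
			return b, nil // cache hit; skip the slow path entirely
		}
		b, err := fetchSlow(key)
		if err == nil {
			cache.Add(key, b)
		}
		return b, err
	})
	if err != nil {
		return nil, err
	}
	return v.([]byte), nil
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			b, err := fetch("go-1234abcd") // all five goroutines share one fetchSlow call
			fmt.Println(len(b), err)
		}()
	}
	wg.Wait()
}

The third return value from Do (shared) reports whether other callers received the same result; getSourceTgz logs it, together with whether the bytes came from the cache, in its got_source event.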