buildlet, cmd/coordinator: GCE quota accounting, fixes

And deal with Preemptible resource exhaustion errors.

And change all-compile to misc-compile and only run the builders
not otherwise covered (Fixes #11073)

And make the watcher serve git source.

And cache and singleflight fetching of git source.

And a million other things.

Fixes golang/go#11073

Change-Id: I0f45610f0c6a06bd0c8ba9632b8624e00aeb52fc
Reviewed-on: https://go-review.googlesource.com/10750
Reviewed-by: Andrew Gerrand <adg@golang.org>
Brad Fitzpatrick 2015-06-04 18:25:50 -07:00
Parent 854424e4c8
Commit 1b1e086fd1
14 changed files: 823 additions and 193 deletions

View file

@ -8,6 +8,7 @@ import (
"crypto/tls"
"errors"
"fmt"
"log"
"net/http"
"strings"
"time"
@ -62,6 +63,12 @@ type VMOpts struct {
// OnGotInstanceInfo optionally specifies a hook to run synchronously
// after the computeService.Instances.Get call.
OnGotInstanceInfo func()
// FallbackToFullPrice optionally specifies a hook to return a new
// GCE instance name if the first one failed to launch
// as preemptible. (If you reuse the same name, GCE complains about
// resources already existing, even though the creation failed.)
FallbackToFullPrice func() (newInstname string)
}
// StartNewVM boots a new VM on GCE and returns a buildlet client
@ -85,7 +92,7 @@ func StartNewVM(ts oauth2.TokenSource, instName, builderType string, opts VMOpts
return nil, errors.New("buildlet: missing required ProjectID option")
}
usePreempt := true
usePreempt := false
Try:
prefix := "https://www.googleapis.com/compute/v1/projects/" + projectID
machType := prefix + "/zones/" + zone + "/machineTypes/" + conf.MachineType()
@ -94,6 +101,19 @@ Try:
diskType = "" // a spinning disk
}
// Request an IP address if this is a world-facing buildlet.
var accessConfigs []*compute.AccessConfig
// TODO(bradfitz): remove the "true ||" part once we figure out why the buildlet
// never boots without an IP address. Userspace seems to hang before we get to the buildlet?
if true || !opts.TLS.IsZero() {
accessConfigs = []*compute.AccessConfig{
&compute.AccessConfig{
Type: "ONE_TO_ONE_NAT",
Name: "External NAT",
},
}
}
instance := &compute.Instance{
Name: instName,
Description: opts.Description,
@ -120,13 +140,8 @@ Try:
Metadata: &compute.Metadata{},
NetworkInterfaces: []*compute.NetworkInterface{
&compute.NetworkInterface{
AccessConfigs: []*compute.AccessConfig{
&compute.AccessConfig{
Type: "ONE_TO_ONE_NAT",
Name: "External NAT",
},
},
Network: prefix + "/global/networks/default",
AccessConfigs: accessConfigs,
Network: prefix + "/global/networks/default",
},
},
Scheduling: &compute.Scheduling{
@ -188,8 +203,12 @@ OpLoop:
case "DONE":
if op.Error != nil {
for _, operr := range op.Error.Errors {
if operr.Code == "ZONE_RESOURCE_POOL_EXHAUSTED" && usePreempt {
log.Printf("failed to create instance %s in zone %s: %v", instName, zone, operr.Code)
if operr.Code == "ZONE_RESOURCE_POOL_EXHAUSTED" && usePreempt && opts.FallbackToFullPrice != nil {
oldName := instName
usePreempt = false
instName = opts.FallbackToFullPrice()
log.Printf("buildlet/gce: retrying without preempt with name %q (previously: %q)", instName, oldName)
goto Try
}
// TODO: catch Code=="QUOTA_EXCEEDED" and "Message" and return

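For orientation, here is a minimal sketch (not part of this change) of how a caller might supply the new FallbackToFullPrice hook when starting a buildlet VM. The project, zone, builder type, and naming convention below are placeholder assumptions; the coordinator's real hook appears in cmd/coordinator/gce.go later in this diff.

package example

import (
	"fmt"
	"log"

	"golang.org/x/build/buildlet"
	"golang.org/x/oauth2"
)

// createBuildlet is a hypothetical caller of StartNewVM showing the new
// FallbackToFullPrice hook: if the preemptible create fails with
// ZONE_RESOURCE_POOL_EXHAUSTED, StartNewVM calls the hook for a fresh
// instance name and retries at full price.
func createBuildlet(ts oauth2.TokenSource, name string) (*buildlet.Client, error) {
	opts := buildlet.VMOpts{
		ProjectID:   "my-project",    // placeholder project
		Zone:        "us-central1-b", // placeholder zone
		Description: "example buildlet",
		FallbackToFullPrice: func() string {
			newName := name + "-f" // same renaming convention the coordinator uses
			log.Printf("preemptible %q exhausted; retrying as %q", name, newName)
			name = newName
			return newName
		},
	}
	bc, err := buildlet.StartNewVM(ts, name, "linux-amd64", opts)
	if err != nil {
		return nil, fmt.Errorf("StartNewVM: %v", err)
	}
	return bc, nil
}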
View file

@ -45,7 +45,7 @@ import (
var (
haltEntireOS = flag.Bool("halt", true, "halt OS in /halt handler. If false, the buildlet process just ends.")
workDir = flag.String("workdir", "", "Temporary directory to use. The contents of this directory may be deleted at any time. If empty, TempDir is used to create one.")
listenAddr = flag.String("listen", defaultListenAddr(), "address to listen on. Unused in reverse mode. Warning: this service is inherently insecure and offers no protection of its own. Do not expose this port to the world.")
listenAddr = flag.String("listen", "AUTO", "address to listen on. Unused in reverse mode. Warning: this service is inherently insecure and offers no protection of its own. Do not expose this port to the world.")
reverse = flag.String("reverse", "", "if non-empty, go into reverse mode where the buildlet dials the coordinator instead of listening for connections. The value is a comma-separated list of modes, e.g. 'darwin-arm,darwin-amd64-race'")
coordinator = flag.String("coordinator", "localhost:8119", "address of coordinator, in production use farmer.golang.org. Only used in reverse mode.")
)
@ -79,8 +79,15 @@ func main() {
log.SetOutput(w)
}
}
log.Printf("buildlet starting.")
flag.Parse()
if *listenAddr == "AUTO" {
v := defaultListenAddr()
log.Printf("Will listen on %s", v)
*listenAddr = v
}
onGCE := metadata.OnGCE()
if !onGCE && !strings.HasPrefix(*listenAddr, "localhost:") {
log.Printf("** WARNING *** This server is unsafe and offers no security. Be careful.")

View file

@ -22,7 +22,7 @@ import (
var (
proj = flag.String("project", "symbolic-datum-552", "name of Project")
zone = flag.String("zone", "us-central1-a", "GCE zone")
zone = flag.String("zone", "us-central1-b", "GCE zone")
mach = flag.String("machinetype", "n1-highcpu-2", "Machine type")
instName = flag.String("instance_name", "farmer", "Name of VM instance.")
sshPub = flag.String("ssh_public_key", "", "ssh public key file to authorize. Can modify later in Google's web UI anyway.")

View file

@ -35,6 +35,8 @@ import (
"golang.org/x/build/buildlet"
"golang.org/x/build/dashboard"
"golang.org/x/build/gerrit"
"golang.org/x/build/internal/lru"
"golang.org/x/build/internal/singleflight"
"golang.org/x/build/types"
"google.golang.org/cloud/storage"
)
@ -90,7 +92,7 @@ var tryBuilders []dashboard.BuildConfig
func init() {
tryList := []string{
"all-compile",
"misc-compile",
"darwin-amd64-10_10",
"linux-386",
"linux-amd64",
@ -573,7 +575,8 @@ func workaroundFlush(w http.ResponseWriter) {
func findWorkLoop(work chan<- builderRev) {
// Useful for debugging a single run:
if devCluster && false {
work <- builderRev{name: "linux-amd64-race", rev: "54789eff385780c54254f822e09505b6222918e2"}
work <- builderRev{name: "linux-amd64", rev: "54789eff385780c54254f822e09505b6222918e2"}
work <- builderRev{name: "windows-amd64-gce", rev: "54789eff385780c54254f822e09505b6222918e2"}
return
}
ticker := time.NewTicker(15 * time.Second)
@ -780,12 +783,17 @@ func (ts *trySet) notifyStarting() {
msg := "TryBots beginning. Status page: http://farmer.golang.org/try?commit=" + ts.Commit[:8]
if ci, err := gerritClient.GetChangeDetail(ts.ChangeID); err == nil {
if len(ci.Messages) == 0 {
log.Printf("No Gerrit comments retrieved on %v", ts.ChangeID)
}
for _, cmi := range ci.Messages {
if cmi.Message == msg {
if strings.Contains(cmi.Message, msg) {
// Dup. Don't spam.
return
}
}
} else {
log.Printf("Error getting Gerrit comments on %s: %v", ts.ChangeID, err)
}
// Ignore error. This isn't critical.
@ -1137,7 +1145,7 @@ func (st *buildStatus) onceInitHelpersFunc() {
st.helpers = GetBuildlets(st.donec, pool, st.conf.NumTestHelpers, st.buildletType(), st.rev, st)
}
func (st *buildStatus) build() (retErr error) {
func (st *buildStatus) build() error {
pool, err := st.buildletPool()
if err != nil {
return err
@ -1153,20 +1161,6 @@ func (st *buildStatus) build() (retErr error) {
st.mu.Unlock()
st.logEventTime("got_buildlet", bc.IPPort())
goodRes := func(res *http.Response, err error, what string) bool {
if err != nil {
retErr = fmt.Errorf("%s: %v", what, err)
return false
}
if res.StatusCode/100 != 2 {
slurp, _ := ioutil.ReadAll(io.LimitReader(res.Body, 4<<10))
retErr = fmt.Errorf("%s: %v; body: %s", what, res.Status, slurp)
res.Body.Close()
return false
}
return true
}
// Write the VERSION file.
st.logEventTime("start_write_version_tar")
@ -1174,19 +1168,15 @@ func (st *buildStatus) build() (retErr error) {
return fmt.Errorf("writing VERSION tgz: %v", err)
}
// Feed the buildlet a tar file for it to extract.
// TODO: cache these.
st.logEventTime("start_fetch_gerrit_tgz")
tarRes, err := http.Get("https://go.googlesource.com/go/+archive/" + st.rev + ".tar.gz")
if !goodRes(tarRes, err, "fetching tarball from Gerrit") {
return
}
var grp syncutil.Group
grp.Go(func() error {
st.logEventTime("fetch_go_tar")
tarReader, err := getSourceTgz(st, "go", st.rev)
if err != nil {
return err
}
st.logEventTime("start_write_go_tar")
if err := bc.PutTar(tarRes.Body, "go"); err != nil {
tarRes.Body.Close()
if err := bc.PutTar(tarReader, "go"); err != nil {
return fmt.Errorf("writing tarball from Gerrit: %v", err)
}
st.logEventTime("end_write_go_tar")
@ -1356,8 +1346,7 @@ func (st *buildStatus) distTestList() (names []string, err error) {
func (st *buildStatus) newTestSet(names []string) *testSet {
set := &testSet{
st: st,
retryc: make(chan *testItem, len(names)),
st: st,
}
for _, name := range names {
set.items = append(set.items, &testItem{
@ -1635,12 +1624,16 @@ func (st *buildStatus) runTests(helpers <-chan *buildlet.Client) (remoteErr, err
// lumpy. The rest of the buildlets run the largest tests
// first (critical path scheduling).
go func() {
goroot := "" // no need to override; main buildlet's GOROOT is baked into binaries
for tis := range set.itemsInOrder() {
for {
tis, ok := set.testsToRunInOrder()
if !ok {
st.logEventTime("in_order_tests_complete")
return
}
goroot := "" // no need to override; main buildlet's GOROOT is baked into binaries
st.runTestsOnBuildlet(st.bc, tis, goroot)
}
}()
helperWork := set.itemsBiggestFirst()
go func() {
for helper := range helpers {
go func(bc *buildlet.Client) {
@ -1660,9 +1653,14 @@ func (st *buildStatus) runTests(helpers <-chan *buildlet.Client) (remoteErr, err
log.Printf("error discovering workdir for helper %s: %v", bc.IPPort(), err)
return
}
goroot := st.conf.FilePathJoin(workDir, "go")
st.logEventTime("setup_helper", bc.IPPort())
for tis := range helperWork {
goroot := st.conf.FilePathJoin(workDir, "go")
for {
tis, ok := set.testsToRunBiggestFirst()
if !ok {
st.logEventTime("biggest_tests_complete", bc.IPPort())
return
}
st.runTestsOnBuildlet(bc, tis, goroot)
}
}(helper)
@ -1672,7 +1670,15 @@ func (st *buildStatus) runTests(helpers <-chan *buildlet.Client) (remoteErr, err
var lastBanner string
var serialDuration time.Duration
for _, ti := range set.items {
<-ti.done // wait for success
AwaitDone:
for {
select {
case <-ti.done: // wait for success
break AwaitDone
case <-time.After(30 * time.Second):
st.logEventTime("still_waiting_on_test", ti.name)
}
}
serialDuration += ti.execDuration
if len(ti.output) > 0 {
@ -1818,11 +1824,9 @@ type testSet struct {
st *buildStatus
items []*testItem
// retryc communicates failures to watch a test. The channel is
// never closed. Sends should also select on reading st.donec
// to see if the things have stopped early due to another test
// failing and aborting the build.
retryc chan *testItem
mu sync.Mutex
inOrder [][]*testItem
biggestFirst [][]*testItem
}
// cancelAll cancels all pending tests.
@ -1832,76 +1836,72 @@ func (s *testSet) cancelAll() {
}
}
// itemsInOrder returns a channel of items mostly in their original order.
// The exception is that an item which fails to execute may happen later
// in a different order.
// Each item sent in the channel has been taken. (ti.tryTake returned true)
// The returned channel is closed when no items remain.
func (s *testSet) itemsInOrder() <-chan []*testItem {
return s.itemChan(s.items)
func (s *testSet) testsToRunInOrder() (chunk []*testItem, ok bool) {
s.mu.Lock()
defer s.mu.Unlock()
if s.inOrder == nil {
s.initInOrder()
}
return s.testsFromSlice(s.inOrder)
}
func (s *testSet) itemsBiggestFirst() <-chan []*testItem {
items := append([]*testItem(nil), s.items...)
sort.Sort(sort.Reverse(byTestDuration(items)))
return s.itemChan(items)
func (s *testSet) testsToRunBiggestFirst() (chunk []*testItem, ok bool) {
s.mu.Lock()
defer s.mu.Unlock()
if s.biggestFirst == nil {
s.initBiggestFirst()
}
return s.testsFromSlice(s.biggestFirst)
}
// itemChan returns a channel which yields the provided items, usually
// in the same order as given, but grouped with other tests they
// should be run with. (Only stdlib tests are grouped.)
func (s *testSet) itemChan(items []*testItem) <-chan []*testItem {
names := make([]string, len(items))
func (s *testSet) testsFromSlice(chunkList [][]*testItem) (chunk []*testItem, ok bool) {
for _, candChunk := range chunkList {
for _, ti := range candChunk {
if ti.tryTake() {
chunk = append(chunk, ti)
}
}
if len(chunk) > 0 {
return chunk, true
}
}
return nil, false
}
func (s *testSet) initInOrder() {
names := make([]string, len(s.items))
namedItem := map[string]*testItem{}
for i, ti := range items {
for i, ti := range s.items {
names[i] = ti.name
namedItem[ti.name] = ti
}
// First do the go_test:* ones. partitionGoTests
// only returns those, which are the ones we merge together.
stdSets := partitionGoTests(names)
setForTest := map[string][]*testItem{}
for _, set := range stdSets {
tis := make([]*testItem, len(set))
for i, name := range set {
tis[i] = namedItem[name]
setForTest[name] = tis
}
s.inOrder = append(s.inOrder, tis)
}
ch := make(chan []*testItem)
go func() {
defer close(ch)
for _, ti := range items {
if !ti.tryTake() {
continue
}
send := []*testItem{ti}
for _, other := range setForTest[ti.name] {
if other != ti && other.tryTake() {
send = append(send, other)
}
}
select {
case ch <- send:
case <-s.st.donec:
return
}
// Then do the misc tests, which are always by themselves.
// (No benefit to merging them)
for _, ti := range s.items {
if !strings.HasPrefix(ti.name, "go_test:") {
s.inOrder = append(s.inOrder, []*testItem{ti})
}
for {
select {
case ti := <-s.retryc:
if ti.tryTake() {
select {
case ch <- []*testItem{ti}:
case <-s.st.donec:
return
}
}
case <-s.st.donec:
return
}
}
}()
return ch
}
}
func (s *testSet) initBiggestFirst() {
items := append([]*testItem(nil), s.items...)
sort.Sort(sort.Reverse(byTestDuration(items)))
for _, item := range items {
s.biggestFirst = append(s.biggestFirst, []*testItem{item})
}
}
type testItem struct {
@ -1950,14 +1950,6 @@ func (ti *testItem) isDone() bool {
func (ti *testItem) retry() {
// release it to make it available for somebody else to try later:
<-ti.take
// Enqueue this test to retry, unless the build is
// only proceeding to the first failure and it's
// already failed.
select {
case ti.set.retryc <- ti:
case <-ti.set.st.donec:
}
}
type byTestDuration []*testItem
@ -2104,7 +2096,9 @@ func (st *buildStatus) writeEventsLocked(w io.Writer, htmlMode bool) {
}
fmt.Fprintf(w, " %7s %v %s %s\n", elapsed, evt.t.Format(time.RFC3339), e, text)
}
fmt.Fprintf(w, " %7s (now)\n", fmt.Sprintf("+%0.1fs", time.Since(lastT).Seconds()))
if st.isRunningLocked() {
fmt.Fprintf(w, " %7s (now)\n", fmt.Sprintf("+%0.1fs", time.Since(lastT).Seconds()))
}
}
@ -2214,4 +2208,75 @@ func versionTgz(rev string) io.Reader {
return bytes.NewReader(buf.Bytes())
}
var sourceGroup singleflight.Group
var sourceCache = lru.New(20) // git rev -> []byte
// repo is go.googlesource.com repo ("go", "net", etc)
// rev is git revision.
func getSourceTgz(el eventTimeLogger, repo, rev string) (tgz io.Reader, err error) {
fromCache := false
vi, err, shared := sourceGroup.Do(rev, func() (interface{}, error) {
if tgzBytes, ok := sourceCache.Get(rev); ok {
fromCache = true
return tgzBytes, nil
}
for i := 0; i < 10; i++ {
el.logEventTime("fetching_source", "from watcher")
tgzBytes, err := getSourceTgzFromWatcher(repo, rev)
if err == nil {
sourceCache.Add(rev, tgzBytes)
return tgzBytes, nil
}
log.Printf("Error fetching source %s/%s from watcher (after %v uptime): %v",
repo, rev, time.Since(processStartTime), err)
// Wait for watcher to start up. Give it a minute until
// we try Gerrit.
time.Sleep(6 * time.Second)
}
el.logEventTime("fetching_source", "from gerrit")
tgzBytes, err := getSourceTgzFromGerrit(repo, rev)
if err == nil {
sourceCache.Add(rev, tgzBytes)
}
return tgzBytes, err
})
if err != nil {
return nil, err
}
el.logEventTime("got_source", fmt.Sprintf("cache=%v shared=%v", fromCache, shared))
return bytes.NewReader(vi.([]byte)), nil
}
func getSourceTgzFromGerrit(repo, rev string) (tgz []byte, err error) {
return getSourceTgzFromURL("gerrit", repo, rev, "https://go.googlesource.com/"+repo+"/+archive/"+rev+".tar.gz")
}
func getSourceTgzFromWatcher(repo, rev string) (tgz []byte, err error) {
return getSourceTgzFromURL("watcher", repo, rev, "http://"+gitArchiveAddr+"/"+repo+".tar.gz?rev="+rev)
}
func getSourceTgzFromURL(source, repo, rev, urlStr string) (tgz []byte, err error) {
res, err := http.Get(urlStr)
if err != nil {
return nil, fmt.Errorf("fetching %s/%s from %s: %v", repo, rev, source, err)
}
defer res.Body.Close()
if res.StatusCode/100 != 2 {
slurp, _ := ioutil.ReadAll(io.LimitReader(res.Body, 4<<10))
return nil, fmt.Errorf("fetching %s/%s from %s: %v; body: %s", repo, rev, source, res.Status, slurp)
}
const maxSize = 25 << 20 // seems unlikely; go source is 7.8MB on 2015-06-15
slurp, err := ioutil.ReadAll(io.LimitReader(res.Body, maxSize+1))
if len(slurp) > maxSize && err == nil {
err = fmt.Errorf("body over %d bytes", maxSize)
}
if err != nil {
return nil, fmt.Errorf("reading %s/%s from %s: %v", repo, rev, source, err)
}
return slurp, nil
}
var nl = []byte("\n")

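The getSourceTgz helper above is the heart of the new source caching: an internal/singleflight.Group collapses concurrent fetches of the same revision into one call, and an internal/lru.Cache keeps recent tarballs. A distilled, hypothetical sketch of that pattern follows (the fetch function is assumed, and these internal packages are only importable from within golang.org/x/build):

package example

import (
	"golang.org/x/build/internal/lru"
	"golang.org/x/build/internal/singleflight"
)

// cachedFetcher fetches a value at most once per in-flight key and keeps
// the 20 most recent results, mirroring getSourceTgz above.
type cachedFetcher struct {
	group singleflight.Group
	cache *lru.Cache
	fetch func(key string) ([]byte, error) // e.g. an HTTP GET to the watcher
}

func newCachedFetcher(fetch func(string) ([]byte, error)) *cachedFetcher {
	return &cachedFetcher{cache: lru.New(20), fetch: fetch}
}

func (cf *cachedFetcher) Get(key string) ([]byte, error) {
	v, err, _ := cf.group.Do(key, func() (interface{}, error) {
		if b, ok := cf.cache.Get(key); ok {
			return b, nil // cache hit; skip the fetch
		}
		b, err := cf.fetch(key)
		if err == nil {
			cf.cache.Add(key, b)
		}
		return b, err
	})
	if err != nil {
		return nil, err
	}
	return v.([]byte), nil
}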
View file

@ -21,6 +21,7 @@ import (
"time"
"golang.org/x/build/buildlet"
"golang.org/x/build/dashboard"
"golang.org/x/build/gerrit"
"golang.org/x/net/context"
"golang.org/x/oauth2"
@ -39,7 +40,7 @@ func init() {
// apiCallTicker ticks regularly, preventing us from accidentally making
// GCE API calls too quickly. Our quota is 20 QPS, but we temporarily
// limit ourselves to less than that.
var apiCallTicker = time.NewTicker(time.Second / 5)
var apiCallTicker = time.NewTicker(time.Second / 10)
func gceAPIGate() {
<-apiCallTicker.C
@ -49,6 +50,7 @@ func gceAPIGate() {
var (
projectID string
projectZone string
projectRegion string
computeService *compute.Service
externalIP string
tokenSource oauth2.TokenSource
@ -88,12 +90,15 @@ func initGCE() error {
if err != nil || projectZone == "" {
return fmt.Errorf("failed to get current GCE zone: %v", err)
}
// Convert the zone from "projects/1234/zones/us-central1-a" to "us-central1-a".
projectZone = path.Base(projectZone)
if !hasComputeScope() {
return errors.New("The coordinator is not running with access to read and write Compute resources. VM support disabled.")
}
projectRegion = projectZone[:strings.LastIndex(projectZone, "-")] // "us-central1"
externalIP, err = metadata.ExternalIP()
if err != nil {
return fmt.Errorf("ExternalIP: %v", err)
@ -109,9 +114,8 @@ func initGCE() error {
devCluster = projectID == "go-dashboard-dev"
if devCluster {
log.Printf("Running in dev cluster")
gcePool.vmCap = make(chan bool, 5)
}
go gcePool.pollQuotaLoop()
return nil
}
@ -134,38 +138,72 @@ func checkTryBuildDeps() error {
return nil
}
// We artificially limit ourselves to 60 VMs right now, assuming that
// each takes 2 CPU, and we have a current quota of 200 CPUs. That
// gives us headroom, but also doesn't account for SSD or memory
// quota.
// TODO(bradfitz): better quota system.
const maxVMs = 60
var gcePool = &gceBuildletPool{}
var gcePool = &gceBuildletPool{
vmCap: make(chan bool, maxVMs),
}
var _ BuildletPool = (*gceBuildletPool)(nil)
type gceBuildletPool struct {
// vmCap is a semaphore used to limit the number of VMs in
// use.
vmCap chan bool
// maxInstances is a temporary hack because we can't get buildlets to boot
// without IPs, and we only have 200 IP addresses.
// TODO(bradfitz): remove this once fixed.
const maxInstances = 190
mu sync.Mutex
instUsed map[string]time.Time // GCE VM instance name -> creationTime
type gceBuildletPool struct {
mu sync.Mutex // guards all following
disabled bool
// CPU quota usage & limits.
cpuLeft int // dead-reckoning CPUs remain
instLeft int // dead-reckoning instances remain
instUsage int
cpuUsage int
addrUsage int
inst map[string]time.Time // GCE VM instance name -> creationTime
}
func (p *gceBuildletPool) pollQuotaLoop() {
for {
p.pollQuota()
time.Sleep(5 * time.Second)
}
}
func (p *gceBuildletPool) pollQuota() {
gceAPIGate()
reg, err := computeService.Regions.Get(projectID, projectRegion).Do()
if err != nil {
log.Printf("Failed to get quota for %s/%s: %v", projectID, projectRegion, err)
return
}
p.mu.Lock()
defer p.mu.Unlock()
for _, quota := range reg.Quotas {
switch quota.Metric {
case "CPUS":
p.cpuLeft = int(quota.Limit) - int(quota.Usage)
p.cpuUsage = int(quota.Usage)
case "INSTANCES":
p.instLeft = int(quota.Limit) - int(quota.Usage)
p.instUsage = int(quota.Usage)
case "IN_USE_ADDRESSES":
p.addrUsage = int(quota.Usage)
}
}
}
func (p *gceBuildletPool) SetEnabled(enabled bool) {
if enabled {
p.vmCap = make(chan bool, maxVMs)
} else {
p.vmCap = make(chan bool)
}
p.mu.Lock()
defer p.mu.Unlock()
p.disabled = !enabled
}
func (p *gceBuildletPool) GetBuildlet(cancel Cancel, typ, rev string, el eventTimeLogger) (*buildlet.Client, error) {
el.logEventTime("awaiting_gce_quota")
if err := p.awaitVMCountQuota(cancel); err != nil {
conf, ok := dashboard.Builders[typ]
if !ok {
return nil, fmt.Errorf("gcepool: unknown buildlet type %q", typ)
}
if err := p.awaitVMCountQuota(cancel, conf.GCENumCPU()); err != nil {
return nil, err
}
@ -184,10 +222,18 @@ func (p *gceBuildletPool) GetBuildlet(cancel Cancel, typ, rev string, el eventTi
Description: fmt.Sprintf("Go Builder for %s at %s", typ, rev),
DeleteIn: vmDeleteTimeout,
OnInstanceRequested: func() {
needDelete = true
el.logEventTime("instance_create_requested")
el.logEventTime("instance_create_requested", instName)
log.Printf("GCE VM %q now booting", instName)
},
FallbackToFullPrice: func() string {
el.logEventTime("gce_fallback_to_full_price", "for "+instName)
p.setInstanceUsed(instName, false)
newName := instName + "-f"
log.Printf("Gave up on preemptible %q; now booting %q", instName, newName)
instName = newName
p.setInstanceUsed(instName, true)
return newName
},
OnInstanceCreated: func() {
el.logEventTime("instance_created")
needDelete = true // redundant with OnInstanceRequested one, but fine.
@ -197,26 +243,39 @@ func (p *gceBuildletPool) GetBuildlet(cancel Cancel, typ, rev string, el eventTi
},
})
if err != nil {
el.logEventTime("gce_buildlet_create_failure", fmt.Sprintf("%s: %v", instName, err))
log.Printf("Failed to create VM for %s, %s: %v", typ, rev, err)
if needDelete {
deleteVM(projectZone, instName)
p.putVMCountQuota(conf.GCENumCPU())
}
p.setInstanceUsed(instName, false)
p.putVMCountQuota()
return nil, err
}
bc.SetDescription("GCE VM: " + instName)
bc.SetCloseFunc(func() error {
deleteVM(projectZone, instName)
p.setInstanceUsed(instName, false)
p.putVMCountQuota()
return nil
})
bc.SetCloseFunc(func() error { return p.putBuildlet(bc, typ, instName) })
return bc, nil
}
func (p *gceBuildletPool) putBuildlet(bc *buildlet.Client, typ, instName string) error {
// TODO(bradfitz): add the buildlet to a freelist (of max N
// items) for up to 10 minutes after it was started, if
// it's never seen a command execution failure, and we can
// wipe all its disk content. (perhaps wipe its disk content when
// it's retrieved, not put back on the freelist)
deleteVM(projectZone, instName)
p.setInstanceUsed(instName, false)
conf, ok := dashboard.Builders[typ]
if !ok {
panic("failed to lookup conf") // should've worked if we did it before
}
p.putVMCountQuota(conf.GCENumCPU())
return nil
}
func (p *gceBuildletPool) WriteHTMLStatus(w io.Writer) {
fmt.Fprintf(w, "<b>GCE pool</b> capacity: %d/%d", len(p.vmCap), cap(p.vmCap))
fmt.Fprintf(w, "<b>GCE pool</b> capacity: %s", p.capacityString())
const show = 6 // must be even
active := p.instancesActive()
if len(active) > 0 {
@ -233,44 +292,84 @@ func (p *gceBuildletPool) WriteHTMLStatus(w io.Writer) {
}
func (p *gceBuildletPool) String() string {
return fmt.Sprintf("GCE pool capacity: %d/%d", len(p.vmCap), cap(p.vmCap))
return fmt.Sprintf("GCE pool capacity: %s", p.capacityString())
}
// awaitVMCountQuota waits for quota.
func (p *gceBuildletPool) awaitVMCountQuota(cancel Cancel) error {
select {
case p.vmCap <- true:
return nil
case <-cancel:
return ErrCanceled
func (p *gceBuildletPool) capacityString() string {
p.mu.Lock()
defer p.mu.Unlock()
return fmt.Sprintf("%d/%d instances; %d/%d CPUs",
len(p.inst), p.instUsage+p.instLeft,
p.cpuUsage, p.cpuUsage+p.cpuLeft)
}
// awaitVMCountQuota waits for numCPU CPUs of quota to become available,
// or returns ErrCanceled.
func (p *gceBuildletPool) awaitVMCountQuota(cancel Cancel, numCPU int) error {
// Poll every 2 seconds, which could be better, but works and
// is simple.
for {
if p.tryAllocateQuota(numCPU) {
return nil
}
select {
case <-time.After(2 * time.Second):
case <-cancel:
return ErrCanceled
}
}
}
func (p *gceBuildletPool) putVMCountQuota() { <-p.vmCap }
func (p *gceBuildletPool) tryAllocateQuota(numCPU int) bool {
p.mu.Lock()
defer p.mu.Unlock()
if p.disabled {
return false
}
if p.cpuLeft >= numCPU && p.instLeft >= 1 && len(p.inst) < maxInstances && p.addrUsage < maxInstances {
p.cpuUsage += numCPU
p.cpuLeft -= numCPU
p.instLeft--
p.addrUsage++
return true
}
return false
}
// putVMCountQuota adjusts the dead-reckoning of our quota usage by
// one instance and cpu CPUs.
func (p *gceBuildletPool) putVMCountQuota(cpu int) {
p.mu.Lock()
defer p.mu.Unlock()
p.cpuUsage -= cpu
p.cpuLeft += cpu
p.instLeft++
}
func (p *gceBuildletPool) setInstanceUsed(instName string, used bool) {
p.mu.Lock()
defer p.mu.Unlock()
if p.instUsed == nil {
p.instUsed = make(map[string]time.Time)
if p.inst == nil {
p.inst = make(map[string]time.Time)
}
if used {
p.instUsed[instName] = time.Now()
p.inst[instName] = time.Now()
} else {
delete(p.instUsed, instName)
delete(p.inst, instName)
}
}
func (p *gceBuildletPool) instanceUsed(instName string) bool {
p.mu.Lock()
defer p.mu.Unlock()
_, ok := p.instUsed[instName]
_, ok := p.inst[instName]
return ok
}
func (p *gceBuildletPool) instancesActive() (ret []instanceTime) {
p.mu.Lock()
defer p.mu.Unlock()
for name, create := range p.instUsed {
for name, create := range p.inst {
ret = append(ret, instanceTime{
name: name,
creation: create,

View file

@ -32,6 +32,8 @@ type watchConfig struct {
dash string // "https://build.golang.org/" (must end in /)
interval time.Duration // Polling interval
mirrorBase string // "https://github.com/golang/" or empty to disable mirroring
netHost bool // run docker container in the host's network namespace
httpAddr string
}
type imageInfo struct {
@ -45,12 +47,20 @@ var images = map[string]*imageInfo{
"go-commit-watcher": {url: "https://storage.googleapis.com/go-builder-data/docker-commit-watcher.tar.gz"},
}
const gitArchiveAddr = "127.0.0.1:21536" // 21536 == keys above WATCH
func startWatchers() {
mirrorBase := "https://github.com/golang/"
if devCluster {
mirrorBase = "" // don't mirror from dev cluster
}
addWatcher(watchConfig{repo: "https://go.googlesource.com/go", dash: dashBase(), mirrorBase: mirrorBase})
addWatcher(watchConfig{
repo: "https://go.googlesource.com/go",
dash: dashBase(),
mirrorBase: mirrorBase,
netHost: true,
httpAddr: gitArchiveAddr,
})
addWatcher(watchConfig{repo: "https://go.googlesource.com/gofrontend", dash: dashBase() + "gccgo/"})
go cleanUpOldContainers()
@ -94,12 +104,16 @@ func (conf watchConfig) dockerRunArgs() (args []string) {
args = append(args, "-v", tmpKey+":/.gobuildkey")
args = append(args, "-v", tmpKey+":/root/.gobuildkey")
}
if conf.netHost {
args = append(args, "--net=host")
}
args = append(args,
"go-commit-watcher",
"/usr/local/bin/watcher",
"-repo="+conf.repo,
"-dash="+conf.dash,
"-poll="+conf.interval.String(),
"-http="+conf.httpAddr,
)
if conf.mirrorBase != "" {
dst, err := url.Parse(conf.mirrorBase)
@ -191,8 +205,13 @@ func startWatching(conf watchConfig) (err error) {
// Start a goroutine to wait for the watcher to die.
go func() {
exec.Command("docker", "wait", container).Run()
out, _ := exec.Command("docker", "logs", container).CombinedOutput()
exec.Command("docker", "rm", "-v", container).Run()
log.Printf("Watcher %v crashed. Restarting soon.", conf.repo)
const maxLogBytes = 1 << 10
if len(out) > maxLogBytes {
out = out[len(out)-maxLogBytes:]
}
log.Printf("Watcher %v crashed. Restarting soon. Logs: %s", conf.repo, out)
restartWatcherSoon(conf)
}()
return nil

View file

@ -16,6 +16,7 @@ import (
"io"
"io/ioutil"
"log"
"net"
"net/http"
"net/url"
"os"
@ -24,6 +25,7 @@ import (
"path/filepath"
"runtime"
"sort"
"strconv"
"strings"
"sync"
"time"
@ -44,6 +46,7 @@ var (
network = flag.Bool("network", true, "Enable network calls (disable for testing)")
mirrorBase = flag.String("mirror", "", `Mirror repository base URL (eg "https://github.com/golang/")`)
filter = flag.String("filter", "", "Comma-separated list of directories or files to watch for new commits (only works on main repo)")
httpAddr = flag.String("http", "", "If non-empty, the listen address to run an HTTP server on")
)
var (
@ -80,6 +83,14 @@ func run() error {
}
defer os.RemoveAll(dir)
if *httpAddr != "" {
ln, err := net.Listen("tcp", *httpAddr)
if err != nil {
return err
}
go http.Serve(ln, nil)
}
errc := make(chan error)
go func() {
@ -88,11 +99,13 @@ func run() error {
name := (*repoURL)[strings.LastIndex(*repoURL, "/")+1:]
dst = *mirrorBase + name
}
name := strings.TrimPrefix(*repoURL, goBase)
r, err := NewRepo(dir, *repoURL, dst, "")
if err != nil {
errc <- err
return
}
http.Handle("/"+name+".tar.gz", r)
errc <- r.Watch()
}()
@ -113,6 +126,7 @@ func run() error {
errc <- err
return
}
http.Handle("/"+name+".tar.gz", r)
errc <- r.Watch()
}(path)
}
@ -276,6 +290,9 @@ func (r *Repo) postChildren(b *Branch, parent *Commit) error {
continue
}
if err := r.postCommit(c); err != nil {
if strings.Contains(err.Error(), "this package already has a first commit; aborting") {
return nil
}
return err
}
}
@ -653,6 +670,28 @@ func (r *Repo) push() error {
})
}
func (r *Repo) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if req.Method != "GET" && req.Method != "HEAD" {
w.WriteHeader(http.StatusBadRequest)
return
}
rev := req.FormValue("rev")
if rev == "" {
w.WriteHeader(http.StatusBadRequest)
return
}
cmd := exec.Command("git", "archive", "--format=tgz", rev)
cmd.Dir = r.root
tgz, err := cmd.Output()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Length", strconv.Itoa(len(tgz)))
w.Header().Set("Content-Type", "application/x-compressed")
w.Write(tgz)
}
func try(n int, fn func() error) error {
var err error
for tries := 0; tries < n; tries++ {

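The watcher's new ServeHTTP handler above serves "git archive" output over HTTP. A self-contained sketch of a client hitting that endpoint follows; the address matches the gitArchiveAddr constant in the coordinator, while the revision is a placeholder:

package main

import (
	"archive/tar"
	"compress/gzip"
	"fmt"
	"io"
	"log"
	"net/http"
)

func main() {
	rev := "54789eff385780c54254f822e09505b6222918e2" // placeholder revision
	res, err := http.Get("http://127.0.0.1:21536/go.tar.gz?rev=" + rev)
	if err != nil {
		log.Fatal(err)
	}
	defer res.Body.Close()
	if res.StatusCode != http.StatusOK {
		log.Fatalf("fetch failed: %v", res.Status)
	}
	zr, err := gzip.NewReader(res.Body) // "git archive --format=tgz" output is gzip-compressed tar
	if err != nil {
		log.Fatal(err)
	}
	tr := tar.NewReader(zr)
	for {
		hdr, err := tr.Next()
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatal(err)
		}
		fmt.Println(hdr.Name) // list the archive entries
	}
}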
View file

@ -6,7 +6,10 @@
// pieces of the Go continuous build system.
package dashboard
import "strings"
import (
"strconv"
"strings"
)
// Builders are the different build configurations.
// The keys are like "darwin-amd64" or "linux-386-387".
@ -38,16 +41,18 @@ type BuildConfig struct {
// request from the buildlet pool. If empty, it defaults to
// the value of Name.
//
// Note: we should probably start using this mechanism for
// more builder types, which combined with buildlet reuse
// could reduce latency. (e.g. "linux-386-387", "linux-amd64",
// and "linux-amd64-race" all sharing same buildlet and
// machine type, and able to jump onto each others
// buidlets... they vary only in env/args). For now we're
// only using this for ARM trybots.
// These should be used to minimize builder types, so the buildlet pool
// implementations can reuse buildlets from similar-enough builds.
// (e.g. a shared linux-386 trybot can be reused for some linux-amd64
// or linux-amd64-race tests, etc)
//
// TODO(bradfitz): break BuildConfig up into BuildConfig and
// BuildletConfig and have a BuildConfig refer to a
// BuildletConfig. There's too much confusion now.
BuildletType string
env []string // extra environment ("key=value") pairs
env []string // extra environment ("key=value") pairs
allScriptArgs []string
}
func (c *BuildConfig) Env() []string { return append([]string(nil), c.env...) }
@ -123,7 +128,7 @@ func (c *BuildConfig) AllScript() string {
if strings.HasPrefix(c.Name, "darwin-arm") {
return "src/iostest.bash"
}
if c.Name == "all-compile" {
if c.Name == "misc-compile" {
return "src/buildall.bash"
}
return "src/all.bash"
@ -136,6 +141,19 @@ func (c *BuildConfig) AllScript() string {
// but for now we've only set up the scripts and verified that the main
// configurations work.
func (c *BuildConfig) SplitMakeRun() bool {
if strings.HasPrefix(c.Name, "linux-arm") {
// On Scaleway, we don't want to snapshot these to GCS
// yet. That might be a lot of bandwidth and we
// haven't measured their speed yet. We might want to
// store snapshots within Scaleway instead. For now:
// use the old way.
return false
}
if strings.HasPrefix(c.Name, "darwin-") {
// TODO(bradfitz,crawshaw,adg): this is failing for some reason:
// Error: runTests: distTestList: Remote error: fork/exec /var/folders/5h/sqs3zkxd12zclcslj67vccqh0000gp/T/buildlet-scatch190808745/go/bin/go: no such file or directory
return false
}
switch c.AllScript() {
case "src/all.bash", "src/all.bat", "src/all.rc":
// These we've verified to work.
@ -151,7 +169,7 @@ func (c *BuildConfig) AllScriptArgs() []string {
if strings.HasPrefix(c.Name, "darwin-arm") {
return []string{"-restart"}
}
return nil
return append([]string(nil), c.allScriptArgs...)
}
// MakeScript returns the relative path to the operating system's script to
@ -217,6 +235,13 @@ func (c BuildConfig) ShortOwner() string {
return strings.TrimSuffix(c.Owner, "@golang.org")
}
// GCENumCPU reports the number of GCE CPUs this buildlet requires.
func (c *BuildConfig) GCENumCPU() int {
t := c.MachineType()
n, _ := strconv.Atoi(t[strings.LastIndex(t, "-")+1:])
return n
}
func init() {
addBuilder(BuildConfig{
Name: "freebsd-amd64-gce93",
@ -242,8 +267,9 @@ func init() {
env: []string{"CC=clang"},
})
addBuilder(BuildConfig{
Name: "freebsd-386-gce101",
VMImage: "freebsd-amd64-gce101",
Name: "freebsd-386-gce101",
VMImage: "freebsd-amd64-gce101",
//BuildletType: "freebsd-amd64-gce101",
machineType: "n1-highcpu-2",
buildletURL: "http://storage.googleapis.com/go-builder-data/buildlet.freebsd-amd64",
Go14URL: "https://storage.googleapis.com/go-builder-data/go1.4-freebsd-amd64.tar.gz",
@ -256,16 +282,18 @@ func init() {
env: []string{"GOARCH=386", "CC=clang"},
})
addBuilder(BuildConfig{
Name: "linux-386",
VMImage: "linux-buildlet-std",
Name: "linux-386",
VMImage: "linux-buildlet-std",
//BuildletType: "linux-amd64",
buildletURL: "http://storage.googleapis.com/go-builder-data/buildlet.linux-amd64",
env: []string{"GOROOT_BOOTSTRAP=/go1.4", "GOARCH=386", "GOHOSTARCH=386"},
NumTestHelpers: 3,
})
addBuilder(BuildConfig{
Name: "linux-386-387",
Notes: "GO386=387",
VMImage: "linux-buildlet-std",
Name: "linux-386-387",
Notes: "GO386=387",
VMImage: "linux-buildlet-std",
//BuildletType: "linux-amd64",
buildletURL: "http://storage.googleapis.com/go-builder-data/buildlet.linux-amd64",
env: []string{"GOROOT_BOOTSTRAP=/go1.4", "GOARCH=386", "GOHOSTARCH=386", "GO386=387"},
})
@ -276,13 +304,18 @@ func init() {
NumTestHelpers: 3,
})
addBuilder(BuildConfig{
Name: "all-compile",
TryOnly: true, // TODO: for speed, restrict this to builds not covered by other trybots
Name: "misc-compile",
TryOnly: true,
VMImage: "linux-buildlet-std",
machineType: "n1-highcpu-16", // CPU-bound, uses it well.
Notes: "Runs buildall.sh to compile stdlib for all GOOS/GOARCH, but doesn't run any tests.",
Notes: "Runs buildall.sh to compile stdlib for GOOS/GOARCH pairs not otherwise covered by trybots, but doesn't run any tests.",
buildletURL: "http://storage.googleapis.com/go-builder-data/buildlet.linux-amd64",
env: []string{"GOROOT_BOOTSTRAP=/go1.4"},
allScriptArgs: []string{
// Filtering pattern to buildall.bash:
// TODO: add darwin-386 and
"^(linux-arm64|linux-ppc64|linux-ppc64le|nacl-arm|plan9-amd64|solaris-amd64|netbsd-386|netbsd-amd64|netbsd-arm|freebsd-arm|darwin-386)$",
},
})
addBuilder(BuildConfig{
Name: "linux-amd64-nocgo",
@ -301,7 +334,8 @@ func init() {
Name: "linux-amd64-noopt",
Notes: "optimizations and inlining disabled",
VMImage: "linux-buildlet-std",
env: []string{"GOROOT_BOOTSTRAP=/go1.4", "GO_GCFLAGS=-N -l"},
//BuildletType: "linux-amd64",
env: []string{"GOROOT_BOOTSTRAP=/go1.4", "GO_GCFLAGS=-N -l"},
})
addBuilder(BuildConfig{
Name: "linux-amd64-race",
@ -311,8 +345,9 @@ func init() {
// TODO(bradfitz): make race.bash shardable, then: NumTestHelpers: 3
})
addBuilder(BuildConfig{
Name: "linux-386-clang",
VMImage: "linux-buildlet-clang",
Name: "linux-386-clang",
VMImage: "linux-buildlet-clang",
//BuildletType: "linux-amd64-clang",
buildletURL: "http://storage.googleapis.com/go-builder-data/buildlet.linux-amd64",
env: []string{"GOROOT_BOOTSTRAP=/go1.4", "CC=/usr/bin/clang", "GOHOSTARCH=386"},
})
@ -420,6 +455,7 @@ func init() {
VMImage: "linux-buildlet-nacl-v2",
buildletURL: "http://storage.googleapis.com/go-builder-data/buildlet.linux-amd64",
env: []string{"GOROOT_BOOTSTRAP=/go1.4", "GOOS=nacl", "GOARCH=386", "GOHOSTOS=linux", "GOHOSTARCH=amd64"},
//BuildletType: "nacl-amd64p32",
})
addBuilder(BuildConfig{
Name: "nacl-amd64p32",
@ -494,9 +530,10 @@ func init() {
env: []string{"GOARCH=amd64", "GOHOSTARCH=amd64"},
})
addBuilder(BuildConfig{
Name: "windows-386-gce",
VMImage: "windows-buildlet-v2",
machineType: "n1-highcpu-2",
Name: "windows-386-gce",
VMImage: "windows-buildlet-v2",
machineType: "n1-highcpu-2",
// TODO(bradfitz): once buildlet type vs. config type is split: BuildletType: "windows-amd64-gce",
buildletURL: "http://storage.googleapis.com/go-builder-data/buildlet.windows-amd64",
Go14URL: "https://storage.googleapis.com/go-builder-data/go1.4-windows-386.tar.gz",
RegularDisk: true,

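GCENumCPU, added above, derives a buildlet's CPU cost from the GCE machine-type suffix; the coordinator's quota dead-reckoning charges that many CPUs per VM. A tiny illustrative copy of the parsing (function name hypothetical):

package example

import (
	"strconv"
	"strings"
)

// gceNumCPU mirrors BuildConfig.GCENumCPU: it returns the trailing number
// of the machine type, e.g. gceNumCPU("n1-highcpu-2") == 2 and
// gceNumCPU("n1-highcpu-16") == 16.
func gceNumCPU(machineType string) int {
	n, _ := strconv.Atoi(machineType[strings.LastIndex(machineType, "-")+1:])
	return n
}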
env/commit-watcher/Dockerfile (vendored)
View file

@ -13,4 +13,5 @@ ADD /scripts/install-apt-deps.sh /scripts/
RUN /scripts/install-apt-deps.sh
ADD /scripts/build-commit-watcher.sh /scripts/
RUN GO_REV=go1.4 TOOLS_REV=a54d0066172 WATCHER_REV=f7dc6a2 /scripts/build-commit-watcher.sh && test -f /usr/local/bin/watcher
# Note that WATCHER_REV must be full for "git fetch origin $REV" later:
RUN GO_REV=go1.4 TOOLS_REV=a54d0066172 WATCHER_REV=89ace0b562ee0e4424941a285df09002d72a6a9a /scripts/build-commit-watcher.sh && test -f /usr/local/bin/watcher

View file

@ -20,6 +20,11 @@ GO_BUILD=$GOPATH/src/golang.org/x/build
mkdir -p $GO_BUILD
git clone https://go.googlesource.com/build $GO_BUILD
# Um, this didn't seem to work? Old git version in wheezy?
#git fetch https://go.googlesource.com/build $WATCHER_REV:origin/dummy-commit # in case it's a pending CL
# Hack, instead:
cd $GO_BUILD && git fetch https://go.googlesource.com/build refs/changes/50/10750/5
mkdir -p $PREFIX/bin
(cd $GO_BUILD && git reset --hard $WATCHER_REV && GOBIN=$PREFIX/bin /goroot/bin/go install golang.org/x/build/cmd/watcher)

internal/lru/cache.go (new file)
View file

@ -0,0 +1,96 @@
// Copyright 2011 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package lru implements an LRU cache.
package lru
import (
"container/list"
"sync"
)
// Cache is an LRU cache, safe for concurrent access.
type Cache struct {
maxEntries int
mu sync.Mutex
ll *list.List
cache map[interface{}]*list.Element
}
// *entry is the type stored in each *list.Element.
type entry struct {
key, value interface{}
}
// New returns a new cache with the provided maximum items.
func New(maxEntries int) *Cache {
return &Cache{
maxEntries: maxEntries,
ll: list.New(),
cache: make(map[interface{}]*list.Element),
}
}
// Add adds the provided key and value to the cache, evicting
// an old item if necessary.
func (c *Cache) Add(key, value interface{}) {
c.mu.Lock()
defer c.mu.Unlock()
// Already in cache?
if ee, ok := c.cache[key]; ok {
c.ll.MoveToFront(ee)
ee.Value.(*entry).value = value
return
}
// Add to cache if not present
ele := c.ll.PushFront(&entry{key, value})
c.cache[key] = ele
if c.ll.Len() > c.maxEntries {
c.removeOldest()
}
}
// Get fetches the key's value from the cache.
// The ok result will be true if the item was found.
func (c *Cache) Get(key interface{}) (value interface{}, ok bool) {
c.mu.Lock()
defer c.mu.Unlock()
if ele, hit := c.cache[key]; hit {
c.ll.MoveToFront(ele)
return ele.Value.(*entry).value, true
}
return
}
// RemoveOldest removes the oldest item in the cache and returns its key and value.
// If the cache is empty, the empty string and nil are returned.
func (c *Cache) RemoveOldest() (key, value interface{}) {
c.mu.Lock()
defer c.mu.Unlock()
return c.removeOldest()
}
// note: must hold c.mu
func (c *Cache) removeOldest() (key, value interface{}) {
ele := c.ll.Back()
if ele == nil {
return
}
c.ll.Remove(ele)
ent := ele.Value.(*entry)
delete(c.cache, ent.key)
return ent.key, ent.value
}
// Len returns the number of items in the cache.
func (c *Cache) Len() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.ll.Len()
}

View file

@ -0,0 +1,59 @@
// Copyright 2011 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package lru
import (
"reflect"
"testing"
)
func TestLRU(t *testing.T) {
c := New(2)
expectMiss := func(k string) {
v, ok := c.Get(k)
if ok {
t.Fatalf("expected cache miss on key %q but hit value %v", k, v)
}
}
expectHit := func(k string, ev interface{}) {
v, ok := c.Get(k)
if !ok {
t.Fatalf("expected cache(%q)=%v; but missed", k, ev)
}
if !reflect.DeepEqual(v, ev) {
t.Fatalf("expected cache(%q)=%v; but got %v", k, ev, v)
}
}
expectMiss("1")
c.Add("1", "one")
expectHit("1", "one")
c.Add("2", "two")
expectHit("1", "one")
expectHit("2", "two")
c.Add("3", "three")
expectHit("3", "three")
expectHit("2", "two")
expectMiss("1")
}
func TestRemoveOldest(t *testing.T) {
c := New(2)
c.Add("1", "one")
c.Add("2", "two")
if k, v := c.RemoveOldest(); k != "1" || v != "one" {
t.Fatalf("oldest = %q, %q; want 1, one", k, v)
}
if k, v := c.RemoveOldest(); k != "2" || v != "two" {
t.Fatalf("oldest = %q, %q; want 2, two", k, v)
}
if k, v := c.RemoveOldest(); k != nil || v != nil {
t.Fatalf("oldest = %v, %v; want \"\", nil", k, v)
}
}

View file

@ -0,0 +1,111 @@
// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package singleflight provides a duplicate function call suppression
// mechanism.
package singleflight
import "sync"
// call is an in-flight or completed singleflight.Do call
type call struct {
wg sync.WaitGroup
// These fields are written once before the WaitGroup is done
// and are only read after the WaitGroup is done.
val interface{}
err error
// These fields are read and written with the singleflight
// mutex held before the WaitGroup is done, and are read but
// not written after the WaitGroup is done.
dups int
chans []chan<- Result
}
// Group represents a class of work and forms a namespace in
// which units of work can be executed with duplicate suppression.
type Group struct {
mu sync.Mutex // protects m
m map[string]*call // lazily initialized
}
// Result holds the results of Do, so they can be passed
// on a channel.
type Result struct {
Val interface{}
Err error
Shared bool
}
// Do executes and returns the results of the given function, making
// sure that only one execution is in-flight for a given key at a
// time. If a duplicate comes in, the duplicate caller waits for the
// original to complete and receives the same results.
// The return value shared indicates whether v was given to multiple callers.
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
c.dups++
g.mu.Unlock()
c.wg.Wait()
return c.val, c.err, true
}
c := new(call)
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
g.doCall(c, key, fn)
return c.val, c.err, c.dups > 0
}
// DoChan is like Do but returns a channel that will receive the
// results when they are ready.
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
ch := make(chan Result, 1)
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
c.dups++
c.chans = append(c.chans, ch)
g.mu.Unlock()
return ch
}
c := &call{chans: []chan<- Result{ch}}
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
go g.doCall(c, key, fn)
return ch
}
// doCall handles the single call for a key.
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
c.val, c.err = fn()
c.wg.Done()
g.mu.Lock()
delete(g.m, key)
for _, ch := range c.chans {
ch <- Result{c.val, c.err, c.dups > 0}
}
g.mu.Unlock()
}
// Forget tells the singleflight to forget about a key. Future calls
// to Do for this key will call the function rather than waiting for
// an earlier call to complete.
func (g *Group) Forget(key string) {
g.mu.Lock()
delete(g.m, key)
g.mu.Unlock()
}

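The tests in the next file exercise Do only; a hedged sketch of the DoChan variant (placeholder key and work function) looks like this:

package example

import (
	"fmt"

	"golang.org/x/build/internal/singleflight"
)

// fetchOnce asks for key via DoChan; concurrent callers of the same key
// all receive the one Result when the single underlying call finishes.
func fetchOnce(g *singleflight.Group, key string) {
	ch := g.DoChan(key, func() (interface{}, error) {
		return "payload for " + key, nil // placeholder work, done at most once per in-flight key
	})
	res := <-ch
	if res.Err != nil {
		fmt.Println("error:", res.Err)
		return
	}
	fmt.Printf("got %v (shared=%v)\n", res.Val, res.Shared)
}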
View file

@ -0,0 +1,73 @@
// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package singleflight
import (
"errors"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"
)
func TestDo(t *testing.T) {
var g Group
v, err, _ := g.Do("key", func() (interface{}, error) {
return "bar", nil
})
if got, want := fmt.Sprintf("%v (%T)", v, v), "bar (string)"; got != want {
t.Errorf("Do = %v; want %v", got, want)
}
if err != nil {
t.Errorf("Do error = %v", err)
}
}
func TestDoErr(t *testing.T) {
var g Group
someErr := errors.New("Some error")
v, err, _ := g.Do("key", func() (interface{}, error) {
return nil, someErr
})
if err != someErr {
t.Errorf("Do error = %v; want someErr %v", err, someErr)
}
if v != nil {
t.Errorf("unexpected non-nil value %#v", v)
}
}
func TestDoDupSuppress(t *testing.T) {
var g Group
c := make(chan string)
var calls int32
fn := func() (interface{}, error) {
atomic.AddInt32(&calls, 1)
return <-c, nil
}
const n = 10
var wg sync.WaitGroup
for i := 0; i < n; i++ {
wg.Add(1)
go func() {
v, err, _ := g.Do("key", fn)
if err != nil {
t.Errorf("Do error: %v", err)
}
if v.(string) != "bar" {
t.Errorf("got %q; want %q", v, "bar")
}
wg.Done()
}()
}
time.Sleep(100 * time.Millisecond) // let goroutines above block
c <- "bar"
wg.Wait()
if got := atomic.LoadInt32(&calls); got != 1 {
t.Errorf("number of calls = %d; want 1", got)
}
}