зеркало из https://github.com/golang/build.git
764 строки
21 KiB
Go
764 строки
21 KiB
Go
// Copyright 2014 The Go Authors. All rights reserved.
|
|
// Use of this source code is governed by a BSD-style
|
|
// license that can be found in the LICENSE file.
|
|
|
|
// The gitmirror binary watches the specified Gerrit repositories for
|
|
// new commits and syncs them to mirror repositories.
|
|
//
|
|
// It also serves tarballs over HTTP for the build system.
|
|
package main
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"errors"
|
|
"flag"
|
|
"fmt"
|
|
"log"
|
|
"net/http"
|
|
"os"
|
|
"os/exec"
|
|
"os/signal"
|
|
"path/filepath"
|
|
"runtime"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"golang.org/x/build/gerrit"
|
|
"golang.org/x/build/internal/envutil"
|
|
"golang.org/x/build/internal/gitauth"
|
|
"golang.org/x/build/internal/secret"
|
|
"golang.org/x/build/maintner"
|
|
"golang.org/x/build/maintner/godata"
|
|
repospkg "golang.org/x/build/repos"
|
|
"golang.org/x/sync/errgroup"
|
|
)
|
|
|
|
var (
|
|
flagHTTPAddr = flag.String("http", "", "If non-empty, the listen address to run an HTTP server on")
|
|
flagCacheDir = flag.String("cachedir", "", "git cache directory. If empty a temp directory is made.")
|
|
flagPollInterval = flag.Duration("poll", 60*time.Second, "Remote repo poll interval")
|
|
flagMirror = flag.Bool("mirror", false, "whether to mirror to mirror repos; if disabled, it only runs in HTTP archive server mode")
|
|
flagMirrorGitHub = flag.Bool("mirror-github", true, "whether to mirror to GitHub when mirroring is enabled")
|
|
flagMirrorCSR = flag.Bool("mirror-csr", true, "whether to mirror to Cloud Source Repositories when mirroring is enabled")
|
|
flagSecretsDir = flag.String("secretsdir", "", "directory to load secrets from instead of GCP")
|
|
)
|
|
|
|
func main() {
|
|
flag.Parse()
|
|
|
|
if *flagHTTPAddr != "" {
|
|
go func() {
|
|
err := http.ListenAndServe(*flagHTTPAddr, nil)
|
|
log.Fatalf("http server failed: %v", err)
|
|
}()
|
|
}
|
|
http.HandleFunc("/debug/env", handleDebugEnv)
|
|
http.HandleFunc("/debug/goroutines", handleDebugGoroutines)
|
|
|
|
if err := gitauth.Init(); err != nil {
|
|
log.Fatalf("gitauth: %v", err)
|
|
}
|
|
|
|
cacheDir, err := createCacheDir()
|
|
if err != nil {
|
|
log.Fatalf("creating cache dir: %v", err)
|
|
}
|
|
credsDir, err := os.MkdirTemp("", "gitmirror-credentials")
|
|
if err != nil {
|
|
log.Fatalf("creating credentials dir: %v", err)
|
|
}
|
|
defer os.RemoveAll(credsDir)
|
|
|
|
m := &gitMirror{
|
|
mux: http.DefaultServeMux,
|
|
repos: map[string]*repo{},
|
|
cacheDir: cacheDir,
|
|
homeDir: credsDir,
|
|
goBase: "https://go.googlesource.com/",
|
|
gerritClient: gerrit.NewClient("https://go-review.googlesource.com", gerrit.NoAuth),
|
|
mirrorGitHub: *flagMirrorGitHub,
|
|
mirrorCSR: *flagMirrorCSR,
|
|
timeoutScale: 1,
|
|
}
|
|
|
|
var eg errgroup.Group
|
|
for _, repo := range repospkg.ByGerritProject {
|
|
r := m.addRepo(repo)
|
|
eg.Go(r.init)
|
|
}
|
|
|
|
http.HandleFunc("/", m.handleRoot)
|
|
http.HandleFunc("/healthz", m.handleHealth)
|
|
|
|
if err := eg.Wait(); err != nil {
|
|
log.Fatalf("initializing repos: %v", err)
|
|
}
|
|
|
|
if *flagMirror {
|
|
if err := writeCredentials(credsDir); err != nil {
|
|
log.Fatalf("writing git credentials: %v", err)
|
|
}
|
|
if err := m.addMirrors(); err != nil {
|
|
log.Fatalf("configuring mirrors: %v", err)
|
|
}
|
|
}
|
|
|
|
for _, repo := range m.repos {
|
|
go repo.loop()
|
|
}
|
|
go m.pollGerritAndTickleLoop()
|
|
go m.subscribeToMaintnerAndTickleLoop()
|
|
|
|
shutdown := make(chan os.Signal, 1)
|
|
signal.Notify(shutdown, os.Interrupt)
|
|
<-shutdown
|
|
}
|
|
|
|
func writeCredentials(home string) error {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
|
|
sshConfig := &bytes.Buffer{}
|
|
gitConfig := &bytes.Buffer{}
|
|
sshConfigPath := filepath.Join(home, "ssh_config")
|
|
// ssh ignores $HOME in favor of /etc/passwd, so we need to override ssh_config explicitly.
|
|
fmt.Fprintf(gitConfig, "[core]\n sshCommand=\"ssh -F %v\"\n", sshConfigPath)
|
|
|
|
// GitHub key, used as the default SSH private key.
|
|
if *flagMirrorGitHub {
|
|
privKey, err := retrieveSecret(ctx, secret.NameGitHubSSHKey)
|
|
if err != nil {
|
|
return fmt.Errorf("reading github key from secret manager: %v", err)
|
|
}
|
|
privKeyPath := filepath.Join(home, secret.NameGitHubSSHKey)
|
|
if err := os.WriteFile(privKeyPath, []byte(privKey+"\n"), 0600); err != nil {
|
|
return err
|
|
}
|
|
fmt.Fprintf(sshConfig, "Host github.com\n IdentityFile %v\n", privKeyPath)
|
|
}
|
|
|
|
// The gitmirror service account should already be available via GKE workload identity.
|
|
if *flagMirrorCSR {
|
|
fmt.Fprintf(gitConfig, "[credential \"https://source.developers.google.com\"]\n helper=gcloud.sh\n")
|
|
}
|
|
|
|
if err := os.WriteFile(filepath.Join(home, ".gitconfig"), gitConfig.Bytes(), 0600); err != nil {
|
|
return err
|
|
}
|
|
if err := os.WriteFile(sshConfigPath, sshConfig.Bytes(), 0600); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func retrieveSecret(ctx context.Context, name string) (string, error) {
|
|
if *flagSecretsDir != "" {
|
|
secret, err := os.ReadFile(filepath.Join(*flagSecretsDir, name))
|
|
return string(secret), err
|
|
}
|
|
sc := secret.MustNewClient()
|
|
defer sc.Close()
|
|
return sc.Retrieve(ctx, name)
|
|
}
|
|
|
|
func createCacheDir() (string, error) {
|
|
if *flagCacheDir == "" {
|
|
dir, err := os.MkdirTemp("", "gitmirror")
|
|
if err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
defer os.RemoveAll(dir)
|
|
return dir, nil
|
|
}
|
|
|
|
fi, err := os.Stat(*flagCacheDir)
|
|
if os.IsNotExist(err) {
|
|
if err := os.MkdirAll(*flagCacheDir, 0755); err != nil {
|
|
return "", fmt.Errorf("failed to create watcher's git cache dir: %v", err)
|
|
}
|
|
} else {
|
|
if err != nil {
|
|
return "", fmt.Errorf("invalid -cachedir: %v", err)
|
|
}
|
|
if !fi.IsDir() {
|
|
return "", fmt.Errorf("invalid -cachedir=%q; not a directory", *flagCacheDir)
|
|
}
|
|
}
|
|
return *flagCacheDir, nil
|
|
}
|
|
|
|
// A gitMirror watches Gerrit repositories, fetching the latest commits and
|
|
// optionally mirroring them.
|
|
type gitMirror struct {
|
|
mux *http.ServeMux
|
|
repos map[string]*repo
|
|
cacheDir string
|
|
// homeDir is used as $HOME for all commands, allowing easy configuration overrides.
|
|
homeDir string
|
|
goBase string // Base URL/path for Go upstream repos.
|
|
gerritClient *gerrit.Client
|
|
mirrorGitHub, mirrorCSR bool
|
|
timeoutScale int
|
|
}
|
|
|
|
func (m *gitMirror) addRepo(meta *repospkg.Repo) *repo {
|
|
name := meta.GoGerritProject
|
|
r := &repo{
|
|
name: name,
|
|
url: m.goBase + name,
|
|
meta: meta,
|
|
root: filepath.Join(m.cacheDir, name),
|
|
changed: make(chan bool, 1),
|
|
mirror: m,
|
|
}
|
|
m.mux.Handle("/"+name+".tar.gz", r)
|
|
m.mux.Handle("/debug/watcher/"+r.name, r)
|
|
m.repos[name] = r
|
|
return r
|
|
}
|
|
|
|
// addMirrors sets up mirroring for repositories that need it.
|
|
func (m *gitMirror) addMirrors() error {
|
|
for _, repo := range m.repos {
|
|
if m.mirrorGitHub && repo.meta.MirrorToGitHub {
|
|
if err := repo.addRemote("github", "git@github.com:"+repo.meta.GitHubRepo+".git", ""); err != nil {
|
|
return fmt.Errorf("adding GitHub remote: %v", err)
|
|
}
|
|
}
|
|
if m.mirrorCSR && repo.meta.MirrorToCSRProject != "" {
|
|
// Option "nokeycheck" skips Cloud Source Repositories' private
|
|
// key checking. We have dummy keys checked in as test data.
|
|
if err := repo.addRemote("csr", "https://source.developers.google.com/p/"+repo.meta.MirrorToCSRProject+"/r/"+repo.name, "nokeycheck"); err != nil {
|
|
return fmt.Errorf("adding CSR remote: %v", err)
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// GET /
|
|
// or:
|
|
// GET /debug/watcher/
|
|
func (m *gitMirror) handleRoot(w http.ResponseWriter, r *http.Request) {
|
|
if r.URL.Path != "/" && r.URL.Path != "/debug/watcher/" {
|
|
http.NotFound(w, r)
|
|
return
|
|
}
|
|
w.Header().Set("Content-Type", "text/html; charset=utf-8")
|
|
fmt.Fprint(w, "<html><body><pre>")
|
|
var names []string
|
|
for name := range m.repos {
|
|
names = append(names, name)
|
|
}
|
|
sort.Strings(names)
|
|
for _, name := range names {
|
|
fmt.Fprintf(w, "<a href='/debug/watcher/%s'>%s</a> - %s\n", name, name, m.repos[name].statusLine())
|
|
}
|
|
fmt.Fprint(w, "</pre></body></html>")
|
|
}
|
|
|
|
func (m *gitMirror) handleHealth(w http.ResponseWriter, r *http.Request) {
|
|
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
|
|
for _, r := range m.repos {
|
|
r.mu.Lock()
|
|
err := r.err
|
|
r.mu.Unlock()
|
|
|
|
if err != nil {
|
|
w.WriteHeader(http.StatusInternalServerError)
|
|
fmt.Fprintf(w, "%v: %v\n", r.name, err)
|
|
return
|
|
}
|
|
}
|
|
|
|
w.WriteHeader(http.StatusOK)
|
|
}
|
|
|
|
// a statusEntry is a status string at a specific time.
|
|
type statusEntry struct {
|
|
status string
|
|
t time.Time
|
|
}
|
|
|
|
// statusRing is a ring buffer of timestamped status messages.
|
|
type statusRing struct {
|
|
mu sync.Mutex // guards rest
|
|
head int // next position to fill
|
|
ent [50]statusEntry // ring buffer of entries; zero time means unpopulated
|
|
}
|
|
|
|
func (r *statusRing) add(status string) {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
|
|
r.ent[r.head] = statusEntry{status, time.Now()}
|
|
r.head++
|
|
if r.head == len(r.ent) {
|
|
r.head = 0
|
|
}
|
|
}
|
|
|
|
func (r *statusRing) foreachDesc(fn func(statusEntry)) {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
|
|
i := r.head
|
|
for {
|
|
i--
|
|
if i < 0 {
|
|
i = len(r.ent) - 1
|
|
}
|
|
if i == r.head || r.ent[i].t.IsZero() {
|
|
return
|
|
}
|
|
fn(r.ent[i])
|
|
}
|
|
}
|
|
|
|
type remote struct {
|
|
name string // name as configured in the repo.
|
|
pushOption string // optional extra push option (--push-option).
|
|
}
|
|
|
|
// repo represents a repository to be watched.
|
|
type repo struct {
|
|
name string
|
|
url string
|
|
root string // on-disk location of the bare git repo, *cacheDir/name
|
|
meta *repospkg.Repo
|
|
changed chan bool // sent to when a change comes in
|
|
status statusRing
|
|
dests []remote // destination remotes to mirror to
|
|
mirror *gitMirror
|
|
|
|
mu sync.Mutex
|
|
err error
|
|
firstBad time.Time
|
|
lastBad time.Time
|
|
firstGood time.Time
|
|
lastGood time.Time
|
|
}
|
|
|
|
// init sets up the repo, cloning the remote repository from r.url
|
|
// to a local --mirror (which implies --bare) repository at r.root.
|
|
func (r *repo) init() error {
|
|
canReuse := true
|
|
if _, err := os.Stat(filepath.Join(r.root, "FETCH_HEAD")); err != nil {
|
|
canReuse = false
|
|
r.logf("can't reuse git dir, no FETCH_HEAD: %v", err)
|
|
}
|
|
if canReuse {
|
|
r.setStatus("reusing git dir; running git fetch")
|
|
_, _, err := r.runGitLogged("fetch", "--prune", "origin")
|
|
if err != nil {
|
|
canReuse = false
|
|
r.logf("git fetch failed; proceeding to wipe + clone instead")
|
|
}
|
|
}
|
|
if !canReuse {
|
|
r.setStatus("need clone; removing cache root")
|
|
os.RemoveAll(r.root)
|
|
_, _, err := r.runGitLogged("clone", "--mirror", r.url, r.root)
|
|
if err != nil {
|
|
return fmt.Errorf("cloning %s: %v", r.url, err)
|
|
}
|
|
r.setStatus("cloned")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r *repo) runGitLogged(args ...string) ([]byte, []byte, error) {
|
|
start := time.Now()
|
|
r.logf("running git %s", args)
|
|
stdout, stderr, err := r.runGitQuiet(args...)
|
|
if err == nil {
|
|
r.logf("ran git %s in %v", args, time.Since(start))
|
|
} else {
|
|
r.logf("git %s failed after %v: %v\nstderr: %v\n", args, time.Since(start), err, string(stderr))
|
|
}
|
|
return stdout, stderr, err
|
|
}
|
|
|
|
func (r *repo) runGitQuiet(args ...string) ([]byte, []byte, error) {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
|
|
defer cancel()
|
|
|
|
stdout, stderr := &bytes.Buffer{}, &bytes.Buffer{}
|
|
cmd := exec.Command("git", args...)
|
|
if args[0] == "clone" {
|
|
// Small hack: if we're cloning, the root doesn't exist yet.
|
|
envutil.SetDir(cmd, "/")
|
|
} else {
|
|
envutil.SetDir(cmd, r.root)
|
|
envutil.SetEnv(cmd, "GIT_DIR="+r.root)
|
|
}
|
|
envutil.SetEnv(cmd, "HOME="+r.mirror.homeDir)
|
|
cmd.Stdout, cmd.Stderr = stdout, stderr
|
|
err := runCmdContext(ctx, cmd)
|
|
return stdout.Bytes(), stderr.Bytes(), err
|
|
}
|
|
|
|
func (r *repo) setErr(err error) {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
change := (r.err != nil) != (err != nil)
|
|
now := time.Now()
|
|
if err != nil {
|
|
if change {
|
|
r.firstBad = now
|
|
}
|
|
r.lastBad = now
|
|
} else {
|
|
if change {
|
|
r.firstGood = now
|
|
}
|
|
r.lastGood = now
|
|
}
|
|
r.err = err
|
|
}
|
|
|
|
var startTime = time.Now()
|
|
|
|
func (r *repo) statusLine() string {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
|
|
if r.lastGood.IsZero() {
|
|
if r.err != nil {
|
|
return fmt.Sprintf("broken; permanently? always failing, for %v", time.Since(r.firstBad))
|
|
}
|
|
if time.Since(startTime) < 5*time.Minute {
|
|
return "ok; starting up, no report yet"
|
|
}
|
|
return fmt.Sprintf("hung; hang at start-up? no report since start %v ago", time.Since(startTime))
|
|
}
|
|
if r.err == nil {
|
|
if sinceGood := time.Since(r.lastGood); sinceGood > 6*time.Minute {
|
|
return fmt.Sprintf("hung? no activity since last success %v ago", sinceGood)
|
|
}
|
|
if r.lastBad.After(time.Now().Add(-1 * time.Hour)) {
|
|
return fmt.Sprintf("ok; recent failure %v ago", time.Since(r.lastBad))
|
|
}
|
|
return "ok"
|
|
}
|
|
return fmt.Sprintf("broken for %v", time.Since(r.lastGood))
|
|
}
|
|
|
|
func (r *repo) setStatus(status string) {
|
|
r.status.add(status)
|
|
}
|
|
|
|
func (r *repo) addRemote(name, url, pushOption string) error {
|
|
r.dests = append(r.dests, remote{
|
|
name: name,
|
|
pushOption: pushOption,
|
|
})
|
|
if err := os.MkdirAll(filepath.Join(r.root, "remotes"), 0777); err != nil {
|
|
return err
|
|
}
|
|
// We want to include only the refs/heads/* and refs/tags/* namespaces
|
|
// in the mirrors. They correspond to published branches and tags.
|
|
// Leave out internal Gerrit namespaces such as refs/changes/*,
|
|
// refs/users/*, etc., because they're not helpful on other hosts.
|
|
remote := "URL: " + url + "\n" +
|
|
"Push: +refs/heads/*:refs/heads/*\n" +
|
|
"Push: +refs/tags/*:refs/tags/*\n"
|
|
return os.WriteFile(filepath.Join(r.root, "remotes", name), []byte(remote), 0777)
|
|
}
|
|
|
|
// loop continuously runs "git fetch" in the repo, checks for new
|
|
// commits and mirrors commits to a destination repo (if enabled).
|
|
func (r *repo) loop() {
|
|
for {
|
|
if err := r.loopOnce(); err != nil {
|
|
time.Sleep(10 * time.Second * time.Duration(r.mirror.timeoutScale))
|
|
continue
|
|
}
|
|
|
|
// We still run a timer but a very slow one, just
|
|
// in case the mechanism updating the repo tickler
|
|
// breaks for some reason.
|
|
timer := time.NewTimer(5 * time.Minute)
|
|
select {
|
|
case <-r.changed:
|
|
r.setStatus("got update tickle")
|
|
timer.Stop()
|
|
case <-timer.C:
|
|
r.setStatus("poll timer fired")
|
|
}
|
|
}
|
|
}
|
|
|
|
func (r *repo) loopOnce() error {
|
|
if err := r.fetch(); err != nil {
|
|
r.logf("fetch failed: %v", err)
|
|
r.setErr(err)
|
|
return err
|
|
}
|
|
for _, dest := range r.dests {
|
|
if err := r.push(dest); err != nil {
|
|
r.logf("push failed: %v", err)
|
|
r.setErr(err)
|
|
return err
|
|
}
|
|
}
|
|
r.setErr(nil)
|
|
r.setStatus("waiting")
|
|
return nil
|
|
}
|
|
|
|
func (r *repo) logf(format string, args ...interface{}) {
|
|
log.Printf(r.name+": "+format, args...)
|
|
}
|
|
|
|
// fetch runs "git fetch" in the repository root.
|
|
// It tries three times, just in case it failed because of a transient error.
|
|
func (r *repo) fetch() error {
|
|
err := r.try(3, func(attempt int) error {
|
|
r.setStatus(fmt.Sprintf("running git fetch origin, attempt %d", attempt))
|
|
if _, stderr, err := r.runGitLogged("fetch", "--prune", "origin"); err != nil {
|
|
return fmt.Errorf("%v\n\n%s", err, stderr)
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
r.setStatus("git fetch failed")
|
|
} else {
|
|
r.setStatus("ran git fetch")
|
|
}
|
|
return err
|
|
}
|
|
|
|
// push runs "git push -f --mirror dest" in the repository root.
|
|
// It tries three times, just in case it failed because of a transient error.
|
|
func (r *repo) push(dest remote) error {
|
|
err := r.try(3, func(attempt int) error {
|
|
r.setStatus(fmt.Sprintf("syncing to %v, attempt %d", dest, attempt))
|
|
args := []string{"push", "-f", "--mirror"}
|
|
if dest.pushOption != "" {
|
|
args = append(args, "--push-option", dest.pushOption)
|
|
}
|
|
args = append(args, dest.name)
|
|
if _, stderr, err := r.runGitLogged(args...); err != nil {
|
|
return fmt.Errorf("%v\n\n%s", err, stderr)
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
r.setStatus("sync to " + dest.name + " failed")
|
|
} else {
|
|
r.setStatus("did sync to " + dest.name)
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (r *repo) fetchRevIfNeeded(ctx context.Context, rev string) error {
|
|
if _, _, err := r.runGitQuiet("cat-file", "-e", rev); err == nil {
|
|
return nil
|
|
}
|
|
r.logf("attempting to fetch missing revision %s from origin", rev)
|
|
_, _, err := r.runGitLogged("fetch", "origin", rev)
|
|
return err
|
|
}
|
|
|
|
// GET /<name>.tar.gz
|
|
// GET /debug/watcher/<name>
|
|
func (r *repo) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
|
if req.Method != "GET" && req.Method != "HEAD" {
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
return
|
|
}
|
|
if strings.HasPrefix(req.URL.Path, "/debug/watcher/") {
|
|
r.serveStatus(w, req)
|
|
return
|
|
}
|
|
rev := req.FormValue("rev")
|
|
if rev == "" {
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
return
|
|
}
|
|
ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second)
|
|
defer cancel()
|
|
if err := r.fetchRevIfNeeded(ctx, rev); err != nil {
|
|
// Try the archive anyway, it might work
|
|
r.logf("error fetching revision %s: %v", rev, err)
|
|
}
|
|
tgz, _, err := r.runGitQuiet("archive", "--format=tgz", rev)
|
|
if err != nil {
|
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
|
return
|
|
}
|
|
w.Header().Set("Content-Length", strconv.Itoa(len(tgz)))
|
|
w.Header().Set("Content-Type", "application/x-compressed")
|
|
w.Write(tgz)
|
|
}
|
|
|
|
func (r *repo) serveStatus(w http.ResponseWriter, req *http.Request) {
|
|
w.Header().Set("Content-Type", "text/html")
|
|
fmt.Fprintf(w, "<html><head><title>watcher: %s</title><body><h1>watcher status for repo: %q</h1>\n",
|
|
r.name, r.name)
|
|
fmt.Fprintf(w, "<pre>\n")
|
|
nowRound := time.Now().Round(time.Second)
|
|
r.status.foreachDesc(func(ent statusEntry) {
|
|
fmt.Fprintf(w, "%v %-20s %v\n",
|
|
ent.t.In(time.UTC).Format(time.RFC3339),
|
|
nowRound.Sub(ent.t.Round(time.Second)).String()+" ago",
|
|
ent.status)
|
|
})
|
|
fmt.Fprintf(w, "\n</pre></body></html>")
|
|
}
|
|
|
|
func (r *repo) try(n int, fn func(attempt int) error) error {
|
|
var err error
|
|
for tries := 0; tries < n; tries++ {
|
|
time.Sleep(time.Duration(tries) * 5 * time.Second * time.Duration(r.mirror.timeoutScale)) // Linear back-off.
|
|
if err = fn(tries); err == nil {
|
|
break
|
|
}
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (m *gitMirror) notifyChanged(name string) {
|
|
repo, ok := m.repos[name]
|
|
if !ok {
|
|
return
|
|
}
|
|
select {
|
|
case repo.changed <- true:
|
|
default:
|
|
}
|
|
}
|
|
|
|
// pollGerritAndTickleLoop polls Gerrit's JSON meta URL of all its URLs
|
|
// and their current branch heads. When this sees that one has
|
|
// changed, it tickles the channel for that repo and wakes up its
|
|
// poller, if its poller is in a sleep.
|
|
func (m *gitMirror) pollGerritAndTickleLoop() {
|
|
last := map[string]string{} // repo -> last seen hash
|
|
for {
|
|
gerritRepos, err := m.gerritMetaMap()
|
|
if err != nil {
|
|
log.Printf("pollGerritAndTickle: gerritMetaMap failed, skipping: %v", err)
|
|
gerritRepos = nil
|
|
}
|
|
for repo, hash := range gerritRepos {
|
|
if hash != last[repo] {
|
|
last[repo] = hash
|
|
m.notifyChanged(repo)
|
|
}
|
|
}
|
|
time.Sleep(*flagPollInterval)
|
|
}
|
|
}
|
|
|
|
// subscribeToMaintnerAndTickleLoop subscribes to maintner.golang.org
|
|
// and watches for any ref changes in realtime.
|
|
func (m *gitMirror) subscribeToMaintnerAndTickleLoop() {
|
|
for {
|
|
if err := m.subscribeToMaintnerAndTickle(); err != nil {
|
|
log.Printf("maintner loop: %v; retrying in 30 seconds", err)
|
|
time.Sleep(30 * time.Second)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *gitMirror) subscribeToMaintnerAndTickle() error {
|
|
ctx := context.Background()
|
|
retryTicker := time.NewTicker(10 * time.Second)
|
|
defer retryTicker.Stop() // we never return, though
|
|
for {
|
|
err := maintner.TailNetworkMutationSource(ctx, godata.Server, func(e maintner.MutationStreamEvent) error {
|
|
if e.Mutation != nil && e.Mutation.Gerrit != nil {
|
|
gm := e.Mutation.Gerrit
|
|
if strings.HasPrefix(gm.Project, "go.googlesource.com/") {
|
|
proj := strings.TrimPrefix(gm.Project, "go.googlesource.com/")
|
|
log.Printf("maintner refs for %s changed", gm.Project)
|
|
m.notifyChanged(proj)
|
|
}
|
|
}
|
|
return e.Err
|
|
})
|
|
log.Printf("maintner tail error: %v; sleeping+restarting", err)
|
|
|
|
// prevent retry looping faster than once every 10
|
|
// seconds; but usually retry immediately in the case
|
|
// where we've been running for a while already.
|
|
<-retryTicker.C
|
|
}
|
|
}
|
|
|
|
// gerritMetaMap returns the map from repo name (e.g. "go") to its
|
|
// latest master hash.
|
|
func (m *gitMirror) gerritMetaMap() (map[string]string, error) {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
defer cancel()
|
|
projs, err := m.gerritClient.ListProjects(ctx)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("gerritClient.ListProjects: %v", err)
|
|
}
|
|
result := map[string]string{}
|
|
for _, p := range projs {
|
|
b, err := m.gerritClient.GetBranch(ctx, p.Name, "master")
|
|
if errors.Is(err, gerrit.ErrResourceNotExist) {
|
|
continue
|
|
} else if err != nil {
|
|
return nil, fmt.Errorf(`gerritClient.GetBranch(ctx, %q, "master"): %v`, p.Name, err)
|
|
}
|
|
result[p.Name] = b.Revision
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
// GET /debug/goroutines
|
|
func handleDebugGoroutines(w http.ResponseWriter, r *http.Request) {
|
|
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
|
|
buf := make([]byte, 1<<20)
|
|
w.Write(buf[:runtime.Stack(buf, true)])
|
|
}
|
|
|
|
// GET /debug/env
|
|
func handleDebugEnv(w http.ResponseWriter, r *http.Request) {
|
|
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
|
|
for _, kv := range os.Environ() {
|
|
fmt.Fprintf(w, "%s\n", kv)
|
|
}
|
|
}
|
|
|
|
// runCmdContext allows OS-specific overrides of process execution behavior.
|
|
// See runCmdContextLinux.
|
|
var runCmdContext = runCmdContextDefault
|
|
|
|
// runCmdContextDefault runs cmd controlled by ctx.
|
|
func runCmdContextDefault(ctx context.Context, cmd *exec.Cmd) error {
|
|
if err := cmd.Start(); err != nil {
|
|
return err
|
|
}
|
|
resChan := make(chan error, 1)
|
|
go func() {
|
|
resChan <- cmd.Wait()
|
|
}()
|
|
|
|
select {
|
|
case err := <-resChan:
|
|
return err
|
|
case <-ctx.Done():
|
|
}
|
|
// Canceled. Interrupt and see if it ends voluntarily.
|
|
cmd.Process.Signal(os.Interrupt)
|
|
select {
|
|
case <-resChan:
|
|
return ctx.Err()
|
|
case <-time.After(time.Second):
|
|
}
|
|
// Didn't shut down in response to interrupt. Kill it hard.
|
|
cmd.Process.Kill()
|
|
<-resChan
|
|
return ctx.Err()
|
|
}
|