maintnerd: subscribe to pubsubhelper

Change-Id: Ia76b53c6448bc92850d4a65c15247547b132860c
Reviewed-on: https://go-review.googlesource.com/39635
Reviewed-by: Kevin Burke <kev@inburke.com>
This commit is contained in:
Brad Fitzpatrick 2017-04-05 23:30:39 +00:00
Родитель d5a7b1c5d7
Коммит da737d3d09
5 изменённых файлов: 136 добавлений и 7 удалений

Просмотреть файл

@ -338,6 +338,7 @@ func (gp *GerritProject) sync(ctx context.Context, loop bool) error {
gp.logf("init: %v", err)
return err
}
activityCh := gp.gerrit.c.activityChan("gerrit:" + gp.proj)
for {
if err := gp.syncOnce(ctx); err != nil {
gp.logf("sync: %v", err)
@ -346,10 +347,14 @@ func (gp *GerritProject) sync(ctx context.Context, loop bool) error {
if !loop {
return nil
}
timer := time.NewTimer(15 * time.Minute)
select {
case <-ctx.Done():
timer.Stop()
return ctx.Err()
case <-time.After(1 * time.Minute):
case <-activityCh:
timer.Stop()
case <-timer.C:
}
}
}

Просмотреть файл

@ -1181,17 +1181,21 @@ func (gr *GitHubRepo) sync(ctx context.Context, tokenFile string, loop bool) err
gr: gr,
ghc: github.NewClient(hc),
}
activityCh := gr.github.c.activityChan("github:" + gr.id.String())
for {
err := p.sync(ctx)
if err == context.Canceled || !loop {
return err
}
p.logf("sync = %v; sleeping", err)
timer := time.NewTimer(15 * time.Minute)
select {
case <-time.After(30 * time.Second):
continue
case <-ctx.Done():
timer.Stop()
return ctx.Err()
case <-activityCh:
timer.Stop()
case <-timer.C:
}
}
}

Просмотреть файл

@ -40,6 +40,9 @@ type Corpus struct {
debug bool
strIntern map[string]string // interned strings, including binary githashes
// pubsub:
activityChans map[string]chan struct{} // keyed by topic
// github-specific
github *GitHub
gerrit *Gerrit

Просмотреть файл

@ -31,10 +31,10 @@ var (
watchGithub = flag.String("watch-github", "", "Comma-separated list of owner/repo pairs to slurp")
// TODO: specify gerrit auth via gitcookies or similar
watchGerrit = flag.String("watch-gerrit", "", `Comma-separated list of Gerrit projects to watch, each of form "hostname/project" (e.g. "go.googlesource.com/go")`)
config = flag.String("config", "", "If non-empty, the name of a pre-defined config. Currently only 'go' is recognized.")
dataDir = flag.String("data-dir", "", "Local directory to write protobuf files to (default $HOME/var/maintnerd)")
debug = flag.Bool("debug", false, "Print debug logging information")
pubsub = flag.String("pubsub", "", "If non-empty, the golang.org/x/build/cmd/pubsubhelper URL scheme and hostname, without path")
config = flag.String("config", "", "If non-empty, the name of a pre-defined config. Currently only 'go' is recognized.")
dataDir = flag.String("data-dir", "", "Local directory to write protobuf files to (default $HOME/var/maintnerd)")
debug = flag.Bool("debug", false, "Print debug logging information")
)
func init() {
@ -129,6 +129,9 @@ func main() {
return
}
if *pubsub != "" {
corpus.StartPubSubHelperSubscribe(*pubsub)
}
log.Fatalf("Corpus.SyncLoop = %v", corpus.SyncLoop(ctx))
}
@ -139,6 +142,7 @@ func setGoConfig() {
if *watchGerrit != "" {
log.Fatalf("can't set both --config and --watch-gerrit")
}
*pubsub = "https://pubsubhelper.golang.org"
*watchGithub = "golang/go"
gerrc := gerrit.NewClient("https://go-review.googlesource.com/", gerrit.NoAuth)

113
maintner/subscribe.go Normal file
Просмотреть файл

@ -0,0 +1,113 @@
// Copyright 2017 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package maintner
import (
"context"
"encoding/json"
"errors"
"fmt"
"log"
"net/http"
"strings"
"time"
"golang.org/x/build/cmd/pubsubhelper/pubsubtypes"
)
func (c *Corpus) activityChan(topic string) chan struct{} {
c.mu.Lock()
defer c.mu.Unlock()
if ch, ok := c.activityChans[topic]; ok {
return ch
}
if c.activityChans == nil {
c.activityChans = map[string]chan struct{}{}
}
ch := make(chan struct{}) // unbuffered
c.activityChans[topic] = ch
return ch
}
func (c *Corpus) fire(topic string) {
ch := c.activityChan(topic)
select {
case ch <- struct{}{}:
log.Printf("Pubsub woke up sync for topic %q", topic)
default:
log.Printf("Pubsub event on topic %q discarded; already syncing?", topic)
}
}
// StartPubSubHelperSubscribe starts subscribing to a
// golang.org/x/build/cmd/pubsubhelper server, such
// as https://pubsubhelper.golang.org
func (c *Corpus) StartPubSubHelperSubscribe(urlBase string) {
go c.subscribeLoop(urlBase)
}
func (c *Corpus) subscribeLoop(urlBase string) {
var after time.Time
for {
newAfter, err := c.getEvent(urlBase, after)
if err != nil {
log.Printf("pubsub subscribe: %v", err)
time.Sleep(5 * time.Second)
continue
}
after = newAfter
}
}
var zt time.Time // a zero time.Time
func (c *Corpus) getEvent(urlBase string, after time.Time) (newAfter time.Time, err error) {
var afterStr string
if !after.IsZero() {
afterStr = after.UTC().Format(time.RFC3339Nano)
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
req, _ := http.NewRequest("GET", urlBase+"/waitevent?after="+afterStr, nil)
req = req.WithContext(ctx)
res, err := http.DefaultClient.Do(req)
if err != nil {
return zt, err
}
defer res.Body.Close()
if res.StatusCode != 200 {
return zt, errors.New(res.Status)
}
var evt pubsubtypes.Event
if err := json.NewDecoder(res.Body).Decode(&evt); err != nil {
return zt, err
}
if !evt.LongPollTimeout {
got, _ := json.MarshalIndent(evt, "", "\t")
log.Printf("Got pubsubhelper event: %s", got)
if gh := evt.GitHub; gh != nil {
topic := "github:" + gh.RepoOwner + "/" + gh.Repo
c.fire(topic)
}
if gr := evt.Gerrit; gr != nil {
c.fire(gerritTopicOfEvent(gr))
}
}
return evt.Time.Time(), nil
}
// Return topics like "gerrit:go.googlesource.com/build"
func gerritTopicOfEvent(gr *pubsubtypes.GerritEvent) string {
server := gr.URL // "https://code-review.googlesource.com/11970
if i := strings.Index(server, "//"); i != -1 {
server = server[i+2:] // code-review.googlesource.com/11970
}
if i := strings.Index(server, "/"); i != -1 {
server = server[:i] // code-review.googlesource.com
}
server = strings.Replace(server, "-review.googlesource.com", ".googlesource.com", 1)
return fmt.Sprintf("gerrit:%s/%s", server, gr.Project)
}