maintner, maintner/maintnerd: support long polling for log changes

Addresses a TODO in the code, and removes the old 2 second poll loop.

Change-Id: Id698307bd7404e8ca3946fa16621674cca2eca6b
Reviewed-on: https://go-review.googlesource.com/42871
Reviewed-by: Kevin Burke <kev@inburke.com>
Brad Fitzpatrick 2017-05-05 13:23:37 -07:00
Parent: d5f9cf81ad
Commit: 0d89bab264
6 changed files with 223 additions and 73 deletions
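For reference, the long-poll protocol added here works like this: the client asks the server's JSON log-segment index for segments and passes the sum of the segment sizes it already knows in a "waitsizenot" query parameter; the server blocks for up to roughly 55 seconds and answers either with the updated segment list (plus an X-Sum-Segment-Size header) or with a 304 heartbeat, which the client handles by simply retrying. Below is a minimal client-side sketch of that exchange; the pollSegments and logSegment names and the server URL are hypothetical stand-ins, and the real client lives in the netMutSource changes further down.

// Illustrative sketch of the new long-poll flow from the client's side.
// Names here are hypothetical; see the netMutSource diff below for the
// actual maintner implementation.
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
)

// logSegment mirrors the JSON records served by the /logs index.
type logSegment struct {
	Number int
	Size   int64
	SHA224 string
}

// pollSegments fetches the segment index, long-polling until the total
// size of the server's log differs from knownSize.
func pollSegments(ctx context.Context, server string, knownSize int64) ([]logSegment, error) {
	url := server + "/logs"
	if knownSize > 0 {
		url += fmt.Sprintf("?waitsizenot=%d", knownSize)
	}
	for {
		req, err := http.NewRequest("GET", url, nil)
		if err != nil {
			return nil, err
		}
		res, err := http.DefaultClient.Do(req.WithContext(ctx))
		if err != nil {
			return nil, err
		}
		if knownSize > 0 && res.StatusCode == http.StatusNotModified {
			// Heartbeat: nothing changed within the server's ~55s window; poll again.
			res.Body.Close()
			continue
		}
		if res.StatusCode != http.StatusOK {
			res.Body.Close()
			return nil, fmt.Errorf("%s: %v", url, res.Status)
		}
		var segs []logSegment
		err = json.NewDecoder(res.Body).Decode(&segs)
		res.Body.Close()
		if err != nil {
			return nil, err
		}
		return segs, nil
	}
}

func main() {
	// Hypothetical server address; a real client would point at a maintnerd instance.
	segs, err := pollSegments(context.Background(), "https://maintner.example.net", 0)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("got %d log segments", len(segs))
}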


@@ -28,9 +28,21 @@ RUN go get -d github.com/golang/protobuf/proto && \
RUN go get -d cloud.google.com/go/storage && \
cd /go/src/cloud.google.com/go && git reset --hard 2f1da5d762c81a12c516bfb8a9ede96f42750361
RUN go get -d golang.org/x/crypto/acme/autocert && \
RUN go get golang.org/x/crypto/acme/autocert && \
cd /go/src/golang.org/x/crypto && git reset --hard c7af5bf2638a1164f2eb5467c39c6cffbd13a02e
# Optimization to speed COPY+go install steps later. This go install
# isn't required for correctness.
RUN go install golang.org/x/oauth2 \
golang.org/x/net/context \
github.com/google/go-github/github \
github.com/gregjones/httpcache \
go4.org/types \
golang.org/x/sync/errgroup \
github.com/golang/protobuf/proto \
cloud.google.com/go/storage \
golang.org/x/crypto/acme/autocert
COPY . /go/src/golang.org/x/build/
RUN go install -ldflags "-linkmode=external -extldflags '-static -pthread'" golang.org/x/build/maintner/maintnerd


@@ -45,6 +45,7 @@ type gcsLog struct {
bucket *storage.BucketHandle
mu sync.Mutex // guards the following
cond *sync.Cond
seg map[int]gcsLogSegment
curNum int
logBuf bytes.Buffer
@@ -67,17 +68,25 @@ func (s gcsLogSegment) String() string {
return fmt.Sprintf("{gcsLogSegment num=%v, size=%v, sha=%v, created=%v}", s.num, s.size, s.sha224, s.created.Format(time.RFC3339))
}
// newGCSLogBase returns a new gcsLog instance without any association
// with Google Cloud Storage.
func newGCSLogBase() *gcsLog {
gl := &gcsLog{
seg: map[int]gcsLogSegment{},
}
gl.cond = sync.NewCond(&gl.mu)
return gl
}
func newGCSLog(ctx context.Context, bucketName string) (*gcsLog, error) {
sc, err := storage.NewClient(ctx)
if err != nil {
return nil, fmt.Errorf("storage.NewClient: %v", err)
}
gl := &gcsLog{
sc: sc,
bucketName: bucketName,
bucket: sc.Bucket(bucketName),
seg: map[int]gcsLogSegment{},
}
gl := newGCSLogBase()
gl.sc = sc
gl.bucketName = bucketName
gl.bucket = sc.Bucket(bucketName)
if err := gl.initLoad(ctx); err != nil {
return nil, err
}
@@ -192,16 +201,95 @@ func (gl *gcsLog) serveJSONLogsIndex(w http.ResponseWriter, r *http.Request) {
startSeg, _ := strconv.Atoi(r.FormValue("startseg"))
if startSeg < 0 {
http.Error(w, "bad seg", http.StatusBadRequest)
http.Error(w, "bad startseg", http.StatusBadRequest)
return
}
// Long poll if the request contains a non-zero waitsizenot parameter.
// The client's provided 'waitsizenot' value is the sum of the segment
// sizes they already know. They're waiting for something new.
if s := r.FormValue("waitsizenot"); s != "" {
oldSize, err := strconv.ParseInt(s, 10, 64)
if err != nil || oldSize < 0 {
http.Error(w, "bad waitsizenot", http.StatusBadRequest)
return
}
// Return a 304 if there's no activity in just under a minute.
// This keeps some occasional activity on the TCP connection
// so we (and any proxies) know it's alive, and can fit
// within reasonable read/write deadlines on either side.
ctx, cancel := context.WithTimeout(r.Context(), 55*time.Second)
defer cancel()
changed := gl.waitSizeNot(ctx, oldSize)
if !changed {
w.WriteHeader(http.StatusNotModified)
return
}
}
segs := gl.getJSONLogs(startSeg)
w.Header().Set("Content-Type", "application/json")
w.Header().Set("X-Sum-Segment-Size", fmt.Sprint(sumSegmentSizes(segs)))
body, _ := json.MarshalIndent(segs, "", "\t")
w.Write(body)
}
// sumSegmentSizes returns the sum of each seg.Size in segs.
func sumSegmentSizes(segs []maintner.LogSegmentJSON) (sum int64) {
for _, seg := range segs {
sum += seg.Size
}
return sum
}
// waitSizeNot blocks until the total size of the gcsLog is not v, or the context expires.
// It reports whether the size changed.
func (gl *gcsLog) waitSizeNot(ctx context.Context, v int64) (changed bool) {
returned := make(chan struct{})
defer close(returned)
go gl.waitSizeNotAwaitContextOrChange(ctx, returned)
gl.mu.Lock()
defer gl.mu.Unlock()
for {
if curSize := gl.sumSizeLocked(); curSize != v {
log.Printf("waitSize fired. from %d => %d", v, curSize)
return true
}
select {
case <-ctx.Done():
return false
default:
gl.cond.Wait()
}
}
}
// waitSizeNotAwaitContextOrChange is part of waitSizeNot.
// It's a goroutine that selects on two channels and calls
// sync.Cond.Broadcast to wake up the waitSizeNot waiter if the
// context expires.
func (gl *gcsLog) waitSizeNotAwaitContextOrChange(ctx context.Context, returned <-chan struct{}) {
select {
case <-ctx.Done():
gl.cond.Broadcast()
case <-returned:
// No need to do a wakeup. Caller is already gone.
}
}
func (gl *gcsLog) sumSizeLocked() int64 {
var sum int64
for n, seg := range gl.seg {
if n != gl.curNum {
sum += seg.size
}
}
sum += int64(gl.logBuf.Len())
return sum
}
func (gl *gcsLog) getJSONLogs(startSeg int) (segs []maintner.LogSegmentJSON) {
gl.mu.Lock()
defer gl.mu.Unlock()
@@ -272,6 +360,7 @@ func (gl *gcsLog) Log(m *maintpb.Mutation) error {
if err := reclog.WriteRecord(gcsLogWriter{gl}, int64(gl.logBuf.Len()), data); err != nil {
return err
}
gl.cond.Broadcast() // wake any long-polling subscribers
// Otherwise schedule a periodic flush.
if gl.flushTimer == nil {


@@ -0,0 +1,44 @@
// Copyright 2017 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package main
import (
"context"
"testing"
"time"
"golang.org/x/build/maintner/maintpb"
)
func TestGCSLogWakeup_Timeout(t *testing.T) {
testGCSLogWakeup(t, false)
}
func TestGCSLogWakeup_Activity(t *testing.T) {
testGCSLogWakeup(t, true)
}
func testGCSLogWakeup(t *testing.T, activity bool) {
gl := newGCSLogBase()
waitc := make(chan bool, 1)
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
go func() {
waitc <- gl.waitSizeNot(ctx, 0)
}()
if activity {
if err := gl.Log(new(maintpb.Mutation)); err != nil {
t.Fatal(err)
}
}
select {
case got := <-waitc:
if got != activity {
t.Errorf("changed = %v; want %v", got, activity)
}
case <-time.After(2 * time.Second):
t.Errorf("timeout")
}
}


@@ -186,7 +186,19 @@ func main() {
http.NotFound(w, r)
return
}
io.WriteString(w, "<html><body>This is <a href='https://godoc.org/golang.org/x/build/maintner/maintnerd'>maintnerd</a>, the <a href='https://godoc.org/golang.org/x/build/maintner'>maintner</a> server.</body>")
io.WriteString(w, `<html>
<body>
<p>
This is <a href='https://godoc.org/golang.org/x/build/maintner/maintnerd'>maintnerd</a>,
the <a href='https://godoc.org/golang.org/x/build/maintner'>maintner</a> server.
See the <a href='https://godoc.org/golang.org/x/build/maintner/godata'>godata package</a> for
a client.
</p>
<ul>
<li><a href='/logs'>/logs</a>
</ul>
</body></html>
`)
})
errc := make(chan error)


@@ -47,10 +47,10 @@ type netMutSource struct {
last []fileSeg
// Hooks for testing. If nil, unused:
testHookGetServerSegments func(context.Context) ([]LogSegmentJSON, error)
testHookWaitForServerSegmentUpdate func(context.Context) error
testHookSyncSeg func(context.Context, LogSegmentJSON) (fileSeg, error)
testHookFilePrefixSum224 func(file string, n int64) string
testHookGetServerSegments func(context.Context, int64) ([]LogSegmentJSON, error)
testHookWaitAfterServerDupData func(context.Context) error
testHookSyncSeg func(context.Context, LogSegmentJSON) (fileSeg, error)
testHookFilePrefixSum224 func(file string, n int64) string
}
func (ns *netMutSource) GetMutations(ctx context.Context) <-chan MutationStreamEvent {
@@ -69,60 +69,54 @@ func (ns *netMutSource) GetMutations(ctx context.Context) <-chan MutationStreamEvent {
return ch
}
func (ns *netMutSource) waitForServerSegmentUpdate(ctx context.Context) error {
if fn := ns.testHookWaitForServerSegmentUpdate; fn != nil {
return fn(ctx)
}
// TODO: few second sleep is dumb. make it
// subscribe to pubsubhelper? maybe the
// server's response header should reference
// its pubsubhelper server URL. but then we
// can't assume activity means it'll be picked
// up right away. so maybe wait for activity,
// and then poll every second for 10 seconds
// or so, or until there's changes, and then
// go back to every 2 second polling or
// something. or maybe the maintnerd server should
// have its own long poll functionality.
// for now, just 2 second polling:
select {
case <-time.After(2 * time.Second):
return nil
case <-ctx.Done():
return ctx.Err()
}
}
func (ns *netMutSource) getServerSegments(ctx context.Context) ([]LogSegmentJSON, error) {
// waitSizeNot optionally specifies that the request should long-poll waiting for the server
// to have a sum of log segment sizes different than the value specified.
func (ns *netMutSource) getServerSegments(ctx context.Context, waitSizeNot int64) ([]LogSegmentJSON, error) {
if fn := ns.testHookGetServerSegments; fn != nil {
return fn(ctx)
return fn(ctx, waitSizeNot)
}
req, err := http.NewRequest("GET", ns.server, nil)
if err != nil {
return nil, err
logsURL := ns.server
if waitSizeNot > 0 {
logsURL += fmt.Sprintf("?waitsizenot=%d", waitSizeNot)
}
req = req.WithContext(ctx)
res, err := http.DefaultClient.Do(req)
if err != nil {
return nil, err
for {
req, err := http.NewRequest("GET", logsURL, nil)
if err != nil {
return nil, err
}
req = req.WithContext(ctx)
res, err := http.DefaultClient.Do(req)
if err != nil {
return nil, err
}
// If we're doing a long poll and the server replies
// with a 304 response, that means the server is just
// heart-beating us and trying to get a response back
// within its various deadlines. But we should just
// try again.
if waitSizeNot > 0 && res.StatusCode == http.StatusNotModified {
res.Body.Close()
continue
}
defer res.Body.Close()
if res.StatusCode != 200 {
return nil, fmt.Errorf("%s: %v", ns.server, res.Status)
}
var segs []LogSegmentJSON
err = json.NewDecoder(res.Body).Decode(&segs)
if err != nil {
return nil, fmt.Errorf("decoding %s JSON: %v", ns.server, err)
}
return segs, nil
}
defer res.Body.Close()
if res.StatusCode != 200 {
return nil, fmt.Errorf("%s: %v", ns.server, res.Status)
}
var segs []LogSegmentJSON
err = json.NewDecoder(res.Body).Decode(&segs)
if err != nil {
return nil, fmt.Errorf("decoding %s JSON: %v", ns.server, err)
}
return segs, nil
}
func (ns *netMutSource) getNewSegments(ctx context.Context) ([]fileSeg, error) {
for {
segs, err := ns.getServerSegments(ctx)
sumLast := sumSegSize(ns.last)
segs, err := ns.getServerSegments(ctx, sumLast)
if err != nil {
return nil, err
}
@@ -137,16 +131,25 @@ func (ns *netMutSource) getNewSegments(ctx context.Context) ([]fileSeg, error) {
}
fileSegs = append(fileSegs, fileSeg)
}
sumLast := sumSegSize(ns.last)
sumCommon := ns.sumCommonPrefixSize(fileSegs, ns.last)
if sumLast != sumCommon {
return nil, ErrSplit
}
sumCur := sumSegSize(fileSegs)
if sumCommon == sumCur {
// Nothing new. Wait.
if err := ns.waitForServerSegmentUpdate(ctx); err != nil {
return nil, err
// Nothing new. This shouldn't happen once the
// server is updated to respect the
// "?waitsizenot=NNN" long polling parameter.
// But keep this brief pause as a backup to
// prevent spinning and because clients &
// servers won't be updated simultaneously.
if ns.testHookGetServerSegments == nil {
log.Printf("maintner.netsource: server returned unchanged log segments; old server?")
}
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-time.After(1 * time.Second):
}
continue
}


@@ -155,7 +155,6 @@ func TestGetNewSegments(t *testing.T) {
// If empty, prefixSum calls are errors.
prefixSum string
wantWaits int
want []fileSeg
wantSplit bool
}
@@ -240,7 +239,6 @@ func TestGetNewSegments(t *testing.T) {
{Number: 2, Size: 102, SHA224: "def"},
},
},
wantWaits: 1,
want: []fileSeg{
{seg: 2, size: 102, sha224: "def", skip: 0, file: "/fake/0002.mutlog"},
},
@@ -305,7 +303,7 @@ func TestGetNewSegments(t *testing.T) {
waits := 0
ns := &netMutSource{
last: tt.lastSegs,
testHookGetServerSegments: func(context.Context) (segs []LogSegmentJSON, err error) {
testHookGetServerSegments: func(_ context.Context, waitSizeNot int64) (segs []LogSegmentJSON, err error) {
serverSegCalls++
if serverSegCalls > 10 {
t.Fatalf("infinite loop calling getServerSegments? num wait calls = %v", waits)
@@ -319,10 +317,6 @@ func TestGetNewSegments(t *testing.T) {
}
return segs, nil
},
testHookWaitForServerSegmentUpdate: func(context.Context) error {
waits++
return nil
},
testHookSyncSeg: func(_ context.Context, seg LogSegmentJSON) (fileSeg, error) {
return fileSeg{
seg: seg.Number,
@@ -353,10 +347,6 @@ func TestGetNewSegments(t *testing.T) {
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("mismatch\n got: %+v\nwant: %+v\n", got, tt.want)
}
if tt.wantWaits != waits {
t.Errorf("wait calls = %v; want %v", waits, tt.wantWaits)
}
})
}
}