maintner: support for updating corpus in-place from its mutation source

And update godata package docs.

Change-Id: I781e50b20dfa1494fa7d65400cff8a2637ecf9e0
Reviewed-on: https://go-review.googlesource.com/42174
Reviewed-by: Kevin Burke <kev@inburke.com>
This commit is contained in:
Brad Fitzpatrick 2017-04-29 21:15:37 +00:00
Родитель 04f8c525c7
Коммит 1a1ef8e92a
12 изменённых файлов: 722 добавлений и 124 удалений

Просмотреть файл

@ -66,29 +66,14 @@ func main() {
log.Fatal(err)
}
bot := &gopherbot{ghc: ghc}
bot.initCorpus()
ctx := context.Background()
corpus, err := godata.Get(ctx)
if err != nil {
log.Fatal(err)
}
repo := corpus.GitHub().Repo("golang", "go")
if repo == nil {
log.Fatal("Failed to find Go repo.")
}
bot := &gopherbot{
ghc: ghc,
corpus: corpus,
gorepo: repo,
}
for {
var nextLoop time.Time
err := bot.doTasks(ctx)
if err != nil {
log.Print(err)
nextLoop = time.Now().Add(30 * time.Second)
}
if !*daemon {
if err != nil {
@ -96,15 +81,25 @@ func main() {
}
return
}
// TODO: if err != nil, pass a ctx with 30s timeout and retry the doTasks.
// Maybe use a better ctx above too.
if err := corpus.Update(ctx); err != nil {
log.Fatalf("corpus.Update: %v", err)
if err != nil {
log.Printf("sleeping 30s after previous error.")
time.Sleep(30 * time.Second)
}
if nextLoop.After(time.Now()) {
sleep := time.Until(nextLoop)
log.Printf("Sleeping for %v after previous error.", sleep)
time.Sleep(sleep)
for {
t0 := time.Now()
err := bot.corpus.Update(ctx)
if err != nil {
if err == maintner.ErrSplit {
log.Print("Corpus out of sync. Re-fetching corpus.")
bot.initCorpus()
} else {
log.Printf("corpus.Update: %v; sleeping 15s", err)
time.Sleep(15 * time.Second)
continue
}
}
log.Printf("got corpus update after %v", time.Since(t0))
break
}
}
}
@ -130,6 +125,22 @@ var tasks = []struct {
{"check cherry picks", (*gopherbot).checkCherryPicks},
}
// initCorpus loads a corpus with godata.Get and stores it on b together
// with the golang/go GitHub repo looked up in that corpus. It terminates
// the process via log.Fatal if the corpus cannot be loaded or the repo is
// not found in it.
func (b *gopherbot) initCorpus() {
	c, err := godata.Get(context.Background())
	if err != nil {
		log.Fatalf("godata.Get: %v", err)
	}
	goRepo := c.GitHub().Repo("golang", "go")
	if goRepo == nil {
		log.Fatal("Failed to find Go repo in Corpus.")
	}
	b.corpus, b.gorepo = c, goRepo
}
func (b *gopherbot) doTasks(ctx context.Context) error {
for _, task := range tasks {
if err := task.fn(b, ctx); err != nil {

Просмотреть файл

@ -222,10 +222,15 @@ type watchedGerritRepo struct {
project *GerritProject
}
// AddGerrit adds the Gerrit project with the given project to the corpus.
// TrackGerrit registers the Gerrit project with the given project as a project
// to watch and append to the mutation log. Only valid in leader mode.
// The provided string should be of the form "hostname/project", without a scheme
// or trailing slash.
func (c *Corpus) AddGerrit(gerritProj string) {
func (c *Corpus) TrackGerrit(gerritProj string) {
if c.mutationLogger == nil {
panic("can't TrackGerrit in non-leader mode")
}
c.mu.Lock()
defer c.mu.Unlock()

Просмотреть файл

@ -40,8 +40,9 @@ var statusTests = []struct {
}
func TestGetGerritStatus(t *testing.T) {
c := NewCorpus(&dummyMutationLogger{}, "")
c.AddGerrit("go.googlesource.com/build")
var c Corpus
c.EnableLeaderMode(new(dummyMutationLogger), "/fake/dir")
c.TrackGerrit("go.googlesource.com/build")
gp := c.gerrit.projects["go.googlesource.com/build"]
for _, tt := range statusTests {
gc := &GitCommit{Msg: tt.msg}

Просмотреть файл

@ -13,7 +13,6 @@ import (
"fmt"
"log"
"os/exec"
"sort"
"strconv"
"strings"
"time"
@ -419,42 +418,3 @@ func (c *Corpus) gitLocation(v []byte) *time.Location {
c.zoneCache[s] = loc
return loc
}
// FileCount pairs a file name with a count. QueryFrequentlyModifiedFiles
// uses it to report how many times a file appears in the file lists of
// the corpus's git commits.
type FileCount struct {
	File  string
	Count int
}
// QueryFrequentlyModifiedFiles is an example query just for fun.
// It is not currently used by anything.
//
// It counts, under the corpus read lock, how often each file appears in
// the file lists of the corpus's git commits (after normalizing legacy
// paths with modernizeFilename) and returns at most topN entries ordered
// by decreasing count. Files with equal counts are ordered by name so the
// result is deterministic. A negative topN is treated as zero.
func (c *Corpus) QueryFrequentlyModifiedFiles(topN int) []FileCount {
	c.mu.RLock()
	defer c.mu.RUnlock()
	if topN < 0 {
		// Slicing with a negative bound would panic below.
		topN = 0
	}
	counts := map[string]int{} // file -> count
	for _, gc := range c.gitCommit {
		for _, f := range gc.Files {
			counts[modernizeFilename(f.File)]++
		}
	}
	files := make([]FileCount, 0, len(counts))
	for file, count := range counts {
		files = append(files, FileCount{file, count})
	}
	sort.Slice(files, func(i, j int) bool {
		if files[i].Count != files[j].Count {
			return files[i].Count > files[j].Count
		}
		// Map iteration order is random; break ties by name.
		return files[i].File < files[j].File
	})
	if len(files) > topN {
		files = files[:topN]
	}
	return files
}
// modernizeFilename rewrites legacy source paths to their current
// location: a leading "src/pkg/" becomes "src/", and then a leading
// "src/http/" becomes "src/net/http/". The two rewrites are applied in
// that order, so "src/pkg/http/..." ends up under "src/net/http/".
func modernizeFilename(f string) string {
	const (
		oldPkgPrefix  = "src/pkg/"
		oldHTTPPrefix = "src/http/"
	)
	if strings.HasPrefix(f, oldPkgPrefix) {
		f = "src/" + f[len(oldPkgPrefix):]
	}
	if strings.HasPrefix(f, oldHTTPPrefix) {
		f = "src/net/http/" + f[len(oldHTTPPrefix):]
	}
	return f
}

Просмотреть файл

@ -55,6 +55,28 @@ type GitHub struct {
repos map[GithubRepoID]*GitHubRepo
}
// ForeachRepo calls fn serially for each GithubRepo, stopping if fn
// returns an error. The function is called with lexically increasing
// repo IDs: ordered by owner first, then by repo name within an owner.
//
// It returns the first non-nil error returned by fn, or nil if fn
// succeeded for every repo.
func (g *GitHub) ForeachRepo(fn func(*GitHubRepo) error) error {
	ids := make([]GithubRepoID, 0, len(g.repos))
	for id := range g.repos {
		ids = append(ids, id)
	}
	// Map iteration order is random, so sort the IDs to get the
	// documented order.
	sort.Slice(ids, func(i, j int) bool {
		if ids[i].Owner != ids[j].Owner {
			return ids[i].Owner < ids[j].Owner
		}
		return ids[i].Repo < ids[j].Repo
	})
	for _, id := range ids {
		if err := fn(g.repos[id]); err != nil {
			return err
		}
	}
	return nil
}
// Repo returns the repo if it's known. Otherwise it returns nil.
func (g *GitHub) Repo(owner, repo string) *GitHubRepo {
return g.repos[GithubRepoID{owner, repo}]
@ -535,7 +557,14 @@ func (c *Corpus) initGithub() {
}
}
func (c *Corpus) AddGithub(owner, repo, token string) {
// TrackGithub registers the named Github repo as a repo to
// watch and append to the mutation log. Only valid in leader mode.
// The token is the auth token to use to make API calls.
func (c *Corpus) TrackGithub(owner, repo, token string) {
if c.mutationLogger == nil {
panic("can't TrackGerrit in non-leader mode")
}
c.mu.Lock()
defer c.mu.Unlock()
c.initGithub()

Просмотреть файл

@ -2,7 +2,10 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package godata loads the Go project's corpus of Git, Github, and Gerrit activity.
// Package godata loads the Go project's corpus of Git, Github, and
// Gerrit activity into memory to allow easy analysis without worrying
// about APIs and their pagination, quotas, and other nuisances and
// limitations.
package godata
import (
@ -16,7 +19,25 @@ import (
"golang.org/x/build/maintner"
)
// Get returns the Go project's corpus.
// Get returns the Go project's corpus, containing all Git commits,
// Github activity, and Gerrit activity and metadata since the
// beginning of the project.
//
// The initial call to Get will download approximately 350-400 MB of
// data into a directory "golang-maintner" under your operating
// system's user cache directory. Subsequent calls will only download
// what's changed since the previous call.
//
// Even with all the data already cached on local disk, a call to Get
// takes approximately 5 seconds to read the mutation log into memory.
// For daemons, use Corpus.Update to incrementally update an
// already-loaded Corpus.
//
// The in-memory representation is about 25% larger than its on-disk
// size. It's currently under 500 MB.
//
// See https://godoc.org/golang.org/x/build/maintner#Corpus for how
// to walk the data structure. Enjoy.
func Get(ctx context.Context) (*maintner.Corpus, error) {
targetDir := filepath.Join(xdgCacheDir(), "golang-maintner")
if err := os.MkdirAll(targetDir, 0700); err != nil {

Просмотреть файл

@ -26,7 +26,9 @@ type MutationLogger interface {
// DiskMutationLogger logs mutations to disk.
type DiskMutationLogger struct {
directory string
mu sync.RWMutex
mu sync.Mutex
done bool // true after first GetMutations
}
// NewDiskMutationLogger creates a new DiskMutationLogger, which will create
@ -58,8 +60,8 @@ func (d *DiskMutationLogger) Log(m *maintpb.Mutation) error {
}
func (d *DiskMutationLogger) ForeachFile(fn func(fullPath string, fi os.FileInfo) error) error {
d.mu.RLock()
defer d.mu.RUnlock()
d.mu.Lock()
defer d.mu.Unlock()
if d.directory == "" {
panic("empty directory")
}
@ -82,7 +84,21 @@ func (d *DiskMutationLogger) ForeachFile(fn func(fullPath string, fi os.FileInfo
}
func (d *DiskMutationLogger) GetMutations(ctx context.Context) <-chan MutationStreamEvent {
d.mu.Lock()
wasDone := d.done
d.done = true
d.mu.Unlock()
if wasDone {
// TODO: support subsequent Update? for now we only
// support the initial loading. The network mutation
// source is the new implementation with Update
// support.
return nil
}
ch := make(chan MutationStreamEvent, 50) // buffered: overlap gunzip/unmarshal with loading
go func() {
err := d.ForeachFile(func(fullPath string, fi os.FileInfo) error {
return reclog.ForeachFileRecord(fullPath, func(off int64, hdr, rec []byte) error {

Просмотреть файл

@ -12,6 +12,7 @@ package maintner
import (
"context"
"errors"
"fmt"
"log"
"regexp"
@ -27,13 +28,13 @@ import (
// Corpus holds all of a project's metadata.
//
// There are two main phases to the Corpus: the catch-up phase, when the Corpus
// is populated from a MutationSource (disk, database), and the polling phase,
// when the Corpus polls for new events and stores/writes them to disk.
// Many public accessor methods are missing. File bugs at golang.org/issues/new.
type Corpus struct {
mutationLogger MutationLogger // non-nil when this is a self-updating corpus
mutationSource MutationSource // from Initialize
verbose bool
dataDir string
sawErrSplit bool
mu sync.RWMutex // guards all following fields
// corpus state:
@ -64,7 +65,9 @@ type polledGitCommits struct {
dir string
}
// EnableLeaderMode prepares c to be the leader.
// EnableLeaderMode prepares c to be the leader. This should only be
// called by the maintnerd process.
//
// The provided scratchDir will store git checkouts.
func (c *Corpus) EnableLeaderMode(logger MutationLogger, scratchDir string) {
c.mutationLogger = logger
@ -139,13 +142,13 @@ func (c *Corpus) debugf(format string, v ...interface{}) {
// TODO: figure out if this is accurate.
var gerritProjNameRx = regexp.MustCompile(`^[a-z0-9]+[a-z0-9\-\_]*$`)
// AddGoGitRepo registers a git directory to have its metadata slurped into the corpus.
// TrackGoGitRepo registers a git directory to have its metadata slurped into the corpus.
// The goRepo is a name like "go" or "net". The dir is a path on disk.
//
// TODO(bradfitz): this whole interface is temporary. Make this
// support any git repo and make this (optionally?) use the gitmirror
// service later instead of a separate copy on disk.
func (c *Corpus) AddGoGitRepo(goRepo, dir string) {
func (c *Corpus) TrackGoGitRepo(goRepo, dir string) {
if c.mutationLogger == nil {
panic("can't TrackGoGitRepo in non-leader mode")
}
if !gerritProjNameRx.MatchString(goRepo) {
panic(fmt.Sprintf("bogus goRepo value %q", goRepo))
}
@ -189,9 +192,47 @@ type MutationStreamEvent struct {
// MutationSource. It returns once it's up-to-date. To incrementally
// update it later, use the Update method.
func (c *Corpus) Initialize(ctx context.Context, src MutationSource) error {
	// A non-nil mutationSource means Initialize already ran on this
	// Corpus; calling it twice is treated as a programmer error.
	if c.mutationSource != nil {
		panic("duplicate call to Initialize")
	}
	// Remember the source: update reads it back from c.mutationSource.
	c.mutationSource = src
	log.Printf("Loading data from log %T ...", src)
	return c.update(ctx)
}
// ErrSplit is returned when the client notices the leader's
// mutation log has changed. This can happen if the leader restarts
// with uncommitted transactions. (The leader only commits mutations
// periodically.)
var ErrSplit = errors.New("maintner: leader server's history split, process out of sync")
// Update incrementally updates the corpus from its current state to
// the latest state from the MutationSource passed earlier to
// Initialize. It does not return until there's either a new change or
// the context expires.
//
// If Update returns ErrSplit, the corpus can no longer be updated;
// any later call to Update panics. Update also panics if Initialize
// has not been called first.
//
// Update must not be called concurrently with any other method or
// access of the corpus, including other Update calls.
func (c *Corpus) Update(ctx context.Context) error {
	if c.mutationSource == nil {
		panic("Update called without call to Initialize")
	}
	if c.sawErrSplit {
		panic("Update called after previous Update call returned ErrSplit")
	}
	log.Printf("Updating data from log %T ...", c.mutationSource)
	err := c.update(ctx)
	if err == ErrSplit {
		// Remember the split so that later calls fail loudly instead
		// of updating a corpus that has diverged from the leader.
		c.sawErrSplit = true
	}
	return err
}
func (c *Corpus) update(ctx context.Context) error {
src := c.mutationSource
ch := src.GetMutations(ctx)
done := ctx.Done()
log.Printf("Reloading data from log %T ...", src)
c.mu.Lock()
defer c.mu.Unlock()
for {
@ -202,7 +243,7 @@ func (c *Corpus) Initialize(ctx context.Context, src MutationSource) error {
return err
case e := <-ch:
if e.Err != nil {
log.Printf("Corpus.Initialize: %v", e.Err)
log.Printf("Corpus GetMutations: %v", e.Err)
return e.Err
}
if e.End {
@ -215,14 +256,6 @@ func (c *Corpus) Initialize(ctx context.Context, src MutationSource) error {
}
}
// Update incrementally updates the corpus from its current state to
// the latest state from the MutationSource passed earlier to
// Initialize. It does not return until there's either a new change or
// the context expires.
func (c *Corpus) Update(ctx context.Context) error {
panic("TODO")
}
// addMutation adds a mutation to the log and immediately processes it.
func (c *Corpus) addMutation(m *maintpb.Mutation) {
if c.verbose {

Просмотреть файл

@ -48,7 +48,7 @@ type mutationTest struct {
func (mt mutationTest) test(t *testing.T, muts ...*maintpb.Mutation) {
c := mt.corpus
if c == nil {
c = NewCorpus(&dummyMutationLogger{}, "")
c = new(Corpus)
}
for _, m := range muts {
c.processMutationLocked(m)
@ -74,7 +74,7 @@ func init() {
}
func TestProcessMutation_Github_NewIssue(t *testing.T) {
c := NewCorpus(&dummyMutationLogger{}, "")
c := new(Corpus)
github := &GitHub{c: c}
c.github = github
github.users = map[int64]*GitHubUser{
@ -113,7 +113,7 @@ func TestProcessMutation_Github_NewIssue(t *testing.T) {
}
func TestProcessMutation_Github(t *testing.T) {
c := NewCorpus(&dummyMutationLogger{}, "")
c := new(Corpus)
github := &GitHub{c: c}
c.github = github
github.repos = map[GithubRepoID]*GitHubRepo{
@ -175,7 +175,7 @@ func TestNewAssigneesHandlesNil(t *testing.T) {
}
func TestAssigneesDeleted(t *testing.T) {
c := NewCorpus(&dummyMutationLogger{}, "")
c := new(Corpus)
assignees := []*GitHubUser{u1, u2}
issue := &GitHubIssue{
Number: 3,

Просмотреть файл

@ -130,13 +130,13 @@ func main() {
if err != nil {
log.Fatalf("getting github token: %v", err)
}
corpus.AddGithub(splits[0], splits[1], token)
corpus.TrackGithub(splits[0], splits[1], token)
}
}
if *watchGerrit != "" {
for _, project := range strings.Split(*watchGerrit, ",") {
// token may be empty, that's OK.
corpus.AddGerrit(project)
corpus.TrackGerrit(project)
}
}

Просмотреть файл

@ -18,8 +18,9 @@ import (
"os"
"path/filepath"
"strings"
"time"
"github.com/gogo/protobuf/proto"
"github.com/golang/protobuf/proto"
"golang.org/x/build/maintner/maintpb"
"golang.org/x/build/maintner/reclog"
)
@ -42,6 +43,14 @@ type netMutSource struct {
server string
base *url.URL
cacheDir string
last []fileSeg
// Hooks for testing. If nil, unused:
testHookGetServerSegments func(context.Context) ([]LogSegmentJSON, error)
testHookWaitForServerSegmentUpdate func(context.Context) error
testHookSyncSeg func(context.Context, LogSegmentJSON) (fileSeg, error)
testHookFilePrefixSum224 func(file string, n int64) string
}
func (ns *netMutSource) GetMutations(ctx context.Context) <-chan MutationStreamEvent {
@ -60,41 +69,184 @@ func (ns *netMutSource) GetMutations(ctx context.Context) <-chan MutationStreamE
return ch
}
func (ns *netMutSource) sendMutations(ctx context.Context, ch chan<- MutationStreamEvent) error {
// waitForServerSegmentUpdate blocks before the server's segment list is
// fetched again. If the testHookWaitForServerSegmentUpdate hook is set,
// the call is delegated to it. Otherwise it sleeps for five seconds and
// returns nil, or returns ctx.Err() early if ctx is done first.
func (ns *netMutSource) waitForServerSegmentUpdate(ctx context.Context) error {
	if hook := ns.testHookWaitForServerSegmentUpdate; hook != nil {
		return hook(ctx)
	}

	// TODO: 5 second sleep is dumb. make it
	// subscribe to pubsubhelper? maybe the
	// server's response header should reference
	// its pubsubhelper server URL. but then we
	// can't assume activity means it'll be picked
	// up right away. so maybe wait for activity,
	// and then poll every second for 10 seconds
	// or so, or until there's changes, and then
	// go back to every 5 second polling or
	// something. or maybe the maintnerd server should
	// have its own long poll functionality.
	// for now, just 5 second polling:
	const pollInterval = 5 * time.Second
	log.Printf("sleeping for 5s...")
	expired := time.After(pollInterval)
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-expired:
		return nil
	}
}
func (ns *netMutSource) getServerSegments(ctx context.Context) ([]LogSegmentJSON, error) {
if fn := ns.testHookGetServerSegments; fn != nil {
return fn(ctx)
}
req, err := http.NewRequest("GET", ns.server, nil)
if err != nil {
return err
return nil, err
}
req = req.WithContext(ctx)
res, err := http.DefaultClient.Do(req)
if err != nil {
return err
return nil, err
}
defer res.Body.Close()
if res.StatusCode != 200 {
return fmt.Errorf("%s: %v", ns.server, res.Status)
return nil, fmt.Errorf("%s: %v", ns.server, res.Status)
}
var segs []LogSegmentJSON
if err := json.NewDecoder(res.Body).Decode(&segs); err != nil {
return fmt.Errorf("decoding %s JSON: %v", ns.server, err)
err = json.NewDecoder(res.Body).Decode(&segs)
if err != nil {
return nil, fmt.Errorf("decoding %s JSON: %v", ns.server, err)
}
return segs, nil
}
// TODO: optimization: if already on GCE, skip sync to disk part and just
// read from network. fast & free network inside.
var fileSegs []fileSeg
for _, seg := range segs {
fileSeg, err := ns.syncSeg(ctx, seg)
func (ns *netMutSource) getNewSegments(ctx context.Context) ([]fileSeg, error) {
for {
segs, err := ns.getServerSegments(ctx)
if err != nil {
return fmt.Errorf("syncing segment %d: %v", seg.Number, err)
return nil, err
}
fileSegs = append(fileSegs, fileSeg)
// TODO: optimization: if already on GCE, skip sync to disk part and just
// read from network. fast & free network inside.
var fileSegs []fileSeg
for _, seg := range segs {
fileSeg, err := ns.syncSeg(ctx, seg)
if err != nil {
return nil, fmt.Errorf("syncing segment %d: %v", seg.Number, err)
}
fileSegs = append(fileSegs, fileSeg)
}
sumLast := sumSegSize(ns.last)
sumCommon := ns.sumCommonPrefixSize(fileSegs, ns.last)
if sumLast != sumCommon {
return nil, ErrSplit
}
sumCur := sumSegSize(fileSegs)
if sumCommon == sumCur {
// Nothing new. Wait.
if err := ns.waitForServerSegmentUpdate(ctx); err != nil {
return nil, err
}
continue
}
ns.last = fileSegs
newSegs := trimLeadingSegBytes(fileSegs, sumCommon)
return newSegs, nil
}
return foreachFileSeg(fileSegs, func(seg fileSeg) error {
}
// trimLeadingSegBytes drops the first trim bytes from the segment list in.
// Segments that are covered entirely by trim are removed; if bytes remain
// to be trimmed inside the next segment, that amount is recorded in the
// segment's skip field. It returns nil when nothing is left. The input
// slice is never modified: the result is a fresh copy.
func trimLeadingSegBytes(in []fileSeg, trim int64) []fileSeg {
	// Count how many whole leading segments are consumed by trim.
	first := 0
	for first < len(in) && in[first].size <= trim {
		trim -= in[first].size
		first++
	}
	if first == len(in) {
		return nil
	}
	// Copy the remainder, since its first element is about to change.
	rest := in[first:]
	out := make([]fileSeg, len(rest))
	copy(out, rest)
	out[0].skip = trim
	return out
}
// filePrefixSum224 returns the lowercase hex SHA-224 of the first n bytes
// of file. If the testHookFilePrefixSum224 hook is set, its result is
// returned instead. On failure to open the file or to read n bytes from
// it, the empty string is returned; the error is logged unless it is a
// file-not-found error from opening.
func (ns *netMutSource) filePrefixSum224(file string, n int64) string {
	if hook := ns.testHookFilePrefixSum224; hook != nil {
		return hook(file, n)
	}
	f, err := os.Open(file)
	switch {
	case err == nil:
		// Opened fine; hash below.
	case os.IsNotExist(err):
		return ""
	default:
		log.Print(err)
		return ""
	}
	defer f.Close()

	hasher := sha256.New224()
	if _, err := io.CopyN(hasher, f, n); err != nil {
		log.Print(err)
		return ""
	}
	digest := hasher.Sum(nil)
	return fmt.Sprintf("%x", digest)
}
// sumSegSize returns the total of the size fields of segs. It returns 0
// for an empty or nil slice.
func sumSegSize(segs []fileSeg) (sum int64) {
	for i := range segs {
		sum += segs[i].size
	}
	return sum
}
// sumCommonPrefixSize returns the number of leading bytes that the two
// segment lists a and b have in common.
//
// Segments are compared pairwise. A pair with equal sha224 values counts
// in full. At the first pair whose sums differ the walk ends: if the two
// sizes are equal nothing more is counted; otherwise the first bytes of
// the larger segment's file (as many as the smaller segment's size) are
// hashed with filePrefixSum224, and the smaller size is counted only if
// that hash equals the smaller segment's sha224.
func (ns *netMutSource) sumCommonPrefixSize(a, b []fileSeg) (sum int64) {
	for i := 0; i < len(a) && i < len(b); i++ {
		x, y := a[i], b[i]
		if x.sha224 == y.sha224 {
			// Whole chunk in common.
			sum += x.size
			continue
		}
		if x.size == y.size {
			// Same size but different sums: the logs have forked.
			return sum
		}
		// See if the smaller chunk is a prefix of the bigger one.
		small, big := x, y
		if big.size < small.size {
			small, big = big, small
		}
		if ns.filePrefixSum224(big.file, small.size) == small.sha224 {
			sum += small.size
		}
		return sum
	}
	return sum
}
func (ns *netMutSource) sendMutations(ctx context.Context, ch chan<- MutationStreamEvent) error {
newSegs, err := ns.getNewSegments(ctx)
if err != nil {
return err
}
return foreachFileSeg(newSegs, func(seg fileSeg) error {
f, err := os.Open(seg.file)
if err != nil {
return err
}
defer f.Close()
if seg.skip > 0 {
if _, err := f.Seek(seg.skip, io.SeekStart); err != nil {
return err
}
}
return reclog.ForeachRecord(io.LimitReader(f, seg.size), func(off int64, hdr, rec []byte) error {
m := new(maintpb.Mutation)
if err := proto.Unmarshal(rec, m); err != nil {
@ -119,13 +271,21 @@ func foreachFileSeg(segs []fileSeg, fn func(seg fileSeg) error) error {
return nil
}
// TODO: add a constructor for this? or simplify it. make it Size +
// File + embedded LogSegmentJSON?
type fileSeg struct {
seg int
file string // full path
size int64
seg int
file string // full path
sha224 string
skip int64
size int64
}
func (ns *netMutSource) syncSeg(ctx context.Context, seg LogSegmentJSON) (fileSeg, error) {
if fn := ns.testHookSyncSeg; fn != nil {
return fn(ctx, seg)
}
isFinalSeg := !strings.HasPrefix(seg.URL, "https://storage.googleapis.com/")
relURL, err := url.Parse(seg.URL)
if err != nil {
@ -138,7 +298,7 @@ func (ns *netMutSource) syncSeg(ctx context.Context, seg LogSegmentJSON) (fileSe
// Do we already have it? Files named in their final form with the sha224 are considered
// complete and immutable.
if fi, err := os.Stat(frozen); err == nil && fi.Size() == seg.Size {
return fileSeg{seg.Number, frozen, fi.Size()}, nil
return fileSeg{seg: seg.Number, file: frozen, size: fi.Size(), sha224: seg.SHA224}, nil
}
// See how much data we already have in the partial growing file.
@ -152,9 +312,9 @@ func (ns *netMutSource) syncSeg(ctx context.Context, seg LogSegmentJSON) (fileSe
if err := os.Rename(partial, frozen); err != nil {
return fileSeg{}, err
}
return fileSeg{seg.Number, frozen, seg.Size}, nil
return fileSeg{seg: seg.Number, file: frozen, sha224: seg.SHA224, size: seg.Size}, nil
}
return fileSeg{seg.Number, partial, seg.Size}, nil
return fileSeg{seg: seg.Number, file: partial, sha224: seg.SHA224, size: seg.Size}, nil
}
}
@ -213,7 +373,7 @@ func (ns *netMutSource) syncSeg(ctx context.Context, seg LogSegmentJSON) (fileSe
return fileSeg{}, err
}
log.Printf("wrote %v", finalName)
return fileSeg{seg.Number, finalName, seg.Size}, nil
return fileSeg{seg: seg.Number, file: finalName, size: seg.Size, sha224: seg.SHA224}, nil
}
type LogSegmentJSON struct {

362
maintner/netsource_test.go Normal file
Просмотреть файл

@ -0,0 +1,362 @@
// Copyright 2017 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package maintner
import (
"context"
"fmt"
"reflect"
"testing"
)
// TestSumSegSize checks that sumSegSize adds up segment sizes for a single
// segment, several segments, and a nil slice.
func TestSumSegSize(t *testing.T) {
	tests := []struct {
		in   []fileSeg
		want int64
	}{
		{in: []fileSeg{{size: 1}}, want: 1},
		{in: []fileSeg{{size: 1}, {size: 100}}, want: 101},
		{in: nil, want: 0},
	}
	for i, tt := range tests {
		if got := sumSegSize(tt.in); got != tt.want {
			t.Errorf("%d. sumSegSize = %v; want %v", i, got, tt.want)
		}
	}
}
// TestSumCommonPrefixSize exercises sumCommonPrefixSize with identical
// segments, forked segments of equal size, and a trailing segment that
// has grown on either side. The file prefix hash is faked through
// testHookFilePrefixSum224; cases without a fake fail if it is called.
func TestSumCommonPrefixSize(t *testing.T) {
	// prefixSummer returns a fake hasher that reports sum for the first
	// wantN bytes of wantFile and a non-matching value for anything else.
	prefixSummer := func(wantFile string, wantN int64, sum string) func(string, int64) string {
		return func(file string, n int64) string {
			if file == wantFile && n == wantN {
				return sum
			}
			return "xxx"
		}
	}
	tests := []struct {
		a, b   []fileSeg
		summer func(file string, n int64) string
		want   int64
	}{
		{
			a:    []fileSeg{{size: 1, sha224: "abab"}},
			b:    []fileSeg{{size: 1, sha224: "abab"}},
			want: 1,
		},
		{
			a:    []fileSeg{{size: 1, sha224: "abab"}},
			b:    []fileSeg{{size: 1, sha224: "eeee"}},
			want: 0,
		},
		{
			a: []fileSeg{
				{size: 100, sha224: "abab"},
				{size: 100, sha224: "abab", file: "a.mutlog"},
			},
			b: []fileSeg{
				{size: 100, sha224: "abab"},
				{size: 50, sha224: "cccc"},
			},
			summer: prefixSummer("a.mutlog", 50, "cccc"),
			want:   150,
		},
		{
			a: []fileSeg{
				{size: 100, sha224: "abab"},
				{size: 50, sha224: "cccc"},
			},
			b: []fileSeg{
				{size: 100, sha224: "abab"},
				{size: 100, sha224: "abab", file: "b.mutlog"},
			},
			summer: prefixSummer("b.mutlog", 50, "cccc"),
			want:   150,
		},
	}
	for i, tt := range tests {
		ns := &netMutSource{testHookFilePrefixSum224: tt.summer}
		if ns.testHookFilePrefixSum224 == nil {
			// No hashing is expected for this case.
			ns.testHookFilePrefixSum224 = func(file string, n int64) string {
				t.Errorf("%d. unexpected call to prefix summer for file=%q, n=%v", i, file, n)
				return ""
			}
		}
		if got := ns.sumCommonPrefixSize(tt.a, tt.b); got != tt.want {
			t.Errorf("%d. sumCommonPrefixSize = %v; want %v", i, got, tt.want)
		}
	}
}
// TestTrimLeadingSegBytes checks trimLeadingSegBytes for trims of nothing,
// everything, exactly one whole segment, and part of the first segment,
// and verifies that the input slice is left untouched.
func TestTrimLeadingSegBytes(t *testing.T) {
	// twoSegs returns a fresh 100-byte + 50-byte segment list.
	twoSegs := func() []fileSeg {
		return []fileSeg{{size: 100}, {size: 50}}
	}
	tests := []struct {
		in   []fileSeg
		trim int64
		want []fileSeg
	}{
		{in: twoSegs(), trim: 0, want: []fileSeg{{size: 100}, {size: 50}}},
		{in: twoSegs(), trim: 150, want: nil},
		{in: twoSegs(), trim: 100, want: []fileSeg{{size: 50}}},
		{in: twoSegs(), trim: 25, want: []fileSeg{{size: 100, skip: 25}, {size: 50}}},
	}
	for i, tt := range tests {
		// Snapshot the input to detect in-place modification.
		before := make([]fileSeg, len(tt.in))
		copy(before, tt.in)

		got := trimLeadingSegBytes(tt.in, tt.trim)
		if !reflect.DeepEqual(tt.in, before) {
			t.Fatalf("%d. trimLeadingSegBytes modified its input", i)
		}
		if !reflect.DeepEqual(got, tt.want) {
			t.Fatalf("%d. trim = %+v; want %+v", i, got, tt.want)
		}
	}
}
// TestGetNewSegments drives netMutSource.getNewSegments through its test
// hooks: the server's segment listing, the wait between polls, the
// segment sync and the file prefix hash are all faked, so no network or
// disk access is involved. Each case starts from the previously seen
// segments in lastSegs and checks either the returned new segments and
// the number of waits, or that ErrSplit is returned.
func TestGetNewSegments(t *testing.T) {
	type testCase struct {
		name string
		// lastSegs is the state of netMutSource.last before the call.
		lastSegs []fileSeg
		// serverSegs are the successive server listings; the final
		// one is repeated if the server is asked again.
		serverSegs [][]LogSegmentJSON

		// prefixSum is the prefix sum to use if called.
		// If empty, prefixSum calls are errors.
		prefixSum string

		// wantWaits is the expected number of calls to the wait hook.
		wantWaits int
		// want is the expected result when no split is expected.
		want []fileSeg
		// wantSplit reports whether ErrSplit is the expected outcome.
		wantSplit bool
	}
	tests := []testCase{
		{
			name: "first_download",
			serverSegs: [][]LogSegmentJSON{
				[]LogSegmentJSON{
					{Number: 1, Size: 100, SHA224: "abc"},
					{Number: 2, Size: 200, SHA224: "def"},
				},
			},
			want: []fileSeg{
				{seg: 1, size: 100, sha224: "abc", file: "/fake/0001.mutlog"},
				{seg: 2, size: 200, sha224: "def", file: "/fake/0002.mutlog"},
			},
		},
		{
			name: "incremental_download_growseg", // from first_download, segment 2 grows a bit
			lastSegs: []fileSeg{
				{seg: 1, size: 100, sha224: "abc", file: "/fake/0001.mutlog"},
				{seg: 2, size: 200, sha224: "def", file: "/fake/0002.mutlog"},
			},
			prefixSum: "def",
			serverSegs: [][]LogSegmentJSON{
				[]LogSegmentJSON{
					{Number: 1, Size: 100, SHA224: "abc"},
					{Number: 2, Size: 205, SHA224: "defdef"},
				},
			},
			want: []fileSeg{
				{seg: 2, size: 205, sha224: "defdef", skip: 200, file: "/fake/0002.mutlog"},
			},
		},
		{
			name: "incremental_download_growseg_and_newseg", // from first_download, segment 2 grows, and segment 3 appears.
			lastSegs: []fileSeg{
				{seg: 1, size: 100, sha224: "abc", file: "/fake/0001.mutlog"},
				{seg: 2, size: 200, sha224: "def", file: "/fake/0002.mutlog"},
			},
			prefixSum: "def",
			serverSegs: [][]LogSegmentJSON{
				[]LogSegmentJSON{
					{Number: 1, Size: 100, SHA224: "abc"},
					{Number: 2, Size: 250, SHA224: "defdef"},
					{Number: 3, Size: 300, SHA224: "fff"},
				},
			},
			want: []fileSeg{
				{seg: 2, size: 250, sha224: "defdef", skip: 200, file: "/fake/0002.mutlog"},
				{seg: 3, size: 300, sha224: "fff", skip: 0, file: "/fake/0003.mutlog"},
			},
		},
		{
			name: "incremental_download_newseg", // from first_download, segment 3 appears.
			lastSegs: []fileSeg{
				{seg: 1, size: 100, sha224: "abc", file: "/fake/0001.mutlog"},
				{seg: 2, size: 200, sha224: "def", file: "/fake/0002.mutlog"},
			},
			serverSegs: [][]LogSegmentJSON{
				[]LogSegmentJSON{
					{Number: 1, Size: 100, SHA224: "abc"},
					{Number: 2, Size: 200, SHA224: "def"},
					{Number: 3, Size: 300, SHA224: "fff"},
				},
			},
			want: []fileSeg{
				{seg: 3, size: 300, sha224: "fff", skip: 0, file: "/fake/0003.mutlog"},
			},
		},
		{
			// The first listing has nothing new, so one wait is
			// expected before the second listing adds segment 2.
			name: "incremental_with_sleep",
			lastSegs: []fileSeg{
				{seg: 1, size: 101, sha224: "abc", file: "/fake/0001.mutlog"},
			},
			serverSegs: [][]LogSegmentJSON{
				[]LogSegmentJSON{
					{Number: 1, Size: 101, SHA224: "abc"},
				},
				[]LogSegmentJSON{
					{Number: 1, Size: 101, SHA224: "abc"},
					{Number: 2, Size: 102, SHA224: "def"},
				},
			},
			wantWaits: 1,
			want: []fileSeg{
				{seg: 2, size: 102, sha224: "def", skip: 0, file: "/fake/0002.mutlog"},
			},
		},
		{
			name: "split_error_diff_first_seg_same_size",
			lastSegs: []fileSeg{
				{seg: 1, size: 101, sha224: "abc", file: "/fake/0001.mutlog"},
			},
			serverSegs: [][]LogSegmentJSON{
				[]LogSegmentJSON{
					{Number: 1, Size: 101, SHA224: "def"},
				},
			},
			wantSplit: true,
		},
		{
			name: "split_error_diff_first_seg_and_longer",
			lastSegs: []fileSeg{
				{seg: 1, size: 101, sha224: "abc", file: "/fake/0001.mutlog"},
			},
			serverSegs: [][]LogSegmentJSON{
				[]LogSegmentJSON{
					{Number: 1, Size: 102, SHA224: "def"},
				},
			},
			prefixSum: "ffffffffff", // no match
			wantSplit: true,
		},
		{
			name: "split_error_diff_first_seg_and_shorter",
			lastSegs: []fileSeg{
				{seg: 1, size: 101, sha224: "abc", file: "/fake/0001.mutlog"},
			},
			serverSegs: [][]LogSegmentJSON{
				[]LogSegmentJSON{
					{Number: 1, Size: 50, SHA224: "def"},
				},
			},
			prefixSum: "ffffffffff", // no match
			wantSplit: true,
		},
		{
			name: "split_error_diff_final_seg",
			lastSegs: []fileSeg{
				{seg: 1, size: 100, sha224: "abc", file: "/fake/0001.mutlog"},
				{seg: 2, size: 2, sha224: "def", file: "/fake/0002.mutlog"},
			},
			serverSegs: [][]LogSegmentJSON{
				[]LogSegmentJSON{
					{Number: 1, Size: 100, SHA224: "abc"},
					{Number: 2, Size: 4, SHA224: "fff"},
				},
			},
			prefixSum: "not_def",
			wantSplit: true,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			serverSegCalls := 0
			waits := 0
			ns := &netMutSource{
				last: tt.lastSegs,
				// Serve the scripted listings in order, repeating the
				// final one; bail out if the code under test keeps
				// polling.
				testHookGetServerSegments: func(context.Context) (segs []LogSegmentJSON, err error) {
					serverSegCalls++
					if serverSegCalls > 10 {
						t.Fatalf("infinite loop calling getServerSegments? num wait calls = %v", waits)
					}
					if len(tt.serverSegs) == 0 {
						return nil, nil
					}
					segs = tt.serverSegs[0]
					if len(tt.serverSegs) > 1 {
						tt.serverSegs = tt.serverSegs[1:]
					}
					return segs, nil
				},
				// Count waits instead of sleeping.
				testHookWaitForServerSegmentUpdate: func(context.Context) error {
					waits++
					return nil
				},
				// Pretend every segment synced to /fake/NNNN.mutlog.
				testHookSyncSeg: func(_ context.Context, seg LogSegmentJSON) (fileSeg, error) {
					return fileSeg{
						seg:    seg.Number,
						size:   seg.Size,
						sha224: seg.SHA224,
						file:   fmt.Sprintf("/fake/%04d.mutlog", seg.Number),
					}, nil
				},
				// Return the case's canned prefix sum; a call without
				// one configured is a test failure.
				testHookFilePrefixSum224: func(file string, n int64) string {
					if tt.prefixSum != "" {
						return tt.prefixSum
					}
					t.Errorf("unexpected call to filePrefixSum224(%q, %d)", file, n)
					return "XXXX"
				},
			}
			got, err := ns.getNewSegments(context.Background())
			if tt.wantSplit && err == ErrSplit {
				// Success.
				return
			}
			if tt.wantSplit {
				t.Fatalf("wanted ErrSplit; got %+v, %v", got, err)
			}
			if err != nil {
				t.Fatal(err)
			}
			if !reflect.DeepEqual(got, tt.want) {
				t.Errorf("mismatch\n got: %+v\nwant: %+v\n", got, tt.want)
			}
			if tt.wantWaits != waits {
				t.Errorf("wait calls = %v; want %v", waits, tt.wantWaits)
			}
		})
	}
}