// Copyright 2017 The Go Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. package maintner import ( "context" "encoding/json" "fmt" "io" "log" "net/http" "net/url" "reflect" "regexp" "runtime" "slices" "sort" "strconv" "strings" "time" "github.com/google/go-github/v48/github" "github.com/gregjones/httpcache" "golang.org/x/build/maintner/maintpb" "golang.org/x/oauth2" "golang.org/x/sync/errgroup" "golang.org/x/time/rate" "google.golang.org/protobuf/types/known/timestamppb" ) // xFromCache is the synthetic response header added by the httpcache // package for responses fulfilled from cache due to a 304 from the server. const xFromCache = "X-From-Cache" // GitHubRepoID is a GitHub org & repo, lowercase. type GitHubRepoID struct { Owner, Repo string } func (id GitHubRepoID) String() string { return id.Owner + "/" + id.Repo } func (id GitHubRepoID) valid() bool { if id.Owner == "" || id.Repo == "" { // TODO: more validation. whatever GitHub requires. return false } return true } // GitHub holds data about a GitHub repo. type GitHub struct { c *Corpus users map[int64]*GitHubUser teams map[int64]*GitHubTeam repos map[GitHubRepoID]*GitHubRepo } // ForeachRepo calls fn serially for each GitHubRepo, stopping if fn // returns an error. The function is called with lexically increasing // repo IDs. func (g *GitHub) ForeachRepo(fn func(*GitHubRepo) error) error { var ids []GitHubRepoID for id := range g.repos { ids = append(ids, id) } sort.Slice(ids, func(i, j int) bool { if ids[i].Owner < ids[j].Owner { return true } return ids[i].Owner == ids[j].Owner && ids[i].Repo < ids[j].Repo }) for _, id := range ids { if err := fn(g.repos[id]); err != nil { return err } } return nil } // Repo returns the repo if it's known. Otherwise it returns nil. func (g *GitHub) Repo(owner, repo string) *GitHubRepo { return g.repos[GitHubRepoID{owner, repo}] } func (g *GitHub) getOrCreateRepo(owner, repo string) *GitHubRepo { if g == nil { panic("cannot call methods on nil GitHub") } id := GitHubRepoID{owner, repo} if !id.valid() { return nil } r, ok := g.repos[id] if ok { return r } r = &GitHubRepo{ github: g, id: id, issues: map[int32]*GitHubIssue{}, } g.repos[id] = r return r } type GitHubRepo struct { github *GitHub id GitHubRepoID issues map[int32]*GitHubIssue // num -> issue milestones map[int64]*GitHubMilestone labels map[int64]*GitHubLabel } func (gr *GitHubRepo) ID() GitHubRepoID { return gr.id } // Issue returns the provided issue number, or nil if it's not known. func (gr *GitHubRepo) Issue(n int32) *GitHubIssue { return gr.issues[n] } // ForeachLabel calls fn for each label in the repo, in unsorted order. // // Iteration ends if fn returns an error, with that error. func (gr *GitHubRepo) ForeachLabel(fn func(*GitHubLabel) error) error { for _, lb := range gr.labels { if err := fn(lb); err != nil { return err } } return nil } // ForeachMilestone calls fn for each milestone in the repo, in unsorted order. // // Iteration ends if fn returns an error, with that error. func (gr *GitHubRepo) ForeachMilestone(fn func(*GitHubMilestone) error) error { for _, m := range gr.milestones { if err := fn(m); err != nil { return err } } return nil } // ForeachIssue calls fn for each issue in the repo. // // If fn returns an error, iteration ends and ForeachIssue returns // with that error. // // The fn function is called serially, with increasingly numbered // issues. func (gr *GitHubRepo) ForeachIssue(fn func(*GitHubIssue) error) error { s := make([]*GitHubIssue, 0, len(gr.issues)) for _, gi := range gr.issues { s = append(s, gi) } sort.Slice(s, func(i, j int) bool { return s[i].Number < s[j].Number }) for _, gi := range s { if err := fn(gi); err != nil { return err } } return nil } // ForeachReview calls fn for each review event on the issue // // If the issue is not a PullRequest, then it returns early with no error. // // If fn returns an error, iteration ends and ForeachReview returns // with that error. // // The fn function is called serially, in chronological order. func (pr *GitHubIssue) ForeachReview(fn func(*GitHubReview) error) error { if !pr.PullRequest { return nil } s := make([]*GitHubReview, 0, len(pr.reviews)) for _, rv := range pr.reviews { s = append(s, rv) } sort.Slice(s, func(i, j int) bool { return s[i].Created.Before(s[j].Created) }) for _, rv := range s { if err := fn(rv); err != nil { return err } } return nil } func (g *GitHubRepo) getOrCreateMilestone(id int64) *GitHubMilestone { if id == 0 { panic("zero id") } m, ok := g.milestones[id] if ok { return m } if g.milestones == nil { g.milestones = map[int64]*GitHubMilestone{} } m = &GitHubMilestone{ID: id} g.milestones[id] = m return m } func (g *GitHubRepo) getOrCreateLabel(id int64) *GitHubLabel { if id == 0 { panic("zero id") } lb, ok := g.labels[id] if ok { return lb } if g.labels == nil { g.labels = map[int64]*GitHubLabel{} } lb = &GitHubLabel{ID: id} g.labels[id] = lb return lb } func (g *GitHubRepo) verbose() bool { return g.github != nil && g.github.c != nil && g.github.c.verbose } // GitHubUser represents a GitHub user. // It is a subset of https://developer.github.com/v3/users/#get-a-single-user type GitHubUser struct { ID int64 Login string } // GitHubTeam represents a GitHub team. // It is a subset of https://developer.github.com/v3/orgs/teams/#get-team type GitHubTeam struct { ID int64 // Slug is a URL-friendly representation of the team name. // It is unique across a GitHub organization. Slug string } // GitHubIssueRef is a reference to an issue (or pull request) number // in a repo. These are parsed from text making references such as // "golang/go#1234" or just "#1234" (with an implicit Repo). type GitHubIssueRef struct { Repo *GitHubRepo // must be non-nil Number int32 // GitHubIssue.Number } func (r GitHubIssueRef) String() string { return fmt.Sprintf("%s#%d", r.Repo.ID(), r.Number) } // GitHubIssue represents a GitHub issue. // This is maintner's in-memory representation. It differs slightly // from the API's *github.Issue type, notably in the lack of pointers // for all fields. // See https://developer.github.com/v3/issues/#get-a-single-issue type GitHubIssue struct { ID int64 Number int32 NotExist bool // if true, rest of fields should be ignored. Closed bool Locked bool PullRequest bool // if true, this issue is a Pull Request. All PRs are issues, but not all issues are PRs. User *GitHubUser Assignees []*GitHubUser Created time.Time Updated time.Time ClosedAt time.Time ClosedBy *GitHubUser // TODO(dmitshur): Implement (see golang.org/issue/28745). Title string Body string Milestone *GitHubMilestone // nil for unknown, noMilestone for none Labels map[int64]*GitHubLabel // label ID => label commentsUpdatedTil time.Time // max comment modtime seen commentsSyncedAsOf time.Time // as of server's Date header comments map[int64]*GitHubComment // by comment.ID eventMaxTime time.Time // latest time of any event in events map eventsSyncedAsOf time.Time // as of server's Date header reviewsSyncedAsOf time.Time // as of server's Date header events map[int64]*GitHubIssueEvent // by event.ID reviews map[int64]*GitHubReview // by event.ID } // LastModified reports the most recent time that any known metadata was updated. // In contrast to the Updated field, LastModified includes comments and events. // // TODO(bradfitz): this seems to not be working, at least events // aren't updating it. Investigate. func (gi *GitHubIssue) LastModified() time.Time { ret := gi.Updated if gi.commentsUpdatedTil.After(ret) { ret = gi.commentsUpdatedTil } if gi.eventMaxTime.After(ret) { ret = gi.eventMaxTime } return ret } // HasEvent reports whether there's any GitHubIssueEvent in this // issue's history of the given type. func (gi *GitHubIssue) HasEvent(eventType string) bool { for _, e := range gi.events { if e.Type == eventType { return true } } return false } // ForeachEvent calls fn for each event on the issue. // // If fn returns an error, iteration ends and ForeachEvent returns // with that error. // // The fn function is called serially, in order of the event's time. func (gi *GitHubIssue) ForeachEvent(fn func(*GitHubIssueEvent) error) error { // TODO: keep these sorted in the corpus s := make([]*GitHubIssueEvent, 0, len(gi.events)) for _, e := range gi.events { s = append(s, e) } sort.Slice(s, func(i, j int) bool { ci, cj := s[i].Created, s[j].Created if ci.Before(cj) { return true } return ci.Equal(cj) && s[i].ID < s[j].ID }) for _, e := range s { if err := fn(e); err != nil { return err } } return nil } // ForeachComment calls fn for each event on the issue. // // If fn returns an error, iteration ends and ForeachComment returns // with that error. // // The fn function is called serially, in order of the comment's time. func (gi *GitHubIssue) ForeachComment(fn func(*GitHubComment) error) error { // TODO: keep these sorted in the corpus s := make([]*GitHubComment, 0, len(gi.comments)) for _, e := range gi.comments { s = append(s, e) } sort.Slice(s, func(i, j int) bool { ci, cj := s[i].Created, s[j].Created if ci.Before(cj) { return true } return ci.Equal(cj) && s[i].ID < s[j].ID }) for _, e := range s { if err := fn(e); err != nil { return err } } return nil } // HasLabel reports whether the issue is labeled with the given label. func (gi *GitHubIssue) HasLabel(label string) bool { for _, lb := range gi.Labels { if lb.Name == label { return true } } return false } // HasLabelID returns whether the issue has a label with the given ID. func (gi *GitHubIssue) HasLabelID(id int64) bool { _, ok := gi.Labels[id] return ok } func (gi *GitHubIssue) getCreatedAt() time.Time { if gi == nil { return time.Time{} } return gi.Created } func (gi *GitHubIssue) getUpdatedAt() time.Time { if gi == nil { return time.Time{} } return gi.Updated } func (gi *GitHubIssue) getClosedAt() time.Time { if gi == nil { return time.Time{} } return gi.ClosedAt } // noMilestone is a sentinel value to explicitly mean no milestone. var noMilestone = new(GitHubMilestone) type GitHubLabel struct { ID int64 Name string // TODO: color? } // GenMutationDiff generates a diff from in-memory state 'a' (which // may be nil) to the current (non-nil) state b from GitHub. It // returns nil if there's no difference. func (a *GitHubLabel) GenMutationDiff(b *github.Label) *maintpb.GithubLabel { id := int64(b.GetID()) if a != nil && a.ID == id && a.Name == b.GetName() { // No change. return nil } return &maintpb.GithubLabel{Id: id, Name: b.GetName()} } func (lb *GitHubLabel) processMutation(mut maintpb.GithubLabel) { if lb.ID == 0 { panic("bogus label ID 0") } if lb.ID != mut.Id { panic(fmt.Sprintf("label ID = %v != mutation ID = %v", lb.ID, mut.Id)) } if mut.Name != "" { lb.Name = mut.Name } } type GitHubMilestone struct { ID int64 Title string Number int32 Closed bool } // IsNone reports whether ms represents the sentinel "no milestone" milestone. func (ms *GitHubMilestone) IsNone() bool { return ms == noMilestone } // IsUnknown reports whether ms is nil, which represents the unknown // state. Milestones should never be in this state, though. func (ms *GitHubMilestone) IsUnknown() bool { return ms == nil } // emptyMilestone is a non-nil *githubMilestone with zero values for // all fields. var emptyMilestone = new(GitHubMilestone) // GenMutationDiff generates a diff from in-memory state 'a' (which // may be nil) to the current (non-nil) state b from GitHub. It // returns nil if there's no difference. func (a *GitHubMilestone) GenMutationDiff(b *github.Milestone) *maintpb.GithubMilestone { var ret *maintpb.GithubMilestone // lazily inited by diff diff := func() *maintpb.GithubMilestone { if ret == nil { ret = &maintpb.GithubMilestone{Id: int64(b.GetID())} } return ret } if a == nil { a = emptyMilestone } if a.Title != b.GetTitle() { diff().Title = b.GetTitle() } if a.Number != int32(b.GetNumber()) { diff().Number = int64(b.GetNumber()) } if closed := b.GetState() == "closed"; a.Closed != closed { diff().Closed = &maintpb.BoolChange{Val: closed} } return ret } func (ms *GitHubMilestone) processMutation(mut maintpb.GithubMilestone) { if ms.ID == 0 { panic("bogus milestone ID 0") } if ms.ID != mut.Id { panic(fmt.Sprintf("milestone ID = %v != mutation ID = %v", ms.ID, mut.Id)) } if mut.Title != "" { ms.Title = mut.Title } if mut.Number != 0 { ms.Number = int32(mut.Number) } if mut.Closed != nil { ms.Closed = mut.Closed.Val } } // GitHubReview represents a review on a Pull Request. // For more details, see https://developer.github.com/v3/pulls/reviews/ type GitHubReview struct { ID int64 Actor *GitHubUser Body string State string // COMMENTED, APPROVED, CHANGES_REQUESTED CommitID string ActorAssociation string // CONTRIBUTOR Created time.Time OtherJSON string } // Proto converts GitHubReview to a protobuf func (e *GitHubReview) Proto() *maintpb.GithubReview { p := &maintpb.GithubReview{ Id: e.ID, Body: e.Body, State: e.State, CommitId: e.CommitID, ActorAssociation: e.ActorAssociation, } if e.OtherJSON != "" { p.OtherJson = []byte(e.OtherJSON) } if !e.Created.IsZero() { p.Created = timestamppb.New(e.Created) } if e.Actor != nil { p.ActorId = e.Actor.ID } return p } // r.github.c.mu must be held. func (r *GitHubRepo) newGithubReview(p *maintpb.GithubReview) *GitHubReview { g := r.github e := &GitHubReview{ ID: p.Id, Actor: g.getOrCreateUserID(p.ActorId), ActorAssociation: p.ActorAssociation, CommitID: p.CommitId, Body: p.Body, State: p.State, } if p.Created != nil { e.Created = p.Created.AsTime() } if len(p.OtherJson) > 0 { // TODO: parse it and see if we've since learned how // to deal with it? if r.verbose() { log.Printf("newGithubReview: unknown JSON in log: %s", p.OtherJson) } e.OtherJSON = string(p.OtherJson) } return e } type GitHubComment struct { ID int64 User *GitHubUser Created time.Time Updated time.Time Body string } // GitHubDismissedReview is the contents of a dismissed review event. For more // details, see https://developer.github.com/v3/issues/events/. type GitHubDismissedReviewEvent struct { ReviewID int64 State string // commented, approved, changes_requested DismissalMessage string } type GitHubIssueEvent struct { // TODO: this struct is a little wide. change it to an interface // instead? Maybe later, if memory profiling suggests it would help. // ID is the ID of the event. ID int64 // Type is one of: // * labeled, unlabeled // * milestoned, demilestoned // * assigned, unassigned // * locked, unlocked // * closed // * referenced // * renamed // * reopened // * comment_deleted // * head_ref_restored // * base_ref_changed // * subscribed // * mentioned // * review_requested, review_request_removed, review_dismissed Type string // OtherJSON optionally contains a JSON object of GitHub's API // response for any fields maintner was unable to extract at // the time. It is empty if maintner supported all the fields // when the mutation was created. OtherJSON string Created time.Time Actor *GitHubUser Label string // for type: "unlabeled", "labeled" Assignee *GitHubUser // for type: "assigned", "unassigned" Assigner *GitHubUser // for type: "assigned", "unassigned" Milestone string // for type: "milestoned", "demilestoned" From, To string // for type: "renamed" CommitID, CommitURL string // for type: "closed", "referenced" ... ? Reviewer *GitHubUser TeamReviewer *GitHubTeam ReviewRequester *GitHubUser DismissedReview *GitHubDismissedReviewEvent } func (e *GitHubIssueEvent) Proto() *maintpb.GithubIssueEvent { p := &maintpb.GithubIssueEvent{ Id: e.ID, EventType: e.Type, RenameFrom: e.From, RenameTo: e.To, } if e.OtherJSON != "" { p.OtherJson = []byte(e.OtherJSON) } if !e.Created.IsZero() { p.Created = timestamppb.New(e.Created) } if e.Actor != nil { p.ActorId = e.Actor.ID } if e.Assignee != nil { p.AssigneeId = e.Assignee.ID } if e.Assigner != nil { p.AssignerId = e.Assigner.ID } if e.Label != "" { p.Label = &maintpb.GithubLabel{Name: e.Label} } if e.Milestone != "" { p.Milestone = &maintpb.GithubMilestone{Title: e.Milestone} } if e.CommitID != "" { c := &maintpb.GithubCommit{CommitId: e.CommitID} if m := rxGithubCommitURL.FindStringSubmatch(e.CommitURL); m != nil { c.Owner = m[1] c.Repo = m[2] } p.Commit = c } if e.Reviewer != nil { p.ReviewerId = e.Reviewer.ID } if e.TeamReviewer != nil { p.TeamReviewer = &maintpb.GithubTeam{ Id: e.TeamReviewer.ID, Slug: e.TeamReviewer.Slug, } } if e.ReviewRequester != nil { p.ReviewRequesterId = e.ReviewRequester.ID } if e.DismissedReview != nil { p.DismissedReview = &maintpb.GithubDismissedReviewEvent{ ReviewId: e.DismissedReview.ReviewID, State: e.DismissedReview.State, DismissalMessage: e.DismissedReview.DismissalMessage, } } return p } var rxGithubCommitURL = regexp.MustCompile(`^https://api\.github\.com/repos/([^/]+)/([^/]+)/commits/`) // r.github.c.mu must be held. func (r *GitHubRepo) newGithubEvent(p *maintpb.GithubIssueEvent) *GitHubIssueEvent { g := r.github e := &GitHubIssueEvent{ ID: p.Id, Type: p.EventType, Actor: g.getOrCreateUserID(p.ActorId), Assignee: g.getOrCreateUserID(p.AssigneeId), Assigner: g.getOrCreateUserID(p.AssignerId), Reviewer: g.getOrCreateUserID(p.ReviewerId), TeamReviewer: g.getTeam(p.TeamReviewer), ReviewRequester: g.getOrCreateUserID(p.ReviewRequesterId), From: p.RenameFrom, To: p.RenameTo, } if p.Created != nil { e.Created = p.Created.AsTime() } if len(p.OtherJson) > 0 { // TODO: parse it and see if we've since learned how // to deal with it? if r.verbose() { log.Printf("newGithubEvent: unknown JSON in log: %s", p.OtherJson) } e.OtherJSON = string(p.OtherJson) } if p.Label != nil { e.Label = g.c.str(p.Label.Name) } if p.Milestone != nil { e.Milestone = g.c.str(p.Milestone.Title) } if c := p.Commit; c != nil { e.CommitID = c.CommitId if c.Owner != "" && c.Repo != "" { // TODO: this field is dumb. break it down. e.CommitURL = "https://api.github.com/repos/" + c.Owner + "/" + c.Repo + "/commits/" + c.CommitId } } if d := p.DismissedReview; d != nil { e.DismissedReview = &GitHubDismissedReviewEvent{ ReviewID: d.ReviewId, State: d.State, DismissalMessage: d.DismissalMessage, } } return e } // (requires corpus be locked for reads) func (gi *GitHubIssue) commentsSynced() bool { if gi.NotExist { // Issue doesn't exist, so can't sync its non-issues, // so consider it done. return true } return gi.commentsSyncedAsOf.After(gi.Updated) } // (requires corpus be locked for reads) func (gi *GitHubIssue) eventsSynced() bool { if gi.NotExist { // Issue doesn't exist, so can't sync its non-issues, // so consider it done. return true } return gi.eventsSyncedAsOf.After(gi.Updated) } // (requires corpus be locked for reads) func (gi *GitHubIssue) reviewsSynced() bool { if gi.NotExist { // Issue doesn't exist, so can't sync its non-issues, // so consider it done. return true } return gi.reviewsSyncedAsOf.After(gi.Updated) } func (c *Corpus) initGithub() { if c.github != nil { return } c.github = &GitHub{ c: c, repos: map[GitHubRepoID]*GitHubRepo{}, } } // SetGitHubLimiter sets a limiter that controls the rate of requests made // to GitHub APIs. If nil, requests are not limited. Only valid in leader mode. // The limiter must only be set before Sync or SyncLoop is called. func (c *Corpus) SetGitHubLimiter(l *rate.Limiter) { c.githubLimiter = l } // TrackGitHub registers the named GitHub repo as a repo to // watch and append to the mutation log. Only valid in leader mode. // The token is the auth token to use to make API calls. func (c *Corpus) TrackGitHub(owner, repo, token string) { if c.mutationLogger == nil { panic("can't TrackGitHub in non-leader mode") } c.mu.Lock() defer c.mu.Unlock() c.initGithub() gr := c.github.getOrCreateRepo(owner, repo) if gr == nil { log.Fatalf("invalid github owner/repo %q/%q", owner, repo) } c.watchedGithubRepos = append(c.watchedGithubRepos, watchedGithubRepo{ gr: gr, token: token, }) } type watchedGithubRepo struct { gr *GitHubRepo token string } // g.c.mu must be held func (g *GitHub) getUser(pu *maintpb.GithubUser) *GitHubUser { if pu == nil { return nil } if u := g.users[pu.Id]; u != nil { if pu.Login != "" && pu.Login != u.Login { u.Login = pu.Login } return u } if g.users == nil { g.users = make(map[int64]*GitHubUser) } u := &GitHubUser{ ID: pu.Id, Login: pu.Login, } g.users[pu.Id] = u return u } func (g *GitHub) getOrCreateUserID(id int64) *GitHubUser { if id == 0 { return nil } if u := g.users[id]; u != nil { return u } if g.users == nil { g.users = make(map[int64]*GitHubUser) } u := &GitHubUser{ID: id} g.users[id] = u return u } // g.c.mu must be held func (g *GitHub) getTeam(pt *maintpb.GithubTeam) *GitHubTeam { if pt == nil { return nil } if g.teams == nil { g.teams = make(map[int64]*GitHubTeam) } t := g.teams[pt.Id] if t == nil { t = &GitHubTeam{ ID: pt.Id, } g.teams[pt.Id] = t } if pt.Slug != "" { t.Slug = pt.Slug } return t } // newGithubUserProto creates a GithubUser with the minimum diff between // existing and g. The return value is nil if there were no changes. existing // may also be nil. func newGithubUserProto(existing *GitHubUser, g *github.User) *maintpb.GithubUser { if g == nil { return nil } id := int64(g.GetID()) if existing == nil { return &maintpb.GithubUser{ Id: id, Login: g.GetLogin(), } } hasChanges := false u := &maintpb.GithubUser{Id: id} if login := g.GetLogin(); existing.Login != login { u.Login = login hasChanges = true } // Add more fields here if hasChanges { return u } return nil } // deletedAssignees returns an array of user ID's that are present in existing // but not present in new. func deletedAssignees(existing []*GitHubUser, new []*github.User) []int64 { mp := make(map[int64]bool, len(existing)) for _, u := range new { id := int64(u.GetID()) mp[id] = true } toDelete := []int64{} for _, u := range existing { if _, ok := mp[u.ID]; !ok { toDelete = append(toDelete, u.ID) } } return toDelete } // newAssignees returns an array of diffs between existing and new. New users in // new will be present in the returned array in their entirety. Modified users // will appear containing only the ID field and changed fields. Unmodified users // will not appear in the returned array. func newAssignees(existing []*GitHubUser, new []*github.User) []*maintpb.GithubUser { mp := make(map[int64]*GitHubUser, len(existing)) for _, u := range existing { mp[u.ID] = u } changes := []*maintpb.GithubUser{} for _, u := range new { if existingUser, ok := mp[int64(u.GetID())]; ok { diffUser := &maintpb.GithubUser{ Id: int64(u.GetID()), } hasDiff := false if login := u.GetLogin(); existingUser.Login != login { diffUser.Login = login hasDiff = true } // check more User fields for diffs here, as we add them to the proto if hasDiff { changes = append(changes, diffUser) } } else { changes = append(changes, &maintpb.GithubUser{ Id: int64(u.GetID()), Login: u.GetLogin(), }) } } return changes } // setAssigneesFromProto returns a new array of assignees according to the // instructions in new (adds or modifies users in existing), and toDelete // (deletes them). c.mu must be held. func (g *GitHub) setAssigneesFromProto(existing []*GitHubUser, new []*maintpb.GithubUser, toDelete []int64) []*GitHubUser { c := g.c mp := make(map[int64]*GitHubUser) for _, u := range existing { mp[u.ID] = u } for _, u := range new { if existingUser, ok := mp[u.Id]; ok { if u.Login != "" { existingUser.Login = u.Login } // TODO: add other fields here when we add them for user. } else { c.debugf("adding assignee %q", u.Login) existing = append(existing, g.getUser(u)) } } // this is quadratic but the number of assignees is very unlikely to exceed, // say, 5. existing = slices.DeleteFunc(existing, func(u *GitHubUser) bool { return slices.Contains(toDelete, u.ID) }) return existing } // githubIssueDiffer generates a minimal diff (protobuf mutation) to // get a GitHub Issue from its in-memory state 'a' to the current // GitHub API state 'b'. type githubIssueDiffer struct { gr *GitHubRepo a *GitHubIssue // may be nil if no current state b *github.Issue // may NOT be nil } // returns nil if no changes. func (d githubIssueDiffer) Diff() *maintpb.GithubIssueMutation { var changed bool m := &maintpb.GithubIssueMutation{ Owner: d.gr.id.Owner, Repo: d.gr.id.Repo, Number: int32(d.b.GetNumber()), PullRequest: d.b.IsPullRequest(), } for _, f := range issueDiffMethods { if f(d, m) { if d.gr.verbose() { fname := strings.TrimPrefix(runtime.FuncForPC(reflect.ValueOf(f).Pointer()).Name(), "golang.org/x/build/maintner.githubIssueDiffer.") log.Printf("Issue %d changed: %v", d.b.GetNumber(), fname) } changed = true } } if !changed { return nil } return m } // issueDiffMethods are the different steps githubIssueDiffer.Diff // goes through to compute a diff. The methods should return true if // any change was made. The order is irrelevant unless otherwise // documented in comments in the list below. var issueDiffMethods = []func(githubIssueDiffer, *maintpb.GithubIssueMutation) bool{ githubIssueDiffer.diffCreatedAt, githubIssueDiffer.diffUpdatedAt, githubIssueDiffer.diffUser, githubIssueDiffer.diffBody, githubIssueDiffer.diffTitle, githubIssueDiffer.diffMilestone, githubIssueDiffer.diffAssignees, githubIssueDiffer.diffClosedState, githubIssueDiffer.diffClosedAt, githubIssueDiffer.diffClosedBy, githubIssueDiffer.diffLockedState, githubIssueDiffer.diffLabels, } func (d githubIssueDiffer) diffCreatedAt(m *maintpb.GithubIssueMutation) bool { return d.diffTimeField(&m.Created, d.a.getCreatedAt(), d.b.GetCreatedAt()) } func (d githubIssueDiffer) diffUpdatedAt(m *maintpb.GithubIssueMutation) bool { return d.diffTimeField(&m.Updated, d.a.getUpdatedAt(), d.b.GetUpdatedAt()) } func (d githubIssueDiffer) diffClosedAt(m *maintpb.GithubIssueMutation) bool { return d.diffTimeField(&m.ClosedAt, d.a.getClosedAt(), d.b.GetClosedAt()) } func (d githubIssueDiffer) diffTimeField(dst **timestamppb.Timestamp, memTime, githubTime time.Time) bool { if githubTime.IsZero() || memTime.Equal(githubTime) { return false } *dst = timestamppb.New(githubTime) return true } func (d githubIssueDiffer) diffUser(m *maintpb.GithubIssueMutation) bool { var existing *GitHubUser if d.a != nil { existing = d.a.User } m.User = newGithubUserProto(existing, d.b.User) return m.User != nil } func (d githubIssueDiffer) diffClosedBy(m *maintpb.GithubIssueMutation) bool { var existing *GitHubUser if d.a != nil { existing = d.a.ClosedBy } m.ClosedBy = newGithubUserProto(existing, d.b.ClosedBy) return m.ClosedBy != nil } func (d githubIssueDiffer) diffBody(m *maintpb.GithubIssueMutation) bool { if d.a != nil && d.a.Body == d.b.GetBody() { return false } m.BodyChange = &maintpb.StringChange{Val: d.b.GetBody()} return true } func (d githubIssueDiffer) diffTitle(m *maintpb.GithubIssueMutation) bool { if d.a != nil && d.a.Title == d.b.GetTitle() { return false } m.Title = d.b.GetTitle() // TODO: emit a StringChange if we ever have a problem that we // legitimately need real issues with no titles reflected in // maintner's model. For now just ignore such changes, if // GitHub even permits the. return m.Title != "" } func (d githubIssueDiffer) diffMilestone(m *maintpb.GithubIssueMutation) bool { if d.a != nil && d.a.Milestone != nil { ma, mb := d.a.Milestone, d.b.Milestone if ma == noMilestone && d.b.Milestone == nil { // Unchanged. Still no milestone. return false } if mb != nil && ma.ID == int64(mb.GetID()) { // Unchanged. Same milestone. // TODO: detect milestone renames and emit mutation for that? return false } } if mb := d.b.Milestone; mb != nil { m.MilestoneId = int64(mb.GetID()) m.MilestoneNum = int64(mb.GetNumber()) m.MilestoneTitle = mb.GetTitle() } else { m.NoMilestone = true } return true } func (d githubIssueDiffer) diffAssignees(m *maintpb.GithubIssueMutation) bool { if d.a == nil { m.Assignees = newAssignees(nil, d.b.Assignees) return true } m.Assignees = newAssignees(d.a.Assignees, d.b.Assignees) m.DeletedAssignees = deletedAssignees(d.a.Assignees, d.b.Assignees) return len(m.Assignees) > 0 || len(m.DeletedAssignees) > 0 } func (d githubIssueDiffer) diffLabels(m *maintpb.GithubIssueMutation) bool { // Common case: no changes. Return false quickly without allocations. if d.a != nil && len(d.a.Labels) == len(d.b.Labels) { missing := false for _, gl := range d.b.Labels { if _, ok := d.a.Labels[int64(gl.GetID())]; !ok { missing = true break } } if !missing { return false } } toAdd := map[int64]*maintpb.GithubLabel{} for _, gl := range d.b.Labels { id := int64(gl.GetID()) if id == 0 { panic("zero label ID") } toAdd[id] = &maintpb.GithubLabel{Id: id, Name: gl.GetName()} } var toDelete []int64 if d.a != nil { for id := range d.a.Labels { if _, ok := toAdd[id]; ok { // Already had it. delete(toAdd, id) } else { // We had it, but no longer. toDelete = append(toDelete, id) } } } m.RemoveLabel = toDelete for _, labpb := range toAdd { m.AddLabel = append(m.AddLabel, labpb) } return len(m.RemoveLabel) > 0 || len(m.AddLabel) > 0 } func (d githubIssueDiffer) diffClosedState(m *maintpb.GithubIssueMutation) bool { bclosed := d.b.GetState() == "closed" if d.a != nil && d.a.Closed == bclosed { return false } m.Closed = &maintpb.BoolChange{Val: bclosed} return true } func (d githubIssueDiffer) diffLockedState(m *maintpb.GithubIssueMutation) bool { if d.a != nil && d.a.Locked == d.b.GetLocked() { return false } if d.a == nil && !d.b.GetLocked() { return false } m.Locked = &maintpb.BoolChange{Val: d.b.GetLocked()} return true } // newMutationFromIssue generates a GithubIssueMutation using the // smallest possible diff between a (the state we have in memory in // the corpus) and b (the current GitHub API state). // // If newMutationFromIssue returns nil, the provided github.Issue is no newer // than the data we have in the corpus. 'a' may be nil. func (r *GitHubRepo) newMutationFromIssue(a *GitHubIssue, b *github.Issue) *maintpb.Mutation { if b == nil || b.Number == nil { panic(fmt.Sprintf("github issue with nil number: %#v", b)) } gim := githubIssueDiffer{gr: r, a: a, b: b}.Diff() if gim == nil { // No changes. return nil } return &maintpb.Mutation{GithubIssue: gim} } func (r *GitHubRepo) missingIssues() []int32 { c := r.github.c c.mu.RLock() defer c.mu.RUnlock() var maxNum int32 for num := range r.issues { if num > maxNum { maxNum = num } } var missing []int32 for num := int32(1); num < maxNum; num++ { if _, ok := r.issues[num]; !ok { missing = append(missing, num) } } return missing } // processGithubMutation updates the corpus with the information in m. func (c *Corpus) processGithubMutation(m *maintpb.GithubMutation) { if c == nil { panic("nil corpus") } c.initGithub() gr := c.github.getOrCreateRepo(m.Owner, m.Repo) if gr == nil { log.Printf("bogus Owner/Repo %q/%q in mutation: %v", m.Owner, m.Repo, m) return } for _, lp := range m.Labels { lb := gr.getOrCreateLabel(lp.Id) lb.processMutation(*lp) } for _, mp := range m.Milestones { ms := gr.getOrCreateMilestone(mp.Id) ms.processMutation(*mp) } } // processGithubIssueMutation updates the corpus with the information in m. func (c *Corpus) processGithubIssueMutation(m *maintpb.GithubIssueMutation) { if c == nil { panic("nil corpus") } c.initGithub() gr := c.github.getOrCreateRepo(m.Owner, m.Repo) if gr == nil { log.Printf("bogus Owner/Repo %q/%q in mutation: %v", m.Owner, m.Repo, m) return } if m.Number == 0 { log.Printf("bogus zero Number in mutation: %v", m) return } gi, ok := gr.issues[m.Number] if !ok { gi = &GitHubIssue{ // User added below Number: m.Number, ID: m.Id, } if gr.issues == nil { gr.issues = make(map[int32]*GitHubIssue) } gr.issues[m.Number] = gi if m.NotExist { gi.NotExist = true return } gi.Created = m.Created.AsTime() } if m.NotExist != gi.NotExist { gi.NotExist = m.NotExist } if gi.NotExist { return } // Check Updated before all other fields so they don't update if this // Mutation is stale // (ignoring Created since it *should* never update) if m.Updated != nil { gi.Updated = m.Updated.AsTime() } if m.ClosedAt != nil { gi.ClosedAt = m.ClosedAt.AsTime() } if m.User != nil { gi.User = c.github.getUser(m.User) } if m.NoMilestone { gi.Milestone = noMilestone } else if m.MilestoneId != 0 { ms := gr.getOrCreateMilestone(m.MilestoneId) ms.processMutation(maintpb.GithubMilestone{ Id: m.MilestoneId, Title: m.MilestoneTitle, Number: m.MilestoneNum, }) gi.Milestone = ms } if m.ClosedBy != nil { gi.ClosedBy = c.github.getUser(m.ClosedBy) } if b := m.Closed; b != nil { gi.Closed = b.Val } if b := m.Locked; b != nil { gi.Locked = b.Val } if m.PullRequest { gi.PullRequest = true } gi.Assignees = c.github.setAssigneesFromProto(gi.Assignees, m.Assignees, m.DeletedAssignees) if m.Body != "" { gi.Body = m.Body } if m.BodyChange != nil { gi.Body = m.BodyChange.Val } if m.Title != "" { gi.Title = m.Title } if len(m.RemoveLabel) > 0 || len(m.AddLabel) > 0 { if gi.Labels == nil { gi.Labels = make(map[int64]*GitHubLabel) } for _, lid := range m.RemoveLabel { delete(gi.Labels, lid) } for _, lp := range m.AddLabel { lb := gr.getOrCreateLabel(lp.Id) lb.processMutation(*lp) gi.Labels[lp.Id] = lb } } for _, cmut := range m.Comment { if cmut.Id == 0 { log.Printf("Ignoring bogus comment mutation lacking Id: %v", cmut) continue } gc, ok := gi.comments[cmut.Id] if !ok { if gi.comments == nil { gi.comments = make(map[int64]*GitHubComment) } gc = &GitHubComment{ID: cmut.Id} gi.comments[gc.ID] = gc } if cmut.User != nil { gc.User = c.github.getUser(cmut.User) } if cmut.Created != nil { gc.Created = cmut.Created.AsTime().UTC() } if cmut.Updated != nil { gc.Updated = cmut.Updated.AsTime().UTC() } if cmut.Body != "" { gc.Body = cmut.Body } } if m.CommentStatus != nil && m.CommentStatus.ServerDate != nil { gi.commentsSyncedAsOf = m.CommentStatus.ServerDate.AsTime().UTC() } for _, emut := range m.Event { if emut.Id == 0 { log.Printf("Ignoring bogus event mutation lacking Id: %v", emut) continue } if gi.events == nil { gi.events = make(map[int64]*GitHubIssueEvent) } gie := gr.newGithubEvent(emut) gi.events[emut.Id] = gie if gie.Created.After(gi.eventMaxTime) { gi.eventMaxTime = gie.Created } } if m.EventStatus != nil && m.EventStatus.ServerDate != nil { gi.eventsSyncedAsOf = m.EventStatus.ServerDate.AsTime().UTC() } for _, rmut := range m.Review { if rmut.Id == 0 { log.Printf("Ignoring bogus review mutation lacking Id: %v", rmut) continue } if gi.reviews == nil { gi.reviews = make(map[int64]*GitHubReview) } gre := gr.newGithubReview(rmut) gi.reviews[rmut.Id] = gre if gre.Created.After(gi.eventMaxTime) { gi.eventMaxTime = gre.Created } } if m.ReviewStatus != nil && m.ReviewStatus.ServerDate != nil { gi.reviewsSyncedAsOf = m.ReviewStatus.ServerDate.AsTime().UTC() } } // githubCache is an httpcache.Cache wrapper that only // stores responses for: // - https://api.github.com/repos/$OWNER/$REPO/issues?direction=desc&page=1&sort=updated // - https://api.github.com/repos/$OWNER/$REPO/milestones?page=1 // - https://api.github.com/repos/$OWNER/$REPO/labels?page=1 type githubCache struct { httpcache.Cache } var rxGithubCacheURLs = regexp.MustCompile(`^https://api.github.com/repos/\w+/\w+/(issues|milestones|labels)\?(.+)`) func cacheableURL(urlStr string) bool { m := rxGithubCacheURLs.FindStringSubmatch(urlStr) if m == nil { return false } v, _ := url.ParseQuery(m[2]) if v.Get("page") != "1" { return false } switch m[1] { case "issues": return v.Get("sort") == "updated" && v.Get("direction") == "desc" case "milestones", "labels": return true default: panic("unexpected cache key base " + m[1]) } } func (c *githubCache) Set(urlKey string, res []byte) { // TODO: verify that the httpcache package guarantees that the // first string parameter to Set here is actually a // URL. Empirically they appear to be. if cacheableURL(urlKey) { c.Cache.Set(urlKey, res) } } // sync checks for new changes on a single GitHub repository and // updates the Corpus with any changes. If loop is true, it runs // forever. func (gr *GitHubRepo) sync(ctx context.Context, token string, loop bool) error { ts := oauth2.StaticTokenSource(&oauth2.Token{AccessToken: token}) hc := oauth2.NewClient(ctx, ts) if tr, ok := hc.Transport.(*http.Transport); ok { defer tr.CloseIdleConnections() } directTransport := hc.Transport if gr.github.c.githubLimiter != nil { directTransport = limitTransport{gr.github.c.githubLimiter, hc.Transport} } cachingTransport := &httpcache.Transport{ Transport: directTransport, Cache: &githubCache{Cache: httpcache.NewMemoryCache()}, MarkCachedResponses: true, // adds "X-From-Cache: 1" response header. } p := &githubRepoPoller{ c: gr.github.c, token: token, gr: gr, githubDirect: github.NewClient(&http.Client{Transport: directTransport}), githubCaching: github.NewClient(&http.Client{Transport: cachingTransport}), client: http.DefaultClient, } activityCh := gr.github.c.activityChan("github:" + gr.id.String()) var expectChanges bool // got webhook update, but haven't seen new data yet var sleepDelay time.Duration for { prevLastUpdate := p.lastUpdate err := p.sync(ctx, expectChanges) if err == context.Canceled || !loop { return err } sawChanges := !p.lastUpdate.Equal(prevLastUpdate) if sawChanges { expectChanges = false } // If we got woken up by a webhook, sometimes // immediately polling GitHub for the data results in // a cache hit saying nothing's changed. Don't believe // it. Polling quickly with exponential backoff until // we see what we're expecting. if expectChanges { if sleepDelay == 0 { sleepDelay = 1 * time.Second } else { sleepDelay *= 2 if sleepDelay > 15*time.Minute { sleepDelay = 15 * time.Minute } } p.logf("expect changes; re-polling in %v", sleepDelay) } else { sleepDelay = 15 * time.Minute } p.logf("sync = %v; sleeping", err) timer := time.NewTimer(sleepDelay) select { case <-ctx.Done(): timer.Stop() return ctx.Err() case <-activityCh: timer.Stop() expectChanges = true sleepDelay = 0 case <-timer.C: } } } type httpClient interface { Do(req *http.Request) (*http.Response, error) } // A githubRepoPoller updates the Corpus (gr.c) to have the latest // version of the GitHub repo rp, using the GitHub client ghc. type githubRepoPoller struct { c *Corpus // shortcut for gr.github.c gr *GitHubRepo token string lastUpdate time.Time // modified by sync githubCaching *github.Client githubDirect *github.Client // not caching client httpClient // the client used to poll github } func (p *githubRepoPoller) Owner() string { return p.gr.id.Owner } func (p *githubRepoPoller) Repo() string { return p.gr.id.Repo } func (p *githubRepoPoller) logf(format string, args ...interface{}) { log.Printf("sync github "+p.gr.id.String()+": "+format, args...) } func (p *githubRepoPoller) sync(ctx context.Context, expectChanges bool) error { p.logf("Beginning sync.") if err := p.syncIssues(ctx, expectChanges); err != nil { return err } if err := p.syncComments(ctx); err != nil { return err } if err := p.syncEvents(ctx); err != nil { return err } if err := p.syncReviews(ctx); err != nil { return err } return nil } func (p *githubRepoPoller) syncMilestones(ctx context.Context) error { var mut *maintpb.GithubMutation // lazy init var changes int err := p.foreachItem(ctx, 1, p.getMilestonePage, func(e interface{}) error { ms := e.(*github.Milestone) id := int64(ms.GetID()) p.c.mu.RLock() diff := p.gr.milestones[id].GenMutationDiff(ms) p.c.mu.RUnlock() if diff == nil { return nil } if mut == nil { mut = &maintpb.GithubMutation{ Owner: p.Owner(), Repo: p.Repo(), } } mut.Milestones = append(mut.Milestones, diff) changes++ return nil }) if err != nil { return err } p.logf("%d milestone changes.", changes) if changes == 0 { return nil } p.c.addMutation(&maintpb.Mutation{Github: mut}) return nil } func (p *githubRepoPoller) syncLabels(ctx context.Context) error { var mut *maintpb.GithubMutation // lazy init var changes int err := p.foreachItem(ctx, 1, p.getLabelPage, func(e interface{}) error { lb := e.(*github.Label) id := int64(lb.GetID()) p.c.mu.RLock() diff := p.gr.labels[id].GenMutationDiff(lb) p.c.mu.RUnlock() if diff == nil { return nil } if mut == nil { mut = &maintpb.GithubMutation{ Owner: p.Owner(), Repo: p.Repo(), } } mut.Labels = append(mut.Labels, diff) changes++ return nil }) if err != nil { return err } p.logf("%d label changes.", changes) if changes == 0 { return nil } p.c.addMutation(&maintpb.Mutation{Github: mut}) return nil } func (p *githubRepoPoller) getMilestonePage(ctx context.Context, page int) ([]interface{}, *github.Response, error) { ms, res, err := p.githubCaching.Issues.ListMilestones(ctx, p.Owner(), p.Repo(), &github.MilestoneListOptions{ State: "all", ListOptions: github.ListOptions{Page: page}, }) if err != nil { return nil, nil, err } its := make([]interface{}, len(ms)) for i, m := range ms { its[i] = m } return its, res, err } func (p *githubRepoPoller) getLabelPage(ctx context.Context, page int) ([]interface{}, *github.Response, error) { ls, res, err := p.githubCaching.Issues.ListLabels(ctx, p.Owner(), p.Repo(), &github.ListOptions{ Page: page, }) if err != nil { return nil, nil, err } its := make([]interface{}, len(ls)) for i, lb := range ls { its[i] = lb } return its, res, err } // foreachItem walks over all pages of items from getPage and calls fn for each item. // If the first page's response was cached, fn is never called. func (p *githubRepoPoller) foreachItem( ctx context.Context, page int, getPage func(ctx context.Context, page int) ([]interface{}, *github.Response, error), fn func(interface{}) error) error { for { select { case <-ctx.Done(): return ctx.Err() default: } items, res, err := getPage(ctx, page) if err != nil { if canRetry(ctx, err) { continue } return err } if len(items) == 0 { return nil } fromCache := page == 1 && res.Response.Header.Get(xFromCache) == "1" if fromCache { log.Printf("no new items of type %T", items[0]) // No need to walk over these again. return nil } // TODO: use res.Rate (sleep until Reset if Limit == 0) for _, it := range items { if err := fn(it); err != nil { return err } } if res.NextPage == 0 { return nil } page = res.NextPage } } func (p *githubRepoPoller) syncIssues(ctx context.Context, expectChanges bool) error { page := 1 seen := make(map[int64]bool) keepGoing := true owner, repo := p.gr.id.Owner, p.gr.id.Repo for keepGoing { ghc := p.githubCaching if expectChanges { ghc = p.githubDirect } issues, res, err := ghc.Issues.ListByRepo(ctx, owner, repo, &github.IssueListByRepoOptions{ State: "all", Sort: "updated", Direction: "desc", ListOptions: github.ListOptions{ Page: page, PerPage: 100, }, }) if err != nil { if canRetry(ctx, err) { continue } return err } // See https://developer.github.com/v3/activity/events/ for X-Poll-Interval: if pi := res.Response.Header.Get("X-Poll-Interval"); pi != "" { nsec, _ := strconv.Atoi(pi) d := time.Duration(nsec) * time.Second p.logf("Requested to adjust poll interval to %v", d) // TODO: return an error type up that the sync loop can use // to adjust its default interval. // For now, ignore. } fromCache := res.Response.Header.Get(xFromCache) == "1" if len(issues) == 0 { p.logf("issues: reached end.") break } didMilestoneLabelSync := false changes := 0 for _, is := range issues { id := int64(is.GetID()) if seen[id] { // If an issue gets updated (and bumped to the top) while we // are paging, it's possible the last issue from page N can // appear as the first issue on page N+1. Don't process that // issue twice. // https://github.com/google/go-github/issues/566 continue } seen[id] = true var mp *maintpb.Mutation p.c.mu.RLock() { gi := p.gr.issues[int32(*is.Number)] mp = p.gr.newMutationFromIssue(gi, is) } p.c.mu.RUnlock() if mp == nil { continue } // If there's something new (not a cached response), // then check for updated milestones and labels before // creating issue mutations below. Doesn't matter // much, but helps to have it all loaded. if !fromCache && !didMilestoneLabelSync { didMilestoneLabelSync = true group, ctx := errgroup.WithContext(ctx) group.Go(func() error { return p.syncMilestones(ctx) }) group.Go(func() error { return p.syncLabels(ctx) }) if err := group.Wait(); err != nil { return err } } changes++ p.logf("changed issue %d: %s", is.GetNumber(), is.GetTitle()) p.c.addMutation(mp) p.lastUpdate = time.Now() } if changes == 0 { missing := p.gr.missingIssues() if len(missing) == 0 { p.logf("no changed issues; cached=%v", fromCache) return nil } if len(missing) > 0 { p.logf("%d missing github issues.", len(missing)) } if len(missing) < 100 { keepGoing = false } } p.c.mu.RLock() num := len(p.gr.issues) p.c.mu.RUnlock() p.logf("After page %d: %v issues, %v changes, %v issues in memory", page, len(issues), changes, num) page++ } missing := p.gr.missingIssues() if len(missing) > 0 { p.logf("remaining issues: %v", missing) for _, num := range missing { p.logf("getting issue %v ...", num) var issue *github.Issue var err error for { issue, _, err = p.githubDirect.Issues.Get(ctx, owner, repo, int(num)) if canRetry(ctx, err) { continue } break } if ge, ok := err.(*github.ErrorResponse); ok && (ge.Response.StatusCode == http.StatusNotFound || ge.Response.StatusCode == http.StatusGone) { mut := &maintpb.Mutation{ GithubIssue: &maintpb.GithubIssueMutation{ Owner: owner, Repo: repo, Number: num, NotExist: true, }, } p.logf("issue %d is gone, marking as NotExist", num) p.c.addMutation(mut) continue } else if err != nil { return err } mp := p.gr.newMutationFromIssue(nil, issue) if mp == nil { continue } p.logf("modified issue %d: %s", issue.GetNumber(), issue.GetTitle()) p.c.addMutation(mp) p.lastUpdate = time.Now() } } return nil } func (p *githubRepoPoller) issueNumbersWithStaleCommentSync() (issueNums []int32) { p.c.mu.RLock() defer p.c.mu.RUnlock() for n, gi := range p.gr.issues { if !gi.commentsSynced() { issueNums = append(issueNums, n) } } sort.Slice(issueNums, func(i, j int) bool { return issueNums[i] < issueNums[j] }) return issueNums } func (p *githubRepoPoller) syncComments(ctx context.Context) error { for { nums := p.issueNumbersWithStaleCommentSync() if len(nums) == 0 { return nil } remain := len(nums) for _, num := range nums { p.logf("comment sync: %d issues remaining; syncing issue %v", remain, num) if err := p.syncCommentsOnIssue(ctx, num); err != nil { p.logf("comment sync on issue %d: %v", num, err) return err } remain-- } } } func (p *githubRepoPoller) syncCommentsOnIssue(ctx context.Context, issueNum int32) error { p.c.mu.RLock() issue := p.gr.issues[issueNum] if issue == nil { p.c.mu.RUnlock() return fmt.Errorf("unknown issue number %v", issueNum) } since := issue.commentsUpdatedTil p.c.mu.RUnlock() owner, repo := p.gr.id.Owner, p.gr.id.Repo morePages := true // at least try the first. might be empty. for morePages { opt := &github.IssueListCommentsOptions{ Direction: github.String("asc"), Sort: github.String("updated"), ListOptions: github.ListOptions{PerPage: 100}, } if !since.IsZero() { opt.Since = &since } ics, res, err := p.githubDirect.Issues.ListComments(ctx, owner, repo, int(issueNum), opt) if canRetry(ctx, err) { continue } else if ge, ok := err.(*github.ErrorResponse); ok && (ge.Response.StatusCode == http.StatusNotFound || ge.Response.StatusCode == http.StatusGone) { mut := &maintpb.Mutation{ GithubIssue: &maintpb.GithubIssueMutation{ Owner: owner, Repo: repo, Number: issueNum, NotExist: true, }, } p.logf("issue %d comments are gone, marking as NotExist", issueNum) p.c.addMutation(mut) return nil } else if err != nil { return err } serverDate, err := http.ParseTime(res.Header.Get("Date")) if err != nil { return fmt.Errorf("invalid server Date response: %v", err) } serverDate = serverDate.UTC() p.logf("Number of comments on issue %d since %v: %v", issueNum, since, len(ics)) mut := &maintpb.Mutation{ GithubIssue: &maintpb.GithubIssueMutation{ Owner: owner, Repo: repo, Number: issueNum, }, } p.c.mu.RLock() for _, ic := range ics { if ic.ID == nil || ic.Body == nil || ic.User == nil || ic.CreatedAt == nil || ic.UpdatedAt == nil { // Bogus. p.logf("bogus comment: %v", ic) continue } created := timestamppb.New(*ic.CreatedAt) updated := timestamppb.New(*ic.UpdatedAt) since = *ic.UpdatedAt // for next round id := int64(*ic.ID) cur := issue.comments[id] // TODO: does a reaction update a comment's UpdatedAt time? var cmut *maintpb.GithubIssueCommentMutation if cur == nil { cmut = &maintpb.GithubIssueCommentMutation{ Id: id, User: &maintpb.GithubUser{ Id: int64(*ic.User.ID), Login: *ic.User.Login, }, Body: *ic.Body, Created: created, Updated: updated, } } else if !cur.Updated.Equal(*ic.UpdatedAt) || cur.Body != *ic.Body { cmut = &maintpb.GithubIssueCommentMutation{ Id: id, } if !cur.Updated.Equal(*ic.UpdatedAt) { cmut.Updated = updated } if cur.Body != *ic.Body { cmut.Body = *ic.Body } } if cmut != nil { mut.GithubIssue.Comment = append(mut.GithubIssue.Comment, cmut) } } p.c.mu.RUnlock() if res.NextPage == 0 { mut.GithubIssue.CommentStatus = &maintpb.GithubIssueSyncStatus{ ServerDate: timestamppb.New(serverDate), } morePages = false } p.c.addMutation(mut) } return nil } func (p *githubRepoPoller) issueNumbersWithStaleEventSync() (issueNums []int32) { p.c.mu.RLock() defer p.c.mu.RUnlock() for n, gi := range p.gr.issues { if !gi.eventsSynced() { issueNums = append(issueNums, n) } } sort.Slice(issueNums, func(i, j int) bool { return issueNums[i] < issueNums[j] }) return issueNums } func (p *githubRepoPoller) syncEvents(ctx context.Context) error { for { nums := p.issueNumbersWithStaleEventSync() if len(nums) == 0 { return nil } remain := len(nums) for _, num := range nums { p.logf("event sync: %d issues remaining; syncing issue %v", remain, num) if err := p.syncEventsOnIssue(ctx, num); err != nil { p.logf("event sync on issue %d: %v", num, err) return err } remain-- } } } func (p *githubRepoPoller) syncEventsOnIssue(ctx context.Context, issueNum int32) error { const perPage = 100 p.c.mu.RLock() gi := p.gr.issues[issueNum] if gi == nil { panic(fmt.Sprintf("bogus issue %v", issueNum)) } have := len(gi.events) p.c.mu.RUnlock() skipPages := have / perPage mut := &maintpb.Mutation{ GithubIssue: &maintpb.GithubIssueMutation{ Owner: p.Owner(), Repo: p.Repo(), Number: issueNum, }, } err := p.foreachItem(ctx, 1+skipPages, func(ctx context.Context, page int) ([]interface{}, *github.Response, error) { u := fmt.Sprintf("https://api.github.com/repos/%s/%s/issues/%v/events?per_page=%v&page=%v", p.Owner(), p.Repo(), issueNum, perPage, page) req, _ := http.NewRequest("GET", u, nil) req.Header.Set("Authorization", "Bearer "+p.token) req.Header.Set("User-Agent", "golang-x-build-maintner/1.0") ctx, cancel := context.WithTimeout(ctx, time.Minute) defer cancel() req = req.WithContext(ctx) res, err := p.client.Do(req) if err != nil { log.Printf("Fetching %s: %v", u, err) return nil, nil, err } log.Printf("Fetching %s: %v", u, res.Status) ghResp := makeGithubResponse(res) if err := github.CheckResponse(res); err != nil { log.Printf("Fetching %s: %v: %+v", u, res.Status, res.Header) log.Printf("GitHub error %s: %v", u, ghResp) return nil, nil, err } evts, err := parseGithubEvents(res.Body) if err != nil { return nil, nil, fmt.Errorf("%s: parse github events: %v", u, err) } is := make([]interface{}, len(evts)) for i, v := range evts { is[i] = v } serverDate, err := http.ParseTime(res.Header.Get("Date")) if err != nil { return nil, nil, fmt.Errorf("invalid server Date response: %v", err) } mut.GithubIssue.EventStatus = &maintpb.GithubIssueSyncStatus{ ServerDate: timestamppb.New(serverDate.UTC()), } return is, ghResp, err }, func(v interface{}) error { ge := v.(*GitHubIssueEvent) p.c.mu.RLock() _, ok := gi.events[ge.ID] p.c.mu.RUnlock() if ok { // Already have it. And they're // assumed to be immutable, so the // copy we already have should be // good. Don't add to mutation log. return nil } mut.GithubIssue.Event = append(mut.GithubIssue.Event, ge.Proto()) return nil }) if err != nil { return err } p.c.addMutation(mut) return nil } // parseGithubEvents parses the JSON array of GitHub issue events in r. // It does this the very manual way (using map[string]interface{}) // instead of using nice types because https://golang.org/issue/15314 // isn't implemented yet and also because even if it were implemented, // this code still wants to preserve any unknown fields to store in // the "OtherJSON" field for future updates of the code to parse. (If // GitHub adds new Event types in the future, we want to archive them, // even if we don't understand them) func parseGithubEvents(r io.Reader) ([]*GitHubIssueEvent, error) { var jevents []map[string]interface{} jd := json.NewDecoder(r) jd.UseNumber() if err := jd.Decode(&jevents); err != nil { return nil, err } var evts []*GitHubIssueEvent for _, em := range jevents { for k, v := range em { if v == nil { delete(em, k) } } delete(em, "url") e := &GitHubIssueEvent{} e.Type, _ = em["event"].(string) delete(em, "event") e.ID = jint64(em["id"]) delete(em, "id") // TODO: store these two more compactly: e.CommitID, _ = em["commit_id"].(string) // "5383ecf5a0824649ffcc0349f00f0317575753d0" delete(em, "commit_id") e.CommitURL, _ = em["commit_url"].(string) // "https://api.github.com/repos/bradfitz/go-issue-mirror/commits/5383ecf5a0824649ffcc0349f00f0317575753d0" delete(em, "commit_url") getUser := func(field string, gup **GitHubUser) { am, ok := em[field].(map[string]interface{}) if !ok { return } delete(em, field) gu := &GitHubUser{ID: jint64(am["id"])} gu.Login, _ = am["login"].(string) *gup = gu } getUser("actor", &e.Actor) getUser("assignee", &e.Assignee) getUser("assigner", &e.Assigner) getUser("requested_reviewer", &e.Reviewer) getUser("review_requester", &e.ReviewRequester) if lm, ok := em["label"].(map[string]interface{}); ok { delete(em, "label") e.Label, _ = lm["name"].(string) } if mm, ok := em["milestone"].(map[string]interface{}); ok { delete(em, "milestone") e.Milestone, _ = mm["title"].(string) } if rm, ok := em["rename"].(map[string]interface{}); ok { delete(em, "rename") e.From, _ = rm["from"].(string) e.To, _ = rm["to"].(string) } if createdStr, ok := em["created_at"].(string); ok { delete(em, "created_at") var err error e.Created, err = time.Parse(time.RFC3339, createdStr) if err != nil { return nil, err } e.Created = e.Created.UTC() } if dr, ok := em["dismissed_review"]; ok { delete(em, "dismissed_review") drm := dr.(map[string]interface{}) dro := &GitHubDismissedReviewEvent{} dro.ReviewID = jint64(drm["review_id"]) if state, ok := drm["state"].(string); ok { dro.State = state } else { log.Printf("got type %T for 'state' field, expected string in %+v", drm["state"], drm) } dro.DismissalMessage, _ = drm["dismissal_message"].(string) e.DismissedReview = dro } if rt, ok := em["requested_team"]; ok { delete(em, "requested_team") rtm, ok := rt.(map[string]interface{}) if !ok { log.Printf("got value %+v for 'requested_team' field, wanted a map with 'id' and 'slug' fields", rt) } else { t := &GitHubTeam{} t.ID = jint64(rtm["id"]) t.Slug, _ = rtm["slug"].(string) e.TeamReviewer = t } } delete(em, "node_id") // GitHub API v4 Global Node ID; don't store it. delete(em, "lock_reason") // Not stored. otherJSON, _ := json.Marshal(em) e.OtherJSON = string(otherJSON) if e.OtherJSON == "{}" { e.OtherJSON = "" } if e.OtherJSON != "" { log.Printf("warning: storing unknown field(s) in GitHub issue event: %s", e.OtherJSON) } evts = append(evts, e) } return evts, nil } func (p *githubRepoPoller) issueNumbersWithStaleReviewsSync() (issueNums []int32) { p.c.mu.RLock() defer p.c.mu.RUnlock() for n, gi := range p.gr.issues { if gi.PullRequest && !gi.reviewsSynced() { issueNums = append(issueNums, n) } } sort.Slice(issueNums, func(i, j int) bool { return issueNums[i] < issueNums[j] }) return issueNums } func (p *githubRepoPoller) syncReviews(ctx context.Context) error { for { nums := p.issueNumbersWithStaleReviewsSync() if len(nums) == 0 { return nil } remain := len(nums) for _, num := range nums { p.logf("reviews sync: %d issues remaining; syncing issue %v", remain, num) if err := p.syncReviewsOnPullRequest(ctx, num); err != nil { p.logf("review sync on issue %d: %v", num, err) return err } remain-- } } } func (p *githubRepoPoller) syncReviewsOnPullRequest(ctx context.Context, issueNum int32) error { const perPage = 100 p.c.mu.RLock() gi := p.gr.issues[issueNum] if gi == nil { p.c.mu.RUnlock() panic(fmt.Sprintf("bogus issue %v", issueNum)) } if !gi.PullRequest { p.c.mu.RUnlock() return nil } have := len(gi.reviews) p.c.mu.RUnlock() skipPages := have / perPage mut := &maintpb.Mutation{ GithubIssue: &maintpb.GithubIssueMutation{ Owner: p.Owner(), Repo: p.Repo(), Number: issueNum, }, } err := p.foreachItem(ctx, 1+skipPages, func(ctx context.Context, page int) ([]interface{}, *github.Response, error) { u := fmt.Sprintf("https://api.github.com/repos/%s/%s/pulls/%v/reviews?per_page=%v&page=%v", p.Owner(), p.Repo(), issueNum, perPage, page) req, _ := http.NewRequest("GET", u, nil) req.Header.Set("Authorization", "Bearer "+p.token) req.Header.Set("User-Agent", "golang-x-build-maintner/1.0") ctx, cancel := context.WithTimeout(ctx, time.Minute) defer cancel() req = req.WithContext(ctx) res, err := http.DefaultClient.Do(req) if err != nil { log.Printf("Fetching %s: %v", u, err) return nil, nil, err } log.Printf("Fetching %s: %v", u, res.Status) ghResp := makeGithubResponse(res) if err := github.CheckResponse(res); err != nil { log.Printf("Fetching %s: %v: %+v", u, res.Status, res.Header) log.Printf("GitHub error %s: %v", u, ghResp) return nil, nil, err } evts, err := parseGithubReviews(res.Body) if err != nil { return nil, nil, fmt.Errorf("%s: parse github pr reviews: %v", u, err) } is := make([]interface{}, len(evts)) for i, v := range evts { is[i] = v } serverDate, err := http.ParseTime(res.Header.Get("Date")) if err != nil { return nil, nil, fmt.Errorf("invalid server Date response: %v", err) } mut.GithubIssue.ReviewStatus = &maintpb.GithubIssueSyncStatus{ ServerDate: timestamppb.New(serverDate.UTC()), } return is, ghResp, err }, func(v interface{}) error { ge := v.(*GitHubReview) p.c.mu.RLock() _, ok := gi.reviews[ge.ID] p.c.mu.RUnlock() if ok { // Already have it. And they're // assumed to be immutable, so the // copy we already have should be // good. Don't add to mutation log. return nil } mut.GithubIssue.Review = append(mut.GithubIssue.Review, ge.Proto()) return nil }) if err != nil { return err } p.c.addMutation(mut) return nil } // parseGithubReviews parses the JSON array of GitHub reviews in r. // It does this the very manual way (using map[string]interface{}) // instead of using nice types because https://golang.org/issue/15314 // isn't implemented yet and also because even if it were implemented, // this code still wants to preserve any unknown fields to store in // the "OtherJSON" field for future updates of the code to parse. (If // GitHub adds new Event types in the future, we want to archive them, // even if we don't understand them) func parseGithubReviews(r io.Reader) ([]*GitHubReview, error) { var jevents []map[string]interface{} jd := json.NewDecoder(r) jd.UseNumber() if err := jd.Decode(&jevents); err != nil { return nil, err } var evts []*GitHubReview for _, em := range jevents { for k, v := range em { if v == nil { delete(em, k) } } e := &GitHubReview{} e.ID = jint64(em["id"]) delete(em, "id") e.Body, _ = em["body"].(string) delete(em, "body") e.State, _ = em["state"].(string) delete(em, "state") // TODO: store these two more compactly: e.CommitID, _ = em["commit_id"].(string) // "5383ecf5a0824649ffcc0349f00f0317575753d0" delete(em, "commit_id") getUser := func(field string, gup **GitHubUser) { am, ok := em[field].(map[string]interface{}) if !ok { return } delete(em, field) gu := &GitHubUser{ID: jint64(am["id"])} gu.Login, _ = am["login"].(string) *gup = gu } getUser("user", &e.Actor) e.ActorAssociation, _ = em["author_association"].(string) delete(em, "author_association") if createdStr, ok := em["submitted_at"].(string); ok { delete(em, "submitted_at") var err error e.Created, err = time.Parse(time.RFC3339, createdStr) if err != nil { return nil, err } e.Created = e.Created.UTC() } delete(em, "node_id") // GitHub API v4 Global Node ID; don't store it. delete(em, "html_url") // not needed. delete(em, "pull_request_url") // not needed. delete(em, "_links") // not needed. (duplicate data of above two nodes) otherJSON, _ := json.Marshal(em) e.OtherJSON = string(otherJSON) if e.OtherJSON == "{}" { e.OtherJSON = "" } if e.OtherJSON != "" { log.Printf("warning: storing unknown field(s) in GitHub review: %s", e.OtherJSON) } evts = append(evts, e) } return evts, nil } // jint64 return an int64 from the provided JSON object value v. func jint64(v interface{}) int64 { switch v := v.(type) { case nil: return 0 case json.Number: n, _ := strconv.ParseInt(string(v), 10, 64) return n default: panic(fmt.Sprintf("unexpected type %T", v)) } } // copy of go-github's parseRate, basically. func parseRate(r *http.Response) github.Rate { var rate github.Rate // Note: even though the header names below are not canonical (the // canonical form would be X-Ratelimit-Limit), this particular // casing is what GitHub returns. See headerRateRemaining in // package go-github. if limit := r.Header.Get("X-RateLimit-Limit"); limit != "" { rate.Limit, _ = strconv.Atoi(limit) } if remaining := r.Header.Get("X-RateLimit-Remaining"); remaining != "" { rate.Remaining, _ = strconv.Atoi(remaining) } if reset := r.Header.Get("X-RateLimit-Reset"); reset != "" { if v, _ := strconv.ParseInt(reset, 10, 64); v != 0 { rate.Reset = github.Timestamp{time.Unix(v, 0)} } } return rate } // Copy of go-github's func newResponse, basically. func makeGithubResponse(res *http.Response) *github.Response { gr := &github.Response{Response: res} gr.Rate = parseRate(res) for _, lv := range res.Header["Link"] { for _, link := range strings.Split(lv, ",") { segs := strings.Split(strings.TrimSpace(link), ";") if len(segs) < 2 { continue } // ensure href is properly formatted if !strings.HasPrefix(segs[0], "<") || !strings.HasSuffix(segs[0], ">") { continue } // try to pull out page parameter u, err := url.Parse(segs[0][1 : len(segs[0])-1]) if err != nil { continue } page := u.Query().Get("page") if page == "" { continue } for _, seg := range segs[1:] { switch strings.TrimSpace(seg) { case `rel="next"`: gr.NextPage, _ = strconv.Atoi(page) case `rel="prev"`: gr.PrevPage, _ = strconv.Atoi(page) case `rel="first"`: gr.FirstPage, _ = strconv.Atoi(page) case `rel="last"`: gr.LastPage, _ = strconv.Atoi(page) } } } } return gr } var rxReferences = regexp.MustCompile(`(?:\b([\w\-]+)/([\w\-]+))?\#(\d+)\b`) // parseGithubRefs parses references to GitHub issues from commit message commitMsg. // Multiple references to the same issue are deduplicated. func (c *Corpus) parseGithubRefs(gerritProj string, commitMsg string) []GitHubIssueRef { // Use of rxReferences by itself caused this function to take 20% of the CPU time. // TODO(bradfitz): stop using regexps here. // But in the meantime, help the regexp engine with this one weird trick: // Reduce the length of the string given to FindAllStringSubmatch. // Discard all lines before the first line containing a '#'. // The "Fixes #nnnn" is usually at the end, so this discards most of the input. // Now CPU is only 2% instead of 20%. hash := strings.IndexByte(commitMsg, '#') if hash == -1 { return nil } nl := strings.LastIndexByte(commitMsg[:hash], '\n') commitMsg = commitMsg[nl+1:] // TODO: use FindAllStringSubmatchIndex instead, so we can // back up and see what's behind it and ignore "#1", "#2", // "#3" 'references' which are actually bullets or ARM // disassembly, and only respect them as real if they have the // word "Fixes " or "Issue " or similar before them. ms := rxReferences.FindAllStringSubmatch(commitMsg, -1) if len(ms) == 0 { return nil } /* e.g. 2017/03/30 21:42:07 matches: [["golang/go#9327" "golang" "go" "9327"]] 2017/03/30 21:42:07 matches: [["golang/go#16512" "golang" "go" "16512"] ["golang/go#18404" "golang" "go" "18404"]] 2017/03/30 21:42:07 matches: [["#1" "" "" "1"]] 2017/03/30 21:42:07 matches: [["#10234" "" "" "10234"]] 2017/03/30 21:42:31 matches: [["GoogleCloudPlatform/gcloud-golang#262" "GoogleCloudPlatform" "gcloud-golang" "262"]] 2017/03/30 21:42:31 matches: [["GoogleCloudPlatform/google-cloud-go#481" "GoogleCloudPlatform" "google-cloud-go" "481"]] */ c.initGithub() github := c.GitHub() refs := make([]GitHubIssueRef, 0, len(ms)) for _, m := range ms { owner, repo, numStr := strings.ToLower(m[1]), strings.ToLower(m[2]), m[3] num, err := strconv.ParseInt(numStr, 10, 32) if err != nil { continue } if owner == "" { if gerritProj == "go.googlesource.com/go" { owner, repo = "golang", "go" } else { continue } } ref := GitHubIssueRef{github.getOrCreateRepo(owner, repo), int32(num)} if contains(refs, ref) { continue } refs = append(refs, ref) } return refs } // contains reports whether refs contains the reference ref. func contains(refs []GitHubIssueRef, ref GitHubIssueRef) bool { for _, r := range refs { if r == ref { return true } } return false } type limitTransport struct { limiter *rate.Limiter base http.RoundTripper } func (t limitTransport) RoundTrip(r *http.Request) (*http.Response, error) { limiter := t.limiter // NOTE(cbro): limiter should not be nil, but check defensively. if limiter != nil { if err := limiter.Wait(r.Context()); err != nil { return nil, err } } return t.base.RoundTrip(r) } // canRetry reports whether ctx hasn't been canceled and err is a non-nil retryable error. // If so, it blocks until enough time passes so that it's acceptable to retry immediately. func canRetry(ctx context.Context, err error) bool { switch e := err.(type) { case *github.RateLimitError: log.Printf("GitHub rate limit error: %s, waiting until %s", e.Message, e.Rate.Reset.Time) ctx, cancel := context.WithDeadline(ctx, e.Rate.Reset.Time) defer cancel() <-ctx.Done() return ctx.Err() != context.Canceled case *github.AbuseRateLimitError: if e.RetryAfter != nil { log.Printf("GitHub rate abuse error: %s, waiting for %s", e.Message, *e.RetryAfter) ctx, cancel := context.WithTimeout(ctx, *e.RetryAfter) defer cancel() <-ctx.Done() return ctx.Err() != context.Canceled } log.Printf("GitHub rate abuse error: %s", e.Message) } return false }