Wrap all sourceGateway inner calls w/callMgr

This commit is contained in:
sam boyer 2017-03-31 00:00:00 -04:00
Родитель 75dec7b2dc
Коммит b4f6a3f7df
8 изменённых файлов: 156 добавлений и 80 удалений

Просмотреть файл

@ -685,13 +685,6 @@ type httpMetadataDeducer struct {
func (hmd *httpMetadataDeducer) deduce(ctx context.Context, path string) (pathDeduction, error) {
hmd.once.Do(func() {
ctx, doneFunc, err := hmd.callMgr.setUpCall(ctx, path, ctHTTPMetadata)
if err != nil {
hmd.deduceErr = err
return
}
defer doneFunc()
opath := path
u, path, err := normalizeURI(path)
if err != nil {
@ -702,9 +695,16 @@ func (hmd *httpMetadataDeducer) deduce(ctx context.Context, path string) (pathDe
pd := pathDeduction{}
// Make the HTTP call to attempt to retrieve go-get metadata
root, vcs, reporoot, err := parseMetadata(ctx, path, u.Scheme)
var root, vcs, reporoot string
err = hmd.callMgr.do(ctx, path, ctHTTPMetadata, func(ctx context.Context) error {
root, vcs, reporoot, err = parseMetadata(ctx, path, u.Scheme)
return err
})
if err != nil {
hmd.deduceErr = fmt.Errorf("unable to deduce repository and source type for: %q", opath)
if err == context.Canceled || err == context.DeadlineExceeded {
hmd.deduceErr = err
}
return
}
pd.root = root

Просмотреть файл

@ -501,7 +501,7 @@ func TestDeduceProjectRoot(t *testing.T) {
t.Errorf("Wrong project root was deduced;\n\t(GOT) %s\n\t(WNT) %s", pr, in)
}
if sm.deduceCoord.rootxt.Len() != 3 {
t.Errorf("Root path trie should have three elements, one for each unique; has %v", sm.deduceCoord.rootxt.Len())
t.Errorf("Root path trie should have three elements, one for each unique root; has %v", sm.deduceCoord.rootxt.Len())
}
// Ensure that vcs extension-based matching comes through
@ -803,7 +803,7 @@ func TestCallManager(t *testing.T) {
typ: 0,
}
_, err := cm.run(ci)
_, err := cm.start(ci)
if err != nil {
t.Fatal("unexpected err on setUpCall:", err)
}
@ -817,11 +817,20 @@ func TestCallManager(t *testing.T) {
t.Fatalf("wrong count of running ci: wanted 1 got %v", tc.count)
}
// run another, but via setUpCall
_, doneFunc, err := cm.setUpCall(bgc, "foo", 0)
if err != nil {
t.Fatal("unexpected err on setUpCall:", err)
}
// run another, but via do
block, wait := make(chan struct{}), make(chan struct{})
go func() {
wait <- struct{}{}
err := cm.do(bgc, "foo", 0, func(ctx context.Context) error {
<-block
return nil
})
if err != nil {
t.Fatal("unexpected err on do() completion:", err)
}
close(wait)
}()
<-wait
tc, exists = cm.running[ci]
if !exists {
@ -832,7 +841,8 @@ func TestCallManager(t *testing.T) {
t.Fatalf("wrong count of running ci: wanted 2 got %v", tc.count)
}
doneFunc()
close(block)
<-wait
if len(cm.ran) != 0 {
t.Fatal("should not record metrics until last one drops")
}
@ -857,8 +867,13 @@ func TestCallManager(t *testing.T) {
}
cancelFunc()
_, err = cm.run(ci)
_, err = cm.start(ci)
if err == nil {
t.Fatal("should have errored on cm.run() after canceling cm's input context")
}
cm.do(bgc, "foo", 0, func(ctx context.Context) error {
t.Fatal("calls should not be initiated by do() after main context is cancelled")
return nil
})
}

Просмотреть файл

@ -19,17 +19,16 @@ import (
// * Allows control over when deduction logic triggers network activity
// * Makes it easy to attempt multiple URLs for a given import path
type maybeSource interface {
//try(ctx context.Context, cachedir string, c singleSourceCache) (source, string, error)
try(ctx context.Context, cachedir string, c singleSourceCache) (source, sourceState, error)
try(ctx context.Context, cachedir string, c singleSourceCache, cm *callManager) (source, sourceState, error)
getURL() string
}
type maybeSources []maybeSource
func (mbs maybeSources) try(ctx context.Context, cachedir string, c singleSourceCache) (source, sourceState, error) {
func (mbs maybeSources) try(ctx context.Context, cachedir string, c singleSourceCache, cm *callManager) (source, sourceState, error) {
var e sourceFailures
for _, mb := range mbs {
src, state, err := mb.try(ctx, cachedir, c)
src, state, err := mb.try(ctx, cachedir, c, cm)
if err == nil {
return src, state, nil
}
@ -77,7 +76,7 @@ type maybeGitSource struct {
url *url.URL
}
func (m maybeGitSource) try(ctx context.Context, cachedir string, c singleSourceCache) (source, sourceState, error) {
func (m maybeGitSource) try(ctx context.Context, cachedir string, c singleSourceCache, cm *callManager) (source, sourceState, error) {
ustr := m.url.String()
path := filepath.Join(cachedir, "sources", sanitizer.Replace(ustr))
@ -88,7 +87,6 @@ func (m maybeGitSource) try(ctx context.Context, cachedir string, c singleSource
src := &gitSource{
baseVCSSource: baseVCSSource{
dc: c,
crepo: &repo{
r: &gitRepo{r},
rpath: path,
@ -97,9 +95,15 @@ func (m maybeGitSource) try(ctx context.Context, cachedir string, c singleSource
}
// Pinging invokes the same action as calling listVersions, so just do that.
vl, err := src.listVersions(ctx)
var vl []PairedVersion
err = cm.do(ctx, "git:lv:maybe", ctListVersions, func(ctx context.Context) (err error) {
if vl, err = src.listVersions(ctx); err != nil {
return fmt.Errorf("remote repository at %s does not exist, or is inaccessible", ustr)
}
return nil
})
if err != nil {
return nil, 0, fmt.Errorf("remote repository at %s does not exist, or is inaccessible", ustr)
return nil, 0, err
}
c.storeVersionMap(vl, true)
@ -128,7 +132,7 @@ type maybeGopkginSource struct {
major uint64
}
func (m maybeGopkginSource) try(ctx context.Context, cachedir string, c singleSourceCache) (source, sourceState, error) {
func (m maybeGopkginSource) try(ctx context.Context, cachedir string, c singleSourceCache, cm *callManager) (source, sourceState, error) {
// We don't actually need a fully consistent transform into the on-disk path
// - just something that's unique to the particular gopkg.in domain context.
// So, it's OK to just dumb-join the scheme with the path.
@ -143,7 +147,6 @@ func (m maybeGopkginSource) try(ctx context.Context, cachedir string, c singleSo
src := &gopkginSource{
gitSource: gitSource{
baseVCSSource: baseVCSSource{
dc: c,
crepo: &repo{
r: &gitRepo{r},
rpath: path,
@ -153,10 +156,15 @@ func (m maybeGopkginSource) try(ctx context.Context, cachedir string, c singleSo
major: m.major,
}
// Pinging invokes the same action as calling listVersions, so just do that.
vl, err := src.listVersions(ctx)
var vl []PairedVersion
err = cm.do(ctx, "git:lv:maybe", ctListVersions, func(ctx context.Context) (err error) {
if vl, err = src.listVersions(ctx); err != nil {
return fmt.Errorf("remote repository at %s does not exist, or is inaccessible", ustr)
}
return nil
})
if err != nil {
return nil, 0, fmt.Errorf("remote repository at %s does not exist, or is inaccessible", ustr)
return nil, 0, err
}
c.storeVersionMap(vl, true)
@ -177,7 +185,7 @@ type maybeBzrSource struct {
url *url.URL
}
func (m maybeBzrSource) try(ctx context.Context, cachedir string, c singleSourceCache) (source, sourceState, error) {
func (m maybeBzrSource) try(ctx context.Context, cachedir string, c singleSourceCache, cm *callManager) (source, sourceState, error) {
ustr := m.url.String()
path := filepath.Join(cachedir, "sources", sanitizer.Replace(ustr))
@ -186,8 +194,14 @@ func (m maybeBzrSource) try(ctx context.Context, cachedir string, c singleSource
return nil, 0, unwrapVcsErr(err)
}
if !r.Ping() {
return nil, 0, fmt.Errorf("remote repository at %s does not exist, or is inaccessible", ustr)
err = cm.do(ctx, "bzr:ping", ctSourcePing, func(ctx context.Context) error {
if !r.Ping() {
return fmt.Errorf("remote repository at %s does not exist, or is inaccessible", ustr)
}
return nil
})
if err != nil {
return nil, 0, err
}
state := sourceIsSetUp | sourceExistsUpstream
@ -197,7 +211,6 @@ func (m maybeBzrSource) try(ctx context.Context, cachedir string, c singleSource
src := &bzrSource{
baseVCSSource: baseVCSSource{
dc: c,
crepo: &repo{
r: &bzrRepo{r},
rpath: path,
@ -216,7 +229,7 @@ type maybeHgSource struct {
url *url.URL
}
func (m maybeHgSource) try(ctx context.Context, cachedir string, c singleSourceCache) (source, sourceState, error) {
func (m maybeHgSource) try(ctx context.Context, cachedir string, c singleSourceCache, cm *callManager) (source, sourceState, error) {
ustr := m.url.String()
path := filepath.Join(cachedir, "sources", sanitizer.Replace(ustr))
@ -225,8 +238,14 @@ func (m maybeHgSource) try(ctx context.Context, cachedir string, c singleSourceC
return nil, 0, unwrapVcsErr(err)
}
if !r.Ping() {
return nil, 0, fmt.Errorf("remote repository at %s does not exist, or is inaccessible", ustr)
err = cm.do(ctx, "hg:ping", ctSourcePing, func(ctx context.Context) error {
if !r.Ping() {
return fmt.Errorf("remote repository at %s does not exist, or is inaccessible", ustr)
}
return nil
})
if err != nil {
return nil, 0, err
}
state := sourceIsSetUp | sourceExistsUpstream
@ -236,7 +255,6 @@ func (m maybeHgSource) try(ctx context.Context, cachedir string, c singleSourceC
src := &hgSource{
baseVCSSource: baseVCSSource{
dc: c,
crepo: &repo{
r: &hgRepo{r},
rpath: path,

Просмотреть файл

@ -284,7 +284,9 @@ func (sg *sourceGateway) exportVersionTo(ctx context.Context, v Version, to stri
return err
}
return sg.src.exportRevisionTo(r, to)
return sg.callMgr.do(ctx, sg.src.upstreamURL(), ctExportTree, func(ctx context.Context) error {
return sg.src.exportRevisionTo(r, to)
})
}
func (sg *sourceGateway) getManifestAndLock(ctx context.Context, pr ProjectRoot, v Version, an ProjectAnalyzer) (Manifest, Lock, error) {
@ -306,7 +308,12 @@ func (sg *sourceGateway) getManifestAndLock(ctx context.Context, pr ProjectRoot,
return nil, nil, err
}
m, l, err = sg.src.getManifestAndLock(pr, r, an)
name, vers := an.Info()
label := fmt.Sprintf("%s:%s.%v", sg.src.upstreamURL(), name, vers)
err = sg.callMgr.do(ctx, label, ctGetManifestAndLock, func(ctx context.Context) error {
m, l, err = sg.src.getManifestAndLock(ctx, pr, r, an)
return err
})
if err != nil {
return nil, nil, err
}
@ -336,7 +343,11 @@ func (sg *sourceGateway) listPackages(ctx context.Context, pr ProjectRoot, v Ver
return pkgtree.PackageTree{}, err
}
ptree, err = sg.src.listPackages(pr, r)
label := fmt.Sprintf("%s:%s", pr, sg.src.upstreamURL())
err = sg.callMgr.do(ctx, label, ctListPackages, func(ctx context.Context) error {
ptree, err = sg.src.listPackages(pr, r)
return err
})
if err != nil {
return pkgtree.PackageTree{}, err
}
@ -456,14 +467,20 @@ func (sg *sourceGateway) require(ctx context.Context, wanted sourceState) (errSt
switch flag {
case sourceIsSetUp:
sg.src, addlState, err = sg.maybe.try(ctx, sg.cachedir, sg.cache)
sg.src, addlState, err = sg.maybe.try(ctx, sg.cachedir, sg.cache, sg.callMgr)
case sourceExistsUpstream:
if !sg.src.existsUpstream(ctx) {
err = fmt.Errorf("%s does not exist upstream", sg.src.upstreamURL())
}
err = sg.callMgr.do(ctx, sg.src.sourceType(), ctSourcePing, func(ctx context.Context) error {
if !sg.src.existsUpstream(ctx) {
return fmt.Errorf("%s does not exist upstream", sg.src.upstreamURL())
}
return nil
})
case sourceExistsLocally:
if !sg.src.existsLocally(ctx) {
err = sg.src.initLocal(ctx)
err = sg.callMgr.do(ctx, sg.src.sourceType(), ctSourceInit, func(ctx context.Context) error {
return sg.src.initLocal(ctx)
})
if err == nil {
addlState |= sourceHasLatestLocally
} else {
@ -472,12 +489,18 @@ func (sg *sourceGateway) require(ctx context.Context, wanted sourceState) (errSt
}
case sourceHasLatestVersionList:
var pvl []PairedVersion
pvl, err = sg.src.listVersions(ctx)
err = sg.callMgr.do(ctx, sg.src.sourceType(), ctListVersions, func(ctx context.Context) error {
pvl, err = sg.src.listVersions(ctx)
return err
})
if err != nil {
sg.cache.storeVersionMap(pvl, true)
}
case sourceHasLatestLocally:
err = sg.src.updateLocal(ctx)
err = sg.callMgr.do(ctx, sg.src.sourceType(), ctSourceFetch, func(ctx context.Context) error {
return sg.src.updateLocal(ctx)
})
}
if err != nil {
@ -505,19 +528,20 @@ type source interface {
initLocal(context.Context) error
updateLocal(context.Context) error
listVersions(context.Context) ([]PairedVersion, error)
getManifestAndLock(ProjectRoot, Revision, ProjectAnalyzer) (Manifest, Lock, error)
getManifestAndLock(context.Context, ProjectRoot, Revision, ProjectAnalyzer) (Manifest, Lock, error)
listPackages(ProjectRoot, Revision) (pkgtree.PackageTree, error)
revisionPresentIn(Revision) (bool, error)
exportRevisionTo(Revision, string) error
sourceType() string
}
type baseVCSSource struct {
// Object for the cache repository
crepo *repo
}
// The project metadata cache. This is (or is intended to be) persisted to
// disk, for reuse across solver runs.
dc singleSourceCache
func (bs *baseVCSSource) sourceType() string {
return string(bs.crepo.r.Vcs())
}
func (bs *baseVCSSource) existsLocally(ctx context.Context) bool {
@ -526,23 +550,27 @@ func (bs *baseVCSSource) existsLocally(ctx context.Context) bool {
// TODO reimpl for git
func (bs *baseVCSSource) existsUpstream(ctx context.Context) bool {
return bs.crepo.r.Ping()
return !bs.crepo.r.Ping()
}
func (bs *baseVCSSource) upstreamURL() string {
return bs.crepo.r.Remote()
}
func (bs *baseVCSSource) getManifestAndLock(pr ProjectRoot, r Revision, an ProjectAnalyzer) (Manifest, Lock, error) {
func (bs *baseVCSSource) getManifestAndLock(ctx context.Context, pr ProjectRoot, r Revision, an ProjectAnalyzer) (Manifest, Lock, error) {
bs.crepo.mut.Lock()
err := bs.crepo.r.UpdateVersion(r.String())
m, l, err := an.DeriveManifestAndLock(bs.crepo.r.LocalPath(), pr)
bs.crepo.mut.Unlock()
defer bs.crepo.mut.Unlock()
err := bs.crepo.r.UpdateVersion(r.String())
if err != nil {
return nil, nil, unwrapVcsErr(err)
}
m, l, err := an.DeriveManifestAndLock(bs.crepo.r.LocalPath(), pr)
if err != nil {
return nil, nil, err
}
if l != nil && l != Lock(nil) {
l = prepLock(l)
}
@ -562,18 +590,20 @@ func (bs *baseVCSSource) initLocal(ctx context.Context) error {
bs.crepo.mut.Lock()
err := bs.crepo.r.get(ctx)
bs.crepo.mut.Unlock()
if err != nil {
return unwrapVcsErr(err)
}
return nil
}
// updateLocal ensures the local data we have about the source is fully up to date
// with what's out there over the network.
// updateLocal ensures the local data (versions and code) we have about the
// source is fully up to date with that of the canonical upstream source.
func (bs *baseVCSSource) updateLocal(ctx context.Context) error {
bs.crepo.mut.Lock()
err := bs.crepo.r.update(ctx)
bs.crepo.mut.Unlock()
if err != nil {
return unwrapVcsErr(err)
}
@ -582,15 +612,14 @@ func (bs *baseVCSSource) updateLocal(ctx context.Context) error {
func (bs *baseVCSSource) listPackages(pr ProjectRoot, r Revision) (ptree pkgtree.PackageTree, err error) {
bs.crepo.mut.Lock()
// Check out the desired version for analysis
err = bs.crepo.r.UpdateVersion(string(r))
err = bs.crepo.r.UpdateVersion(r.String())
bs.crepo.mut.Unlock()
if err != nil {
err = unwrapVcsErr(err)
} else {
ptree, err = pkgtree.ListPackages(bs.crepo.r.LocalPath(), string(pr))
}
bs.crepo.mut.Unlock()
return
}
@ -609,5 +638,5 @@ func (bs *baseVCSSource) exportRevisionTo(r Revision, to string) error {
// TODO(sdboyer) this is a simplistic approach and relying on the tools
// themselves might make it faster, but git's the overwhelming case (and has
// its own method) so fine for now
return fs.CopyDir(bs.crepo.rpath, to)
return fs.CopyDir(bs.crepo.r.LocalPath(), to)
}

Просмотреть файл

@ -533,31 +533,32 @@ func newCallManager(ctx context.Context) *callManager {
}
}
// Helper function to register a call with a callManager, combine contexts, and
// create a to-be-deferred func to clean it all up.
func (cm *callManager) setUpCall(inctx context.Context, name string, typ callType) (cctx context.Context, doneFunc func(), err error) {
// do executes the incoming closure using a conjoined context, and keeps
// counters to ensure the sourceMgr can't finish Release()ing until after all
// calls have returned.
func (cm *callManager) do(inctx context.Context, name string, typ callType, f func(context.Context) error) error {
ci := callInfo{
name: name,
typ: typ,
}
octx, err := cm.run(ci)
octx, err := cm.start(ci)
if err != nil {
return nil, nil, err
return err
}
cctx, cancelFunc := constext.Cons(inctx, octx)
return cctx, func() {
cm.done(ci)
cancelFunc() // ensure constext cancel goroutine is cleaned up
}, nil
err = f(cctx)
cm.done(ci)
cancelFunc()
return err
}
func (cm *callManager) getLifetimeContext() context.Context {
return cm.ctx
}
func (cm *callManager) run(ci callInfo) (context.Context, error) {
func (cm *callManager) start(ci callInfo) (context.Context, error) {
cm.mu.Lock()
defer cm.mu.Unlock()
if cm.ctx.Err() != nil {
@ -608,6 +609,12 @@ const (
ctHTTPMetadata callType = iota
ctListVersions
ctGetManifestAndLock
ctListPackages
ctSourcePing
ctSourceInit
ctSourceFetch
ctCheckoutVersion
ctExportTree
)
// callInfo provides metadata about an ongoing call.

Просмотреть файл

@ -56,7 +56,8 @@ func testGitSourceInteractions(t *testing.T) {
}
ctx := context.Background()
isrc, state, err := mb.try(ctx, cpath, newMemoryCache())
callMgr := newCallManager(ctx)
isrc, state, err := mb.try(ctx, cpath, newMemoryCache(), callMgr)
if err != nil {
t.Errorf("Unexpected error while setting up gitSource for test repo: %s", err)
rf()
@ -163,7 +164,8 @@ func testGopkginSourceInteractions(t *testing.T) {
}
ctx := context.Background()
isrc, state, err := mb.try(ctx, cpath, newMemoryCache())
callMgr := newCallManager(ctx)
isrc, state, err := mb.try(ctx, cpath, newMemoryCache(), callMgr)
if err != nil {
t.Errorf("Unexpected error while setting up gopkginSource for test repo: %s", err)
return
@ -307,7 +309,8 @@ func testBzrSourceInteractions(t *testing.T) {
}
ctx := context.Background()
isrc, state, err := mb.try(ctx, cpath, newMemoryCache())
callMgr := newCallManager(ctx)
isrc, state, err := mb.try(ctx, cpath, newMemoryCache(), callMgr)
if err != nil {
t.Errorf("Unexpected error while setting up bzrSource for test repo: %s", err)
rf()
@ -424,7 +427,8 @@ func testHgSourceInteractions(t *testing.T) {
}
ctx := context.Background()
isrc, state, err := mb.try(ctx, cpath, newMemoryCache())
callMgr := newCallManager(ctx)
isrc, state, err := mb.try(ctx, cpath, newMemoryCache(), callMgr)
if err != nil {
t.Errorf("Unexpected error while setting up hgSource for test repo: %s", err)
return

Просмотреть файл

@ -16,6 +16,7 @@ type ctxRepo interface {
vcs.Repo
get(context.Context) error
update(context.Context) error
// TODO(sdboyer) implement these, pronto
//updateVersion(context.Context) error
//ping(context.Context) (bool, error)
}

Просмотреть файл

@ -9,6 +9,7 @@ import (
"path/filepath"
"strings"
"sync"
"time"
"github.com/Masterminds/semver"
"github.com/sdboyer/gps/internal/fs"
@ -67,11 +68,12 @@ func (s *gitSource) exportRevisionTo(rev Revision, to string) error {
func (s *gitSource) listVersions(ctx context.Context) (vlist []PairedVersion, err error) {
r := s.crepo.r
var out []byte
c := exec.Command("git", "ls-remote", r.Remote())
c := newMonitoredCmd(exec.Command("git", "ls-remote", r.Remote()), 30*time.Second)
// Ensure no prompting for PWs
c.Env = mergeEnvLists([]string{"GIT_ASKPASS=", "GIT_TERMINAL_PROMPT=0"}, os.Environ())
out, err = c.CombinedOutput()
c.cmd.Env = mergeEnvLists([]string{"GIT_ASKPASS=", "GIT_TERMINAL_PROMPT=0"}, os.Environ())
out, err = c.combinedOutput(ctx)
if err != nil {
return nil, err