From 99a39ce56500df70847dcb2303aa0dd3d461a8b4 Mon Sep 17 00:00:00 2001 From: Martin Strobel Date: Wed, 20 Dec 2017 13:46:06 -0800 Subject: [PATCH] Adding MirrorFinder which combines output of two children. (#3) * Adding a first whack... but there is some non-determinism that causes occasional failure. * + MirrorFinder which combines output of two children. Doing this allows for decoration and merge of multiple MirrorFinders into a single logical one. * formatting --- merge_finder.go | 47 ++++++++++++++++++++++++ merge_finder_test.go | 83 +++++++++++++++++++++++++++++++++++++++++++ mirror_finder_test.go | 35 +++++++++--------- 3 files changed, 149 insertions(+), 16 deletions(-) create mode 100644 merge_finder.go create mode 100644 merge_finder_test.go diff --git a/merge_finder.go b/merge_finder.go new file mode 100644 index 0000000..825129e --- /dev/null +++ b/merge_finder.go @@ -0,0 +1,47 @@ +package mirrorcat + +import "context" + +// MergeFinder allows a mechanism to find mirror mappings from multiple underlying MirrorFinders. +type MergeFinder []MirrorFinder + +// FindMirrors enumerates each MirrorFinder, and +func (haystack MergeFinder) FindMirrors(ctx context.Context, needle RemoteRef, results chan<- RemoteRef) (err error) { + defer close(results) + + for _, finder := range haystack { + // Due to the fact that all FindMirrors implementations must close the results channel to communicate + // that no more matches have been found, we must create a layer of separation between the merged results + // and the results from each child MirrorFinder. + intermediate := make(chan RemoteRef) + + errs := make(chan error, 1) + + // Kick-off a goroutine to fetch the child's matching mirrors. + go func() { + select { + case errs <- finder.FindMirrors(ctx, needle, intermediate): + // Intentionally Left Blank + case <-ctx.Done(): + // This case prevents leaking this goroutine in the case that the underlying type + // of `finder` does not respect cancellation tokens appropriately. + errs <- ctx.Err() + } + }() + + for mirror := range intermediate { + select { + case results <- mirror: + // Intentionally Left Blank + case <-ctx.Done(): + return ctx.Err() + } + } + + err = <-errs + if err != nil { + return + } + } + return +} diff --git a/merge_finder_test.go b/merge_finder_test.go new file mode 100644 index 0000000..a8c8cf5 --- /dev/null +++ b/merge_finder_test.go @@ -0,0 +1,83 @@ +package mirrorcat_test + +import ( + "context" + "fmt" + "strings" + "testing" + "time" + + "github.com/Azure/mirrorcat" +) + +func ExampleMergeFinder_FindMirrors() { + const mainRepo = "github.com/Azure/mirrorcat" + const secondaryRepo = "github.com/marstr/mirrorcat" + + child1, child2 := mirrorcat.NewDefaultMirrorFinder(), mirrorcat.NewDefaultMirrorFinder() + + orig := mirrorcat.RemoteRef{Repository: mainRepo, Ref: "master"} + + child1.AddMirrors(orig, mirrorcat.RemoteRef{Repository: secondaryRepo, Ref: "master"}) + child2.AddMirrors(orig, mirrorcat.RemoteRef{Repository: mainRepo, Ref: "dev"}) + + subject := mirrorcat.MergeFinder([]mirrorcat.MirrorFinder{child1, child2}) + + results := make(chan mirrorcat.RemoteRef) + + var err error + go func() { + err = subject.FindMirrors(context.Background(), orig, results) + }() + + for result := range results { + fmt.Printf("%s:%s\n", result.Repository, result.Ref) + } + fmt.Println(err) + + // Output: + // github.com/marstr/mirrorcat:master + // github.com/Azure/mirrorcat:dev + // +} + +func TestMergeFinder_FindMirrors_RespectsCancel(t *testing.T) { + const mainRepo = "github.com/Azure/mirrorcat" + const secondaryRepo = "github.com/marstr/mirrorcat" + + child1, child2 := mirrorcat.NewDefaultMirrorFinder(), mirrorcat.NewDefaultMirrorFinder() + + orig := mirrorcat.RemoteRef{Repository: mainRepo, Ref: "master"} + + child1.AddMirrors(orig, mirrorcat.RemoteRef{Repository: secondaryRepo, Ref: "master"}) + child2.AddMirrors(orig, mirrorcat.RemoteRef{Repository: mainRepo, Ref: "dev"}) + + subject := mirrorcat.MergeFinder([]mirrorcat.MirrorFinder{child1, child2}) + + outside, cancelOutside := context.WithTimeout(context.Background(), time.Second*3) + defer cancelOutside() + + inside, cancelInside := context.WithCancel(outside) + + results := make(chan mirrorcat.RemoteRef) + + errs := make(chan error, 1) + + go func() { + errs <- subject.FindMirrors(inside, orig, results) + t.Log("Finished FindMirrors routine") + }() + + cancelInside() + + select { + case err := <-errs: + t.Log("error received: ", err) + if err == nil || !strings.Contains(err.Error(), "cancel") { + t.Log("expected error to be a cancellation message") + t.Fail() + } + case <-outside.Done(): + t.Errorf("timed out") + } +} diff --git a/mirror_finder_test.go b/mirror_finder_test.go index 59ca0d7..7224997 100644 --- a/mirror_finder_test.go +++ b/mirror_finder_test.go @@ -3,6 +3,7 @@ package mirrorcat_test import ( "context" "fmt" + "strings" "testing" "time" @@ -31,7 +32,9 @@ func ExampleDefaultMirrorFinder() { results := make(chan mirrorcat.RemoteRef) - ctx, _ := context.WithTimeout(context.Background(), 500*time.Millisecond) + ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancel() + go subject.FindMirrors(ctx, original, results) loop: @@ -70,30 +73,30 @@ func TestDefaultMirrorFinder_FindMirrors_RespectsCancellation(t *testing.T) { }, } + outer, cancelOuter := context.WithTimeout(context.Background(), 3*time.Second) + defer cancelOuter() + subject := mirrorcat.NewDefaultMirrorFinder() subject.AddMirrors(original, mirrors...) - ctx, cancel := context.WithCancel(context.Background()) + inner, cancelInner := context.WithCancel(outer) results := make(chan mirrorcat.RemoteRef) - go subject.FindMirrors(ctx, original, results) + errs := make(chan error, 1) + go func() { + errs <- subject.FindMirrors(inner, original, results) + }() - cancel() - - // There's a race-condition imposed here, because FindMirrors will race the cancel() - // function to see which gets to the read first. By waiting for 20 milliseconds, we can - // pretty well ensure that cancel() wins the race. (cancel needs to complete before - // the read/write handshake happens on results between this function and FindMirrors) - <-time.After(20 * time.Millisecond) + cancelInner() select { - case _, ok := <-results: - if ok { - t.Logf("Able to read from results, and the channel was not closed.") + case err := <-errs: + t.Log("error received: ", err) + if err == nil || !strings.Contains(err.Error(), "cancel") { + t.Log("expected error to be a cancellation message") t.Fail() } - default: - t.Logf("Unable to read from results, it was not closed.") - t.Fail() + case <-outer.Done(): + t.Error("timed out") } }