зеркало из https://github.com/Azure/mirrorcat.git
Adding MirrorFinder which combines output of two children. (#3)
* Adding a first whack... but there is some non-determinism that causes occasional failure. * + MirrorFinder which combines output of two children. Doing this allows for decoration and merge of multiple MirrorFinders into a single logical one. * formatting
This commit is contained in:
Родитель
d43cf479f2
Коммит
99a39ce565
|
@ -0,0 +1,47 @@
|
|||
package mirrorcat
|
||||
|
||||
import "context"
|
||||
|
||||
// MergeFinder allows a mechanism to find mirror mappings from multiple underlying MirrorFinders.
|
||||
type MergeFinder []MirrorFinder
|
||||
|
||||
// FindMirrors enumerates each MirrorFinder, and
|
||||
func (haystack MergeFinder) FindMirrors(ctx context.Context, needle RemoteRef, results chan<- RemoteRef) (err error) {
|
||||
defer close(results)
|
||||
|
||||
for _, finder := range haystack {
|
||||
// Due to the fact that all FindMirrors implementations must close the results channel to communicate
|
||||
// that no more matches have been found, we must create a layer of separation between the merged results
|
||||
// and the results from each child MirrorFinder.
|
||||
intermediate := make(chan RemoteRef)
|
||||
|
||||
errs := make(chan error, 1)
|
||||
|
||||
// Kick-off a goroutine to fetch the child's matching mirrors.
|
||||
go func() {
|
||||
select {
|
||||
case errs <- finder.FindMirrors(ctx, needle, intermediate):
|
||||
// Intentionally Left Blank
|
||||
case <-ctx.Done():
|
||||
// This case prevents leaking this goroutine in the case that the underlying type
|
||||
// of `finder` does not respect cancellation tokens appropriately.
|
||||
errs <- ctx.Err()
|
||||
}
|
||||
}()
|
||||
|
||||
for mirror := range intermediate {
|
||||
select {
|
||||
case results <- mirror:
|
||||
// Intentionally Left Blank
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
err = <-errs
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
|
@ -0,0 +1,83 @@
|
|||
package mirrorcat_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/Azure/mirrorcat"
|
||||
)
|
||||
|
||||
func ExampleMergeFinder_FindMirrors() {
|
||||
const mainRepo = "github.com/Azure/mirrorcat"
|
||||
const secondaryRepo = "github.com/marstr/mirrorcat"
|
||||
|
||||
child1, child2 := mirrorcat.NewDefaultMirrorFinder(), mirrorcat.NewDefaultMirrorFinder()
|
||||
|
||||
orig := mirrorcat.RemoteRef{Repository: mainRepo, Ref: "master"}
|
||||
|
||||
child1.AddMirrors(orig, mirrorcat.RemoteRef{Repository: secondaryRepo, Ref: "master"})
|
||||
child2.AddMirrors(orig, mirrorcat.RemoteRef{Repository: mainRepo, Ref: "dev"})
|
||||
|
||||
subject := mirrorcat.MergeFinder([]mirrorcat.MirrorFinder{child1, child2})
|
||||
|
||||
results := make(chan mirrorcat.RemoteRef)
|
||||
|
||||
var err error
|
||||
go func() {
|
||||
err = subject.FindMirrors(context.Background(), orig, results)
|
||||
}()
|
||||
|
||||
for result := range results {
|
||||
fmt.Printf("%s:%s\n", result.Repository, result.Ref)
|
||||
}
|
||||
fmt.Println(err)
|
||||
|
||||
// Output:
|
||||
// github.com/marstr/mirrorcat:master
|
||||
// github.com/Azure/mirrorcat:dev
|
||||
// <nil>
|
||||
}
|
||||
|
||||
func TestMergeFinder_FindMirrors_RespectsCancel(t *testing.T) {
|
||||
const mainRepo = "github.com/Azure/mirrorcat"
|
||||
const secondaryRepo = "github.com/marstr/mirrorcat"
|
||||
|
||||
child1, child2 := mirrorcat.NewDefaultMirrorFinder(), mirrorcat.NewDefaultMirrorFinder()
|
||||
|
||||
orig := mirrorcat.RemoteRef{Repository: mainRepo, Ref: "master"}
|
||||
|
||||
child1.AddMirrors(orig, mirrorcat.RemoteRef{Repository: secondaryRepo, Ref: "master"})
|
||||
child2.AddMirrors(orig, mirrorcat.RemoteRef{Repository: mainRepo, Ref: "dev"})
|
||||
|
||||
subject := mirrorcat.MergeFinder([]mirrorcat.MirrorFinder{child1, child2})
|
||||
|
||||
outside, cancelOutside := context.WithTimeout(context.Background(), time.Second*3)
|
||||
defer cancelOutside()
|
||||
|
||||
inside, cancelInside := context.WithCancel(outside)
|
||||
|
||||
results := make(chan mirrorcat.RemoteRef)
|
||||
|
||||
errs := make(chan error, 1)
|
||||
|
||||
go func() {
|
||||
errs <- subject.FindMirrors(inside, orig, results)
|
||||
t.Log("Finished FindMirrors routine")
|
||||
}()
|
||||
|
||||
cancelInside()
|
||||
|
||||
select {
|
||||
case err := <-errs:
|
||||
t.Log("error received: ", err)
|
||||
if err == nil || !strings.Contains(err.Error(), "cancel") {
|
||||
t.Log("expected error to be a cancellation message")
|
||||
t.Fail()
|
||||
}
|
||||
case <-outside.Done():
|
||||
t.Errorf("timed out")
|
||||
}
|
||||
}
|
|
@ -3,6 +3,7 @@ package mirrorcat_test
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -31,7 +32,9 @@ func ExampleDefaultMirrorFinder() {
|
|||
|
||||
results := make(chan mirrorcat.RemoteRef)
|
||||
|
||||
ctx, _ := context.WithTimeout(context.Background(), 500*time.Millisecond)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
|
||||
defer cancel()
|
||||
|
||||
go subject.FindMirrors(ctx, original, results)
|
||||
|
||||
loop:
|
||||
|
@ -70,30 +73,30 @@ func TestDefaultMirrorFinder_FindMirrors_RespectsCancellation(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
outer, cancelOuter := context.WithTimeout(context.Background(), 3*time.Second)
|
||||
defer cancelOuter()
|
||||
|
||||
subject := mirrorcat.NewDefaultMirrorFinder()
|
||||
subject.AddMirrors(original, mirrors...)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
inner, cancelInner := context.WithCancel(outer)
|
||||
|
||||
results := make(chan mirrorcat.RemoteRef)
|
||||
go subject.FindMirrors(ctx, original, results)
|
||||
errs := make(chan error, 1)
|
||||
go func() {
|
||||
errs <- subject.FindMirrors(inner, original, results)
|
||||
}()
|
||||
|
||||
cancel()
|
||||
|
||||
// There's a race-condition imposed here, because FindMirrors will race the cancel()
|
||||
// function to see which gets to the read first. By waiting for 20 milliseconds, we can
|
||||
// pretty well ensure that cancel() wins the race. (cancel needs to complete before
|
||||
// the read/write handshake happens on results between this function and FindMirrors)
|
||||
<-time.After(20 * time.Millisecond)
|
||||
cancelInner()
|
||||
|
||||
select {
|
||||
case _, ok := <-results:
|
||||
if ok {
|
||||
t.Logf("Able to read from results, and the channel was not closed.")
|
||||
case err := <-errs:
|
||||
t.Log("error received: ", err)
|
||||
if err == nil || !strings.Contains(err.Error(), "cancel") {
|
||||
t.Log("expected error to be a cancellation message")
|
||||
t.Fail()
|
||||
}
|
||||
default:
|
||||
t.Logf("Unable to read from results, it was not closed.")
|
||||
t.Fail()
|
||||
case <-outer.Done():
|
||||
t.Error("timed out")
|
||||
}
|
||||
}
|
||||
|
|
Загрузка…
Ссылка в новой задаче