From a0641e565b9a89d60eb22e05ad1d2d30cd8f9f9b Mon Sep 17 00:00:00 2001
From: David Justice
Date: Tue, 3 Jul 2018 14:53:36 -0700
Subject: [PATCH] fix inconsistencies when provisioning hubs

---
 Gopkg.lock              |  8 ++--
 Gopkg.toml              |  2 +-
 eph/eph.go              |  8 +---
 eph/eph_test.go         | 27 ++++++-------
 eph/scheduler.go        | 14 +++++++
 hub_test.go             | 25 ++++++------
 internal/test/suite.go  | 87 +++++++++++++++++++++++++++--------------
 storage/eph_test.go     |  9 ++++-
 storage/storage_test.go |  4 +-
 9 files changed, 114 insertions(+), 70 deletions(-)

diff --git a/Gopkg.lock b/Gopkg.lock
index b0cceea..c24a9bd 100644
--- a/Gopkg.lock
+++ b/Gopkg.lock
@@ -34,8 +34,8 @@
     "services/storage/mgmt/2017-10-01/storage",
     "version"
   ]
-  revision = "4650843026a7fdec254a8d9cf893693a254edd0b"
-  version = "v16.2.1"
+  revision = "7971189ecf5a584b9211f2527737f94bb979644e"
+  version = "v17.4.0"
 
 [[projects]]
   name = "github.com/Azure/azure-storage-blob-go"
@@ -168,7 +168,7 @@
   branch = "master"
   name = "golang.org/x/net"
   packages = ["context"]
-  revision = "d1d521f6884855bc0e59c3d011574bd0678f18bc"
+  revision = "87b3feba568e144938625fc5d80ec92566c1a8fe"
 
 [[projects]]
   branch = "master"
@@ -191,6 +191,6 @@
 [solve-meta]
   analyzer-name = "dep"
   analyzer-version = 1
-  inputs-digest = "4c9f24ae0ab33d7421d8e0f90ab245dfc0ba58989c0c9900395784d4c98c4a12"
+  inputs-digest = "c6c460be8f334b53e45118ef4da364f6c2f5b3c204662c466766d885074842a6"
   solver-name = "gps-cdcl"
   solver-version = 1
diff --git a/Gopkg.toml b/Gopkg.toml
index a69ceab..06c50ca 100644
--- a/Gopkg.toml
+++ b/Gopkg.toml
@@ -4,7 +4,7 @@
 
 [[constraint]]
   name = "github.com/Azure/azure-sdk-for-go"
-  version = "16"
+  version = "17"
 
 [[constraint]]
   name = "github.com/Azure/azure-amqp-common-go"
diff --git a/eph/eph.go b/eph/eph.go
index 54b1922..d27b3a2 100644
--- a/eph/eph.go
+++ b/eph/eph.go
@@ -253,13 +253,7 @@ func (h *EventProcessorHost) GetPartitionIDs() []string {
 
 // PartitionIDsBeingProcessed returns the partition IDs currently receiving messages
 func (h *EventProcessorHost) PartitionIDsBeingProcessed() []string {
-	ids := make([]string, len(h.scheduler.receivers))
-	count := 0
-	for key := range h.scheduler.receivers {
-		ids[count] = key
-		count++
-	}
-	return ids
+	return h.scheduler.getPartitionIDsBeingProcessed()
 }
 
 // Close stops the EventHostProcessor from processing messages
diff --git a/eph/eph_test.go b/eph/eph_test.go
index c9e53d4..14319f6 100644
--- a/eph/eph_test.go
+++ b/eph/eph_test.go
@@ -53,25 +53,20 @@ func TestEventProcessorHost(t *testing.T) {
 }
 
 func (s *testSuite) TestSingle() {
-	ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
-	defer cancel()
-
-	hub, del := s.RandomHub()
-	defer del()
+	hub, del, err := s.RandomHub()
+	s.Require().NoError(err)
 
 	processor, err := s.newInMemoryEPH(*hub.Name)
-	if err != nil {
-		s.T().Fatal(err)
-	}
+	s.Require().NoError(err)
 
 	messages, err := s.sendMessages(*hub.Name, 10)
-	if err != nil {
-		s.T().Fatal(err)
-	}
+	s.Require().NoError(err)
 
 	var wg sync.WaitGroup
 	wg.Add(len(messages))
 
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
+	defer cancel()
 	processor.RegisterHandler(ctx, func(c context.Context, event *eventhub.Event) error {
 		wg.Done()
 		return nil
@@ -82,6 +77,7 @@
 		closeContext, cancel := context.WithTimeout(context.Background(), 10*time.Second)
 		processor.Close(closeContext)
 		cancel()
+		del()
 	}()
 
 	end, _ := ctx.Deadline()
@@ -89,14 +85,17 @@
 }
 
 func (s *testSuite) TestMultiple() {
-	ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
-	defer cancel()
+	hub, del, err := s.RandomHub()
+	s.Require().NoError(err)
+	defer del()
 
-	hub, del := s.RandomHub()
 	numPartitions := len(*hub.PartitionIds)
 	sharedStore := new(sharedStore)
 	processors := make(map[string]*EventProcessorHost, numPartitions)
 	processorNames := make([]string, numPartitions)
+
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
+	defer cancel()
 	for i := 0; i < numPartitions; i++ {
 		processor, err := s.newInMemoryEPHWithOptions(*hub.Name, sharedStore)
 		if err != nil {
diff --git a/eph/scheduler.go b/eph/scheduler.go
index 4d95df5..ee81af2 100644
--- a/eph/scheduler.go
+++ b/eph/scheduler.go
@@ -184,6 +184,20 @@ func (s *scheduler) Stop(ctx context.Context) error {
 	return lastErr
 }
 
+func (s *scheduler) getPartitionIDsBeingProcessed() []string {
+	s.receiverMu.Lock()
+	defer s.receiverMu.Unlock()
+
+	ids := make([]string, len(s.receivers))
+	count := 0
+	for id := range s.receivers {
+		ids[count] = id
+		count++
+	}
+
+	return ids
+}
+
 func (s *scheduler) startReceiver(ctx context.Context, lease LeaseMarker) error {
 	s.receiverMu.Lock()
 	defer s.receiverMu.Unlock()
diff --git a/hub_test.go b/hub_test.go
index 5d73d9f..3a07ce1 100644
--- a/hub_test.go
+++ b/hub_test.go
@@ -42,6 +42,7 @@ import (
 	"github.com/Azure/azure-event-hubs-go/internal/test"
 	"github.com/opentracing/opentracing-go"
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 	"github.com/stretchr/testify/suite"
 )
 
@@ -68,9 +69,9 @@ func (suite *eventHubSuite) TestNewHubWithNameAndEnvironment() {
 	revert := suite.captureEnv()
 	defer revert()
 	os.Clearenv()
-	suite.NoError(os.Setenv("EVENTHUB_CONNECTION_STRING", connStr))
+	require.NoError(suite.T(), os.Setenv("EVENTHUB_CONNECTION_STRING", connStr))
 	_, err := NewHubWithNamespaceNameAndEnvironment("hello", "world")
-	suite.NoError(err)
+	require.NoError(suite.T(), err)
 }
 
 func (suite *eventHubSuite) TestSasToken() {
@@ -83,11 +84,9 @@
 	for name, testFunc := range tests {
 		setupTestTeardown := func(t *testing.T) {
 			provider, err := sas.NewTokenProvider(sas.TokenProviderWithEnvironmentVars())
-			if !suite.NoError(err) {
-				suite.FailNow("unable to build SAS token from environment vars")
-			}
-
-			hub, cleanup := suite.RandomHub()
+			suite.Require().NoError(err)
+			hub, cleanup, err := suite.RandomHub()
+			require.NoError(t, err)
 			defer cleanup()
 			client, closer := suite.newClientWithProvider(t, *hub.Name, provider)
 			defer closer()
@@ -109,7 +108,8 @@ func (suite *eventHubSuite) TestPartitioned() {
 
 	for name, testFunc := range tests {
 		setupTestTeardown := func(t *testing.T) {
-			hub, cleanup := suite.RandomHub()
+			hub, cleanup, err := suite.RandomHub()
+			require.NoError(t, err)
 			defer cleanup()
 			partitionID := (*hub.PartitionIds)[0]
 			client, closer := suite.newClient(t, *hub.Name, HubWithPartitionedSender(partitionID))
@@ -193,7 +193,8 @@ func (suite *eventHubSuite) TestEpochReceivers() {
 
 	for name, testFunc := range tests {
 		setupTestTeardown := func(t *testing.T) {
-			hub, cleanup := suite.RandomHub()
+			hub, cleanup, err := suite.RandomHub()
+			require.NoError(t, err)
 			defer cleanup()
 			partitionID := (*hub.PartitionIds)[0]
 			client, closer := suite.newClient(t, *hub.Name, HubWithPartitionedSender(partitionID))
@@ -263,7 +264,8 @@ func (suite *eventHubSuite) TestMultiPartition() {
 
 	for name, testFunc := range tests {
 		setupTestTeardown := func(t *testing.T) {
-			hub, cleanup := suite.RandomHub()
+			hub, cleanup, err := suite.RandomHub()
+			suite.Require().NoError(err)
 			defer cleanup()
 			client, closer := suite.newClient(t, *hub.Name)
 			defer closer()
@@ -369,7 +371,8 @@ func (suite *eventHubSuite) TestHubManagement() {
 
 	for name, testFunc := range tests {
 		setupTestTeardown := func(t *testing.T) {
-			hub, cleanup := suite.RandomHub()
+			hub, cleanup, err := suite.RandomHub()
+			require.NoError(t, err)
 			defer cleanup()
 			client, closer := suite.newClient(t, *hub.Name)
 			defer closer()
diff --git a/internal/test/suite.go b/internal/test/suite.go
index 332d4d8..d262bf2 100644
--- a/internal/test/suite.go
+++ b/internal/test/suite.go
@@ -24,7 +24,6 @@ package test
 
 import (
 	"context"
-	"errors"
 	"flag"
 	"io"
 	"math/rand"
@@ -38,6 +37,7 @@ import (
 	rm "github.com/Azure/azure-sdk-for-go/services/resources/mgmt/2017-05-10/resources"
 	"github.com/Azure/go-autorest/autorest/azure"
 	azauth "github.com/Azure/go-autorest/autorest/azure/auth"
+	"github.com/Azure/go-autorest/autorest/to"
 	log "github.com/sirupsen/logrus"
 	"github.com/stretchr/testify/suite"
 	"github.com/uber/jaeger-client-go"
@@ -83,19 +83,6 @@ func init() {
 	rand.Seed(time.Now().Unix())
 }
 
-// HubWithPartitions configures an Event Hub to have a specific number of partitions.
-//
-// Must be between 1 and 32
-func HubWithPartitions(count int) HubMgmtOption {
-	return func(model *mgmt.Model) error {
-		if count < 1 || count > 32 {
-			return errors.New("count must be between 1 and 32")
-		}
-		model.PartitionCount = common.PtrInt64(int64(count))
-		return nil
-	}
-}
-
 // SetupSuite constructs the test suite from the environment and
 func (suite *BaseSuite) SetupSuite() {
 	flag.Parse()
@@ -106,7 +93,7 @@ func (suite *BaseSuite) SetupSuite() {
 	suite.SubscriptionID = mustGetEnv("AZURE_SUBSCRIPTION_ID")
 	suite.Namespace = mustGetEnv("EVENTHUB_NAMESPACE")
 	envName := os.Getenv("AZURE_ENVIRONMENT")
-	suite.TagID = RandomString("tag", 10)
+	suite.TagID = RandomString("tag", 5)
 
 	if envName == "" {
 		suite.Env = azure.PublicCloud
@@ -140,19 +127,17 @@ func (suite *BaseSuite) TearDownSuite() {
 }
 
 // RandomHub creates a hub with a random'ish name
-func (suite *BaseSuite) RandomHub(opts ...HubMgmtOption) (*mgmt.Model, func()) {
-	ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
+func (suite *BaseSuite) RandomHub(opts ...HubMgmtOption) (*mgmt.Model, func(), error) {
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout * 2)
 	defer cancel()
 
 	name := suite.RandomName("goehtest", 6)
 	model, err := suite.ensureEventHub(ctx, name, opts...)
-	if !suite.NoError(err) {
-		suite.FailNow("couldn't build a random hub")
-	}
-
 	return model, func() {
-		suite.DeleteEventHub(*model.Name)
-	}
+		if err := suite.DeleteEventHub(*model.Name); err != nil {
+			suite.T().Log(err)
+		}
+	}, err
 }
 
 // EnsureEventHub creates an Event Hub if it doesn't exist
@@ -175,17 +160,41 @@ func (suite *BaseSuite) ensureEventHub(ctx context.Context, name string, opts ...HubMgmtOption) (*mgmt.Model, error) {
 			}
 		}
 
-		hub, err = client.CreateOrUpdate(ctx, ResourceGroupName, suite.Namespace, name, *newHub)
-		if err != nil {
-			return nil, err
+		var lastErr error
+		deadline, _ := ctx.Deadline()
+		for time.Now().Before(deadline) {
+			hub, err = suite.tryHubCreate(ctx, client, name, newHub)
+			if err == nil {
+				lastErr = nil
+				break
+			}
+			lastErr = err
+		}
+
+		if lastErr != nil {
+			return nil, lastErr
 		}
 	}
 	return &hub, nil
 }
 
+func (suite *BaseSuite) tryHubCreate(ctx context.Context, client *mgmt.EventHubsClient, name string, hub *mgmt.Model) (mgmt.Model, error) {
+	ctx, cancel := context.WithTimeout(ctx, 20*time.Second)
+	defer cancel()
+
+	//suite.T().Logf("trying to create hub named %q", name)
+	createdHub, err := client.CreateOrUpdate(ctx, ResourceGroupName, suite.Namespace, name, *hub)
+	if err != nil {
+		//suite.T().Logf("failed to create hub named %q", name)
+		return mgmt.Model{}, err
+	}
+
+	return createdHub, err
+}
+
 // DeleteEventHub deletes an Event Hub within the given Namespace
 func (suite *BaseSuite) DeleteEventHub(name string) error {
-	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
 	defer cancel()
 	client := suite.getEventHubMgmtClient()
 	_, err := client.Delete(ctx, ResourceGroupName, suite.Namespace, name)
@@ -194,12 +203,26 @@
 
 func (suite *BaseSuite) deleteAllTaggedEventHubs(ctx context.Context) {
 	client := suite.getEventHubMgmtClient()
-	res, _ := client.ListByNamespace(ctx, ResourceGroupName, suite.Namespace)
+	res, err := client.ListByNamespace(ctx, ResourceGroupName, suite.Namespace, to.Int32Ptr(0), to.Int32Ptr(20))
+	if err != nil {
+		suite.T().Log("error listing namespaces")
+		suite.T().Error(err)
+	}
 
 	for res.NotDone() {
 		for _, val := range res.Values() {
-			if strings.Contains(suite.TagID, *val.Name) {
-				client.Delete(ctx, ResourceGroupName, suite.Namespace, *val.Name)
+			if strings.Contains(*val.Name, suite.TagID) {
+				for i := 0; i < 5; i++ {
+					if _, err := client.Delete(ctx, ResourceGroupName, suite.Namespace, *val.Name); err != nil {
+						suite.T().Logf("error deleting %q", *val.Name)
+						suite.T().Error(err)
+						time.Sleep(3 * time.Second)
+					} else {
+						break
+					}
+				}
+			} else {
+				suite.T().Logf("%q does not contain %q", *val.Name, suite.TagID)
 			}
 		}
 		res.Next()
@@ -220,6 +243,10 @@ func (suite *BaseSuite) ensureProvisioned(tier mgmt.SkuTier) error {
 func ensureResourceGroup(ctx context.Context, subscriptionID, name, location string, env azure.Environment) (*rm.Group, error) {
 	groupClient := getRmGroupClientWithToken(subscriptionID, env)
 	group, err := groupClient.Get(ctx, name)
+	if group.Response.Response == nil {
+		// tcp dial error or something else where the response was not populated
+		return nil, err
+	}
 
 	if group.StatusCode == http.StatusNotFound {
 		group, err = groupClient.CreateOrUpdate(ctx, name, rm.Group{Location: common.PtrString(location)})
diff --git a/storage/eph_test.go b/storage/eph_test.go
index 8db4616..89c52b2 100644
--- a/storage/eph_test.go
+++ b/storage/eph_test.go
@@ -38,6 +38,7 @@ import (
 	"github.com/Azure/azure-event-hubs-go/eph"
 	"github.com/Azure/azure-event-hubs-go/internal/test"
 	"github.com/Azure/azure-storage-blob-go/2016-05-31/azblob"
+	"github.com/stretchr/testify/require"
 )
 
 const (
@@ -48,7 +49,10 @@ func (ts *testSuite) TestSingle() {
 	ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
 	defer cancel()
 
-	hub, delHub := ts.RandomHub()
+	hub, delHub, err := ts.RandomHub()
+	if !ts.NoError(err) {
+		ts.FailNow("could not build a hub")
+	}
 	delContainer := ts.newTestContainerByName(*hub.Name)
 	defer delContainer()
 
@@ -85,7 +89,8 @@ func (ts *testSuite) TestMultiple() {
 	ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
 	defer cancel()
 
-	hub, delHub := ts.RandomHub()
+	hub, delHub, err := ts.RandomHub()
+	require.NoError(ts.T(), err)
 	defer delHub()
 	delContainer := ts.newTestContainerByName(*hub.Name)
 	defer delContainer()
diff --git a/storage/storage_test.go b/storage/storage_test.go
index e621636..f983601 100644
--- a/storage/storage_test.go
+++ b/storage/storage_test.go
@@ -30,6 +30,7 @@ import (
 	"github.com/Azure/azure-event-hubs-go/eph"
 	"github.com/Azure/azure-event-hubs-go/internal/test"
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 )
 
 func (ts *testSuite) TestLeaserStoreCreation() {
@@ -170,7 +171,8 @@ func (ts *testSuite) leaserWithEPHAndLeases() (*LeaserCheckpointer, func()) {
 
 func (ts *testSuite) leaserWithEPH() (*LeaserCheckpointer, func()) {
 	leaser, del := ts.newLeaser()
-	hub, delHub := ts.RandomHub()
+	hub, delHub, err := ts.RandomHub()
+	require.NoError(ts.T(), err)
 	delAll := func() {
 		delHub()
 		del()