fix inconsistencies when provisioning hubs

This commit is contained in:
David Justice 2018-07-03 14:53:36 -07:00
Родитель edc24dc9e0
Коммит a0641e565b
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 2B44C6BF9F416319
9 изменённых файлов: 114 добавлений и 70 удалений

8
Gopkg.lock сгенерированный
Просмотреть файл

@ -34,8 +34,8 @@
"services/storage/mgmt/2017-10-01/storage",
"version"
]
revision = "4650843026a7fdec254a8d9cf893693a254edd0b"
version = "v16.2.1"
revision = "7971189ecf5a584b9211f2527737f94bb979644e"
version = "v17.4.0"
[[projects]]
name = "github.com/Azure/azure-storage-blob-go"
@ -168,7 +168,7 @@
branch = "master"
name = "golang.org/x/net"
packages = ["context"]
revision = "d1d521f6884855bc0e59c3d011574bd0678f18bc"
revision = "87b3feba568e144938625fc5d80ec92566c1a8fe"
[[projects]]
branch = "master"
@ -191,6 +191,6 @@
[solve-meta]
analyzer-name = "dep"
analyzer-version = 1
inputs-digest = "4c9f24ae0ab33d7421d8e0f90ab245dfc0ba58989c0c9900395784d4c98c4a12"
inputs-digest = "c6c460be8f334b53e45118ef4da364f6c2f5b3c204662c466766d885074842a6"
solver-name = "gps-cdcl"
solver-version = 1

Просмотреть файл

@ -4,7 +4,7 @@
[[constraint]]
name = "github.com/Azure/azure-sdk-for-go"
version = "16"
version = "17"
[[constraint]]
name = "github.com/Azure/azure-amqp-common-go"

Просмотреть файл

@ -253,13 +253,7 @@ func (h *EventProcessorHost) GetPartitionIDs() []string {
// PartitionIDsBeingProcessed returns the partition IDs currently receiving messages
func (h *EventProcessorHost) PartitionIDsBeingProcessed() []string {
ids := make([]string, len(h.scheduler.receivers))
count := 0
for key := range h.scheduler.receivers {
ids[count] = key
count++
}
return ids
return h.scheduler.getPartitionIDsBeingProcessed()
}
// Close stops the EventHostProcessor from processing messages

Просмотреть файл

@ -53,25 +53,20 @@ func TestEventProcessorHost(t *testing.T) {
}
func (s *testSuite) TestSingle() {
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
defer cancel()
hub, del := s.RandomHub()
defer del()
hub, del, err := s.RandomHub()
s.Require().NoError(err)
processor, err := s.newInMemoryEPH(*hub.Name)
if err != nil {
s.T().Fatal(err)
}
s.Require().NoError(err)
messages, err := s.sendMessages(*hub.Name, 10)
if err != nil {
s.T().Fatal(err)
}
s.Require().NoError(err)
var wg sync.WaitGroup
wg.Add(len(messages))
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
defer cancel()
processor.RegisterHandler(ctx, func(c context.Context, event *eventhub.Event) error {
wg.Done()
return nil
@ -82,6 +77,7 @@ func (s *testSuite) TestSingle() {
closeContext, cancel := context.WithTimeout(context.Background(), 10*time.Second)
processor.Close(closeContext)
cancel()
del()
}()
end, _ := ctx.Deadline()
@ -89,14 +85,17 @@ func (s *testSuite) TestSingle() {
}
func (s *testSuite) TestMultiple() {
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
defer cancel()
hub, del, err := s.RandomHub()
s.Require().NoError(err)
defer del()
hub, del := s.RandomHub()
numPartitions := len(*hub.PartitionIds)
sharedStore := new(sharedStore)
processors := make(map[string]*EventProcessorHost, numPartitions)
processorNames := make([]string, numPartitions)
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
defer cancel()
for i := 0; i < numPartitions; i++ {
processor, err := s.newInMemoryEPHWithOptions(*hub.Name, sharedStore)
if err != nil {

Просмотреть файл

@ -184,6 +184,20 @@ func (s *scheduler) Stop(ctx context.Context) error {
return lastErr
}
func (s *scheduler) getPartitionIDsBeingProcessed() []string {
s.receiverMu.Lock()
defer s.receiverMu.Unlock()
ids := make([]string, len(s.receivers))
count := 0
for id := range s.receivers {
ids[count] = id
count++
}
return ids
}
func (s *scheduler) startReceiver(ctx context.Context, lease LeaseMarker) error {
s.receiverMu.Lock()
defer s.receiverMu.Unlock()

Просмотреть файл

@ -42,6 +42,7 @@ import (
"github.com/Azure/azure-event-hubs-go/internal/test"
"github.com/opentracing/opentracing-go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
)
@ -68,9 +69,9 @@ func (suite *eventHubSuite) TestNewHubWithNameAndEnvironment() {
revert := suite.captureEnv()
defer revert()
os.Clearenv()
suite.NoError(os.Setenv("EVENTHUB_CONNECTION_STRING", connStr))
require.NoError(suite.T(), os.Setenv("EVENTHUB_CONNECTION_STRING", connStr))
_, err := NewHubWithNamespaceNameAndEnvironment("hello", "world")
suite.NoError(err)
require.NoError(suite.T(), err)
}
func (suite *eventHubSuite) TestSasToken() {
@ -83,11 +84,9 @@ func (suite *eventHubSuite) TestSasToken() {
for name, testFunc := range tests {
setupTestTeardown := func(t *testing.T) {
provider, err := sas.NewTokenProvider(sas.TokenProviderWithEnvironmentVars())
if !suite.NoError(err) {
suite.FailNow("unable to build SAS token from environment vars")
}
hub, cleanup := suite.RandomHub()
suite.Require().NoError(err)
hub, cleanup, err := suite.RandomHub()
require.NoError(t, err)
defer cleanup()
client, closer := suite.newClientWithProvider(t, *hub.Name, provider)
defer closer()
@ -109,7 +108,8 @@ func (suite *eventHubSuite) TestPartitioned() {
for name, testFunc := range tests {
setupTestTeardown := func(t *testing.T) {
hub, cleanup := suite.RandomHub()
hub, cleanup, err := suite.RandomHub()
require.NoError(t, err)
defer cleanup()
partitionID := (*hub.PartitionIds)[0]
client, closer := suite.newClient(t, *hub.Name, HubWithPartitionedSender(partitionID))
@ -193,7 +193,8 @@ func (suite *eventHubSuite) TestEpochReceivers() {
for name, testFunc := range tests {
setupTestTeardown := func(t *testing.T) {
hub, cleanup := suite.RandomHub()
hub, cleanup, err := suite.RandomHub()
require.NoError(t, err)
defer cleanup()
partitionID := (*hub.PartitionIds)[0]
client, closer := suite.newClient(t, *hub.Name, HubWithPartitionedSender(partitionID))
@ -263,7 +264,8 @@ func (suite *eventHubSuite) TestMultiPartition() {
for name, testFunc := range tests {
setupTestTeardown := func(t *testing.T) {
hub, cleanup := suite.RandomHub()
hub, cleanup, err := suite.RandomHub()
suite.Require().NoError(err)
defer cleanup()
client, closer := suite.newClient(t, *hub.Name)
defer closer()
@ -369,7 +371,8 @@ func (suite *eventHubSuite) TestHubManagement() {
for name, testFunc := range tests {
setupTestTeardown := func(t *testing.T) {
hub, cleanup := suite.RandomHub()
hub, cleanup, err := suite.RandomHub()
require.NoError(t, err)
defer cleanup()
client, closer := suite.newClient(t, *hub.Name)
defer closer()

Просмотреть файл

@ -24,7 +24,6 @@ package test
import (
"context"
"errors"
"flag"
"io"
"math/rand"
@ -38,6 +37,7 @@ import (
rm "github.com/Azure/azure-sdk-for-go/services/resources/mgmt/2017-05-10/resources"
"github.com/Azure/go-autorest/autorest/azure"
azauth "github.com/Azure/go-autorest/autorest/azure/auth"
"github.com/Azure/go-autorest/autorest/to"
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/suite"
"github.com/uber/jaeger-client-go"
@ -83,19 +83,6 @@ func init() {
rand.Seed(time.Now().Unix())
}
// HubWithPartitions configures an Event Hub to have a specific number of partitions.
//
// Must be between 1 and 32
func HubWithPartitions(count int) HubMgmtOption {
return func(model *mgmt.Model) error {
if count < 1 || count > 32 {
return errors.New("count must be between 1 and 32")
}
model.PartitionCount = common.PtrInt64(int64(count))
return nil
}
}
// SetupSuite constructs the test suite from the environment and
func (suite *BaseSuite) SetupSuite() {
flag.Parse()
@ -106,7 +93,7 @@ func (suite *BaseSuite) SetupSuite() {
suite.SubscriptionID = mustGetEnv("AZURE_SUBSCRIPTION_ID")
suite.Namespace = mustGetEnv("EVENTHUB_NAMESPACE")
envName := os.Getenv("AZURE_ENVIRONMENT")
suite.TagID = RandomString("tag", 10)
suite.TagID = RandomString("tag", 5)
if envName == "" {
suite.Env = azure.PublicCloud
@ -140,19 +127,17 @@ func (suite *BaseSuite) TearDownSuite() {
}
// RandomHub creates a hub with a random'ish name
func (suite *BaseSuite) RandomHub(opts ...HubMgmtOption) (*mgmt.Model, func()) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
func (suite *BaseSuite) RandomHub(opts ...HubMgmtOption) (*mgmt.Model, func(), error) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout * 2)
defer cancel()
name := suite.RandomName("goehtest", 6)
model, err := suite.ensureEventHub(ctx, name, opts...)
if !suite.NoError(err) {
suite.FailNow("couldn't build a random hub")
}
return model, func() {
suite.DeleteEventHub(*model.Name)
if err := suite.DeleteEventHub(*model.Name); err != nil {
suite.T().Log(err)
}
}, err
}
// EnsureEventHub creates an Event Hub if it doesn't exist
@ -175,17 +160,41 @@ func (suite *BaseSuite) ensureEventHub(ctx context.Context, name string, opts ..
}
}
hub, err = client.CreateOrUpdate(ctx, ResourceGroupName, suite.Namespace, name, *newHub)
if err != nil {
return nil, err
var lastErr error
deadline, _ := ctx.Deadline()
for time.Now().Before(deadline) {
hub, err = suite.tryHubCreate(ctx, client, name, newHub)
if err == nil {
lastErr = nil
break
}
lastErr = err
}
if lastErr != nil {
return nil, lastErr
}
}
return &hub, nil
}
func (suite *BaseSuite) tryHubCreate(ctx context.Context, client *mgmt.EventHubsClient, name string, hub *mgmt.Model) (mgmt.Model, error) {
ctx, cancel := context.WithTimeout(ctx, 20*time.Second)
defer cancel()
//suite.T().Logf("trying to create hub named %q", name)
createdHub, err := client.CreateOrUpdate(ctx, ResourceGroupName, suite.Namespace, name, *hub)
if err != nil {
//suite.T().Logf("failed to create hub named %q", name)
return mgmt.Model{}, err
}
return createdHub, err
}
// DeleteEventHub deletes an Event Hub within the given Namespace
func (suite *BaseSuite) DeleteEventHub(name string) error {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
defer cancel()
client := suite.getEventHubMgmtClient()
_, err := client.Delete(ctx, ResourceGroupName, suite.Namespace, name)
@ -194,12 +203,26 @@ func (suite *BaseSuite) DeleteEventHub(name string) error {
func (suite *BaseSuite) deleteAllTaggedEventHubs(ctx context.Context) {
client := suite.getEventHubMgmtClient()
res, _ := client.ListByNamespace(ctx, ResourceGroupName, suite.Namespace)
res, err := client.ListByNamespace(ctx, ResourceGroupName, suite.Namespace, to.Int32Ptr(0), to.Int32Ptr(20))
if err != nil {
suite.T().Log("error listing namespaces")
suite.T().Error(err)
}
for res.NotDone() {
for _, val := range res.Values() {
if strings.Contains(suite.TagID, *val.Name) {
client.Delete(ctx, ResourceGroupName, suite.Namespace, *val.Name)
if strings.Contains(*val.Name, suite.TagID) {
for i := 0; i < 5; i++ {
if _, err := client.Delete(ctx, ResourceGroupName, suite.Namespace, *val.Name); err != nil {
suite.T().Logf("error deleting %q", *val.Name)
suite.T().Error(err)
time.Sleep(3 * time.Second)
} else {
break
}
}
} else {
suite.T().Logf("%q does not contain %q", *val.Name, suite.TagID)
}
}
res.Next()
@ -220,6 +243,10 @@ func (suite *BaseSuite) ensureProvisioned(tier mgmt.SkuTier) error {
func ensureResourceGroup(ctx context.Context, subscriptionID, name, location string, env azure.Environment) (*rm.Group, error) {
groupClient := getRmGroupClientWithToken(subscriptionID, env)
group, err := groupClient.Get(ctx, name)
if group.Response.Response == nil {
// tcp dial error or something else where the response was not populated
return nil, err
}
if group.StatusCode == http.StatusNotFound {
group, err = groupClient.CreateOrUpdate(ctx, name, rm.Group{Location: common.PtrString(location)})

Просмотреть файл

@ -38,6 +38,7 @@ import (
"github.com/Azure/azure-event-hubs-go/eph"
"github.com/Azure/azure-event-hubs-go/internal/test"
"github.com/Azure/azure-storage-blob-go/2016-05-31/azblob"
"github.com/stretchr/testify/require"
)
const (
@ -48,7 +49,10 @@ func (ts *testSuite) TestSingle() {
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
defer cancel()
hub, delHub := ts.RandomHub()
hub, delHub, err := ts.RandomHub()
if !ts.NoError(err) {
ts.FailNow("could not build a hub")
}
delContainer := ts.newTestContainerByName(*hub.Name)
defer delContainer()
@ -85,7 +89,8 @@ func (ts *testSuite) TestMultiple() {
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
defer cancel()
hub, delHub := ts.RandomHub()
hub, delHub, err := ts.RandomHub()
require.NoError(ts.T(), err)
defer delHub()
delContainer := ts.newTestContainerByName(*hub.Name)
defer delContainer()

Просмотреть файл

@ -30,6 +30,7 @@ import (
"github.com/Azure/azure-event-hubs-go/eph"
"github.com/Azure/azure-event-hubs-go/internal/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func (ts *testSuite) TestLeaserStoreCreation() {
@ -170,7 +171,8 @@ func (ts *testSuite) leaserWithEPHAndLeases() (*LeaserCheckpointer, func()) {
func (ts *testSuite) leaserWithEPH() (*LeaserCheckpointer, func()) {
leaser, del := ts.newLeaser()
hub, delHub := ts.RandomHub()
hub, delHub, err := ts.RandomHub()
require.NoError(ts.T(), err)
delAll := func() {
delHub()
del()