diff --git a/.gitignore b/.gitignore index c805fda..8d18833 100644 --- a/.gitignore +++ b/.gitignore @@ -17,3 +17,13 @@ vendor .idea .DS_Store + +.env + +# Test Infrastructure +terraform.tfvars +*.auto.tfvars +*.tfstate +*.tfstate.backup +.terraform/ +.terraform.tfstate.lock.info \ No newline at end of file diff --git a/.travis.yml b/.travis.yml index f209193..e130578 100644 --- a/.travis.yml +++ b/.travis.yml @@ -27,6 +27,7 @@ jobs: - export GO111MODULE=on - make test-cover - goveralls -coverprofile=cover.out -service=travis-ci + - make destroy script: - export GO111MODULE=on - make diff --git a/Makefile b/Makefile index 952b2e1..771b305 100644 --- a/Makefile +++ b/Makefile @@ -15,7 +15,7 @@ GOSTATICCHECK = $(BIN)/staticcheck V = 0 Q = $(if $(filter 1,$V),,@) M = $(shell printf "\033[34;1m▶\033[0m") -TIMEOUT = 360 +TIMEOUT = 720 .PHONY: all all: fmt lint vet tidy build @@ -35,8 +35,8 @@ test-race: ARGS=-race ## Run tests with race detector test-cover: ARGS=-cover -coverprofile=cover.out -v ## Run tests in verbose mode with coverage $(TEST_TARGETS): NAME=$(MAKECMDGOALS:test-%=%) $(TEST_TARGETS): test -check test tests: cyclo lint vet ; $(info $(M) running $(NAME:%=% )tests…) @ ## Run tests - $Q $(GO) test -timeout $(TIMEOUT)s $(ARGS) ./... +check test tests: cyclo lint vet terraform.tfstate; $(info $(M) running $(NAME:%=% )tests…) @ ## Run tests + $(GO) test -timeout $(TIMEOUT)s $(ARGS) ./... .PHONY: vet vet: ; $(info $(M) running vet…) @ ## Run vet @@ -64,6 +64,17 @@ fmt: ; $(info $(M) running gofmt…) @ ## Run gofmt on all source files cyclo: ; $(info $(M) running gocyclo...) @ ## Run gocyclo on all source files $Q $(GOCYCLO) -over 19 $$($(GO_FILES)) +terraform.tfstate: azuredeploy.tf $(wildcard terraform.tfvars) .terraform ; $(info $(M) running terraform...) @ ## Run terraform to provision infrastructure needed for testing + $Q TF_VAR_azure_client_secret="$${AZURE_CLIENT_SECRET}" terraform apply -auto-approve + $Q terraform output > .env + +.terraform: + $Q terraform init + +.Phony: destroy +destroy-sb: ; $(info $(M) running terraform destroy of the eventhub namespace...) + $(Q) terraform destroy --auto-approve + # Misc .PHONY: clean diff --git a/_examples/helloworld/producer/main.go b/_examples/helloworld/producer/main.go index cc0c1da..3de2edc 100644 --- a/_examples/helloworld/producer/main.go +++ b/_examples/helloworld/producer/main.go @@ -8,8 +8,8 @@ import ( "os" "time" - "github.com/Azure/azure-event-hubs-go" "github.com/Azure/azure-amqp-common-go/aad" + "github.com/Azure/azure-event-hubs-go" mgmt "github.com/Azure/azure-sdk-for-go/services/eventhub/mgmt/2017-04-01/eventhub" "github.com/Azure/go-autorest/autorest/azure" azauth "github.com/Azure/go-autorest/autorest/azure/auth" @@ -29,7 +29,10 @@ func main() { fmt.Print("Enter text: ") text, _ := reader.ReadString('\n') ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - hub.Send(ctx, eventhub.NewEventFromString(text)) + err := hub.Send(ctx, eventhub.NewEventFromString(text)) + if err != nil { + log.Fatal(err) + } if text == "exit\n" { break } @@ -95,4 +98,3 @@ func getEventHubMgmtClient() *mgmt.EventHubsClient { client.Authorizer = a return &client } - diff --git a/azuredeploy.tf b/azuredeploy.tf new file mode 100644 index 0000000..af7ecff --- /dev/null +++ b/azuredeploy.tf @@ -0,0 +1,163 @@ +variable "location" { + # eastus support AAD authentication, which at the time of writing this is in preview. + # see: https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-role-based-access-control + description = "Azure datacenter to deploy to." + default = "eastus" +} + +variable "eventhub_name_prefix" { + description = "Input your unique Azure Service Bus Namespace name" + default = "azureehtests" +} + +variable "resource_group_name" { + description = "Resource group to provision test infrastructure in." + default = "eventhub-go-tests" +} + +variable "azure_client_secret" { + description = "(Optional) piped in from env var so .env will be updated if there is an existing client secret" + default = "foo" +} + +# Data resources used to get SubID and Tennant Info +data "azurerm_client_config" "current" {} + +resource "random_string" "name" { + keepers = { + # Generate a new id each time we switch to a new resource group + group_name = "${var.resource_group_name}" + } + + length = 8 + upper = false + special = false + number = false +} + +# Create resource group for all of the things +resource "azurerm_resource_group" "test" { + name = "${var.resource_group_name}" + location = "${var.location}" +} + +# Create an Event Hub namespace for testing +resource "azurerm_eventhub_namespace" "test" { + name = "${var.eventhub_name_prefix}-${random_string.name.result}" + location = "${azurerm_resource_group.test.location}" + resource_group_name = "${azurerm_resource_group.test.name}" + sku = "standard" +} + +resource "azurerm_storage_account" "test" { + name = "${var.eventhub_name_prefix}${random_string.name.result}" + resource_group_name = "${var.resource_group_name}" + location = "${azurerm_resource_group.test.location}" + account_replication_type = "LRS" + account_tier = "Standard" +} + +# Generate a random secret fo the service principal +resource "random_string" "secret" { + count = "${data.azurerm_client_config.current.service_principal_application_id == "" ? 1 : 0}" + length = 32 + upper = true + special = true + number = true +} + +// Application for AAD authentication +resource "azurerm_azuread_application" "test" { + count = "${data.azurerm_client_config.current.service_principal_application_id == "" ? 1 : 0}" + name = "eventhubstest" + homepage = "https://eventhubstest" + identifier_uris = ["https://eventhubstest"] + reply_urls = ["https://eventhubstest"] + available_to_other_tenants = false + oauth2_allow_implicit_flow = true +} + +# Create a service principal, which represents a linkage between the AAD application and the password +resource "azurerm_azuread_service_principal" "test" { + count = "${data.azurerm_client_config.current.service_principal_application_id == "" ? 1 : 0}" + application_id = "${azurerm_azuread_application.test.application_id}" +} + +# Create a new service principal password which will be the AZURE_CLIENT_SECRET env var +resource "azurerm_azuread_service_principal_password" "test" { + count = "${data.azurerm_client_config.current.service_principal_application_id == "" ? 1 : 0}" + service_principal_id = "${azurerm_azuread_service_principal.test.id}" + value = "${random_string.secret.result}" + end_date = "2030-01-01T01:02:03Z" +} + +# This provides the new AAD application the rights to managed, send and receive from the Event Hubs instance +resource "azurerm_role_assignment" "created_service_principal_eh" { + count = "${data.azurerm_client_config.current.service_principal_application_id == "" ? 1 : 0}" + scope = "subscriptions/${data.azurerm_client_config.current.subscription_id}/resourceGroups/${azurerm_resource_group.test.name}/providers/Microsoft.EventHub/namespaces/${azurerm_eventhub_namespace.test.name}" + role_definition_name = "Owner" + principal_id = "${azurerm_azuread_service_principal.test.id}" +} + +# This provides the existing AAD application the rights to managed, send and receive from the Event Hubs instance +resource "azurerm_role_assignment" "existing_service_principal_eh" { + count = "${data.azurerm_client_config.current.service_principal_object_id != "" ? 1 : 0}" + scope = "subscriptions/${data.azurerm_client_config.current.subscription_id}/resourceGroups/${azurerm_resource_group.test.name}/providers/Microsoft.EventHub/namespaces/${azurerm_eventhub_namespace.test.name}" + role_definition_name = "Owner" + principal_id = "${data.azurerm_client_config.current.service_principal_object_id}" +} + +# This provides the new AAD application the rights to managed the resource group +resource "azurerm_role_assignment" "created_service_principal_rg" { + count = "${data.azurerm_client_config.current.service_principal_application_id == "" ? 1 : 0}" + scope = "subscriptions/${data.azurerm_client_config.current.subscription_id}/resourceGroups/${azurerm_resource_group.test.name}" + role_definition_name = "Owner" + principal_id = "${azurerm_azuread_service_principal.test.id}" +} + +# This provides the existing AAD application the rights to managed the resource group +resource "azurerm_role_assignment" "existing_service_principal_rg" { + count = "${data.azurerm_client_config.current.service_principal_object_id != "" ? 1 : 0}" + scope = "subscriptions/${data.azurerm_client_config.current.subscription_id}/resourceGroups/${azurerm_resource_group.test.name}" + role_definition_name = "Owner" + principal_id = "${data.azurerm_client_config.current.service_principal_object_id}" +} + + +output "TEST_EVENTHUB_RESOURCE_GROUP" { + value = "${var.resource_group_name}" +} + +output "EVENTHUB_CONNECTION_STRING" { + value = "Endpoint=sb://${azurerm_eventhub_namespace.test.name}.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=${azurerm_eventhub_namespace.test.default_primary_key}" + sensitive = true +} + +output "EVENTHUB_NAMESPACE" { + value = "${azurerm_eventhub_namespace.test.name}" +} + +output "AZURE_SUBSCRIPTION_ID" { + value = "${data.azurerm_client_config.current.subscription_id}" +} + +output "TEST_EVENTHUB_LOCATION" { + value = "${var.location}" +} + +output "AZURE_TENANT_ID" { + value = "${data.azurerm_client_config.current.tenant_id}" +} + +output "AZURE_CLIENT_ID" { + value = "${azurerm_azuread_application.test.0.application_id == "" ? data.azurerm_client_config.current.client_id : azurerm_azuread_application.test.0.application_id}" +} + +output "AZURE_CLIENT_SECRET" { + value = "${azurerm_azuread_service_principal_password.test.0.value == "" ? var.azure_client_secret : azurerm_azuread_service_principal_password.test.0.value}" + sensitive = true +} + +output "STORAGE_ACCOUNT_NAME" { + value = "${azurerm_storage_account.test.name}" +} diff --git a/eph/eph.go b/eph/eph.go index fc4ef28..665d7ab 100644 --- a/eph/eph.go +++ b/eph/eph.go @@ -275,7 +275,9 @@ func (h *EventProcessorHost) UnregisterHandler(ctx context.Context, id HandlerID delete(h.handlers, string(id)) if len(h.handlers) == 0 { - h.Close(ctx) + if err := h.Close(ctx); err != nil { + log.For(ctx).Error(err) + } } } diff --git a/go.mod b/go.mod index 9c75198..bba13c0 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect github.com/dimchansky/utfbom v1.0.0 // indirect github.com/fortytw2/leaktest v1.2.0 // indirect + github.com/joho/godotenv v1.3.0 github.com/jpillora/backoff v0.0.0-20180909062703-3050d21c67d7 github.com/kr/pretty v0.1.0 // indirect github.com/mitchellh/go-homedir v1.0.0 // indirect @@ -21,5 +22,5 @@ require ( golang.org/x/sys v0.0.0-20181011152604-fa43e7bc11ba // indirect google.golang.org/api v0.0.0-20181018171847-1ee037c97071 // indirect gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect - pack.ag/amqp v0.7.4 + pack.ag/amqp v0.10.2 ) diff --git a/go.sum b/go.sum index c75779f..a3c56c0 100644 --- a/go.sum +++ b/go.sum @@ -25,6 +25,8 @@ github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y github.com/google/go-cmp v0.2.0 h1:+dTQ8DZQJz0Mb/HjFlkptS1FeQ4cWSnN941F8aEG4SQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/grpc-ecosystem/grpc-gateway v1.5.0/go.mod h1:RSKVYQBd5MCa4OVpNdGskqpgL2+G+NZTnrVHpWWfpdw= +github.com/joho/godotenv v1.3.0 h1:Zjp+RcGpHhGlrMbJzXTrZZPrWj+1vfm90La1wgB6Bhc= +github.com/joho/godotenv v1.3.0/go.mod h1:7hK45KPybAkOC6peb+G5yklZfMxEjkZhHbwpqxOKXbg= github.com/jpillora/backoff v0.0.0-20180909062703-3050d21c67d7 h1:K//n/AqR5HjG3qxbrBCL4vJPW0MVFSs9CPK1OOJdRME= github.com/jpillora/backoff v0.0.0-20180909062703-3050d21c67d7/go.mod h1:2iMrUgbbvHEiQClaW2NsSzMyGHqN+rDFqY705q49KG0= github.com/konsorten/go-windows-terminal-sequences v0.0.0-20180402223658-b729f2633dfe h1:CHRGQ8V7OlCYtwaKPJi3iA7J+YdNKdo8j7nG5IgDhjs= @@ -76,5 +78,5 @@ gopkg.in/check.v1 v1.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -pack.ag/amqp v0.7.4 h1:SKaYf2EzcgN2bN9GTMCNu+xn9aOxMPYNgeyqYvAEtnM= -pack.ag/amqp v0.7.4/go.mod h1:4/cbmt4EJXSKlG6LCfWHoqmN0uFdy5i/+YFz+fTfhV4= +pack.ag/amqp v0.10.2 h1:tOg29Eqx2kmgcDJa7OAjH9N3jqGA1gHf5iIAnBMsa5U= +pack.ag/amqp v0.10.2/go.mod h1:4/cbmt4EJXSKlG6LCfWHoqmN0uFdy5i/+YFz+fTfhV4= diff --git a/hub_test.go b/hub_test.go index f30e819..278d794 100644 --- a/hub_test.go +++ b/hub_test.go @@ -283,11 +283,11 @@ func testBasicSend(ctx context.Context, t *testing.T, client *Hub, _ string) { } func testSendTooBig(ctx context.Context, t *testing.T, client *Hub, _ string) { - data := make([]byte, 256*1024) + data := make([]byte, 2600*1024) _, _ = rand.Read(data) event := NewEvent(data) err := client.Send(ctx, event) - assert.Error(t, err) + assert.Error(t, err, "encoded message size exceeds max of 1048576") } func testBatchSendAndReceive(ctx context.Context, t *testing.T, client *Hub, partitionID string) { @@ -448,9 +448,7 @@ func testMultiSendAndReceive(ctx context.Context, t *testing.T, client *Hub, par } for idx, message := range messages { - if !assert.NoError(t, client.Send(ctx, NewEventFromString(message), SendWithMessageID(fmt.Sprintf("%d", idx)))) { - assert.FailNow(t, "unable to send message") - } + require.NoError(t, client.Send(ctx, NewEventFromString(message), SendWithMessageID(fmt.Sprintf("%d", idx)))) } for _, partitionID := range partitionIDs { @@ -562,14 +560,17 @@ func testHubPartitionRuntimeInformation(ctx context.Context, t *testing.T, clien } func TestEnvironmentalCreation(t *testing.T) { - os.Setenv("EVENTHUB_NAME", "foo") + require.NoError(t, os.Setenv("EVENTHUB_NAME", "foo")) _, err := NewHubFromEnvironment() assert.Nil(t, err) - os.Unsetenv("EVENTHUB_NAME") + require.NoError(t, os.Unsetenv("EVENTHUB_NAME")) } func (suite *eventHubSuite) newClient(t *testing.T, hubName string, opts ...HubOption) (*Hub, func()) { - provider, err := aad.NewJWTProvider(aad.JWTProviderWithEnvironmentVars(), aad.JWTProviderWithAzureEnvironment(&suite.Env)) + provider, err := aad.NewJWTProvider( + aad.JWTProviderWithEnvironmentVars(), + aad.JWTProviderWithAzureEnvironment(&suite.Env), + ) if !suite.NoError(err) { suite.FailNow("unable to make a new JWT provider") } diff --git a/internal/test/suite.go b/internal/test/suite.go index f421c6e..d5fc42b 100644 --- a/internal/test/suite.go +++ b/internal/test/suite.go @@ -39,6 +39,7 @@ import ( "github.com/Azure/go-autorest/autorest/azure" azauth "github.com/Azure/go-autorest/autorest/azure/auth" "github.com/Azure/go-autorest/autorest/to" + "github.com/joho/godotenv" log "github.com/sirupsen/logrus" "github.com/stretchr/testify/suite" "go.opencensus.io/exporter/jaeger" @@ -54,23 +55,17 @@ const ( defaultTimeout = 1 * time.Minute ) -const ( - // Location is the Azure geographic location the test suite will use for provisioning - Location = "eastus" - - // ResourceGroupName is the name of the resource group the test suite will use for provisioning - ResourceGroupName = "ehtest" -) - type ( // BaseSuite encapsulates a end to end test of Event Hubs with build up and tear down of all EH resources BaseSuite struct { suite.Suite - SubscriptionID string - Namespace string - Env azure.Environment - TagID string - closer io.Closer + SubscriptionID string + Namespace string + ResourceGroupName string + Location string + Env azure.Environment + TagID string + closer io.Closer } // HubMgmtOption represents an option for configuring an Event Hub. @@ -81,6 +76,7 @@ type ( func init() { rand.Seed(time.Now().Unix()) + loadEnv() } // SetupSuite constructs the test suite from the environment and @@ -90,8 +86,10 @@ func (suite *BaseSuite) SetupSuite() { log.SetLevel(log.DebugLevel) } - suite.SubscriptionID = mustGetEnv("AZURE_SUBSCRIPTION_ID") - suite.Namespace = mustGetEnv("EVENTHUB_NAMESPACE") + suite.SubscriptionID = MustGetEnv("AZURE_SUBSCRIPTION_ID") + suite.Namespace = MustGetEnv("EVENTHUB_NAMESPACE") + suite.ResourceGroupName = MustGetEnv("TEST_EVENTHUB_RESOURCE_GROUP") + suite.Location = MustGetEnv("TEST_EVENTHUB_LOCATION") envName := os.Getenv("AZURE_ENVIRONMENT") suite.TagID = RandomString("tag", 5) @@ -122,7 +120,7 @@ func (suite *BaseSuite) TearDownSuite() { defer cancel() suite.deleteAllTaggedEventHubs(ctx) if suite.closer != nil { - suite.closer.Close() + suite.NoError(suite.closer.Close()) } } @@ -139,10 +137,7 @@ func (suite *BaseSuite) RandomHub(opts ...HubMgmtOption) (*mgmt.Model, func()) { suite.Require().Len(*model.PartitionIds, 4) return model, func() { if model != nil { - err := suite.DeleteEventHub(*model.Name) - if err != nil { - suite.T().Log(err) - } + suite.DeleteEventHub(*model.Name) } } } @@ -150,7 +145,7 @@ func (suite *BaseSuite) RandomHub(opts ...HubMgmtOption) (*mgmt.Model, func()) { // EnsureEventHub creates an Event Hub if it doesn't exist func (suite *BaseSuite) ensureEventHub(ctx context.Context, name string, opts ...HubMgmtOption) (*mgmt.Model, error) { client := suite.getEventHubMgmtClient() - hub, err := client.Get(ctx, ResourceGroupName, suite.Namespace, name) + hub, err := client.Get(ctx, suite.ResourceGroupName, suite.Namespace, name) if err != nil { newHub := &mgmt.Model{ @@ -189,26 +184,26 @@ func (suite *BaseSuite) tryHubCreate(ctx context.Context, client *mgmt.EventHubs ctx, cancel := context.WithTimeout(ctx, 20*time.Second) defer cancel() - _, err := client.CreateOrUpdate(ctx, ResourceGroupName, suite.Namespace, name, *hub) + _, err := client.CreateOrUpdate(ctx, suite.ResourceGroupName, suite.Namespace, name, *hub) if err != nil { return mgmt.Model{}, err } - return client.Get(ctx, ResourceGroupName, suite.Namespace, name) + return client.Get(ctx, suite.ResourceGroupName, suite.Namespace, name) } // DeleteEventHub deletes an Event Hub within the given Namespace -func (suite *BaseSuite) DeleteEventHub(name string) error { +func (suite *BaseSuite) DeleteEventHub(name string) { ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout) defer cancel() client := suite.getEventHubMgmtClient() - _, err := client.Delete(ctx, ResourceGroupName, suite.Namespace, name) - return err + _, err := client.Delete(ctx, suite.ResourceGroupName, suite.Namespace, name) + suite.NoError(err) } func (suite *BaseSuite) deleteAllTaggedEventHubs(ctx context.Context) { client := suite.getEventHubMgmtClient() - res, err := client.ListByNamespace(ctx, ResourceGroupName, suite.Namespace, to.Int32Ptr(0), to.Int32Ptr(20)) + res, err := client.ListByNamespace(ctx, suite.ResourceGroupName, suite.Namespace, to.Int32Ptr(0), to.Int32Ptr(20)) if err != nil { suite.T().Log("error listing namespaces") suite.T().Error(err) @@ -218,7 +213,7 @@ func (suite *BaseSuite) deleteAllTaggedEventHubs(ctx context.Context) { for _, val := range res.Values() { if strings.Contains(*val.Name, suite.TagID) { for i := 0; i < 5; i++ { - if _, err := client.Delete(ctx, ResourceGroupName, suite.Namespace, *val.Name); err != nil { + if _, err := client.Delete(ctx, suite.ResourceGroupName, suite.Namespace, *val.Name); err != nil { suite.T().Logf("error deleting %q", *val.Name) suite.T().Error(err) time.Sleep(3 * time.Second) @@ -230,12 +225,12 @@ func (suite *BaseSuite) deleteAllTaggedEventHubs(ctx context.Context) { suite.T().Logf("%q does not contain %q", *val.Name, suite.TagID) } } - res.Next() + suite.NoError(res.Next()) } } func (suite *BaseSuite) ensureProvisioned(tier mgmt.SkuTier) error { - _, err := ensureResourceGroup(context.Background(), suite.SubscriptionID, ResourceGroupName, Location, suite.Env) + _, err := ensureResourceGroup(context.Background(), suite.SubscriptionID, suite.ResourceGroupName, suite.Location, suite.Env) if err != nil { return err } @@ -332,7 +327,7 @@ func (suite *BaseSuite) getEventHubMgmtClient() *mgmt.EventHubsClient { } func (suite *BaseSuite) ensureNamespace() (*mgmt.EHNamespace, error) { - ns, err := ensureNamespace(context.Background(), suite.SubscriptionID, ResourceGroupName, suite.Namespace, Location, suite.Env) + ns, err := ensureNamespace(context.Background(), suite.SubscriptionID, suite.ResourceGroupName, suite.Namespace, suite.Location, suite.Env) if err != nil { return nil, err } @@ -365,7 +360,9 @@ func (suite *BaseSuite) setupTracing() error { } exporter, err := jaeger.NewExporter(jaeger.Options{ AgentEndpoint: "localhost:6831", - ServiceName: "eh-trace", + Process: jaeger.Process{ + ServiceName: "eh-tests", + }, }) if err != nil { return err @@ -374,7 +371,8 @@ func (suite *BaseSuite) setupTracing() error { return nil } -func mustGetEnv(key string) string { +// MustGetEnv will panic or return the env var for a given string key +func MustGetEnv(key string) string { v := os.Getenv(key) if v == "" { panic("Env variable '" + key + "' required for integration tests.") @@ -395,3 +393,36 @@ func RandomString(prefix string, length int) string { } return prefix + string(b) } + +func loadEnv() { + lookForMe := []string{".env", "../.env", "../../.env"} + var reader io.ReadCloser + for _, env := range lookForMe { + r, err := os.Open(env) + if err == nil { + reader = r + break + } + } + + if reader == nil { + log.Fatalf("no .env files were found in %v", lookForMe) + } + + defer func() { + if err := reader.Close(); err != nil { + log.Fatal(err) + } + }() + + envMap, err := godotenv.Parse(reader) + if err != nil { + log.Fatal(err) + } + + for key, val := range envMap { + if err := os.Setenv(key, val); err != nil { + log.Fatal(err) + } + } +} diff --git a/sender.go b/sender.go index f8091f6..4e5fe3a 100644 --- a/sender.go +++ b/sender.go @@ -25,6 +25,7 @@ package eventhub import ( "context" "fmt" + "net" "time" "github.com/Azure/azure-amqp-common-go/log" @@ -144,7 +145,13 @@ func (s *sender) trySend(ctx context.Context, evt eventer) error { return err } switch err.(type) { - case *amqp.Error, *amqp.DetachError: + case *amqp.Error, *amqp.DetachError, net.Error: + if netErr, ok := err.(net.Error); ok { + if !netErr.Temporary(){ + return netErr + } + } + duration := s.recoveryBackoff.Duration() log.For(ctx).Debug("amqp error, delaying " + string(duration/time.Millisecond) + " millis: " + err.Error()) time.Sleep(duration) @@ -202,6 +209,7 @@ func (s *sender) newSessionAndLink(ctx context.Context) error { } amqpSender, err := amqpSession.NewSender( + amqp.LinkReceiverSettle(amqp.ModeSecond), amqp.LinkTargetAddress(s.getAddress()), ) if err != nil { diff --git a/storage/credential_test.go b/storage/credential_test.go index 33f2a3a..a58d6de 100644 --- a/storage/credential_test.go +++ b/storage/credential_test.go @@ -24,18 +24,13 @@ package storage import ( "context" - "errors" "log" "net/url" "strings" "testing" - "github.com/Azure/azure-amqp-common-go" "github.com/Azure/azure-event-hubs-go/internal/test" - "github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2017-10-01/storage" "github.com/Azure/azure-storage-blob-go/azblob" - "github.com/Azure/go-autorest/autorest/azure" - azauth "github.com/Azure/go-autorest/autorest/azure/auth" "github.com/stretchr/testify/suite" ) @@ -54,23 +49,18 @@ func TestStorage(t *testing.T) { func (ts *testSuite) SetupSuite() { ts.BaseSuite.SetupSuite() - ts.AccountName = strings.ToLower(test.RandomString("ehtest", 6)) - ts.Require().NoError(ts.ensureStorageAccount()) + ts.AccountName = test.MustGetEnv("STORAGE_ACCOUNT_NAME") } func (ts *testSuite) TearDownSuite() { ts.BaseSuite.TearDownSuite() - err := ts.deleteStorageAccount() - if err != nil { - ts.T().Error(err) - } } func (ts *testSuite) TestCredential() { containerName := "foo" blobName := "bar" message := "Hello World!!" - tokenProvider, err := NewAADSASCredential(ts.SubscriptionID, test.ResourceGroupName, ts.AccountName, containerName, AADSASCredentialWithEnvironmentVars()) + tokenProvider, err := NewAADSASCredential(ts.SubscriptionID, ts.ResourceGroupName, ts.AccountName, containerName, AADSASCredentialWithEnvironmentVars()) if err != nil { ts.T().Fatal(err) } @@ -84,7 +74,11 @@ func (ts *testSuite) TestCredential() { } containerURL := azblob.NewContainerURL(*fooURL, pipeline) - defer containerURL.Delete(ctx, azblob.ContainerAccessConditions{}) + defer func(){ + if res, err := containerURL.Delete(ctx, azblob.ContainerAccessConditions{}); err != nil { + log.Fatal(err, res) + } + }() _, err = containerURL.Create(ctx, azblob.Metadata{}, azblob.PublicAccessNone) if err != nil { ts.T().Error(err) @@ -95,67 +89,4 @@ func (ts *testSuite) TestCredential() { if err != nil { ts.T().Error(err) } -} - -func (ts *testSuite) deleteStorageAccount() error { - ctx, cancel := context.WithTimeout(context.Background(), shortTimeout) - defer cancel() - - client := getStorageAccountMgmtClient(ts.SubscriptionID, ts.Env) - _, err := client.Delete(ctx, test.ResourceGroupName, ts.AccountName) - return err -} - -func (ts *testSuite) ensureStorageAccount() error { - ctx, cancel := context.WithTimeout(context.Background(), shortTimeout) - defer cancel() - - client := getStorageAccountMgmtClient(ts.SubscriptionID, ts.Env) - accounts, err := client.ListByResourceGroup(ctx, test.ResourceGroupName) - if err != nil { - return err - } - - if accounts.Response.Response == nil { - return errors.New("response is nil and error is not nil") - } - - if accounts.Response.Response != nil && accounts.StatusCode == 404 { - return errors.New("resource group does not exist") - } - - for _, account := range *accounts.Value { - if ts.AccountName == *account.Name { - // provisioned, so return - return nil - } - } - - res, err := client.Create(ctx, test.ResourceGroupName, ts.AccountName, storage.AccountCreateParameters{ - Sku: &storage.Sku{ - Name: storage.StandardLRS, - Tier: storage.Standard, - }, - Kind: storage.BlobStorage, - Location: common.PtrString(test.Location), - AccountPropertiesCreateParameters: &storage.AccountPropertiesCreateParameters{ - AccessTier: storage.Hot, - }, - }) - - if err != nil { - return err - } - - return res.WaitForCompletionRef(ctx, client.Client) -} - -func getStorageAccountMgmtClient(subscriptionID string, env azure.Environment) *storage.AccountsClient { - client := storage.NewAccountsClientWithBaseURI(env.ResourceManagerEndpoint, subscriptionID) - a, err := azauth.NewAuthorizerFromEnvironment() - if err != nil { - log.Fatal(err) - } - client.Authorizer = a - return &client -} +} \ No newline at end of file diff --git a/storage/eph_test.go b/storage/eph_test.go index 2af3c9d..ad5c706 100644 --- a/storage/eph_test.go +++ b/storage/eph_test.go @@ -35,7 +35,6 @@ import ( "github.com/Azure/azure-amqp-common-go/auth" "github.com/Azure/azure-event-hubs-go" "github.com/Azure/azure-event-hubs-go/eph" - "github.com/Azure/azure-event-hubs-go/internal/test" "github.com/Azure/azure-storage-blob-go/azblob" ) @@ -55,7 +54,9 @@ func (ts *testSuite) TestSingle() { ts.Require().NoError(err) defer func() { closeContext, cancel := context.WithTimeout(context.Background(), 10*time.Second) - processor.Close(closeContext) + if err := processor.Close(closeContext); err != nil { + ts.Error(err) + } cancel() delHub() }() @@ -66,12 +67,13 @@ func (ts *testSuite) TestSingle() { var wg sync.WaitGroup wg.Add(len(messages)) - processor.RegisterHandler(ctx, func(c context.Context, event *eventhub.Event) error { + _, err = processor.RegisterHandler(ctx, func(c context.Context, event *eventhub.Event) error { wg.Done() return nil }) + ts.Require().NoError(err) - processor.StartNonBlocking(ctx) + ts.Require().NoError(processor.StartNonBlocking(ctx)) end, _ := ctx.Deadline() waitUntil(ts.T(), &wg, time.Until(end)) } @@ -85,7 +87,7 @@ func (ts *testSuite) TestMultiple() { delContainer := ts.newTestContainerByName(*hub.Name) defer delContainer() - cred, err := NewAADSASCredential(ts.SubscriptionID, test.ResourceGroupName, ts.AccountName, *hub.Name, AADSASCredentialWithEnvironmentVars()) + cred, err := NewAADSASCredential(ts.SubscriptionID, ts.ResourceGroupName, ts.AccountName, *hub.Name, AADSASCredentialWithEnvironmentVars()) ts.Require().NoError(err) numPartitions := len(*hub.PartitionIds) processors := make(map[string]*eph.EventProcessorHost, numPartitions) @@ -105,7 +107,9 @@ func (ts *testSuite) TestMultiple() { defer func() { for _, processor := range processors { closeContext, cancel := context.WithTimeout(context.Background(), 10*time.Second) - processor.Close(closeContext) + if err := processor.Close(closeContext); err != nil { + ts.Error(err) + } cancel() } delHub() @@ -140,7 +144,7 @@ func (ts *testSuite) TestMultiple() { } closeContext, cancel := context.WithTimeout(context.Background(), 10*time.Second) - processors[processorNames[numPartitions-1]].Close(closeContext) // close the last partition + ts.Require().NoError(processors[processorNames[numPartitions-1]].Close(closeContext)) // close the last partition delete(processors, processorNames[numPartitions-1]) cancel() @@ -175,7 +179,7 @@ func (ts *testSuite) newTestContainerByName(containerName string) func() { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - cred, err := NewAADSASCredential(ts.SubscriptionID, test.ResourceGroupName, ts.AccountName, containerName, AADSASCredentialWithEnvironmentVars()) + cred, err := NewAADSASCredential(ts.SubscriptionID, ts.ResourceGroupName, ts.AccountName, containerName, AADSASCredentialWithEnvironmentVars()) ts.Require().NoError(err) pipeline := azblob.NewPipeline(cred, azblob.PipelineOptions{}) @@ -189,7 +193,9 @@ func (ts *testSuite) newTestContainerByName(containerName string) func() { return func() { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - containerURL.Delete(ctx, azblob.ContainerAccessConditions{}) + if res, err := containerURL.Delete(ctx, azblob.ContainerAccessConditions{}); err != nil { + ts.NoError(err, res) + } } } @@ -197,7 +203,7 @@ func (ts *testSuite) sendMessages(hubName string, length int) ([]string, error) client := ts.newClient(ts.T(), hubName) defer func() { closeContext, cancel := context.WithTimeout(context.Background(), 30*time.Second) - client.Close(closeContext) + ts.NoError(client.Close(closeContext)) cancel() }() @@ -213,13 +219,13 @@ func (ts *testSuite) sendMessages(hubName string, length int) ([]string, error) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - client.SendBatch(ctx, eventhub.NewEventBatch(events)) + ts.NoError(client.SendBatch(ctx, eventhub.NewEventBatch(events))) return messages, ctx.Err() } func (ts *testSuite) newStorageBackedEPH(hubName, containerName string) (*eph.EventProcessorHost, error) { - cred, err := NewAADSASCredential(ts.SubscriptionID, test.ResourceGroupName, ts.AccountName, containerName, AADSASCredentialWithEnvironmentVars()) + cred, err := NewAADSASCredential(ts.SubscriptionID, ts.ResourceGroupName, ts.AccountName, containerName, AADSASCredentialWithEnvironmentVars()) ts.Require().NoError(err) leaserCheckpointer, err := NewStorageLeaserCheckpointer(cred, ts.AccountName, containerName, ts.Env) ts.Require().NoError(err) diff --git a/storage/storage.go b/storage/storage.go index 8eadbe0..1913d85 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -29,6 +29,7 @@ import ( "context" "encoding/json" "errors" + "io/ioutil" "net/url" "sync" "time" @@ -47,18 +48,20 @@ import ( type ( // LeaserCheckpointer implements the eph.LeaserCheckpointer interface for Azure Storage LeaserCheckpointer struct { - leases map[string]*storageLease - processor *eph.EventProcessorHost - leaseDuration time.Duration - credential Credential - containerURL *azblob.ContainerURL - serviceURL *azblob.ServiceURL - containerName string - accountName string - env azure.Environment - dirtyPartitions map[string]uuid.UUID - leasesMu sync.Mutex - done func() + // LeasePersistenceInterval is the default period of time which dirty leases will be persisted to Azure Storage + LeasePersistenceInterval time.Duration + leases map[string]*storageLease + processor *eph.EventProcessorHost + leaseDuration time.Duration + credential Credential + containerURL *azblob.ContainerURL + serviceURL *azblob.ServiceURL + containerName string + accountName string + env azure.Environment + dirtyPartitions map[string]uuid.UUID + leasesMu sync.Mutex + done func() } storageLease struct { @@ -85,6 +88,10 @@ type ( } ) +const ( + defaultLeasePersistenceInterval = 5 * time.Second +) + // NewStorageLeaserCheckpointer builds an Azure Storage Leaser Checkpointer which handles leasing and checkpointing for // the EventProcessorHost func NewStorageLeaserCheckpointer(credential Credential, accountName, containerName string, env azure.Environment) (*LeaserCheckpointer, error) { @@ -97,15 +104,16 @@ func NewStorageLeaserCheckpointer(credential Credential, accountName, containerN containerURL := svURL.NewContainerURL(containerName) return &LeaserCheckpointer{ - credential: credential, - containerName: containerName, - accountName: accountName, - leaseDuration: eph.DefaultLeaseDuration, - env: env, - serviceURL: &svURL, - containerURL: &containerURL, - leases: make(map[string]*storageLease), - dirtyPartitions: make(map[string]uuid.UUID), + credential: credential, + containerName: containerName, + accountName: accountName, + leaseDuration: eph.DefaultLeaseDuration, + env: env, + serviceURL: &svURL, + containerURL: &containerURL, + leases: make(map[string]*storageLease), + dirtyPartitions: make(map[string]uuid.UUID), + LeasePersistenceInterval: defaultLeasePersistenceInterval, }, nil } @@ -477,7 +485,7 @@ func (sl *LeaserCheckpointer) persistLeases(ctx context.Context) { if err != nil { log.For(ctx).Error(err) } - <-time.After(1 * time.Second) + <-time.After(sl.LeasePersistenceInterval) } } } @@ -596,10 +604,13 @@ func (sl *LeaserCheckpointer) getLease(ctx context.Context, partitionID string) } func (sl *LeaserCheckpointer) leaseFromResponse(res *azblob.DownloadResponse) (*storageLease, error) { - buf := new(bytes.Buffer) - buf.ReadFrom(res.Response().Body) + b, err := ioutil.ReadAll(res.Response().Body) + if err != nil { + return nil, err + } + var lease storageLease - if err := json.Unmarshal(buf.Bytes(), &lease); err != nil { + if err := json.Unmarshal(b, &lease); err != nil { return nil, err } lease.leaser = sl diff --git a/storage/storage_test.go b/storage/storage_test.go index 49ad5a9..0647b3c 100644 --- a/storage/storage_test.go +++ b/storage/storage_test.go @@ -29,7 +29,6 @@ import ( "github.com/Azure/azure-amqp-common-go/aad" "github.com/Azure/azure-event-hubs-go/eph" - "github.com/Azure/azure-event-hubs-go/internal/test" "github.com/Azure/azure-storage-blob-go/azblob" "github.com/stretchr/testify/assert" ) @@ -189,7 +188,7 @@ func (ts *testSuite) leaserWithEPH() (*LeaserCheckpointer, func()) { func (ts *testSuite) newLeaser() (*LeaserCheckpointer, func()) { containerName := strings.ToLower(ts.RandomName("stortest", 4)) - cred, err := NewAADSASCredential(ts.SubscriptionID, test.ResourceGroupName, ts.AccountName, containerName, AADSASCredentialWithEnvironmentVars()) + cred, err := NewAADSASCredential(ts.SubscriptionID, ts.ResourceGroupName, ts.AccountName, containerName, AADSASCredentialWithEnvironmentVars()) ts.Require().NoError(err) leaser, err := NewStorageLeaserCheckpointer(cred, ts.AccountName, containerName, ts.Env) ts.Require().NoError(err)