move test requirements into terraform

This commit is contained in:
David Justice 2018-12-11 16:06:12 -08:00
Родитель a7f5b65899
Коммит eb0f70969a
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 2B44C6BF9F416319
15 изменённых файлов: 347 добавлений и 168 удалений

10
.gitignore поставляемый
Просмотреть файл

@ -17,3 +17,13 @@ vendor
.idea
.DS_Store
.env
# Test Infrastructure
terraform.tfvars
*.auto.tfvars
*.tfstate
*.tfstate.backup
.terraform/
.terraform.tfstate.lock.info

Просмотреть файл

@ -27,6 +27,7 @@ jobs:
- export GO111MODULE=on
- make test-cover
- goveralls -coverprofile=cover.out -service=travis-ci
- make destroy
script:
- export GO111MODULE=on
- make

Просмотреть файл

@ -15,7 +15,7 @@ GOSTATICCHECK = $(BIN)/staticcheck
V = 0
Q = $(if $(filter 1,$V),,@)
M = $(shell printf "\033[34;1m▶\033[0m")
TIMEOUT = 360
TIMEOUT = 720
.PHONY: all
all: fmt lint vet tidy build
@ -35,8 +35,8 @@ test-race: ARGS=-race ## Run tests with race detector
test-cover: ARGS=-cover -coverprofile=cover.out -v ## Run tests in verbose mode with coverage
$(TEST_TARGETS): NAME=$(MAKECMDGOALS:test-%=%)
$(TEST_TARGETS): test
check test tests: cyclo lint vet ; $(info $(M) running $(NAME:%=% )tests) @ ## Run tests
$Q $(GO) test -timeout $(TIMEOUT)s $(ARGS) ./...
check test tests: cyclo lint vet terraform.tfstate; $(info $(M) running $(NAME:%=% )tests) @ ## Run tests
$(GO) test -timeout $(TIMEOUT)s $(ARGS) ./...
.PHONY: vet
vet: ; $(info $(M) running vet) @ ## Run vet
@ -64,6 +64,17 @@ fmt: ; $(info $(M) running gofmt…) @ ## Run gofmt on all source files
cyclo: ; $(info $(M) running gocyclo...) @ ## Run gocyclo on all source files
$Q $(GOCYCLO) -over 19 $$($(GO_FILES))
terraform.tfstate: azuredeploy.tf $(wildcard terraform.tfvars) .terraform ; $(info $(M) running terraform...) @ ## Run terraform to provision infrastructure needed for testing
$Q TF_VAR_azure_client_secret="$${AZURE_CLIENT_SECRET}" terraform apply -auto-approve
$Q terraform output > .env
.terraform:
$Q terraform init
.Phony: destroy
destroy-sb: ; $(info $(M) running terraform destroy of the eventhub namespace...)
$(Q) terraform destroy --auto-approve
# Misc
.PHONY: clean

Просмотреть файл

@ -8,8 +8,8 @@ import (
"os"
"time"
"github.com/Azure/azure-event-hubs-go"
"github.com/Azure/azure-amqp-common-go/aad"
"github.com/Azure/azure-event-hubs-go"
mgmt "github.com/Azure/azure-sdk-for-go/services/eventhub/mgmt/2017-04-01/eventhub"
"github.com/Azure/go-autorest/autorest/azure"
azauth "github.com/Azure/go-autorest/autorest/azure/auth"
@ -29,7 +29,10 @@ func main() {
fmt.Print("Enter text: ")
text, _ := reader.ReadString('\n')
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
hub.Send(ctx, eventhub.NewEventFromString(text))
err := hub.Send(ctx, eventhub.NewEventFromString(text))
if err != nil {
log.Fatal(err)
}
if text == "exit\n" {
break
}
@ -95,4 +98,3 @@ func getEventHubMgmtClient() *mgmt.EventHubsClient {
client.Authorizer = a
return &client
}

163
azuredeploy.tf Normal file
Просмотреть файл

@ -0,0 +1,163 @@
variable "location" {
# eastus support AAD authentication, which at the time of writing this is in preview.
# see: https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-role-based-access-control
description = "Azure datacenter to deploy to."
default = "eastus"
}
variable "eventhub_name_prefix" {
description = "Input your unique Azure Service Bus Namespace name"
default = "azureehtests"
}
variable "resource_group_name" {
description = "Resource group to provision test infrastructure in."
default = "eventhub-go-tests"
}
variable "azure_client_secret" {
description = "(Optional) piped in from env var so .env will be updated if there is an existing client secret"
default = "foo"
}
# Data resources used to get SubID and Tennant Info
data "azurerm_client_config" "current" {}
resource "random_string" "name" {
keepers = {
# Generate a new id each time we switch to a new resource group
group_name = "${var.resource_group_name}"
}
length = 8
upper = false
special = false
number = false
}
# Create resource group for all of the things
resource "azurerm_resource_group" "test" {
name = "${var.resource_group_name}"
location = "${var.location}"
}
# Create an Event Hub namespace for testing
resource "azurerm_eventhub_namespace" "test" {
name = "${var.eventhub_name_prefix}-${random_string.name.result}"
location = "${azurerm_resource_group.test.location}"
resource_group_name = "${azurerm_resource_group.test.name}"
sku = "standard"
}
resource "azurerm_storage_account" "test" {
name = "${var.eventhub_name_prefix}${random_string.name.result}"
resource_group_name = "${var.resource_group_name}"
location = "${azurerm_resource_group.test.location}"
account_replication_type = "LRS"
account_tier = "Standard"
}
# Generate a random secret fo the service principal
resource "random_string" "secret" {
count = "${data.azurerm_client_config.current.service_principal_application_id == "" ? 1 : 0}"
length = 32
upper = true
special = true
number = true
}
// Application for AAD authentication
resource "azurerm_azuread_application" "test" {
count = "${data.azurerm_client_config.current.service_principal_application_id == "" ? 1 : 0}"
name = "eventhubstest"
homepage = "https://eventhubstest"
identifier_uris = ["https://eventhubstest"]
reply_urls = ["https://eventhubstest"]
available_to_other_tenants = false
oauth2_allow_implicit_flow = true
}
# Create a service principal, which represents a linkage between the AAD application and the password
resource "azurerm_azuread_service_principal" "test" {
count = "${data.azurerm_client_config.current.service_principal_application_id == "" ? 1 : 0}"
application_id = "${azurerm_azuread_application.test.application_id}"
}
# Create a new service principal password which will be the AZURE_CLIENT_SECRET env var
resource "azurerm_azuread_service_principal_password" "test" {
count = "${data.azurerm_client_config.current.service_principal_application_id == "" ? 1 : 0}"
service_principal_id = "${azurerm_azuread_service_principal.test.id}"
value = "${random_string.secret.result}"
end_date = "2030-01-01T01:02:03Z"
}
# This provides the new AAD application the rights to managed, send and receive from the Event Hubs instance
resource "azurerm_role_assignment" "created_service_principal_eh" {
count = "${data.azurerm_client_config.current.service_principal_application_id == "" ? 1 : 0}"
scope = "subscriptions/${data.azurerm_client_config.current.subscription_id}/resourceGroups/${azurerm_resource_group.test.name}/providers/Microsoft.EventHub/namespaces/${azurerm_eventhub_namespace.test.name}"
role_definition_name = "Owner"
principal_id = "${azurerm_azuread_service_principal.test.id}"
}
# This provides the existing AAD application the rights to managed, send and receive from the Event Hubs instance
resource "azurerm_role_assignment" "existing_service_principal_eh" {
count = "${data.azurerm_client_config.current.service_principal_object_id != "" ? 1 : 0}"
scope = "subscriptions/${data.azurerm_client_config.current.subscription_id}/resourceGroups/${azurerm_resource_group.test.name}/providers/Microsoft.EventHub/namespaces/${azurerm_eventhub_namespace.test.name}"
role_definition_name = "Owner"
principal_id = "${data.azurerm_client_config.current.service_principal_object_id}"
}
# This provides the new AAD application the rights to managed the resource group
resource "azurerm_role_assignment" "created_service_principal_rg" {
count = "${data.azurerm_client_config.current.service_principal_application_id == "" ? 1 : 0}"
scope = "subscriptions/${data.azurerm_client_config.current.subscription_id}/resourceGroups/${azurerm_resource_group.test.name}"
role_definition_name = "Owner"
principal_id = "${azurerm_azuread_service_principal.test.id}"
}
# This provides the existing AAD application the rights to managed the resource group
resource "azurerm_role_assignment" "existing_service_principal_rg" {
count = "${data.azurerm_client_config.current.service_principal_object_id != "" ? 1 : 0}"
scope = "subscriptions/${data.azurerm_client_config.current.subscription_id}/resourceGroups/${azurerm_resource_group.test.name}"
role_definition_name = "Owner"
principal_id = "${data.azurerm_client_config.current.service_principal_object_id}"
}
output "TEST_EVENTHUB_RESOURCE_GROUP" {
value = "${var.resource_group_name}"
}
output "EVENTHUB_CONNECTION_STRING" {
value = "Endpoint=sb://${azurerm_eventhub_namespace.test.name}.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=${azurerm_eventhub_namespace.test.default_primary_key}"
sensitive = true
}
output "EVENTHUB_NAMESPACE" {
value = "${azurerm_eventhub_namespace.test.name}"
}
output "AZURE_SUBSCRIPTION_ID" {
value = "${data.azurerm_client_config.current.subscription_id}"
}
output "TEST_EVENTHUB_LOCATION" {
value = "${var.location}"
}
output "AZURE_TENANT_ID" {
value = "${data.azurerm_client_config.current.tenant_id}"
}
output "AZURE_CLIENT_ID" {
value = "${azurerm_azuread_application.test.0.application_id == "" ? data.azurerm_client_config.current.client_id : azurerm_azuread_application.test.0.application_id}"
}
output "AZURE_CLIENT_SECRET" {
value = "${azurerm_azuread_service_principal_password.test.0.value == "" ? var.azure_client_secret : azurerm_azuread_service_principal_password.test.0.value}"
sensitive = true
}
output "STORAGE_ACCOUNT_NAME" {
value = "${azurerm_storage_account.test.name}"
}

Просмотреть файл

@ -275,7 +275,9 @@ func (h *EventProcessorHost) UnregisterHandler(ctx context.Context, id HandlerID
delete(h.handlers, string(id))
if len(h.handlers) == 0 {
h.Close(ctx)
if err := h.Close(ctx); err != nil {
log.For(ctx).Error(err)
}
}
}

3
go.mod
Просмотреть файл

@ -9,6 +9,7 @@ require (
github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect
github.com/dimchansky/utfbom v1.0.0 // indirect
github.com/fortytw2/leaktest v1.2.0 // indirect
github.com/joho/godotenv v1.3.0
github.com/jpillora/backoff v0.0.0-20180909062703-3050d21c67d7
github.com/kr/pretty v0.1.0 // indirect
github.com/mitchellh/go-homedir v1.0.0 // indirect
@ -21,5 +22,5 @@ require (
golang.org/x/sys v0.0.0-20181011152604-fa43e7bc11ba // indirect
google.golang.org/api v0.0.0-20181018171847-1ee037c97071 // indirect
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
pack.ag/amqp v0.7.4
pack.ag/amqp v0.10.2
)

6
go.sum
Просмотреть файл

@ -25,6 +25,8 @@ github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y
github.com/google/go-cmp v0.2.0 h1:+dTQ8DZQJz0Mb/HjFlkptS1FeQ4cWSnN941F8aEG4SQ=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/grpc-ecosystem/grpc-gateway v1.5.0/go.mod h1:RSKVYQBd5MCa4OVpNdGskqpgL2+G+NZTnrVHpWWfpdw=
github.com/joho/godotenv v1.3.0 h1:Zjp+RcGpHhGlrMbJzXTrZZPrWj+1vfm90La1wgB6Bhc=
github.com/joho/godotenv v1.3.0/go.mod h1:7hK45KPybAkOC6peb+G5yklZfMxEjkZhHbwpqxOKXbg=
github.com/jpillora/backoff v0.0.0-20180909062703-3050d21c67d7 h1:K//n/AqR5HjG3qxbrBCL4vJPW0MVFSs9CPK1OOJdRME=
github.com/jpillora/backoff v0.0.0-20180909062703-3050d21c67d7/go.mod h1:2iMrUgbbvHEiQClaW2NsSzMyGHqN+rDFqY705q49KG0=
github.com/konsorten/go-windows-terminal-sequences v0.0.0-20180402223658-b729f2633dfe h1:CHRGQ8V7OlCYtwaKPJi3iA7J+YdNKdo8j7nG5IgDhjs=
@ -76,5 +78,5 @@ gopkg.in/check.v1 v1.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
pack.ag/amqp v0.7.4 h1:SKaYf2EzcgN2bN9GTMCNu+xn9aOxMPYNgeyqYvAEtnM=
pack.ag/amqp v0.7.4/go.mod h1:4/cbmt4EJXSKlG6LCfWHoqmN0uFdy5i/+YFz+fTfhV4=
pack.ag/amqp v0.10.2 h1:tOg29Eqx2kmgcDJa7OAjH9N3jqGA1gHf5iIAnBMsa5U=
pack.ag/amqp v0.10.2/go.mod h1:4/cbmt4EJXSKlG6LCfWHoqmN0uFdy5i/+YFz+fTfhV4=

Просмотреть файл

@ -283,11 +283,11 @@ func testBasicSend(ctx context.Context, t *testing.T, client *Hub, _ string) {
}
func testSendTooBig(ctx context.Context, t *testing.T, client *Hub, _ string) {
data := make([]byte, 256*1024)
data := make([]byte, 2600*1024)
_, _ = rand.Read(data)
event := NewEvent(data)
err := client.Send(ctx, event)
assert.Error(t, err)
assert.Error(t, err, "encoded message size exceeds max of 1048576")
}
func testBatchSendAndReceive(ctx context.Context, t *testing.T, client *Hub, partitionID string) {
@ -448,9 +448,7 @@ func testMultiSendAndReceive(ctx context.Context, t *testing.T, client *Hub, par
}
for idx, message := range messages {
if !assert.NoError(t, client.Send(ctx, NewEventFromString(message), SendWithMessageID(fmt.Sprintf("%d", idx)))) {
assert.FailNow(t, "unable to send message")
}
require.NoError(t, client.Send(ctx, NewEventFromString(message), SendWithMessageID(fmt.Sprintf("%d", idx))))
}
for _, partitionID := range partitionIDs {
@ -562,14 +560,17 @@ func testHubPartitionRuntimeInformation(ctx context.Context, t *testing.T, clien
}
func TestEnvironmentalCreation(t *testing.T) {
os.Setenv("EVENTHUB_NAME", "foo")
require.NoError(t, os.Setenv("EVENTHUB_NAME", "foo"))
_, err := NewHubFromEnvironment()
assert.Nil(t, err)
os.Unsetenv("EVENTHUB_NAME")
require.NoError(t, os.Unsetenv("EVENTHUB_NAME"))
}
func (suite *eventHubSuite) newClient(t *testing.T, hubName string, opts ...HubOption) (*Hub, func()) {
provider, err := aad.NewJWTProvider(aad.JWTProviderWithEnvironmentVars(), aad.JWTProviderWithAzureEnvironment(&suite.Env))
provider, err := aad.NewJWTProvider(
aad.JWTProviderWithEnvironmentVars(),
aad.JWTProviderWithAzureEnvironment(&suite.Env),
)
if !suite.NoError(err) {
suite.FailNow("unable to make a new JWT provider")
}

Просмотреть файл

@ -39,6 +39,7 @@ import (
"github.com/Azure/go-autorest/autorest/azure"
azauth "github.com/Azure/go-autorest/autorest/azure/auth"
"github.com/Azure/go-autorest/autorest/to"
"github.com/joho/godotenv"
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/suite"
"go.opencensus.io/exporter/jaeger"
@ -54,23 +55,17 @@ const (
defaultTimeout = 1 * time.Minute
)
const (
// Location is the Azure geographic location the test suite will use for provisioning
Location = "eastus"
// ResourceGroupName is the name of the resource group the test suite will use for provisioning
ResourceGroupName = "ehtest"
)
type (
// BaseSuite encapsulates a end to end test of Event Hubs with build up and tear down of all EH resources
BaseSuite struct {
suite.Suite
SubscriptionID string
Namespace string
Env azure.Environment
TagID string
closer io.Closer
SubscriptionID string
Namespace string
ResourceGroupName string
Location string
Env azure.Environment
TagID string
closer io.Closer
}
// HubMgmtOption represents an option for configuring an Event Hub.
@ -81,6 +76,7 @@ type (
func init() {
rand.Seed(time.Now().Unix())
loadEnv()
}
// SetupSuite constructs the test suite from the environment and
@ -90,8 +86,10 @@ func (suite *BaseSuite) SetupSuite() {
log.SetLevel(log.DebugLevel)
}
suite.SubscriptionID = mustGetEnv("AZURE_SUBSCRIPTION_ID")
suite.Namespace = mustGetEnv("EVENTHUB_NAMESPACE")
suite.SubscriptionID = MustGetEnv("AZURE_SUBSCRIPTION_ID")
suite.Namespace = MustGetEnv("EVENTHUB_NAMESPACE")
suite.ResourceGroupName = MustGetEnv("TEST_EVENTHUB_RESOURCE_GROUP")
suite.Location = MustGetEnv("TEST_EVENTHUB_LOCATION")
envName := os.Getenv("AZURE_ENVIRONMENT")
suite.TagID = RandomString("tag", 5)
@ -122,7 +120,7 @@ func (suite *BaseSuite) TearDownSuite() {
defer cancel()
suite.deleteAllTaggedEventHubs(ctx)
if suite.closer != nil {
suite.closer.Close()
suite.NoError(suite.closer.Close())
}
}
@ -139,10 +137,7 @@ func (suite *BaseSuite) RandomHub(opts ...HubMgmtOption) (*mgmt.Model, func()) {
suite.Require().Len(*model.PartitionIds, 4)
return model, func() {
if model != nil {
err := suite.DeleteEventHub(*model.Name)
if err != nil {
suite.T().Log(err)
}
suite.DeleteEventHub(*model.Name)
}
}
}
@ -150,7 +145,7 @@ func (suite *BaseSuite) RandomHub(opts ...HubMgmtOption) (*mgmt.Model, func()) {
// EnsureEventHub creates an Event Hub if it doesn't exist
func (suite *BaseSuite) ensureEventHub(ctx context.Context, name string, opts ...HubMgmtOption) (*mgmt.Model, error) {
client := suite.getEventHubMgmtClient()
hub, err := client.Get(ctx, ResourceGroupName, suite.Namespace, name)
hub, err := client.Get(ctx, suite.ResourceGroupName, suite.Namespace, name)
if err != nil {
newHub := &mgmt.Model{
@ -189,26 +184,26 @@ func (suite *BaseSuite) tryHubCreate(ctx context.Context, client *mgmt.EventHubs
ctx, cancel := context.WithTimeout(ctx, 20*time.Second)
defer cancel()
_, err := client.CreateOrUpdate(ctx, ResourceGroupName, suite.Namespace, name, *hub)
_, err := client.CreateOrUpdate(ctx, suite.ResourceGroupName, suite.Namespace, name, *hub)
if err != nil {
return mgmt.Model{}, err
}
return client.Get(ctx, ResourceGroupName, suite.Namespace, name)
return client.Get(ctx, suite.ResourceGroupName, suite.Namespace, name)
}
// DeleteEventHub deletes an Event Hub within the given Namespace
func (suite *BaseSuite) DeleteEventHub(name string) error {
func (suite *BaseSuite) DeleteEventHub(name string) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
defer cancel()
client := suite.getEventHubMgmtClient()
_, err := client.Delete(ctx, ResourceGroupName, suite.Namespace, name)
return err
_, err := client.Delete(ctx, suite.ResourceGroupName, suite.Namespace, name)
suite.NoError(err)
}
func (suite *BaseSuite) deleteAllTaggedEventHubs(ctx context.Context) {
client := suite.getEventHubMgmtClient()
res, err := client.ListByNamespace(ctx, ResourceGroupName, suite.Namespace, to.Int32Ptr(0), to.Int32Ptr(20))
res, err := client.ListByNamespace(ctx, suite.ResourceGroupName, suite.Namespace, to.Int32Ptr(0), to.Int32Ptr(20))
if err != nil {
suite.T().Log("error listing namespaces")
suite.T().Error(err)
@ -218,7 +213,7 @@ func (suite *BaseSuite) deleteAllTaggedEventHubs(ctx context.Context) {
for _, val := range res.Values() {
if strings.Contains(*val.Name, suite.TagID) {
for i := 0; i < 5; i++ {
if _, err := client.Delete(ctx, ResourceGroupName, suite.Namespace, *val.Name); err != nil {
if _, err := client.Delete(ctx, suite.ResourceGroupName, suite.Namespace, *val.Name); err != nil {
suite.T().Logf("error deleting %q", *val.Name)
suite.T().Error(err)
time.Sleep(3 * time.Second)
@ -230,12 +225,12 @@ func (suite *BaseSuite) deleteAllTaggedEventHubs(ctx context.Context) {
suite.T().Logf("%q does not contain %q", *val.Name, suite.TagID)
}
}
res.Next()
suite.NoError(res.Next())
}
}
func (suite *BaseSuite) ensureProvisioned(tier mgmt.SkuTier) error {
_, err := ensureResourceGroup(context.Background(), suite.SubscriptionID, ResourceGroupName, Location, suite.Env)
_, err := ensureResourceGroup(context.Background(), suite.SubscriptionID, suite.ResourceGroupName, suite.Location, suite.Env)
if err != nil {
return err
}
@ -332,7 +327,7 @@ func (suite *BaseSuite) getEventHubMgmtClient() *mgmt.EventHubsClient {
}
func (suite *BaseSuite) ensureNamespace() (*mgmt.EHNamespace, error) {
ns, err := ensureNamespace(context.Background(), suite.SubscriptionID, ResourceGroupName, suite.Namespace, Location, suite.Env)
ns, err := ensureNamespace(context.Background(), suite.SubscriptionID, suite.ResourceGroupName, suite.Namespace, suite.Location, suite.Env)
if err != nil {
return nil, err
}
@ -365,7 +360,9 @@ func (suite *BaseSuite) setupTracing() error {
}
exporter, err := jaeger.NewExporter(jaeger.Options{
AgentEndpoint: "localhost:6831",
ServiceName: "eh-trace",
Process: jaeger.Process{
ServiceName: "eh-tests",
},
})
if err != nil {
return err
@ -374,7 +371,8 @@ func (suite *BaseSuite) setupTracing() error {
return nil
}
func mustGetEnv(key string) string {
// MustGetEnv will panic or return the env var for a given string key
func MustGetEnv(key string) string {
v := os.Getenv(key)
if v == "" {
panic("Env variable '" + key + "' required for integration tests.")
@ -395,3 +393,36 @@ func RandomString(prefix string, length int) string {
}
return prefix + string(b)
}
func loadEnv() {
lookForMe := []string{".env", "../.env", "../../.env"}
var reader io.ReadCloser
for _, env := range lookForMe {
r, err := os.Open(env)
if err == nil {
reader = r
break
}
}
if reader == nil {
log.Fatalf("no .env files were found in %v", lookForMe)
}
defer func() {
if err := reader.Close(); err != nil {
log.Fatal(err)
}
}()
envMap, err := godotenv.Parse(reader)
if err != nil {
log.Fatal(err)
}
for key, val := range envMap {
if err := os.Setenv(key, val); err != nil {
log.Fatal(err)
}
}
}

Просмотреть файл

@ -25,6 +25,7 @@ package eventhub
import (
"context"
"fmt"
"net"
"time"
"github.com/Azure/azure-amqp-common-go/log"
@ -144,7 +145,13 @@ func (s *sender) trySend(ctx context.Context, evt eventer) error {
return err
}
switch err.(type) {
case *amqp.Error, *amqp.DetachError:
case *amqp.Error, *amqp.DetachError, net.Error:
if netErr, ok := err.(net.Error); ok {
if !netErr.Temporary(){
return netErr
}
}
duration := s.recoveryBackoff.Duration()
log.For(ctx).Debug("amqp error, delaying " + string(duration/time.Millisecond) + " millis: " + err.Error())
time.Sleep(duration)
@ -202,6 +209,7 @@ func (s *sender) newSessionAndLink(ctx context.Context) error {
}
amqpSender, err := amqpSession.NewSender(
amqp.LinkReceiverSettle(amqp.ModeSecond),
amqp.LinkTargetAddress(s.getAddress()),
)
if err != nil {

Просмотреть файл

@ -24,18 +24,13 @@ package storage
import (
"context"
"errors"
"log"
"net/url"
"strings"
"testing"
"github.com/Azure/azure-amqp-common-go"
"github.com/Azure/azure-event-hubs-go/internal/test"
"github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2017-10-01/storage"
"github.com/Azure/azure-storage-blob-go/azblob"
"github.com/Azure/go-autorest/autorest/azure"
azauth "github.com/Azure/go-autorest/autorest/azure/auth"
"github.com/stretchr/testify/suite"
)
@ -54,23 +49,18 @@ func TestStorage(t *testing.T) {
func (ts *testSuite) SetupSuite() {
ts.BaseSuite.SetupSuite()
ts.AccountName = strings.ToLower(test.RandomString("ehtest", 6))
ts.Require().NoError(ts.ensureStorageAccount())
ts.AccountName = test.MustGetEnv("STORAGE_ACCOUNT_NAME")
}
func (ts *testSuite) TearDownSuite() {
ts.BaseSuite.TearDownSuite()
err := ts.deleteStorageAccount()
if err != nil {
ts.T().Error(err)
}
}
func (ts *testSuite) TestCredential() {
containerName := "foo"
blobName := "bar"
message := "Hello World!!"
tokenProvider, err := NewAADSASCredential(ts.SubscriptionID, test.ResourceGroupName, ts.AccountName, containerName, AADSASCredentialWithEnvironmentVars())
tokenProvider, err := NewAADSASCredential(ts.SubscriptionID, ts.ResourceGroupName, ts.AccountName, containerName, AADSASCredentialWithEnvironmentVars())
if err != nil {
ts.T().Fatal(err)
}
@ -84,7 +74,11 @@ func (ts *testSuite) TestCredential() {
}
containerURL := azblob.NewContainerURL(*fooURL, pipeline)
defer containerURL.Delete(ctx, azblob.ContainerAccessConditions{})
defer func(){
if res, err := containerURL.Delete(ctx, azblob.ContainerAccessConditions{}); err != nil {
log.Fatal(err, res)
}
}()
_, err = containerURL.Create(ctx, azblob.Metadata{}, azblob.PublicAccessNone)
if err != nil {
ts.T().Error(err)
@ -95,67 +89,4 @@ func (ts *testSuite) TestCredential() {
if err != nil {
ts.T().Error(err)
}
}
func (ts *testSuite) deleteStorageAccount() error {
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
defer cancel()
client := getStorageAccountMgmtClient(ts.SubscriptionID, ts.Env)
_, err := client.Delete(ctx, test.ResourceGroupName, ts.AccountName)
return err
}
func (ts *testSuite) ensureStorageAccount() error {
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
defer cancel()
client := getStorageAccountMgmtClient(ts.SubscriptionID, ts.Env)
accounts, err := client.ListByResourceGroup(ctx, test.ResourceGroupName)
if err != nil {
return err
}
if accounts.Response.Response == nil {
return errors.New("response is nil and error is not nil")
}
if accounts.Response.Response != nil && accounts.StatusCode == 404 {
return errors.New("resource group does not exist")
}
for _, account := range *accounts.Value {
if ts.AccountName == *account.Name {
// provisioned, so return
return nil
}
}
res, err := client.Create(ctx, test.ResourceGroupName, ts.AccountName, storage.AccountCreateParameters{
Sku: &storage.Sku{
Name: storage.StandardLRS,
Tier: storage.Standard,
},
Kind: storage.BlobStorage,
Location: common.PtrString(test.Location),
AccountPropertiesCreateParameters: &storage.AccountPropertiesCreateParameters{
AccessTier: storage.Hot,
},
})
if err != nil {
return err
}
return res.WaitForCompletionRef(ctx, client.Client)
}
func getStorageAccountMgmtClient(subscriptionID string, env azure.Environment) *storage.AccountsClient {
client := storage.NewAccountsClientWithBaseURI(env.ResourceManagerEndpoint, subscriptionID)
a, err := azauth.NewAuthorizerFromEnvironment()
if err != nil {
log.Fatal(err)
}
client.Authorizer = a
return &client
}
}

Просмотреть файл

@ -35,7 +35,6 @@ import (
"github.com/Azure/azure-amqp-common-go/auth"
"github.com/Azure/azure-event-hubs-go"
"github.com/Azure/azure-event-hubs-go/eph"
"github.com/Azure/azure-event-hubs-go/internal/test"
"github.com/Azure/azure-storage-blob-go/azblob"
)
@ -55,7 +54,9 @@ func (ts *testSuite) TestSingle() {
ts.Require().NoError(err)
defer func() {
closeContext, cancel := context.WithTimeout(context.Background(), 10*time.Second)
processor.Close(closeContext)
if err := processor.Close(closeContext); err != nil {
ts.Error(err)
}
cancel()
delHub()
}()
@ -66,12 +67,13 @@ func (ts *testSuite) TestSingle() {
var wg sync.WaitGroup
wg.Add(len(messages))
processor.RegisterHandler(ctx, func(c context.Context, event *eventhub.Event) error {
_, err = processor.RegisterHandler(ctx, func(c context.Context, event *eventhub.Event) error {
wg.Done()
return nil
})
ts.Require().NoError(err)
processor.StartNonBlocking(ctx)
ts.Require().NoError(processor.StartNonBlocking(ctx))
end, _ := ctx.Deadline()
waitUntil(ts.T(), &wg, time.Until(end))
}
@ -85,7 +87,7 @@ func (ts *testSuite) TestMultiple() {
delContainer := ts.newTestContainerByName(*hub.Name)
defer delContainer()
cred, err := NewAADSASCredential(ts.SubscriptionID, test.ResourceGroupName, ts.AccountName, *hub.Name, AADSASCredentialWithEnvironmentVars())
cred, err := NewAADSASCredential(ts.SubscriptionID, ts.ResourceGroupName, ts.AccountName, *hub.Name, AADSASCredentialWithEnvironmentVars())
ts.Require().NoError(err)
numPartitions := len(*hub.PartitionIds)
processors := make(map[string]*eph.EventProcessorHost, numPartitions)
@ -105,7 +107,9 @@ func (ts *testSuite) TestMultiple() {
defer func() {
for _, processor := range processors {
closeContext, cancel := context.WithTimeout(context.Background(), 10*time.Second)
processor.Close(closeContext)
if err := processor.Close(closeContext); err != nil {
ts.Error(err)
}
cancel()
}
delHub()
@ -140,7 +144,7 @@ func (ts *testSuite) TestMultiple() {
}
closeContext, cancel := context.WithTimeout(context.Background(), 10*time.Second)
processors[processorNames[numPartitions-1]].Close(closeContext) // close the last partition
ts.Require().NoError(processors[processorNames[numPartitions-1]].Close(closeContext)) // close the last partition
delete(processors, processorNames[numPartitions-1])
cancel()
@ -175,7 +179,7 @@ func (ts *testSuite) newTestContainerByName(containerName string) func() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
cred, err := NewAADSASCredential(ts.SubscriptionID, test.ResourceGroupName, ts.AccountName, containerName, AADSASCredentialWithEnvironmentVars())
cred, err := NewAADSASCredential(ts.SubscriptionID, ts.ResourceGroupName, ts.AccountName, containerName, AADSASCredentialWithEnvironmentVars())
ts.Require().NoError(err)
pipeline := azblob.NewPipeline(cred, azblob.PipelineOptions{})
@ -189,7 +193,9 @@ func (ts *testSuite) newTestContainerByName(containerName string) func() {
return func() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
containerURL.Delete(ctx, azblob.ContainerAccessConditions{})
if res, err := containerURL.Delete(ctx, azblob.ContainerAccessConditions{}); err != nil {
ts.NoError(err, res)
}
}
}
@ -197,7 +203,7 @@ func (ts *testSuite) sendMessages(hubName string, length int) ([]string, error)
client := ts.newClient(ts.T(), hubName)
defer func() {
closeContext, cancel := context.WithTimeout(context.Background(), 30*time.Second)
client.Close(closeContext)
ts.NoError(client.Close(closeContext))
cancel()
}()
@ -213,13 +219,13 @@ func (ts *testSuite) sendMessages(hubName string, length int) ([]string, error)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
client.SendBatch(ctx, eventhub.NewEventBatch(events))
ts.NoError(client.SendBatch(ctx, eventhub.NewEventBatch(events)))
return messages, ctx.Err()
}
func (ts *testSuite) newStorageBackedEPH(hubName, containerName string) (*eph.EventProcessorHost, error) {
cred, err := NewAADSASCredential(ts.SubscriptionID, test.ResourceGroupName, ts.AccountName, containerName, AADSASCredentialWithEnvironmentVars())
cred, err := NewAADSASCredential(ts.SubscriptionID, ts.ResourceGroupName, ts.AccountName, containerName, AADSASCredentialWithEnvironmentVars())
ts.Require().NoError(err)
leaserCheckpointer, err := NewStorageLeaserCheckpointer(cred, ts.AccountName, containerName, ts.Env)
ts.Require().NoError(err)

Просмотреть файл

@ -29,6 +29,7 @@ import (
"context"
"encoding/json"
"errors"
"io/ioutil"
"net/url"
"sync"
"time"
@ -47,18 +48,20 @@ import (
type (
// LeaserCheckpointer implements the eph.LeaserCheckpointer interface for Azure Storage
LeaserCheckpointer struct {
leases map[string]*storageLease
processor *eph.EventProcessorHost
leaseDuration time.Duration
credential Credential
containerURL *azblob.ContainerURL
serviceURL *azblob.ServiceURL
containerName string
accountName string
env azure.Environment
dirtyPartitions map[string]uuid.UUID
leasesMu sync.Mutex
done func()
// LeasePersistenceInterval is the default period of time which dirty leases will be persisted to Azure Storage
LeasePersistenceInterval time.Duration
leases map[string]*storageLease
processor *eph.EventProcessorHost
leaseDuration time.Duration
credential Credential
containerURL *azblob.ContainerURL
serviceURL *azblob.ServiceURL
containerName string
accountName string
env azure.Environment
dirtyPartitions map[string]uuid.UUID
leasesMu sync.Mutex
done func()
}
storageLease struct {
@ -85,6 +88,10 @@ type (
}
)
const (
defaultLeasePersistenceInterval = 5 * time.Second
)
// NewStorageLeaserCheckpointer builds an Azure Storage Leaser Checkpointer which handles leasing and checkpointing for
// the EventProcessorHost
func NewStorageLeaserCheckpointer(credential Credential, accountName, containerName string, env azure.Environment) (*LeaserCheckpointer, error) {
@ -97,15 +104,16 @@ func NewStorageLeaserCheckpointer(credential Credential, accountName, containerN
containerURL := svURL.NewContainerURL(containerName)
return &LeaserCheckpointer{
credential: credential,
containerName: containerName,
accountName: accountName,
leaseDuration: eph.DefaultLeaseDuration,
env: env,
serviceURL: &svURL,
containerURL: &containerURL,
leases: make(map[string]*storageLease),
dirtyPartitions: make(map[string]uuid.UUID),
credential: credential,
containerName: containerName,
accountName: accountName,
leaseDuration: eph.DefaultLeaseDuration,
env: env,
serviceURL: &svURL,
containerURL: &containerURL,
leases: make(map[string]*storageLease),
dirtyPartitions: make(map[string]uuid.UUID),
LeasePersistenceInterval: defaultLeasePersistenceInterval,
}, nil
}
@ -477,7 +485,7 @@ func (sl *LeaserCheckpointer) persistLeases(ctx context.Context) {
if err != nil {
log.For(ctx).Error(err)
}
<-time.After(1 * time.Second)
<-time.After(sl.LeasePersistenceInterval)
}
}
}
@ -596,10 +604,13 @@ func (sl *LeaserCheckpointer) getLease(ctx context.Context, partitionID string)
}
func (sl *LeaserCheckpointer) leaseFromResponse(res *azblob.DownloadResponse) (*storageLease, error) {
buf := new(bytes.Buffer)
buf.ReadFrom(res.Response().Body)
b, err := ioutil.ReadAll(res.Response().Body)
if err != nil {
return nil, err
}
var lease storageLease
if err := json.Unmarshal(buf.Bytes(), &lease); err != nil {
if err := json.Unmarshal(b, &lease); err != nil {
return nil, err
}
lease.leaser = sl

Просмотреть файл

@ -29,7 +29,6 @@ import (
"github.com/Azure/azure-amqp-common-go/aad"
"github.com/Azure/azure-event-hubs-go/eph"
"github.com/Azure/azure-event-hubs-go/internal/test"
"github.com/Azure/azure-storage-blob-go/azblob"
"github.com/stretchr/testify/assert"
)
@ -189,7 +188,7 @@ func (ts *testSuite) leaserWithEPH() (*LeaserCheckpointer, func()) {
func (ts *testSuite) newLeaser() (*LeaserCheckpointer, func()) {
containerName := strings.ToLower(ts.RandomName("stortest", 4))
cred, err := NewAADSASCredential(ts.SubscriptionID, test.ResourceGroupName, ts.AccountName, containerName, AADSASCredentialWithEnvironmentVars())
cred, err := NewAADSASCredential(ts.SubscriptionID, ts.ResourceGroupName, ts.AccountName, containerName, AADSASCredentialWithEnvironmentVars())
ts.Require().NoError(err)
leaser, err := NewStorageLeaserCheckpointer(cred, ts.AccountName, containerName, ts.Env)
ts.Require().NoError(err)