simplify environmental construction by prefering SAS
This commit is contained in:
Родитель
2229485247
Коммит
7d33131d7f
|
@ -34,8 +34,8 @@
|
|||
"services/storage/mgmt/2017-10-01/storage",
|
||||
"version"
|
||||
]
|
||||
revision = "fad8443a79b0e755c18c3bec29a8d2bedab0b421"
|
||||
version = "v15.3.0"
|
||||
revision = "4650843026a7fdec254a8d9cf893693a254edd0b"
|
||||
version = "v16.2.1"
|
||||
|
||||
[[projects]]
|
||||
name = "github.com/Azure/azure-storage-blob-go"
|
||||
|
@ -191,6 +191,6 @@
|
|||
[solve-meta]
|
||||
analyzer-name = "dep"
|
||||
analyzer-version = 1
|
||||
inputs-digest = "24f9aedfb2f0a6e602f946d39cff1cb8900a337bedbc7ce18a3ca5b9334d2876"
|
||||
inputs-digest = "9e8d39c2948233a4f9dc40bdfcbee21ae301de912713e0f0c8c6644668aece5b"
|
||||
solver-name = "gps-cdcl"
|
||||
solver-version = 1
|
||||
|
|
|
@ -8,7 +8,7 @@
|
|||
|
||||
[[constraint]]
|
||||
name = "github.com/Azure/azure-sdk-for-go"
|
||||
version = "15"
|
||||
version = "16"
|
||||
|
||||
[[constraint]]
|
||||
name = "github.com/Azure/azure-amqp-common-go"
|
||||
|
|
27
hub.go
27
hub.go
|
@ -115,7 +115,6 @@ func NewHub(namespace, name string, tokenProvider auth.TokenProvider, opts ...Hu
|
|||
// There are two sets of environment variables which can produce a SAS TokenProvider
|
||||
//
|
||||
// 1) Expected Environment Variables:
|
||||
// - "EVENTHUB_NAMESPACE" the namespace of the Event Hub instance
|
||||
// - "EVENTHUB_KEY_NAME" the name of the Event Hub key
|
||||
// - "EVENTHUB_KEY_VALUE" the secret for the Event Hub key named in "EVENTHUB_KEY_NAME"
|
||||
//
|
||||
|
@ -130,32 +129,24 @@ func NewHub(namespace, name string, tokenProvider auth.TokenProvider, opts ...Hu
|
|||
// 2. Client Certificate: attempt to authenticate with a Service Principal via "AZURE_TENANT_ID", "AZURE_CLIENT_ID",
|
||||
// "AZURE_CERTIFICATE_PATH" and "AZURE_CERTIFICATE_PASSWORD"
|
||||
//
|
||||
// 3. Managed Service Identity (MSI): attempt to authenticate via MSI
|
||||
// 3. Managed Service Identity (MSI): attempt to authenticate via MSI on the default local MSI internally addressable IP
|
||||
// and port. See: adal.GetMSIVMEndpoint()
|
||||
//
|
||||
//
|
||||
// The Azure Environment used can be specified using the name of the Azure Environment set in "AZURE_ENVIRONMENT" var.
|
||||
func NewHubWithNamespaceNameAndEnvironment(namespace, name string, opts ...HubOption) (*Hub, error) {
|
||||
var provider auth.TokenProvider
|
||||
aadProvider, aadErr := aad.NewJWTProvider(aad.JWTProviderWithEnvironmentVars())
|
||||
sasProvider, sasErr := sas.NewTokenProvider(sas.TokenProviderWithEnvironmentVars())
|
||||
|
||||
if aadErr != nil && sasErr != nil {
|
||||
// both failed
|
||||
return nil, errors.Errorf("neither Azure Active Directory nor SAS token provider could be built - AAD error: %v, SAS error: %v", aadErr, sasErr)
|
||||
provider, sasErr := sas.NewTokenProvider(sas.TokenProviderWithEnvironmentVars())
|
||||
if sasErr == nil {
|
||||
return NewHub(namespace, name, provider, opts...)
|
||||
}
|
||||
|
||||
if aadProvider != nil {
|
||||
provider = aadProvider
|
||||
} else {
|
||||
provider = sasProvider
|
||||
provider, aadErr := aad.NewJWTProvider(aad.JWTProviderWithEnvironmentVars())
|
||||
if aadErr == nil {
|
||||
return NewHub(namespace, name, provider, opts...)
|
||||
}
|
||||
|
||||
h, err := NewHub(namespace, name, provider, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return h, nil
|
||||
return nil, errors.Errorf("neither Azure Active Directory nor SAS token provider could be built - AAD error: %v, SAS error: %v", aadErr, sasErr)
|
||||
}
|
||||
|
||||
// NewHubFromEnvironment creates a new Event Hub client for sending and receiving messages from environment variables
|
||||
|
|
44
hub_test.go
44
hub_test.go
|
@ -27,6 +27,7 @@ import (
|
|||
"fmt"
|
||||
"math/rand"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
@ -50,10 +51,23 @@ var (
|
|||
defaultTimeout = 20 * time.Second
|
||||
)
|
||||
|
||||
const (
|
||||
connStr = "Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=keyName;SharedAccessKey=secret;EntityPath=hubName"
|
||||
)
|
||||
|
||||
func TestEventHub(t *testing.T) {
|
||||
suite.Run(t, new(eventHubSuite))
|
||||
}
|
||||
|
||||
func (suite *eventHubSuite) TestNewHubWithNameAndEnvironment() {
|
||||
revert := suite.captureEnv()
|
||||
defer revert()
|
||||
os.Clearenv()
|
||||
suite.NoError(os.Setenv("EVENTHUB_CONNECTION_STRING", connStr))
|
||||
_, err := NewHubWithNamespaceNameAndEnvironment("hello", "world")
|
||||
suite.NoError(err)
|
||||
}
|
||||
|
||||
func (suite *eventHubSuite) TestSasToken() {
|
||||
tests := map[string]func(*testing.T, *Hub, []string, string){
|
||||
"TestMultiSendAndReceive": testMultiSendAndReceive,
|
||||
|
@ -96,6 +110,10 @@ func (suite *eventHubSuite) TestPartitioned() {
|
|||
|
||||
for name, testFunc := range tests {
|
||||
setupTestTeardown := func(t *testing.T) {
|
||||
//for _, e := range os.Environ() {
|
||||
// pair := strings.Split(e, "=")
|
||||
// fmt.Println(pair[0])
|
||||
//}
|
||||
hubName := suite.RandomName("goehtest", 10)
|
||||
mgmtHub, err := suite.EnsureEventHub(context.Background(), hubName)
|
||||
if err != nil {
|
||||
|
@ -460,9 +478,7 @@ func (suite *eventHubSuite) newClient(t *testing.T, hubName string, opts ...HubO
|
|||
func (suite *eventHubSuite) newClientWithProvider(t *testing.T, hubName string, provider auth.TokenProvider, opts ...HubOption) *Hub {
|
||||
opts = append(opts, HubWithEnvironment(suite.Env))
|
||||
client, err := NewHub(suite.Namespace, hubName, provider, opts...)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
suite.NoError(err)
|
||||
return client
|
||||
}
|
||||
|
||||
|
@ -485,3 +501,25 @@ func fmtDuration(d time.Duration) string {
|
|||
d = d.Round(time.Second) / time.Second
|
||||
return fmt.Sprintf("%d seconds", d)
|
||||
}
|
||||
|
||||
func restoreEnv(capture map[string]string) error {
|
||||
os.Clearenv()
|
||||
for key, value := range capture {
|
||||
err := os.Setenv(key, value)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (suite *eventHubSuite) captureEnv() func() {
|
||||
capture := make(map[string]string)
|
||||
for _, pair := range os.Environ() {
|
||||
keyValue := strings.Split(pair, "=")
|
||||
capture[keyValue[0]] = strings.Join(keyValue[1:], "=")
|
||||
}
|
||||
return func() {
|
||||
suite.NoError(restoreEnv(capture))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -174,10 +174,7 @@ func (suite *BaseSuite) DeleteEventHub(ctx context.Context, name string) error {
|
|||
|
||||
func (suite *BaseSuite) deleteAllTaggedEventHubs(ctx context.Context) {
|
||||
client := suite.getEventHubMgmtClient()
|
||||
res, err := client.ListByNamespace(ctx, ResourceGroupName, suite.Namespace)
|
||||
if err != nil {
|
||||
suite.FailNow(err.Error())
|
||||
}
|
||||
res, _ := client.ListByNamespace(ctx, ResourceGroupName, suite.Namespace)
|
||||
|
||||
for res.NotDone() {
|
||||
for _, val := range res.Values() {
|
||||
|
@ -286,7 +283,7 @@ func (suite *BaseSuite) ensureNamespace() (*mgmt.EHNamespace, error) {
|
|||
}
|
||||
|
||||
func (suite *BaseSuite) setupTracing() error {
|
||||
if os.Getenv("CI") != "true" {
|
||||
if os.Getenv("TRACING") == "true" {
|
||||
// Sample configuration for testing. Use constant sampling to sample every trace
|
||||
// and enable LogSpan to log every span via configured Logger.
|
||||
cfg := config.Configuration{
|
||||
|
|
Загрузка…
Ссылка в новой задаче