From 7d4ce2b83d97d8636b6d34d4b7b51e412266aa9c Mon Sep 17 00:00:00 2001 From: David Justice Date: Wed, 11 Apr 2018 12:34:45 -0700 Subject: [PATCH 1/3] add opentracing support --- Gopkg.lock | 40 +++++++++++------------ Gopkg.toml | 2 +- aad/jwt.go | 8 ----- cbs/cbs.go | 13 +++++--- changelog.md | 1 + internal/tracing/tracing.go | 31 ++++++++++++++++++ internal/version.go | 6 ++++ log/logger.go | 63 +++++++++++++++++++++++++++++++++++++ rpc/rpc.go | 20 +++++++++--- 9 files changed, 146 insertions(+), 38 deletions(-) create mode 100644 internal/tracing/tracing.go create mode 100644 internal/version.go create mode 100644 log/logger.go diff --git a/Gopkg.lock b/Gopkg.lock index 9e80de2..838632d 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -9,8 +9,8 @@ "autorest/azure", "autorest/date" ] - revision = "7909b98056dd6f6a9fc9b7745af1810c93c15939" - version = "v10.3.0" + revision = "9ad9326b278af8fa5cc67c30c0ce9a58cc0862b2" + version = "v10.6.0" [[projects]] name = "github.com/davecgh/go-spew" @@ -24,6 +24,16 @@ revision = "06ea1031745cb8b3dab3f6a236daf2b0aa468b7e" version = "v3.2.0" +[[projects]] + name = "github.com/opentracing/opentracing-go" + packages = [ + ".", + "ext", + "log" + ] + revision = "1949ddbfd147afd4d964a9f00b24eb291e0e7c38" + version = "v1.0.2" + [[projects]] name = "github.com/pkg/errors" packages = ["."] @@ -36,12 +46,6 @@ revision = "792786c7400a136282c1664665ae0a8db921c6c2" version = "v1.0.0" -[[projects]] - name = "github.com/sirupsen/logrus" - packages = ["."] - revision = "c155da19408a8799da419ed3eeb0cb5db0ad5dbc" - version = "v1.0.5" - [[projects]] name = "github.com/stretchr/testify" packages = ["assert"] @@ -53,32 +57,28 @@ name = "golang.org/x/crypto" packages = [ "pkcs12", - "pkcs12/internal/rc2", - "ssh/terminal" + "pkcs12/internal/rc2" ] - revision = "80db560fac1fb3e6ac81dbc7f8ae4c061f5257bd" + revision = "d6449816ce06963d9d136eee5a56fca5b0616e7e" [[projects]] branch = "master" - name = "golang.org/x/sys" - packages = [ - "unix", - "windows" - ] - revision = "c488ab1dd8481ef762f96a79a9577c27825be697" + name = "golang.org/x/net" + packages = ["context"] + revision = "61147c48b25b599e5b561d2e9c4f3e1ef489ca41" [[projects]] - branch = "master" name = "pack.ag/amqp" packages = [ ".", "internal/testconn" ] - revision = "fc71119dfd03ed44d0aba09806e4a7d1584b74b1" + revision = "fba4c617049c3049d8bc7ba9879daca1a1a2044e" + version = "v0.4.1" [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "7c743defd623795eff4a690aaf4d7651db81fae6e00d2e432b1ee18d132377c4" + inputs-digest = "eb76d991bf676d1c5f089f094605ff34de0bca4b2647832e2bbf84b2b2f43b76" solver-name = "gps-cdcl" solver-version = 1 diff --git a/Gopkg.toml b/Gopkg.toml index 0c55cfb..c3c689b 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -4,4 +4,4 @@ [[constraint]] name = "pack.ag/amqp" - branch = "master" + version = "0.4.1" diff --git a/aad/jwt.go b/aad/jwt.go index 0b5013d..6dc3e81 100644 --- a/aad/jwt.go +++ b/aad/jwt.go @@ -36,7 +36,6 @@ import ( "github.com/Azure/azure-amqp-common-go/auth" "github.com/Azure/go-autorest/autorest/adal" "github.com/Azure/go-autorest/autorest/azure" - log "github.com/sirupsen/logrus" "golang.org/x/crypto/pkcs12" ) @@ -153,7 +152,6 @@ func (c *TokenProviderConfiguration) NewServicePrincipalToken() (*adal.ServicePr // 1.Client Credentials if c.ClientSecret != "" { - log.Debug("creating a token via a service principal client secret") spToken, err := adal.NewServicePrincipalToken(*oauthConfig, c.ClientID, c.ClientSecret, c.ResourceURI) if err != nil { return nil, fmt.Errorf("failed to get oauth token from client credentials: %v", err) @@ -166,7 +164,6 @@ func (c *TokenProviderConfiguration) NewServicePrincipalToken() (*adal.ServicePr // 2. Client Certificate if c.CertificatePath != "" { - log.Debug("creating a token via a service principal client certificate") certData, err := ioutil.ReadFile(c.CertificatePath) if err != nil { return nil, fmt.Errorf("failed to read the certificate file (%s): %v", c.CertificatePath, err) @@ -186,7 +183,6 @@ func (c *TokenProviderConfiguration) NewServicePrincipalToken() (*adal.ServicePr } // 3. By default return MSI - log.Debug("creating a token via MSI") msiEndpoint, err := adal.GetMSIVMEndpoint() if err != nil { return nil, err @@ -206,19 +202,15 @@ func (t *TokenProvider) GetToken(audience string) (*auth.Token, error) { token := t.tokenProvider.Token() expireTicks, err := strconv.ParseInt(token.ExpiresOn, 10, 64) if err != nil { - log.Debugf("%v", token.AccessToken) return nil, err } expires := time.Unix(expireTicks, 0) if expires.Before(time.Now()) { - log.Debug("refreshing AAD token since it has expired") if err := t.tokenProvider.Refresh(); err != nil { - log.Error("refreshing AAD token has failed") return nil, err } token = t.tokenProvider.Token() - log.Debug("refreshing AAD token has succeeded") } return auth.NewToken(auth.CBSTokenTypeJWT, token.AccessToken, token.ExpiresOn), nil diff --git a/cbs/cbs.go b/cbs/cbs.go index 0d81e1a..ac64b1e 100644 --- a/cbs/cbs.go +++ b/cbs/cbs.go @@ -26,11 +26,13 @@ package cbs import ( "context" + "fmt" "time" "github.com/Azure/azure-amqp-common-go/auth" + "github.com/Azure/azure-amqp-common-go/internal/tracing" + "github.com/Azure/azure-amqp-common-go/log" "github.com/Azure/azure-amqp-common-go/rpc" - log "github.com/sirupsen/logrus" "pack.ag/amqp" ) @@ -45,6 +47,9 @@ const ( // NegotiateClaim attempts to put a token to the $cbs management endpoint to negotiate auth for the given audience func NegotiateClaim(ctx context.Context, audience string, conn *amqp.Client, provider auth.TokenProvider) error { + span, ctx := tracing.StartSpanFromContext(ctx, "az-amqp-common.cbs.NegotiateClaim") + span.Finish() + link, err := rpc.NewLink(conn, cbsAddress) if err != nil { return err @@ -56,7 +61,7 @@ func NegotiateClaim(ctx context.Context, audience string, conn *amqp.Client, pro return err } - log.Debugf("negotiating claim for audience %s with token type %s and expiry of %s", audience, token.TokenType, token.Expiry) + log.For(ctx).Debug(fmt.Sprintf("negotiating claim for audience %s with token type %s and expiry of %s", audience, token.TokenType, token.Expiry)) msg := &amqp.Message{ Value: token.Token, ApplicationProperties: map[string]interface{}{ @@ -69,10 +74,10 @@ func NegotiateClaim(ctx context.Context, audience string, conn *amqp.Client, pro res, err := link.RetryableRPC(ctx, 3, 1*time.Second, msg) if err != nil { - log.Error(err) + log.For(ctx).Error(err) return err } - log.Debugf("negotiated with response code %d and message: %s", res.Code, res.Description) + log.For(ctx).Debug(fmt.Sprintf("negotiated with response code %d and message: %s", res.Code, res.Description)) return nil } diff --git a/changelog.md b/changelog.md index b5fbef9..4047c5e 100644 --- a/changelog.md +++ b/changelog.md @@ -1,6 +1,7 @@ # Change Log ## `master` +- add opentracing support ## `v0.2.4` - connection string keys are case insensitive diff --git a/internal/tracing/tracing.go b/internal/tracing/tracing.go new file mode 100644 index 0000000..4a730b9 --- /dev/null +++ b/internal/tracing/tracing.go @@ -0,0 +1,31 @@ +package tracing + +import ( + "context" + "os" + + "github.com/Azure/azure-amqp-common-go/internal" + "github.com/opentracing/opentracing-go" + tag "github.com/opentracing/opentracing-go/ext" +) + +// StartSpanFromContext starts a span given a context and applies common library information +func StartSpanFromContext(ctx context.Context, operationName string, opts ...opentracing.StartSpanOption) (opentracing.Span, context.Context) { + span, ctx := opentracing.StartSpanFromContext(ctx, operationName, opts...) + ApplyComponentInfo(span) + return span, ctx +} + +// ApplyComponentInfo applies amqp common library and network info to the span +func ApplyComponentInfo(span opentracing.Span) { + tag.Component.Set(span, "github.com/Azure/azure-amqp-common-go") + span.SetTag("version", common.Version) + applyNetworkInfo(span) +} + +func applyNetworkInfo(span opentracing.Span) { + hostname, err := os.Hostname() + if err == nil { + tag.PeerHostname.Set(span, hostname) + } +} diff --git a/internal/version.go b/internal/version.go new file mode 100644 index 0000000..12beecc --- /dev/null +++ b/internal/version.go @@ -0,0 +1,6 @@ +package common + +const ( + // Version is the semantic version of the library + Version = "0.2.4" +) diff --git a/log/logger.go b/log/logger.go new file mode 100644 index 0000000..de4a958 --- /dev/null +++ b/log/logger.go @@ -0,0 +1,63 @@ +package log + +import ( + "context" + + "github.com/opentracing/opentracing-go" + tag "github.com/opentracing/opentracing-go/ext" + "github.com/opentracing/opentracing-go/log" +) + +type ( + // Logger is the interface for opentracing logging + Logger interface { + Info(msg string, fields ...log.Field) + Error(err error, fields ...log.Field) + Fatal(msg string, fields ...log.Field) + Debug(msg string, fields ...log.Field) + } + + spanLogger struct { + span opentracing.Span + } + + nopLogger struct{} +) + +// For will return a logger for a given context +func For(ctx context.Context) Logger { + if span := opentracing.SpanFromContext(ctx); span != nil { + return &spanLogger{ + span: span, + } + } + return new(nopLogger) +} + +func (sl spanLogger) Info(msg string, fields ...log.Field) { + sl.logToSpan("info", msg, fields...) +} + +func (sl spanLogger) Error(err error, fields ...log.Field) { + tag.Error.Set(sl.span, true) + sl.logToSpan("error", err.Error(), fields...) +} + +func (sl spanLogger) Fatal(msg string, fields ...log.Field) { + tag.Error.Set(sl.span, true) + sl.logToSpan("fatal", msg, fields...) +} + +func (sl spanLogger) Debug(msg string, fields ...log.Field) { + sl.logToSpan("debug", msg, fields...) +} + +func (sl spanLogger) logToSpan(level string, msg string, fields ...log.Field) { + fields = append(fields, log.String("event", msg), log.String("level", level)) + sl.span.LogFields(fields...) +} + +func (sl nopLogger) Info(msg string, fields ...log.Field) {} +func (sl nopLogger) Error(err error, fields ...log.Field) {} +func (sl nopLogger) Fatal(msg string, fields ...log.Field) {} +func (sl nopLogger) Debug(msg string, fields ...log.Field) {} diff --git a/rpc/rpc.go b/rpc/rpc.go index d48411f..ee5e900 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -31,9 +31,10 @@ import ( "time" "github.com/Azure/azure-amqp-common-go" + "github.com/Azure/azure-amqp-common-go/internal/tracing" + "github.com/Azure/azure-amqp-common-go/log" "github.com/Azure/azure-amqp-common-go/uuid" "github.com/pkg/errors" - log "github.com/sirupsen/logrus" "pack.ag/amqp" ) @@ -102,24 +103,30 @@ func NewLink(conn *amqp.Client, address string) (*Link, error) { // RetryableRPC attempts to retry a request a number of times with delay func (l *Link) RetryableRPC(ctx context.Context, times int, delay time.Duration, msg *amqp.Message) (*Response, error) { + span, ctx := tracing.StartSpanFromContext(ctx, "az-amqp-common.rpc.RetryableRPC") + span.Finish() + res, err := common.Retry(times, delay, func() (interface{}, error) { + span, ctx := tracing.StartSpanFromContext(ctx, "az-amqp-common.rpc.RetryableRPC.retry") + span.Finish() + res, err := l.RPC(ctx, msg) if err != nil { - log.Debugf("error in RPC via link %s: %v", l.id, err) + log.For(ctx).Error(errors.New(fmt.Sprintf("error in RPC via link %s: %v", l.id, err))) return nil, err } switch { case res.Code >= 200 && res.Code < 300: - log.Debugf("successful rpc on link %s: status code %d and description: %s", l.id, res.Code, res.Description) + log.For(ctx).Debug(fmt.Sprintf("successful rpc on link %s: status code %d and description: %s", l.id, res.Code, res.Description)) return res, nil case res.Code >= 500: errMessage := fmt.Sprintf("server error link %s: status code %d and description: %s", l.id, res.Code, res.Description) - log.Debugln(errMessage) + log.For(ctx).Error(errors.New(errMessage)) return nil, common.Retryable(errMessage) default: errMessage := fmt.Sprintf("unhandled error link %s: status code %d and description: %s", l.id, res.Code, res.Description) - log.Debugln(errMessage) + log.For(ctx).Error(errors.New(errMessage)) return nil, common.Retryable(errMessage) } }) @@ -134,6 +141,9 @@ func (l *Link) RPC(ctx context.Context, msg *amqp.Message) (*Response, error) { l.rpcMu.Lock() defer l.rpcMu.Unlock() + span, ctx := tracing.StartSpanFromContext(ctx, "az-amqp-common.rpc.RPC") + span.Finish() + if msg.Properties == nil { msg.Properties = &amqp.MessageProperties{} } From 2f4136d617dfd6ad4928ddd28751002b7f2e9d43 Mon Sep 17 00:00:00 2001 From: David Justice Date: Thu, 12 Apr 2018 09:34:51 -0700 Subject: [PATCH 2/3] update amqp and add close context --- Gopkg.lock | 6 +++--- Gopkg.toml | 2 +- cbs/cbs.go | 2 +- changelog.md | 1 + rpc/rpc.go | 35 ++++++++++++++++++++++------------- 5 files changed, 28 insertions(+), 18 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index 838632d..3f81646 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -68,17 +68,17 @@ revision = "61147c48b25b599e5b561d2e9c4f3e1ef489ca41" [[projects]] + branch = "master" name = "pack.ag/amqp" packages = [ ".", "internal/testconn" ] - revision = "fba4c617049c3049d8bc7ba9879daca1a1a2044e" - version = "v0.4.1" + revision = "ee6eb7ef6e2355b46a573b0b1799f76259f2ea47" [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "eb76d991bf676d1c5f089f094605ff34de0bca4b2647832e2bbf84b2b2f43b76" + inputs-digest = "6f11b6cba49c265cac76688eba9003e74961def4603f7dfab086f7a996032fd8" solver-name = "gps-cdcl" solver-version = 1 diff --git a/Gopkg.toml b/Gopkg.toml index c3c689b..0c55cfb 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -4,4 +4,4 @@ [[constraint]] name = "pack.ag/amqp" - version = "0.4.1" + branch = "master" diff --git a/cbs/cbs.go b/cbs/cbs.go index ac64b1e..7f37eea 100644 --- a/cbs/cbs.go +++ b/cbs/cbs.go @@ -54,7 +54,7 @@ func NegotiateClaim(ctx context.Context, audience string, conn *amqp.Client, pro if err != nil { return err } - defer link.Close() + defer link.Close(ctx) token, err := provider.GetToken(audience) if err != nil { diff --git a/changelog.md b/changelog.md index 4047c5e..7bde35c 100644 --- a/changelog.md +++ b/changelog.md @@ -2,6 +2,7 @@ ## `master` - add opentracing support +- upgrade amqp to pull in the changes where close accepts context ## `v0.2.4` - connection string keys are case insensitive diff --git a/rpc/rpc.go b/rpc/rpc.go index ee5e900..9344282 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -177,38 +177,47 @@ func (l *Link) RPC(ctx context.Context, msg *amqp.Message) (*Response, error) { } // Close the link receiver, sender and session -func (l *Link) Close() error { - if err := l.closeReceiver(); err != nil { - _ = l.closeSender() - _ = l.closeSession() +func (l *Link) Close(ctx context.Context) error { + span, ctx := tracing.StartSpanFromContext(ctx, "az-amqp-common.rpc.Close") + span.Finish() + + if err := l.closeReceiver(ctx); err != nil { + _ = l.closeSender(ctx) + _ = l.closeSession(ctx) return err } - if err := l.closeSender(); err != nil { - _ = l.closeSession() + if err := l.closeSender(ctx); err != nil { + _ = l.closeSession(ctx) return err } - return l.closeSession() + return l.closeSession(ctx) } -func (l *Link) closeReceiver() error { +func (l *Link) closeReceiver(ctx context.Context) error { + span, ctx := tracing.StartSpanFromContext(ctx, "az-amqp-common.rpc.closeReceiver") + span.Finish() if l.receiver != nil { - return l.receiver.Close() + return l.receiver.Close(ctx) } return nil } -func (l *Link) closeSender() error { +func (l *Link) closeSender(ctx context.Context) error { + span, ctx := tracing.StartSpanFromContext(ctx, "az-amqp-common.rpc.closeSender") + span.Finish() if l.sender != nil { - return l.sender.Close() + return l.sender.Close(ctx) } return nil } -func (l *Link) closeSession() error { +func (l *Link) closeSession(ctx context.Context) error { + span, ctx := tracing.StartSpanFromContext(ctx, "az-amqp-common.rpc.closeSession") + span.Finish() if l.session != nil { - return l.session.Close() + return l.session.Close(ctx) } return nil } From 5fce436f45e12e494deabcd4763f7aeec98d41f0 Mon Sep 17 00:00:00 2001 From: David Justice Date: Thu, 12 Apr 2018 10:24:25 -0700 Subject: [PATCH 3/3] update changelog and version --- changelog.md | 4 +++- internal/version.go | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/changelog.md b/changelog.md index 7bde35c..34b7469 100644 --- a/changelog.md +++ b/changelog.md @@ -1,8 +1,10 @@ # Change Log ## `master` + +## `v0.3.0` - add opentracing support -- upgrade amqp to pull in the changes where close accepts context +- upgrade amqp to pull in the changes where close accepts context (breaking change) ## `v0.2.4` - connection string keys are case insensitive diff --git a/internal/version.go b/internal/version.go index 12beecc..f288f2c 100644 --- a/internal/version.go +++ b/internal/version.go @@ -2,5 +2,5 @@ package common const ( // Version is the semantic version of the library - Version = "0.2.4" + Version = "0.3.0" )