Merge pull request #11 from devigned/opentracing

Opentracing support
This commit is contained in:
David Justice 2018-04-12 10:37:40 -07:00 коммит произвёл GitHub
Родитель 17f29fcc9c 5fce436f45
Коммит 1c6a1aad33
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
8 изменённых файлов: 170 добавлений и 50 удалений

38
Gopkg.lock сгенерированный
Просмотреть файл

@ -9,8 +9,8 @@
"autorest/azure",
"autorest/date"
]
revision = "7909b98056dd6f6a9fc9b7745af1810c93c15939"
version = "v10.3.0"
revision = "9ad9326b278af8fa5cc67c30c0ce9a58cc0862b2"
version = "v10.6.0"
[[projects]]
name = "github.com/davecgh/go-spew"
@ -24,6 +24,16 @@
revision = "06ea1031745cb8b3dab3f6a236daf2b0aa468b7e"
version = "v3.2.0"
[[projects]]
name = "github.com/opentracing/opentracing-go"
packages = [
".",
"ext",
"log"
]
revision = "1949ddbfd147afd4d964a9f00b24eb291e0e7c38"
version = "v1.0.2"
[[projects]]
name = "github.com/pkg/errors"
packages = ["."]
@ -36,12 +46,6 @@
revision = "792786c7400a136282c1664665ae0a8db921c6c2"
version = "v1.0.0"
[[projects]]
name = "github.com/sirupsen/logrus"
packages = ["."]
revision = "c155da19408a8799da419ed3eeb0cb5db0ad5dbc"
version = "v1.0.5"
[[projects]]
name = "github.com/stretchr/testify"
packages = ["assert"]
@ -53,19 +57,15 @@
name = "golang.org/x/crypto"
packages = [
"pkcs12",
"pkcs12/internal/rc2",
"ssh/terminal"
"pkcs12/internal/rc2"
]
revision = "80db560fac1fb3e6ac81dbc7f8ae4c061f5257bd"
revision = "d6449816ce06963d9d136eee5a56fca5b0616e7e"
[[projects]]
branch = "master"
name = "golang.org/x/sys"
packages = [
"unix",
"windows"
]
revision = "c488ab1dd8481ef762f96a79a9577c27825be697"
name = "golang.org/x/net"
packages = ["context"]
revision = "61147c48b25b599e5b561d2e9c4f3e1ef489ca41"
[[projects]]
branch = "master"
@ -74,11 +74,11 @@
".",
"internal/testconn"
]
revision = "fc71119dfd03ed44d0aba09806e4a7d1584b74b1"
revision = "ee6eb7ef6e2355b46a573b0b1799f76259f2ea47"
[solve-meta]
analyzer-name = "dep"
analyzer-version = 1
inputs-digest = "7c743defd623795eff4a690aaf4d7651db81fae6e00d2e432b1ee18d132377c4"
inputs-digest = "6f11b6cba49c265cac76688eba9003e74961def4603f7dfab086f7a996032fd8"
solver-name = "gps-cdcl"
solver-version = 1

Просмотреть файл

@ -36,7 +36,6 @@ import (
"github.com/Azure/azure-amqp-common-go/auth"
"github.com/Azure/go-autorest/autorest/adal"
"github.com/Azure/go-autorest/autorest/azure"
log "github.com/sirupsen/logrus"
"golang.org/x/crypto/pkcs12"
)
@ -153,7 +152,6 @@ func (c *TokenProviderConfiguration) NewServicePrincipalToken() (*adal.ServicePr
// 1.Client Credentials
if c.ClientSecret != "" {
log.Debug("creating a token via a service principal client secret")
spToken, err := adal.NewServicePrincipalToken(*oauthConfig, c.ClientID, c.ClientSecret, c.ResourceURI)
if err != nil {
return nil, fmt.Errorf("failed to get oauth token from client credentials: %v", err)
@ -166,7 +164,6 @@ func (c *TokenProviderConfiguration) NewServicePrincipalToken() (*adal.ServicePr
// 2. Client Certificate
if c.CertificatePath != "" {
log.Debug("creating a token via a service principal client certificate")
certData, err := ioutil.ReadFile(c.CertificatePath)
if err != nil {
return nil, fmt.Errorf("failed to read the certificate file (%s): %v", c.CertificatePath, err)
@ -186,7 +183,6 @@ func (c *TokenProviderConfiguration) NewServicePrincipalToken() (*adal.ServicePr
}
// 3. By default return MSI
log.Debug("creating a token via MSI")
msiEndpoint, err := adal.GetMSIVMEndpoint()
if err != nil {
return nil, err
@ -206,19 +202,15 @@ func (t *TokenProvider) GetToken(audience string) (*auth.Token, error) {
token := t.tokenProvider.Token()
expireTicks, err := strconv.ParseInt(token.ExpiresOn, 10, 64)
if err != nil {
log.Debugf("%v", token.AccessToken)
return nil, err
}
expires := time.Unix(expireTicks, 0)
if expires.Before(time.Now()) {
log.Debug("refreshing AAD token since it has expired")
if err := t.tokenProvider.Refresh(); err != nil {
log.Error("refreshing AAD token has failed")
return nil, err
}
token = t.tokenProvider.Token()
log.Debug("refreshing AAD token has succeeded")
}
return auth.NewToken(auth.CBSTokenTypeJWT, token.AccessToken, token.ExpiresOn), nil

Просмотреть файл

@ -26,11 +26,13 @@ package cbs
import (
"context"
"fmt"
"time"
"github.com/Azure/azure-amqp-common-go/auth"
"github.com/Azure/azure-amqp-common-go/internal/tracing"
"github.com/Azure/azure-amqp-common-go/log"
"github.com/Azure/azure-amqp-common-go/rpc"
log "github.com/sirupsen/logrus"
"pack.ag/amqp"
)
@ -45,18 +47,21 @@ const (
// NegotiateClaim attempts to put a token to the $cbs management endpoint to negotiate auth for the given audience
func NegotiateClaim(ctx context.Context, audience string, conn *amqp.Client, provider auth.TokenProvider) error {
span, ctx := tracing.StartSpanFromContext(ctx, "az-amqp-common.cbs.NegotiateClaim")
span.Finish()
link, err := rpc.NewLink(conn, cbsAddress)
if err != nil {
return err
}
defer link.Close()
defer link.Close(ctx)
token, err := provider.GetToken(audience)
if err != nil {
return err
}
log.Debugf("negotiating claim for audience %s with token type %s and expiry of %s", audience, token.TokenType, token.Expiry)
log.For(ctx).Debug(fmt.Sprintf("negotiating claim for audience %s with token type %s and expiry of %s", audience, token.TokenType, token.Expiry))
msg := &amqp.Message{
Value: token.Token,
ApplicationProperties: map[string]interface{}{
@ -69,10 +74,10 @@ func NegotiateClaim(ctx context.Context, audience string, conn *amqp.Client, pro
res, err := link.RetryableRPC(ctx, 3, 1*time.Second, msg)
if err != nil {
log.Error(err)
log.For(ctx).Error(err)
return err
}
log.Debugf("negotiated with response code %d and message: %s", res.Code, res.Description)
log.For(ctx).Debug(fmt.Sprintf("negotiated with response code %d and message: %s", res.Code, res.Description))
return nil
}

Просмотреть файл

@ -2,6 +2,10 @@
## `master`
## `v0.3.0`
- add opentracing support
- upgrade amqp to pull in the changes where close accepts context (breaking change)
## `v0.2.4`
- connection string keys are case insensitive

Просмотреть файл

@ -0,0 +1,31 @@
package tracing
import (
"context"
"os"
"github.com/Azure/azure-amqp-common-go/internal"
"github.com/opentracing/opentracing-go"
tag "github.com/opentracing/opentracing-go/ext"
)
// StartSpanFromContext starts a span given a context and applies common library information
func StartSpanFromContext(ctx context.Context, operationName string, opts ...opentracing.StartSpanOption) (opentracing.Span, context.Context) {
span, ctx := opentracing.StartSpanFromContext(ctx, operationName, opts...)
ApplyComponentInfo(span)
return span, ctx
}
// ApplyComponentInfo applies amqp common library and network info to the span
func ApplyComponentInfo(span opentracing.Span) {
tag.Component.Set(span, "github.com/Azure/azure-amqp-common-go")
span.SetTag("version", common.Version)
applyNetworkInfo(span)
}
func applyNetworkInfo(span opentracing.Span) {
hostname, err := os.Hostname()
if err == nil {
tag.PeerHostname.Set(span, hostname)
}
}

6
internal/version.go Normal file
Просмотреть файл

@ -0,0 +1,6 @@
package common
const (
// Version is the semantic version of the library
Version = "0.3.0"
)

63
log/logger.go Normal file
Просмотреть файл

@ -0,0 +1,63 @@
package log
import (
"context"
"github.com/opentracing/opentracing-go"
tag "github.com/opentracing/opentracing-go/ext"
"github.com/opentracing/opentracing-go/log"
)
type (
// Logger is the interface for opentracing logging
Logger interface {
Info(msg string, fields ...log.Field)
Error(err error, fields ...log.Field)
Fatal(msg string, fields ...log.Field)
Debug(msg string, fields ...log.Field)
}
spanLogger struct {
span opentracing.Span
}
nopLogger struct{}
)
// For will return a logger for a given context
func For(ctx context.Context) Logger {
if span := opentracing.SpanFromContext(ctx); span != nil {
return &spanLogger{
span: span,
}
}
return new(nopLogger)
}
func (sl spanLogger) Info(msg string, fields ...log.Field) {
sl.logToSpan("info", msg, fields...)
}
func (sl spanLogger) Error(err error, fields ...log.Field) {
tag.Error.Set(sl.span, true)
sl.logToSpan("error", err.Error(), fields...)
}
func (sl spanLogger) Fatal(msg string, fields ...log.Field) {
tag.Error.Set(sl.span, true)
sl.logToSpan("fatal", msg, fields...)
}
func (sl spanLogger) Debug(msg string, fields ...log.Field) {
sl.logToSpan("debug", msg, fields...)
}
func (sl spanLogger) logToSpan(level string, msg string, fields ...log.Field) {
fields = append(fields, log.String("event", msg), log.String("level", level))
sl.span.LogFields(fields...)
}
func (sl nopLogger) Info(msg string, fields ...log.Field) {}
func (sl nopLogger) Error(err error, fields ...log.Field) {}
func (sl nopLogger) Fatal(msg string, fields ...log.Field) {}
func (sl nopLogger) Debug(msg string, fields ...log.Field) {}

Просмотреть файл

@ -31,9 +31,10 @@ import (
"time"
"github.com/Azure/azure-amqp-common-go"
"github.com/Azure/azure-amqp-common-go/internal/tracing"
"github.com/Azure/azure-amqp-common-go/log"
"github.com/Azure/azure-amqp-common-go/uuid"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"pack.ag/amqp"
)
@ -102,24 +103,30 @@ func NewLink(conn *amqp.Client, address string) (*Link, error) {
// RetryableRPC attempts to retry a request a number of times with delay
func (l *Link) RetryableRPC(ctx context.Context, times int, delay time.Duration, msg *amqp.Message) (*Response, error) {
span, ctx := tracing.StartSpanFromContext(ctx, "az-amqp-common.rpc.RetryableRPC")
span.Finish()
res, err := common.Retry(times, delay, func() (interface{}, error) {
span, ctx := tracing.StartSpanFromContext(ctx, "az-amqp-common.rpc.RetryableRPC.retry")
span.Finish()
res, err := l.RPC(ctx, msg)
if err != nil {
log.Debugf("error in RPC via link %s: %v", l.id, err)
log.For(ctx).Error(errors.New(fmt.Sprintf("error in RPC via link %s: %v", l.id, err)))
return nil, err
}
switch {
case res.Code >= 200 && res.Code < 300:
log.Debugf("successful rpc on link %s: status code %d and description: %s", l.id, res.Code, res.Description)
log.For(ctx).Debug(fmt.Sprintf("successful rpc on link %s: status code %d and description: %s", l.id, res.Code, res.Description))
return res, nil
case res.Code >= 500:
errMessage := fmt.Sprintf("server error link %s: status code %d and description: %s", l.id, res.Code, res.Description)
log.Debugln(errMessage)
log.For(ctx).Error(errors.New(errMessage))
return nil, common.Retryable(errMessage)
default:
errMessage := fmt.Sprintf("unhandled error link %s: status code %d and description: %s", l.id, res.Code, res.Description)
log.Debugln(errMessage)
log.For(ctx).Error(errors.New(errMessage))
return nil, common.Retryable(errMessage)
}
})
@ -134,6 +141,9 @@ func (l *Link) RPC(ctx context.Context, msg *amqp.Message) (*Response, error) {
l.rpcMu.Lock()
defer l.rpcMu.Unlock()
span, ctx := tracing.StartSpanFromContext(ctx, "az-amqp-common.rpc.RPC")
span.Finish()
if msg.Properties == nil {
msg.Properties = &amqp.MessageProperties{}
}
@ -167,38 +177,47 @@ func (l *Link) RPC(ctx context.Context, msg *amqp.Message) (*Response, error) {
}
// Close the link receiver, sender and session
func (l *Link) Close() error {
if err := l.closeReceiver(); err != nil {
_ = l.closeSender()
_ = l.closeSession()
func (l *Link) Close(ctx context.Context) error {
span, ctx := tracing.StartSpanFromContext(ctx, "az-amqp-common.rpc.Close")
span.Finish()
if err := l.closeReceiver(ctx); err != nil {
_ = l.closeSender(ctx)
_ = l.closeSession(ctx)
return err
}
if err := l.closeSender(); err != nil {
_ = l.closeSession()
if err := l.closeSender(ctx); err != nil {
_ = l.closeSession(ctx)
return err
}
return l.closeSession()
return l.closeSession(ctx)
}
func (l *Link) closeReceiver() error {
func (l *Link) closeReceiver(ctx context.Context) error {
span, ctx := tracing.StartSpanFromContext(ctx, "az-amqp-common.rpc.closeReceiver")
span.Finish()
if l.receiver != nil {
return l.receiver.Close()
return l.receiver.Close(ctx)
}
return nil
}
func (l *Link) closeSender() error {
func (l *Link) closeSender(ctx context.Context) error {
span, ctx := tracing.StartSpanFromContext(ctx, "az-amqp-common.rpc.closeSender")
span.Finish()
if l.sender != nil {
return l.sender.Close()
return l.sender.Close(ctx)
}
return nil
}
func (l *Link) closeSession() error {
func (l *Link) closeSession(ctx context.Context) error {
span, ctx := tracing.StartSpanFromContext(ctx, "az-amqp-common.rpc.closeSession")
span.Finish()
if l.session != nil {
return l.session.Close()
return l.session.Close(ctx)
}
return nil
}